Comparison of Spark Streaming and Storm

| | Spark Streaming | Storm |
|---|---|---|
| Data sources | HDFS, HBase, Cassandra, Kafka | HDFS, HBase, Cassandra, Kafka |
| Resource Manager | YARN, Mesos | YARN, Mesos |
| Latency | Few seconds | < 1 second |
| Fault tolerance (every record processed) | Exactly once | At least once |
| Reliability | Improved reliability (Spark + YARN) | Guarantees on data loss (Storm + Kafka) |
Differences:
1. Latency
2. Fault tolerance
3. Reliability
Storm architecture:
Supervisor: slave node, a physical machine
Worker: a process running on a Supervisor
Executor: a thread inside a Worker
Spark Streaming architecture:
Worker: a physical node
Executor: a process on a Worker
------------------------------
Data Source:
- SQL
- NoSQL
- Log Data
- Streaming Data
Ingestion:
- Flume
- Sqoop
- NFS
- Kafka
Processing:
- MapReduce
- Spark
- Storm
- Drill
- Mahout
- Oozie
- Hive
- Pig
- HBase
- Elasticsearch
- Solr
Visualization:
- WebTier
- Banana
- Kibana
- Data Warehouse
ELK:
- E: Elasticsearch
- L: Logstash
- K: Kibana
Alluxio(Tachyon)<br>
Real time Processing:
Storm/Trident/Spark Streaming/Samza/Flink<br>
Latency:<br>
Spark: Few seconds<br>
Storm: < 1 second<br>
Fault tolerance:<br>
Spark: Exactly once<br>
Storm: at least once/at most once<br>
Trident: Exactly once<br>
Reliability:<br>
Spark: Improved reliability (cache)<br>
Storm: Guarantees no data loss<br>
Storm:<br>
- Nimbus
- Zookeeper
- Supervisor
Worker: process<br>
Executor: thread<br>
Task: Bolt/Spout<br>
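A rough sketch (in Scala, against Storm's Java API) of how workers, executors, and tasks map onto a topology; `SentenceSpout` and `SplitBolt` are hypothetical user-defined components, and the `org.apache.storm` package assumes Storm 1.x:<br>
```
import org.apache.storm.Config
import org.apache.storm.topology.TopologyBuilder

val builder = new TopologyBuilder
builder.setSpout("sentences", new SentenceSpout, 2)   // 2 executors (threads) for the spout
builder.setBolt("split", new SplitBolt, 4)            // 4 executors for the bolt
  .setNumTasks(8)                                     // 8 tasks shared by those 4 executors
  .shuffleGrouping("sentences")

val conf = new Config
conf.setNumWorkers(2)                                 // 2 worker processes spread over the Supervisors
```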
Spark Streaming:<br>
- Cluster Manager: Mesos, YARN<br>
- Executor: Task, cache<br>
Spark Streaming: DStream<br>
```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// local[2]: one thread for the socket receiver, at least one for processing
val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))                    // 1-second batches
val lines = ssc.socketTextStream("spark001", 9999)                  // DStream from a TCP socket
val lines2 = ssc.textFileStream("hdfs://spark:9000/wordcount_dir")  // DStream of new files in an HDFS directory
```
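Continuing the sketch above, a minimal word count over the socket DStream (the split-on-space logic and printed output are illustrative):<br>
```
val wordCounts = lines
  .flatMap(_.split(" "))        // split each line into words
  .map(word => (word, 1))
  .reduceByKey(_ + _)           // per-word count within each 1-second batch
wordCounts.print()

ssc.start()                     // start receiving and processing
ssc.awaitTermination()
```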
Streaming window:<br>
reduceByKeyAndWindow(f(), Durations.seconds(60), Durations.seconds(10))<br>
Every 10 seconds, compute over the RDDs of the previous 60 seconds (window length 60 s, slide interval 10 s).<br>
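A sketch of the same word count with a 60-second window sliding every 10 seconds, using the Scala `Seconds` helper (`Durations.seconds` is the Java-API equivalent):<br>
```
// Every 10 seconds, recompute the per-word counts over the last 60 seconds.
val windowedCounts = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(10))
windowedCounts.print()
```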
Kafka + Spark Streaming + HBase<br>
```
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based approach: consume through ZooKeeper with a consumer group.
val topicThreadMap = Map("topic1" -> 1)   // topic -> number of receiver threads
val lines = KafkaUtils.createStream(
  ssc,
  "192.168.80.201:2181,192.168.80.202:2181,192.168.80.203:2181", // ZooKeeper quorum
  "wordcountGroup",                                               // consumer group id
  topicThreadMap
)
```
```
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct (receiver-less) approach: read from the brokers directly.
val kafkaParams = Map(
  "metadata.broker.list" ->
    "192.168.80.201:9092,192.168.80.202:9092,192.168.80.203:9092") // broker list
val topics = Set("topic1")
// Type parameters: key type, value type, key decoder, value decoder
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)
```
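The HBase side of the pipeline is not shown above. A minimal sketch of writing per-batch counts from the Kafka DStream into HBase with foreachRDD/foreachPartition; the table name `wordcount`, column family `cf`, and an `hbase-site.xml` on the classpath are all assumptions:<br>
```
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

val kafkaWordCounts = lines.map(_._2)          // the Kafka message value
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

kafkaWordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One HBase connection per partition, created on the executor.
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val table = conn.getTable(TableName.valueOf("wordcount"))
    partition.foreach { case (word, count) =>
      val put = new Put(Bytes.toBytes(word))                     // row key = word
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(count.toString))
      table.put(put)
    }
    table.close()
    conn.close()
  }
}
```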
When the jar is submitted, the --master option passed to spark-submit overrides .setMaster("local") in the code.<br>
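For example (class and jar names are illustrative): spark-submit --class com.example.WordCount --master yarn wordcount.jar<br>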
hdfs dfs -copyFromLocal spark.txt /wordcount_dir<br>