畅聊Spark(四)Streaming
为什么要有Spark Streaming?
建议先学习ZK、Kafka、Storm,可在博客文章找到。
什么是Spark Streaming?
Spark Streaming是Spark中的一个组件,基于Spark Core进行构建,用于对流式数据处理,类似Storm。
Spark Streaming采用微批次架构,对于整个流式计算来说,数据流可以想象成为水流,微批次架构的意思就是将水流按照用户设定的时间间隔分为多个水流段,一个段的水会在Spark转换为一个RDD,所以对水流的操作也就是对这些分割后的RDD进行单独的操作,每个RDD操作可以认为就是一个小的批次处理(离线处理)。
和Spark基于RDD的概念很相似,SparkStreaming使用离散化流(Discretized Stream)作为抽象表示,简称DStream。
DStream是类似RDD和DataFream针对流式计算的抽象类,在源码中DStream通过HashMap来保存所管理的数据,K是RDD的数据流时间,V是保存数据流的RDD。
DStream是随着时间的推移而受到数据的序列,在内部,每个时间区间收到的数据都作为RDD存在,而Dtream是由这些RDD所组成的序列(离散化),即代表一个连续的数据流。
DStream可以从各种输入源创建,比如Flume、Kafka或HDFS、创建出来的DStream支持两种操作,一种是转换操作(Transformation),此操作会生成一个新的DStream,另一种是输出操作(Output Operation),可以将数据写入外部存储系统中。
特点
1.能够和Spark Core、Spark SQL来进行混合式编程。
2.能够接收Kafka、Flume、HDFS、Twitter等数据。
3.分为无状态转换(前后处理的数据没有关系)和有状态转换(前后数据有关联关系)
4.具有高吞吐量、容错能力强
5.可保证数据被消费,至少一次
Spark Streaming、Storm、Flink对比
对比项 |
Storm / JStorm |
Spark Streaming |
Flink |
开发语言 |
Scala |
Clojure |
Java |
实时性 |
高 |
有延迟 |
高 |
吞吐量 |
低 |
高 |
高 |
离线/实时 |
实时 |
离线+实时 |
离线+实时 |
算子 |
单一 |
丰富 |
丰富 |
机器学习 |
无 |
有 |
无 |
图计算 |
无 |
有 |
无 |
编程模型 |
Spout/Bolt |
DStream |
DataStream |
|
|
|
|
IDEA编程
Spark Streaming WordCount
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object StreamingWordCount { def main(args: Array[String]): Unit = { //SparkStreaming有2中Task,一种是receiver,一种是calculate, //所以只有一个线程的话,会出现一直receiver,calculate一直进不去,所以最少要2个线程 //创建配置 //val conf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local") val conf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[2]") //程序执行入口 val sc: SparkContext = new SparkContext(conf) //离线任务是创建SparkContext,现在要实现实时计算,要使用StreamingContext //StreamingContext是对SparkContext的包装 //第二个参数是批次产生的时间间隔[5秒] val ssc: StreamingContext = new StreamingContext(sc,Milliseconds(5000)) //有了StreamingContext就可以创建SparkStreaming的抽象DStream //从一个Socket端口读取数据 val lines:ReceiverInputDStream[String] = ssc.socketTextStream("hadoop100",8888) //对DStream进行操作,操作这个抽象(代理、描述)就像操作本地集合一样 //这是一个高度封装的抽象 val words:DStream[String] = lines.flatMap(_.split(" ")) //将单词和1组合在一起 val wordAndOne:DStream[(String,Int)] = words.map((_,1)) //聚合 val reduced:DStream[(String,Int)] = wordAndOne.reduceByKey(_+_) //打印结果 reduced.print() //开启Spark程序 ssc.start() //等待优雅的退出 //退出时,不在接收请求,将当前已接收的请求处理完就退出 ssc.awaitTermination() //服务器端使用 nc 来做模拟 } }
|
Spark Streaming StatefulWordCount
import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 累加的WordCount */ object StatefulWordCount { /** * 第一个参数:聚合的key,就是单词 * 第二个参数:当前批次产生批次该单词在每一个分区出现的次数 * 第三个参数:初始值或累加中间结果,初始可能为空 */ val updateFunc = (iterator:Iterator[(String,Seq[Int],Option[Int])]) => { iterator.map(t => (t._1,t._2.sum + t._3.getOrElse(0))) } def main(args: Array[String]): Unit = { //创建配置 val conf: SparkConf = new SparkConf().setAppName("StatefulWordCount").setMaster("local[*]") //创建SparkStreaming入口,内部也是实现SparkContext val ssc = new StreamingContext(conf,Seconds(5)) //如果要使用更新历史数据(累加),就要把中间结果保存起来,一般会弄到redis //前面的操作都是每次拿到新数据,后面的数据都给干掉了。 //而且如果程序关闭了,重新启动,之前的数据全部就丢失了, // 如果持久化下来就不用担心了,持久化后每次再次加载回来就好 ssc.checkpoint("e:/spark/ck") //创建DStream,需要kafkaDStream val zkQuprum = "hadoop100:2181,hadoop101:2181,hadoop102:2181" val groupId = "g1" //消费组ID val topic = Map[String,Int]("levi" -> 1) //消费主题 //高级的方式,但是效率比较低。开发一般不用 val data:ReceiverInputDStream[(String,String)] = KafkaUtils.createStream(ssc,zkQuprum,groupId,topic) //对数据进行处理 //ReceiverInputDStream[(String,String)]装的是一个元组,Key是写入的key,value是实际写入的内容 val lines: DStream[String] = data.map(_._2) //切分压平 val words: DStream[String] = lines.flatMap(_.split(" ")) //将单词和1组合 val wordAndOne: DStream[(String, Int)] = words.map((_,1)) //聚合 val reduced: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true) //打印结果Action reduced.print() //启动程序 ssc.start() //优雅退出 ssc.awaitTermination() } }
|
Spark Streaming 整合 Kafka
import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} object KafkaWordCount { def main(args: Array[String]): Unit = { //配置 //一个是Rceiver,一个是calcuate,最少需要2线程 val conf = new SparkConf().setAppName("SparkStreaming").setMaster("local[2]") //应用程序入口 /* 底层还是:SparkContext private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = { new SparkContext(conf) } */ //5秒钟一次 val ssc: StreamingContext = new StreamingContext(conf,Seconds(5)) //创建DStream,需要KafkaDStream val zkQuperum: String = "hadoop100:2181,hadoop101:2181,hadoop102:2181" val groupId = "g1" //kafka消费组 val topic = Map[String,Int]("levi" -> 1) //消费的主题 //高级的方式创建,但是效率比较低,开发一般不用 val data:ReceiverInputDStream[(String,String)] = KafkaUtils.createStream(ssc,zkQuperum,groupId,topic) //对接收到的数据进行处理 //Kafka的ReceiverInputDStream[(String,String)]装的是一个元组,key是写入的key,value是实际写入的内容 val lines:DStream[String] = data.map(_._2) //对DStream进行操作,操作这个抽象(代理、描述),就像操作一个本地集合 //切分压平 val words:DStream[String] = lines.flatMap(_.split(" ")) //把单词和1组合在一起 val wordAndOne: DStream[(String, Int)] = words.map((_,1)) //聚合 val reduced: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_) //打印结果 reduced.print() //启动Spark程序 ssc.start() //优雅的退出 ssc.awaitTermination() //出现了循环报错,就检查 C:\Windows\System32\drivers\etc\hosts文件和Kafka服务器的主机名映射 } }
|
Spark Streaming 整合 Kafka 管理偏移量(效率高,生产环境使用)
import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import kafka.utils.{ZKGroupTopicDirs, ZkUtils} import org.I0Itec.zkclient.ZkClient import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Duration, StreamingContext} /** * 记录偏移量 */ object KafkaDirectWordCount { def main(args: Array[String]): Unit = { //创建Spark配置 val conf: SparkConf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[*]") //创建SparkStreaming的入口,底层还是跑了SparkContext val ssc: StreamingContext = new StreamingContext(conf,Duration(5)) //指定消费者组Id val groupId = "g1" val topic = "wordcount" // 消费主题 //指定kafka的broker地址(SparkStream的Task直连到Kafka分区上,用更底层的Api消费,效率更高) val brokerList = "hadoop100:9092,hadoop101:9092,hadoop102:9092" //指定ZK地址,后期更新消费的偏移量时使用 (以后可以用Redice来存储偏移量) val zkQuprum = "hadoop100:2181,hadoop101:2181,hadoop102:2181" //创建Stream时,使用的topic集合,SparkStreaming可以同时消费多个topic val topics:Set[String] = Set(topic) //创建ZKGroup和TopicDirs对象,其实就是指定向ZK中写入数据的目录,用于记录偏移量 val topicDirs = new ZKGroupTopicDirs(groupId,topic) //获取到ZK中的路径,因为ZK是用于存储记录的偏移量的 val zkTopicPath = s"${topicDirs.consumerOffsetDir}" //准备kafka的参数 val kafkaParams = Map( "deserializer.encoding" -> "UTF-8", //默认编码就是UTF-8 "metadata.broker.list" -> brokerList, "group.id" -> groupId, //从头开始读取数据 "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString ) //ZK的Host和IP,创建一个client,用于更新偏移量 val zkClient = new ZkClient(zkQuprum) //查询该路径下是否有子节点(默认是有节点为我们保存不同partition时生成的) val children = zkClient.countChildren(zkTopicPath) var kafkaStream:InputDStream[(String,String)] = null //如果zk中保存了offset,则会利用这个offset作为KafkaStream的起始位置 var fromOffsets: Map[TopicAndPartition, Long] = Map() //如果保存过 if(children > 0){ for (i <- 0 until children){ val partitionOffset: String = zkClient.readData[String](s"$zkTopicPath/${i}") val tp = TopicAndPartition(topic,i) //将同partition对应的offset增加到fromOffsets中 fromOffsets += (tp -> partitionOffset.toLong) } //这个会把kafka的消息进行transform,最终kafka的数据会变成(topic_name,message) val messageHandler = (mmd: MessageAndMetadata[String,String]) => (mmd.key,mmd.message) //通过KafkaUtils创建直连的DStream(fromOffsets参数的作用是:按照前面计算好了的偏移量继续消费数据) //key value key解码方式 value解码方式 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) }else{ //如果为保存,就根据kafkaParmas的配置使用最新的(largest)或最旧的(smallest) offset //key value key解码方式 value解码方式 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) } //偏移量范围 var offsetRanges = Array[OffsetRange]() //直连的方式只有KafkaDStream的RDD才能获取偏移量,那么就不能调用到DStream的Transformtion //所以只能在kafkaStream调用foreachRDD,来获取RDD的偏移量,然后对RDD进行操作 //从Kafka读取的消息,DStream的Tranform方法可以将当前批次的RDD获取出来 //该transform方法计算获取到当前批次RDD的偏移量 val transform:DStream[(String,String)] = kafkaStream.transform{rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd } val message: DStream[String] = transform.map(_._2) //一次迭代DStream中的RDD message.foreachRDD{ rdd => //对RDD操作,触发Action //相当于写完一个分区的数据 rdd.foreachPartition(partition => { partition.foreach(x => { println(x) }) }) //该分区的偏移量写到ZK for(o <- offsetRanges) { val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}" //将该partition的offset保存到ZK,更新 ZkUtils.updatePersistentPath(zkClient,zkPath,o.untilOffset.toString) } } //开始 ssc.start() //退出 ssc.awaitTermination() } }
|