SparkStreaming——实例3:带状态操作
带状态操作,updateStateByKey
实例1、实例2只是把这5s中产生的数据wordcount,也就是每5s统计一次这5s的数据,
而如果我们需要统计从一开始到现在产生的数据,就需要updateStateByKey
updateStateByKey操作,可以让我们为每个key维护一份state,并持续不断的更新该state。
1、首先,要定义一个state,可以是任意的数据类型;
2、其次,要定义state更新函数——指定一个函数如何使用之前的state和新值来更新state。
对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除。
当然,对于每个新出现的key,也会执行state更新函数。
注意,updateStateByKey操作,要求必须开启Checkpoint机制。
目标:
获取netcat产生的数据进行实时的分析
启动hdfs:
start-all.sh
启动netcat:
nc -l -p 9999
代码:
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingWC {
/**
* 实现按批次累加的功能,wordcount
*/
def main(args: Array[String]): Unit = {
// local[2]:至少需要两个线程,一个接收,一个执行
val conf = new SparkConf().setAppName("streamingWC").setMaster("local[2]")
// 批次间隔5s
val ssc = new StreamingContext(conf,Seconds(5))
// 设置检查点
ssc.checkpoint("hdfs://master:9000/ck-20190428")
// 创建DStream,接受9999端口数据
val dStream = ssc.socketTextStream("192.168.32.128", 9999)
// wordcount的map
val tuples = dStream.flatMap(_.split(" ")).map((_,1))
// 按批次累加需要调用方法updateStateByKey方法
val res = tuples.updateStateByKey(func, new HashPartitioner(ssc.sparkContext.defaultParallelism),false)
res.print()
ssc.start() #开始计算
ssc.awaitTermination() 等待计算停止
}
val func = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
it.map(t => {
(t._1, t._2.sum + t._3.getOrElse(0))
})
}
}
结果: