SparkStreaming——实例3:带状态操作

带状态操作,updateStateByKey

实例1、实例2只是把这5s中产生的数据wordcount,也就是每5s统计一次这5s的数据,

而如果我们需要统计从一开始到现在产生的数据,就需要updateStateByKey

 

updateStateByKey操作,可以让我们为每个key维护一份state,并持续不断的更新该state。

1、首先,要定义一个state,可以是任意的数据类型;

2、其次,要定义state更新函数——指定一个函数如何使用之前的state和新值来更新state。

对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除。

当然,对于每个新出现的key,也会执行state更新函数。

注意,updateStateByKey操作,要求必须开启Checkpoint机制。

 

 

 

目标:

获取netcat产生的数据进行实时的分析

 

启动hdfs:

start-all.sh

启动netcat:

nc -l -p 9999

 

代码:

import org.apache.spark.{HashPartitioner, SparkConf}

import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWC {

/**

* 实现按批次累加的功能,wordcount

*/

def main(args: Array[String]): Unit = {

// local[2]:至少需要两个线程,一个接收,一个执行

val conf = new SparkConf().setAppName("streamingWC").setMaster("local[2]")

// 批次间隔5s

val ssc = new StreamingContext(conf,Seconds(5))

// 设置检查点

ssc.checkpoint("hdfs://master:9000/ck-20190428")

// 创建DStream,接受9999端口数据

val dStream = ssc.socketTextStream("192.168.32.128", 9999)

// wordcount的map

val tuples = dStream.flatMap(_.split(" ")).map((_,1))

// 按批次累加需要调用方法updateStateByKey方法

val res = tuples.updateStateByKey(func, new HashPartitioner(ssc.sparkContext.defaultParallelism),false)

res.print()

ssc.start() #开始计算

ssc.awaitTermination() 等待计算停止

}

 

val func = (it: Iterator[(String, Seq[Int], Option[Int])]) => {

it.map(t => {

(t._1, t._2.sum + t._3.getOrElse(0))

})

}

}

 

 

结果:

SparkStreaming——实例3:带状态操作