Spark之Spark Streaming处理文件流数据
Spark之Spark Streaming处理文件流数据
创建相关文件路径
先设置一个用于保存文件的路径,创建的路径地址为 /usr/local/spark/mycode/streaming/logfile
导入相关类
import org.apache.spark.streaming._
创建一个StreamingContext对象
val ssc=new StreamingContext(sc,Seconds(20))
数据流是每20秒进行一次切割
对StreamingContext对对象调用 .textFileStream()方法生成一个文件流类型的InputStream
val lines=ssc.textFileStream("file:///usr/local/spark/mycode/streaming/logfile")
文件目录监控地址为 /usr/local/spark/mycode/streaming/logfile
编写流计算过程
val words=lines.flatMap(_.split(" "))
val wordCounts=words.map(x=>(x,1)).reduceByKey(_+_)
wordCounts.print()
很眼熟的代码,这是spark词频统计的代码
启动流计算
ssc.start()
运行结果如下
刚开始运行的时候文件路径下并不包含任何文件,在一定时间后常见了一个文本文档,运行结果如图所示,只要文件路径下有文件被创建后,程序就会自动运行并计算结果