Spark Streaming整合Flume统计词频

Spark Streaming整合Flume统计词频

采用命令行模式,配置flume的配置文件,如下:
Spark Streaming整合Flume统计词频
开发sparkstreaming程序,如下:
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from pyspark.streaming.flume import FlumeUtils

‘’‘Spark Streaming整合Flume统计词频’’’

#sc = SparkContext(master=“local[2]”,appName=“FlumeNetWorkWordCount”)
ssc = StreamingContext(sc,5)

flumeStreams = FlumeUtils.createStream(ssc, “hadoop001”, 41414)
counts = flumeStreams.map(lambda x:x[1])
.flatMap(lambda line:line.split(","))
.map(lambda word:(word,1))
.reduceByKey(lambda a,b:a+b)

counts.pprint()

ssc.start()
ssc.awaitTermination()

因为我们采用的数据读取方式是push,所以需要先启动pyspark,将上述代码复制到命令行,然后启动flume:
./flume-ng agent
–name simple-agent
–conf $FLUME_HOME/conf
–conf-file $FLUME_HOME/conf/flume-push-streaming.conf
-Dflume.root.logger=INFO,console &

使用telnet,发送数据
Spark Streaming整合Flume统计词频
查看词频统计如下:
Spark Streaming整合Flume统计词频
程序能够运行,实现我们想要的功能。

注:如果程序报如下错误:
Spark Streaming整合Flume统计词频
需要去maven仓库下载相应的jar包依赖,链接:
https://search.maven.org/
搜索自己需要的jar包版本,wget到spark的安装目录jars下即可。