Spark Streaming整合Flume统计词频
Spark Streaming整合Flume统计词频
采用命令行模式,配置flume的配置文件,如下:
开发sparkstreaming程序,如下:
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from pyspark.streaming.flume import FlumeUtils
‘’‘Spark Streaming整合Flume统计词频’’’
#sc = SparkContext(master=“local[2]”,appName=“FlumeNetWorkWordCount”)
ssc = StreamingContext(sc,5)
flumeStreams = FlumeUtils.createStream(ssc, “hadoop001”, 41414)
counts = flumeStreams.map(lambda x:x[1])
.flatMap(lambda line:line.split(","))
.map(lambda word:(word,1))
.reduceByKey(lambda a,b:a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
因为我们采用的数据读取方式是push,所以需要先启动pyspark,将上述代码复制到命令行,然后启动flume:
./flume-ng agent
–name simple-agent
–conf $FLUME_HOME/conf
–conf-file $FLUME_HOME/conf/flume-push-streaming.conf
-Dflume.root.logger=INFO,console &
使用telnet,发送数据
查看词频统计如下:
程序能够运行,实现我们想要的功能。
注:如果程序报如下错误:
需要去maven仓库下载相应的jar包依赖,链接:
https://search.maven.org/
搜索自己需要的jar包版本,wget到spark的安装目录jars下即可。