Spark Streaming + Flume Integration (Pull-based Approach): Word Count
See the official Spark documentation:
http://spark.apache.org/docs/2.2.0/streaming-flume-integration.html
For the pull-based approach, the Flume sink type must be set to org.apache.spark.streaming.flume.sink.SparkSink, as described in the Spark docs. My Flume configuration follows that pattern:
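The original post showed the configuration as an image. A minimal flume-pull-streaming.conf consistent with the agent name and address used later in this post might look like the following; the source/channel names and the netcat port 44444 are assumptions, not the author's exact values:

```
simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

# Netcat source: receives lines sent via telnet (port 44444 is an assumed value)
simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = hadoop001
simple-agent.sources.netcat-source.port = 44444

# Spark sink: buffers events until Spark Streaming pulls them
simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop001
simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel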
Developing the Spark Streaming program
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

'''Spark Streaming + Flume (pull-based approach) word count'''
# sc already exists in the pyspark shell; uncomment when running via spark-submit
# sc = SparkContext(master="local[2]", appName="FlumePullWordCount")
ssc = StreamingContext(sc, 5)
address = [("hadoop001", 41414)]
flumeStreams = FlumeUtils.createPollingStream(ssc=ssc, addresses=address)
# Count words per batch: each Flume event arrives as a (headers, body) pair
counts = flumeStreams.map(lambda x: x[1]) \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
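The transformation chain above can be sketched in plain Python (no Spark required) to show what reduceByKey produces for a single micro-batch; the sample lines are made up for illustration:

```python
from collections import Counter

# Simulated Flume event bodies received in one 5-second micro-batch
bodies = ["hello spark", "hello flume spark"]

# flatMap(split) -> map(word, 1) -> reduceByKey(+) is equivalent to counting words
words = [word for line in bodies for word in line.split(" ")]
counts = Counter(words)

print(sorted(counts.items()))  # [('flume', 1), ('hello', 2), ('spark', 2)]
```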
With the pull-based approach, Flume must be started first: events accumulate in the sink's buffer, and Spark Streaming then pulls them from it.
./flume-ng agent \
--name simple-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume-pull-streaming.conf \
-Dflume.root.logger=INFO,console &
Paste the finished Spark Streaming program into the pyspark shell to run it, then start telnet and send some data.
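Note that FlumeUtils is not on pyspark's default classpath, so the shell must be launched with the Flume integration package. One way to do this (the version coordinates assume Spark 2.2.0 built against Scala 2.11):

```
# Launch pyspark with the Spark-Flume integration package
./bin/pyspark --master local[2] \
  --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0
```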
Check the word-count output:
That completes the word count. Interested readers can think about how to strip the newline characters from the results.
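As a hint for that exercise: the stray newline appears because splitting on a single space leaves the line terminator attached to the last word of each telnet line, while splitting on arbitrary whitespace drops it. A minimal sketch in plain Python (the sample line assumes a "\n" terminator; telnet may actually send "\r\n"):

```python
line = "hello spark\n"  # a received line ends with a newline

# split(" ") keeps the newline attached to the last word...
print(line.split(" "))  # ['hello', 'spark\n']

# ...while split() with no argument splits on any whitespace run and drops it
print(line.split())     # ['hello', 'spark']
```

In the streaming job, replacing `line.split(" ")` with `line.split()` in the flatMap has the same effect.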