I recently needed to build a real-time log analysis application on the Flume + Kafka + Spark Streaming stack, so I first put together a test demo. This post walks through that demo and does not analyze the underlying architecture or principles.
Overview: Flume is a distributed, highly reliable and available system for aggregating massive volumes of logs; Kafka is a high-throughput distributed publish-subscribe system; Spark Streaming is a real-time computation framework built on Spark. In this demo everything runs in single-machine pseudo-distributed mode: the Flume source is exec, the agent is named producer, and the sink writes to Kafka.
The software needed to run this can be downloaded directly from the official websites.
My environment: Flume 1.6 + kafka_2.10 + Spark 1.2.0.
Flume configuration:
Edit the configuration file roomy.conf under conf as follows:
#agent section
producer.sources = s
producer.channels = c
producer.sinks = r

#channel section (a memory channel; the agent will not start without a channel type)
producer.channels.c.type = memory
producer.channels.c.capacity = 1000

#source section
producer.sources.s.type = exec
#the log file to tail
producer.sources.s.command = tail -F -n+1 /Users/roomy/Desktop/Coding/scala/real_time_project/debug.log
producer.sources.s.channels = c

# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
#replace with your own Kafka broker address
producer.sinks.r.metadata.broker.list=192.168.1.102:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.channel = c
Then run the following from the Flume directory:
bin/flume-ng agent --conf conf --conf-file conf/roomy.conf --name producer -Dflume.root.logger=INFO,console
That completes the Flume part.
Next, from the Kafka directory, run:
bin/zookeeper-server-start.sh config/zookeeper.properties
This starts ZooKeeper. Then run:
bin/kafka-server-start.sh config/server.properties
This starts Kafka; no extra configuration is needed here.
Finally, write the Spark Streaming test demo program.
Create a new SBT project; build.sbt is as follows:
name := "sk"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.1"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1"

libraryDependencies += "log4j" % "log4j" % "1.2.17"
One thing to note: because of the GFW, downloading these dependencies can be painfully slow; pointing sbt at a nearby Maven mirror, as sketched below, can help.
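A minimal resolver sketch for build.sbt, assuming the Aliyun public mirror (the mirror name and URL are examples and not part of the original setup; any reachable Maven mirror works):

resolvers += "aliyun-public" at "https://maven.aliyun.com/repository/public"

Next comes the test program itself: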
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * Created by roomy on 16/3/23.
 */
object KafkaStreaming {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")
    // the second argument is the batch interval: one batch every 20 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    val topic = "test"
    val topicSet = topic.split(" ").toSet

    // create a direct Kafka stream with brokers and topics
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet
    )

    // each Kafka message is a (key, value) pair; the value is the log line
    val lines = messages.map(_._2)
    lines.print()

    val words: DStream[String] = lines.flatMap(_.split("\n"))
    words.count().print()

    // start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
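By default the direct stream only reads messages produced after the job starts. If the first batch should also pick up messages already sitting in the topic, the usual tweak is the auto.offset.reset consumer setting; a minimal sketch of the adjusted parameter map (broker address as in the demo):

// read from the earliest available offsets instead of only new messages
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092",
  "auto.offset.reset"    -> "smallest"
)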
The batch interval, i.e. how often data is collected and analyzed, is set through the StreamingContext constructor.
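For example, a 5-second interval (the value here is only for illustration) would look like this:

// collect and process data in 5-second batches instead of 20-second ones
val ssc = new StreamingContext(sparkConf, Seconds(5))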
The program watches /Users/roomy/Desktop/Coding/scala/real_time_project/debug.log for changes and, every 20 seconds, prints the total number of newly added lines to the console.
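The demo only counts lines; if per-word counts are wanted, the usual Spark Streaming extension of the same pipeline is sketched below (the split pattern and names are illustrative):

// count how often each word appears within a batch
val wordCounts = lines
  .flatMap(_.split("\\s+"))   // break each log line into words
  .map(word => (word, 1))     // pair each word with an initial count of 1
  .reduceByKey(_ + _)         // sum the counts per word within the batch
wordCounts.print()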
When the log has not changed, the output looks like this:
(screenshot)
Run the following test program to generate log entries:
import org.apache.log4j.Logger;

/**
 * Created by roomy on 16/3/23.
 * Generates some log entries for testing.
 */
public class LogGenerator implements Runnable {

    private static Logger logger = Logger.getLogger(LogGenerator.class);
    private int no;

    public LogGenerator(int no) {
        this.no = no;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            new Thread(new LogGenerator(i)).start();
        }
    }

    @Override
    public void run() {
        while (true) {
            logger.debug("this is a test information produced by roomy no:" + Thread.currentThread().getName());
            try {
                // the cast must wrap the whole expression; (int) Math.random() alone is always 0
                Thread.sleep((int) (Math.random() * 100));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
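For the generated entries to actually reach Flume, log4j has to append them to the file that the exec source is tailing. A minimal log4j.properties sketch along these lines would do it (the appender name and pattern are illustrative; the file path matches the Flume config above):

log4j.rootLogger=DEBUG, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=/Users/roomy/Desktop/Coding/scala/real_time_project/debug.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c - %m%n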
The console output is as follows. The print() output operation shows the first ten elements of each batch:
(screenshot)
The words.count() output then gives the total number of log lines produced during that 20-second window.
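print() shows at most ten elements per batch by default; DStream.print also accepts a count if more of the batch should be shown (50 here is arbitrary):

// print up to 50 elements of each batch instead of the default 10
lines.print(50)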
References:
https://flume.apache.org/FlumeUserGuide.html
http://kafka.apache.org/documentation.html
Learning Spark (Spark快速大数据分析)