Flume+Kafka+Sparkstreaming日志分析

最近要做一个日志实时分析的应用,采用了flume+kafka+sparkstreaming框架,先搞了一个测试Demo,本文没有分析其架构原理。

  简介:flume是一个分布式,高可靠,可用的海量日志聚合系统,kafka是一高吞吐量的分布式发布订阅系统,sparkstreaming是建立在spark上的实时计算框架,这这个Demo中,以上内容均为单机版伪分布,flume的source为exec,agent的名称为producer,sink为kafka。

  运行所需要的环境直接到官网上下载即可:

  我的环境是:flume1.6+kafka_2.10+spark1.2.0

  flume的配置:

  在conf下编辑配置文件roomy.conf如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

#agent section

producer.sources = s

producer.channels = c

producer.sinks = r

 

#source section

producer.sources.s.type exec

producer.sources.s.command tail -F -n+1 /Users/roomy/Desktop/Coding/scala/real_time_project/debug.log#监听日志所在

producer.sources.s.channels = c

 

# Each sink's type must be defined

producer.sinks.r.type = org.apache.flume.plugins.KafkaSink

producer.sinks.r.metadata.broker.list=192.168.1.102:9092#这里换成自己Kafka的地址

producer.sinks.r.partition.key=0

producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition

producer.sinks.r.serializer.class=kafka.serializer.StringEncoder

producer.sinks.r.request.required.acks=0

producer.sinks.r.max.message.size=1000000

producer.sinks.r.producer.type=sync

producer.sinks.r.custom.encoding=UTF-8

  在flume文件夹下运行

1

bin/flume-ng agent --conf conf --conf-file conf/roomy.conf --name producer -Dflume.root.logger=INFO,console

   flume的部分完成。

 

  在kafka目录下运行:

1

bin/zookeeper-server-start.sh config/zookeeper.properties

  启动zookeeper

  运行:

1

bin/kafka-server-start.sh config/server.properties

   启动kafka,这里无需做什么额外配置。

 

  最后编写spark streaming测试Demo程序

  直接新建SBT项目,build.sbt如下:

1

2

3

4

5

6

7

8

9

10

11

name := "sk"

 

version := "1.0"

 

scalaVersion := "2.10.4"

 

libraryDependencies += "org.apache.spark" "spark-streaming_2.10" "1.6.1"

 

libraryDependencies += "org.apache.spark" "spark-streaming-kafka_2.10" "1.6.1"

 

libraryDependencies += "log4j" "log4j" "1.2.17"

   需要注意的是,由于GFW,下载慢的要死,接下来就是测试程序

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf

import org.apache.spark.streaming.dstream.DStream

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.kafka.KafkaUtils

 

/**

  * Created by roomy on 16/3/23.

  */

object KafkaStreaming {

 

  def main(agrs: Array[String]): Unit = {

 

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")

    val ssc = new StreamingContext(sparkConf, Seconds(20))

 

    val topic = "test"

    val topicSet = topic.split(" ").toSet

 

    //create direct kafka stream with brokers and topics

    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")

 

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](

      ssc, kafkaParams, topicSet

    )

    val lines = messages.map(_._2)

    lines.print()

    val words: DStream[String] = lines.flatMap(_.split("\n"))

    words.count().print()

 

    //启动

    ssc.start()

    ssc.awaitTermination()

  }

}

   可以通过StreamContext的构造函数设置数据采集分析的间隔。

   程序会监听/Users/roomy/Desktop/Coding/scala/real_time_project/debug.log中的变动,并以20秒一次的频率总计增加行数输出在控制台。

   日志没有变动的时候如下:

Flume+Kafka+Sparkstreaming日志分析

   运行测试程序产生日志:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

import org.apache.log4j.Logger;

 

/**

 * Created by roomy on 16/3/23.

 * to generate some log to test

 */

public class LogGenerator implements Runnable{

 

    private static Logger logger = Logger.getLogger(LogGenerator.class);

    private int no;

 

    public LogGenerator(int no){

        this.no=no;

    }

 

    public static void main(String [] agrs) throws InterruptedException {

        for(int i=0;i<5;i++){

            new Thread(new LogGenerator(i)).start();

        }

    }

 

    @Override

    public void run() {

        while (true){

            logger.debug("this is a test information produced by roomy no:"+Thread.currentThread().getName());

            try{

                Thread.sleep((int)Math.random()*100);

            }

            catch (Exception e){

                e.printStackTrace();

            }

        }

    }

}

   控制台输出如下:

  streaming的输出操作会把每个批次的前十个元素输出如下:

Flume+Kafka+Sparkstreaming日志分析

  在这20秒内总共产生的日志行数为:

Flume+Kafka+Sparkstreaming日志分析 

  参考文档:

  https://flume.apache.org/FlumeUserGuide.html

  http://kafka.apache.org/documentation.html 

  Spark快速大数据分析

转载于:https://my.oschina.net/hblt147/blog/1840271