[大数据]连载No17之Kafka搭建+SparkStreaming整合Kafka

本次总结图如下

[大数据]连载No17之Kafka搭建+SparkStreaming整合Kafka

Kafka
特点:
1、分布式流平台,零拷贝,依赖zookeper,保存每一个消费者偏移量,kafka集群中一些元素(topic,partitions)
2、主从建构,需要每一个节点启动kafka进程,主(leader)是在集群启动后选举出来的,不是用户选择的,类似zookeper


概念:
broker:每个节点
producer:消息生产者
consumer:消息消费者
topic:主题,有多个partition,分布式


搭建步骤
1、安装包 kafka_2.10-0.8.2.2.tgz 解压
2  vim kafk/config/server.properties
   broker.id=0  (节点)
   broker.id=1
   broker.id=2
   log.dirs=/data/kafka #kafka存放数据目录
   auto.leader.rebalance.enable=true #开起kafka的leader进程机制
   zookeeper.connect=node1:2181,node2:2181,node3:2181 #zookeeper地址
3、 复制kafka到各个节点,各个节点启动kafka  
    ./bin/kafka-server-start.sh -daemon ../config/server.properties #daemon后台启动
4、验证、创建topic
   ./bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181  --topic kafka_new --partitions 3 --replication-factor 3
   #partitions: 创建topic有几个ppartition
   #replication-factor :每一个partion有几个副本(备份)
5、查看topic
   ./bin/kafka-topics.sh --list --zookeeper node1:2181,node2:2181,node3:2181
6、topic生产数据
   ./bin/kafka-console-producer.sh  --broker-list node1:9092,node2:9092,node3:9092 --topic kafka_new
7、消费数据
   ./bin/kafka-console-consumer.sh  --zookeeper node1:2181,node2:2181,node3:2181 --topic kafka_new
   #默认消费启动consumer后产生的数据,如果要消费从头消费数据,需指定  --from-beginning
   #消费连接端口2181,是因为zookeeper保存消费数据偏移量,保存消费者上一次消费位置

SparkStreaming+Kafka整合的两种方式
1、receiver(接受者模式,由kafka推送数据)
 缺点:并不能一次的恰好处理一次事物
 流程如图下
[大数据]连载No17之Kafka搭建+SparkStreaming整合Kafka


2、Direct【常用,推荐】(直连kafka) 
 特点、不需要跟新偏移量,有sparkstreaming自己维护,不依赖zookeeper
 优势
 a、消费偏移量有sparkingstreaming自己来维护,数据处理成功就跟新偏移量,处理失败就从kafka从新拉数据,恰好一次的事物处理的比较好,
 b、每隔一段时间发送task到executo中去请求执行
 流程图 
[大数据]连载No17之Kafka搭建+SparkStreaming整合Kafka


代码:
 接受者模式

        /**
         * 【接受者模式】
         */

        SparkConf conf = new SparkConf()
                .setAppName("SparkStreamingOnKafkaReceiver")
                .setMaster("local[2]")
                /**
                 *设置预写日志,防止更鞋zookeeper偏移量后,executor或者dirver挂掉,导致数据没有执行
                 */
                .set("spark.streaming.receiver.writeAheadLog.enable","true");
//          .set("spark.streaming.concurrentJobs", "10");


        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
        //在这里不只是能够设置本地目录,也可以设置成HDFS上的一个目录
        jsc.checkpoint("d://receivedata0731");

        /**
         * 1、KafkaUtils.createStream 使用五个参数的方法,设置receiver的存储级别
         * 2、在java里面使用多个receiver,需要将JavaPairReceiverInputDStream转换成javaDstream使用toJavaDstream
         *  val numStreams = 5
         val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
         val unifiedStream = streamingContext.union(kafkaStreams)
         *
         */
        Map<String, Integer> topicConsumerConcurrency = new HashMap<String, Integer>();
        topicConsumerConcurrency.put("dingzheng", 1);
        JavaPairReceiverInputDStream<String,String> lines = KafkaUtils.createStream(jsc,
                "192.168.126.111:2181,192.168.126.112:2181,192.168.126.113:2181", "MyFirstConsumerGroup", topicConsumerConcurrency, StorageLevel.MEMORY_ONLY());


        //处理。。。
直连模式
        /**
         * 直连模式
         */
        SparkConf confd = new SparkConf().
                setAppName("SparkStreamingOnKafkaDirected");
        /**
         * 直连模式不需要额外开启进程,监听kafka推送过来的数据,
         * 而是每隔一段时间去请求数据
         */
//          .setMaster("local[1]");

        JavaStreamingContext jscd = new JavaStreamingContext(confd, Durations.seconds(10));
//    jsc.checkpoint("d:/checkpointdddd");

        Map<String, String> kafkaParameters = new HashMap<String, String>();
        /**
         * kafkaurl,直接连kafka,当做一个存储集群了
         */
        kafkaParameters.put("metadata.broker.list", "192.168.126.111:9092,192.168.126.112:9092,192.168.126.113:9092");

        HashSet<String> topics = new HashSet<String>();
        topics.add("dingzheng");
        JavaPairInputDStream<String,String> linesd = KafkaUtils.createDirectStream(jscd,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParameters,
                topics);