[大数据]连载No17之Kafka搭建+SparkStreaming整合Kafka
Kafka
特点:
1、分布式流平台,零拷贝,依赖zookeper,保存每一个消费者偏移量,kafka集群中一些元素(topic,partitions)
2、主从建构,需要每一个节点启动kafka进程,主(leader)是在集群启动后选举出来的,不是用户选择的,类似zookeper
概念:
broker:每个节点
producer:消息生产者
consumer:消息消费者
topic:主题,有多个partition,分布式
搭建步骤
1、安装包 kafka_2.10-0.8.2.2.tgz 解压
2 vim kafk/config/server.properties
broker.id=0 (节点)
broker.id=1
broker.id=2
log.dirs=/data/kafka #kafka存放数据目录
auto.leader.rebalance.enable=true #开起kafka的leader进程机制
zookeeper.connect=node1:2181,node2:2181,node3:2181 #zookeeper地址
3、 复制kafka到各个节点,各个节点启动kafka
./bin/kafka-server-start.sh -daemon ../config/server.properties #daemon后台启动
4、验证、创建topic
./bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --topic kafka_new --partitions 3 --replication-factor 3
#partitions: 创建topic有几个ppartition
#replication-factor :每一个partion有几个副本(备份)
5、查看topic
./bin/kafka-topics.sh --list --zookeeper node1:2181,node2:2181,node3:2181
6、topic生产数据
./bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic kafka_new
7、消费数据
./bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic kafka_new
#默认消费启动consumer后产生的数据,如果要消费从头消费数据,需指定 --from-beginning
#消费连接端口2181,是因为zookeeper保存消费数据偏移量,保存消费者上一次消费位置
SparkStreaming+Kafka整合的两种方式
1、receiver(接受者模式,由kafka推送数据)
缺点:并不能一次的恰好处理一次事物
流程如图下
2、Direct【常用,推荐】(直连kafka)
特点、不需要跟新偏移量,有sparkstreaming自己维护,不依赖zookeeper
优势
a、消费偏移量有sparkingstreaming自己来维护,数据处理成功就跟新偏移量,处理失败就从kafka从新拉数据,恰好一次的事物处理的比较好,
b、每隔一段时间发送task到executo中去请求执行
流程图
代码:
接受者模式
/** * 【接受者模式】 */ SparkConf conf = new SparkConf() .setAppName("SparkStreamingOnKafkaReceiver") .setMaster("local[2]") /** *设置预写日志,防止更鞋zookeeper偏移量后,executor或者dirver挂掉,导致数据没有执行 */ .set("spark.streaming.receiver.writeAheadLog.enable","true"); // .set("spark.streaming.concurrentJobs", "10"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5)); //在这里不只是能够设置本地目录,也可以设置成HDFS上的一个目录 jsc.checkpoint("d://receivedata0731"); /** * 1、KafkaUtils.createStream 使用五个参数的方法,设置receiver的存储级别 * 2、在java里面使用多个receiver,需要将JavaPairReceiverInputDStream转换成javaDstream使用toJavaDstream * val numStreams = 5 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) } val unifiedStream = streamingContext.union(kafkaStreams) * */ Map<String, Integer> topicConsumerConcurrency = new HashMap<String, Integer>(); topicConsumerConcurrency.put("dingzheng", 1); JavaPairReceiverInputDStream<String,String> lines = KafkaUtils.createStream(jsc, "192.168.126.111:2181,192.168.126.112:2181,192.168.126.113:2181", "MyFirstConsumerGroup", topicConsumerConcurrency, StorageLevel.MEMORY_ONLY()); //处理。。。直连模式
/** * 直连模式 */ SparkConf confd = new SparkConf(). setAppName("SparkStreamingOnKafkaDirected"); /** * 直连模式不需要额外开启进程,监听kafka推送过来的数据, * 而是每隔一段时间去请求数据 */ // .setMaster("local[1]"); JavaStreamingContext jscd = new JavaStreamingContext(confd, Durations.seconds(10)); // jsc.checkpoint("d:/checkpointdddd"); Map<String, String> kafkaParameters = new HashMap<String, String>(); /** * kafkaurl,直接连kafka,当做一个存储集群了 */ kafkaParameters.put("metadata.broker.list", "192.168.126.111:9092,192.168.126.112:9092,192.168.126.113:9092"); HashSet<String> topics = new HashSet<String>(); topics.add("dingzheng"); JavaPairInputDStream<String,String> linesd = KafkaUtils.createDirectStream(jscd, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParameters, topics);