揭秘kafka消费者(一)

由于消费者涉及的内容比较多,所以分成三个部分介绍,先介绍第一部分,包含:消费者概述,创建kafka消费者,订阅主题,轮询,消费者配置。

消费者概述

1:我们需要明确两个概念:
1.1:消费者:向kafka订阅主题,并接受消息。
1.2:消费者群组:消费者的逻辑分组,分组内的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。
1.3:下面几张图片展示了消费者接收分区消息的五种情况:
揭秘kafka消费者(一)
揭秘kafka消费者(一)
揭秘kafka消费者(一)
揭秘kafka消费者(一)
揭秘kafka消费者(一)
2:分区再均衡
2.1:先明确几个概念:
2.1.1:分区再均衡:分区的所有权从一个消费者转移到另外一个消费者,这样的过程称为再均衡。
2.1.2:心跳:判断消费者是否死亡。只要消费者在正常的时间间隔发送心跳则认为是活跃的,否则认为死亡。
2.1.3:分配过程:假设消费者1为群主
消费者1发送请求-加入群组,群组协调器将成员列表返回给消费者1,消费者1进行分区分配,并将分配后的结果发送给群组协调器,群组协调器将分配结果发送给各个消费者。
揭秘kafka消费者(一)

创建消费者

1:先上代码:
揭秘kafka消费者(一)
2:对关键信息做出解释:
揭秘kafka消费者(一)

订阅主题

1:通过subscribe()订阅主题
2:订阅方式:
2.1:可同时订阅多个主题:consumer.subscribe(Collections.singletonList(“customerCountries”));
2.2:可通过正则表达式订阅主题:consumer.subscribe(“test.*”);

轮询

1:轮询的作用:群组协调,分区再均衡,发送心跳,获取数据
2:代码:
try {
while (true) {
ConsumerRecord<String, String> records = consumer.poll(100) ;
for (ConsumerRecord<String, String> record : records) {
log.debug(record.topic() + record.partition() + record.offset() + record.key() + record.value()); //统计各个地区的客户数量,即模拟对消息的处理
int updatedCount = 1;
updatedCount += custCountryMap.getOrDefault(record.value(), 0) + 1;
custCountryMap.put(record.value(), updatedCount); //真实场景中,结果一般会被保存到数据存储系统中
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4));
}
}
} finally {
//退出应用程序前使用close方法关闭消费者,网络连接和socket也会随之关闭,并立即触发一次再均衡
consumer.close();
}
3:加粗斜体的数字是标记要解释的代码:
揭秘kafka消费者(一)

消费者配置

1:介绍一些比较主要的配置参数,所有配置参数可以到官网上查看:
揭秘kafka消费者(一)
揭秘kafka消费者(一)

结束语

kafka消费者第一部分介绍完毕,下一部分将介绍提交和偏移量。