Kafka消费者手动提交详解及demo
在上一篇 Kafka基础及java客户端使用Kafka使用Java实现数据的生产和消费demo 中介绍如何简单的使用kafka进行数据传输。本篇则重点介绍kafka中的 consumer 消费者的讲解。
应用场景
在上一篇kafka的consumer消费者,我们使用的是自动提交offset下标。
但是offset下标自动提交其实在很多场景都不适用,因为自动提交是在kafka拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。
很多情况下我们需要从kafka成功拉取数据之后,对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种 , 所以这时我们就需要进行手动提交kafka的offset下标。
这里顺便说下offset具体是什么。
offset:指的是kafka的topic中的每个消费组消费的下标。
简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。
比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。
测试
- 首先使用producer发送一百条测试数据
public static void main(String[] args) throws InterruptedException {
//消息实体
ProducerRecord<String , String> record = null;
for (int i = 1; i <= 100; i++) {
record = new ProducerRecord<String, String>(MQDict.PRODUCER_TOPIC, "value"+i);
//发送消息
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (null != e){
log.info("send error" + e.getMessage());
}else {
System.out.println(String.format("offset:%s,partition:%s",recordMetadata.offset(),recordMetadata.partition()));
}
}
});
}
producer.close();
}
操作结果:
- kafka客户端配置取消自动提交
package com.example.demo.MQUtil;
import java.time.Duration;
/**
* @author created 旨酒思柔
* @date 2019/3/27
*/
public class MQDict {
public static final String MQ_ADDRESS_COLLECTION = "127.0.0.1:9092"; //kafka地址
public static final String CONSUMER_TOPIC = "test"; //消费者连接的topic
public static final String PRODUCER_TOPIC = "test"; //生产者连接的topic
public static final String CONSUMER_GROUP_ID = "1"; //groupId,可以分开配置
public static final String CONSUMER_ENABLE_AUTO_COMMIT = "false"; //是否自动提交(消费者)!这里为false,不自动提交
public static final String CONSUMER_AUTO_COMMIT_INTERVAL_MS = "1000";
public static final String CONSUMER_SESSION_TIMEOUT_MS = "30000"; //连接超时时间
public static final int CONSUMER_MAX_POLL_RECORDS = 10; //每次拉取数
public static final Duration CONSUMER_POLL_TIME_OUT = Duration.ofMillis(3000); //拉去数据超时时间
}
public static final String CONSUMER_ENABLE_AUTO_COMMIT = “false”; //是否自动提交(消费者)!这里为false,不自动提交
- 消费者(手动提交注释不打开)
public static void main(String[] args) {
Logger.getLogger("org").setLevel(Level.ERROR);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_TIME_OUT);
records.forEach((ConsumerRecord<String, String> record)->{
log.info("revice: key ==="+record.key()+" value ===="+record.value()+" topic ==="+record.topic());
});
//consumer.commitSync(); 提交
}
}
}
执行结果:
可见提交的100条全部取出,当再次启动consumer时,执行结果:
因为没有提交消费,已经被消费的数据还会被再次消费。
- 消费者当消费50条时,提交一次消费,并且关闭消费者
public static void main(String[] args) {
Logger.getLogger("org").setLevel(Level.ERROR);
int i = 1 ;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_TIME_OUT);
records.forEach((ConsumerRecord<String, String> record)->{
log.info("revice: key ==="+record.key()+" value ===="+record.value()+" topic ==="+record.topic());
});
i++;
if (i >=5 ){
consumer.commitSync();
consumer.close();
break;
}
}
}
执行结果:
消费50条,当再次执行消费者时:
是从第51条开始消费,由此可见当提交消费时,同一组下的消费者不会重复消费数据。