Kafka消费者手动提交详解及demo

在上一篇 Kafka基础及java客户端使用Kafka使用Java实现数据的生产和消费demo 中介绍如何简单的使用kafka进行数据传输。本篇则重点介绍kafka中的 consumer 消费者的讲解。

应用场景

在上一篇kafka的consumer消费者,我们使用的是自动提交offset下标。
但是offset下标自动提交其实在很多场景都不适用,因为自动提交是在kafka拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。
很多情况下我们需要从kafka成功拉取数据之后,对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种 , 所以这时我们就需要进行手动提交kafka的offset下标。

这里顺便说下offset具体是什么。
offset:指的是kafka的topic中的每个消费组消费的下标。
简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。
比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。

测试

  1. 首先使用producer发送一百条测试数据
public static void main(String[] args) throws InterruptedException {
        //消息实体
        ProducerRecord<String , String> record = null;
        for (int i = 1; i <= 100; i++) {
            record = new ProducerRecord<String, String>(MQDict.PRODUCER_TOPIC, "value"+i);
            //发送消息
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e){
                        log.info("send error" + e.getMessage());
                    }else {
                        System.out.println(String.format("offset:%s,partition:%s",recordMetadata.offset(),recordMetadata.partition()));
                    }
                }
            });
        }
        producer.close();
    }

操作结果:
Kafka消费者手动提交详解及demo

  1. kafka客户端配置取消自动提交
package com.example.demo.MQUtil;

import java.time.Duration;

/**
 * @author created 旨酒思柔
 * @date 2019/3/27
 */
public class MQDict {
    public static final String MQ_ADDRESS_COLLECTION = "127.0.0.1:9092";			//kafka地址
    public static final String CONSUMER_TOPIC = "test";						//消费者连接的topic
    public static final String PRODUCER_TOPIC = "test";						//生产者连接的topic
    public static final String CONSUMER_GROUP_ID = "1";								//groupId,可以分开配置
    public static final String CONSUMER_ENABLE_AUTO_COMMIT = "false";				//是否自动提交(消费者)!这里为false,不自动提交
    public static final String CONSUMER_AUTO_COMMIT_INTERVAL_MS = "1000";			
    public static final String CONSUMER_SESSION_TIMEOUT_MS = "30000";				//连接超时时间
    public static final int CONSUMER_MAX_POLL_RECORDS = 10;							//每次拉取数
    public static final Duration CONSUMER_POLL_TIME_OUT = Duration.ofMillis(3000);	//拉去数据超时时间

}


public static final String CONSUMER_ENABLE_AUTO_COMMIT = “false”; //是否自动提交(消费者)!这里为false,不自动提交

  1. 消费者(手动提交注释不打开)

    public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_TIME_OUT);
            records.forEach((ConsumerRecord<String, String> record)->{
                    log.info("revice: key ==="+record.key()+" value ===="+record.value()+" topic ==="+record.topic());
            });
            //consumer.commitSync();  提交
        }
    }

}


执行结果:
Kafka消费者手动提交详解及demo
可见提交的100条全部取出,当再次启动consumer时,执行结果:

Kafka消费者手动提交详解及demo
因为没有提交消费,已经被消费的数据还会被再次消费。

  1. 消费者当消费50条时,提交一次消费,并且关闭消费者
public static void main(String[] args) {
        Logger.getLogger("org").setLevel(Level.ERROR);

        int i = 1 ;
        while (true) {

            ConsumerRecords<String, String> records = consumer.poll(MQDict.CONSUMER_POLL_TIME_OUT);
            records.forEach((ConsumerRecord<String, String> record)->{
                    log.info("revice: key ==="+record.key()+" value ===="+record.value()+" topic ==="+record.topic());
            });
            i++;
            if (i >=5 ){
                consumer.commitSync();
                consumer.close();
                break;
            }
        }
    }

执行结果:
Kafka消费者手动提交详解及demo
消费50条,当再次执行消费者时:
Kafka消费者手动提交详解及demo
是从第51条开始消费,由此可见当提交消费时,同一组下的消费者不会重复消费数据。

参考资料:关于Kafka 的 consumer 消费者手动提交