kafka 笔记九 kafka 消息重复
1 生产者阶段重复场景
生产发送的消息没有收到正确的broke响应,导致生产者重试。
生产者发出一条消息,broke落盘以后因为网络等种种原因发送端得到一个发送失败的响应或者网络中断,然后生产者收到一个可恢复的Exception重试消息导致消息重复。
说明:
1. new KafkaProducer()后创建一个后台线程KafkaThread扫描RecordAccumulator中是否有消 息;
2. 调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;
3. 后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;
4. 如果发送成功,那么返回成功;
5. 如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败的结果;如果允许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送;
可恢复异常说明
记录顺序问题
如果设置 max.in.flight.requests.per.connection 大于1(默认5,单个连接上发送的未确认请求的最大数量,表示上一个发出的请求没有确认下一个请求又发出了)。大于1可能会改变记录的顺序,因为如果将两个batch发送到单个分区,第一个batch处理失败并重试,但是第二个batch处理成功,那么第二个batch处理中的记录可能先出现被消费。
设置 max.in.flight.requests.per.connection 为1,可能会影响吞吐量,可以解决单个生产者发送顺序问题。如果多个生产者,生产者1先发送一个请求,生产者2后发送请求,此时生产者1返回可恢复异常,重试一定次数成功了。虽然生产者1先发送消息,但生产者2发送的消息会被先消费。
生产者发送重复解决方案
1 启动kafka的幂等性
要启动kafka的幂等性,设置: enable.idempotence=true ,以及 ack=all 以及 retries > 1。
2 ack=0,不重试。
可能会丢消息,适用于吞吐量指标重要性高于数据丢失,例如:日志收集。
生产者和broke阶段消息丢失场景
1 ack=0,不重试
生产者发送消息完,不管结果了,如果发送失败也就丢失了。
解决生产者和broke阶段消息丢失
消费者数据重复场景及解决方案
数据消费完没有及时提交offset到broker。
每次消费完或者程序退出时手动提交。这可能也没法保证一条重复。