Kafka使用过程中的一些最佳实践
组内最近一年都在使用Kafka做消息中间件做数据的流式处理,总结一下使用过程的经验教训
1.一些参数值的设定,主要是保证数据的不丢失
- block.on.buffer.full = true
开启buffer缓存
- acks = all
- retries = MAX_VALUE
生产者Producer端想kafka发送数据,要想数据不丢失,应该设置以上两个配置
- max.in.flight.requests.per.connection = 1
如果需要严格保证每个partition消息到达的有序性,需要保证每次处理的请求连接为1
- 使用KafkaProducer.send(record, callback)
- callback逻辑中显式关闭producer:close(0)
Producer使用回调函数,处理异常情况, 并进行生产者的显式关闭
- unclean.leader.election.enable=false
关闭unclean的leader选举,避免在所有某个partitiond的所有副本都挂了的情况下造成数据丢失
- replication.factor = 3
- min.insync.replicas = 2
- replication.factor > min.insync.replicas
对于每个partition都设置为3个副本,同时保证某个时刻处于同步状态的副本数至少为2,ps:这里遇到过生产环境的issue,原因就是当时6台服务器挂了两台,导致某个partition的最小同步副本小于2,producer无法往kafka上发送数据
- enable.auto.commit=false
- 消息处理完成之后再提交位移
consumer端要在提交offset时,最好采用手动提交的方式,关闭自动提交,对于数据完整性要求很高的业务时,可以采用事务,把consumer的处理数据的逻辑和提交offset的逻辑放在同一个事务当中,避免处理了数据但是没有及时提交offset,consumer如果此时挂了,会造成数据的重复消费,但是引入事务,也会导致并发量下降,所以需要综合考虑业务情况
2,遇到kafka数据大量堆积,此时consumer没办法及时消费,这个原因可能是上游发来的速度突然过大或者我们消费者消费速度过慢造成的,对于这个问题,我们组设计了一个可拓展的应急方法
如图所示,假如之前我们的kafka有3个partition,由于消息大量积压,比如积压了1000万条,我们的消费者无法再短时间内处理完这些数据,那我们可以对消费者端进行修改,不在进项业务逻辑的处理,而是优先将数据在转发到一个有30个partition的topic上,然后开30个consumer线程进程处理,这样会大大增加处理效率