一文读懂kafka
目录
应用场景
kafka具有高吞吐量(分区机制),高容错率(解决并发消费问题)、高可靠性(副本备份)的优点,主要应用于以下三点
- 接口异步解耦:减少请求时间
- 行为分析:记录用户行为进行分析
- 日志收集
主要组件
- producer:生产者,负责发布消息到broker
- customer:消费者,从broker拉取消息进行处理
- zookeeper:维护kafka节点信息
- Topic:消息类型
- partitioner:分区,每个topic分为多个分区
- broker:kafka集群的服务器
- segment:磁盘上分区内的分片存储数据
- offset:分区消息存储的偏移量
- Coordinator:管理group的角色,产生于某个broker
- LEO:(log end offset)记录了副本底层日志中下一条消息的位移值
- HW:(high water)水位:所有副本中最小的已备份的位移值
配置分析
- group.id:多个消费者构成一个分组消费一个主题内的所有分区(partitioner),若只有一个分区,则只有一个消费者可以消费
- enable.auto.commit:上文提到的自动提交
- auto.offset.reset:控制新的groupid消费该topic时的消息读取:
a.lastest:从最后的offset开始消费
b.earlies:从最早的消息开始消费
c.none:之前不存在offset则抛异常 - max.poll.records:消费者每次最大拉去消息数量
各流程分析
生产者->broker
- producer发送消息到broker可通过同步跟异步(需提供回调)的方式:kafka会把消息放入队列,通过批量发送到broker,可通过batch.size(消息字节数总量->默认16KB)和linger.ms(发送间隔时间)参数控制,满足一个条件即发送到broker
- 消息分发到partitioner:kafka默认机制->key为null则随机分发,可通过自定义实现Partitioner实现不同的分发策略
消费者->broker(通知已消费)
- customer消费完后会向broker发送commit指令,broker存储该消费者的偏移量信息(磁盘上__consumer_offset默认有50个分区,通过goupid与分区数量取模确定存储分区),commit方式分为手动和自动提交
- 自动提交可能会导致消息丢失:自动提交时间不宜过长:若在提交时间内,已消费消息了,然后服务挂掉,服务重启后又会从这个offset开始消费(会导致重复消费的问题):可通过手动提交控制,或者offset存储在redis或者其他持久化层面
broker->消费者(消费者消费消息)
- 消费端可通过手动指定消费分区
- 分区分配策略:
1.Range(默认):单个Topic内近似平均分配,多个Topic会分配不均
2.RoundRobin(轮询):通过hashcode排序,按照轮询分配
3.StickyAssignor(粘性):类似轮询,再一个broker挂掉后,会在原先基础上分配,而不是重新分配
确定Coordinator
- Coordinator的作用在于管理group消费者组,分配分区。消费者会向任意一个broker发送GroupCoordinatorRequest请求,服务端返回一个最小的broker节点的id作为Coordinator
JoinGroup过程(新消费者加入或宕机)
- joinGroup肯定是发生在Coordinator之后,新消费者的加入或宕机会触发rebalance(通过zookeeper的watch机制监控节点或者当消费者指定时间内没发送给Coodinator心跳请求HeartbeatRequest):分为join和sync
- join:group组中的所有customer向任意一个broker发送innergroup请求,返回确定一个customer作为leader对象(随机算法)
- sync:leader以及其他customer向Coordinator发送SyncGroupRequest请求,将分区策略分发给其他消费者(主要信息是该消费哪个分区)
分区副本(replica)机制
- partitioner分区机制提高了kafka的吞吐量和并发消费的问题,但是单个partitioner肯定会有可靠性问题,因此引入的副本机制来实现备份
- sh kafka-topics.sh --create --zookeeper zookeeperip:2181 --replication-factor 3 --partitions 3 --topic testTopic 创建带有2个分区副本的partitioner
- 三种副本:
1.leader副本:响应读写请求
2.follower副本:只做数据备份,作为备份分区
3.ISR副本:包含leader和follow副本(replica.lag.time.max.ms控制follow副本同步数据时间。超过则剔除) - replica都宕机后处理方式:等待replica重新启用(一致性)或者不一定是ISR中的副本作为leader(可用性)中作取舍
副本同步原理
- producer发送到leader分区,leader写入磁盘log,follow向leader节点pull数据写入log后向leader发送ACK,leader收到所有ack后增加HW值并向producer发送ACK,则消息发送成功
- 主从副本之间同步过程:
1.leader获取数据后,follow节点
第一次发送fetch请求:副本同步leader的LEO数据
第二次发送fetch请求:更新leader的remoteLEO数据,更新follow的HW为remoteLEO数据
2.leader获取数据前,follow节点发送fetch请求:follow请求阻塞到新消息发送
-副本同步导致的数据丢失:由于HW是异步延迟更新,更新过程中如果leader宕机,就可能导致该数据丢失,leader恢复的时候该消息可能会截断。新kafka 0.11版本后,broker保存缓存定期写入checkpoint,保存epoch和offset的值
副本选举过程
选举策略:ISR优先选取第一个作为leader副本,如果ISR为空并且unclean.leader.election.enable(是否允许非ISR副本作为leader)为false,则抛出NoReplicaOnlineException异常
消息的存储
- 分区partitioner中又有分段概念(LogSegment):包括索引文件(.index)和(.log)数据文件
- 根据offset查找索引文件确定文件的position值,根据position值找到对应的log文件
kafka如何保证消息可靠性
- 分区副本机制
- 配置acks:0->producer只负责发送消息(不考虑broker是否宕机),1->leader写入本地日志就认为消息成功(follow未同步),-1->需要等到follow都备份数据通知producer发送成功(保证producer->broker)
- 消费者手动提交机制(自动提交可能导致消息丢失)保证customer->broker