kafka笔记三 工作流程 生产者 消费者
文章目录
kafka工作流程
kafka中的数据只能保证分区内有序,不能保证全局有序
topic是逻辑上的概念,而partition是物理上的概念,可以在本地看见真实的目录
kafka文件存储机制
由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment对应两个文件:
.index文件和 .log 文件。这些文件位于一个文件夹下,该文件夹的命名规则为:
topic 名称+分区序号。例如,first 这个 topic 有三个分区,则其对应的文件夹为 first-0,first-1,first-2
index 和 log 文件以当前 segment 的第一条消息的 offset 命名。
找消息的步骤 :先在index中 查找存储消息的编号 和 每条消息的起始偏移量再去log(真正存储的消息)中查找数据(采用二分法查找)
kafka生产者
分区策略
分区的原因:
(1)方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以 Partition 为单位读写了
分区的原则
将 producer 发送的数据封装成一个ProducerRecord 对象。
ISR
当leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与leader 进行同步,那 leader 就要一直等下去,直到它完成同步,才能发送 ack。这个问题怎么解决呢?
Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower长时间 未 向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值 由replica.lag.time.max.ms 参数设定。如果当Leader 发生故障时,就会从 ISR 中选举新的 leader。
0.9版本之后移除的参数 :
replica.lag.max.messags
原因:如果生产者每次发送的batch大小大于max.messags(默认10000)的值时,当leader写完后follow开始同步数据时所有的follower与leader的差值都大于参数设置,所以所有的follower都移除出ISR,当follower写完后又重新加入进ISR,频繁进出ISR会增加资源消耗。也增加zk的压力
数据可靠性
为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送ack(acknowledgement) 确认收到,如producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
发送ack的两种策略Kafka 选择了第二种方案,原因如下:
1.同样为了容忍 n 台节点的故障,第一种方案需要 2n+1 个副本,而第二种方案只需要 n+1个副本,而 Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
2.虽然第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小。
ack的参数设置
(此处fllower都为ISR中的follower)
0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时丢失数据几率最大;
1:producer 等待 broker 的 ack,partition 的leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据;
-1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复。
故障处理 (一致性问题)
HW (high watermark)所有副本中最小的LEO
LEO(log end offset)每个副本的最后一个offset
存储数据的一致性:
follower 故障:
当follower 故障时会被临时踢出 ISR,待该follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,如果此时log文件高于HW会将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
leader故障:
leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader同步数据。
注意:
1.从新选举的leader自己不截取 其他follower截取到HW位置再向leader同步
2 HW只保证副本之间数据一致性问题,不能解决数据重复和丢失问题
消费数据一致性:
在leader和follower中kafka只暴露给消费者hw之前的数据 保证消费数据一致性
幂等性
解决消费者消费数据即不重复也不丢失
enable.idompotence 设置为 true
开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只会持久化一条。
但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。
消费者
消费方式
kafka中consumer 采用 pull(拉)模式从 broker 中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息
pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据(空轮询)。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为timeout。
分区分配策略
一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定那个 partition 由哪个 consumer 来消费。
Kafka 有两种分配策略,一是 RoundRobin,一是 Range(默认方式)
RoundRobin 轮询方式 面向分组
当consumer消费多个topic时
把一个消费者组订阅的多个topic看作一个整体,将多个 topic中的partition按照hash重新排序,将排序后的整体结果轮询分发给消费者组
尽量做到consumer消费数据数量平均。
Range 范围模式 面向主题如果订阅多个topic 可能会带来消费者消费数据数量不对等的问题
offset的维护
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费
由消费者组+topic+partition唯一决定一个offset,保证了kafka动态扩展消费者和宕机恢复后继续消费的功能。