Kafka之生产者

生产者以什么样的策略把消息发到各个分区?

1.分区原因

(1)方便在集群中扩展,一个topic可以有多个partition组成,而每个partition可以通过调整以适应它所在的机器

(2)可以提高并发,因为可以以partition为单位读写

2.分区原则

我们需要将生产者发送的数据封装成一个ProducerRecord对象。
Kafka之生产者
(1)指明partition的情况下,直接将指明的值作为partition值;

(2)没指明partition,但有key的值,将key的hash值与topic的partition数进行取余得到partition值

(3)既没有partition又没有key的值时,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的round-robin算法

3.如何保证数据发送成功?

当topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement 确认收到),如果producer收到ack后,就会进行下一轮的发送,否则重新发送数据。

4.何时适合发送ack?

确保全部的follower同步完成之后,才可以发ack,这样能保证leader挂掉之后,能在follower中选举出新的leader。(follower 同步方案还有第二种,半数副本以上完成同步,也可以发ack,但缺点是选举新的leader时,容忍n台节点故障,但需要2n+1个副本;而全部同步完成选新leader,容忍n台节点故障,只需要n+1个副本。不过第二种方案会造成大量数据的冗余,第一种虽然网略延迟高,但对kafka影响较小)

5.全部副本同步过程中,有一个follower迟迟不能进行同步,怎么办?

leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据同步之后,leader就会给producer发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。leader发生故障之后,就会从ISR中选举新的leader。
Kafka之生产者

6.ack应答机制

对某些不太重要的数据,对数据的可靠性要求不是很高,能容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。

kafka提供了三种可靠级别

acks参数配置:

0:producer不等broker的ack,这样延迟最低,但broker接收到还没写入磁盘就已经返回,当broker故障时有可能丢失数据
1:producer等待broker的ack,leader落盘成功后返回ack。如果follower还没同步,但这个时候leader返回了ack,并且leader故障,此时producer已经收到ack,默认发送成功,不会继续发送上一条信息,这就导致数据丢失
-1:prodicer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。如果在follower同步之后,broker发送ack之前,leader出现故障,会从follower中选举一个leader,但是此时producer没收到ack,会重新发送上条信息,但是副本已经同步完故障前的leader了,这样就会造成数据重复

7.故障处理细节

Kafka之生产者
(1)follower故障

follower发生故障会被临时踢出ISR, 待该follower恢复后,会读取本地磁盘上次记录的HW, 并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该partition的HW,即follower追上leader之后,就可以重新加入ISR了

(2)leader故障

leader发生故障,会从ISR中选出一个新的leader,为保证副本间数据一致性,其余follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或不重复

8.生产者幂等性(去重)

有些非常重要的信息,比如交易数据,要求数据既不重复也不能丢失,即Exactly Once语义

将服务器的ACK级别设置为-1,可以保证Producer到Server之间不会丢失数据,即At Least Once语义

0.11版本的Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。

Kafka的Exactly Once语义:

At Least Once + 幂等性 = Exactly Once

如何启用幂等性:

只需要将Producer的参数中enable.idompotence设置为true即可(kafka自动将acks属性设为-1,并将retries属性设为Integer.MAX_VALUE。)。

Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。

开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。

Producer重启,PID就会变化,所以幂等性只能保证单会话的Exactly Once

9.生产者事务

作用:开启事务后,生产者能实现跨会话的幂等性,也就是说能实现不同分区、不同topic发送的多条信息的原子性。

为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID(存在zookeeper上边),并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。