Kafka的幂等性与事务性理解

    最近在深入理解Flink的Exactly-Once,发现Flink Checkpoint只能保障Flink程序内部的一致性,无法保证Sink到外部系统的Exactly-Once语义。但是Sink到外部如果实现了TwoPhaseCommitSinkFunction这个抽象类就能实现端到端的Exactly-Once语义,而Kafka刚好也实现了这个这个类,所以先来研究下Kafka的Exactly-Once是怎么实现的。

    在Producer向Kafka发送消息的时候,如果消息成功被写入到日志文件里面,消息就不会丢失了(多副本机制)。但是如果中间发生网络中断等异常,Producer没有接受到Ack消息,无法判断消息是否已经提交了,此时选择重复发送消息到Kafka,这样就会造成数据的重复写入(At Least Once)。因此在0.11.0.0版本中,Kafka引入了幂等性和事务性,以此来实现Exactly-Once语义。

 

Kafka幂等性:

幂等,就是指多接口的多次调用所产生的结果和只调用一次是一致的。没有幂等性的情况下就会重复发送数据,如下图所示:

Kafka的幂等性与事务性理解

    Kafka的幂等性机制能保证单个分区不会重复写入数据,而实现幂等性的核心就是引入了producer id 和 sequence number这两个概念。

    Kafka内部会自动为每个Producer分配一个producer id(PID),broker端会为producer每个Partition维护一个<PID,Partition> -> sequence number映射。sequence number时从0开始单调递增的。

对于新接受到的消息,broker端会进行如下判断:

  1. 如果新消息的sequence number正好是broker端维护的<PID,Partition> -> sequence number大1,说broker会接受处理这条消息。
  2. 如果新消息的sequence number比broker端维护的sequence number要小,说明时重复消息,broker可以将其直接丢弃
  3. 如果新消息的sequence number比broker端维护的sequence number要大过1,说明中间存在了丢数据的情况,那么会响应该情况,对应的Producer会抛出OutOfOrderSequenceException。

 

Producer如何开启幂等性:

    Properties.put(“enable.idempotence”,true);

    如果使用的深入的话,还需要修改下producer端的retries、acks、max.in.flignt.requests.per.connection这几个参数,当然默认值已经能够应付大部分情况了,也不太建议修改。

 

Kafka事务性:

    Kafka事务性主要是为了解决幂等性无法跨Partition运作的问题,事务性提供了多个Partition写入的原子性,即写入多个Partition要么全部成功,要么全部失败,不会出现部分成功部分失败这种情况。Flink正是基于Kafka的事务性,实现了端到端的Exactly Once语义 (ps: 当然FLink配合其他系统也可以实现Exactly Once语义,比如参考链接中的MySQL,我这里想说明的是source和sink都是Kafka的情况下,可以做到端到端Exactly Once)。

在幂等性<producer id, Partition> -> sequence number的基础之上,Kafka还引入了如下这些角色来实现事务性:
    1.    TransactionalId 
    2.    _transaction_state(Topic)
    3.    Producer epoch
    4.    ControlBatch (又叫Control Message、Transaction Marker) 
    5.    TransactionCoordinator

    TransactionalId是需要用户显示进行设置的,用于唯一标识某个producer。这里为啥要重新引入一个TransactionalId而不是使用幂等性中引入的producer id呢?
    这是因为producer id 在producer每次重启的时候都会更新为一个新值,如果没有TransactionalId,那么producer在Fail恢复后就不能abort上次未完成的事务了。TransactionalId可以唯一标识一个事务操作,便于这个事务的所有操作都能在用一个TransactionCoordinator(负责事务真正执行的Kafka内部角色) 上进行处理。
    
    事务日志记录在_transaction_state Topic中。TransactionCoordinator如果发生异常进行恢复或者新选举时,可以通过读取_transaction_state 中的事务日志信息,来恢复其状态信息。

    Producer epoch配合TransactionalId用于唯一标识最新的那个producer,它是一个单调递增的值,在每次初始化事务的时候递增(在KafkaProducer.initTransactions()方法中,每个producer通过transactionId获取producer id的时候同时获取到这个值)。它的作用如下,如果有两个producer使用了相同的transactionId,那么比较旧的那个producer会抛出异常,避免事务干扰。

    ControlBatch是producer产生的并写入到Topic的特殊消息,ControlBatch一共有两种类型:COMMIT和ABORT,分别表示事务已经成功提交或者被成功中止。

    Producer 在持久化数据时跟之前一样,按照条件持久化到硬盘(数据会有一个标识,标识这条或这批数据是不是事务写入的数据),当收到 Transaction Marker 时,把这个 ControlBatch(Transaction Marker)数据也直接写入这个 Partition 中,这样在处理 Consumer 消费时,就可以根据 ControlBatch信息做相应的处理。真正执行时,Producer 只需要告诉 TransactionCoordinator 当前事务可以 commit,然后再由 TransactionCoordinator 来向其涉及到的 Topic-Partition 的 leader 发送 Transaction Marker 数据,这里减轻了 Client 的压力,而且 TransactionCoordinator 会做一些优化,如果这个目标 Broker 涉及到多个事务操作,是可以共享这个 TCP 连接的;

    对于Consumer来说ControlBatch是不可见,它们是用来让broker告知consumer之前拉取的消息是否被原子性提交,如下如所示:

Kafka的幂等性与事务性理解

 

    TransactionCoordinator 是每个producer在broker端都会对应分配到的这么一个角色,负责事务的真正执行,并跟进记录事务当前所处的状态。

 

Producer如何开启事务:

程序必须给producer显式指定唯一的transactionId:

    properties.put("transactionId"," ***id*** ")

    程序默认会将幂等性设置为true,无需用户手动设置。

同时Consumer端要注意设置一个与事务性相关的参数:

    isolation.level    默认值为read_uncommitted,意思是Consumer应用可以消费到未提交的事务。如果设置成 read_committed,Consumer应用就不能消费到未提交的事务。

    同时要将enable.auto.commit参数设置为false

producer事务使用代码示例如下:

/**

     * 在一个事务内,即有生产消息又有消费消息

     */

    public void consumeTransferProduce() {

        // 0. 构建生产者属性

        Properties pro = new Properties()

        pro.put(...key序列化设定...);

        pro.put(...value序列化设定...);

        pro.put("transactionId"," ***id*** ")

        // 1.构建生产者

        Producer producer = new Producer(properties);

        // 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作

        producer.initTransactions();

        while (true) {

            // 3.开启事务

            producer.beginTransaction();

            List<String> records = readCsvFile(path);

            try {

                for (String record : records) {

                      producer.send(new ProducerRecord<String, String>("test",record ));

                }

                // 4.事务提交

                producer.commitTransaction();

            } catch (Exception e) {

                // 4.放弃事务

                producer.abortTransaction();

            }

        }

    }

 

TransactionCoordinator 执行事务操作时,整体流程如下图所示:
Kafka的幂等性与事务性理解

 

1. 查找Tranaction Corordinator

    Producer向任意一个brokers发送 FindCoordinatorRequest请求来获取Transaction Coordinator的地址,返回对应的node_id,host,port信息。

2. 初始化事务 initTransaction

    Producer发送InitpidRequest给Transaction Coordinator,获取pid。同时Transaction Coordinator会在Transaciton Log中记录这<TransactionId,pid>的映射关系这一消息。同时消息内部还包含了transactio_status事务状态信息:Empty/Ongoing/PrepareCommit/PrepareAbort/CompleteCommit/CompleteAbort/Dead.

    另外,它还会做两件事:

  • 恢复(Commit或Abort)之前的Producer未完成的事务
  • 对PID对应的epoch进行递增,保证producer事务的唯一性。

只要开启了幂等特性即必须执行InitpidRequest,而无须考虑该Producer是否开启了事务特性。

3. 开始事务beginTransaction

    执行Producer的beginTransacion(),它的作用是Producer在本地记录下这个transaction的状态为开始状态。这个操作并没有通知Transaction Coordinator,因为Transaction Coordinator只有在Producer发送第一条消息后才认为事务已经开启。

4. read-process-write流程

    一旦Producer开始发送消息,Transaction Coordinator会将该<Transaction, Topic, Partition>存于Transaction Log内,并将其状态置为开始。另外,如果该<Topic, Partition>为该事务中第一个<Topic, Partition>,Transaction Coordinator还会启动对该事务的计时(每个事务都有自己的超时时间)。

    在注册<Transaction, Topic, Partition>到Transaction Log后,生产者发送数据,虽然没有还没有执行commit或者abort,但是此时消息已经保存到Broker上了。即使后面执行abort,消息也不会删除,只是更改状态字段标识消息为abort状态。

5. 事务提交或终结 commitTransaction/abortTransaction

在Producer执行commitTransaction/abortTransaction时,Transaction Coordinator会执行一个二阶段提交:

  • 第一阶段,将Transaction Log内的该事务状态设置为PREPARE_COMMITPREPARE_ABORT
  • 第二阶段,将Transaction Marker写入该事务涉及到的所有消息(即将消息标记为committedaborted)。这一步骤Transaction Coordinator会发送给当前事务涉及到的每个<Topic, Partition>的Leader,Broker收到该请求后,会将对应的Transaction Marker控制信息写入日志。

    一旦Transaction Marker写入完成,Transaction Coordinator会将最终的COMPLETE_COMMITCOMPLETE_ABORT状态写入Transaction Log中以标明该事务结束。

 

    如上所述的事务性是能保证单个Producer的事务性。但是在Flink中会创建多个Producer,这个时候配合Flink中的Checkpoint机制,能保证多个Producer的事务性,实现端到端的Exactly-Once语义
    虽然Producer端保证了事务性,但是Consumer端很难保证一个已经commit的事务所有msg都会被消费,有以下几个原因:
        1. 对于 compacted topic,在一个事务中写入的数据可能会被新的值覆盖;
        2. 一个事务内的数据,可能会跨多个 log segment,如果旧的 segmeng 数据由于过期而被清除,那么这个事务的一部分数据就无法被消费到了;
        3. Consumer 在消费时可以通过 seek 机制,随机从一个位置开始消费,这也会导致一个事务内的部分数据无法消费;
        4. Consumer 可能没有订阅这个事务涉及的全部 Partition。

 

    对于一个 Partition 而言,offset 小于 LSO (Last Stable Offset)的数据,全都是已经确定的数据,这个主要是对于事务操作而言,在这个 offset 之前的事务操作都是已经完成的事务(已经 commit 或 abort),如果这个 Partition 没有涉及到事务数据,那么 LSO 就是其 HW(水位)。多个producer同时提交事务时,可能会出现先提交的事务未完成,但是事务已经完成的情况,此时后提交的事务的数据也不能被读取到,因为LSO结合事务时,保证之前的数据都是已提交的事务,所以此时会造成数据被消费的延迟,所以在实际的生产场景中,尽量避免 long transaction 这种操作,而且 long transaction可能也会容易触发事务超时

 

参考:

    https://www.cnblogs.com/smartloli/p/11922639.html (Kafka幂等性图解)

    https://blog.csdn.net/u011669700/article/details/80000744 (Kafka部分参数介绍)

    https://www.jianshu.com/p/5bdd9a0d7d02 (Flink写入MySQL实现Exactly-Once)

    https://blog.51cto.com/simplelife/2401411(Flink的二阶段提交)
    https://www.cnblogs.com/createweb/p/11971846.html (Flink Exactly Once写入Kafka出错)

    https://www.jianshu.com/p/64c93065473e(Kafka事务流程图)

    https://blog.csdn.net/mlljava1111/article/details/81180351(事务 java Demo)

    https://blog.csdn.net/muyimo/article/details/91439222(Kafka事务性分析)

    https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html
    https://www.confluent.io/blog/transactions-apache-kafka/