[debezium 源码分析] debezium history topic

概述

对于mysql, debezium提供了一个history topichistory topic的配置比较特殊,过期策略为delete,过期时间非常大,一般我设置为Long - 1(只要数据不删除即可),分区个数必须为1。具体原因会在之后给出。

对于dbz mysql connector,一般来说history topic是必须的,不可省略。

History Topic 机制

dbz history topic用来存储数据库的ddl语句。

  1. dbz mysql connector会在内存中维护订阅的数据库的所有的表结构信息,具体的通过MySqlSchema这个类来维护。MySqlSchema内部含有DatabaseHistory对象,一般我们使用的是KafkaDatabaseHistory它会利用history topic保存所有的ddl;
  2. dbz如果要将获取到的数据库的行变更数据,进行正确的类型转换(表的某行变更,必定有多个字段,此行变更内容需要转换成相应的字段类型),就需要在内存中维护最新的mysql schema;MysqlSchema类内部有一个Tables对象,保存了监听数据库的表所有表结构。
  3. MysqlSchema类内部还有一个DdlParser,用来解析所有的ddl语句。通过DdlParser2中提到的内存维护的表结构能够保持最新,保证dbz交付的数据正确性。
  4. dbz任务重启时,我们必须保证内存中维护的表结构正确,否则提供的数据可能存在格式错误。dbz通过重启时读取history topic,来设置内存中维护的表结构。重启时获取history topic所有内容,当然简单理解获取的都是ddl语句,然后利用DdlParser解析来自history topicddl,更新内存中的表结构。

数据库执行ddl语句后,dbz处理ddl的流程如下

[debezium 源码分析] debezium history topic

dbz任务重启后,处理history topic和内存table的流程如下

[debezium 源码分析] debezium history topic

debezium history topic的作用基本如上,但是此机制并不完美,有几率导致dbz任务重启失败,抛出异常。

History Topic 缺点

Hisotry Topic存的ddl可能会非常多,当量多到一定程度时,dbz可能无法在规定的默认的history topic读取时间、读取次数完成历史ddl处理,导致任务启动失败。

所以必须要设置合理的参数,保证debezium正常重启。

  • database.history.kafka.recovery.poll.interval.ms; kafkaConsumer读取history topic,每次poll的时间。默认100ms;
  • database.history.kafka.recovery.attempts; kafkaConsumer最多能够读取hisotry topic的次数,默认4次。

Hisotry Topic 个人看法

我觉得history topic比一定需要存所有的ddl,我们只需要存表的最新schema。当然只是个人猜想,我之后试试 ^.^