[debezium 源码分析] debezium history topic
概述
对于mysql
, debezium
提供了一个history topic
,history topic
的配置比较特殊,过期策略为delete
,过期时间非常大,一般我设置为Long - 1
(只要数据不删除即可),分区个数必须为1
。具体原因会在之后给出。
对于dbz mysql connector
,一般来说history topic
是必须的,不可省略。
History Topic 机制
dbz history topic
用来存储数据库的ddl
语句。
-
dbz mysql connector
会在内存中维护订阅的数据库的所有的表结构信息,具体的通过MySqlSchema
这个类来维护。MySqlSchema
内部含有DatabaseHistory
对象,一般我们使用的是KafkaDatabaseHistory
,它会利用history topic
保存所有的ddl
; -
dbz
如果要将获取到的数据库的行变更数据,进行正确的类型转换(表的某行变更,必定有多个字段,此行变更内容需要转换成相应的字段类型),就需要在内存中维护最新的mysql schema
;MysqlSchema
类内部有一个Tables
对象,保存了监听数据库的表所有表结构。 -
MysqlSchema
类内部还有一个DdlParser
,用来解析所有的ddl
语句。通过DdlParser
,2
中提到的内存维护的表结构能够保持最新,保证dbz
交付的数据正确性。 - 当
dbz
任务重启时,我们必须保证内存中维护的表结构正确,否则提供的数据可能存在格式错误。dbz
通过重启时读取history topic
,来设置内存中维护的表结构。重启时获取history topic
所有内容,当然简单理解获取的都是ddl
语句,然后利用DdlParser
解析来自history topic
的ddl
,更新内存中的表结构。
数据库执行ddl
语句后,dbz
处理ddl
的流程如下
dbz
任务重启后,处理history topic
和内存table
的流程如下
debezium history topic
的作用基本如上,但是此机制并不完美,有几率导致dbz
任务重启失败,抛出异常。
History Topic 缺点
Hisotry Topic
存的ddl
可能会非常多,当量多到一定程度时,dbz
可能无法在规定的默认的history topic
读取时间、读取次数完成历史ddl
处理,导致任务启动失败。
所以必须要设置合理的参数,保证debezium
正常重启。
-
database.history.kafka.recovery.poll.interval.ms
;kafkaConsumer
读取history topic
,每次poll
的时间。默认100ms
; -
database.history.kafka.recovery.attempts
;kafkaConsumer
最多能够读取hisotry topic
的次数,默认4
次。
Hisotry Topic 个人看法
我觉得history topic
比一定需要存所有的ddl
,我们只需要存表的最新schema
。当然只是个人猜想,我之后试试 ^.^
。