草稿 ES数据同步方案
中间件取名
hd-sycn
架构图
请点击:架构图链接
模块说明
业务数据
因为hd-sycn为通用且与业务分离的,故而不可以掺杂与业务耦合的复杂SQL语句(如Join语句)。业务方需提供单表数据,以供hd-sycn抽取。如果业务方的数据来自于多张表,业务方需自行创建中间表,融合join多表之后的结果,将该中间表作为数据源,提供给hd-sycn。
数据抽取
1. HA方案选型
方案 | 优点 | 缺点 |
---|---|---|
zookeeper | 临时节点的特性,可天然支持对节点变化的感知; 可利用临时节点作为集群节点,实现对节点宕机和上线的及时处理 |
相比DB方案,多引入了一层zk组件,架构复杂; 由于完全是三方组件,技术问题不可控; |
db表+守护线程 |
用最基本的线程和数据库表来实现集群节点的健康监控,方法论和问题解决方案更丰富; 完全自研,架构简单,风险更可控; |
需要开发人员手动编码来实现对集群节点健康状态的感知 |
综合对比:我们选择“db表+守护线程”的方案。
2.数据库表设计
时间戳偏移量
表名:offset_info
id | schema | table | last_offset | node_id |
---|---|---|---|---|
1 | hdd_tms | d_dispatch | 2020-06-22 05:18:22 | node-01 |
集群节点
表名:node_info
id | node_id | node_ip | heartbeat_timestamp |
---|---|---|---|
1 | node-01 | 172.168.5.18 | 1234567887 |
2 | node-02 | 172.168.5.16 | 1234566799 |
增量批次
表名:batch_info
id | batch_num | schema | table | last_offset | node_id | status | start_time | complete_time | total_count | success_count | fail_count | create_time | udpate_time |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1 | 1000001 | hdd_tms | d_dispatch | 2020-06-22 05:18:22 | node-01 | COMPLETED | 2020-06-22 05:18:50 | 2020-06-22 05:25:50 | 900 | 850 | 50 | 2020-06-22 05:18:50 | 2020-06-22 05:18:50 |
2 | 1000002 | hdd_tms | d_dispatch | 2020-06-22 05:18:22 | node-01 | RUNNING | 2020-06-22 05:18:50 | 900 | 2020-06-22 05:18:50 | 2020-06-22 05:18:50 |
3.线程设计
增量数据抽取
- 每张表,都有一个专用线程做数据抽取。例如,如果当前节点被分配负责5张表的数据抽取,则应该动态分配5个线程,每个线程抽取一张表的数据。【抽取线程采用线程池】
- 每个线程只负责抽取一张表的数据,每次抽取需要检查当前节点是否依然负责该表;如果重平衡之后,当前节点不再负责该表,则线程return;
- 增量抽取为“分段”抽取,防止因为大量数据被刷导致的增量数据过多。首先统计本次抽取的时间段内,总行数是否大于阈值L(如10万)。
3.1. 如果大于L,则将该时间段内的所有数据的id全量查出放于list(1000万个id大约占用内存70M,可进一步确认),然后每L条数据一组(首先要确认一下,10万条完整数据要占用多大内存),做in语句查询(in语句不能大于4M)。把每组的查询结果,放于kafka中。
3.2. 如果小于L,则直接全量取出,然后一次性放于kafka; - 数据抽取,应做好日志打印,能通过日志辨识出数据抽取的进度,防止假死、死锁等问题导致跑批记录长期状态为RUNNING,影响下次跑批。
重平衡感知线程
- 每隔X秒检查当前节点负责的表集合,对比原集合,找出新增的表,直接提交到线程池;
集群节点心跳
- 每个节点都会有一个后台线程,每X秒做一次心跳。心跳,即为更新node_info表中当前节点记录的heartbeat_timestamp字段为当前时间。
- 如果node_info表中没有当前节点的信息,则insert一条记录,heartbeat_timestamp字段为当前时间。
集群节点健康检查
- 每个节点都会启动一个后台线程,每隔X秒,遍历node_info表记录,检查heartbeat_timestamp字段与当前时间的距离值,如果大于阈值,则物理删除该记录。
- 每次遍历时,首先检查node_info表中第一条记录(即最小id记录)是否为当前节点,如果是,则继续;如果不是,则continue,等待下次定时任务。以此保证:所以节点中,只有第一个节点,会负责所有节点的健康检查。
重平衡分配
节点加入
如果有新节点加入,则会在心跳线程的第一次心跳时,执行重平衡。
- 检查offset_info表中是否有当前节点的记录,如果有,说明已经被其他节点的重平衡线程分配过了,无需再执行;如果offset_info表中没有当前节点记录,则触发重平衡,执行以下步骤。
- 获取重平衡分布式锁;二次校验步骤1,如果当前节点依然没有被分配,则执行步骤3;
- 获取offset_info表数据按id排序,存入数组ao,假设有m个;获取node_info表数据按id排序,存入数组an,假设有n个;遍历ao下标,将ao[i]的node_id更新为an[i mod n]。
节点删除
如果有节点宕机,则会在健康检查线程中,待所有宕机的节点都被物理删除之后,执行重平衡。
- 检查offset_info表中是否有刚刚删除的节点对应的记录,如果没有,说明已经被其他节点的重平衡线程分配过了,无需再执行;如果offset_info表中有刚刚删除的节点对应的记录,则触发重平衡,执行以下步骤。
- 获取重平衡分布式锁;二次校验步骤1,如果offset_info表中依然有刚刚删除的节点对应的记录,则执行步骤3;
- 获取offset_info表数据按id排序,存入数组ao,假设有m个;获取node_info表数据按id排序,存入数组an,假设有n个;遍历ao下标,将ao[i]的node_id更新为an[i mod n]。
4. 流程设计
手动全量与定时增量
手动全量往往是在新表初始化上线时,需要触发一次;在手动全量彻底结束之前,不可以提前开始定时增量任务;所以,应该在手动全量开始时,记录偏移量,在手动全量完成之后,将该偏移量入库,此时,定时增量才可以根据该偏移量,开始执行;
5. 故障处理
增量同步互斥
场景描述: 表t目前是有节点node1负责的,并且此时此刻,节点node1正在抽取表t的数据;然而在此时,节点node2加入集群,表t恰好被分配给了node2,此时node2会立即进行数据抽取,导致数据抽取被重复处理。
解决方案: nodeX在做数据抽取之前,会先检查batch_info表,根据schema+table+last_offset+status做重复校验,如果status为RUNNING,则说明其他节点正在抽取该批数据,需等待其为COMPLETED之后再行执行。同时,也要检查batch_info表中node_id对应的节点是否健康(node_info表中是否存在),如果不健康,则立即执行跑批。
ES-Adapter
一致性校验
在将消费的数据写入到ES之前,会根据ID+update_time先行校验,看ES中的数据与当前消费的数据是否一致;如果ES中无次数据,则新增;如果ES中的数据交旧,则更新;如果ES中数据较新或与当前消费数据一致,则无需任何操作;
字段不一致
针对kafka中消费的数据,字段如果与ES中index的字段不一致:针对新增情况,则要触发ES字段新增,针对其他情况,需要重建index;