草稿 ES数据同步方案

中间件取名

hd-sycn


架构图

请点击:架构图链接
草稿 ES数据同步方案


模块说明

业务数据

因为hd-sycn为通用且与业务分离的,故而不可以掺杂与业务耦合的复杂SQL语句(如Join语句)。业务方需提供单表数据,以供hd-sycn抽取。如果业务方的数据来自于多张表,业务方需自行创建中间表,融合join多表之后的结果,将该中间表作为数据源,提供给hd-sycn。

数据抽取

1. HA方案选型

方案 优点 缺点
zookeeper 临时节点的特性,可天然支持对节点变化的感知;
可利用临时节点作为集群节点,实现对节点宕机和上线的及时处理
相比DB方案,多引入了一层zk组件,架构复杂;
由于完全是三方组件,技术问题不可控;
db表+守护线程
用最基本的线程和数据库表来实现集群节点的健康监控,方法论和问题解决方案更丰富;
完全自研,架构简单,风险更可控;
需要开发人员手动编码来实现对集群节点健康状态的感知

综合对比:我们选择“db表+守护线程”的方案。

2.数据库表设计

时间戳偏移量

表名:offset_info

id schema table last_offset node_id
1 hdd_tms d_dispatch 2020-06-22 05:18:22 node-01

集群节点

表名:node_info

id node_id node_ip heartbeat_timestamp
1 node-01 172.168.5.18 1234567887
2 node-02 172.168.5.16 1234566799

增量批次

表名:batch_info

id batch_num schema table last_offset node_id status start_time complete_time total_count success_count fail_count create_time udpate_time
1 1000001 hdd_tms d_dispatch 2020-06-22 05:18:22 node-01 COMPLETED 2020-06-22 05:18:50 2020-06-22 05:25:50 900 850 50 2020-06-22 05:18:50 2020-06-22 05:18:50
2 1000002 hdd_tms d_dispatch 2020-06-22 05:18:22 node-01 RUNNING 2020-06-22 05:18:50 900 2020-06-22 05:18:50 2020-06-22 05:18:50

3.线程设计

增量数据抽取

  1. 每张表,都有一个专用线程做数据抽取。例如,如果当前节点被分配负责5张表的数据抽取,则应该动态分配5个线程,每个线程抽取一张表的数据。【抽取线程采用线程池】
  2. 每个线程只负责抽取一张表的数据,每次抽取需要检查当前节点是否依然负责该表;如果重平衡之后,当前节点不再负责该表,则线程return;
  3. 增量抽取为“分段”抽取,防止因为大量数据被刷导致的增量数据过多。首先统计本次抽取的时间段内,总行数是否大于阈值L(如10万)。
    3.1. 如果大于L,则将该时间段内的所有数据的id全量查出放于list(1000万个id大约占用内存70M,可进一步确认),然后每L条数据一组(首先要确认一下,10万条完整数据要占用多大内存),做in语句查询(in语句不能大于4M)。把每组的查询结果,放于kafka中。
    3.2. 如果小于L,则直接全量取出,然后一次性放于kafka;
  4. 数据抽取,应做好日志打印,能通过日志辨识出数据抽取的进度,防止假死、死锁等问题导致跑批记录长期状态为RUNNING,影响下次跑批。

重平衡感知线程

  1. 每隔X秒检查当前节点负责的表集合,对比原集合,找出新增的表,直接提交到线程池;

集群节点心跳

  1. 每个节点都会有一个后台线程,每X秒做一次心跳。心跳,即为更新node_info表中当前节点记录的heartbeat_timestamp字段为当前时间。
  2. 如果node_info表中没有当前节点的信息,则insert一条记录,heartbeat_timestamp字段为当前时间。

集群节点健康检查

  1. 每个节点都会启动一个后台线程,每隔X秒,遍历node_info表记录,检查heartbeat_timestamp字段与当前时间的距离值,如果大于阈值,则物理删除该记录。
  2. 每次遍历时,首先检查node_info表中第一条记录(即最小id记录)是否为当前节点,如果是,则继续;如果不是,则continue,等待下次定时任务。以此保证:所以节点中,只有第一个节点,会负责所有节点的健康检查。

重平衡分配

节点加入

如果有新节点加入,则会在心跳线程的第一次心跳时,执行重平衡。

  1. 检查offset_info表中是否有当前节点的记录,如果有,说明已经被其他节点的重平衡线程分配过了,无需再执行;如果offset_info表中没有当前节点记录,则触发重平衡,执行以下步骤。
  2. 获取重平衡分布式锁;二次校验步骤1,如果当前节点依然没有被分配,则执行步骤3;
  3. 获取offset_info表数据按id排序,存入数组ao,假设有m个;获取node_info表数据按id排序,存入数组an,假设有n个;遍历ao下标,将ao[i]的node_id更新为an[i mod n]。
节点删除

如果有节点宕机,则会在健康检查线程中,待所有宕机的节点都被物理删除之后,执行重平衡。

  1. 检查offset_info表中是否有刚刚删除的节点对应的记录,如果没有,说明已经被其他节点的重平衡线程分配过了,无需再执行;如果offset_info表中有刚刚删除的节点对应的记录,则触发重平衡,执行以下步骤。
  2. 获取重平衡分布式锁;二次校验步骤1,如果offset_info表中依然有刚刚删除的节点对应的记录,则执行步骤3;
  3. 获取offset_info表数据按id排序,存入数组ao,假设有m个;获取node_info表数据按id排序,存入数组an,假设有n个;遍历ao下标,将ao[i]的node_id更新为an[i mod n]。

4. 流程设计

手动全量与定时增量

手动全量往往是在新表初始化上线时,需要触发一次;在手动全量彻底结束之前,不可以提前开始定时增量任务;所以,应该在手动全量开始时,记录偏移量,在手动全量完成之后,将该偏移量入库,此时,定时增量才可以根据该偏移量,开始执行;

5. 故障处理

增量同步互斥

场景描述: 表t目前是有节点node1负责的,并且此时此刻,节点node1正在抽取表t的数据;然而在此时,节点node2加入集群,表t恰好被分配给了node2,此时node2会立即进行数据抽取,导致数据抽取被重复处理。
解决方案: nodeX在做数据抽取之前,会先检查batch_info表,根据schema+table+last_offset+status做重复校验,如果status为RUNNING,则说明其他节点正在抽取该批数据,需等待其为COMPLETED之后再行执行。同时,也要检查batch_info表中node_id对应的节点是否健康(node_info表中是否存在),如果不健康,则立即执行跑批。

ES-Adapter

一致性校验

在将消费的数据写入到ES之前,会根据ID+update_time先行校验,看ES中的数据与当前消费的数据是否一致;如果ES中无次数据,则新增;如果ES中的数据交旧,则更新;如果ES中数据较新或与当前消费数据一致,则无需任何操作;

字段不一致

针对kafka中消费的数据,字段如果与ES中index的字段不一致:针对新增情况,则要触发ES字段新增,针对其他情况,需要重建index;