Kafka 异常对应用程序造成的影响及应用程序处理kafka发送解耦合的思路

多线程解析文件并发送kafka的应用程序发生故障的原因和解决方案:

 

1.故障发生的背景:

                中国本地服务器上部署了XXXMQ(一种Java编写的可实时传输文件的MQ)服务,国外服务器会实时的通过MQ发送需要解析的文件到本地。

本地服务器上部署了两个应用程序(应用程序1 :对外提供的 Webservice ,应用程序2:解析程序 ImportData)。本次出故障的是解析程序ImportData(以下简称解析程序)。

 

2. 解析程序的实现原理:

              程序启动时开启多线程和定时任务。当服务器的work目录有文件时先通过定时任务转移到queue目录,与此同时将文件放到阻塞队列中;

              多线程从阻塞队列中获取文件,解析到数据库之后,将解析成功的文件移到backup目录,失败的移到error目录;

              解析成功的Data Object会根据不同业务规则分别发送到KafKa 服务器对应不同的Topic上(一个线程中进行); --- 故障点

             前置机上部署不同的Kafka Consumer 去消费不同Topic 上的Data Object,插入前置机的数据库,前置机再部署webservice 对外提供服务。

 

3.上述标黄的故障点发生的原因:

                由于解析到总部数据库和发送Kafka 的操作在同一个线程中进行,当kafka客户端在连接服务端的时候,

有可能会抛出TimeoutException (Kafka 的某台机器宕机或者某个broker节点down 掉),默认会阻塞当前线程1分钟。以此造成文件的积压越来越大。

我模拟制造了kafka 内部的一个TimeoutException,work中放了6个文件,启动2个线程,文件解析总时长需要3分钟。

第一个文件开始时间:

Kafka 异常对应用程序造成的影响及应用程序处理kafka发送解耦合的思路

第一个文件超时时间:

Kafka 异常对应用程序造成的影响及应用程序处理kafka发送解耦合的思路

最后一个文件完成时间:

Kafka 异常对应用程序造成的影响及应用程序处理kafka发送解耦合的思路

 

4.解决方案:

a.  总部数据解析入库后,单独起一个独立的线程去向KafKa 发送数据 ,由于KafKa 内部是线程安全的,相比多线程,可能单线程的处理效率更高,占用系统资源更少。(具体源码分析可参照参考资料1)

     如果发现要发送的数据积压或者kafka服务器连接异常,将所有未发送完成的数据,实时写入到一张临时表中。待KafKa 连接恢复正常后,从临时表取出数据继续发送,发完之后立即删除即可。

     如果出现业务高峰期kafka 宕机或者kafka超时连接1个小时以上的情况,直接将临时表的job停掉即可。发邮件通知所有服务中心全部切换到总部webserice。

b.  多线程将数据解析完之后,会将文件保存在backup 中,单独起一个job从backup中拿文件,解析后根据不同服务中心发送到Kafka服务端。这样需要考虑对backup的检索效率。

c.  多线程在解析文件之前,将work目录文件先拷贝一份到work1中。work1中的文件单独做kafka的发送。

      这样相当于同一个文件解析两次,对系统资源消耗较大;如果出现kafka 宕机,也会造成work1的文件积压,积压之后再去解析,若超过1个小时,前置机就无法正常提供服务了。

d.  多线程直接解析入库,再从数据库查询需要发送的数据给kafka服务端。(同一时间段内会持续的对数据库同一张表造成读写,高峰期无法预知数据库压力。而且现在的表需要加字段,标记哪些数据未发送完成,

对于未发送成功的,如若kafka 宕机,重复查询的量加大,加上各服务中心切换过来的压力(最坏情况),对目前的数据库是一个很大的考验)。

 

综上,请各位大佬决定采用哪种方案比较合理。本人菜鸟一个,不过还是倾向于方案a。

 

参考资料:https://www.cnblogs.com/dafanjoy/p/10292875.html (这个作者真的大牛,膜拜~~)

https://blog.****.net/lipeng_bigdata/article/details/51112870