DataX运行流程-源码解读

说着前面的话:DataX的简介等这儿就不多说了。本文主要记录DataX运行流程的主要步骤,通过源码大概过一遍主*分和过程中的一些重要方法。也是自己这些天学习DataX的记录,之后应该会再完善这个。

进入主题,DataX的入口是Engine这个类,直接进去看它的main方法,开始 …

  1. Engine入口(com.alibaba.datax.core.Engine)main方法里面执行entry()方法:
    DataX运行流程-源码解读

  2. Entry()方法主要做了2件事情,解析生成configration生成一个新的Engine然后启动Engine的start()方法
    DataX运行流程-源码解读

  3. ConfigParser.parse()这个方法主要做以下三件事,然后生成一个configration用来执行start()方法:
    1) 解析job的配置信息,由启动参数指定job.json文件。
    2) 解析DataX自带配置信息,由默认指定的core.json文件。
    3) 解析读写插件配置信息,由job.json指定的reader和writer插件信息

  4. Engine.start()方法主要目的就是启动JobContainer.start()的方法:
    DataX运行流程-源码解读

  5. JobContainer.start()方法主要里面主要顺序执行如下图的方法,而最重要的就是 split(), schedule(),先简单了解下这几个方法具体是干嘛的:
    1) init()方法:涉及到根据configuration来初始化reader和writer插件,这里涉及到jar包热加载以及调用插件init()操作方法,同时设置reader和writer的configuration信息。
    2)prepare()方法:涉及到初始化reader和writer插件的初始化,保存当前classLoader,并将当前线程的classLoader设置为所给classLoader,再将当前线程的类加载器设置为保存的类加载,通过调用插件的prepare()方法实现,每个插件都有自己的jarLoader,通过集成URLClassloader实现而来。
    3)split()方法:通过adjustChannelNumber()方法调整channel个数,同时执行reader和writer最细粒度的切分,需要注意的是,writer的切分结果要参照reader的切分结果,达到切分后数目相等,才能满足1:1的通道模型;channel的计数主要是根据byte和record的限速来实现的,在split()的函数中第一步就是计算channel的大小;split()方法reader插件会根据channel的值进行拆分,但是有些reader插件可能不会参考channel的值,writer插件会完全根据reader的插件1:1进行返回;split()方法内部的mergeReaderAndWriterTaskConfigs()负责合并reader、writer、以及transformer三者关系,生成task的配置,并且重写job.content的配置
    4)schedule()方法:根据split()拆分生成的task配置分配生成taskGroup对象,根据task的数量和单个taskGroup支持的task数量进行配置,两者相除就可以得出taskGroup的数量;schdule()内部通过AbstractScheduler的schedule()执行,继续执行startAllTaskGroup()方法创建所有的TaskGroupContainer组织相关的task,TaskGroupContainerRunner负责运行TaskGroupContainer执行分配的task;taskGroupContainerExecutorService启动固定的线程池用以执行TaskGroupContainerRunner对象,TaskGroupContainerRunner的run()方法调用taskGroupContainer.start()方法,针对每个channel创建一个TaskExecutor,通过taskExecutor.doStart()启动任务
    JobContainer.start()方法如下图:
    DataX运行流程-源码解读

  6. JobContainer.start()中的init()方法,主要调用了JobContainer中的initJobReader()和initJobWriter()方法:
    DataX运行流程-源码解读
    initJobReader()和initJobWriter()方法差不多,看一下initJobReader(),主要就是根据json配置文件获取插件具体的class,然后进行加载class,获取具体的插件对象(write类似),然后执行插件的init()方法并返回这个插件:

  7. 然后看下JobContainer的split()方法,主要做三件事:
    1)adjustChannelNumber()
    2)doReaderSplit() & doWriterSplit()
    3)mergeReaderAndWriterTaskConfigs
    DataX运行流程-源码解读
    1)获取需要多少个channel数,通过adjustChannelNumber() 设置 needChannelNumber,这个方法做了3件事来确定这个needChannelNumber值:
    • needChannelNumberByByte = (globalLimitedByteSpeed / channelLimitedByteSpeed)
    needChannelNumberByRecord = (globalLimitedRecordSpeed / channelLimitedRecordSpeed)
    • 上面两个值取最小赋给 needChannelNumber
    • 如果这时候 needChannelNumber 已经有值了就返回,没有值就找 job.setting.speed.channel 赋值给 needChannelNumber,不然报错。
    DataX运行流程-源码解读
    2)调用reader和writer插件中的split方法,获取readTask和writerTask,而这个split方法最终调用了ReaderSplitUtil.doSplit()方法,大致意思如下:
    • 先判断是tableMode还是querySqlMode
    • 如果是querySqlMode,那么从配置文件获取querySql然后逐一配置到List splittedConfigs中去
    • 如果是tableMode,判断是否需要切分,如果需要切分那么先计算出 eachTableShouldSplittedNumber = ceil(1.0 * adviceNumber / tableNumber),如果只有一张表eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5,然后根据这个 eachTableShouldSplittedNumber 这个值调用SingleTableSplitUtil.splitSingleTable(tempSlice, eachTableShouldSplittedNumber) 切分并得到所有分片的配置加到splittedConfigs中去
    • 如果无需切分,那么就从配置信息获取jdbc,column,where等组成配置加到splittedConfigs中去
    DataX运行流程-源码解读
    3)一一对应合并readerTask,writerTask,transformer并把结果赋值给 job.content以便下一步schedule()调用

DataX运行流程-源码解读
8. split()方法执行完之后JobContainer执行schedule()方法,这个方法中主要做2件事:分组并启动。
DataX运行流程-源码解读
1)先来看下分组,分组是调用了assignFairly()这个方法,在这个方法之前,先调整needChannelNumber的值,从上一步split()方法获得的needChannelNumber和task的数量,这两个值取一个小的重新赋值给needChannelNumber。
然后进入这个assignFairly(),可以看到这个里面还做了3件事:第一,确定taskGroupNumber,第二,做分组分配,第三,做分组优化。
DataX运行流程-源码解读
进而,再看一下这个doAssign()方法,这个方法主要就是根据传入的resourceMarkAndTaskIdMap和taskGroupNumber来把task分配到taskGroup中去。
DataX运行流程-源码解读
再看下这段逻辑的示意图:
DataX运行流程-源码解读
最后,再来看下assignFairly()中的adjustChannelNumPerTaskGroup(),这个方法主要就是把整除之后多余的余数个task的组多加一个channel,从而使整个分配最优化。
DataX运行流程-源码解读

2)然后,看下JobContainer.schedule()方法分配完之后做的事情,就是调用StandAloneScheduler的schedule()方法,也就是AbstractScheduler.schedule(),而这个方法中最主要的就是调用了startAllTaskGroup(configurations)来启动所有任务组。
DataX运行流程-源码解读
接着,再来看下startAllTaskGroup()这个方法(ProcessInnerScheduler. startAllTaskGroup())。这个方法中先启动了一个固定的线程池,然后启动逐一执行taskGroupContainer。
DataX运行流程-源码解读

接着我们来看下被启动的TaskGroupContainer,也就是看下这个类的start()方法:
DataX运行流程-源码解读

显然这个start()方法中最重要的就是在红色框出来的代码,即taskExecutor.doStart()方法,在这里启动了真正读写的task了,然后看下这个doStart()方法:
DataX运行流程-源码解读
我们看到了 this.writerThread.start() 和this.readerThread.start() 也就是调用了插件Writer.Task和Reader.Task开始读写了。

  1. 至此,我们可以看下Reader.Task是怎么读写并与Channel如何交互的,举个例子,我们可以看一下PostgresqlReader.java这个类中的startRead()方法,而这个方法中调用了this.commonRdbmsReaderSlave.startRead()方法
    DataX运行流程-源码解读
    顺着代码看下 this.commonRdbmsReaderSlave.startRead()
    DataX运行流程-源码解读
    而在这个方法中,最重要的方法就是this.transportOneRecord(),在看下这个方法:
    DataX运行流程-源码解读
    在这个CommonRdbmsReader. transportOneRecord()方法中最重要的是 recordSender.sendToWriter(record),显然我们要去看下sendToWriter,但在此之前我们先搞清楚这个recordSender是谁。我们可以看到这里的recordSender是 BufferedRecordExchanger,在TaskExcutor初始化的时候设置的也就是generateRunner的时候:
    DataX运行流程-源码解读

现在可以看下 BufferedRecordExchanger. sendToWriter()这个方法,而在这个方法中,最主要的就是flush()方法:

DataX运行流程-源码解读
而在这个flush方法中调用了Channel的pushAll():
DataX运行流程-源码解读

而再进入这个pushAll()方法:

DataX运行流程-源码解读
这个statPush()方法主要就是限速所在了,其实限速的逻辑就是:
1) channel初始化的时候有两个值byteSpeed和recordSpeed,这两个值就是每秒限制的byte数和每秒限制的record条数:
DataX运行流程-源码解读
2) 通过这两个参数,通过比较和判断计算出需要休眠的时间,从而达到限速,然后再来看statPush()

DataX运行流程-源码解读

至此,大致先就这样,结束。