datax安装+配置

 1、下载压缩包
下载页面地址:https://github.com/alibaba/DataX
不要在【Clone or download】处下载,那里下载的是源码;对于Java不是很在行的人来说,自行编译显得有点困难。
而是在:【Quick Start】--->【Download DataX下载地址】进行下载。下载后的包名:datax.tar.gz。解压后{datax}目录下有{bin  conf  job  lib  log  log_perf  plugin  script  tmp}几个目录。
2、安装
将下载后的压缩包直接解压后可用,前提是对应的java及python环境满足要求。
System Requirements:
Linux
JDK(1.6以上,推荐1.6)
Python(推荐Python2.6.X)
Apache Maven 3.x (Compile DataX)
3、参考文档
具体安装部署及使用文档可以参考【Quick Start】部分。
json文档配置可以参考[Support Data Channels]里各接口读写配置说明。
其他的一些说明也可以在此文档中能够找到。
4、运行作业
$ cd  {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}
 自检脚本:python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json
 
二、配置示例:从stream读取数据并打印到控制台

(1)、第一步、创建创业的的配置文件(json格式)
可以通过命令查看配置模板:python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
cd /home/installed/datax/bin
  python datax.py -r streamreader -w streamwriter
 

案例:datax mysql 和 mysql之间相互导入

DataX MySQLWriter

1 快速介绍
MysqlWriter 插件实现了写入数据到 Mysql 主库的目的表的功能。在底层实现上, MysqlWriter 通过 JDBC 连接远程 Mysql 数据库,并执行相应的 insert into … 或者 ( replace into …) 的 sql 语句将数据写入 Mysql,内部会分批次提交入库,需要数据库本身采用 innodb 引擎。
MysqlWriter 面向ETL开发工程师,他们使用 MysqlWriter 从数仓导入数据到 Mysql。同时 MysqlWriter 亦可以作为数据迁移工具为DBA等用户提供服务。

2 实现原理
MysqlWriter 通过 DataX 框架获取 Reader 生成的协议数据,根据你配置的 writeMode 生成
• insert into…(当主键/唯一性索引冲突时会写不进去冲突的行)
或者
• replace into…(没有遇到主键/唯一性索引冲突时,与 insert into 行为一致,冲突时会用新行替换原有行所有字段) 的语句写入数据到 Mysql。出于性能考虑,采用了 PreparedStatement + Batch,并且设置了:rewriteBatchedStatements=true,将数据缓冲到线程上下文 Buffer 中,当 Buffer 累计到预定阈值时,才发起写入请求。

注意:目的表所在数据库必须是主库才能写入数据;整个任务至少需要具备 insert/replace into…的权限,是否需要其他权限,取决于你任务配置中在 preSql 和 postSql 中指定的语句。
 

读(8.3)写(8.253)

使用命令生成mysql导入导出的json模板, 修改模板

{

    "job": {

        "setting": {

            "speed": {

                "channel": 500

            }

        },

        "content": [

            {

                "reader": {

                    "name": "mysqlreader",

                    "parameter": {

                        "username": "mAA",

                        "password": "NA43ajog9bGYQk",

                        "column": ["*"],

                        "connection": [

                           {

                              "table": ["app_sap_source "],

                              "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/meeting20200309?useUnicode=true&characterEncoding=utf8"]

                           }

                        ]

                    }

                },

                "writer": {

                    "name": "mysqlwriter",

                    "parameter": {

                        "writeMode": "insert",

                        "username": "mosh",

                        "password": "NA43ajog9bGYQk",

                        "column": ["*"],

                        "connection": [

                            {

                               "jdbcUrl": "jdbc:mysql://192.168.8.253:3306/meeting20200311",

                               "table": ["app_sap_source"]

                            }

                        ]

                      }

                    }

                }

        ]

    }

}

 

参数说明:

* jdbcUrl

    * 描述:描述的是到对端数据库的JDBC连接信息,使用JSON的数组描述,并支持一个库填写多个连接地址。之所以使用JSON数组描述连接信息,是因为阿里集团内部支持多个IP探测,如果配置了多个,MysqlReader可以依次探测ip的可连接性,直到选择一个合法的IP。如果全部连接失败,MysqlReader报错。 注意,jdbcUrl必须包含在connection配置单元中。对于阿里集团外部使用情况,JSON数组填写一个JDBC连接即可。

jdbcUrl按照Mysql官方规范,并可以填写连接附件控制信息。具体请参看Mysql官方文档。

    * 必选:是

    * 默认值:无

* username

    * 描述:数据源的用户名

    * 必选:是

    * 默认值:无

* password

    * 描述:数据源指定用户名的密码

    * 必选:是

    * 默认值:无

* table

    * 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,MysqlReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。

    * 必选:是

    * 默认值:无

* column

    * 描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用*代表默认使用所有列配置,例如['*']。

支持列裁剪,即列可以挑选部分列进行导出。

支持列换序,即列可以不按照表schema信息进行导出。

支持常量配置,用户需要按照Mysql SQL语法格式: ["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"] id为普通列名,`table`为包含保留在的列名,1为整形数字常量,'bazhen.csy'为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔值。

    * 必选:是

    * 默认值:无

* splitPk

    * 描述:MysqlReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。

推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。

目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,MysqlReader将报错!

* 如果splitPk不填写,包括不提供splitPk或者splitPk值为空,DataX视作使用单通道同步该表数据。

    * 必选:否

    * 默认值:空

* where

    * 描述:筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。

where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,DataX均视作同步全量数据。

    * 必选:否

    * 默认值:无

* querySql

    * 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id

当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置,querySql优先级大于table、column、where选项。

    * 必选:否

    * 默认值:无

*writeMode

    *描述:写入目标数据表的模式,可选项: replace(替换),update(更新),insert(插入)

https://www.cnblogs.com/spicy/p/11155510.html

3 据库表和数据准备: 

datax安装+配置

 

4,执行插入命令: python datax.py  mysql2mysql.json 

 

5。查看目标表数据:

datax安装+配置

 

 

6,如果再次执行一次,就会报脏数据的错误,因为上面你的writemode使用的是insert。

datax安装+配置

 

7。 如果在源数据表增加一条数据,配置不变得情况下,再次执行。前面4条旧数据不会更新到目标表,而新增的这条数据会新增

    来源表新增数据:

datax安装+配置

执行以后。目标表数据:

datax安装+配置

 

8.如果保证更改旧数据和新增的数据 都能同时更新到目标表,writemode使用 update

                配置文件:

            datax安装+配置

            

            来源数据表:

        datax安装+配置

        

        目标数据表:

        datax安装+配置

 

    9, 如果writemode 是replace,应该是全量替换的意思: 这个时候导入就不会报错。

        配置文件:

datax安装+配置

    来源数据表:

datax安装+配置

    目标数据表:

datax安装+配置

增量数据同步

MysqlReader使用JDBC SELECT语句完成数据抽取工作,因此可以使用SELECT...WHERE...进行增量数据抽取,方式有多种:

  • 数据库在线应用写入数据库时,填充modify字段为更改时间戳,包括新增、更新、删除(逻辑删)。对于这类应用,MysqlReader只需要WHERE条件跟上一同步阶段时间戳即可。
  • 对于新增流水型数据,MysqlReader可以WHERE条件后跟上一阶段最大自增ID即可。

对于业务上无字段区分新增、修改数据情况,MysqlReader也无法进行增量数据同步,只能同步全量数据。

采用DataX实现多表增量数据同步

增量同步实现

实现增量同步需要在表中增加一个时间戳字段,如update_time,在同步配置文件中,通过where条件,根据时间戳字段筛选当前时间向前一段时间内的增量数据。

{

    "job": {

        "content": [

            {

                "reader": {

                    "name""mysqlreader"

                    "parameter": {

                        "column": [

                        "doc_id","title","file_path","approval_id","page_count","version"

                        ], 

                        "connection": [

                            {

                                "jdbcUrl": ["jdbc:mysql://192.168.81.1:3306/bootdo?useUnicode=true&characterEncoding=utf8"], 

                                "table": ["es_approval_doc"]

                            }

                        ], 

                        "password""123456"

                        "username""root",

                        "where""version > FROM_UNIXTIME(${start_time}) and version < FROM_UNIXTIME(${end_time})",

                    }

                }, 

                "writer": {

                    "name""mysqlwriter"

                    "parameter": {

                        "column": [

                        "doc_id","title","file_path","approval_id","page_count","version"

                        ], 

                        "writeMode":"update",

                        "connection": [

                            {

                                "jdbcUrl""jdbc:mysql://192.168.81.1:3306/bootdo?useUnicode=true&characterEncoding=utf8"

                                "table": ["es_approval_doc_copy"]

                            }

                        ], 

                        "password""123456"

                        "username""root"

                    }

                }

            }

        ], 

        "setting": {

            "speed": {

                "channel""1"

            }

        }

    }

}

json文件中,${start_time}和${end_time}为调用datax.py时传入的参数。

datax/bin/datax.py ../../mysql2mysql.json -p "-Dstart_time=1546337137 -Dend_time=1546337237"

定时同步实现

定时同步可以采用操作系统的定时任务+shell脚本实现。以下为在linux系统中的方案:

1、编写shell脚本,命名为syntask.sh:

#!/bin/bash

# source /etc/profile

# 截至时间设置为当前时间戳

end_time=$(date +%s)

# 开始时间设置为120s前时间戳

start_time=$(($end_time - 120))

# datax/bin/datax.py ../../mysql2mysql.json -p "-Dstart_time=$start_time -Dend_time=$end_time"

这里通过脚本获取用于筛选条件中的开始时间start_time和结束时间end_time,将两个时间作为参数传给datax.py。

2、在crontab中,添加任务计划:

$crontab -e

* */1 * * * /syntask.sh

DataX不适合实时数据同步或太频繁的定时同步,因为同步都需要去读取源表,频率过大对源表会造成压力。

此外,最好每次增量同步的时间段比定时任务时间间隔大一些,以保证所有时间产生的数据都被覆盖到。

异常情况下的补救措施:

如果某段时间内由于服务器、操作系统、网络等原因造成某个时间段内数据没有正常同步,那么可以通过手动执行同步的方式进行补救,执行同步时,将筛选的时间段加大大覆盖异常发生的整个时间段。

多表同步实现

通常我们的业务系统存在有多个表,表之间有外键关系。为实现多表的数据同步,我们需要理清外键依赖关系,为每个表分别编写json同步配置文件,并按外键依赖关系逐个调用datax.py。

如对于主表es_approval和子表es_approval_doc,可以对应写两个json配置文件:mysql2mysql-approval.json和mysql2mysql-approval-doc.json,在syntask.sh中先调用主表配置文件,再调用子表配置文件。

#!/bin/bash

source /etc/profile

# 截至时间设置为当前时间戳

end_time=$(date +%s)

# 开始时间设置为120s前时间戳

start_time=$(($end_time - 3600))

/datax/bin/datax.py /mysql2mysql-approval.json -p "-Dstart_time=$start_time -Dend_time=$end_time" 

/datax/bin/datax.py /mysql2mysql-approval-doc.json -p "-Dstart_time=$start_time -Dend_time=$end_time"

多级多路同步

要实现多级同步,可以在每两级之间搭建一个datax实例实现这两级之间的数据同步。

要实现多路同步,可以为同一个表编写多个配置文件,向多个目标库同步。

 

{

    "job": {

        "setting": {

            "speed": {

                "channel": 500

            }

        },

        "content": [

            {

                "reader": {

                    "name": "mysqlreader",

                    "parameter": {

                        "username": "DD",

                        "password": "6DDD8d",

                        "column": ["*"],

                        "connection": [

                           {
                                "querySql": [ "select * from app_sap_source_log_list  where id>14940537;"],

                              "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/ scrm?useUnicode=true&characterEncoding=utf8"]

                           }

                        ]

                    }

                },

                "writer": {

                    "name": "mysqlwriter",

                    "parameter": {

                        "writeMode": "insert",

                        "username": "GG",

                        "password": "6UKD",

                        "column": ["*"],

                        "connection": [

                            {

                               "jdbcUrl": "jdbc:mysql://192.168.8.253:3306/biz_meeting",

                               "table": ["app_sap_source"]

                            }

                        ]

                      }

                    }

                }

        ]

    }

}

 

 

MySQL超时参数以及相关DataX数据同步案例分享

https://blog.51cto.com/11778640/1906477

 

 

.解决方案:将DataX使用的mysql驱动包替换为对应的Mysql8.x版本

  • mysql驱动包路径: ${HOME目录}/datax/plugin/reader/mysqlreader/libs和${HOME目录}/datax/plugin/writer/mysqlwriter/libs
  •  wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.18/mysql-connector-java-8.0.18.jar
  •  datax安装+配置