DataX Overview
DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It implements efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), DRDS, and more.
DataX 3.0 Framework

As an offline data synchronization framework, DataX is built on a Framework + plugin architecture. Reading from and writing to data sources are abstracted as Reader/Writer plugins that slot into the synchronization framework.
Reader: the data collection module. It reads data from the source and sends it to the Framework.
Writer: the data writing module. It continuously pulls data from the Framework and writes it to the destination.
Framework: connects Reader and Writer, acting as the data transfer channel between them, and handles core concerns such as buffering, flow control, concurrency, and data conversion.
DataX 3.0 Plugin System

The DataX Framework exposes a simple interface for interacting with plugins and a lightweight plugin integration mechanism: adding a plugin for a data source is all it takes to connect it seamlessly with the other data sources.
*(ClickHouse is classified under the generic RDBMS category in the DataX plugin system.)

A single data-synchronization run in DataX is called a Job. When DataX receives a Job, it starts one process to carry out the whole synchronization. The DataX Job module is the central management node of that single job: it handles data cleanup, subtask splitting (turning the single job into multiple sub-Tasks), TaskGroup management, and so on.
Once started, the DataX Job splits itself into multiple small Tasks (subtasks) according to the source-specific splitting strategy, so they can run concurrently. The Task is the smallest unit of a DataX job; each Task synchronizes one portion of the data.
After the split, the DataX Job calls the Scheduler module, which regroups the Tasks into TaskGroups according to the configured concurrency. Each TaskGroup runs all of its assigned Tasks at a fixed concurrency; by default a single TaskGroup runs 5 concurrent channels.
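For example, a job configured with 20 concurrent channels and split into 100 Tasks is scheduled as 20 / 5 = 4 TaskGroups, each running 25 of the 100 Tasks with a concurrency of 5.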
Each Task is launched by its TaskGroup. Once started, a Task runs a fixed Reader -> Channel -> Writer thread pipeline to carry out the synchronization.
While the job runs, the Job module monitors and waits for the TaskGroups to finish. When all TaskGroups have completed, the Job exits successfully; otherwise it exits abnormally with a non-zero exit code.
DataX Configuration
Deploying DataX
Download the source:
git clone git@github.com:alibaba/DataX.git
or
https://github.com/alibaba/DataX (Download ZIP)
Build with Maven:
cd {DataX_source_code_home}
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
If the build fails because dependencies cannot be resolved, manually switch the repository address to http://maven.aliyun.com/nexus/content/groups/public/:
cd {DataX_source_code_home}
vim pom.xml
# Comment out this repository block
<!--
<repositories>
    <repository>
        <id>central</id>
        <name>Nexus aliyun</name>
        <url>https://maven.aliyun.com/repository/central</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
-->
# Replace it with the new address
<repositories>
    <repository>
        <id>maven-ali</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
            <updatePolicy>always</updatePolicy>
            <checksumPolicy>fail</checksumPolicy>
        </snapshots>
    </repository>
</repositories>
A successful build ends with:
[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------
[INFO] Total time: 10:44 min
[INFO] Finished at: 2020-08-18T10:26:48+08:00
[INFO] Final Memory: 133M/960M
[INFO] -----------------------------------------------------------------
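After a successful build, the packaged DataX lands under target/datax/datax inside the source tree (the same path appears later in the job logs). A quick sanity check is to run the stream-to-stream self-test job that official builds ship in the job directory; the layout below is the stock one, so verify it against your own build:

cd {DataX_source_code_home}/target/datax/datax
ls                                 # expect bin, conf, job, lib, plugin, ...
python bin/datax.py job/job.json   # bundled self-test job (streamreader -> streamwriter)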
Configuring DataX
View the mysql-to-clickhouse job template:
[root@localhost datax]# python bin/datax.py -r mysqlreader -w clickhousewriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Please refer to the mysqlreader document:
https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
Please refer to the clickhousewriter document:
https://github.com/alibaba/DataX/blob/master/clickhousewriter/doc/clickhousewriter.md
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "clickhousewriter",
                    "parameter": {
                        "batchByteSize": 134217728,
                        "batchSize": 65536,
                        "column": [
                            "col1",
                            "col2",
                            "col3"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:clickhouse://<host>:<port>[/<database>]",
                                "table": [
                                    "table1",
                                    "table2"
                                ]
                            }
                        ],
                        "dryRun": false,
                        "password": "password",
                        "postSql": [],
                        "preSql": [],
                        "username": "username",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
Copy this template into the job.json file under the job directory. Here is a json file that has been verified to work:
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": ["id", "uid", "name"],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://172.16.150.223:3306/test"],
                                "table": ["test_update"]
                            }
                        ],
                        "where": ""
                    }
                },
                "writer": {
                    "name": "clickhousewriter",
                    "parameter": {
                        "batchByteSize": 134217728,
                        "batchSize": 65536,
                        "column": ["id", "uid", "name"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/test",
                                "table": [
                                    "test_update"
                                ]
                            }
                        ],
                        "dryRun": false,
                        "password": "123456",
                        "postSql": [],
                        "preSql": [],
                        "username": "default",
                        "writeMode": "insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
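DataX is unforgiving about malformed JSON (see the summary below), so it is worth validating the file before launching the job, for example with Python's built-in json.tool module:

[root@localhost datax]# python -m json.tool job/job.json > /dev/null && echo "job.json is valid JSON"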
Starting the DataX Job
[root@localhost datax]# python bin/datax.py job/job.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2020-08-18 14:02:20.251 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2020-08-18 14:02:20.261 [main] INFO Engine - the machine info =>
osInfo: Oracle Corporation 1.8 25.162-b12
jvmInfo: Linux amd64 3.10.0-862.el7.x86_64
cpu num: 4
totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1
GC Names [PS MarkSweep, PS Scavenge]
MEMORY_NAME | allocation_size | init_size
PS Eden Space | 256.00MB | 256.00MB
Code Cache | 240.00MB | 2.44MB
Compressed Class Space | 1,024.00MB | 0.00MB
PS Survivor Space | 42.50MB | 42.50MB
PS Old Gen | 683.00MB | 683.00MB
Metaspace | -0.00MB | 0.00MB
2020-08-18 14:02:20.316 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null
2020-08-18 14:02:20.319 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2020-08-18 14:02:20.320 [main] INFO JobContainer - DataX jobContainer starts job.
2020-08-18 14:02:20.322 [main] INFO JobContainer - Set jobId = 0
2020-08-18 14:02:20.701 [job-0] INFO OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://172.16.150.223:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
2020-08-18 14:02:20.720 [job-0] INFO OriginalConfPretreatmentUtil - table:[test_update] has columns:[id,uid,name].
2020-08-18 14:02:20.773 [job-0] INFO ClickHouseDriver - Driver registered
2020-08-18 14:02:20.972 [job-0] INFO OriginalConfPretreatmentUtil - table:[test_update] all columns:[
id,uid,name,create_date
].
2020-08-18 14:02:20.986 [job-0] INFO OriginalConfPretreatmentUtil - Write data [
insert INTO %s (id,uid,name) VALUES(?,?,?)
], which jdbcUrl like:[jdbc:clickhouse://127.0.0.1:8123/test]
2020-08-18 14:02:20.986 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2020-08-18 14:02:20.987 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2020-08-18 14:02:20.987 [job-0] INFO JobContainer - DataX Writer.Job [clickhousewriter] do prepare work .
2020-08-18 14:02:20.988 [job-0] INFO JobContainer - jobContainer starts to do split ...
2020-08-18 14:02:20.989 [job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
2020-08-18 14:02:20.995 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2020-08-18 14:02:20.996 [job-0] INFO JobContainer - DataX Writer.Job [clickhousewriter] splits to [1] tasks.
2020-08-18 14:02:21.022 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2020-08-18 14:02:21.027 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2020-08-18 14:02:21.030 [job-0] INFO JobContainer - Running by standalone Mode.
2020-08-18 14:02:21.039 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2020-08-18 14:02:21.043 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2020-08-18 14:02:21.043 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2020-08-18 14:02:21.053 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2020-08-18 14:02:21.057 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql: [select id,uid,name from test_update
] jdbcUrl:[jdbc:mysql://172.16.150.223:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2020-08-18 14:02:21.074 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select id,uid,name from test_update
] jdbcUrl:[jdbc:mysql://172.16.150.223:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2020-08-18 14:02:21.154 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[102]ms
2020-08-18 14:02:21.155 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2020-08-18 14:02:31.055 [job-0] INFO StandAloneJobContainerCommunicator - Total 2 records, 15 bytes | Speed 1B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2020-08-18 14:02:31.056 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2020-08-18 14:02:31.057 [job-0] INFO JobContainer - DataX Writer.Job [clickhousewriter] do post work.
2020-08-18 14:02:31.058 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do post work.
2020-08-18 14:02:31.058 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2020-08-18 14:02:31.060 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/DataX-master/target/datax/datax/hook
2020-08-18 14:02:31.063 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
PS Scavenge | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
2020-08-18 14:02:31.064 [job-0] INFO JobContainer - PerfTrace not enable!
2020-08-18 14:02:31.065 [job-0] INFO StandAloneJobContainerCommunicator - Total 2 records, 15 bytes | Speed 1B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2020-08-18 14:02:31.067 [job-0] INFO JobContainer -
Job start time            : 2020-08-18 14:02:20
Job end time              : 2020-08-18 14:02:31
Total elapsed time        : 10s
Average traffic           : 1B/s
Record write speed        : 0rec/s
Total records read        : 2
Total read/write failures : 0
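As a final check, the row count on the ClickHouse side should match the 2 records reported above; a quick query with clickhouse-client (credentials as configured later in this article):

[root@localhost ~]# clickhouse-client --user default --password 123456 --query "SELECT count() FROM test.test_update"
2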
The output is quite detailed; the exact synchronization status and throughput are all visible.
DataX Plugins
Since we are implementing mysql-to-clickhouse, the MysqlReader and ClickHouseWriter plugins are used:
MysqlReader
MysqlReader connects to the remote MySQL database through a JDBC connector, generates a SELECT query from the user's configuration, sends it to the remote MySQL database, assembles the returned result set into DataX's abstract dataset using DataX's custom data types, and hands it to the downstream Writer.
For Table, Column, and Where configuration, MysqlReader concatenates them into a SQL statement and sends it to MySQL; for a querySql configuration, MysqlReader sends it to MySQL directly.
Sample
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "querySql": [
                                    "select db_id,on_line_flag from db_info where db_id < 10;"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://bad_ip:3306/database",
                                    "jdbc:mysql://127.0.0.1:bad_port/database",
                                    "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false,
                        "encoding": "UTF-8"
                    }
                }
            }
        ]
    }
}
Parameters
jdbcUrl
Description: JDBC connection information, given as a JSON array; multiple connection addresses may be supplied for a single database.
Required: yes
Default: none
table
Description: The tables to synchronize, given as a JSON array, so multiple tables can be extracted at once. When multiple tables are configured, the user must ensure they share the same schema; MysqlReader does not check whether they form one consistent logical table. Note that table must be placed inside the connection block.
Required: yes
Default: none
column
Description: The set of columns to synchronize from the configured tables, given as a JSON array. Use * to select all columns, e.g. ["*"].
Supports column pruning: only a subset of columns may be exported.
Supports column reordering: columns need not be exported in schema order.
Supports constants, following MySQL SQL syntax, e.g. ["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3", "true"], where id is an ordinary column name, `table` is a column name containing a reserved word (hence the backquotes), 1 is an integer constant, 'bazhen.csy' is a string constant, null is a null value, to_char(a + 1) is an expression, 2.3 is a float, and true is a boolean.
Required: yes
Default: none
splitPk
Description: If splitPk is specified, MysqlReader shards the data on that field and DataX launches concurrent tasks for the extraction, which can greatly improve synchronization throughput.
The table's primary key is recommended as splitPk, since primary keys are usually evenly distributed, so the resulting shards are less prone to data hot spots.
Currently splitPk supports only integer columns; floats, strings, dates, and other types are not supported, and MysqlReader raises an error if one is specified.
If splitPk is omitted or left empty, DataX synchronizes the table through a single channel.
Required: no
Default: empty
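For reference, a sharded read might be configured like this (a sketch reusing the connection details from the verified job above; only splitPk is new):

"reader": {
    "name": "mysqlreader",
    "parameter": {
        "username": "root",
        "password": "123456",
        "column": ["id", "uid", "name"],
        "splitPk": "id",
        "connection": [
            {
                "jdbcUrl": ["jdbc:mysql://172.16.150.223:3306/test"],
                "table": ["test_update"]
            }
        ]
    }
}

With splitPk set, raising setting.speed.channel lets the split tasks actually run in parallel.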
where
Description: The filter condition. MysqlReader assembles the SQL from the configured column, table, and where, and extracts data with it. A common real-world case is synchronizing only the current day's data by setting where to something like gmt_create > $bizdate. Note: where cannot be set to limit 10; LIMIT is not a valid WHERE clause in SQL.
The where condition enables effective incremental synchronization. If where is omitted (no key or an empty value), DataX synchronizes the full table.
Required: no
Default: none
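In practice the incremental where is often parameterized; datax.py supports injecting job variables with its -p option, which pairs naturally with a condition like the one above (the variable name bizdate here is just an illustration):

# In job.json:
#   "where": "gmt_create > '${bizdate}'"
# At run time:
[root@localhost datax]# python bin/datax.py -p"-Dbizdate=2020-08-18" job/job.json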
querySql
Description: For scenarios where the where option is not expressive enough, this option lets the user provide a custom filtering SQL. When it is configured, DataX ignores the table and column options and uses this SQL directly to select the data, e.g. a multi-table join: select a,b from table_a join table_b on table_a.id = table_b.id
When querySql is configured, MysqlReader ignores the table, column, and where settings; querySql takes precedence over all three.
Required: no
Default: none
ClickHouseWriter
ClickHouseWriter batch-writes the data handed over by the Reader into ClickHouse via the official ClickHouse JDBC driver.
Sample
{
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    ...
                },
                "writer": {
                    "name": "clickhousewriter",
                    "parameter": {
                        "username": "default",
                        "password": "zifei123",
                        "column": ["belonger","belonger_type","bs_flag","id","income_asset_code","income_fee","insert_time","logName","order_no","pair_id","price","quantity","trade_date","trade_no","trade_time","update_time"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/default",
                                "table": ["XXX"]
                            }
                        ]
                    }
                }
            }
        ]
    }
}
Parameters
jdbcUrl
Description: ClickHouse connection address. Multiple endpoints can currently be written to in parallel with random load balancing; the format is jdbc:clickhouse://ip1:8123,ip2:8123/database
Required: yes
Default: none
batchSize
Description: Number of records written per batch.
Required: no
Default: 2048
trySize
Description: Number of retries after a failure.
Required: no
Default: 30
Import tips
1) Write 8192 rows per batch, i.e. set batchSize to 8192 (see the sketch below).
2) Ideally, group and sort the data by the ClickHouse partition key before loading, which yields better insert performance.
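Applied to the verified job above, tip 1) would change only the writer's batchSize (a sketch; all other values are the ones already validated earlier):

"writer": {
    "name": "clickhousewriter",
    "parameter": {
        "batchSize": 8192,
        "batchByteSize": 134217728,
        "column": ["id", "uid", "name"],
        "connection": [
            {
                "jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/test",
                "table": ["test_update"]
            }
        ],
        "username": "default",
        "password": "123456",
        "writeMode": "insert"
    }
}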
Summary
Plenty of pitfalls came up during this test, mostly around configuring job.json. DataX is very strict about this JSON file, since the entire synchronization is driven by it; any non-standard detail will stop the sync from running at all.
The scenario exercised here is fairly narrow; it fits, for example, append-only ledger tables (pure inserts, no updates, where an id column can serve as the sync cutoff). More complex scenarios likely call for other solutions, such as a custom python job for MySQL-to-ClickHouse synchronization.
Beyond that, limited familiarity with ClickHouse added to the difficulty of testing. I had researched and tested ClickHouse before, but day-to-day operations still take practice; the ClickHouse operations used in this test follow below.
ClickHouse operations
1. Changing a user's password
password is a required field in the DataX job.json, while ClickHouse's default account ships with an empty password, so either set a password or create a dedicated sync account.
vim /etc/clickhouse-server/users.xml
<password>123456</password>   # uncomment this line and set a plaintext password
In practice, ClickHouse passwords are usually stored encrypted rather than in plaintext; the approach is similar: generate a hashed string and specify it in the corresponding element (password_sha256_hex) instead of <password>.
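A sketch of the encrypted variant, following the helper pipeline suggested in the comments of the stock users.xml:

# Generate a random password and its SHA256 hex digest
PASSWORD=$(base64 < /dev/urandom | head -c8)
echo "plaintext: $PASSWORD"
echo -n "$PASSWORD" | sha256sum | tr -d ' -'
# In users.xml, use the digest instead of a plaintext <password>:
#   <password_sha256_hex>...the digest printed above...</password_sha256_hex>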
2. Connecting
On a fresh installation, you can log in simply with clickhouse-client:
[root@localhost ~]# clickhouse-client
ClickHouse client version 19.17.4.11 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 19.17.4 revision 54428.
localhost :)
Once a password is set, supply the credentials via command-line options:
[root@localhost ~]# clickhouse-client --user default --password 123456 --host 127.0.0.1 --port 9000
ClickHouse client version 19.17.4.11 (official build).
Connecting to 127.0.0.1:9000 as user default.
Connected to ClickHouse server version 19.17.4 revision 54428.
localhost :)
3. ClickHouse table creation
First, the MySQL test table structure:
mysql> desc test_update;
+-------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+-------------+------+-----+---------+-------+
| id | int(11) | NO | PRI | NULL | |
| uid | int(11) | YES | | NULL | |
| name | varchar(10) | YES | | NULL | |
+-------+-------------+------+-----+---------+-------+
The corresponding ClickHouse DDL:
CREATE TABLE test_update
(
`id` UInt16,
`uid` String,
`name` String,
`create_date` Date
)
ENGINE = MergeTree(create_date, id, 8192)
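MergeTree(create_date, id, 8192) is the legacy engine syntax (partition by month of create_date, order by id, index granularity 8192). On current ClickHouse versions the same table would more idiomatically be declared as:

CREATE TABLE test_update
(
    `id` UInt16,
    `uid` String,
    `name` String,
    `create_date` Date
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_date)
ORDER BY id
SETTINGS index_granularity = 8192;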