Datax介绍
Datax是一个异构数据源同步工具(离线的),通过星型数据链路替换复杂的网状链路,完成数据同步。当接入一个新的数据源,只要将其接入到Datax,以Datax作中间转换,再将数据输出到目标数据源。
DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
Reader
Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer
Writer为数据写入模块,负责不断从Framework取数据,并将数据写入到目的端。
Framework
Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
Datax支持对象
DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下:
类型 | 数据源 | Reader(读) | Writer(写) | 文档 |
---|---|---|---|---|
RDBMS 关系型数据库 | MySQL | √ | √ | 读 、写 |
Oracle | √ | √ | 读 、写 | |
SQLServer | √ | √ | 读 、写 | |
PostgreSQL | √ | √ | 读 、写 | |
DRDS | √ | √ | 读 、写 | |
达梦 | √ | √ | 读 、写 | |
通用RDBMS(支持所有关系型数据库) | √ | √ | 读 、写 | |
阿里云数仓数据存储 | ODPS | √ | √ | 读 、写 |
ADS | √ | 写 | ||
OSS | √ | √ | 读 、写 | |
OCS | √ | √ | 读 、写 | |
NoSQL数据存储 | OTS | √ | √ | 读 、写 |
Hbase0.94 | √ | √ | 读 、写 | |
Hbase1.1 | √ | √ | 读 、写 | |
MongoDB | √ | √ | 读 、写 | |
Hive | √ | √ | 读 、写 | |
无结构化数据存储 | TxtFile | √ | √ | 读 、写 |
FTP | √ | √ | 读 、写 | |
HDFS | √ | √ | 读 、写 | |
Elasticsearch | √ | 写 |
Datax核心架构
核心模块介绍
DataX完成单个数据同步作业,称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
DataX Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task成为DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行分配好的所有Task,默认单个任务组的并发数量为5。
每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。
Datax调度流程
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。DataX的调度决策思路是:
DataX Job根据分库分表切分成100个Task(子任务)。
根据20个并发,DataX计算共需要分配4个TaskGroup。
4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
Datax流控模式
DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。
"speed": { "channel": 5, "byte": 1048576, "record": 10000}复制
传输过程中打印传输速度、进度等
传输过程中会打印进程相关的CPU、JVM等
在任务结束之后,打印总体运行情况
Datax部署
1、安装JDK
JDK1.8以上版本,推荐1.8
[root@localhost ~]# rpm -ivh jdk-8u291-linux-i586.rpmwarning: jdk-8u291-linux-i586.rpm: Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEYPreparing... ################################# [100%]Updating installing... 1:jdk1.8-2000:1.8.0_291-fcs ################################# [100%]Unpacking JAR files... tools.jar...复制
配置/etc/profile环境变量
JAVA_HOME=/usr/java/jdk1.8.0_291CLASSPATH=$JAVA_HOME/lib/PATH=$PATH:$JAVA_HOME/binexport PATH JAVA_HOME CLASSPATH复制
2、python
推荐python 2.6.x
3、解压datax安装包
4、json文件配置
{ "job": { "setting": { "speed": { "channel": 5//分配5个通道 } }, "content": [ { "reader": { "name": "mysqlreader",//从源端(MySQL)读取数据,datax支持的reader插件在/datax/plugin/reader目录中 "parameter": { "username": "***",//源端数据库登录用户 "password": "*****",//源端数据库登录用户密码 "column": [ "id", "user"//源端数据库需要同步的表字段 ], "splitPk": "id",//使用splitPk代表的字段进行数据分片,DataX会启动并发任务进行数据同步,这样可以大大提供数据同步的效能 "connection": [ { "table": [ "tablename"//同步表名 ], "jdbcUrl": [ "jdbc:mysql://xxx.xxx.xxx.xxxx:3306/databasename"//源端数据库信息 ] } ] } }, "writer": { "name": "mysqlwriter",//目标端写入,所有datax支持的写插件在/datax/plugin/writer目录下 "parameter": { "writeMode": "insert", "username": "***",//目标端登录数据库用户 "password": "*****",//目标端登录数据库用户密码 "column": [ "id", "user" ], "session": [ "set session sql_mode='ANSI'" ], "preSql": [ "delete from @table" ], "connection": [ { "jdbcUrl": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/databasename",//目标端数据库信息 "table": [ "tablename" ] } ] } } } ] }}复制
5、目标端建表结构
6、执行同步
python datax.py m2m_tf.json复制