暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Datax

DB小榴莲 2021-06-25
2693
Datax介绍

Datax是一个异构数据源同步工具(离线的),通过星型数据链路替换复杂的网状链路,完成数据同步。当接入一个新的数据源,只要将其接入到Datax,以Datax作中间转换,再将数据输出到目标数据源。

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader

    Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。

  • Writer

    Writer为数据写入模块,负责不断从Framework取数据,并将数据写入到目的端。

  • Framework

    Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

Datax支持对象

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入。DataX目前支持数据如下:

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库MySQL
Oracle    √    √  
SQLServer
PostgreSQL
DRDS
达梦
通用RDBMS(支持所有关系型数据库)
阿里云数仓数据存储ODPS
ADS
OSS
OCS
NoSQL数据存储OTS
Hbase0.94
Hbase1.1
MongoDB
Hive
无结构化数据存储TxtFile
FTP
HDFS
Elasticsearch

Datax核心架构

核心模块介绍

  1. DataX完成单个数据同步作业,称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

  1. DataX Job启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task成为DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

  1. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行分配好的所有Task,默认单个任务组的并发数量为5。

  1. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

  1. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。

Datax调度流程

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。DataX的调度决策思路是:

  1. DataX Job根据分库分表切分成100个Task(子任务)。

  2. 根据20个并发,DataX计算共需要分配4个TaskGroup。

  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

Datax流控模式

DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。

"speed": {   "channel": 5,   "byte": 1048576,   "record": 10000}
复制

传输过程中打印传输速度、进度等

传输过程中会打印进程相关的CPU、JVM等

在任务结束之后,打印总体运行情况

Datax部署

1、安装JDK

JDK1.8以上版本,推荐1.8

[root@localhost ~]# rpm -ivh jdk-8u291-linux-i586.rpmwarning: jdk-8u291-linux-i586.rpm: Header V3 RSA/SHA256 Signature, key ID ec551f03: NOKEYPreparing...                          ################################# [100%]Updating  installing...   1:jdk1.8-2000:1.8.0_291-fcs        ################################# [100%]Unpacking JAR files...        tools.jar...
复制

配置/etc/profile环境变量

JAVA_HOME=/usr/java/jdk1.8.0_291CLASSPATH=$JAVA_HOME/lib/PATH=$PATH:$JAVA_HOME/binexport PATH JAVA_HOME CLASSPATH
复制
2、python

推荐python 2.6.x

3、解压datax安装包
4、json文件配置
{    "job": {        "setting": {            "speed": {                "channel": 5//分配5个通道            }        },        "content": [            {                "reader": {                    "name": "mysqlreader",//从源端(MySQL)读取数据,datax支持的reader插件在/datax/plugin/reader目录中                    "parameter": {                        "username": "***",//源端数据库登录用户                        "password": "*****",//源端数据库登录用户密码                        "column": [                            "id",                            "user"//源端数据库需要同步的表字段                        ],                        "splitPk": "id",//使用splitPk代表的字段进行数据分片,DataX会启动并发任务进行数据同步,这样可以大大提供数据同步的效能                        "connection": [                            {                                "table": [                                    "tablename"//同步表名                                ],                                "jdbcUrl": [     "jdbc:mysql://xxx.xxx.xxx.xxxx:3306/databasename"//源端数据库信息                                ]                            }                        ]                    }                },                                "writer": {                    "name": "mysqlwriter",//目标端写入,所有datax支持的写插件在/datax/plugin/writer目录下                    "parameter": {                        "writeMode": "insert",                        "username": "***",//目标端登录数据库用户                        "password": "*****",//目标端登录数据库用户密码                        "column": [                            "id",                            "user"                        ],                        "session": [                            "set session sql_mode='ANSI'"                        ],                        "preSql": [                            "delete from @table"                        ],                        "connection": [                            {                                "jdbcUrl": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/databasename",//目标端数据库信息                                "table": [                                    "tablename"                                ]                            }                        ]                    }                }            }        ]    }}
复制

5、目标端建表结构
6、执行同步
python datax.py m2m_tf.json
复制


----------------------------
长按下图二维码关注我,每篇涨点小知识!

文章转载自DB小榴莲,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论