暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

DATAX - Mysql TO Clickhouse

辣肉面加蛋加素鸡 2021-08-06
2637

DATAX简介

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。

DATAX3.0 框架

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。

  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。

  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DATAX3.0 插件体系

DataX Framework提供了简单的接口与插件交互,提供简单的插件接入机制,只需要任意加上一种插件,就能无缝对接其他数据源。

*(clickhouse在DATAX插件体系中被归为通用RDBMS)

  1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

DATAX配置

部署DATAX

  1. 下载源码:

    git clone git@github.com:alibaba/DataX.git

    or

    https://github.com/alibaba/DataX  (Download ZIP)

    复制
  2. maven打包:

    cd  {DataX_source_code_home}

    mvn -U clean package assembly:assembly -Dmaven.test.skip=true

    复制

遇到无法打包的问题,需要手动修改仓库地址http://maven.aliyun.com/nexus/content/groups/public/:

cd {DataX_source_code_home}

vim pom.xml
    # 注释该段代码
    <!--
    <repositories>
        <repository>
            <id>central</id>
            <name>Nexus aliyun</name>
            <url>https://maven.aliyun.com/repository/central</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
    -->

    #替换成新的地址
    <repositories>
        <repository>
            <id>maven-ali</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public//</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>fail</checksumPolicy>
            </snapshots>
        </repository>
    </repositories>

复制

打包成功后显示:

[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------
[INFO] Total time: 10:44 min
[INFO] Finished at: 2020-08-18T10:26:48+08:00
[INFO] Final Memory: 133M/960M
[INFO] -----------------------------------------------------------------

复制

配置DATAX

查看mysql2clickhouse的模板

[root@localhost datax]# python bin/datax.py -r mysqlreader -wclickhousewriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreadr.md 

Please refer to the clickhousewriter document:
     https://github.com/alibaba/DataX/blob/master/clickhousewriter/doc/clichousewriter.md 

Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json 
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name""mysqlreader"
                    "parameter": {
                        "column": [], 
                        "connection": [
                            {
                                "jdbcUrl": [], 
                                "table": []
                            }
                        ], 
                        "password"""
                        "username"""
                        "where"""
                    }
                }, 
                "writer": {
                    "name""clickhousewriter"
                    "parameter": {
                        "batchByteSize"134217728
                        "batchSize"65536
                        "column": [
                            "col1"
                            "col2"
                            "col3"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl":"jdbc:clickhouse://<host>:<port>[/<database]"
                                "table": [
                                    "table1"
                                    "table2"
                                ]
                            }
                        ], 
                        "dryRun"false
                        "password""password"
                        "postSql": [], 
                        "preSql": [], 
                        "username""username"
                        "writeMode""insert"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel"""
            }
        }
    }
}

复制

把格式复制至 job 目录下的 job.json 文件内,附上已验证的json文件:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name""mysqlreader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://172.16.150.223:3306/test"],
                                "table": ["test_update"]
                            }
                        ],
                        "password""123456",
                        "username""root",
                        "column": [
                            "id","uid","name"],
                        "where"""
                    }
                },
                "writer": {
                    "name""clickhousewriter",
                    "parameter": {
                        "batchByteSize"134217728,
                        "batchSize"65536,
                        "column": ["id","uid","name"],
                        "connection": [
                            {
                                "jdbcUrl""jdbc:clickhouse://127.0.0.1:8123/test",
                                "table": [
                                    "test_update"
                                ]
                            }
                        ],
                        "dryRun"false,
                        "password""123456",
                        "postSql": [],
                        "preSql": [],
                        "username""default",
                        "writeMode""insert"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel""1"
            }
        }
    }
}

复制

开启DATAX任务

[root@localhost datax]# python bin/datax.py job/job.json

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2020-08-18 14:02:20.251 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2020-08-18 14:02:20.261 [main] INFO  Engine - the machine info  => 

    osInfo: Oracle Corporation 1.8 25.162-b12
    jvmInfo:    Linux amd64 3.10.0-862.el7.x86_64
    cpu num:    4

    totalPhysicalMemory:    -0.00G
    freePhysicalMemory: -0.00G
    maxFileDescriptorCount: -1
    currentOpenFileDescriptorCount: -1

    GC Names    [PS MarkSweep, PS Scavenge]

    MEMORY_NAME                    | allocation_size                | init_size                      
    PS Eden Space                  | 256.00MB                       | 256.00MB                       
    Code Cache                     | 240.00MB                       | 2.44MB                         
    Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
    PS Survivor Space              | 42.50MB                        | 42.50MB                        
    PS Old Gen                     | 683.00MB                       | 683.00MB                       
    Metaspace                      | -0.00MB                        | 0.00MB 

2020-08-18 14:02:20.316 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2020-08-18 14:02:20.319 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2020-08-18 14:02:20.320 [main] INFO  JobContainer - DataX jobContainer starts job.
2020-08-18 14:02:20.322 [main] INFO  JobContainer - Set jobId = 0
2020-08-18 14:02:20.701 [job-0] INFO  OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:mysql://172.16.150.223:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
2020-08-18 14:02:20.720 [job-0] INFO  OriginalConfPretreatmentUtil - table:[test_update] has columns:[id,uid,name].
2020-08-18 14:02:20.773 [job-0] INFO  ClickHouseDriver - Driver registered
2020-08-18 14:02:20.972 [job-0] INFO  OriginalConfPretreatmentUtil - table:[test_update] all columns:[
id,uid,name,create_date
].
2020-08-18 14:02:20.986 [job-0] INFO  OriginalConfPretreatmentUtil - Write data [
insert INTO %s (id,uid,name) VALUES(?,?,?)
], which jdbcUrl like:[jdbc:clickhouse://127.0.0.1:8123/test]
2020-08-18 14:02:20.986 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2020-08-18 14:02:20.987 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2020-08-18 14:02:20.987 [job-0] INFO  JobContainer - DataX Writer.Job [clickhousewriter] do prepare work .
2020-08-18 14:02:20.988 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2020-08-18 14:02:20.989 [job-0] INFO  JobContainer - Job set Channel-Number to 1 channels.
2020-08-18 14:02:20.995 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] splits to [1] tasks.
2020-08-18 14:02:20.996 [job-0] INFO  JobContainer - DataX Writer.Job [clickhousewriter] splits to [1] tasks.
2020-08-18 14:02:21.022 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2020-08-18 14:02:21.027 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2020-08-18 14:02:21.030 [job-0] INFO  JobContainer - Running by standalone Mode.
2020-08-18 14:02:21.039 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2020-08-18 14:02:21.043 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2020-08-18 14:02:21.043 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2020-08-18 14:02:21.053 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2020-08-18 14:02:21.057 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Begin to read record by Sql: [select id,uid,name from test_update 
jdbcUrl:[jdbc:mysql://172.16.150.223:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2020-08-18 14:02:21.074 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Finished read record by Sql: [select id,uid,name from test_update 
jdbcUrl:[jdbc:mysql://172.16.150.223:3306/test?yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
2020-08-18 14:02:21.154 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[102]ms
2020-08-18 14:02:21.155 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.


2020-08-18 14:02:31.055 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 15 bytes | Speed 1B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2020-08-18 14:02:31.056 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2020-08-18 14:02:31.057 [job-0] INFO  JobContainer - DataX Writer.Job [clickhousewriter] do post work.
2020-08-18 14:02:31.058 [job-0] INFO  JobContainer - DataX Reader.Job [mysqlreader] do post work.
2020-08-18 14:02:31.058 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2020-08-18 14:02:31.060 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/DataX-master/target/datax/datax/hook
2020-08-18 14:02:31.063 [job-0] INFO  JobContainer - 
     [total cpu info] => 
        averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
        -1.00%                         | -1.00%                         | -1.00%


     [total gc info] => 
         NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
         PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             
         PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             

2020-08-18 14:02:31.064 [job-0] INFO  JobContainer - PerfTrace not enable!
2020-08-18 14:02:31.065 [job-0] INFO  StandAloneJobContainerCommunicator - Total 2 records, 15 bytes | Speed 1B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2020-08-18 14:02:31.067 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2020-08-18 14:02:20
任务结束时刻                    : 2020-08-18 14:02:31
任务总计耗时                    :                 10s
任务平均流量                    :                1B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   2
读写失败总数                    :                   0

复制

整个输出还是非常详细的,可以看到具体的同步状态,同步效率。

DATAX 插件

我们实现的是mysql2clickhouse,索引用到了 MysqlReader 和 ClickHouseWriter 插件:

MysqlReader

ysqlReader通过JDBC连接器连接到远程的Mysql数据库,并根据用户配置的信息生成查询SELECT SQL语句,然后发送到远程Mysql数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。

对于用户配置Table、Column、Where的信息,MysqlReader将其拼接为SQL语句发送到Mysql数据库;对于用户配置querySql信息,MysqlReader直接将其发送到Mysql数据库。

样例
{
    "job": {
        "setting": {
            "speed": {
                 "channel":1
            }
        },
        "content": [
            {
                "reader": {
                    "name""mysqlreader",
                    "parameter": {
                        "username""root",
                        "password""root",
                        "connection": [
                            {
                                "querySql": [
                                    "select db_id,on_line_flag from db_info where db_id < 10;"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://bad_ip:3306/database",
                                    "jdbc:mysql://127.0.0.1:bad_port/database",
                                    "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name""streamwriter",
                    "parameter": {
                        "print"false,
                        "encoding""UTF-8"
                    }
                }
            }
        ]
    }
}

复制
参数说明
  • jdbcUrl

    • JDBC连接信息,使用JSON的数组描述,并支持一个库填写多个连接地址

    • 必选:是

    • 默认值:无

  • table

    • 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,MysqlReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。

    • 必选:是

    • 默认值:无

  • column

    支持列裁剪,即列可以挑选部分列进行导出。

    支持列换序,即列可以不按照表schema信息进行导出。

    支持常量配置,用户需要按照Mysql SQL语法格式: ["id", "table
    ", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"] id为普通列名,table
    为包含保留在的列名,1为整形数字常量,'bazhen.csy'为字符串常量,null为空指针,to_char(a + 1)为表达式,2.3为浮点数,true为布尔值。

    • 必选:是

    • 默认值:无

    • 描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用*代表默认使用所有列配置,例如 ['*']。

  • splitPk

    • 描述:MysqlReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。

    • 推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。

    • 目前splitPk仅支持整形数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,MysqlReader将报错!

    • 如果splitPk不填写,包括不提供splitPk或者splitPk值为空,DataX视作使用单通道同步该表数据。

    • 必选:否

    • 默认值:空

  • where

    • 描述:筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。

    • where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,DataX均视作同步全量数据。

    • 必选:否

    • 默认值:无

  • querySql

    • 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id

    • 当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置,querySql优先级大于table、column、where选项。

    • 必选:否

    • 默认值:无

ClickHouseWriter

使用clickhousewriter的官方jdbc接口, 批量把从reader读入的数据写入ClickHouse

样例
{
  "job": {
    "setting": {
        "speed": {
            "channel"2
        }
    },
    "content": [
      {
        "reader": {
          ...
        },
        "writer": {
            "name""clickhousewriter",
            "parameter": {
                "username""default",
                "password""zifei123",
                "column":["belonger","belonger_type","bs_flag","id","income_asset_code","income_fee","insert_time","logName","order_no","pair_id","price","quantity","trade_date","trade_no","trade_time","update_time"],
                "connection": [
                    {
                        "jdbcUrl""jdbc:clickhouse://127.0.0.1:8123/default",
                        "table":["XXX"]
                    }
                ]
            }
        }
      }
    ]
  }
}

复制
参数说明
  • jdbcUrl

    • 描述:ClickHouse的连接地址,目前支持多数据源并行导入,支持随机负载均衡,其格式为:jdbc:clickhouse://ip1:8123,ip2:8123/database

    • 必选:是

    • 默认值:无

  • batchSize

    • 描述:每次批量数据的条数

    • 必选:否

    • 默认值:2048

  • trySize

    • 描述:失败后重试的次数

    • 必选:否

    • 默认值:30

导入建议

1)每次传8192,既batchSize设为8192

2)数据最好跟ClickHouse分区Key分组排序,这样有更好的插入性能

总结

整个测试过程中遇到非常多的坑,主要集中在job.json文件的配置上。DATAX对json文件的要求非常高,因为整个同步完全依赖这个json文件,有一点不标准的地方,都会导致无法同步。

本次实验的操作应用场景有限,比如可以用于流水表(数据全是新增,没有update,同步判断条件可以根据id)。复杂的场景可能就需要考虑其他同步方案了,比如用python实现mysql到ck的同步。

除此以外,对于clickhouse不熟悉也增加了测试的难度。虽然之前研究测试过ck,但实际运维还是需要多加练习,下面补充本次测试用到的操作。

clickhouse相关操作

1. 用户改密

DATAX job.json里的password为必填项,而ck的默认账户default密码为空,需要设置一个密码或新建一个同步账号。

vim /etc/clickhouse-server/users.xml

<password>123456</password> #取消这行注释,配一个明文密码

复制

一般来说,ck都会配置加密的密码,方法类似,生成个加密串,指定在里即可。

2. 连接方式

默认安装完的ck,直接使用 clickhouse-client 即可登录

[root@localhost ~]clickhouse-client
ClickHouse client version 19.17.4.11 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 19.17.4 revision 54428.

localhost :)

复制

加完密后,可以使用参数指定:

[root@localhost ~]clickhouse-client --user default --password 123456 --host 127.0.0.1 --port 9000
ClickHouse client version 19.17.4.11 (official build).
Connecting to 127.0.0.1:9000 as user default.
Connected to ClickHouse server version 19.17.4 revision 54428.

localhost :) 

复制
3. clickhouse 的建表语句

先看下mysql的测试表结构:

mysql> desc test_update;
+-------+-------------+------+-----+---------+-------+
| Field | Type        | Null | Key | Default | Extra |
+-------+-------------+------+-----+---------+-------+
|
 id    | int(11)     | NO   | PRI | NULL    |       |
| uid   | int(11)     | YES  |     | NULL    |       |
|
 name  | varchar(10) | YES  |     | NULL    |       |
+-------+-------------+------+-----+---------+-------+

复制

对应ck的建表语句:

CREATE TABLE test_update
(
    `id` UInt16, 
    `uid` String
    `name` String
    `create_date` date
)
ENGINE = MergeTree(create_date, id8192)

复制


文章转载自辣肉面加蛋加素鸡,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论