The previous article covered using Flink to synchronize MySQL data into StarRocks; Flink can just as well sync SQL Server data into StarRocks. This installment shows how to use DataX, Alibaba's open-source data synchronization tool, to move data from SQL Server into StarRocks.
DataX and Flink differ somewhat in functionality and purpose. Broadly speaking, both can perform a full initial load and some form of incremental synchronization, but Flink's CDC capability is much stronger: it can replicate incremental changes in real time, whereas DataX is a batch tool and cannot synchronize in real time.
Using DataX is fairly simple: install Python 3, unpack the DataX distribution, extract starrockswriter.tar into /opt/datax/plugin/writer, write a job.json file, and you are ready to go.
Installation and Configuration
# Unpack the distribution (run from /opt)
tar -zxvf datax.tar.gz
cd /opt/datax/plugin/writer
tar -xvf starrockswriter.tar
cd /opt/datax/job
cat > mssql_job.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "sqlserverreader",
                    "parameter": {
                        "username": "susutostar",
                        "password": "Manager123",
                        "column": ["id", "name", "age"],
                        "splitPk": "id",
                        "where": " id >= 1111 and id < 1115",
                        "connection": [
                            {
                                "table": ["dbo.mytest_mssql_dx"],
                                "jdbcUrl": ["jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "starrockswriter",
                    "parameter": {
                        "username": "root",
                        "password": "password",
                        "database": "mssqldxtostardb",
                        "table": "mytest_mssql_dx",
                        "column": ["id", "name", "age"],
                        "preSql": [],
                        "postSql": [],
                        "jdbcUrl": "jdbc:mysql://172.17.199.36:9030/",
                        "loadUrl": ["172.17.199.36:8030"],
                        "loadProps": {}
                    }
                }
            }
        ]
    }
}
Note: the format requirements for this job.json configuration file are very strict; the slightest formatting error will make the job fail.
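Since malformed JSON is the most common cause of failure, it can help to validate the file before handing it to DataX, for example with Python's built-in json.tool module:

python3 -m json.tool /opt/datax/job/mssql_job.json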
"table": ["dbo.mytest_mssql_dx"]:表名前需要加上dbo schema【模式】。
"where": " id >= 1111 and id < 1115":是CDC增量同步使用的参数,全量初始化时候不需要where参数。
Running the Data Synchronization
python3 datax/bin/datax.py --jvm="-Xms1G -Xmx1G" --loglevel=debug datax/job/mssql_job.json
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2024-04-01 19:39:09.414 [main] INFO MessageSource - JVM TimeZone: GMT+08:00, Locale: zh_CN
2024-04-01 19:39:09.417 [main] INFO MessageSource - use Locale: zh_CN timeZone: sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]
2024-04-01 19:39:09.419 [main] DEBUG MessageSource - initEnvironment MessageSource.locale[zh_CN], MessageSource.timeZone[sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]]
2024-04-01 19:39:09.419 [main] DEBUG MessageSource - loadResourceBundle with locale[zh_CN], timeZone[sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]], baseName[com.alibaba.datax.common.util.LocalStrings]
2024-04-01 19:39:09.419 [main] DEBUG MessageSource - loadResourceBundle classLoader:jdk.internal.loader.ClassLoaders$AppClassLoader@799f7e29
2024-04-01 19:39:09.471 [main] INFO VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2024-04-01 19:39:09.476 [main] INFO Engine - the machine info =>
osInfo: Linux amd64 5.15.0-92-generic
jvmInfo: Oracle Corporation 11 11.0.1+13
cpu num: 2
totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1
GC Names [Copy, MarkSweepCompact]
MEMORY_NAME | allocation_size | init_size
CodeHeap 'profiled nmethods' | 117.22MB | 2.44MB
Eden Space | 273.06MB | 273.06MB
Survivor Space | 34.13MB | 34.13MB
CodeHeap 'non-profiled nmethods' | 117.22MB | 2.44MB
Compressed Class Space | 1,024.00MB | 0.00MB
Metaspace | -0.00MB | 0.00MB
CodeHeap 'non-nmethods' | 5.56MB | 2.44MB
Tenured Gen | 682.69MB | 682.69MB
2024-04-01 19:39:09.488 [main] INFO Engine -
{
"setting":{
"speed":{
"channel":1
},
"errorLimit":{
"record":0,
"percentage":0
}
},
"content":[
{
"reader":{
"name":"sqlserverreader",
"parameter":{
"username":"susutostar",
"password":"**********",
"column":[
"id",
"name",
"age"
],
"splitPk":"id",
"connection":[
{
"table":[
"dbo.mytest_mssql_dx"
],
"jdbcUrl":[
"jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb"
]
}
]
}
},
"writer":{
"name":"starrockswriter",
"parameter":{
"username":"root",
"password":"*********",
"database":"mssqldxtostardb",
"table":"mytest_mssql_dx",
"column":[
"id",
"name",
"age"
],
"preSql":[
],
"postSql":[
],
"jdbcUrl":"jdbc:mysql://172.17.199.36:9030/",
"loadUrl":[
"172.17.199.36:8030"
],
"loadProps":{
}
}
}
}
]
}
2024-04-01 19:39:09.492 [main] DEBUG Engine - {"job":{"setting":{"speed":{"channel":1},"errorLimit":{"record":0,"percentage":0}},"content":[{"reader":{"name":"sqlserverreader","parameter":{"username":"susutostar","password":"Manager123","column":["id","name","age"],"splitPk":"id","connection":[{"table":["dbo.mytest_mssql_dx"],"jdbcUrl":["jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb"]}]}},"writer":{"name":"starrockswriter","parameter":{"username":"root","password":"FAct@5500","database":"mssqldxtostardb","table":"mytest_mssql_dx","column":["id","name","age"],"preSql":[],"postSql":[],"jdbcUrl":"jdbc:mysql://172.17.199.36:9030/","loadUrl":["172.17.199.36:8030"],"loadProps":{}}}}]},"core":{"container":{"trace":{"enable":"false"},"job":{"reportInterval":10000,"id":-1},"taskGroup":{"channel":5}},"dataXServer":{"reportDataxLog":false,"address":"http://localhost:7001/api","timeout":10000,"reportPerfLog":false},"transport":{"exchanger":{"class":"com.alibaba.datax.core.plugin.BufferedRecordExchanger","bufferSize":32},"channel":{"byteCapacity":67108864,"flowControlInterval":20,"class":"com.alibaba.datax.core.transport.channel.memory.MemoryChannel","speed":{"byte":-1,"record":-1},"capacity":512}},"statistics":{"collector":{"plugin":{"taskClass":"com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector","maxDirtyNumber":10}}}},"common":{"column":{"dateFormat":"yyyy-MM-dd","datetimeFormat":"yyyy-MM-dd HH:mm:ss","timeFormat":"HH:mm:ss","extraFormats":["yyyyMMdd"],"timeZone":"GMT+8","encoding":"utf-8"}},"entry":{"jvm":"-Xms1G -Xmx1G"},"plugin":{"reader":{"sqlserverreader":{"path":"/opt/datax/plugin/reader/sqlserverreader","name":"sqlserverreader","description":"useScene: test. mechanism: use datax framework to transport data from SQL Server. warn: The more you know about the data, the less problems you encounter.","developer":"alibaba","class":"com.alibaba.datax.plugin.reader.sqlserverreader.SqlServerReader"}},"writer":{"starrockswriter":{"path":"/opt/datax/plugin/writer/starrockswriter","name":"starrockswriter","description":"useScene: prod. mechanism: StarRocksStreamLoad. warn: The more you know about the database, the less problems you encounter.","developer":"starrocks","class":"com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriter"}}}}
2024-04-01 19:39:09.512 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false
2024-04-01 19:39:09.512 [main] INFO JobContainer - DataX jobContainer starts job.
2024-04-01 19:39:09.513 [main] DEBUG JobContainer - jobContainer starts to do preHandle ...
2024-04-01 19:39:09.513 [main] DEBUG JobContainer - jobContainer starts to do init ...
2024-04-01 19:39:09.514 [main] INFO JobContainer - Set jobId = 0
2024-04-01 19:39:10.047 [job-0] INFO OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb.
2024-04-01 19:39:10.227 [job-0] INFO OriginalConfPretreatmentUtil - table:[dbo.mytest_mssql_dx] has columns:[id,name,age].
2024-04-01 19:39:10.228 [job-0] DEBUG CommonRdbmsReader$Job - After job init(), job config now is:[
{"username":"susutostar","password":"Manager123","column":"id,name,age","splitPk":"id","connection":[{"table":["dbo.mytest_mssql_dx"],"jdbcUrl":"jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb"}],"fetchSize":1024,"isTableMode":true,"tableNumber":1,"columnList":["id","name","age"]}
]
2024-04-01 19:39:10.245 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2024-04-01 19:39:10.246 [job-0] INFO JobContainer - DataX Reader.Job [sqlserverreader] do prepare work .
2024-04-01 19:39:10.246 [job-0] INFO JobContainer - DataX Writer.Job [starrockswriter] do prepare work .
2024-04-01 19:39:10.247 [job-0] INFO JobContainer - jobContainer starts to do split ...
2024-04-01 19:39:10.248 [job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
2024-04-01 19:39:10.252 [job-0] INFO JobContainer - DataX Reader.Job [sqlserverreader] splits to [1] tasks.
2024-04-01 19:39:10.255 [job-0] INFO JobContainer - DataX Writer.Job [starrockswriter] splits to [1] tasks.
2024-04-01 19:39:10.256 [job-0] DEBUG JobContainer - transformer configuration: null
2024-04-01 19:39:10.280 [job-0] DEBUG JobContainer - contentConfig configuration: [{"internal":{"reader":{"parameter":{"username":"susutostar","password":"Manager123","column":"id,name,age","splitPk":"id","fetchSize":1024,"isTableMode":true,"tableNumber":1,"columnList":["id","name","age"],"jdbcUrl":"jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb","table":"dbo.mytest_mssql_dx","querySql":"select id,name,age from dbo.mytest_mssql_dx "},"name":"sqlserverreader"},"writer":{"parameter":{"username":"root","password":"FAct@5500","database":"mssqldxtostardb","table":"mytest_mssql_dx","column":["id","name","age"],"preSql":[],"postSql":[],"jdbcUrl":"jdbc:mysql://172.17.199.36:9030/","loadUrl":["172.17.199.36:8030"],"loadProps":{}},"name":"starrockswriter"},"taskId":0},"keys":["reader.parameter.columnList[2]","reader.parameter.columnList[0]","writer.parameter.column[1]","writer.parameter.jdbcUrl","reader.parameter.splitPk","reader.parameter.fetchSize","reader.parameter.password","writer.name","reader.parameter.querySql","writer.parameter.password","writer.parameter.database","reader.name","reader.parameter.columnList[1]","writer.parameter.column[2]","reader.parameter.column","writer.parameter.column[0]","reader.parameter.jdbcUrl","writer.parameter.table","reader.parameter.tableNumber","reader.parameter.table","writer.parameter.loadUrl[0]","reader.parameter.isTableMode","reader.parameter.username","taskId","writer.parameter.username"],"secretKeyPathSet":[]}]
2024-04-01 19:39:10.280 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2024-04-01 19:39:10.283 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2024-04-01 19:39:10.285 [job-0] INFO JobContainer - Running by standalone Mode.
2024-04-01 19:39:10.296 [taskGroup-0] DEBUG TaskGroupContainer - taskGroup[0]'s task configs[[{"internal":{"reader":{"parameter":{"username":"susutostar","password":"Manager123","column":"id,name,age","splitPk":"id","fetchSize":1024,"isTableMode":true,"tableNumber":1,"columnList":["id","name","age"],"jdbcUrl":"jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb","table":"dbo.mytest_mssql_dx","querySql":"select id,name,age from dbo.mytest_mssql_dx "},"name":"sqlserverreader"},"writer":{"parameter":{"username":"root","password":"FAct@5500","database":"mssqldxtostardb","table":"mytest_mssql_dx","column":["id","name","age"],"preSql":[],"postSql":[],"jdbcUrl":"jdbc:mysql://172.17.199.36:9030/","loadUrl":["172.17.199.36:8030"],"loadProps":{}},"name":"starrockswriter"},"taskId":0},"keys":["reader.parameter.columnList[2]","reader.parameter.columnList[0]","writer.parameter.column[1]","writer.parameter.jdbcUrl","reader.parameter.splitPk","reader.parameter.fetchSize","reader.parameter.password","writer.name","reader.parameter.querySql","writer.parameter.password","writer.parameter.database","reader.name","reader.parameter.columnList[1]","writer.parameter.column[2]","reader.parameter.column","writer.parameter.column[0]","reader.parameter.jdbcUrl","writer.parameter.table","reader.parameter.tableNumber","reader.parameter.table","writer.parameter.loadUrl[0]","reader.parameter.isTableMode","reader.parameter.username","taskId","writer.parameter.username"],"secretKeyPathSet":[]}]]
2024-04-01 19:39:10.301 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2024-04-01 19:39:10.307 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2024-04-01 19:39:10.307 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2024-04-01 19:39:10.315 [job-0] DEBUG AbstractScheduler - com.alibaba.datax.core.statistics.communication.Communication@117e0fe5[
counter={}
state=RUNNING
throwable=<null>
timestamp=1711971550289
message={}
]
2024-04-01 19:39:10.331 [0-0-0-writer] DEBUG WriterRunner - task writer starts to do init ...
2024-04-01 19:39:10.336 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2024-04-01 19:39:10.341 [0-0-0-reader] DEBUG ReaderRunner - task reader starts to do init ...
2024-04-01 19:39:10.347 [0-0-0-reader] DEBUG ReaderRunner - task reader starts to do prepare ...
2024-04-01 19:39:10.348 [0-0-0-reader] DEBUG ReaderRunner - task reader starts to read ...
2024-04-01 19:39:10.348 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql: [select id,name,age from dbo.mytest_mssql_dx
] jdbcUrl:[jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb].
2024-04-01 19:39:10.369 [0-0-0-writer] DEBUG WriterRunner - task writer starts to do prepare ...
2024-04-01 19:39:10.369 [0-0-0-writer] DEBUG WriterRunner - task writer starts to write ...
2024-04-01 19:39:10.431 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select id,name,age from dbo.mytest_mssql_dx
] jdbcUrl:[jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb].
2024-04-01 19:39:10.433 [0-0-0-reader] DEBUG ReaderRunner - task reader starts to do post ...
2024-04-01 19:39:10.433 [0-0-0-reader] DEBUG ReaderRunner - task reader starts to do destroy ...
2024-04-01 19:39:10.434 [0-0-0-writer] DEBUG WriterRunner - task writer starts to do post ...
2024-04-01 19:39:10.434 [0-0-0-writer] DEBUG StarRocksWriterManager - StarRocks Sink is about to close: label[2ed2214a-6684-452f-a838-828a3d918759].
2024-04-01 19:39:10.435 [Thread-0] DEBUG StarRocksWriterManager - Async stream load: rows[6] bytes[74] label[2ed2214a-6684-452f-a838-828a3d918759].
2024-04-01 19:39:10.441 [Thread-0] DEBUG StarRocksStreamLoadVisitor - Start to join batch data: rows[6] bytes[74] label[2ed2214a-6684-452f-a838-828a3d918759].
2024-04-01 19:39:10.441 [Thread-0] INFO StarRocksStreamLoadVisitor - Executing stream load to: 'http://172.17.199.36:8030/api/mssqldxtostardb/mytest_mssql_dx/_stream_load', size: '80'
2024-04-01 19:39:10.721 [Thread-0] DEBUG StarRocksStreamLoadVisitor - StreamLoad response:
{"Status":"Success","BeginTxnTimeMs":1,"Message":"OK","NumberUnselectedRows":0,"CommitAndPublishTimeMs":25,"Label":"2ed2214a-6684-452f-a838-828a3d918759","LoadBytes":80,"StreamLoadPlanTimeMs":2,"NumberTotalRows":6,"WriteDataTimeMs":11,"TxnId":3018,"LoadTimeMs":42,"ReadDataTimeMs":0,"NumberLoadedRows":6,"NumberFilteredRows":0}
2024-04-01 19:39:10.721 [Thread-0] INFO StarRocksWriterManager - Async stream load finished: label[2ed2214a-6684-452f-a838-828a3d918759].
2024-04-01 19:39:10.721 [0-0-0-writer] DEBUG WriterRunner - task writer starts to do destroy ...
2024-04-01 19:39:10.738 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[413]ms
2024-04-01 19:39:10.738 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2024-04-01 19:39:20.317 [job-0] DEBUG AbstractScheduler - com.alibaba.datax.core.statistics.communication.Communication@184497d1[
counter={writeSucceedRecords=7, readSucceedRecords=6, totalErrorBytes=0, writeSucceedBytes=62, byteSpeed=0, totalErrorRecords=0, recordSpeed=0, waitReaderTime=62491427, writeReceivedBytes=62, stage=1, waitWriterTime=66961, percentage=1.0, totalReadRecords=6, writeReceivedRecords=7, readSucceedBytes=62, totalReadBytes=62}
state=SUCCEEDED
throwable=<null>
timestamp=1711971560316
message={}
]
2024-04-01 19:39:20.319 [job-0] INFO StandAloneJobContainerCommunicator - Total 6 records, 62 bytes | Speed 6B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.062s | Percentage 100.00%
2024-04-01 19:39:20.319 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2024-04-01 19:39:20.319 [job-0] DEBUG JobContainer - jobContainer starts to do post ...
2024-04-01 19:39:20.319 [job-0] INFO JobContainer - DataX Writer.Job [starrockswriter] do post work.
2024-04-01 19:39:20.320 [job-0] INFO JobContainer - DataX Reader.Job [sqlserverreader] do post work.
2024-04-01 19:39:20.320 [job-0] DEBUG JobContainer - jobContainer starts to do postHandle ...
2024-04-01 19:39:20.320 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2024-04-01 19:39:20.320 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: opt/datax/hook
2024-04-01 19:39:20.322 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
Copy | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
MarkSweepCompact | 1 | 1 | 1 | 0.025s | 0.025s | 0.025s
2024-04-01 19:39:20.322 [job-0] INFO JobContainer - PerfTrace not enable!
2024-04-01 19:39:20.322 [job-0] INFO StandAloneJobContainerCommunicator - Total 6 records, 62 bytes | Speed 6B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.062s | Percentage 100.00%
2024-04-01 19:39:20.324 [job-0] INFO JobContainer -
Job start time            : 2024-04-01 19:39:09
Job end time              : 2024-04-01 19:39:20
Total job duration        : 10s
Average job throughput    : 6B/s
Record write speed        : 0rec/s
Total records read        : 6
Total read/write failures : 0
root@startestdb01:/opt#
During my initial load, only 6 records were synchronized, simply because the source table holds exactly 6 rows. With no where parameter, DataX performed a full table scan, read the 6 records, and StarRocksWriterManager wrote them into StarRocks via Stream Load.
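As a quick sanity check, you can count the rows that landed in StarRocks. Below is a minimal sketch assuming the pymysql package is installed; since StarRocks speaks the MySQL protocol, it reuses the host and query port (9030) from the writer's jdbcUrl:

import pymysql  # assumed driver: pip install pymysql

# StarRocks FE accepts MySQL-protocol connections on the query port.
conn = pymysql.connect(host="172.17.199.36", port=9030,
                       user="root", password="password",
                       database="mssqldxtostardb")
with conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM mytest_mssql_dx")
    print("rows in StarRocks:", cur.fetchone()[0])  # expect 6
conn.close()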
At this point, DataX has finished synchronizing the SQL Server data into StarRocks. However, this is not real-time synchronization, nor is it the work of a true CDC feature.
Extending DataX's CDC Capability
Given that DataX does offer a form of CDC capability, as the run above suggests, let's take a brief look at how it implements incremental data synchronization, even though this still cannot be called real-time sync.
In principle, it comes down to adding the where parameter, i.e. a filter condition, to the job.json configuration file. In practice, DataX's CDC capability can be realized in the following two ways (a code sketch follows the second).
1. DataxWeb
In production practice, DataxWeb has been developed as a data synchronization tool on top of DataX. It turns the purely file-and-script-driven DataX into a visual web interface where jobs are configured and executed. In essence, it wraps DataX with a program that records the details of previous sync runs and drives them with scheduled tasks. The program stores its metadata in MySQL and runs the application in Docker containers. DataxWeb only supports a timestamp column or an auto-increment id column as the incremental filter condition, so the business scenarios it serves are fairly limited. It also consumes a considerable amount of memory for the MySQL instance and the Docker containers: in my own testing, at least 2 GB of RAM is needed just to get DataxWeb running.
2. A self-written Python script
This approach covers a wider range of data synchronization scenarios, though it does not offer the same level of stability.
It requires no extra MySQL database or Docker containers running auxiliary programs, so it consumes no additional disk or memory. You only need to write a Python program that first queries the source for the latest value of the incremental filter column, uses that value as the variable in the where parameter (the change in this value delimits the incremental data range), patches job.json accordingly, and then invokes DataX to carry out the incremental sync. It is simple to operate: the where filter can be extended to columns of any type you need, and with a crontab job scheduled on top, DataX's CDC functionality is complete. A minimal sketch is shown below.
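The following is a minimal sketch of that idea, not a production implementation. The table name, connection details, and DataX paths come from the job above; the pymssql dependency, the checkpoint-file location, and all function names are assumptions of mine:

import json
import subprocess

import pymssql  # assumed SQL Server driver: pip install pymssql

JOB_FILE = "/opt/datax/job/mssql_job.json"
CHECKPOINT_FILE = "/opt/datax/job/mssql_job.checkpoint"  # hypothetical path


def read_checkpoint():
    # Last id that was already synced; start from 0 on the first run.
    try:
        with open(CHECKPOINT_FILE) as f:
            return int(f.read().strip())
    except FileNotFoundError:
        return 0


def current_max_id():
    # Ask the source for the newest value of the incremental column.
    conn = pymssql.connect(server="172.17.199.37", port=1433,
                           user="susutostar", password="Manager123",
                           database="mssqldxtostardb")
    cur = conn.cursor()
    cur.execute("SELECT MAX(id) FROM dbo.mytest_mssql_dx")
    max_id = cur.fetchone()[0] or 0
    conn.close()
    return max_id


def run_incremental():
    last_id, max_id = read_checkpoint(), current_max_id()
    if max_id <= last_id:
        print("nothing new to sync")
        return
    # Rewrite the reader's where clause to cover only the new id range.
    with open(JOB_FILE) as f:
        job = json.load(f)
    reader = job["job"]["content"][0]["reader"]["parameter"]
    reader["where"] = f"id > {last_id} and id <= {max_id}"
    with open(JOB_FILE, "w") as f:
        json.dump(job, f, indent=4)
    # Invoke DataX exactly as on the command line.
    subprocess.run(["python3", "/opt/datax/bin/datax.py", JOB_FILE], check=True)
    # Persist the checkpoint only after DataX exits successfully.
    with open(CHECKPOINT_FILE, "w") as f:
        f.write(str(max_id))


if __name__ == "__main__":
    run_incremental()

Scheduled from crontab, e.g. */5 * * * * python3 /opt/datax/job/incr_sync.py (the script path is hypothetical), this pulls new rows every five minutes.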
I plan to share the second approach in detail in a follow-up post; the Python program is still being refined.