
Syncing SQL Server Data to StarRocks with DataX

skylines 2024-04-07

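The previous article covered syncing MySQL data to StarRocks with Flink; Flink can also sync SQL Server data to StarRocks. This installment uses DataX, the data synchronization tool developed by Alibaba, to sync data from SQL Server to StarRocks.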

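DataX and Flink differ somewhat in what they can do. Broadly speaking, both support a form of CDC and both can perform a full initial load. Flink's CDC is much stronger, though: it can sync incremental data in real time, whereas DataX cannot do real-time synchronization.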

Using DataX is fairly simple. First install python3, then unpack the DataX distribution, then extract starrockswriter.tar into /opt/datax/plugin/writer, and finally write a job.json file. After that it is ready to use.
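Once those pieces are unpacked, a quick preflight check can confirm that the reader and writer plugins landed where DataX looks for them. The sketch below is only an illustration: the helper name `missing_plugins` is made up for this example, and it assumes each plugin directory carries a `plugin.json` descriptor, which matches the plugin metadata DataX prints in its debug log.

```python
import sys
from pathlib import Path


def missing_plugins(datax_home, readers=(), writers=()):
    """Return the plugin directories that lack a plugin.json descriptor."""
    home = Path(datax_home)
    wanted = [home / "plugin" / "reader" / name for name in readers]
    wanted += [home / "plugin" / "writer" / name for name in writers]
    return [str(p) for p in wanted if not (p / "plugin.json").is_file()]


if __name__ == "__main__":
    # DataX is launched through python3, so check the interpreter first.
    if sys.version_info < (3,):
        sys.exit("python3 is required")
    # /opt/datax is the install path used in this article.
    gaps = missing_plugins("/opt/datax", ["sqlserverreader"], ["starrockswriter"])
    print("missing plugins:", gaps or "none")
```

An empty result means both plugin directories are present; any path it lists is a directory to re-extract.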

Installation and Configuration

    # Extract the installation package (run from /opt so it unpacks to /opt/datax)
    tar -zxvf datax.tar.gz
    cd /opt/datax/plugin/writer

    tar -xvf starrockswriter.tar

    cd /opt/datax/job
    cat > mssql_job.json <<'EOF'
    {
      "job": {
        "setting": {
          "speed": {
            "channel": 1
          },
          "errorLimit": {
            "record": 0,
            "percentage": 0
          }
        },
        "content": [
          {
            "reader": {
              "name": "sqlserverreader",
              "parameter": {
                "username": "susutostar",
                "password": "Manager123",
                "column": ["id", "name", "age"],
                "splitPk": "id",
                "where": " id >= 1111 and id < 1115",
                "connection": [
                  {
                    "table": ["dbo.mytest_mssql_dx"],
                    "jdbcUrl": ["jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb"]
                  }
                ]
              }
            },
            "writer": {
              "name": "starrockswriter",
              "parameter": {
                "username": "root",
                "password": "password",
                "database": "mssqldxtostardb",
                "table": "mytest_mssql_dx",
                "column": ["id", "name", "age"],
                "preSql": [],
                "postSql": [],
                "jdbcUrl": "jdbc:mysql://172.17.199.36:9030/",
                "loadUrl": ["172.17.199.36:8030"],
                "loadProps": {}
              }
            }
          }
        ]
      }
    }
    EOF

    # The job.json format is very strict; even a small formatting mistake causes an error.
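Because the file format is so unforgiving, one way to avoid hand-editing mistakes is to build the job as a Python dict and let `json.dumps` produce the text. The following is a minimal sketch under that assumption; `build_job` is a hypothetical helper, not part of DataX, and the connection values are simply the ones from the job file above. The `where` filter is optional, so the same helper covers both a full load and a filtered run.

```python
import json


def build_job(reader_param, writer_param, where=None, channel=1):
    """Assemble a DataX job dict; the where filter is added only when given."""
    reader_param = dict(reader_param)  # copy so the caller's dict is untouched
    if where:
        reader_param["where"] = where
    return {
        "job": {
            "setting": {
                "speed": {"channel": channel},
                "errorLimit": {"record": 0, "percentage": 0},
            },
            "content": [
                {
                    "reader": {"name": "sqlserverreader", "parameter": reader_param},
                    "writer": {"name": "starrockswriter", "parameter": writer_param},
                }
            ],
        }
    }


reader = {
    "username": "susutostar",
    "password": "Manager123",
    "column": ["id", "name", "age"],
    "splitPk": "id",
    "connection": [
        {
            "table": ["dbo.mytest_mssql_dx"],
            "jdbcUrl": ["jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb"],
        }
    ],
}
writer = {
    "username": "root",
    "password": "password",
    "database": "mssqldxtostardb",
    "table": "mytest_mssql_dx",
    "column": ["id", "name", "age"],
    "preSql": [],
    "postSql": [],
    "jdbcUrl": "jdbc:mysql://172.17.199.36:9030/",
    "loadUrl": ["172.17.199.36:8030"],
    "loadProps": {},
}

full_job = build_job(reader, writer)  # full initial load, no filter
incr_job = build_job(reader, writer, "id >= 1111 and id < 1115")  # filtered run
print(json.dumps(incr_job, indent=2))
```

Writing the printed text to mssql_job.json gives a file that is well-formed JSON by construction, whichever variant is used.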




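
"table": ["dbo.mytest_mssql_dx"]: the table name must be prefixed with the dbo schema.

"where": " id >= 1111 and id < 1115": this parameter is used for incremental (CDC-style) sync; a full initial load does not need a where parameter.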

Running the Synchronization

      python3 datax/bin/datax.py --jvm="-Xms1G -Xmx1G" --loglevel=debug datax/job/mssql_job.json


      DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
      Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.




      2024-04-01 19:39:09.414 [main] INFO MessageSource - JVM TimeZone: GMT+08:00, Locale: zh_CN
      2024-04-01 19:39:09.417 [main] INFO MessageSource - use Locale: zh_CN timeZone: sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]
      2024-04-01 19:39:09.419 [main] DEBUG MessageSource - initEnvironment MessageSource.locale[zh_CN], MessageSource.timeZone[sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]]
      2024-04-01 19:39:09.419 [main] DEBUG MessageSource - loadResourceBundle with locale[zh_CN], timeZone[sun.util.calendar.ZoneInfo[id="GMT+08:00",offset=28800000,dstSavings=0,useDaylight=false,transitions=0,lastRule=null]], baseName[com.alibaba.datax.common.util.LocalStrings]
      2024-04-01 19:39:09.419 [main] DEBUG MessageSource - loadResourceBundle classLoader:jdk.internal.loader.ClassLoaders$AppClassLoader@799f7e29
      2024-04-01 19:39:09.471 [main] INFO VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
      2024-04-01 19:39:09.476 [main] INFO Engine - the machine info =>


      osInfo: Linux amd64 5.15.0-92-generic
      jvmInfo: Oracle Corporation 11 11.0.1+13
      cpu num: 2


      totalPhysicalMemory: -0.00G
      freePhysicalMemory: -0.00G
      maxFileDescriptorCount: -1
      currentOpenFileDescriptorCount: -1


      GC Names [Copy, MarkSweepCompact]


      MEMORY_NAME | allocation_size | init_size
      CodeHeap 'profiled nmethods' | 117.22MB | 2.44MB
      Eden Space | 273.06MB | 273.06MB
      Survivor Space | 34.13MB | 34.13MB
      CodeHeap 'non-profiled nmethods' | 117.22MB | 2.44MB
      Compressed Class Space | 1,024.00MB | 0.00MB
      Metaspace | -0.00MB | 0.00MB
      CodeHeap 'non-nmethods' | 5.56MB | 2.44MB
      Tenured Gen | 682.69MB | 682.69MB




      2024-04-01 19:39:09.488 [main] INFO Engine -
      {
      "setting":{
      "speed":{
      "channel":1
      },
      "errorLimit":{
      "record":0,
      "percentage":0
      }
      },
      "content":[
      {
      "reader":{
      "name":"sqlserverreader",
      "parameter":{
      "username":"susutostar",
      "password":"**********",
      "column":[
      "id",
      "name",
      "age"
      ],
      "splitPk":"id",
      "connection":[
      {
      "table":[
      "dbo.mytest_mssql_dx"
      ],
      "jdbcUrl":[
      "jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb"
      ]
      }
      ]
      }
      },
      "writer":{
      "name":"starrockswriter",
      "parameter":{
      "username":"root",
      "password":"*********",
      "database":"mssqldxtostardb",
      "table":"mytest_mssql_dx",
      "column":[
      "id",
      "name",
      "age"
      ],
      "preSql":[


      ],
      "postSql":[


      ],
      "jdbcUrl":"jdbc:mysql://172.17.199.36:9030/",
      "loadUrl":[
      "172.17.199.36:8030"
      ],
      "loadProps":{


      }
      }
      }
      }
      ]
      }


      2024-04-01 19:39:09.492 [main] DEBUG Engine - {"job":{"setting":{"speed":{"channel":1},"errorLimit":{"record":0,"percentage":0}},"content":[{"reader":{"name":"sqlserverreader","parameter":{"username":"susutostar","password":"Manager123","column":["id","name","age"],"splitPk":"id","connection":[{"table":["dbo.mytest_mssql_dx"],"jdbcUrl":["jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb"]}]}},"writer":{"name":"starrockswriter","parameter":{"username":"root","password":"FAct@5500","database":"mssqldxtostardb","table":"mytest_mssql_dx","column":["id","name","age"],"preSql":[],"postSql":[],"jdbcUrl":"jdbc:mysql://172.17.199.36:9030/","loadUrl":["172.17.199.36:8030"],"loadProps":{}}}}]},"core":{"container":{"trace":{"enable":"false"},"job":{"reportInterval":10000,"id":-1},"taskGroup":{"channel":5}},"dataXServer":{"reportDataxLog":false,"address":"http://localhost:7001/api","timeout":10000,"reportPerfLog":false},"transport":{"exchanger":{"class":"com.alibaba.datax.core.plugin.BufferedRecordExchanger","bufferSize":32},"channel":{"byteCapacity":67108864,"flowControlInterval":20,"class":"com.alibaba.datax.core.transport.channel.memory.MemoryChannel","speed":{"byte":-1,"record":-1},"capacity":512}},"statistics":{"collector":{"plugin":{"taskClass":"com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector","maxDirtyNumber":10}}}},"common":{"column":{"dateFormat":"yyyy-MM-dd","datetimeFormat":"yyyy-MM-dd HH:mm:ss","timeFormat":"HH:mm:ss","extraFormats":["yyyyMMdd"],"timeZone":"GMT+8","encoding":"utf-8"}},"entry":{"jvm":"-Xms1G -Xmx1G"},"plugin":{"reader":{"sqlserverreader":{"path":"/opt/datax/plugin/reader/sqlserverreader","name":"sqlserverreader","description":"useScene: test. mechanism: use datax framework to transport data from SQL Server. 
warn: The more you know about the data, the less problems you encounter.","developer":"alibaba","class":"com.alibaba.datax.plugin.reader.sqlserverreader.SqlServerReader"}},"writer":{"starrockswriter":{"path":"/opt/datax/plugin/writer/starrockswriter","name":"starrockswriter","description":"useScene: prod. mechanism: StarRocksStreamLoad. warn: The more you know about the database, the less problems you encounter.","developer":"starrocks","class":"com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriter"}}}}
      2024-04-01 19:39:09.512 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false
      2024-04-01 19:39:09.512 [main] INFO JobContainer - DataX jobContainer starts job.
      2024-04-01 19:39:09.513 [main] DEBUG JobContainer - jobContainer starts to do preHandle ...
      2024-04-01 19:39:09.513 [main] DEBUG JobContainer - jobContainer starts to do init ...
      2024-04-01 19:39:09.514 [main] INFO JobContainer - Set jobId = 0
      2024-04-01 19:39:10.047 [job-0] INFO OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb.
      2024-04-01 19:39:10.227 [job-0] INFO OriginalConfPretreatmentUtil - table:[dbo.mytest_mssql_dx] has columns:[id,name,age].
      2024-04-01 19:39:10.228 [job-0] DEBUG CommonRdbmsReader$Job - After job init(), job config now is:[
      {"username":"susutostar","password":"Manager123","column":"id,name,age","splitPk":"id","connection":[{"table":["dbo.mytest_mssql_dx"],"jdbcUrl":"jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb"}],"fetchSize":1024,"isTableMode":true,"tableNumber":1,"columnList":["id","name","age"]}
      ]
      2024-04-01 19:39:10.245 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
      2024-04-01 19:39:10.246 [job-0] INFO JobContainer - DataX Reader.Job [sqlserverreader] do prepare work .
      2024-04-01 19:39:10.246 [job-0] INFO JobContainer - DataX Writer.Job [starrockswriter] do prepare work .
      2024-04-01 19:39:10.247 [job-0] INFO JobContainer - jobContainer starts to do split ...
      2024-04-01 19:39:10.248 [job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
      2024-04-01 19:39:10.252 [job-0] INFO JobContainer - DataX Reader.Job [sqlserverreader] splits to [1] tasks.
      2024-04-01 19:39:10.255 [job-0] INFO JobContainer - DataX Writer.Job [starrockswriter] splits to [1] tasks.
      2024-04-01 19:39:10.256 [job-0] DEBUG JobContainer - transformer configuration: null
      2024-04-01 19:39:10.280 [job-0] DEBUG JobContainer - contentConfig configuration: [{"internal":{"reader":{"parameter":{"username":"susutostar","password":"Manager123","column":"id,name,age","splitPk":"id","fetchSize":1024,"isTableMode":true,"tableNumber":1,"columnList":["id","name","age"],"jdbcUrl":"jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb","table":"dbo.mytest_mssql_dx","querySql":"select id,name,age from dbo.mytest_mssql_dx "},"name":"sqlserverreader"},"writer":{"parameter":{"username":"root","password":"FAct@5500","database":"mssqldxtostardb","table":"mytest_mssql_dx","column":["id","name","age"],"preSql":[],"postSql":[],"jdbcUrl":"jdbc:mysql://172.17.199.36:9030/","loadUrl":["172.17.199.36:8030"],"loadProps":{}},"name":"starrockswriter"},"taskId":0},"keys":["reader.parameter.columnList[2]","reader.parameter.columnList[0]","writer.parameter.column[1]","writer.parameter.jdbcUrl","reader.parameter.splitPk","reader.parameter.fetchSize","reader.parameter.password","writer.name","reader.parameter.querySql","writer.parameter.password","writer.parameter.database","reader.name","reader.parameter.columnList[1]","writer.parameter.column[2]","reader.parameter.column","writer.parameter.column[0]","reader.parameter.jdbcUrl","writer.parameter.table","reader.parameter.tableNumber","reader.parameter.table","writer.parameter.loadUrl[0]","reader.parameter.isTableMode","reader.parameter.username","taskId","writer.parameter.username"],"secretKeyPathSet":[]}]
      2024-04-01 19:39:10.280 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
      2024-04-01 19:39:10.283 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
      2024-04-01 19:39:10.285 [job-0] INFO JobContainer - Running by standalone Mode.
      2024-04-01 19:39:10.296 [taskGroup-0] DEBUG TaskGroupContainer - taskGroup[0]'s task configs[[{"internal":{"reader":{"parameter":{"username":"susutostar","password":"Manager123","column":"id,name,age","splitPk":"id","fetchSize":1024,"isTableMode":true,"tableNumber":1,"columnList":["id","name","age"],"jdbcUrl":"jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb","table":"dbo.mytest_mssql_dx","querySql":"select id,name,age from dbo.mytest_mssql_dx "},"name":"sqlserverreader"},"writer":{"parameter":{"username":"root","password":"FAct@5500","database":"mssqldxtostardb","table":"mytest_mssql_dx","column":["id","name","age"],"preSql":[],"postSql":[],"jdbcUrl":"jdbc:mysql://172.17.199.36:9030/","loadUrl":["172.17.199.36:8030"],"loadProps":{}},"name":"starrockswriter"},"taskId":0},"keys":["reader.parameter.columnList[2]","reader.parameter.columnList[0]","writer.parameter.column[1]","writer.parameter.jdbcUrl","reader.parameter.splitPk","reader.parameter.fetchSize","reader.parameter.password","writer.name","reader.parameter.querySql","writer.parameter.password","writer.parameter.database","reader.name","reader.parameter.columnList[1]","writer.parameter.column[2]","reader.parameter.column","writer.parameter.column[0]","reader.parameter.jdbcUrl","writer.parameter.table","reader.parameter.tableNumber","reader.parameter.table","writer.parameter.loadUrl[0]","reader.parameter.isTableMode","reader.parameter.username","taskId","writer.parameter.username"],"secretKeyPathSet":[]}]]
      2024-04-01 19:39:10.301 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
      2024-04-01 19:39:10.307 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
      2024-04-01 19:39:10.307 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
      2024-04-01 19:39:10.315 [job-0] DEBUG AbstractScheduler - com.alibaba.datax.core.statistics.communication.Communication@117e0fe5[
      counter={}
      state=RUNNING
      throwable=<null>
      timestamp=1711971550289
      message={}
      ]
      2024-04-01 19:39:10.331 [0-0-0-writer] DEBUG WriterRunner - task writer starts to do init ...
      2024-04-01 19:39:10.336 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
      2024-04-01 19:39:10.341 [0-0-0-reader] DEBUG ReaderRunner - task reader starts to do init ...
      2024-04-01 19:39:10.347 [0-0-0-reader] DEBUG ReaderRunner - task reader starts to do prepare ...
      2024-04-01 19:39:10.348 [0-0-0-reader] DEBUG ReaderRunner - task reader starts to read ...
      2024-04-01 19:39:10.348 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql: [select id,name,age from dbo.mytest_mssql_dx
      ] jdbcUrl:[jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb].
      2024-04-01 19:39:10.369 [0-0-0-writer] DEBUG WriterRunner - task writer starts to do prepare ...
      2024-04-01 19:39:10.369 [0-0-0-writer] DEBUG WriterRunner - task writer starts to write ...
      2024-04-01 19:39:10.431 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select id,name,age from dbo.mytest_mssql_dx
      ] jdbcUrl:[jdbc:sqlserver://172.17.199.37:1433;DatabaseName=mssqldxtostardb].
      2024-04-01 19:39:10.433 [0-0-0-reader] DEBUG ReaderRunner - task reader starts to do post ...
      2024-04-01 19:39:10.433 [0-0-0-reader] DEBUG ReaderRunner - task reader starts to do destroy ...
      2024-04-01 19:39:10.434 [0-0-0-writer] DEBUG WriterRunner - task writer starts to do post ...
      2024-04-01 19:39:10.434 [0-0-0-writer] DEBUG StarRocksWriterManager - StarRocks Sink is about to close: label[2ed2214a-6684-452f-a838-828a3d918759].
      2024-04-01 19:39:10.435 [Thread-0] DEBUG StarRocksWriterManager - Async stream load: rows[6] bytes[74] label[2ed2214a-6684-452f-a838-828a3d918759].
      2024-04-01 19:39:10.441 [Thread-0] DEBUG StarRocksStreamLoadVisitor - Start to join batch data: rows[6] bytes[74] label[2ed2214a-6684-452f-a838-828a3d918759].
      2024-04-01 19:39:10.441 [Thread-0] INFO StarRocksStreamLoadVisitor - Executing stream load to: 'http://172.17.199.36:8030/api/mssqldxtostardb/mytest_mssql_dx/_stream_load', size: '80'
      2024-04-01 19:39:10.721 [Thread-0] DEBUG StarRocksStreamLoadVisitor - StreamLoad response:
      {"Status":"Success","BeginTxnTimeMs":1,"Message":"OK","NumberUnselectedRows":0,"CommitAndPublishTimeMs":25,"Label":"2ed2214a-6684-452f-a838-828a3d918759","LoadBytes":80,"StreamLoadPlanTimeMs":2,"NumberTotalRows":6,"WriteDataTimeMs":11,"TxnId":3018,"LoadTimeMs":42,"ReadDataTimeMs":0,"NumberLoadedRows":6,"NumberFilteredRows":0}
      2024-04-01 19:39:10.721 [Thread-0] INFO StarRocksWriterManager - Async stream load finished: label[2ed2214a-6684-452f-a838-828a3d918759].
      2024-04-01 19:39:10.721 [0-0-0-writer] DEBUG WriterRunner - task writer starts to do destroy ...
      2024-04-01 19:39:10.738 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[413]ms
      2024-04-01 19:39:10.738 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
      2024-04-01 19:39:20.317 [job-0] DEBUG AbstractScheduler - com.alibaba.datax.core.statistics.communication.Communication@184497d1[
      counter={writeSucceedRecords=7, readSucceedRecords=6, totalErrorBytes=0, writeSucceedBytes=62, byteSpeed=0, totalErrorRecords=0, recordSpeed=0, waitReaderTime=62491427, writeReceivedBytes=62, stage=1, waitWriterTime=66961, percentage=1.0, totalReadRecords=6, writeReceivedRecords=7, readSucceedBytes=62, totalReadBytes=62}
      state=SUCCEEDED
      throwable=<null>
      timestamp=1711971560316
      message={}
      ]
      2024-04-01 19:39:20.319 [job-0] INFO StandAloneJobContainerCommunicator - Total 6 records, 62 bytes | Speed 6B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.062s | Percentage 100.00%
      2024-04-01 19:39:20.319 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
      2024-04-01 19:39:20.319 [job-0] DEBUG JobContainer - jobContainer starts to do post ...
      2024-04-01 19:39:20.319 [job-0] INFO JobContainer - DataX Writer.Job [starrockswriter] do post work.
      2024-04-01 19:39:20.320 [job-0] INFO JobContainer - DataX Reader.Job [sqlserverreader] do post work.
      2024-04-01 19:39:20.320 [job-0] DEBUG JobContainer - jobContainer starts to do postHandle ...
      2024-04-01 19:39:20.320 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
      2024-04-01 19:39:20.320 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: opt/datax/hook
      2024-04-01 19:39:20.322 [job-0] INFO JobContainer -
      [total cpu info] =>
      averageCpu | maxDeltaCpu | minDeltaCpu
      -1.00% | -1.00% | -1.00%



      [total gc info] =>
      NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
      Copy | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
      MarkSweepCompact | 1 | 1 | 1 | 0.025s | 0.025s | 0.025s


      2024-04-01 19:39:20.322 [job-0] INFO JobContainer - PerfTrace not enable!
      2024-04-01 19:39:20.322 [job-0] INFO StandAloneJobContainerCommunicator - Total 6 records, 62 bytes | Speed 6B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.062s | Percentage 100.00%
      2024-04-01 19:39:20.324 [job-0] INFO JobContainer -
      任务启动时刻 : 2024-04-01 19:39:09
      任务结束时刻 : 2024-04-01 19:39:20
      任务总计耗时 : 10s
      任务平均流量 : 6B/s
      记录写入速度 : 0rec/s
      读出记录总数 : 6
      读写失败总数 : 0


      root@startestdb01:/opt#

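In this initial load only 6 records were synced, because the source table holds only 6 rows. With no where parameter, DataX did a full table scan, read the 6 records, and wrote them into StarRocks through StarRocksWriterManager.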
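The row counts can also be checked mechanically from the StreamLoad response that the writer prints at debug level. The sketch below parses such a response and compares the loaded and total row counts. `check_stream_load` is a hypothetical helper written for this example, and the sample string is a shortened copy of the response in the log above.

```python
import json


def check_stream_load(response_text):
    """Return (ok, summary) for a Stream Load JSON response body."""
    resp = json.loads(response_text)
    total = resp.get("NumberTotalRows", 0)
    loaded = resp.get("NumberLoadedRows", 0)
    filtered = resp.get("NumberFilteredRows", 0)
    # Treat the load as clean only if every row arrived and none was filtered.
    ok = resp.get("Status") == "Success" and loaded == total and filtered == 0
    return ok, f"status={resp.get('Status')} loaded={loaded}/{total} filtered={filtered}"


# Response body copied (shortened) from the debug log above.
sample = (
    '{"Status":"Success","Message":"OK","NumberTotalRows":6,'
    '"NumberLoadedRows":6,"NumberFilteredRows":0,"LoadBytes":80}'
)
print(check_stream_load(sample))
```

For the run shown here the check passes, with 6 of 6 rows loaded and none filtered.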

At this point DataX has synced the SQL Server data into StarRocks. This is still not real-time synchronization, though, and it is not something CDC did.

Extending DataX Toward CDC

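As the figure above shows, DataX is also credited with a CDC capability, so let us take a brief look at how it does incremental sync, which still does not amount to real-time sync.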

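In principle, it comes down to adding a where parameter, that is, a filter condition, to the job.json file. There are two ways to build CDC-style incremental sync with DataX on top of this.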

1. DataxWeb

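In production practice, someone has already built DataxWeb, a data synchronization toolkit that turns DataX from a tool driven purely by files and scripts into one that is configured and run through a visual interface. In essence it gives DataX a program that records information about earlier sync runs and sets up a scheduled task. It stores its data in MySQL, and the application runs in a Docker container. DataxWeb supports only two kinds of incremental filter: a time column or an auto-increment id column in the table. The business scenarios it fits are therefore fairly limited, and it also consumes a fair amount of memory to host MySQL and the Docker container. In my own testing, at least 2 GB of memory was needed for DataxWeb to run.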

2. A custom Python script

This approach covers more data sync scenarios, but it is less stable.

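This approach does not require deploying a MySQL database and a Docker container to run a supporting program, so it avoids that disk and memory overhead. You only write a Python program that first queries the source for the current value of the table's incremental filter column and uses it as the variable in the where parameter; the change in that value defines the range of incremental data. The program then writes the filter into the job.json file and finally invokes datax to run the incremental sync. The approach is simple to operate: the where filter can be extended to a column of any type as needed, and adding a crontab scheduled job completes the CDC-style setup for DataX.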
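The flow just described can be sketched roughly as follows. This is not the author's script, which the article does not show; it is an illustrative outline under several assumptions. All function names and the state file are hypothetical, the filter column is an integer `id`, and the new maximum value is passed in rather than queried from SQL Server, since that query depends on whichever database driver is available. The `run` argument is injected so the outline can be exercised without launching DataX.

```python
import json
from pathlib import Path


def load_watermark(state_file, default=0):
    """Read the last synced value; fall back to default on the first run."""
    path = Path(state_file)
    return int(path.read_text().strip()) if path.exists() else default


def save_watermark(state_file, value):
    """Persist the new high-water mark after a successful run."""
    Path(state_file).write_text(str(value))


def patch_where(job_file, where):
    """Rewrite the reader's where filter in an existing job.json."""
    path = Path(job_file)
    job = json.loads(path.read_text(encoding="utf-8"))
    job["job"]["content"][0]["reader"]["parameter"]["where"] = where
    path.write_text(json.dumps(job, indent=2), encoding="utf-8")


def datax_command(datax_home, job_file):
    """Build a launch command modelled on the one used earlier in this article."""
    return ["python3", str(Path(datax_home) / "bin" / "datax.py"),
            "--jvm=-Xms1G -Xmx1G", str(job_file)]


def sync_once(job_file, state_file, new_max, run, column="id", datax_home="/opt/datax"):
    """One incremental pass: rows in (last, new_max] are synced, then the mark moves."""
    last = load_watermark(state_file)
    if new_max <= last:
        return False  # nothing new on the source side
    patch_where(job_file, f"{column} > {last} and {column} <= {new_max}")
    run(datax_command(datax_home, job_file))  # expected to raise on failure
    save_watermark(state_file, new_max)  # only reached if the run succeeded
    return True
```

In real use, `run` could be something like `lambda cmd: subprocess.run(cmd, check=True)`, and a crontab entry would call `sync_once` at a fixed interval. Saving the watermark only after a successful run means a failed pass is retried over the same range next time.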

The second approach may be shared separately later; the Python program is still being optimized.
