1. Preface
The previous article shared hands-on practice of whole-database synchronization and schema evolution with Apache Doris in Dinky, and many readers are no doubt eager to try it. This article summarizes whole-database synchronization practice and experience with the latest Dinky 0.7.2.
2. Environment Setup
Job configuration
Flink version differences
Other FlinkCDC support
Dependency upload
# Place the following whole-database sync dependency jars from the Dinky root directory under $FLINK_HOME/lib
lib/dlink-client-base-${version}.jar
lib/dlink-common-${version}.jar
plugins/flink-${flink-version}/dlink-client-${version}.jar
Application Job Submission
Application mode is now supported. Prepare the required jars in advance, or combine the statement with the add jar syntax. Taking mysql-cdc 2.3.0 and Flink 1.14 as an example, the following jars are required:
flink-shaded-guava-18.0-13.0.jar
HikariCP-4.0.3.jar
druid-1.2.8.jar
dlink-metadata-mysql-0.7.2.jar
dlink-metadata-base-0.7.2.jar
jackson-datatype-jsr310-2.13.4.jar
flink-sql-connector-mysql-cdc-2.3.0.jar
dlink-client-1.14-0.7.2.jar
Notes
A FlinkSQL task may contain only one CDCSOURCE; set, add jar, and DDL statements may appear before the CDCSOURCE (see the layout sketch after these notes).
Do not put a space before the commas between configuration items; each comma must immediately follow the closing single quote.
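A minimal layout sketch follows, assuming placeholder connection values and an assumed jar path on HDFS: the set and add jar statements come first, then the single CDCSOURCE.
-- set and add jar must precede the one CDCSOURCE statement
set 'execution.checkpointing.interval' = '10000';
add jar 'hdfs:///flink/lib/flink-sql-connector-mysql-cdc-2.3.0.jar';
EXECUTE CDCSOURCE demo_layout WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\..*',
'sink.connector' = 'print'
);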
3. Configuration Parameters
Option | Required | Default | Description |
---|---|---|---|
connector | Yes | none | The connector to use |
hostname | Yes | none | IP address or hostname of the database server |
port | Yes | none | Port number of the database server |
username | Yes | none | Database username for connecting to the database server |
password | Yes | none | Database password for connecting to the database server |
scan.startup.mode | No | latest-offset | Startup mode of the consumer; valid values are "initial" and "latest-offset" |
database-name | No | none | Name of the source database; optional |
table-name | No | none | Regular expressions only, e.g. "test\.student,test\.score"; for all tables: "test\..*" |
source.* | No | none | Passes custom CDC source options, e.g. source.server-time-zone sets the server-time-zone option |
checkpoint | No | none | Checkpoint interval, in ms |
parallelism | No | none | Job parallelism |
sink.connector | Yes | none | Sink type, e.g. datastream-kafka, datastream-doris, datastream-hudi, kafka, doris, hudi, jdbc, etc.; names starting with datastream- are DataStream implementations |
sink.sink.db | No | none | Database name in the target data source; defaults to the source database name when unspecified |
sink.table.prefix | No | none | Prefix for target table names, e.g. ODS_ prepends ODS_ to every table name |
sink.table.suffix | No | none | Suffix for target table names |
sink.table.upper | No | false | Uppercase the target table names |
sink.table.lower | No | false | Lowercase the target table names |
sink.auto.create | No | false | Automatically create target tables; currently MySQL only, others can be extended |
sink.timezone | No | UTC | Time zone of the target data source, applied automatically during data type conversion |
sink.column.replace.line-break | No | false | Whether to strip line breaks, i.e. apply REGEXP_REPLACE(column, '\n', '') during conversion |
sink.* | No | none | Options for the target data source, same as in FlinkSQL; ${schemaName} and ${tableName} inject the processed source schema and table names |
sink[N].* | No | none | N denotes writing to multiple sinks, numbered from 0 to N; the remaining options are the same as sink.* |
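As a quick illustration of the source.* passthrough, the sketch below forwards server-time-zone to the MySQL CDC source; the Asia/Shanghai value and connection details are assumptions.
-- 'source.server-time-zone' is forwarded to the CDC source as server-time-zone
EXECUTE CDCSOURCE demo_source_opts WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'source.server-time-zone' = 'Asia/Shanghai',
'table-name' = 'test\..*',
'sink.connector' = 'print'
);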
4. Classic Examples
Whole-database sync to Print
EXECUTE CDCSOURCE demo_print WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '10000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector' = 'print'
);
Whole-database sync to Apache Doris
The Doris Flink connector options vary considerably across versions; the configuration below targets Doris 1.2.0. Each time the job is submitted, 'sink.sink.label-prefix' = '${schemaName}_${tableName}_1' must be modified manually, e.g. by changing the trailing number.
EXECUTE CDCSOURCE demo_doris WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '10000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector' = 'doris',
'sink.fenodes' = '127.0.0.1:8030',
'sink.username' = 'root',
'sink.password' = '123456',
'sink.doris.batch.size' = '1000',
'sink.sink.max-retries' = '1',
'sink.sink.batch.interval' = '60000',
'sink.sink.db' = 'test',
'sink.sink.properties.format' = 'json',
'sink.sink.properties.read_json_by_line' = 'true',
'sink.table.identifier' = '${schemaName}.${tableName}',
'sink.sink.label-prefix' = '${schemaName}_${tableName}_1'
);
Column-level schema evolution
EXECUTE CDCSOURCE demo_doris_schema_evolution WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '10000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink.connector' = 'datastream-doris-schema-evolution',
'sink.fenodes' = '127.0.0.1:8030',
'sink.username' = 'root',
'sink.password' = '123456',
'sink.doris.batch.size' = '1000',
'sink.sink.max-retries' = '1',
'sink.sink.batch.interval' = '60000',
'sink.sink.db' = 'test',
'sink.table.identifier' = '${schemaName}.${tableName}'
);
Whole-database sync to Apache Hudi
${pkList} joins each table's primary-key fields with '.'; for example, if a table's primary keys are cid and sid, it expands to cid.sid. It exists specifically for Hudi's recordkey.field parameter.
EXECUTE CDCSOURCE demo_hudi WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '10000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'database-name'='bigdata',
'table-name'='bigdata\.products,bigdata\.orders',
'sink.connector'='hudi',
'sink.path'='hdfs://nameservice1/data/hudi/${tableName}',
'sink.hoodie.datasource.write.recordkey.field'='${pkList}',
'sink.hoodie.parquet.max.file.size'='268435456',
'sink.write.tasks'='1',
'sink.write.bucket_assign.tasks'='2',
'sink.write.precombine'='true',
'sink.compaction.async.enabled'='true',
'sink.write.task.max.size'='1024',
'sink.write.rate.limit'='3000',
'sink.write.operation'='upsert',
'sink.table.type'='COPY_ON_WRITE',
'sink.compaction.tasks'='1',
'sink.compaction.delta_seconds'='20',
'sink.read.streaming.skip_compaction'='true',
'sink.compaction.delta_commits'='20',
'sink.compaction.trigger.strategy'='num_or_time',
'sink.compaction.max_memory'='500',
'sink.changelog.enabled'='true',
'sink.read.streaming.enabled'='true',
'sink.read.streaming.check.interval'='3',
'sink.hive_sync.skip_ro_suffix' = 'true',
'sink.hive_sync.enable'='true',
'sink.hive_sync.mode'='hms',
'sink.hive_sync.metastore.uris'='thrift://bigdata1:9083',
'sink.hive_sync.db'='qhc_hudi_ods',
'sink.hive_sync.table'='${tableName}',
'sink.table.prefix.schema'='true'
)
Whole-database sync to StarRocks
This example prefixes the target table names with ods_ and converts them to lowercase.
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'starrocks',
'sink.jdbc-url' = 'jdbc:mysql://127.0.0.1:19035',
'sink.load-url' = '127.0.0.1:18035',
'sink.username' = 'root',
'sink.password' = '123456',
'sink.sink.db' = 'ods',
'sink.table.prefix' = 'ods_',
'sink.table.lower' = 'true',
'sink.database-name' = 'ods',
'sink.table-name' = '${tableName}',
'sink.sink.properties.format' = 'json',
'sink.sink.properties.strip_outer_array' = 'true',
'sink.sink.max-retries' = '10',
'sink.sink.buffer-flush.interval-ms' = '15000',
'sink.sink.parallelism' = '1'
)
Whole-database sync to MySQL
This example prefixes the target table names with test_, converts them to lowercase, and enables automatic table creation.
EXECUTE CDCSOURCE cdc_mysql WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8&useSSL=false',
'sink.username' = 'root',
'sink.password' = '123456',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.table-name' = '${tableName}',
'sink.driver' = 'com.mysql.jdbc.Driver',
'sink.sink.buffer-flush.interval' = '2s',
'sink.sink.buffer-flush.max-rows' = '100',
'sink.sink.max-retries' = '5',
'sink.auto.create' = 'true'
)
Whole-database sync to Oracle
EXECUTE CDCSOURCE cdc_oracle WITH (
'connector' = 'oracle-cdc',
'hostname' = '127.0.0.1',
'port' = '1521',
'username'='root',
'password'='123456',
'database-name'='ORCL',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'TEST\..*',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:oracle:thin:@127.0.0.1:1521:orcl',
'sink.username' = 'root',
'sink.password' = '123456',
'sink.table-name' = 'TEST2.${tableName}'
)
Whole-database sync to Kafka
When the sink.topic parameter is specified, all change logs are written to that single topic:
EXECUTE CDCSOURCE cdc_kafka_one WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector'='datastream-kafka',
'sink.topic'='cdctest',
'sink.brokers'='bigdata2:9092,bigdata3:9092,bigdata4:9092'
)
When the sink.topic parameter is not specified, change logs are written to separate topics named after each source table:
EXECUTE CDCSOURCE cdc_kafka_mul WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector'='datastream-kafka',
'sink.brokers'='bigdata2:9092,bigdata3:9092,bigdata4:9092'
)
Writing change logs with the upsert-kafka connector, one topic per table:
EXECUTE CDCSOURCE cdc_upsert_kafka WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'upsert-kafka',
'sink.topic' = '${tableName}',
'sink.properties.bootstrap.servers' = 'bigdata2:9092,bigdata3:9092,bigdata4:9092',
'sink.key.format' = 'avro',
'sink.value.format' = 'avro'
)
Whole-database sync to PostgreSQL
EXECUTE CDCSOURCE cdc_postgresql WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'jdbc',
'sink.url' = 'jdbc:postgresql://127.0.0.1:5432/test',
'sink.username' = 'test',
'sink.password' = '123456',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.table-name' = '${tableName}',
'sink.driver' = 'org.postgresql.Driver',
'sink.sink.buffer-flush.interval' = '2s',
'sink.sink.buffer-flush.max-rows' = '100',
'sink.sink.max-retries' = '5'
)
Whole-database sync to ClickHouse
EXECUTE CDCSOURCE cdc_clickhouse WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'bigdata\.products,bigdata\.orders',
'sink.connector' = 'clickhouse',
'sink.url' = 'clickhouse://127.0.0.1:8123',
'sink.username' = 'default',
'sink.password' = '123456',
'sink.sink.db' = 'test',
'sink.table.prefix' = 'test_',
'sink.table.lower' = 'true',
'sink.database-name' = 'test',
'sink.table-name' = '${tableName}',
'sink.sink.batch-size' = '500',
'sink.sink.flush-interval' = '1000',
'sink.sink.max-retries' = '3'
)
Whole-database sync to a HiveCatalog
EXECUTE CDCSOURCE demo WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '10000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\..*',
'sink.connector' = 'sql-catalog',
'sink.catalog.name' = 'hive',
'sink.catalog.type' = 'hive',
'sink.default-database' = 'hdb',
'sink.hive-conf-dir' = '/usr/local/dlink/hive-conf'
);
Whole-database sync to Flink Table Store
EXECUTE CDCSOURCE demo WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '10000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\..*',
'sink.connector' = 'sql-catalog',
'sink.catalog.name' = 'fts',
'sink.catalog.type' = 'table-store',
'sink.warehouse'='file:/tmp/table_store'
);
Whole-database sync to DlinkCatalog
EXECUTE CDCSOURCE demo WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '10000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\..*',
'sink.connector' = 'sql-catalog',
'sink.catalog.name' = 'dlinkmysql',
'sink.catalog.type' = 'dlink_mysql',
'sink.catalog.username' = 'dlink',
'sink.catalog.password' = 'dlink',
'sink.catalog.url' = 'jdbc:mysql://127.0.0.1:3306/dlink?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC',
'sink.sink.db' = 'default_database'
);
Whole-database sync to two sinks
EXECUTE CDCSOURCE jobname WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\.student,test\.score',
'sink[0].connector' = 'doris',
'sink[0].fenodes' = '127.0.0.1:8030',
'sink[0].username' = 'root',
'sink[0].password' = 'dw123456',
'sink[0].sink.batch.size' = '1',
'sink[0].sink.max-retries' = '1',
'sink[0].sink.batch.interval' = '60000',
'sink[0].sink.db' = 'test',
'sink[0].table.prefix' = 'ODS_',
'sink[0].table.upper' = 'true',
'sink[0].table.identifier' = '${schemaName}.${tableName}',
'sink[0].sink.label-prefix' = '${schemaName}_${tableName}_1',
'sink[0].sink.enable-delete' = 'true',
'sink[1].connector'='datastream-kafka',
'sink[1].topic'='cdc',
'sink[1].brokers'='127.0.0.1:9092'
)
5. FAQ
How to confirm a whole-database job was submitted successfully
Check the backend logs under Configuration Center - System Info - Logs and look for the cause of any errors.
How to handle out-of-order data with parallelism greater than 1
Set the parallelism to 1, or rely on the target data source's own configuration to achieve eventual consistency, such as a Doris sequence column (see the sketch below).
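A hedged sketch of the Doris route, assuming the target tables use the UNIQUE KEY model with an update_time column usable as the sequence column (the column name is an assumption); the stream-load property function_column.sequence_col is passed through via the sink.sink.properties.* prefix:
-- parallelism > 1; row ordering is enforced on the Doris side by the sequence column
EXECUTE CDCSOURCE demo_sequence WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '10000',
'scan.startup.mode' = 'initial',
'parallelism' = '4',
'table-name' = 'test\..*',
'sink.connector' = 'doris',
'sink.fenodes' = '127.0.0.1:8030',
'sink.username' = 'root',
'sink.password' = '123456',
'sink.sink.db' = 'test',
'sink.table.identifier' = '${schemaName}.${tableName}',
'sink.sink.label-prefix' = '${schemaName}_${tableName}_1',
'sink.sink.properties.format' = 'json',
'sink.sink.properties.read_json_by_line' = 'true',
'sink.sink.properties.function_column.sequence_col' = 'update_time'
);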
What to do when the source database DDL changes
When resubmitting the job, add set 'execution.savepoint.ignore-unclaimed-state' = 'true'; before the statement.
Is full schema evolution supported
No. Schema evolution currently depends on the capability of the sink connector; for example, the Doris connector supports column-level schema evolution.
No operators defined in streaming topology. Cannot execute
A JDBC connection timeout prevented correct metadata from being fetched; restart Dinky or upgrade to 0.7.2.
NoClassDefFoundError
Usually a missing dependency jar; check that the dependencies listed in the Environment Setup section are in place.
Support for other CDC sources and sinks
Other FlinkSQL connectors work directly: just prefix their options with sink.; special DataStream connectors can be extended on your own. A hedged sketch follows.
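For example, the sketch below targets the FlinkSQL elasticsearch-7 connector, assuming its jar is on the classpath; the hosts and index values are placeholders. The connector's own options are simply written with the sink. prefix:
EXECUTE CDCSOURCE demo_es WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'checkpoint' = '10000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test\..*',
'sink.connector' = 'elasticsearch-7',
'sink.hosts' = 'http://127.0.0.1:9200',
'sink.index' = '${tableName}'
);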
6. Notes
Not everything in this summary has been verified. Please point out any errors or omissions, and contribute other practices; updates will be synced to the official documentation. Thanks for your support.
Community
You are welcome to join the community to share, discuss, and contribute.
QQ group: 543709668; when applying, add the note "Dinky + company + position", otherwise the request will not be approved.
Official WeChat group: add wenmo_ai with the note "Dinky + company + position"; requests without the note will not be approved.
DingTalk community group (recommended): scan the QR code for more from Dinky open source.
