
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.15</artifactId>
<!--artifactId>flink-doris-connector-1.16</artifactId-->
<!--artifactId>flink-doris-connector-1.17</artifactId-->
<version>1.4.0</version>
</dependency>复制
重要优化
在之前版本中,如果想快速将上游整个 MySQL 业务库接入到 Doris 中,通常需要手动编写 DataStream 程序,并在 Apache Doris 中创建映射表。对于包含成千上万张表的业务库,整库同步就显得非常复杂,同步难度相对较高。
使用方法
mysql_db
中
tbl
表和
test
开头的表进行同步。只需要执行以下命令,即可将上游多表整库同步到 Doris 中,同时在 Doris 中无需提前创建表。
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.4.0.jar \
mysql-sync-database \
--database test_db \
--mysql-conf hostname=127.0.0.1 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=mysql_db \
--including-tables "tbl|test.*" \
--sink-conf fenodes=127.0.0.1:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
--sink-conf sink.label-prefix=label1 \
--table-conf replication_num=1复制
同步性能



实际使用反馈
对用户特别友好,不用关注建表语句怎么写,大大提高了工作效率
—— 盛玉,蜀海供应链 资深大数据工程师
—— 李俊,恒信集团 资深大数据工程师
解决了全量 + 增量数据同步的统一性问题,提高了数据同步的效率,同时支持了源数据结构实时同步,极大提升使用的体验。
最新特性
维表 Join
引入 Thrift SDK
实现 Stream Load 按需连接
轮询 BE 节点导入
支持 Doris Array/JSON 等类型
问题修复
修复 Stream Load 连接 Check 时的并发问题
修复连接 Apache Doris 部分超时参数不生效的问题 修复数据写入时隐藏分隔符不生效的问题 修复读取 Doris 查询计划过长,序列化失败的问题
使用示例
CREATE TABLE flink_doris_source (
name STRING,
age INT,
score DECIMAL(5,2)
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = 'password',
'doris.filter.query' = 'age=18'
);
SELECT * FROM flink_doris_source; 复制
维表 Join
CREATE TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` as proctime()
) WITH (
'connector' = 'kafka',
...
);
create table dim_city(
`city` STRING,
`level` INT ,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'lookup.jdbc.async' = 'true',
'table.identifier' = 'dim.dim_city',
'username' = 'root',
'password' = ''
);
SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city 复制
CREATE TABLE doris_sink (
name STRING,
age INT,
score DECIMAL(5,2)
)
WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'database.table',
'username' = 'root',
'password' = '',
'sink.label-prefix' = 'doris_label',
//json格式写入
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true'
);复制
致谢
在此向所有参与版本设计、开发、测试、讨论的社区贡献者们表示感谢,感谢他们为本次版本开发和优化提供了强有力支持。
贡献者名单
@caoliang-web
@DongLiang-0
@gnehil
@GoGoWen
@JNSimba
@legendtkl
@lsy3993
@Myasuka
@wolfboys
欢迎更多的开源技术爱好者加入 Apache Doris 社区交流群,携手成长,共建社区生态。Apache Doris 社区当前已容纳了上万名开发者和使用者,承载了 30+ 交流社群,如果你也是 Apache Doris 的爱好者,非常欢迎您的加入!
文章转载自锋哥聊DORIS数仓,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
无法同步表结构呀,还需要自己创建数据库表。是我使用方法不对嘛?
1年前

评论
相关阅读
【专家有话说第五期】在不同年龄段,DBA应该怎样规划自己的职业发展?
墨天轮编辑部
1450次阅读
2025-03-13 11:40:53
MySQL8.0统计信息总结
闫建(Rock Yan)
543次阅读
2025-03-17 16:04:03
2月“墨力原创作者计划”获奖名单公布
墨天轮编辑部
497次阅读
2025-03-13 14:38:19
SQL优化 - explain查看SQL执行计划(一)
金同学
433次阅读
2025-03-13 16:04:22
MySQL突然崩溃?教你用gdb解剖core文件,快速锁定“元凶”!
szrsu
421次阅读
2025-03-13 00:29:43
MySQL生产实战优化(利用Index skip scan优化性能提升257倍)
chengang
365次阅读
2025-03-17 10:36:40
MySQL数据库当前和历史事务分析
听见风的声音
342次阅读
2025-04-01 08:47:17
墨天轮个人数说知识点合集
JiekeXu
313次阅读
2025-04-01 15:56:03
MySQL 生产实践-Update 二级索引导致的性能问题排查
chengang
306次阅读
2025-03-28 16:28:31
MySQL8.0直方图功能简介
Rock Yan
256次阅读
2025-03-21 15:30:53