1. Data Source Preparation
Create the table:
CREATE TABLE `mysql_cdc` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;
Write a stored procedure to bulk-insert data:
DELIMITER //
CREATE PROCEDURE p5()
BEGIN
  DECLARE l_n1 INT DEFAULT 1;
  WHILE l_n1 <= 10000000 DO
    INSERT INTO mysql_cdc (id, name) VALUES (l_n1, CONCAT('test', l_n1));
    SET l_n1 = l_n1 + 1;
  END WHILE;
END //
DELIMITER ;
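The procedure above is only defined, not executed. The following is a minimal sketch of running it and checking the source before starting Flink, assuming the table lives in the `test` database (the database name used later in the Flink DDL). The binlog checks reflect the usual requirements of the mysql-cdc connector (binary logging enabled, row-based format); the values actually returned depend on your server configuration.

```sql
-- Run in the MySQL client.
USE test;

-- Execute the bulk insert; 10 million single-row inserts can take a long time.
CALL p5();

-- Confirm how many rows were loaded.
SELECT COUNT(*) FROM mysql_cdc;

-- The mysql-cdc connector reads the binlog, so binary logging should be ON
-- and the format should be ROW (with full row images).
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';
```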
2. Flink SQL Client Operations
Start a YARN session
Allocate as much memory as you can; otherwise the job fails with OOM errors.
$FLINK_HOME/bin/yarn-session.sh -jm 8192 -tm 8192 -d 2>&1 &
/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session
Flink SQL operations:
set execution.checkpointing.interval=10sec;

CREATE TABLE flink_mysql_cdc8 (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name varchar(100)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'hp8',
  'port' = '3306',
  'username' = 'root',
  'password' = 'abc123',
  'database-name' = 'test',
  'table-name' = 'mysql_cdc',
  'server-id' = '5409-5415',
  'scan.incremental.snapshot.enabled' = 'true'
);

set sql-client.execution.result-mode=tableau;

select count(*) from flink_mysql_cdc8;

CREATE TABLE flink_hudi_mysql_cdc8 (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name varchar(100)
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc8',
  'table.type' = 'COPY_ON_WRITE',
  'changelog.enabled' = 'true',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'name',
  'compaction.async.enabled' = 'false'
);

insert into flink_hudi_mysql_cdc8 select * from flink_mysql_cdc8;

select count(*) from flink_hudi_mysql_cdc8;
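Beyond comparing row counts, one way to check that incremental changes (not just the initial snapshot) reach Hudi is to modify a couple of rows in MySQL while the insert job is running and then look them up on the Hudi side. This is only a sketch: the ids 1 and 2 and the value 'test1_updated' are illustrative choices, not part of the original test. The Hudi Flink writer commits data when a checkpoint completes, so the changes should only become visible after a subsequent checkpoint.

```sql
-- In the MySQL client (database `test`): change two existing rows.
UPDATE mysql_cdc SET name = 'test1_updated' WHERE id = 1;
DELETE FROM mysql_cdc WHERE id = 2;

-- In the Flink SQL client, after at least one more checkpoint has completed:
SELECT * FROM flink_hudi_mysql_cdc8 WHERE id IN (1, 2);
```

If the pipeline is working, one would expect id 1 to carry the new name and id 2 to be absent from the result.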
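3. Checking the Job Status
In a production environment you can set a higher parallelism. Because this is a test environment, I set the parallelism to 1.
After removing the checkpoint setting and raising the parallelism from 1 to 4, the job ran tens of times faster.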
set table.exec.resource.default-parallelism=4;
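A parallelism change only affects jobs submitted after the setting is applied, so the insert has to be resubmitted. The sketch below uses the quoted `SET 'key' = 'value'` form that newer Flink SQL clients prefer; it assumes the two tables from section 2 are already defined in the current session. With incremental snapshot enabled, the mysql-cdc `server-id` range should contain at least as many ids as the source parallelism; the range '5409-5415' used above covers 7 ids, which is enough for a parallelism of 4.

```sql
-- Apply the new default parallelism for subsequently submitted jobs.
SET 'table.exec.resource.default-parallelism' = '4';

-- Resubmit the pipeline so that it picks up the new parallelism.
INSERT INTO flink_hudi_mysql_cdc8 SELECT * FROM flink_mysql_cdc8;
```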