暂无图片
暂无图片
2
暂无图片
暂无图片
暂无图片

Flink CDC 测试Mysql-Kafka-Elasticsearch

原创 木木 2022-04-20
2462

开启MySQL binlog日志

vim  /etc/my.cnf
#添加以下内容

log-bin=mysql-bin
server_id=1
binlog_format=ROW

重启MySQL:
systemctl restart mysqld

Dlink配置

1)Web UI

ip:8888

image 2.png

2)注册中心

选择Yarn Session类型

image 3.png

填写flink集群地址

image 4.png

3)数据开发

创建根目录

根据不同的业务创建不同的目录,以便于管理

image 5.png

创建作业

右键目录创建不同的作业

image 6.png

SQL开发

以mysql-cdc为样例,不同的数据库,连接方式不一样

image 7.png

4)执行模式

Yarn Session

用于测试流程以及数据,将任务提交到flink web ui(ip:8081)
执行模式:选择Yarn Session 
Flink集群:选择在注册中心创建好的Yarn-session集群

image 8.png

保存配置

image 9.png

执行SQL

image 1.png

Flink Web UI(ip:8081) 页面查看已提交的作业

image.png

FlinkSQL开发准备

第一步:创建MySQL表

CREATE TABLE test(
id int, 
name varchar(32), 
age int, 
PRIMARY KEY (id)
);

第二步:创建kafka topic

bin/kafka-topics.sh --zookeeper 172.16.16.64:2181 --create --replication-factor 1 --partitions 1 --topic test

第三步:创建Elasticsearch的索引

这里是使用Postman创建索引的方式
image 1.png

FlinkSQL开发流程

第一步:创建MySQL映射表

CREATE TABLE test(
id int, 
name varchar(32), 
age int, 
PRIMARY KEY (id) NOT ENFORCED
) with( 
'connector'='mysql-cdc',        
'hostname'='172.16.16.65',        
'port'='3306',        
'username'='root',        
'password'='BigData@',        
'database-name'='test',        
'table-name'='test'
);

第二步:创建kafka映射表

CREATE TABLE kafka_sink (
id int, 
name varchar(32), 
age int, 
PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
'connector' = 'kafka',
'topic' = 'test',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '172.16.16.64:9092',
'format' = 'canal-json'
);

第三步:创建Elasticsearch映射表

CREATE TABLE es_sink (
id int, 
name varchar(32), 
age int, 
PRIMARY KEY (id) NOT ENFORCED
)with (
'connector' = 'elasticsearch-7',
'hosts' = 'http://172.16.16.64:9200',
'index' = 'es_sink');

第四步:插入语句

insert into kafka_sink select * from test;
insert into es_sink select * from kafka_sink;
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论