开启MySQL binlog日志
vim /etc/my.cnf
#添加以下内容
log-bin=mysql-bin
server_id=1
binlog_format=ROW
重启MySQL:
systemctl restart mysqld
Dlink配置
1)Web UI
ip:8888
2)注册中心
选择Yarn Session类型
填写flink集群地址
3)数据开发
创建根目录
根据不同的业务创建不同的目录,以便于管理
创建作业
右键目录创建不同的作业
SQL开发
以mysql-cdc为样例,不同的数据库,连接方式不一样
4)执行模式
Yarn Session
用于测试流程以及数据,将任务提交到flink web ui(ip:8081)
执行模式:选择Yarn Session
Flink集群:选择在注册中心创建好的Yarn-session集群
保存配置
执行SQL
Flink Web UI(ip:8081) 页面查看已提交的作业
FlinkSQL开发准备
第一步:创建MySQL表
CREATE TABLE test(
id int,
name varchar(32),
age int,
PRIMARY KEY (id)
);
第二步:创建kafka topic
bin/kafka-topics.sh --zookeeper 172.16.16.64:2181 --create --replication-factor 1 --partitions 1 --topic test
第三步:创建Elasticsearch的索引
这里是使用Postman创建索引的方式
FlinkSQL开发流程
第一步:创建MySQL映射表
CREATE TABLE test(
id int,
name varchar(32),
age int,
PRIMARY KEY (id) NOT ENFORCED
) with(
'connector'='mysql-cdc',
'hostname'='172.16.16.65',
'port'='3306',
'username'='root',
'password'='BigData@',
'database-name'='test',
'table-name'='test'
);
第二步:创建kafka映射表
CREATE TABLE kafka_sink (
id int,
name varchar(32),
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'test',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = '172.16.16.64:9092',
'format' = 'canal-json'
);
第三步:创建Elasticsearch映射表
CREATE TABLE es_sink (
id int,
name varchar(32),
age int,
PRIMARY KEY (id) NOT ENFORCED
)with (
'connector' = 'elasticsearch-7',
'hosts' = 'http://172.16.16.64:9200',
'index' = 'es_sink');
第四步:插入语句
insert into kafka_sink select * from test;
insert into es_sink select * from kafka_sink;
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。