![](https://oss-emcsprod-public.modb.pro/wechatSpider/modb_20220818_79457cfc-1edd-11ed-b9a2-fa163eb4f6be.png)
1. Scenario
1. Use Flink CDC to synchronize MySQL data to Redis in real time;
2. Use Flink SQL to join Kafka stream data with Redis dimension data.
2. Components

| Component | Version |
| --- | --- |
| Flink | 1.14.4 |
| Flink-mysql-cdc | 2.2.1 |
| Redis | 6.2.4 |
| MySQL | 5.7+ |
| Dinky | 0.6.6 |
| commons-pool2 | 2.11.0 |
| jedis | 3.3.0 |
| flink-connector-redis | 1.0.11 |
| flink-sql-connector-kafka | 2.12-1.14.4 |
Note
commons-pool2 and jedis are listed because the flink-connector-redis jar depends on them; both can be downloaded from Maven repositories, or you can build them from source. The jedis version the connector was built against is fairly old (newer RC releases exist), so to be safe you should build flink-connector-redis yourself before deploying to production.
GitHub: https://github.com/jeff-zou/flink-connector-redis. Many thanks to jeff-zou for contributing this connector!
3. Deployment
4. Flink CDC Real-Time Write to Redis
Source database preparation
Prepare the data to be synchronized: tables employees_1 and employees_2 in database emp_1.
create database emp_1;
CREATE TABLE IF NOT EXISTS `employees_1` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(50) NOT NULL,
`last_name` varchar(50) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into employees_1 VALUES ("10", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
insert into employees_1 VALUES ("11", "1990-09-15", "wang", "meimei", "F", "2021-04-12");
CREATE TABLE IF NOT EXISTS `employees_2` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(50) NOT NULL,
`last_name` varchar(50) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into employees_2 VALUES ("20", "1987-01-23", "huang", "menji", "M", "2000-03-04");
insert into employees_2 VALUES ("21", "1993-04-21", "lu", "benweiniubi", "M", "2022-05-06");
FlinkSQL
SET execution.checkpointing.interval = 10s; -- checkpoint every 10 seconds
SET execution.checkpointing.tolerable-failed-checkpoints = 3; -- fail the job after 3 consecutive failed checkpoints
SET execution.checkpointing.timeout = 300s; -- checkpoint timeout
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION; -- keep checkpoints when the job is cancelled
SET execution.checkpointing.mode = EXACTLY_ONCE; -- exactly-once checkpointing semantics
SET execution.checkpointing.unaligned = true; -- unaligned checkpoints, faster under backpressure
SET pipeline.operator-chaining = false; -- disable chaining so each operator is visible in the web UI (testing only)
CREATE TABLE employees_source (
table_name STRING METADATA VIRTUAL, -- metadata column: name of the source MySQL table
emp_no int NOT NULL,
birth_date date,
first_name STRING,
last_name STRING,
gender STRING,
hire_date date,
PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = '000000',
'database-name' = 'emp_1',
'table-name' = 'employees_[0-9]+' -- regular expression matching both employees_1 and employees_2
);
create table sink_redis(
hsetKey VARCHAR, -- Redis hash key (here: the source table name)
hsetField VARCHAR, -- hash field (here: emp_no)
hsetValue VARCHAR -- hash value (here: the row as a JSON string)
)
with ('connector'='redis',
'redis-mode'='single',
'host'='hadoop102',
'port' = '6379',
'maxTotal' = '100', -- jedis connection pool sizing
'maxIdle' = '5',
'minIdle' = '5',
'sink.parallelism' = '1',
'sink.max-retries' = '3',
'command'='hset' -- write with the Redis HSET command
);
create view temp_view (hsetKey, hsetField, hsetValue) AS
select
table_name as hsetKey,
CAST(emp_no as STRING) as hsetField,
'{"' || 'birth_date' || '":"' || CAST(birth_date as STRING) || '",' ||
'"' || 'first_name' || '":"' || first_name || '",' ||
'"' || 'last_name' || '":"' || last_name || '",' ||
'"' || 'gender' || '":"' || gender || '",' ||
'"' || 'hire_date' || '":"' || CAST(hire_date as STRING) || '"}' as hsetValue
from employees_source;
insert into sink_redis select hsetKey, hsetField, hsetValue from temp_view;
SQL notes:
'{"' || 'birth_date' || '":"' || CAST(birth_date as STRING) || '",' ||
'"' || 'first_name' || '":"' || first_name || '",' ||
'"' || 'last_name' || '":"' || last_name || '",' ||
'"' || 'gender' || '":"' || gender || '",' ||
'"' || 'hire_date' || '":"' || CAST(hire_date as STRING) || '"}' as hsetValue
The snippet above simply concatenates the table's fields and values into a JSON string to use as the value of the Redis HSET; it is only for demonstration and carries no other meaning. Built-in JSON construction functions only arrived in Flink 1.15, so here we have to fall back to || concatenation. After the sync, Redis holds one hash per source table: the key is the table name (e.g. employees_1), the fields are the emp_no values, and the values are these JSON strings. The Redis connector supports many more commands and configuration options; see the GitHub repository for details.
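For reference, on Flink 1.15 or later the same value could be built with the built-in JSON_OBJECT function instead; a minimal sketch, not tested against this exact setup:

```sql
-- Flink 1.15+ only: build hsetValue with JSON_OBJECT instead of || concatenation
select
  table_name as hsetKey,
  CAST(emp_no as STRING) as hsetField,
  JSON_OBJECT(
    KEY 'birth_date' VALUE CAST(birth_date as STRING),
    KEY 'first_name' VALUE first_name,
    KEY 'last_name'  VALUE last_name,
    KEY 'gender'     VALUE gender,
    KEY 'hire_date'  VALUE CAST(hire_date as STRING)
  ) as hsetValue
from employees_source;
```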
Submitting the per-job task
Flink Web UI
As shown above, the streaming job has been successfully submitted by Dinky to the remote cluster.
Data result:
Updating data
Change the last_name value kunkun to momoda, as shown below.
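A minimal sketch of the update, run against MySQL (emp_no 10 is the row inserted earlier):

```sql
-- update the row in the source database; Flink CDC picks up the change from the binlog
UPDATE employees_1 SET last_name = 'momoda' WHERE emp_no = 10;
```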
The data flow changes in the Flink Web UI.
Checking the data in Redis (e.g. HGET employees_1 10 in redis-cli), the value has been updated.
Inserting data
Add one row to the table:
insert into employees_1 VALUES ("12", "1996-06-16", "dili", "reba", "F", "2000-07-25");
Redis successfully gains one more entry.
5. Kafka-Redis Stream-Dimension Join
Test record to send to Kafka:
{"company_id":"employees_1", "emp_id":"10", "log_id":"log_a_001"}
FlinkSQL
SET execution.checkpointing.interval = 10s;
SET execution.checkpointing.tolerable-failed-checkpoints = 3;
SET execution.checkpointing.timeout = 300s;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET pipeline.operator-chaining = false;
-- SET restart-strategy = fixed-delay; -- restart strategy
-- SET restart-strategy.fixed-delay.attempts = 5; -- number of restart attempts
-- SET restart-strategy.fixed-delay.delay = 30s; -- fixed delay between attempts
create table kafka_source(
company_id string,
emp_id string,
log_id string,
event_time as PROCTIME() -- processing-time attribute, required by the lookup join below
) with (
'connector' = 'kafka',
'topic' = 'test',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'test',
'properties.partition-discovery.interval-millis' = '30000',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
create table dim_redis(
hsetKey VARCHAR,
hsetField VARCHAR,
hsetValue VARCHAR
)
with ('connector'='redis',
'redis-mode'='single',
'host'='hadoop102',
'port' = '6379',
'lookup.cache.max-rows' = '5000', -- lookup cache: at most 5000 cached rows
'lookup.cache.ttl' = '300', -- lookup cache time-to-live
'maxTotal' = '100',
'maxIdle' = '5',
'minIdle' = '5',
'sink.parallelism' = '1',
'sink.max-retries' = '3',
'command'='hget' -- read dimension values with the Redis HGET command
);
create table sink_table(
company_id varchar,
emp_id varchar,
log_id varchar,
event_time timestamp,
first_name varchar,
last_name varchar,
hire_date varchar
) with ('connector' = 'print');
create view temp_view as
select
company_id as company_id,
emp_id as emp_id,
log_id as log_id,
event_time as event_time,
JSON_VALUE(d.hsetValue, '$.first_name') as first_name,
JSON_VALUE(d.hsetValue, '$.last_name') as last_name,
JSON_VALUE(d.hsetValue, '$.hire_date') as hire_date
from
kafka_source as k
left join
dim_redis for system_time as of k.event_time as d
on
k.company_id = d.hsetKey
and
k.emp_id = d.hsetField;
insert into sink_table select * from temp_view;
Flink Web UI
The stream-dimension join succeeds and the joined rows are printed to the console (the print connector writes to the TaskManager's stdout).
Notes
pipeline.operator-chaining is disabled here only for temporary testing, to make the data flow graph easier to inspect; setting it to false is not recommended for production jobs.
6. Summary
Strengths
As a one-stop Flink SQL development platform, Dinky bundles development, monitoring, resource management, and many other features, and is very convenient to use. It solves many of the pain points of day-to-day development, and its built-in OpenAPI can cooperate with other scheduling platforms to cover a complete real-time and offline workflow. As the saying goes: use your tools well and you get off work early!
Shortcomings
Custom jar dependencies used during development have to be placed manually in a designated location, to which you may not have access in production. It would be helpful to add a feature for this: an option to add and select a local package that is then synced to the right location automatically, or support for POM-style dependency declarations that are integrated into the job automatically.
Community
You are welcome to join the community to share, discuss, and critique, and to contribute your own efforts.
QQ community group: 543709668. Note "Dinky + company + position" in your application; requests without it will not be approved.
Official WeChat group (recommended): add wenmo_ai, noting "Dinky + company + position" in your application; requests without it will not be approved.
DingTalk livestream community group (recommended):
![](https://oss-emcsprod-public.modb.pro/wechatSpider/modb_20220818_7a188714-1edd-11ed-b9a2-fa163eb4f6be.png)