暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Dinky 扩展 Flink Redis 的实践分享

Dinky开源 2022-08-09
678
摘要:本文介绍了来自广州九四智能科技有限公司的韩公子老师带来的 Dinky 扩展 Flink Redis Connector 的实践分享。内容包括:
  1. 场景
  2. 组件准备
  3. 部署
  4. FlinkCDC 实时写入 Redis
  5. Kafka 与 Redis 流维关联
  6. 总结


Tips:历史传送门
Dinky 扩展 ClickHouse 的实践分享
Dinky 实践系列之 Flink Catalog 元数据管理
Dinky实践系列之FlinkCDC整库实时入仓入湖
打造 Flink + StarRocks+ Dinky 的极速统一分析平台
 

 GitHub 地址 
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
欢迎大家关注 Dinky 的发展~




一、场景

1. 通过 Flink CDC 实时同步 MySQL 数据库数据到 Redis;

2. 通过 FlinkSQL 将 Kafka 的流数据与 Redis 维度数据进行关联。




二、组件准备

组件
版本
Flink1.14.4
Flink-mysql-cdc
2.2.1
Redis
6.2.4
Mysql
5.7+
Dinky
0.6.6
commons-pool2
2.11.0
jedis
3.3.0
flink-connector-redis
1.0.11
flink-sql-connector-kafka
2.12-1.14.4

温馨提示

    commons-pool2 和 jedis 包是 flink-connector-redis 的 jar 引用到了,故添加上,这两个maven仓库都能下载,或者自己编译源码。其中 jedis 的编译的版本比较旧了,有更新的 RC 包,安全起见,如部署生产环境,应自行编译 flink-connector-redis。

    github地址: https://github.com/jeff-zou/flink-connector-redis 在此感谢 jeff-zou 大佬贡献的 connector!





三、部署

    Dlink 的 plugins 下添加 commons-pool2-2.11.0.jar、 jedis-3.3.0.jar、flink-connector-redis-1.0.11.jar、flink-sql-connector-kafka_2.12-1.14.4.jar, 重启 Dlink。flink standalone 模式或者 yarn 模式,请自行往需要的地方添加依赖。





四、FlinkCDC实时写入Redis


源库准备

    准备需要同步的数据,同步库 emp_1 下的 employees_1,employees_2。

    create database emp_1;


    CREATE TABLE IF NOT EXISTS `employees_1` (
    `emp_no` int(11) NOT NULL,
    `birth_date` date NOT NULL,
    `first_name` varchar(50) NOT NULL,
    `last_name` varchar(50) NOT NULL,
    `gender` enum('M','F') NOT NULL,
    `hire_date` date NOT NULL,
    PRIMARY KEY (`emp_no`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;


    insert into employees_1 VALUES ("10", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
    insert into employees_1 VALUES ("11", "1990-09-15", "wang", "meimei", "F", "2021-04-12");


    CREATE TABLE IF NOT EXISTS `employees_2` (
    `emp_no` int(11) NOT NULL,
    `birth_date` date NOT NULL,
    `first_name` varchar(50) NOT NULL,
    `last_name` varchar(50) NOT NULL,
    `gender` enum('M','F') NOT NULL,
    `hire_date` date NOT NULL,
    PRIMARY KEY (`emp_no`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;


    insert into employees_2 VALUES ("20", "1987-01-23", "huang", "menji", "M", "2000-03-04");
    insert into employees_2 VALUES ("21", "1993-04-21", "lu", "benweiniubi", "M", "2022-05-06");


    FlinkSQL

      SET execution.checkpointing.interval = 10s;
      SET execution.checkpointing.tolerable-failed-checkpoints = 3;
      SET execution.checkpointing.timeout = 300s;
      SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
      SET execution.checkpointing.mode = EXACTLY_ONCE;
      SET execution.checkpointing.unaligned = true;
      SET pipeline.operator-chaining = false;


      CREATE TABLE employees_source (
      table_name STRING METADATA VIRTUAL,
      emp_no int NOT NULL,
      birth_date date,
      first_name STRING,
      last_name STRING,
      gender STRING,
      hire_date date,
      PRIMARY KEY (`emp_no`) NOT ENFORCED
      ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = '127.0.0.1',
      'port' = '3306',
      'username' = 'root',
      'password' = '000000',
      'database-name' = 'emp_1',
      'table-name' = 'employees_[0-9]+'
      );


      create table sink_redis(
      hsetKey VARCHAR,
      hsetField VARCHAR,
      hsetValue VARCHAR
      )
      with ('connector'='redis',
      'redis-mode'='single',
      'host'='hadoop102',
      'port' = '6379',
      'maxTotal' = '100',
      'maxIdle' = '5',
      'minIdle' = '5',
      'sink.parallelism' = '1',
      'sink.max-retries' = '3',
      'command'='hset'
      );


      create view temp_view (hsetKey, hsetField, hsetValue) AS
      select
      table_name as hsetKey,
      CAST(emp_no as STRING) as hsetField,
      '{"' || 'birth_date' || '":"' || CAST(birth_date as STRING) || '",' ||
      '"' || 'first_name' || '":"' || first_name || '",' ||
      '"' || 'last_name' || '":"' || last_name || '",' ||
      '"' || 'gender' || '":"' || gender || '",' ||
      '"' || 'hire_date' || '":"' || CAST(hire_date as STRING) || '"}' as hsetValue
      from employees_source;


      insert into sink_redis select hsetKey, hsetField, hsetValue from temp_view;

          sql说明:

        '{"' || 'birth_date' || '":"' || CAST(birth_date as STRING) || '",' ||
        '"' || 'first_name' || '":"' || first_name || '",' ||
        '"' || 'last_name' || '":"' || last_name || '",' ||
        '"' || 'gender' || '":"' || gender || '",' ||
        '"' || 'hire_date' || '":"' || CAST(hire_date as STRING) || '"}' as hsetValue

            上面只是把数据库的字段和值, 拼接成json串,作为 redis hset 的 value 而已,作为一个案例展示,没有别的含义。因 flink 1.15 版本以上才内置 json 生成函数,所以这里不得不用 || 拼接 (滑稽>< )。redis connector 其实支持很多操作,支持设置更多配置, 更详细的用法还请自行翻阅github。


        提交 perjob 任务


        FlinkWebUI

            上图可见,流任务已经成功被 Dinky 提交的远程集群了。


        数据效果图


        修改数据

            将 last_name 对应的值 kunkun
        改为 momoda

            flink webui 数据流变化。

            查看redis数据, 数据已经被修改。


        新增数据

            表增加一条数据。

          insert into employees_1 VALUES ("12", "1996-06-16", "dili", "reba", "F", "2000-07-25");

              redis 成功增加一条。





          五、Kafka 与 Redis 流维关联


          Kafka 待发送数据

            {"company_id":"employees_1""emp_id":"10""log_id":"log_a_001"}

            FlinkSQL

              SET execution.checkpointing.interval = 10s;
              SET execution.checkpointing.tolerable-failed-checkpoints = 3;
              SET execution.checkpointing.timeout = 300s;
              SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
              SET execution.checkpointing.mode = EXACTLY_ONCE;
              SET execution.checkpointing.unaligned = true;
              SET pipeline.operator-chaining = false;
              -- SET restart-strategy = fixed-delay; --重启策略
              -- SET restart-strategy.fixed-delay.attempts = 5 ; --尝试次数
              -- SET restart-strategy.fixed-delay.delay = 30s; --固定延时时间


              create table kafka_source(
              company_id string,
              emp_id string,
              log_id string,
              event_time as procTime()
              ) with (
              'connector' = 'kafka',
              'topic' = 'test',
              'properties.bootstrap.servers' = '127.0.0.1:9092',
              'properties.group.id' = 'test',
              'properties.partition-discovery.interval-millis' = '30000',
              'format' = 'json',
              'scan.startup.mode' = 'latest-offset'
              );


              create table dim_redis(
              hsetKey VARCHAR,
              hsetField VARCHAR,
              hsetValue VARCHAR
              )
              with ('connector'='redis',
              'redis-mode'='single',
              'host'='hadoop102',
              'port' = '6379',
              'lookup.cache.max-rows' = '5000',
              'lookup.cache.ttl' = '300',
              'maxTotal' = '100',
              'maxIdle' = '5',
              'minIdle' = '5',
              'sink.parallelism' = '1',
              'sink.max-retries' = '3',
              'command'='hget'
              );


              create table sink_table(
              company_id varchar,
              emp_id varchar,
              log_id varchar,
              event_time timestamp,
              first_name varchar,
              last_name varchar,
              hire_date varchar
              ) with ('connector' = 'print');


              create view temp_view as
              select
              company_id as company_id,
              emp_id as emp_id,
              log_id as log_id,
              event_time as event_time,
              JSON_VALUE(d.hsetValue, '$.first_name') as first_name,
              JSON_VALUE(d.hsetValue, '$.last_name') as last_name,
              JSON_VALUE(d.hsetValue, '$.hire_date') as hire_date
              from
              kafka_source as k
              left join
              dim_redis for system_time as of k.event_time as d
              on
              k.company_id = d.hsetKey
              and
              k.emp_id = d.hsetField;


              insert into sink_table select* from temp_view;


              FlinkWebUI

              流维关联成功并输出控制台


              注意事项

                  参数 pipeline.operator-chaining 是为了临时测试,观看数据流图,业务上不推荐设置为 false。



              六、总结


              优势

                  dinky 作为 FlinkSql 一站式开发平台,集开发、监控、资源管理等诸多功能, 使用非常方便;很大程度上解决了业务开发的痛点, 内置的 OpenApi 功能,更可以跟其他调度平台协同,完成实时、离线的一整套开发,所谓工具用得好,下班下得早!


              不足之处

                  开发过程中自定义的 jar 依赖, 需要手动放置带到指定位置,但一般生产上,可能没有权限访问。希望增加功能, 可以有选项栏,点击添加、选择本地包之后,自动同步到指定位置;或者能添加 pom 依赖格式, 自动集成到作业里面。




              交流

              欢迎您加入社区交流分享与批评,也欢迎您为社区贡献自己的力量。

              QQ社区群:543709668,申请备注 “ Dinky+企业名+职位”,不写不批。

              微信官方群(推荐):添加 wenmo_ai ,申请备注“ Dinky+企业名+职位”,不写不批谢谢。

              钉钉直播社区群(推荐):

                     公众号:Dinky开源



              扫描二维码获取

              更多精彩

              Dinky开源




              文章转载自Dinky开源,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论