暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Dinky 实践系列之 Flink Catalog 元数据管理

Dinky开源 2022-08-02
1090
摘要:本文介绍了韩非老师带来的 Dinky 实践系列之 Flink Catalog 元数据管理的分享。内容包括:
  1. 前言
  2. 环境要求
  3. 所需依赖
  4. 脚本准备
  5. 功能实践
  6. 总结


Tips:历史传送门
Dinky实践系列之FlinkCDC整库实时入仓入湖
Dinky FlinkCDC 整库入仓 StarRocks
打造 Flink + StarRocks+ Dinky 的极速统一分析平台
Dinky 构建 Flink CDC 整库入仓入湖
 

 GitHub 地址 
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
欢迎大家关注 Dinky 的发展~


一、前言

Flink Catalog 持久化是 Dinky 实践系列的第二篇,通过阅读本文,您将会熟悉 Dinky MySQL Catalog 持久化的用法。这个系列中,我们以 MySQL 做为 Source 端,StarRocks 做为 Sink 端做为演示。

在 Dinky 0.6.5 之前,在编写 FlinkSQL 作业时,FlinkSQL 的 DDL 语句可以采用 FlinkSQLEnv 环境引入。但这种方式对大量表结构进行初始化管理时存在局限性,为提供统一的 Flink 元数据管理能力,Dinky 在 0.6.6 实现了 Flink MySQL Catalog 功能,此功能与 Hive Catalog 相似,相比之前大大降低了表结构的维护成本。


二、环境要求

软件
版本
CDH
6.2.0
Hadoop
3.0.0-cdh6.2.0
Flink
1.13.6
Flink CDC
2.2.1
StarRocks2.2.0
Dinky0.6.6
MYSQL
5.7




三、所需依赖


依赖

    Mysql Catalog 持久化需要在 Flink下加载周围组件所需要的 Flink connector 即可。依赖如下:

    # hadoop依赖
    flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar
    # Flink Starrrocks依赖
    flink-connector-starrocks-1.2.2_flink-1.13_2.12.jar
    # Dinky hadoop依赖
    flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar
    # Dinky mysql catalog依赖
    dlink-catalog-mysql-1.13-0.6.6.jar
    # flink cdc依赖包
    flink-sql-connector-mysql-cdc-2.2.1.jar
    # mysql 驱动依赖
    mysql-connector-java-8.0.21.jar

    说明

    1.hadoop 依赖包放置 $FLINK_HOME/lib下

    2.Flink  StarRocks 依赖包放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下

    3.Dinky hadoop 依赖包放置 $DINKY_HOME/plugins 下(网盘或者群公告下载)

    4.Dinky MySQL Catalog 依赖放置 $FLINK_HOME/lib 下

    5.Flink cdc 依赖包放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下

    6.MySQL 驱动依赖放置 $FLINK_HOME/lib 和 $DINKY_HOME/plugins 下



    四、脚本准备


    MySQL 建表

        如下 sql 脚本采用 Flink CDC 官网。

      # mysql建表语句(同步到Starocks)
      CREATE TABLE bigdata.products (
      id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      name VARCHAR(255) NOT NULL,
      description VARCHAR(512)
      );
      ALTER TABLE bigdata.products AUTO_INCREMENT = 101;
      INSERT INTO bigdata.products
      VALUES (default,"scooter","Small 2-wheel scooter"),
      (default,"car battery","12V car battery"),
      (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
      (default,"hammer","12oz carpenter's hammer"),
      (default,"hammer","14oz carpenter's hammer"),
      (default,"hammer","16oz carpenter's hammer"),
      (default,"rocks","box of assorted rocks"),
      (default,"jacket","water resistent black wind breaker"),
      (default,"spare tire","24 inch spare tire");

      CREATE TABLE bigdata.orders (
      order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      order_date DATETIME NOT NULL,
      customer_name VARCHAR(255) NOT NULL,
      price DECIMAL(10, 5) NOT NULL,
      product_id INTEGER NOT NULL,
      order_status BOOLEAN NOT NULL -- Whether order has been placed
      ) AUTO_INCREMENT = 10001;


      INSERT INTO bigdata.orders
      VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
      (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
      (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);


      表解析

          MySQL Catalog 持久化目前默认的 catalog 为 my_catalog,默认的 FlinkSQLEnv 为 DefaultCatalog。目前存储 MySQL Catalog 元数据的表结构如下:

      元数据表
      表含义
      metadata_database元数据 schema 信息
      metadata_table元数据table信息
      metadata_database_propertyschema 属性信息
      metadata_table_propertytable 属性信息
      metadata_column数据列信息
      metadata_functionUDF 信息






      五、功能实践


      作业脚本准备

        DROP  TABLE IF EXISTS ods_orders_src;
        CREATE TABLE IF NOT EXISTS ods_orders_src (
        `order_id` int COMMENT ''
        , `order_date` timestamp(3) COMMENT ''
        , `customer_name` string COMMENT ''
        , `price` decimal(12,2) COMMENT ''
        , `product_id` int COMMENT ''
        , `order_status` tinyint COMMENT ''
        ,PRIMARY KEY(order_id) NOT ENFORCED
        ) COMMENT ''
        WITH (
        'connector' = 'mysql-cdc'
        ,'hostname' = '192.168.0.40'
        ,'port' = '4406'
        ,'username' = 'root'
        ,'password' = 'Percona@020*'
        ,'server-time-zone' = 'Asia/Shanghai'
        ,'scan.incremental.snapshot.enabled' = 'true'
        ,'scan.startup.mode'='initial'
        ,'scan.incremental.snapshot.chunk.size' = '20000'
        ,'heartbeat.interval' = '120s'
        ,'database-name' = 'bigdata'
        ,'table-name' = 'orders'
        );
        DROP TABLE IF EXISTS ods_orders_sink;
        CREATE TABLE IF NOT EXISTS ods_orders_sink (
        `order_id` int COMMENT ''
        , `order_date` timestamp(3) COMMENT ''
        , `customer_name` string COMMENT ''
        , `price` decimal(12,2) COMMENT ''
        , `product_id` int COMMENT ''
        , `order_status` tinyint COMMENT ''
        ,PRIMARY KEY(order_id) NOT ENFORCED
        ) COMMENT ''
        WITH (
        'jdbc-url' = 'jdbc:mysql://192.168.0.5:19035',
        'connector' = 'starrocks',
        'database-name' = 'qhc_ods',
        'table-name' = 'ods_orders',
        'password' = '123456',
        'load-url' = '192.168.0.5:18035',
        'username' = 'devuser',
        'sink.properties.format' = 'json',
        'sink.properties.strip_outer_array' = 'true',
        'sink.max-retries' = '10',
        'sink.buffer-flush.interval-ms' = '15000',
        'sink.parallelism' = '1'
        );


        DROP TABLE IF EXISTS ods_products_src;
        CREATE TABLE IF NOT EXISTS ods_products_src (
        `id` int COMMENT ''
        , `name` string COMMENT ''
        , `description` string COMMENT ''
        ,PRIMARY KEY(id ) NOT ENFORCED
        ) COMMENT ''
        WITH (
        'connector' = 'mysql-cdc'
        ,'hostname' = '192.168.0.4'
        ,'port' = '3306'
        ,'username' = 'root'
        ,'password' = '123456'
        ,'server-time-zone' = 'Asia/Shanghai'
        ,'scan.incremental.snapshot.enabled' = 'true'
        ,'scan.startup.mode'='initial'
        ,'scan.incremental.snapshot.chunk.size' = '20000'
        ,'heartbeat.interval' = '120s'
        ,'database-name' = 'bigdata'
        ,'table-name' = 'products'
        );
        DROP TABLE IF EXISTS ods_products_sink;
        CREATE TABLE IF NOT EXISTS ods_products_sink (
        `id` int COMMENT ''
        , `name` string COMMENT ''
        , `description` string COMMENT ''
        ,PRIMARY KEY(id ) NOT ENFORCED
        ) COMMENT ''
        WITH (
        'jdbc-url' = 'jdbc:mysql://192.168.0.5:19035',
        'connector' = 'starrocks',
        'database-name' = 'qhc_ods',
        'table-name' = 'ods_products',
        'password' = '123456',
        'load-url' = '192.168.0.5:18035',
        'username' = 'devuser',
        'sink.properties.format' = 'json',
        'sink.properties.strip_outer_array' = 'true',
        'sink.max-retries' = '10',
        'sink.buffer-flush.interval-ms' = '15000',
        'sink.parallelism' = '1'
           );


        创建初始化脚本作业

            创建一个 ddl_init 作业,通过 yarn session模式提交,FlinkSQLEnv采用DefaultCatalog,作业如下:

            执行作业后,在 dinky 元数据库查询是否表已经存在。

        查看元数据表

            每执行一次初始化DDL,将会覆盖之前的元数据。

            说明: 对于所创建的表元信息不能清空,否则会报错如下:


        查询 Source 的数据

            新建一个作业 ods_order_src。

          select * from ods_orders_src;

              由此可以看到,对于所创建的表其实已经存在与 DefaultCatalog,即保存与 Mysql 元数据中。此时,可以通过创建任意作业去使用DefaultCatalog 中的表。

              首先清空 StarRocks 数据。

          插入 Sink 表

              还是在 ods_order_src 作业中,使用 insert 语句。将数据写入 StarRocks 中。

            INSERT INTO ods_orders_sink
            select * from ods_orders_src;

            提交 Flink 作业

            查看 StarRocks 数据

            说明

                对于 MySQL Catalog 除上面用默认的 DefaultCatalog,那么也可以通过 create 创建 catalog,然后在对应数据库下执行 dlinkmysqlcatalog.sql。语法如下:

              create catalog my_catalog with(
              'type' = 'dlink_mysql',
              'username' = 'dlink',
              'password' = 'dlink',
              'url' = 'jdbc:mysql://192.168.0.4:3306/dlink2?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true'
              );


              use catalog my_catalog;


              六、总结

                  随着 Dinky 的不断扩大以及在业界的影响力。为方便大家的学习和使用,此次系列文章作为 Dinky 系列文章的第二篇,后期系列文章尽请期待。




              交流

              欢迎您加入社区交流分享与批评,也欢迎您为社区贡献自己的力量。

              QQ社区群:543709668,申请备注 “ Dinky+企业名+职位”,不写不批。

              微信官方群(推荐):添加 wenmo_ai ,申请备注“ Dinky+企业名+职位”,不写不批谢谢。

              钉钉社区群(推荐):

                     公众号:DataLink数据中台



              扫描二维码获取

              更多精彩

              DataLink

              数据中台




              文章转载自Dinky开源,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论