暂无图片
暂无图片
暂无图片
暂无图片
1
暂无图片

Dinky FlinkCDC 整库入仓 StarRocks

2706
摘要:本文由来自神州数码的大数据主管——赵岩硕老师带来的基于 Dinky 实现 FlinkCDC 整库实时入仓 StarRocks 的实践与踩坑分享。内容包括:
  1. 背景
  2. 环境准备
  3. 部署教程
  4. 平台配置
  5. FlinkCDC 整库入仓 StarRocks
  6. FAQ
  7. 总结


Tips:历史传送门
打造 Flink + StarRocks+ Dinky 的极速统一分析平台
Dinky 扩展 ChunJun 的实践分享
Dinky 扩展 iceberg 的实践分享
Dinky 扩展 kudu 实践分享
 

 GitHub 地址 
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
欢迎大家关注 Dinky 的发展~



一、背景

通过 Dinky + Flink CDC + Starrocks 的方式构建整库实时入仓,在我使用期间遇到的大部分是依赖冲突或者缺包的问题, 所以编写了该文档为大家提前避坑,避免大家重蹈我的覆辙,如有不严谨的地方还请大家提出宝贵意见,希望大家共同进步。(整体流程切记一步一步操作莫急莫急)



二、环境准备

1.Flink  1.13.6

2.Dinky 0.6.5

3.StarRocks 2.2



三、部署教程


Flink(参考官方)

下载地址:  https://flink.apache.org/zh/downloads.html


Dinky 0.6.5

1.下载地址

    https://github.com/DataLinkDC/dlink/releases

2.上传 dlink 压缩包到服务器(/opt/software)

3.解压

    tar -zxvf dlink-release-0.6.5.tar.gz -C opt/module
    mv dlink-release-0.6.5 dlink
    cd dlink
    复制

    4.初始化数据库

    Dinky 采用 mysql 作为后端的存储库,mysql 支持 5.7+。这里假设你已经安装了 mysql 。首先需要创建 Dinky 的后端数据库,这里以配置文件中默认库创建。

      #登录mysql
      mysql -uroot -proot@123


      #授权并创建数据库
      mysql> grant all privileges on *.* to 'dlink'@'%' identified by 'dlink' with grant option;
      mysql> grant all privileges on *.* to 'dlink'@'fdw1' identified by 'dlink' with grant option;
      mysql> flush privileges;


      #此处用 dlink 用户登录
      mysql -h fdw1 -udlink -pdlink
      mysql> create database dlink;
      复制

      在 Dinky 根目录 sql 文件夹下有 2 个 sql 文件 , 分别是 dlink.sql 和 dlink_history.sql。如果第一次部署,可以直接将 dlink.sql 文件在 dlink 数据库下执行。(如果之前已经建立了 dlink 的数据库,那 dlink_history.sql 存放了各版本的升级 sql ,根据版本号及日期按需执行即可)

        #首先登录 mysql


        mysql -h fdw1 -udlink -pdlink
        mysql> use dlink;
        mysql> source opt/module/dlink/sql/dlink.sql
        复制

        5.配置dlink文件

         创建完成数据库后,那肯定要连接了呀。

          #切换目录


          cd /opt/module/dlink/config/
          vim application.yml
          复制

          6.加载依赖 (重点)

           具体介绍可以看dlink官网介绍 (http://www.dlink.top/docs/build_deploy/deploy#%E5%8A%A0%E8%BD%BD%E4%BE%9D%E8%B5%96

            #创建目录 
            cd opt/dlink/
            mkdir plugins
            cp -r opt/module/flink/lib/flink-*.jar


            #必须复制过去 不然你懂得(报错缠身) !!!
            cd opt/flink/lib
            cp -r opt/module/dlink/extends/dlink-client-1.14-0.6.5.jar
            cp -r opt/module/dlink/jar/dlink-client-base-0.6.5.jar
            cp -r opt/module/dlink/jar/dlink-common-0.6.5.jar
            复制

            最终Dlink plugins如下 ,仅供参考:

            Flink lib 如下:

            7.启动Dinky

              CDH模式下在 auto.sh 里加一行 export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop


              #启动
              $sh auto.sh start
              #停止
              $sh auto.sh stop
              #重启
              $sh auto.sh restart
              #查看状态
              $sh auto.sh status


              访问地址 : http://hadoop01:8888/
              默认用户名/密码 : admin/admin
              复制

              备注 :日志查看 tail -1000f opt/module/dlink/logs/dlink.log

              端口号可以在 vim opt/module/dlink/config/application.yml 中修改(server : port: 8888)

              Docker 部署可参考官网(http://www.dlink.top/docs/build_deploy/deploy#docker-%E9%83%A8%E7%BD%B2%E5%8F%82%E8%80%83%E5%91%BD%E4%BB%A4


              StarRocks 2.2 部署(参考官方)

                1.下载地址

                https://www.starrocks.com/zh-CN/download/community

                 2.安装教程参考官方

                    https://docs.starrocks.com/zh-cn/2.2/quick_start/Deploy#%E6%89%8B%E5%8A%A8%E9%83%A8%E7%BD%B2



              四、平台配置


              配置集群配置管理(Yarn Per-Job 模式)

                      进入注册中心 -> 集群管理 -> 集群配置管理 -> 新建

                      hdfs:///flink/lib 如下:


              修改提交 FlinkSQL 的 Jar 文件路径

                      进入系统设置 -> Flink设置->修改 与 HDFS 的 jar 相呼应文件地址选择合适的版本复制到 hdfs 上。




              五、FlinkCDC 整库入仓 StarRocks


              任务开发   

                      1.编写FlinkSql代码 创建源库 目标库DDL

                      2.执行模式选择 YarnPer-Job

                      3.点击右上方小火箭(提交任务到集群)

                -- mysql DDL


                CREATE TABLE `wm_do1` (
                `do_id` varchar(20) COLLATE utf8mb4_bin NOT NULL COMMENT '出库单号',
                PRIMARY KEY (`do_id`)
                ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='出库单主表'






                -- starrocks DDL


                CREATE TABLE IF NOT EXISTS `ods_k3`.`wm_do1` (
                `do_id` STRING NOT NULL COMMENT "出库单号"
                ) ENGINE=olap
                PRIMARY KEY(`do_id`)
                COMMENT "出库单主表"
                DISTRIBUTED BY HASH(`do_id`) BUCKETS 1
                PROPERTIES (
                "replication_num" = "3"
                );






                -- dlink CDCSOURCE代码


                EXECUTE CDCSOURCE jobname WITH (
                'connector' = 'mysql-cdc',
                'hostname' = '127.0.0.1',
                'port' = '3306',
                'username' = 'root',
                'password' = '123456',
                'checkpoint' = '3000',
                'scan.startup.mode' = 'initial',
                'parallelism' = '1',
                'table-name' = 'k3_wms\.wm_do1',
                'sink.connector' = 'starrocks',
                'sink.jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
                'sink.load-url' = '127.0.0.1:8030',
                'sink.username' = 'root',
                'sink.password' = '',
                'sink.sink.db' = 'ods_k3',
                -- 'sink.table.prefix' = 'ods_bak_',
                -- 'sink.table.lower' = 'true',
                'sink.database-name' = 'ods_k3',
                'sink.table-name' = '${tableName}',
                'sink.sink.properties.format' = 'json',
                'sink.sink.properties.strip_outer_array' = 'true',
                'sink.sink.max-retries' = '10',
                'sink.sink.buffer-flush.interval-ms' = '15000',
                'sink.sink.parallelism' = '1'
                )


                复制

                查看任务运行状态

                进入任务查看各种信息

                进入 FlinkWebUI 查看查看具体构造及日志信息

                对比数据(同步成功,很丝滑)




                六、FAQ


                Dinky 整合 StarRocks 的 Flink Connector 服务启动报错

                解决方案:

                上方报错主要是因为依赖冲突而导致启动报错, 对应的删除starrocks com下fasterxml 即可。

                执行图中未体现 Sink 阶段的问题

                解决方案:

                    1.查看dlink后台日志根据报错信息解决问题。

                    2.可使用dlink远程调试跟踪代码进行查看。(远程调试地址:http://www.dlink.top/docs/next/developer_guide/remote_debug/)



                七、总结

                在使用 Dinky 期间很大程度上降低了流式数据实时入仓的开发时间,以及多余的重复工作消耗。并且 Dinky 将整个数据开发过程进行了 SQL 化,使开发运维使用的更加方便 快捷 。就如 Dinky 介绍的 「开箱既用 易扩展 」 赞赞赞!

                最后也欢迎大家加入 Dinky 的官方社区,共建共赢~





                交流

                欢迎您加入社区交流分享与批评,也欢迎您为社区贡献自己的力量。

                QQ社区群:543709668,申请备注 “ Dinky+企业名+职位”,不写不批

                微信官方群(推荐):添加 wenmo_ai ,申请备注“ Dinky+企业名+职位”,缺一不批谢谢。

                       公众号:DataLink数据中台



                扫描二维码获取

                更多精彩

                DataLink

                数据中台




                文章转载自DataLink数据中台,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                评论

                S
                shing
                暂无图片
                1年前
                评论
                暂无图片 0
                最近在用ETLCloud平台,如果是想要快速实现数据库CDC操作,ETLCloud也能实现flink cdc的效果,并且是无需代码即能实现,etlcloud cdc部分在增量同步、断点续传、全量同步的表现都很好,支持全增量一体化同步
                1年前
                暂无图片 点赞
                评论