
Hands-on Series: Hudi on Flink Walkthrough

大数据从业者 2021-03-12

Preface

With Hadoop and Flink already compiled and deployed (see my previous posts), I can finally try out Hudi on the Flink engine. Although it currently only supports writing COW (copy-on-write) tables from a Kafka source, Hudi as a data lake technology interests me a great deal.



Environment

    Zookeeper:3.5.8
    Kafka:2.7.0
    Hadoop:3.2.2
    Flink:1.11.2
    Hudi: 0.7.0


Building Hudi from Source

    root@felixzh:/opt# wget https://downloads.apache.org/hudi/0.7.0/hudi-0.7.0.src.tgz
    root@felixzh:/opt# tar -zxvf hudi-0.7.0.src.tgz && cd hudi-0.7.0
    root@felixzh:/opt/hudi-0.7.0# mvn clean package -DskipTests -Dhadoop.version=3.2.2 -Dflink.version=1.11.2


The build takes about 16 minutes. It produces hudi-flink-bundle_2.11-0.7.0.jar under packaging/hudi-flink-bundle/target, which we will need later. I had originally planned to use the latest Flink release, 1.12.2, but that build failed:

    [ERROR] Failed to execute goal on project hudi-flink-bundle_2.11: Could not resolve dependencies for project org.apache.hudi:hudi-flink-bundle_2.11:jar:0.7.0: Could not find artifact org.apache.flink:flink-connector-kafka-base_2.11:jar:1.12.2 in Maven Central (https://repo.maven.apache.org/maven2) -> [Help 1]

The likely cause is that Flink 1.12 removed the legacy Kafka 0.10.x/0.11.x connectors, so flink-connector-kafka-base_2.11 is no longer published for 1.12.2; this post does not dig into it further.
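As an illustrative check (not from the original post), you can ask Maven to resolve the artifacts directly; the base connector stops at the 1.11 line, while the universal Kafka connector still resolves for 1.12.2:

    # fails: the artifact was removed after the Flink 1.11.x releases
    mvn dependency:get -Dartifact=org.apache.flink:flink-connector-kafka-base_2.11:1.12.2
    # succeeds: the universal Kafka connector is still published
    mvn dependency:get -Dartifact=org.apache.flink:flink-connector-kafka_2.11:1.12.2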

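The submit command later expects the bundle jar under /opt/bigdata/, so copy it there first (a step the post implies but does not show):

    root@felixzh:/opt/hudi-0.7.0# cp packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.7.0.jar /opt/bigdata/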

Configuration Files

    root@felixzh:/opt/hudi-0.7.0# vim hudi-conf.properties
    hoodie.datasource.write.recordkey.field=uuid
    hoodie.datasource.write.partitionpath.field=ts
    bootstrap.servers=felixzh:9092
    hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
    hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
    hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
    hoodie.embed.timeline.server=false
    hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs:///hudi/flink/schema.avsc
    hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs:///hudi/flink/schema.avsc
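With EPOCHMILLISECONDS input and a yyyy/MM/dd output format, TimestampBasedAvroKeyGenerator turns the ts field into a date-style partition path. A quick way to preview the mapping (my own illustration, using GNU date; drop the last three digits of the millisecond timestamp to get seconds):

    root@felixzh:~# date -u -d @1615429075 +%Y/%m/%d
    2021/03/11

This matches the 2021/03/11 partition directory we will see in HDFS at the end.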
    root@felixzh:/opt/hudi-0.7.0# vim schema.avsc
    {
      "type": "record",
      "name": "stock_ticks",
      "fields": [
        {"name": "uuid",   "type": "string"},
        {"name": "ts",     "type": "long"},
        {"name": "symbol", "type": "string"},
        {"name": "year",   "type": "int"},
        {"name": "month",  "type": "int"},
        {"name": "high",   "type": "double"},
        {"name": "low",    "type": "double"},
        {"name": "key",    "type": "string"},
        {"name": "close",  "type": "double"},
        {"name": "open",   "type": "double"},
        {"name": "day",    "type": "string"}
      ]
    }
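Before wiring the schema into Hudi, it is worth checking that it parses and matches a sample record. One option (my own check; the avro-tools jar version and the record.json file are assumptions) is to round-trip a record through avro-tools:

    # encode a JSON record against the schema; any mismatch fails loudly
    root@felixzh:/opt/hudi-0.7.0# java -jar avro-tools-1.10.2.jar fromjson --schema-file schema.avsc record.json > record.avro

where record.json holds one record in the same shape as the Kafka message shown below.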

Uploading the Configuration Files to HDFS

    root@felixzh:/opt/hudi-0.7.0# hdfs dfs -mkdir -p /hudi/flink/
    root@felixzh:/opt/hudi-0.7.0# hdfs dfs -put hudi-conf.properties schema.avsc /hudi/flink
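Optionally read a file back to make sure the upload landed where the job properties expect it (my own sanity check):

    root@felixzh:/opt/hudi-0.7.0# hdfs dfs -cat /hudi/flink/hudi-conf.properties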


Preparing the Kafka Data

    ./kafka-console-producer.sh --broker-list localhost:9092 --topic FelixZh < dataHudi.txt

where dataHudi.txt contains records like:

                  {"close":0.27172637588467297,"day":"2","high":0.4493211149337879,"key":"840ef1","low":0.030714155934507215,"month":11,"open":0.7762668153935262,"symbol":"77c-40d6-8412-6859d4757727","ts":1615429075261,"uuid":"840ef1ff-b77c-40d6-8412-6859d4757727","year":120}
                  复制
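To confirm the messages actually landed in the topic, you can read one back (a sanity check of my own, not in the original post):

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic FelixZh --from-beginning --max-messages 1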


Submitting the Flink Job

The command is as follows:

    /opt/bigdata/flink-1.11.2/bin/flink run -m yarn-cluster -c org.apache.hudi.HoodieFlinkStreamer /opt/bigdata/hudi-flink-bundle_2.11-0.7.0.jar \
        --kafka-topic FelixZh \
        --kafka-group-id hudi_on_flink \
        --kafka-bootstrap-servers felixzh:9092 \
        --table-type COPY_ON_WRITE \
        --target-base-path hdfs:///hudi/flink/data/hudi_on_flink \
        --target-table hudi_on_flink \
        --props hdfs:///hudi/flink/hudi-conf.properties \
        --checkpoint-interval 3000 \
        --flink-checkpoint-path hdfs:///hudi/flink/hudi_on_flink_cp
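Once submitted, you can confirm the job is up on YARN (illustrative commands of my own; the application id below is a placeholder):

    yarn application -list
    # or ask Flink for the running jobs inside that YARN application
    /opt/bigdata/flink-1.11.2/bin/flink list -yid application_XXXXXXXXXXXXX_YYYY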

The results are as follows:

    root@felixzh:/opt/bigdata/hadoop-3.2.2# hdfs dfs -ls /hudi/flink/
    Found 4 items
    drwxr-xr-x - root supergroup 0 2021-03-11 10:37 /hudi/flink/data
    -rw-r--r-- 1 root supergroup 586 2021-03-09 17:09 /hudi/flink/hudi-conf.properties
    drwxr-xr-x - root supergroup 0 2021-03-11 10:36 /hudi/flink/hudi_on_flink_cp
    -rw-r--r-- 1 root supergroup 593 2021-03-09 17:09 /hudi/flink/schema.avsc

    root@felixzh:/opt/bigdata/hadoop-3.2.2# hdfs dfs -ls /hudi/flink/data/hudi_on_flink/2021/03/11
    Found 2 items
    -rw-r--r-- 1 root supergroup 93 2021-03-11 10:37 /hudi/flink/data/hudi_on_flink/2021/03/11/.hoodie_partition_metadata
    -rw-r--r-- 1 root supergroup 436906 2021-03-11 10:37 /hudi/flink/data/hudi_on_flink/2021/03/11/cc042f7e-f2b7-4249-b9b2-e4544ca57c01-0_0-1-0_20210311103715.parquet
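The 2021/03/11 partition path lines up with the ts field of the sample record, exactly as the key generator config prescribes. To confirm the commit itself, you can list the Hudi timeline under the table's .hoodie directory (my own follow-up check; a successful write shows an instant file such as 20210311103715.commit):

    root@felixzh:/opt/bigdata/hadoop-3.2.2# hdfs dfs -ls /hudi/flink/data/hudi_on_flink/.hoodie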


