Preface
With Hadoop and Flink already compiled and deployed (see my earlier posts), I can finally try Hudi with the Flink engine. Although the Flink integration currently only supports writing COW tables from a Kafka source, I am very interested in Hudi as a data lake technology.
Environment
Zookeeper: 3.5.8
Kafka: 2.7.0
Hadoop: 3.2.2
Flink: 1.11.2
Hudi: 0.7.0
Building Hudi from source
root@felixzh:/opt# wget https://downloads.apache.org/hudi/0.7.0/hudi-0.7.0.src.tgz
root@felixzh:/opt# tar -zxvf hudi-0.7.0.src.tgz && cd hudi-0.7.0
root@felixzh:/opt/hudi-0.7.0# mvn clean package -DskipTests -Dhadoop.version=3.2.2 -Dflink.version=1.11.2
Build result (roughly 16 minutes):
packaging/hudi-flink-bundle/target contains hudi-flink-bundle_2.11-0.7.0.jar, which will be used later. I had originally planned to build against the latest Flink release, 1.12.2, but that build failed as follows:
[ERROR] Failed to execute goal on project hudi-flink-bundle_2.11: Could not resolve dependencies for project org.apache.hudi:hudi-flink-bundle_2.11:jar:0.7.0: Could not find artifact org.apache.flink:flink-connector-kafka-base_2.11:jar:1.12.2 in Maven Central (https://repo.maven.apache.org/maven2) -> [Help 1]
The likely cause is that Flink 1.12 removed the Kafka 0.10.x/0.11.x connectors, so the flink-connector-kafka-base_2.11 artifact is no longer published for 1.12.2; this post does not dig into it further.
Configuration files
root@felixzh:/opt/hudi-0.7.0# vim hudi-conf.properties
hoodie.datasource.write.recordkey.field=uuid
hoodie.datasource.write.partitionpath.field=ts
bootstrap.servers=felixzh:9092
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
hoodie.embed.timeline.server=false
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs:///hudi/flink/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs:///hudi/flink/schema.avsc
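In this configuration, uuid is the record key and the ts field drives partitioning: TimestampBasedAvroKeyGenerator interprets ts as epoch milliseconds (EPOCHMILLISECONDS) and formats it with yyyy/MM/dd to build the partition path. A minimal plain-Java sketch of that formatting, assuming the key generator formats in UTC (this is only an illustration, not Hudi's actual implementation):
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class PartitionPathDemo {
    public static void main(String[] args) {
        long ts = 1615429075261L; // the ts value from the sample record used later
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy/MM/dd");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: partition path computed in UTC
        // Prints 2021/03/11, matching the partition directory seen in the HDFS listing at the end
        System.out.println(fmt.format(new Date(ts)));
    }
}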
root@felixzh:/opt/hudi-0.7.0# vim schema.avsc
{
  "type": "record",
  "name": "stock_ticks",
  "fields": [
    { "name": "uuid",   "type": "string" },
    { "name": "ts",     "type": "long" },
    { "name": "symbol", "type": "string" },
    { "name": "year",   "type": "int" },
    { "name": "month",  "type": "int" },
    { "name": "high",   "type": "double" },
    { "name": "low",    "type": "double" },
    { "name": "key",    "type": "string" },
    { "name": "close",  "type": "double" },
    { "name": "open",   "type": "double" },
    { "name": "day",    "type": "string" }
  ]
}
Upload the configuration files to HDFS
root@felixzh:/opt/hudi-0.7.0# hdfs dfs -mkdir -p /hudi/flink/
root@felixzh:/opt/hudi-0.7.0# hdfs dfs -put hudi-conf.properties schema.avsc /hudi/flink
Prepare the Kafka data
./kafka-console-producer.sh --broker-list localhost:9092 --topic FelixZh < dataHudi.txt
The content of dataHudi.txt is as follows:
{"close":0.27172637588467297,"day":"2","high":0.4493211149337879,"key":"840ef1","low":0.030714155934507215,"month":11,"open":0.7762668153935262,"symbol":"77c-40d6-8412-6859d4757727","ts":1615429075261,"uuid":"840ef1ff-b77c-40d6-8412-6859d4757727","year":120}
Submit the Flink job
The command is as follows:
/opt/bigdata/flink-1.11.2/bin/flink run -m yarn-cluster -c org.apache.hudi.HoodieFlinkStreamer /opt/bigdata/hudi-flink-bundle_2.11-0.7.0.jar \
--kafka-topic FelixZh \
--kafka-group-id hudi_on_flink \
--kafka-bootstrap-servers felixzh:9092 \
--table-type COPY_ON_WRITE \
--target-base-path hdfs:///hudi/flink/data/hudi_on_flink \
--target-table hudi_on_flink \
--props hdfs:///hudi/flink/hudi-conf.properties \
--checkpoint-interval 3000 \
--flink-checkpoint-path hdfs:///hudi/flink/hudi_on_flink_cp
The results are as follows:
root@felixzh:/opt/bigdata/hadoop-3.2.2# hdfs dfs -ls /hudi/flink/
Found 4 items
drwxr-xr-x - root supergroup 0 2021-03-11 10:37 /hudi/flink/data
-rw-r--r-- 1 root supergroup 586 2021-03-09 17:09 /hudi/flink/hudi-conf.properties
drwxr-xr-x - root supergroup 0 2021-03-11 10:36 /hudi/flink/hudi_on_flink_cp
-rw-r--r-- 1 root supergroup 593 2021-03-09 17:09 /hudi/flink/schema.avsc
root@felixzh:/opt/bigdata/hadoop-3.2.2# hdfs dfs -ls /hudi/flink/data/hudi_on_flink/2021/03/11
Found 2 items
-rw-r--r-- 1 root supergroup 93 2021-03-11 10:37 /hudi/flink/data/hudi_on_flink/2021/03/11/.hoodie_partition_metadata
-rw-r--r-- 1 root supergroup 436906 2021-03-11 10:37 /hudi/flink/data/hudi_on_flink/2021/03/11/cc042f7e-f2b7-4249-b9b2-e4544ca57c01-0_0-1-0_20210311103715.parquet
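Besides the partition directory and Parquet file above, Hudi keeps its commit timeline under the table's .hoodie directory (here hdfs:///hudi/flink/data/hudi_on_flink/.hoodie). A minimal sketch, assuming the Hadoop client configuration is on the classpath (the class name is just for illustration), that lists the timeline files with the Hadoop FileSystem API:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListHoodieTimeline {
    public static void main(String[] args) throws Exception {
        // Picks up core-site.xml/hdfs-site.xml from the classpath
        Configuration conf = new Configuration();
        Path timeline = new Path("hdfs:///hudi/flink/data/hudi_on_flink/.hoodie");
        try (FileSystem fs = timeline.getFileSystem(conf)) {
            // Commit metadata files show up here after each successful checkpoint
            for (FileStatus status : fs.listStatus(timeline)) {
                System.out.println(status.getPath().getName());
            }
        }
    }
}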