测试过程环境版本说明 集群服务器基础环境 Hudi 编译环境配置 Flink 环境配置 启动 Flink Yarn Session 服务 MySQL binlog 开启配置 Flink CDC sink Hudi 测试代码过程
一、测试过程环境版本说明
二、集群服务器基础环境
2.1 Maven 和 JDK 环境版本

2.2 Hadoop 集群环境版本

2.3 HADOOP环境变量配置
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoopexport HADOOP_CALSSPATH=`$HADOOP_HOME/bin/hadoop classpath`
三、Hudi 编译环境配置
3.1 Maven Home settings.xml 配置修改

<mirrors><mirror><id>alimaven</id><mirrorOf>central,!cloudera</mirrorOf><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url></mirror></mirrors>
3.2 下载 Hudi 源码包
git clone https://github.com/apache/hudi.git

Hudi 社区建议版本适配
Hudi0.9 适配 Flink 1.12.2 Hudi0.10(master) 适配 Flink 1.13.X (说明 master 分支上版本还未 release)
3.3 Hudi 客户端命令行

3.4 修改 Hudi 集成 Flink 和 Hive 编译依赖版本配置


<profile><id>flink-bundle-shade-hive2</id><properties><hive.version>2.1.1-cdh6.2.0</hive.version><flink.bundle.hive.scope>compile</flink.bundle.hive.scope></properties><dependencies><dependency><groupId>${hive.groupid}</groupId><artifactId>hive-service-rpc</artifactId><version>${hive.version}</version><scope>${flink.bundle.hive.scope}</scope></dependency></dependencies></profile>
3.5 编译 Hudi 指定 Hadoop 和 Hive 版本信息
mvn clean install -DskipTests -Drat.skip=true -Dscala-2.11 -Dhadoop.version=3.0.0 -Pflink-bundle-shade-hive2

3.6 Hudi 编译异常



3.7 Hudi 重新编译

3.8 Hudi 编译结果说明


四、Flink 环境配置
4.1 FLINK_HOME 下 sql-client-defaults.yaml 配置

4.2 flink-conf.yaml 配置修改


# state.backend: filesystemstate.backend: rocksdb# 开启增量checkpointstate.backend.incremental: true# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpointsstate.checkpoints.dir: hdfs://nameservice/flink/flink-checkpointsclassloader.check-leaked-classloader: falseclassloader.resolve-order: parent-first
4.3 FLINK_HOME lib下添加依赖

flink-sql-connector-mysql-cdc-1.4.0.jarflink-sql-connector-oracle-cdc-2.1-SNAPSHOT.jar.BAK – oracle cdc 依赖flink-format-changelog-json-1.4.0.jarflink-sql-connector-kafka_2.11-1.13.1.jar--- Hadoop home lib下copy过来hadoop-mapreduce-client-common-3.0.0-cdh6.2.0.jarhadoop-mapreduce-client-core-3.0.0-cdh6.2.0.jarhadoop-mapreduce-client-jobclient-3.0.0-cdh6.2.0.jar--- hudi编译jar copy过来hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar
五、启动 Flink Yarn Session 服务
5.1 FLINK_HOME shell 命令
$FLINK_HOME/bin/yarn-session.sh -s 2-jm 2048-tm 2048-nm ys-hudi01 -d

5.2 Yarn Web UI

5.3 Flinksql Client 启动命令
$FLINK_HOME/bin/sql-client.sh embedded -j ./lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar shell


六、MySQL binlog 开启配置
6.1 创建 binlog 日志存储路径
mkdir logs
6.2 修改目录属主和 group
chown -R mysql:mysql /mysqldata/logs
6.3 修改 mysql 配置信息
vim /etc/my.cnfserver-id=2log-bin= /mysqldata/logs/mysql-binbinlog_format=rowexpire_logs_days=15binlog_row_image=full
6.4 修改完,重启 mysql server
service mysqld restart
6.5 客户端查看 binlog 日志情况


6.6 创建 mysql sources 表 DDL
create table users_cdc(id bigint auto_increment primary key,name varchar(20) null,birthday timestamp default CURRENT_TIMESTAMP notnull,ts timestamp default CURRENT_TIMESTAMP notnull);

七、Flink CDC sink Hudi 测试代码过程
7.1 Flink sql cdc DDL 语句:(具体参数说明可参考 Flink 官网)
CREATE TABLE mysql_users (id BIGINT PRIMARY KEY NOT ENFORCED ,name STRING,birthday TIMESTAMP(3),ts TIMESTAMP(3)) WITH ('connector'= 'mysql-cdc','hostname'= '127.0.0.1','port'= '3306','username'= '','password'=’’,'server-time-zone'= 'Asia/Shanghai','debezium.snapshot.mode'='initial','database-name'= 'luo','table-name'= 'users_cdc');

7.2 查询 mysql cdc 表
Flink SQL> select * from mysql_users;




7.3 创建一个临时视图,增加分区列方便后续同步 Hive 分区表
Flink SQL> create view mycdc_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as partition FROM mysql_users;
说明:partition 关键字需要 `` 引起来

Flink SQL> select * from mycdc_v;



7.4 设置 checkpoint 间隔时间,存储路径已在 flink-conf 配置设置全局路径
Flink SQL> set execution.checkpointing.interval=30sec;

7.5 Flinksql 创建 cdc sink hudi 文件,并自动同步 Hive 分区表 DDL 语句
CREATE TABLE mysqlcdc_sync_hive01(id bigint ,name string,birthday TIMESTAMP(3),ts TIMESTAMP(3),`partition` VARCHAR(20),primary key(id) not enforced --必须指定uuid 主键)PARTITIONED BY (`partition`)with('connector'='hudi','path'= 'hdfs://nameservice /luo/hudi/mysqlcdc_sync_hive01', 'hoodie.datasource.write.recordkey.field'= 'id'-- 主键, 'write.precombine.field'= 'ts'-- 自动precombine的字段, 'write.tasks'= '1', 'compaction.tasks'= '1', 'write.rate.limit'= '2000'-- 限速, 'table.type'= 'MERGE_ON_READ'-- 默认COPY_ON_WRITE,可选MERGE_ON_READ, 'compaction.async.enabled'= 'true'-- 是否开启异步压缩, 'compaction.trigger.strategy'= 'num_commits'-- 按次数压缩, 'compaction.delta_commits'= '1'-- 默认为5, 'changelog.enabled'= 'true'-- 开启changelog变更, 'read.streaming.enabled'= 'true'-- 开启流读, 'read.streaming.check-interval'= '3'-- 检查间隔,默认60s, 'hive_sync.enable'= 'true'-- 开启自动同步hive, 'hive_sync.mode'= 'hms'-- 自动同步hive模式,默认jdbc模式, 'hive_sync.metastore.uris'= 'thrift://hadoop:9083'-- hive metastore地址-- , 'hive_sync.jdbc_url'= 'jdbc:hive2://hadoop:10000'-- hiveServer地址, 'hive_sync.table'= 'mysqlcdc_sync_hive01'-- hive 新建表名, 'hive_sync.db'= 'luo'-- hive 新建数据库名, 'hive_sync.username'= ''-- HMS 用户名, 'hive_sync.password'= ''-- HMS 密码, 'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型);
说明:Hudi 目前支持 MOR 和 COW 两种模式
Copy on Write:使用列式存储来存储数据 (例如:parquet),通过在写入期间执行同步合并来简单地更新和重现文件 Merge on Read:使用列式存储 (parquet) + 行式文件 (arvo) 组合存储数据。更新记录到增量文件中,然后进行同步或异步压缩来生成新版本的列式文件。
使用场景上:
COW 适用写少读多的场景 ,MOR 适用写多读少的场景; MOR 适合 CDC 场景,更新延迟要求较低,COW 目前不支持 changelog mode 不适合处理 cdc 场景;



7.6 Flink sql mysql cdc 数据写入 Hudi 文件数据
Flink SQL> insert into mysqlcdc_sync_hive01 select id,name,birthday,ts,`partition` from mycdc_v;


7.7 HDFS 上 Hudi 文件目录情况


7.8 Mysql 数据源写入测试数据
insert into users_cdc (name) values ('cdc01');

7.9 Flinksql 查询 mysql cdc insert 数据
Flink SQL> set execution.result-mode=tableau;[WARNING] The specified key 'execution.result-mode' is deprecated. Please use 'sql-client.execution.result-mode' instead.[INFO] Session property has been set.Flink SQL> select * from mysql_users; -- 查询到一条insert数据

7.10 Flink web UI 页面可以看到 DAG 各个环节产生一条测试数据

7.11 Flinksql 查询 sink 的 Hudi 表数据
Flink SQL> select * from mysqlcdc_sync_hive01; --已查询到一条insert数据

7.12 Hdfs 上 Hudi 文件目录变化情况

7.13 Hive 分区表和数据自动同步情况

7.14 查看自动创建 Hive 表结构
hive> show create table mysqlcdc_sync_hive01_ro;

hive> show create table mysqlcdc_sync_hive01_rt;

7.15 查看自动生成的表分区信息
hive> show partitions mysqlcdc_sync_hive01_ro;hive> show partitions mysqlcdc_sync_hive01_rt;

mysqlcdc_sync_hive01_romysqlcdc_sync_hive01_rt
ro 表和 rt 表区别:
ro 表全称 read oprimized table,对于 MOR 表同步的 xxx_ro 表,只暴露压缩后的 parquet。其查询方式和 COW 表类似。设置完 hiveInputFormat 之后和普通的 Hive 表一样查询即可; rt 表示增量视图,主要针对增量查询的 rt 表;
7.16 Hive 访问 Hudi 数据
引入 Hudi 依赖 jar 方式:
引入到 $HIVE_HOME/lib 下; 引入到 $HIVE_HOME/auxlib 自定义第三方依赖 修改 hive-site.xml 配置文件; Hive shell 命令行引入 Session 级别有效;
查询 Hive 分区表数据:
hive> select * from mysqlcdc_sync_hive01_ro; --已查询到mysq insert的一条数据

hive> select * from mysqlcdc_sync_hive01_rt; --已查询到mysq insert的一条数据

Hive 条件查询:
hive> select name,ts from mysqlcdc_sync_hive01_ro where partition='20211109';

Hive ro 表 count 查询
hive> select count(1) from mysqlcdc_sync_hive01_ro;

Hive Count 异常解决:
hive> add jar hdfs://nameservice /luo/hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar;hive> set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

hive> select count(1) from mysqlcdc_sync_hive01_ro; --可正常count

Hive rt 表 count 查询
hive> select count(1) from mysqlcdc_sync_hive01_rt;

7.17 Mysql 数据源写入多条测试数据
insert into users_cdc (name) values ('cdc02');insert into users_cdc (name) values ('cdc03');insert into users_cdc (name) values ('cdc04');insert into users_cdc (name) values ('cdc05');insert into users_cdc (name) values ('cdc06');


7.18 Flinksql 中新写入数据查询情况

Yarn web UI application_1626256835287_40351[1] 资源使用情况

Hdfs 上 Hudi 文件目录变化情况

Hudi 状态文件说明:
requested:表示一个动作已被安排,但尚未启动 inflight:表示当前正在执行操作 completed:表示在时间线上完成了操作
Flink jobmanager log sync hive过程详细日志



7.19 Mysql 数据源更新数据
update users_cdc set name = 'cdc05-bj'where id = 5;

7.20 Flinksql 查询 cdc update 数据产生两条 binlog 数据


7.21 MySQL 数据源 delete 一条数据
deletefrom users_cdc where id = 3;

Flink Web UI job DAG 中捕获一条新数据:

Flinksql changlog delete 数据变化查询

HDFS 上 Hudi 数据文件生成情况


Hudi 文件类型说明:
commits:表示将一批数据原子性写入表中; cleans:清除表中不在需要的旧版本文件的后台活动; delta_commit:增量提交是指将一批数据原子性写入 MergeOnRead 类型的表中,其中部分或者所有数据可以写入增量日志中; compaction:协调 Hudi 中差异数据结构的后台活动,例如:将更新从基于行的日志文件变成列格式。在内部,压缩的表现为时间轴上的特殊提交; rollback:表示提交操作不成功且已经回滚,会删除在写入过程中产生的数据。

Flink 任务 checkpoint 情况:


7.22 Hive shell 查询数据 update 和 delete 变化情况
hive> select * from mysqlcdc_sync_hive01_ro;

hive> select * from mysqlcdc_sync_hive01_rt;

7.23 Hudi Client 端操作 Hudi 表
进入 Hudi 客户端命令行
hudi-master/hudi-cli/hudi-cli.sh
连接 Hudi 表,查看表信息
hudi->connect --path hdfs://nameservice1/tmp/luo/hudi/mysqlcdc_sync_hive01

查看 Hudi commit 信息
hudi:mysqlcdc_sync_hive01->commits show --sortBy "CommitTime"

hudi:mysqlcdc_sync_hive01->compactions show all

7.24 PrestoDB 查询 Hive 表 Hudi 数据
版本说明:PrestoDB 0.256 DBeaver7.0.4
可通过 presto-cli 连接 hive metastore 开启查询,presto-cli 的设置参考 presto官方配置;
DBeaver 客户端查询 Hive ro 表数据:

Hive ro 表 count 正常:

查询 Hive rt 表数据查询异常:

Hive rt 表 count 异常:

Presto Web UI:


2022 年 1 月 8-9 日,FFA 2021 重磅开启,全球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴。


戳我,点击右上角预约 FFA 2021~
文章转载自Flink 中文社区,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




