暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Hudi Spark Sql Procedures 回滚 Hudi 表数据

伦少的博客 2024-06-05
538

前言

因为有 Hudi Rollback 的需求,所以单独总结 Hudi Spark Sql Procedures Rollback。

版本

  • Hudi 0.13.0(发现有bug)、(然后升级)0.14.1

  • Spark 3.2.3

Procedures

官方文档:https://hudi.apache.org/docs/procedures
相关阅读:Hudi Spark SQL Call Procedures学习总结(一)(查询统计表文件信息)

Rollback

官方文档:https://hudi.apache.org/docs/rollbacks

回滚数据,可以回滚到至上一个commit(成功的),也可以回滚到指定 commit (savepoint)。本文暂不涉及savepoint,只总结 回滚到上一个 commit。

  • 自动回滚:默认写任务时会自动触发回滚:rollbackFailedWrites

    每次写数据前会先检测上一个commit是否完成,如果上一个commit失败,删除上一个commit,如果成功,则不作任务操作。
    如何检测commit失败:一个完整的commit 有 .commit.requested-> .inflight->.commit ,如果只有  .commit.requested 或 .inflight 则认为不完整,commit失败。
    在哪里触发自动回滚:在 startCommit 方法中 会先调用 CleanerUtils.rollbackFailedWrites 执行 rollback操作,相关源码分析见:Hudi源码|Insert源码分析总结(一)(整体流程)
    举例:
    001 commit 成功、002 commit 失败:先删除002 commit ,然后执行003 commit
    001 commit 成功、002 commit 成功:不回滚,只执行 003 commit

  • 手动回滚:rollback_to_instant
    可以删除上一个成功的commit,回滚到上上个成功的commit。
    举例:
    001 commit 成功、002 commit 成功,可以回滚到001:删除002,最后一次commit 是001
    限制:只能一个一个回滚,也就是只能删除最后一个commit,回滚到倒数第二个commit。可以连续回滚,如果想回滚到某个时间点的coomit,只能用 rollback_to_savepoint 回滚到指定的 savepoint。

本文主要总结手动回滚:rollback_to_instant

SQL 语法

call show_commits(table => 'hudi.test_hudi_table'limit => 10);
call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => 'committime');
call show_rollbacks(table => 'hudi.test_hudi_table');
call show_rollback_detail(table => 'hudi.test_hudi_table', instant_time => 'committime');

复制

示例

数据准备

create table hudi.test_hudi_table (
  id int,
  name string,
  price double,
  ts long,
  dt string
using hudi
 partitioned by (dt)
 options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 ); 

insert into hudi.test_hudi_table values (1,'hudi',10,100,'2024-06-03'); 
insert into hudi.test_hudi_table values (2,'hudi',20,200,'2024-06-03'); 
insert into hudi.test_hudi_table values (3,'hudi',30,300,'2024-06-03');  

复制

SQL 执行

call show_commits(table => 'hudi.test_hudi_table'limit => 10);
20240603145257060       commit  435474  0       1       1       3       0       0
20240603145214563       commit  435442  0       1       1       2       0       0
20240603145154120       commit  435293  1    

复制
call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145257060');

复制
call show_commits(table => 'hudi.test_hudi_table'limit => 10);
20240603145214563       commit  435442  0       1       1       2       0       0
20240603145154120       commit  435293  1       0       1       1       0       0

复制
call show_rollbacks(table => 'hudi.test_hudi_table');
20240603145441704       20240603145257060       1       637     1

复制
call show_rollback_detail(table => 'hudi.test_hudi_table', instant_time => '20240603145441704');
20240603145441704       [20240603145257060]     dt=2024-06-03   //cluster1/warehouse/tablespace/managed/hive/hudi.db/test_hudi_table/dt=2024-06-03/358c406e-12b6-4718-b102-f303eecec6bf-0_0-243-200_20240603145257060.parquet   true

复制

继续 rollback

call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145214563');

复制
call show_commits(table => 'hudi.test_hudi_table'limit => 10);
20240603145154120       commit  435293  1       0       1       1       0       0

复制
call show_rollbacks(table => 'hudi.test_hudi_table');
20240603145441704       20240603145257060       1       637     1
20240603150458282       20240603145214563       1       628     1

复制

结果验证

效果截图:




可以看到:第一次 rollback 删除了id=3的数据和commit,生成了 20240603145441704.rollback;第二次删除了id=2的数据和commit,生成了 20240603150458282.rollback

Flink 结合

验证 Spark SQL rollback Flink写的表是否有问题

Flink 造数

rollback 既可以回滚 cow 表也可以,也可以回滚 mor 表,上面验证的 cow 表,这次用 mor 表验证。

set execution.target=yarn-per-job;
set taskmanager.numberOfTaskSlots=1;
set execution.runtime-mode=batch;
set table.dml-sync = true;

CREATE TABLE test_flink_mor (
  id int PRIMARY KEY NOT ENFORCED,
  name string,
  price int,
  ts int,
  dt string
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_flink_mor',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.conf.dir'='/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'hudi',
  'hive_sync.table' = 'test_flink_mor',
  'hive_sync.partition_fields' = 'dt',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor',
  'hoodie.datasource.hive_sync.create_managed_table' = 'true',
  'compaction.async.enabled''false',
  'compaction.delta_commits' = '2',
  'hoodie.compact.inline' = 'true',
  'hoodie.compact.inline.max.delta.commits' = '2'
);

insert into test_flink_mor values (1,'hudi',10,100,'2024-06-03');
insert into test_flink_mor values (2,'hudi',20,200,'2024-06-03');
insert into test_flink_mor values (3,'hudi',30,300,'2024-06-03');

复制

验证

call show_commits(table => 'hudi.test_flink_mor_ro'limit => 10);
20240603152727949       deltacommit     903     0       1       1       1       1       0
20240603152637632       commit  435262  1       0       1       2       0       0
20240603152636150       deltacommit     903     0       1       1       1       1       0
20240603152552562       deltacommit     903     0       1       1       1       0       0

复制
call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152727949');

复制
call show_commits(table => 'hudi.test_flink_mor_ro'limit => 10);
20240603152637632       commit  435262  1       0       1       2       0       0
20240603152636150       deltacommit     903     0       1       1       1       1       0
20240603152552562       deltacommit     903     0       1       1       1       0       0

复制

继续回滚:

call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152637632');

复制

hudi 0.13.0 会抛异常

call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152637632');
24/06/03 15:49:40 ERROR SparkSQLDriver: Failed in [call rollback_to_instant(table => 'hudi.test_flink_mor_ro', instant_time => '20240603152637632')]
org.apache.hudi.exception.HoodieRollbackException: Failed to rollback hdfs://cluster1/tmp/hudi/test_flink_mor commits 20240603152637632
Caused by: org.apache.hudi.exception.HoodieMetadataException: The instant [20240603152637632__deltacommit__COMPLETED] required to sync rollback of 20240603152637632 has been archived

复制



问题

这里实际删除了对应的commit 和数据 :20240603152637632.commit fc621eda-7f97-465d-b191-9eab0b4bb660_0-1-0_20240603152637632.parquet,但是.requested和 .inflight 没有删除成功就报错了。

也就是 Hudi 0.13.0 spark sql rollback spark 写的hudi 表没有问题,rollback flink写的表只有第一次成功,第二次会失败,导致之后无法继续进行rollback了。

问题原因:版本代码bug

问题解决

1、升级到目前最新发行版:0.14.1
2、如果因各种原因不能升级版本,可以合并相关PR到0.13.0 (暂不清楚涉及哪些PR)

升级验证

重新造数:test_flink_mor




可以看到连续两次rollback都是成功的!

测试 rollback 失败的 commit

rollback_to_instant

首先人为删除表 hudi.test_hudi_table 的 20240603145154120.commit,这样20240603145154120对应的commit就是不完整的、失败的。

hadoop fs -rm -r /warehouse/tablespace/managed/hive/hudi.db/test_hudi_table/.hoodie/20240603145154120.commit

复制

先测试 hudi 0.13.0 rollback_to_instant
是否正常

call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145154120');
24/06/04 09:51:34 ERROR SparkSQLDriver: Failed in [call rollback_to_instant(table => 'hudi.test_hudi_table', instant_time => '20240603145154120')]
org.apache.hudi.exception.HoodieException: Commit 20240603145154120 not found in Commits org.apache.hudi.common.table.timeline.HoodieDefaultTimeline:

复制

0.13.0 抛出异常:Commit 20240603145154120 not found in Commits

然后测试 0.14.1 是否正常


自动回滚

测试写数据时是否会自动回滚。
先用flink insert一条数据,生成 .deltacommit 后,人为删除 .deltacommit,保留 .deltacommit.requested 和 .deltacommit.inflight
然后执行再用flink insert一条数据,看是否会触发rollback。

可以看到写数据时,会先rollback上一次失败的commit,再写数据生成新的commit

相关异常

记录遇到的相关异常信息:https://note.youdao.com/s/EtQEL1wC

小结

对于 rollback_to_instant

  • 只能 rollback 最后一个commit

  • 可以连续rollback,对于Spark写的表没有问题,而对于Flink写的表 hudi 0.13.0 有bug ,可以升级到0.14.1解决。

  • 可以rollback 失败的commit,无论是Spark写的表还是Flink写的表,hudi 0.13.0 都有bug,均可以升级到0.14.1解决。

  • rollback 会删除对应的 .commit 和 数据文件,生成 .rollback

  • 可以回滚哪些?对于cow 表:.commit;对于 mor表:.deltacommit 和 .commit

相关阅读

🧐 分享、点赞、在看,给个3连击👇

文章转载自伦少的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论