暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Hudi系列25: Flink SQL使用checkpoint恢复job异常

原创 只是甲 2023-05-23
903

Table of Contents

一. 通过Flink SQL将MySQL数据写入Hudi

启动Yarn Session

$FLINK_HOME/bin/yarn-session.sh -jm 16384 -tm 16384 -d  2>&1 &

/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session 

Flink SQL 代码:

-- 设置checkpoint的时间间隔
set execution.checkpointing.interval=60sec;
-- 设置任务结束后不清空checkpoint文件,便于后续恢复
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
-- 同时只能有一个checkpoint进程
set execution.checkpointing.max-concurrent-checkpoints=1;

CREATE TABLE flink_mysql_cdc1 (
    id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name varchar(100)
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hp8',
    'port' = '3306',
    'username' = 'root',
    'password' = 'abc123',
    'database-name' = 'test',
    'table-name' = 'mysql_cdc',
    'server-id' = '5409-5415',
    'scan.incremental.snapshot.enabled'='true'
);

CREATE TABLE flink_hudi_mysql_cdc1(
    id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name varchar(100)
  ) WITH (
   'connector' = 'hudi',
   'path' = 'hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc1',
   'table.type' = 'MERGE_ON_READ',
   'changelog.enabled' = 'true',
   'hoodie.datasource.write.recordkey.field' = 'id',
   'write.precombine.field' = 'name',
   'compaction.async.enabled' = 'true',
   'hive_sync.enable' = 'true',
   'hive_sync.table' = 'flink_hudi_mysql_cdc1',
   'hive_sync.db' = 'test',
   'hive_sync.mode' = 'hms',
   'hive_sync.metastore.uris' = 'thrift://hp5:9083',
   'hive_sync.conf.dir'='/home/apache-hive-3.1.2-bin/conf'
);


set table.exec.resource.default-parallelism=4;

insert into flink_hudi_mysql_cdc1 select * from flink_mysql_cdc1;

Flink web界面:
2000w数据初始化已经完成
image.png

checkpoint日志量真的多
image.png

hdfs查看checkpoint日志量
image.png

二. 模拟Flink任务异常

2.1 手工停止job

在Flink web界面将Flink SQL任务手工结束掉
image.png

2.2 指定checkpoint来恢复数据

查找最近的checkpoint:
image.png

代码:

set 'execution.savepoint.path'='hdfs://hp5:8020/vmcluster/flink-checkpoints/a2874606453b4aebfdaca2f627355f99/chk-23';

insert into flink_hudi_mysql_cdc1 select * from flink_mysql_cdc1;

image.png

image.png

2.3 整个yarn-session上的任务恢复

待测试:
如果是整个yarn-session异常,也可以启动yarnsession的时候指定checkpoint。

$FLINK_HOME/bin/yarn-session.sh -jm 8192 -tm 8192 -d -s hdfs://hp5:8020/vmcluster/flink-checkpoints/c12eb538c2e8965d2d94c170b67641f2/chk-1/_metadata

/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session 

三. 模拟源端异常

3.1 手工关闭源端 MySQL 服务

service mysqld stop

3.2 FLink任务查看

Flink可以自己重试,这个还是比较不错,无需人工干预。

等mysql启动成功之后,任务又可以继续衔接上。

image.png

FAQ:

1. checkpoint未写入数据

别人的checkpoint
image.png

我的checkpoint
看来是我的checkpoint都没成功
image.png

修改checkpoint时间间隔:

-- 修改前:
set execution.checkpointing.interval=10sec;

-- 修改后:
set execution.checkpointing.interval=60sec;

2. checkpoint 失败

报错信息:

Checkpoint Coordinator is suspending.

image.png

解决方案:
把yarn-session的资源由8G提升到16G问题解决。

对于一些大表,最好还是先通过Spark进行初始化,然后在接增量。

3. 手工取消Flink job后,checkpoint文件自动删除

https://developer.aliyun.com/ask/435979
http://events.jianshu.io/p/032396543ceb

在网上看到的资源都是针对代码级别的,没有看到Flink SQL级别的
还是得上官网查找
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/

set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

参考:

  1. https://blog.csdn.net/qq_31866793/article/details/103069646
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

文章被以下合辑收录

评论

暂无图片
获得了16次点赞
暂无图片
内容获得8次评论
暂无图片
获得了18次收藏
目录
  • 一. 通过Flink SQL将MySQL数据写入Hudi
  • 二. 模拟Flink任务异常
    • 2.1 手工停止job
    • 2.2 指定checkpoint来恢复数据
    • 2.3 整个yarn-session上的任务恢复
  • 三. 模拟源端异常
    • 3.1 手工关闭源端 MySQL 服务
    • 3.2 FLink任务查看
  • FAQ:
    • 1. checkpoint未写入数据
    • 2. checkpoint 失败
    • 3. 手工取消Flink job后,checkpoint文件自动删除
  • 参考: