暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

IDEA 中实现 Flink 的 Checkpoint Restore

大数据渣渣瑞 2021-07-08
4378

本文主要介绍如何在 IDEA 中实现 Flink 的 Checkpoint Restore。完整代码参考:github:https://github.com/1996fanrui/fanrui-learning/tree/master/module-flink/src/main/java/com/dream/flink/state/restore/ide

1、IDEA 中直接运行 Flink 代码

Flink 代码很容易在 IDEA 中运行,方便用户调试、定位问题。如下图所示,点击绿色的 Run 按钮,即在 IDEA 中开始运行 Flink Job。

IDEA 中运行 Flink Job,就是在 env.execute() 中将任务提交到 MiniCluster 中。

有些用户可能会说:我在本地 IDEA 中直接运行会报错,类似于 ClassNotFound 的错误。一般这种错误是因为有些 Maven 依赖在运行时依赖不到。

简单的方法:直接下载渣渣瑞的 flink 项目,都是配置好的,链接:https://github.com/1996fanrui/fanrui-learning

IDEA 中直接运行 Flink Job 简单方便,但是存在一些痛点。例如,渣渣瑞创建的《大数据技术交流群》里前几天有群友问:有人试过本地 IDEA 运行任务从 Savepoint 处启动吗?

其实渣渣瑞之前也有类似的需求,有时候想本地调试 Checkpoint 恢复的流程,但 IDEA 运行 Job 一般都是直接启动的方式,根本不会运行 restore 相关的流程。

基于上述的需求背景,渣渣瑞开始研究一波骚操作可以在 IDEA 中指定 Checkpoint 或 Savepoint 目录进行 restore。最后封装的代码非常简单。

2、IDEA 中实现 Restore

其实 Flink 源码中有很多的测试案例,用来验证 Flink Job 的 Checkpoint 和 Restore。渣渣瑞就模拟源码中的实现去设计,具体源码参考 ResumeCheckpointManuallyITCase 类。

这里多唠两句,阅读源码的单元测试也是一个熟悉源码的过程。通过测试案例可以了解被测试类的功能,是了解源码的一些途径。一般单元测试中会考虑到边界条件、异常条件,通过阅读单测可以学习大佬们严谨的思想。

借鉴源码中的思想设计完以后,封装了 CheckpointRestoreByIDEUtils 工具类。如果想让 Flink Job 从 Checkpoint 目录 Restore,代码改动非常小 ,只需要改两行代码即可。 

正常的 Flink DataStream 的代码最后一行执行 env.execute(),现在只需要替换成 CheckpointRestoreByIDEUtils.run(env.getStreamGraph(), externalCheckpoint) 即可。如下图所示,注释掉 env.execute(),新增了两行代码。

重点在于 CheckpointRestoreByIDEUtils 类 run 方法的实现,参考注释:

public static void run(
        @Nonnull StreamGraph streamGraph,
        @Nullable String externalCheckpoint)
 throws Exception 
{
    // 根据 StreamGraph 生成 JobGraph
    JobGraph jobGraph = streamGraph.getJobGraph();
    if (externalCheckpoint != null) {
        // 将 Checkpoint 目录设置到 JobGraph 中
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
    }

    // 计算 jobGraph 需要的 slot 个数
    int slotNum = getSlotNum(jobGraph);
    // 初始化 MiniCluster
    ClusterClient<?> clusterClient = initCluster(slotNum);
    // 提交任务到 MiniCluster
    clusterClient.submitJob(jobGraph).get();
}

复制

run 方法实现很简单,有一个操作比较有意思,即:如何计算 jobGraph 需要的 slot 个数。感兴趣的同学可以看一下代码实现。完整代码在 github:https://github.com/1996fanrui/fanrui-learning/tree/master/module-flink/src/main/java/com/dream/flink/state/restore/ide

这里涉及到 Flink slotSharingGroup 的概念。默认情况下,Flink 只有一个 slotSharingGroup,即:default。概念比较生硬,举个例子:假设 Flink Job 中有三个算子 A 和 B,没有单独指定 slotSharingGroup,算子 A 的并行度为 10,算子 B 的并行度为 15,算子 C 的并行度为 20。那么任务总共只需要申请 20 个 slot。

文章转载自大数据渣渣瑞,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论