大家好,我是小寒~
今天给大家带来一篇 flink 作业提交相关的文章。
在 FLink Client 中通过反射启动 Jar 中的 main 函数,生成 Flink StreamGraph、JobGraph,将 JobGraph 提交给 Flink 集群。
FLink 集群收到 JobGraph 之后,将 JobGraph 翻译成 ExecutionGraph,然后开始调度执行,启动成功之后开始消费数据。
提交流程
作业执行可以选择 Session 和 Per-Job 模式两种集群。
Session 模式的集群,一个集群中运行多个作业。 Per-Job 模式的集群,一个集群中只运行一个作业,作业执行完毕则集群销毁。
流水线执行器 PipelineExecutor
Session 模式
该模式下,作业共享集群资源,作业通过 Http 协议进行提交。
Per-Job 模式
该模式下,一个作业一个集群,作业之间相互隔离。
在 FLink 1.10 版本中,只有 Yarn 上实现了 Per-Job 模式。
yarn session 的提交流程
启动集群
(1) 使用 yarn-session.sh 提交会话模式的作业
如果是启动新的 Yarn Session 集群,则进入到步骤 (2)。
(2)Yarn 启动新的 Flink 集群
2、作业提交
Yarn 集群准备好后,开始作业提交。
(1)Flink Client 通过 Rest 向 Dispatcher 提交 JobGraph。
作业调度执行
至此,作业进入执行阶段。
Yarn Per-Job 提交流程
启动集群
(1)使用 flink run -m yarn-cluster 提交 Per-Job 模式的作业。
(2)Yarn 启动 Flink 集群。该模式下 Flink 集群的启动入口是 YarnJobClusterEntryPoint,其它与 Yarn-Session 模式启动类似。 作业提交
该步骤与 Session 模式下的不同之处在于,Client 并不会通过 Rest 向 Dispacher 提交 JobGraph,由 Dispacher 从本地文件系统获取 JobGraph,其后的步骤与 Session 模式一样。 作业调度执行
与 Yarn-Session 模式下一致。
流处理的转换过程
StreamGraph
我们以熟知的 WordCount 程序为例,它的 StreamGraph 如下图所示。
StreamNode
StreamNode 是 StreamGraph 中的节点,从 Transformation 转换而来,可以简单理解为一个 StreamNode 表示为一个算子;从逻辑上来说,StreamNode 在 StreamGraph 中存在实体和虚拟的 StreamNode。StreamNode 可以有多个输入,也可以有多个输出。 实体的 StreamNode 会最终变为物理的算子。虚拟的 StreamNode 会附着在 StreamEdge 上。 StreamEdge
StreamEdge 是 StreamGraph 中的边, 用来连接两个 StreamNode,一个 StreamNode 可以有多个出边、入边。StreamEdge 中包含了盘路输出、分区器、字段筛选输出等的信息。
作业图
JobGraph 的核心对象是 JobVertex、JobEdge 和 IntermediateDateSet。
JobVertex
经过算子融合优化后符合条件的多个 StreamNode 可能会融合在一起生成一个 JobVertex,即一个 JobVertex 包含一个或多个算子,JobVertex 的输入是 JobEdge,输出是 IntermediateDateSet。 JobEdge
JobEdge 是 JobGraph 中连接 IntermediateDateSet 和 JobVertex 的边,表示 JobGraph 中的一个数据流转通道,其上游数据源是 IntermediateDateSet,下游消费者是 JobVertex ,即数据通过 JobEdge 由 IntermediateDateSet 传递给目标 JobVertex 。 JobEdge 中的数据分发模式会直接影响执行时 Task 之间的数据连接关系,是点对点连接还是全连接。 IntermediateDateSet
中间数据集 IntermediateDataSet 是一种逻辑结构,用来表示 JobVertex 的输出,即该 JobVertex 中包含的算子会产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决定了在执行时刻数据交换的模式。 IntermediateDataSet 的个数与该 JobVertex 对应的 StreamNode 的出边数量相同,可以是一个或者多个。
执行图
加入了并行度的概念,成为真正可调度的图结构。 生成了与 JobVertex 对应的 ExecutionJobVertex 和 ExecutionVertex,与IntermediateDataSet 对应的 IntermediateResult 和 IntermediateResultPartition 等。
生成的图如下图所示。
ExecutionGraph 的核心对象有 ExecutionJobVertex 、ExecutionVertex、IntermediateResult 、IntermediateResultPartition、ExecutionEdge 和 Execution。
ExecutionJobVertex
该对象和 JobGraph 中的 JobVertex 一一对应。该对象还包含一组 ExecutionVertex,数量与该 JobVertex 中所包含的 StreamNode 的并行度一致,假设 StreamNode 的并行度为3,那么 ExecutionJobVertex 也会包含 3个 ExecutionVertex。 ExecutionVertex
ExecutionJobVertex 中会对作业进行并行化处理,构造可以并行执行的实例,每一个并行执行的实例就是 ExecutionVertex。 构造 ExecutionVertex 的同时,也会构建 ExecutionVertex 的输出 IntermediateResult。 IntermediateResult
IntermediateResult 又叫中间结果集,该对象是个逻辑概念,表示 ExecutionJobVertex 的输出,和 JobVertex 中的 IntermediateDataSet 一一对应,同样,一个ExecutionJobVertex 可以有多个中间结果,取决于当前 JobVertex 有几个出边(JobEdge) 一个中间结果包含多个中间结果分区 IntermediateResultPartition,其个数等于该 JobVertex 的并发度,或者叫作算子的并行度。 IntermediateResultPartition
IntermediateResultPartition 又叫作中间结果分区,表示一个 ExecutionVertex 的输出结果,与 ExecutionEdge 相关联。 ExecutionEdge
表示 ExecutionVertex 的输入,连接到上游产生的 IntermediateResultPartition 。 Execution
ExecutionVertex 相当于每个 Task 的模板,在真正执行的时候,会将 ExecutionVertex 中的信息包装为一个 Execution,执行一个 ExecutionVertex 的一次尝试。JobManager 和 TaskManager 之间关于 Task 的部署和 Task 的执行状态的更新都是通过 ExecutionAttemptID 来标识实例的。在发生故障或者数据需要重算的情况下,ExecutionVertex 可能会有多个ExecutionAttemptID 。一个 Execution 通过 ExecutionAttemptID 来唯一标识。
总结
最后
—
这期文章就分享到这里,如果觉得不错,转发、在看、点赞安排起来吧。
你知道的越多,你的思维越开阔。我们下期再见。
往期回顾
如果对本文有疑问可以加作者微信直接交流。进技术交流群的可以拉你进群。