生产中是否有遇到过:
调大 Flink 任务并行度,但任务的整体吞吐量几乎没有提升 Flink 任务整体 CPU 利用率特别低,如何提高资源利用率呢?
这篇文章主要解决生产实践过程中遇到的这两个问题。文中会先给出结论,方便读者知道这篇文章具体会涉及到哪些知识。如果对相关结论完全理解的,就没必要详细阅读本文了。文中会涉及到 Flink KeyGroup 和 Rescale 相关的概念,对 KeyGroup 和 Rescale 不熟悉的同学强烈建议先阅读《 从 KeyGroup 到 Rescale》。
1、结论
提前预估业务发展,为算子设计合理的 maxParallelism keyBy 之后的算子,如果并发较大,建议设置并行度为 2 的整数次幂,例如:128、256、512 调大并行度一定能提高处理能力吗?不一定 小并发任务的并行度不一定需要设置成 2 的整数次幂 大并发任务如果没有 KeyBy,并行度也无需设置为 2 的整数次幂 有 KeyBy 的场景,2 的整数次幂浪费资源的问题如何解决? 调大 KeyGroup 数量,控制好每个 TM 上分配 KeyGroup 的数量 提高 CPU 利用率的通用方案:调整 slot 与 CPU 的映射关系
2、任务介绍
2.1 DAG 图
4→ 3 → 2 之间的分区策略都是 keyBy,优化之前 4、3、2 三个算子的并发都为 500

2.2 任务特征
存在多个 keyBy,图中的 Hash 连接表示上下游两个算子之前是 keyBy 复杂算子并发都为 500
2.3 异常现象
3-3.1 算子 Checkpoint 一般会持续将近 10 分钟,且每次有 488 个 subtask Checkpoint 比较快,12 个 subtask Checkpoint 较慢。
2-2.1 算子每次都有几个 subtask 的 Checkpoint 做不完,最后导致作业的 Checkpoint 超时。
该任务从 3→ 2 是按照用户 id 进行 keyBy,理论来讲按照用户分组每个 subtask 的数据量会非常均匀,但是有少量几个 subtask 处理的数据量比其他的 subtask 处理的数据量多一倍。
3、分析原因
3.1 为什么 2-2.1 算子的 subtask 数据会不均匀?
Flink keyBy 之后的算子会按照 KeyGroup 为单位对数据进行划分,KeyGroup 的数量为算子最大并行度的数量。
KeyGroup 的数量永远为 2 的整数次幂,例如:128、256、512 等。
KeyGroup 和 MaxParallelism 的相关概念和原理参考:从 KeyGroup 到 Rescale
每个 KeyGroup 会负责一部分的 key,每个 subtask 会处理至少一个 KeyGroup 的数据。
用户当前的 KeyGroup 数量为 512,所以 Flink 把所有的 Key 分为了 512 组,然后这 512 组数据被运行在 500 个 subtask 上。
所以就出现了:
有 488 个 subtask,每个 subtask 处理 1 个 KeyGroup 的数据 有 12 个 subtask,每个 subtask 处理 2 个 KeyGroup 的数据
3-3.1 和 2-2.1 算子都是 KeyBy 之后的算子,且并行度都是 500,所以存在数据不均匀的问题。
从现象来看,KeyGroup 的数量应该是 512 而不是 1024,说明这个任务之前的并发可能是 200 左右,所以对应的 KeyGroup 数量是 512。且任务每次从 Checkpoint 恢复,所以 KeyGroup 数量就被定死在 512 了。也就是《 从 KeyGroup 到 Rescale》文中提到的,并发不能再扩大了。
3.2 调大并行度一定能提高处理能力吗?
对于该任务,并行度 300 和 500 并没有本质区别。换言之:该任务的并行度从 256 调节成 511 也不一定能提高性能。
如果并行度设置为 256,那么每个 subtask 将会处理 2 个 KeyGroup 的数据。如果并行度设置为 511,那么将会有 510 个 subtask 处理 1 个 KeyGroup 的数据,剩余的 1 个 subtask 处理 2 个 KeyGroup 的数据。最后处理 2 个 KeyGroup 数据的 subtask 将会成为整个任务的瓶颈。
木桶效应原理:剩余的 510 个 subtask 虽然分配了同样的资源,但也会被闲置,资源被浪费。
结论:
某些场景下,并行度设置为 256 和 511 并不会影响系统整体的吞吐量。
上述两个问题的解决方案:
提前预估业务发展,为算子设计合理的 maxParallelism keyBy 之后的算子,如果并发较大,建议设置并行度为 2 的整数次幂,例如:128、256、512
4、 扩展问题
4.1 小并发任务的并行度也需要设置为 2 的整数次幂吗?
不是很需要,Flink 默认最小的 KeyGroup 数量是 128。假设某个算子有 10 个并行度,那么会有 8 个 subtask 处理 13 个 KeyGroup 的数据,剩余 2 个 subtask 处理 12 个 KeyGroup 的数据。
所以各个 subtask 处理的数据量仅差了十二分之一,数据量相差不大,理论来讲是可以接受的。
4.2 任何大并发任务都需要将算子的并行度设置为 2 的整数次幂吗?
不一定需要,假设任务没有 keyBy 就不需要考虑 KeyGroup 的概念。
示例一:Source 读到数据后面跟着 map 和 Sink,这样的 Flink 任务根本没有 shuffle,所以不需要考虑 KeyGroup 的概念。
只要 Source 端读到的数据足够均匀,则每个 subtask 处理的数据就是均匀的。
示例二:算子之间增加了 rebalance 或 rescale 之前的算子打散数据,则存在 shuffle。
此时也不需要考虑 KeyGroup 的问题,因为仅仅是 KeyBy 后的算子才会有 KeyGroup 的概念。
4.3 大任务有 KeyBy,且不想设置并行度为 2 的整数次幂怎么办?
确实有这样的场景,512 并行度不够,1024 又过于浪费,怎么办?
可以手动设置算子的 MaxParallelism,即 KeyGroup 的数量。假设将其设置为 2048,即 2048 个 KeyGroup。2048 / 3 = 682.6
,所以设置并行度为 683,每个 subtask 负责 3 的 KeyGroup 的数据,也是可以的。通过该思路无需将并行度从 512 提升到 1024,只需要提升到 683 即可满足需求。
还有一种通用方案可以解决该问题,具体阅读下一节。
5、提高 CPU 利用率的通用方案:调整 slot 与 CPU 的映射关系
默认情况下,slot 与 CPU core 是一一映射的。Flink on yarn 模式,假设 TM 中分配 4 个 slot,就会向 yarn 申请启动 Container,每个 Container 需要 4 个 core。
假设现在并行度必须从 512 调节成 1024(例如 KeyGroup 限制),但实际上 768 个并行度就已经能够满足业务要求的吞吐量。此时可以将并行度调节成 1024,同时将 TM 对应的 slot 个数调成 4,CPU 个数调成 3。即:每个 TM 内可以运行 4 个 slot,但只消耗 3 个 CPU core。原来需要使用 1024 个 CPU,经过上述设置只会用到 768 个 CPU。
上述例子是压缩CPU 个数为原来四分之三的案例,其他参数也都是可以的,例如 3 个 slot 使用 2 个 core 等。具体可以通过参数 yarn.containers.vcores
指定 TM 使用的 CPU core 个数。
其他应用场景
其实不只是上述场景需要指定 TM 使用 cpu 的个数,在其他场景,可能也有需要指定 cpu 个数的场景。例如:IO 密集型的任务,CPU 使用率一般会比较低,笔者之前写了一篇《Flink 单并行度内使用多线程来提高任务的整体性能》 ,主要讲述了 Flink 内使用多线程来提高 CPU 使用率,从而提升吞吐。
其实有个更简单的思路,代码完全不用改动:如果任务 IO 操作比较重,CPU 利用率仅有 10%,那么可以一个 TM 内运行 8 个 slot,但只申请一个 core。这样 8 个并行度都会使用这一个 core,理论来讲可以将单个 CPU 使用率用到 80%。这样设置的话,并行度个数不变,CPU 使用仅变成原来的八分之一,提高了集群资源利用率。
6、总结
文中通过分析案例得出 KeyGroup 分配策略可能导致数据在所有 subtask 上分布不均匀,根本原因是 KeyGroup 的数量永远是 2 的整数次幂。并分析原理讲述了:
为什么调大并行度不一定能提高任务的吞吐能力?
哪些场景并不需要将并行度设置为 2 的整数次幂
KeyGroup 场景如果不想设置并行度为 2 的整数次幂怎么办
最后给出了一个提高 CPU 利用率的通用方案:调整 slot 与 CPU 的映射关系。