暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink 源码阅读笔记(1)- StreamGraph 的生成

伦少的博客 2023-08-26
201

本文为转载文章,原文地址:https://blog.jrwang.me/2019/flink-source-code-streamgraph/   作者:jrthe42

前言

欢迎关注我的公众号:伦少的博客

在研究学习hudi flink源码时发现实际上写Hudi的主要逻辑是在Hudi自定义的StreamOperator
SinkFunction
中实现,它们是通过DataStream
transform
addSink
调用实现,继续研究发现和Flink的Transformation
StreamOperator
有关。那么就需要了解StreamOperator
的调用执行逻辑,最后发现这需要了解Flink Task的的运行逻辑,知道Flink的Task或者Function是如何运行的。而这里的逻辑是比较复杂的,大概包含StreamGraph
JobGraph
ExecutionGraph
Physical Graph
(虚拟结构)的生成或者构建,还有JobManager
TaskManager
的启动,而JobManager
又包含ResourceManager
Dispatcher
JobMaster
, 这里涉及Java8异步编程如CompletableFuture
和基于Akka
RPC
通信,最后才是Task
的的部署和启动,StreamOperator
相关方法的调用最终是通过启动Task.run方法
StreamTask
中实现的。

我现在只需要了解主要的调用逻辑,暂时没有精力研究具体的每个步骤的详细源码,正好查阅相关资料时发现了几篇不错的文章,所以转载一下,先从StreamGraph
开始。

注意:本篇文章对应的Flink版本比较老了(1.7或1.8),但主要的逻辑一样,可以参考文章和新版Flink源码进行学习,以下为原文

在编写 Flink 的程序的时候,核心的要点是构造出数据处理的拓扑结构,即任务执行逻辑的 DAG。我们先来看一下 Flink 任务的拓扑在逻辑上是怎么保存的。

StreamExecutionEnvironment

StreamExecutionEnvironment
是 Flink 在流模式下任务执行的上下文,也是我们编写 Flink 程序的入口。根据具体的执行环境不同,StreamExecutionEnvironment
有不同的具体实现类,如 LocalStreamEnvironment
, RemoteStreamEnvironment
等。StreamExecutionEnvironment
也提供了用来配置默认并行度、Checkpointing 等机制的方法,这些配置主要都保存在 ExecutionConfig
CheckpointConfig
中。我们现在先只关注拓扑结构的产生。

通常一个 Flink 任务是按照下面的流程来编写处理逻辑的:

env.addSource(XXX)
    .map(XXX)
    .filter(XXX)
    .addSink(XXX)

添加数据源后获得 DataStream
, 之后通过不同的算子不停地在 DataStream
上实现转换过滤等逻辑,最终将结果输出到 DataSink
中。

StreamExecutionEnvironment
内部使用一个 List<StreamTransformation<?>> transformations
来保留生成 DataStream
的所有转换。

StreamTransformation

StreamTransformation
代表了生成 DataStream
的操作,每一个 DataStream
的底层都有对应的一个 StreamTransformation
。在 DataStream
上面通过 map
等算子不断进行转换,就得到了由 StreamTransformation
构成的图。当需要执行的时候,底层的这个图就会被转换成 StreamGraph

StreamTransformation
在运行时并不一定对应着一个物理转换操作,有一些操作只是逻辑层面上的,比如 split/select/partitioning 等。

每一个 StreamTransformation
都有一个关联的 Id,这个 Id 是全局递增的。除此以外,还有 uid, slotSharingGroup, parallelism 等信息。

StreamTransformation
有很多具体的子类,如SourceTransformation
OneInputStreamTransformation
TwoInputTransformation
SideOutputTransformation
SinkTransformation
等等,这些分别对应了DataStream
上的不同转换操作。
由于 StreamTransformation
中通常保留了其前向的 StreamTransformation
,即其输入,因此可以据此还原出 DAG 的拓扑结构。

// OneInputTransformation
public OneInputTransformation(
            StreamTransformation<IN> input,
            String name,
            OneInputStreamOperator<IN, OUT> operator,
            TypeInformation<OUT> outputType,
            int parallelism)
 
{
        super(name, outputType, parallelism);
        this.input = input;
        this.operator = operator;
    }

// TwoInputTransformation
public TwoInputTransformation(
            StreamTransformation<IN1> input1,
            StreamTransformation<IN2> input2,
            String name,
            TwoInputStreamOperator<IN1, IN2, OUT> operator,
            TypeInformation<OUT> outputType,
            int parallelism)
 
{
        super(name, outputType, parallelism);
        this.input1 = input1;
        this.input2 = input2;
        this.operator = operator;
    }

DataStream

一个 DataStream
就表征了由同一种类型元素构成的数据流。通过对 DataStream
应用 map/filter 等操作,可以将一个 DataStream
转换为另一个 DataStream
,这个转换的过程就是根据不同的操作生成不同的 StreamTransformation
,并将其加入 StreamExecutionEnvironment
transformations
列表中。

例如:

//构造 StreamTransformation
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operator,
                outTypeInfo,
                environment.getParallelism());

@SuppressWarnings({ "unchecked""rawtypes" })
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
//加入到 StreamExecutionEnvironment 的列表中
getExecutionEnvironment().addOperator(resultTransform);

DataStream
的子类包括 SingleOutputStreamOperator
DataStreamSource KeyedStream
IterativeStream
, SplitStream
(已弃用)。这里要吐槽一下 SingleOutputStreamOperator
的这个类的命名,太容易和 StreamOperator
混淆了。StreamOperator
的介绍见下一小节。

除了 DataStream
及其子类以外,其它的表征数据流的类还有 ConnectedStreams
(两个流连接在一起)、 WindowedStream
AllWindowedStream
。这些数据流之间的转换可以参考 Flink 的官方文档。

StreamOperator

在操作 DataStream
的时候,比如 DataStream#map
等,会要求我们提供一个自定义的处理函数。那么这些信息时如何保存在 StreamTransformation
中的呢?这里就要引入一个新的接口 StreamOperator

StreamOperator
定义了对一个具体的算子的生命周期的管理,包括:

    //生命周期
    void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);

    void open() throws Exception;

    void close() throws Exception;

    @Override
    void dispose() throws Exception;

    //状态管理
    OperatorSnapshotFutures snapshotState(
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory storageLocation)
 throws Exception
;

    void initializeState() throws Exception;

//其它方法暂时省略

StreamOperator
的两个子接口 OneInputStreamOperator
TwoInputStreamOperator
则提供了操作数据流中具体元素的方法,而 AbstractUdfStreamOperator
这个抽象子类则提供了自定义处理函数对应的算子的基本实现:

//OneInputStreamOperator
void processElement(StreamRecord<IN> element) throws Exception;
void processWatermark(Watermark mark) throws Exception;
void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;

//TwoInputStreamOperator
    void processElement1(StreamRecord<IN1> element) throws Exception;
    void processElement2(StreamRecord<IN2> element) throws Exception;


//AbstractUdfStreamOperator 接受一个用户自定义的处理函数
public AbstractUdfStreamOperator(F userFunction) {
        this.userFunction = requireNonNull(userFunction);
        checkUdfCheckpointingPreconditions();
    }

至于具体到诸如 map/fliter 等操作对应的 StreamOperator
,基本都是在 AbstractUdfStreamOperator
的基础上实现的。以 StreamMap
为例:

public class StreamMap<INOUT>
        extends AbstractUdfStreamOperator<OUTMapFunction<INOUT>>
        implements OneInputStreamOperator<INOUT
{

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

由此,通过 DataStream
–> StreamTransformation
–> StreamOperator
这样的依赖关系,就可以完成 DataStream
的转换,并且保留数据流和应用在流上的算子之间的关系。

StreamGraph

StreamGraphGenerator
会基于 StreamExecutionEnvironment
transformations
列表来生成 StreamGraph

在遍历 List<StreamTransformation>
生成 StreamGraph
的时候,会递归调用StreamGraphGenerator#transform
方法。对于每一个 StreamTransformation
, 确保当前其上游已经完成转换。StreamTransformations
被转换为 StreamGraph
中的节点 StreamNode
,并为上下游节点添加边 StreamEdge

Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

对于不同类型的 StreamTransformation
,分别调用对应的转换方法,以最典型的 transformOneInputTransform
为例:

private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

        //首先确保上游节点完成转换
        Collection<Integer> inputIds = transform(transform.getInput());

        // the recursive call might have already transformed this
        // 由于是递归调用的,可能已经完成了转换
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        //确定资源共享组,用户如果没有指定,默认是default
        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

        //向 StreamGraph 中添加 Operator, 这一步会生成对应的 StreamNode
        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getCoLocationGroupKey(),
                transform.getOperator(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());

        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        streamGraph.setParallelism(transform.getId(), transform.getParallelism());
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

        //依次连接到上游节点,创建 StreamEdge
        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }

接着看一看 StreamGraph
中对应的添加节点和边的方法:

    protected StreamNode addNode(Integer vertexID,
        String slotSharingGroup,
        @Nullable String coLocationGroup,
        Class<? extends AbstractInvokable> vertexClass,
        StreamOperator<?> operatorObject,
        String operatorName)
 
{

        if (streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }

        StreamNode vertex = new StreamNode(environment,
            vertexID,
            slotSharingGroup,
            coLocationGroup,
            operatorObject,
            operatorName,
            new ArrayList<OutputSelector<?>>(),
            vertexClass);

        //创建 StreamNode,这里保存了 StreamOperator 和 vertexClass 信息
        streamNodes.put(vertexID, vertex);

        return vertex;
    }

StreamNode
中,保存了对应的 StreamOperator
(从 StreamTransformation
得到),并且还引入了变量 jobVertexClass
来表示该节点在 TaskManager
中运行时的实际任务类型。

private final Class<? extends AbstractInvokable> jobVertexClass;

AbstractInvokable
是所有可以在 TaskManager
中运行的任务的抽象基础类,包括流式任务和批任务。StreamTask
是所有流式任务的基础类,其具体的子类包括 SourceStreamTask
, OneInputStreamTask
, TwoInputStreamTask
等。

对于一些不包含物理转换操作的 StreamTransformation
,如 Partitioning, split/select, union,并不会生成 StreamNode
,而是生成一个带有特定属性的虚拟节点。当添加一条有虚拟节点指向下游节点的边时,会找到虚拟节点上游的物理节点,在两个物理节点之间添加边,并把虚拟转换操作的属性附着上去。

PartitionTansformation
为例, PartitionTansformation
KeyedStream
对应的转换:

//StreamGraphGenerator#transformPartition
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
        StreamTransformation<T> input = partition.getInput();
        List<Integer> resultIds = new ArrayList<>();

        //递归地转换上游节点
        Collection<Integer> transformedIds = transform(input);

        for (Integer transformedId: transformedIds) {
            int virtualId = StreamTransformation.getNewNodeId();
            //添加虚拟的 Partition 节点
            streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
            resultIds.add(virtualId);
        }

        return resultIds;
    }

// StreamGraph#addVirtualPartitionNode
public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner) {

        if (virtualPartitionNodes.containsKey(virtualId)) {
            throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
        }

        //添加一个虚拟节点,后续添加边的时候会连接到实际的物理节点
        virtualPartitionNodes.put(virtualId,
                new Tuple2<Integer, StreamPartitioner<?>>(originalId, partitioner));
    }

前面提到,在每一个物理节点的转换上,会调用 StreamGraph#addEdge
在输入节点和当前节点之间建立边的连接:

private void addEdgeInternal(Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            List<String> outputNames,
            OutputTag outputTag)
 
{

        //先判断是不是虚拟节点上的边,如果是,则找到虚拟节点上游对应的物理节点
        //在两个物理节点之间添加边,并把对应的 StreamPartitioner,或者 OutputTag 等补充信息添加到StreamEdge中
        if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
            ......
        } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
            int virtualId = upStreamVertexID;
            upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
            if (partitioner == null) {
                partitioner = virtualPartitionNodes.get(virtualId).f1;
            }
            addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
        } else {

            //两个物理节点
            StreamNode upstreamNode = getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = getStreamNode(downStreamVertexID);

            // If no partitioner was specified and the parallelism of upstream and downstream
            // operator matches use forward partitioning, use rebalance otherwise.
            if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) {
                partitioner = new RebalancePartitioner<Object>();
            }

            if (partitioner instanceof ForwardPartitioner) {
                if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                    throw new UnsupportedOperationException("Forward partitioning does not allow " +
                            "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                            ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                            " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
                }
            }

            //创建 StreamEdge,保留了 StreamPartitioner 等属性
            StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);

            //分别将StreamEdge添加到上游节点和下游节点
            getStreamNode(edge.getSourceId()).addOutEdge(edge);
            getStreamNode(edge.getTargetId()).addInEdge(edge);
        }
    }

这样通过 StreamNode
SteamEdge
,就得到了 DAG 中的所有节点和边,以及它们之间的连接关系,拓扑结构也就建立了。

小结

本文简单分析了从 DataStream API
StramGraph
的过程。 StreamGraph
是 Flink 任务最接近用户逻辑的 DAG 表示,后面到具体执行的时候还会进行一系列转换,我们在后续的文章中再逐一加以分析。

🧐 分享、点赞、在看,给个3连击👇

最后修改时间:2023-08-29 10:37:23
文章转载自伦少的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论