暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flink中: 你的Function是如何被执行的

伦少的博客 2023-08-26
173
在Flink编程中,不管你是使用DataStream api还是 Table/SQL ,接触最多的就是UserFunction , 比说MapFunction、ScalarFunction, 在这些Function 里面可以自定义用户的业务处理逻辑,但是这些Function是如何被调用的呢?本文主要介绍Function 被调用的流程以及对应的方法如何被调用的。

核心调用逻辑


当我们编写完成一个Flink-Job 就会将代码打包成为jar提交到集群中去,当整个资源申请、任务调度完成之后就开始执行这个job,从source到transform 到最后sink 都是在TaskManager 资源节点中执行。Flink-Job 会被划分为一个个Task(整个任务中的一部分处理逻辑)节点, 每一个Task节点都在一个Thread中执行,在这个Thread中会不断地调用UserFunction的相应方法(如上图)。这个是一个大概的处理流程, 让用户只需要关心自身业务处理逻辑,无须关心网络通信、数据传输等流程。


接下来介绍具体的调用逻辑:

当JobMaster 向TaskManager 提交Task(整个任务中的一部分处理逻辑)时,会携带该Task的相关信息,   之后:
  1. org.apache.flink.runtime.taskmanager.Task
创建一个Task对象,代表了并发Task中一个subTask, 里面包含了一个Thread对象,也就是提交的任务会在该Thread中执行。
  1. org.apache.flink.streaming.runtime.tasks.StreamTask
在Task中会创建StreamTask对象, 在StreamTask中完成任务的初始化工作(配置、operatorchain构建、状态恢复等)之后,执行数据处理流程。
  1. org.apache.flink.streaming.runtime.tasks.OperatorChain
Flink优化中有一环是operator-chain, 即将满足一定规则的operator链在一起,他们之前以函数调用的方式执行,减少(网络)数据传输,OperatorChain就代表了多个Operator。
  1. org.apache.flink.streaming.api.operators.StreamOperator
StreamOperator 代表了具体执行的某一个Operator, 每一个UserFunction都会有对应的StreamOperator, 例如MapFunction对应 StreamMap、KeyedProcessFunction 对应KeyedProcessOperator, 也就是在这些Operator中完成对应Function的调用。


Method 是如何被调用的


我们通常定义一个Function , 实现其相关的方法,例如MapFunction 实现map方法、WindowFunction 实现apply方法、KeyedProcessFunction 实现open/processElement/onTimer 方法,如果你的Function 还实现了CheckpointedFunction/CheckpointListener接口, 那么还得实现对应的initializeState、snapshotState、notifyCheckpointComplete方法等等。这些所有的方法都由对应的Operator调用, 下面以MapFunction 对应的StreamMap 这个operator 为例理解Function的调用。
    public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
    implements OneInputStreamOperator<IN, OUT> {


    private static final long serialVersionUID = 1L;


    public StreamMap(MapFunction<IN, OUT> mapper) {
    super(mapper);
    chainingStrategy = ChainingStrategy.ALWAYS;
    }


    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
    output.collect(element.replace(userFunction.map(element.getValue())));
    }
    }


    • userFunction 即代表自定义的MapFunction , 其被processElement 方法内部调用, 而processElement 是被上面提到的StreamTask以OperatorChain 方式不断被调用。

    • StreamMap 继承了抽象的AbstractUdfStreamOperator 类, 该operator  包含了所有userFunction 的方法调用。(部分代码如下)

      

      public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
      extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {


      private static final long serialVersionUID = 1L;


      /** The user function. */
      protected final F userFunction;




      @Override
      public void setup(
      StreamTask<?, ?> containingTask,
      StreamConfig config,
      Output<StreamRecord<OUT>> output) {
      super.setup(containingTask, config, output);
      FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
      }


      @Override
      public void snapshotState(StateSnapshotContext context) throws Exception {
      super.snapshotState(context);
      StreamingFunctionUtils.snapshotFunctionState(
      context, getOperatorStateBackend(), userFunction);
      }


      @Override
      public void initializeState(StateInitializationContext context) throws Exception {
      super.initializeState(context);
      StreamingFunctionUtils.restoreFunctionState(context, userFunction);
      }


      @Override
      public void open() throws Exception {
      super.open();
      FunctionUtils.openFunction(userFunction, new Configuration());
      }


      @Override
      public void close() throws Exception {
      super.close();
      functionsClosed = true;
      FunctionUtils.closeFunction(userFunction);
      }


      @Override
      public void dispose() throws Exception {
      super.dispose();
      if (!functionsClosed) {
      functionsClosed = true;
      FunctionUtils.closeFunction(userFunction);
      }
      }


      // ------------------------------------------------------------------------
      // checkpointing and recovery
      // ------------------------------------------------------------------------


      @Override
      public void notifyCheckpointComplete(long checkpointId) throws Exception {
      super.notifyCheckpointComplete(checkpointId);


      if (userFunction instanceof CheckpointListener) {
      ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
      }
      }


      @Override
      public void notifyCheckpointAborted(long checkpointId) throws Exception {
      super.notifyCheckpointAborted(checkpointId);


      if (userFunction instanceof CheckpointListener) {
      ((CheckpointListener) userFunction).notifyCheckpointAborted(checkpointId);
      }
      }
      }


      可以看出对于UserFunction 中method的调用核心点就在operator,每个不同的UserFunction 会对应不同的operator, 但是都会继承这个抽象的

      AbstractUdfStreamOperator类, 通过这个类可以熟知其整体调用链路。

       


      总结

      本文主要以调用者的视角窥探UserFunction 的调用流程以及具体method 调用逻辑。

      🧐 分享、点赞、在看,给个3连击👇

      文章转载自伦少的博客,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论