Core Call Logic

The concrete call logic runs through the following classes:

org.apache.flink.runtime.taskmanager.Task
org.apache.flink.streaming.runtime.tasks.StreamTask
org.apache.flink.streaming.runtime.tasks.OperatorChain
org.apache.flink.streaming.api.operators.StreamOperator
How the method gets called
    public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
            implements OneInputStreamOperator<IN, OUT> {

        private static final long serialVersionUID = 1L;

        public StreamMap(MapFunction<IN, OUT> mapper) {
            super(mapper);
            chainingStrategy = ChainingStrategy.ALWAYS;
        }

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            output.collect(element.replace(userFunction.map(element.getValue())));
        }
    }
userFunction is the user-defined MapFunction. It is invoked inside processElement, and processElement in turn is called repeatedly by the StreamTask mentioned above, by way of the OperatorChain.
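To make that call path concrete, here is a minimal sketch with no Flink dependency. Every name in it (MapCallSketch, SimpleMapFunction, SimpleMapOperator, runTask) is a hypothetical stand-in invented for illustration, not a Flink API; the loop in runTask only approximates what a task does when it hands each record to the head operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified, Flink-free model of a task driving a map operator. All names are hypothetical. */
public class MapCallSketch {

    /** Stand-in for the user's MapFunction. */
    interface SimpleMapFunction<I, O> {
        O map(I value) throws Exception;
    }

    /** Stand-in for StreamMap: wraps the user function and forwards each result downstream. */
    static final class SimpleMapOperator<I, O> {
        private final SimpleMapFunction<I, O> userFunction;
        private final Consumer<O> output;

        SimpleMapOperator(SimpleMapFunction<I, O> userFunction, Consumer<O> output) {
            this.userFunction = userFunction;
            this.output = output;
        }

        /** Called once per record; this is the only place the user function runs. */
        void processElement(I element) throws Exception {
            output.accept(userFunction.map(element));
        }
    }

    /** Stand-in for the task loop: feeds every input record to the operator. */
    static <I, O> List<O> runTask(List<I> input, SimpleMapFunction<I, O> fn) {
        List<O> collected = new ArrayList<>();
        SimpleMapOperator<I, O> operator = new SimpleMapOperator<>(fn, collected::add);
        for (I record : input) {
            try {
                operator.processElement(record);
            } catch (Exception e) {
                // Assumption for this sketch: any user-code failure aborts the whole run.
                throw new IllegalStateException("user function failed on " + record, e);
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(runTask(List.of(1, 2, 3), x -> x * 2));
    }
}
```

The user code never calls processElement itself; it only supplies the function, and the surrounding loop decides when it runs.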
StreamMap extends the abstract class AbstractUdfStreamOperator, which contains every call made on userFunction. (Part of the code is shown below.)
    public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
            extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {

        private static final long serialVersionUID = 1L;

        /** The user function. */
        protected final F userFunction;

        @Override
        public void setup(
                StreamTask<?, ?> containingTask,
                StreamConfig config,
                Output<StreamRecord<OUT>> output) {
            super.setup(containingTask, config, output);
            FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
        }

        @Override
        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
            StreamingFunctionUtils.snapshotFunctionState(
                    context, getOperatorStateBackend(), userFunction);
        }

        @Override
        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            StreamingFunctionUtils.restoreFunctionState(context, userFunction);
        }

        @Override
        public void open() throws Exception {
            super.open();
            FunctionUtils.openFunction(userFunction, new Configuration());
        }

        @Override
        public void close() throws Exception {
            super.close();
            functionsClosed = true;
            FunctionUtils.closeFunction(userFunction);
        }

        @Override
        public void dispose() throws Exception {
            super.dispose();
            if (!functionsClosed) {
                functionsClosed = true;
                FunctionUtils.closeFunction(userFunction);
            }
        }

        // ------------------------------------------------------------------------
        //  checkpointing and recovery
        // ------------------------------------------------------------------------

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            super.notifyCheckpointComplete(checkpointId);
            if (userFunction instanceof CheckpointListener) {
                ((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
            }
        }

        @Override
        public void notifyCheckpointAborted(long checkpointId) throws Exception {
            super.notifyCheckpointAborted(checkpointId);
            if (userFunction instanceof CheckpointListener) {
                ((CheckpointListener) userFunction).notifyCheckpointAborted(checkpointId);
            }
        }
    }
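Reading through the AbstractUdfStreamOperator class is enough to understand the overall call chain: every lifecycle and checkpoint callback on the operator is forwarded to userFunction.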
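The delegation pattern in AbstractUdfStreamOperator can be sketched in plain Java. This is a simplified model under stated assumptions, not the Flink implementation: UdfLifecycleSketch, UserFunction, Lifecycle, CheckpointAware, UdfOperator and RecordingFunction are all hypothetical names, with Lifecycle loosely standing in for a function that has open/close hooks and CheckpointAware for CheckpointListener. It shows two things from the excerpt: the functionsClosed flag that keeps dispose() from closing the function a second time, and the instanceof check that makes checkpoint notification optional.

```java
import java.util.ArrayList;
import java.util.List;

/** Flink-free model of an operator forwarding lifecycle calls to a user function. Names are hypothetical. */
public class UdfLifecycleSketch {

    /** Marker for any user function. */
    interface UserFunction {}

    /** Optional open/close hooks a user function may implement. */
    interface Lifecycle {
        void open();
        void close();
    }

    /** Optional checkpoint callback, loosely modelled on CheckpointListener. */
    interface CheckpointAware {
        void notifyCheckpointComplete(long checkpointId);
    }

    static final class UdfOperator<F extends UserFunction> {
        private final F userFunction;
        private boolean functionsClosed = false;

        UdfOperator(F userFunction) {
            this.userFunction = userFunction;
        }

        void open() {
            if (userFunction instanceof Lifecycle) {
                ((Lifecycle) userFunction).open();
            }
        }

        /** Normal shutdown: close the function and remember that we did. */
        void close() {
            functionsClosed = true;
            closeFunction();
        }

        /** Cleanup path: only close the function if close() never ran. */
        void dispose() {
            if (!functionsClosed) {
                functionsClosed = true;
                closeFunction();
            }
        }

        void notifyCheckpointComplete(long checkpointId) {
            // Forward only when the function opted in by implementing the interface.
            if (userFunction instanceof CheckpointAware) {
                ((CheckpointAware) userFunction).notifyCheckpointComplete(checkpointId);
            }
        }

        private void closeFunction() {
            if (userFunction instanceof Lifecycle) {
                ((Lifecycle) userFunction).close();
            }
        }
    }

    /** Test double that records every callback it receives. */
    static final class RecordingFunction implements UserFunction, Lifecycle, CheckpointAware {
        final List<String> calls = new ArrayList<>();

        @Override public void open() { calls.add("open"); }
        @Override public void close() { calls.add("close"); }
        @Override public void notifyCheckpointComplete(long checkpointId) {
            calls.add("checkpoint-complete:" + checkpointId);
        }
    }

    /** Runs open, one checkpoint notification, close, then dispose, and returns the recorded calls. */
    static List<String> demo() {
        RecordingFunction fn = new RecordingFunction();
        UdfOperator<RecordingFunction> op = new UdfOperator<>(fn);
        op.open();
        op.notifyCheckpointComplete(7L);
        op.close();
        op.dispose(); // no second "close" because functionsClosed is already true
        return fn.calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If close() is skipped, for example because the task failed, dispose() alone still closes the function exactly once.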
Summary
This article is reposted from 伦少's blog.




