
Long Read | Seeing the Essence Behind the Surface | From WordCount to a Detailed Walkthrough of the Flink Streaming API Source Code

大数据技能圈 2023-12-20

In the previous article, we covered how to compile and modify the Flink source code:

Source Code Walkthrough | A Step-by-Step Guide to Compiling and Modifying the Flink Source Code

Today we use the WordCount example that ships with the Flink Streaming API to learn how the Streaming API source code actually runs. First, the WordCount code:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.examples.wordcount.util.CLI;
    import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
    import org.apache.flink.util.Collector;

    public class WordCount {
        public static void main(String[] args) throws Exception {
            final CLI params = CLI.fromArgs(args);

            /* Create the execution environment. This is the main entry point for building a Flink application. */
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            /* By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources are bounded and STREAMING otherwise. */
            env.setRuntimeMode(params.getExecutionMode());

            DataStream<String> text;
            /* Create the source from the bundled in-memory dataset; each element is one line of text. */
            text = env.fromElements(WordCountData.WORDS).name("in-memory-input");

            DataStream<Tuple2<String, Integer>> counts =
                    /* The text lines read from the source are split into words using a user-defined
                    function. The tokenizer implemented below outputs each word as a (word, 1) tuple. */
                    text.flatMap(new Tokenizer())
                            .name("tokenizer")
                            /* keyBy groups the tuples by field 0, i.e. the word. Using keyBy allows
                            aggregations and other stateful transformations per key. This is similar
                            to a GROUP BY clause in SQL. */
                            .keyBy(value -> value.f0)
                            /* For each key we perform a simple sum over field "1", the count. If the
                            input stream is bounded, sum outputs a final count per word. If it is
                            unbounded, it continuously emits an update every time a new instance of a
                            word is seen. */
                            .sum(1)
                            .name("counter");

            counts.print().name("print-sink");

            /* Apache Flink applications execute lazily. Calling execute submits the job and starts processing. */
            env.execute("WordCount");
        }

        /* Implements a string tokenizer that splits sentences into words as a user-defined
        FlatMapFunction. The function takes a line (String) and splits it into multiple (word, 1)
        pairs (Tuple2<String, Integer>). */
        public static final class Tokenizer
                implements FlatMapFunction<String, Tuple2<String, Integer>> {

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                // normalize and split the line
                String[] tokens = value.toLowerCase().split("\\W+");

                // emit the pairs
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }
    }






    /* Provides the default data set used by the WordCount example program. The default data set is used if no parameters are given to the program. */
    public class WordCountData {


    public static final String[] WORDS =
    new String[] {
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,",
    "And by opposing end them?--To die,--to sleep,--",
    "No more; and by a sleep to say we end",
    "The heartache, and the thousand natural shocks",
    "That flesh is heir to,--'tis a consummation",
    "Devoutly to be wish'd. To die,--to sleep;--",
    "To sleep! perchance to dream:--ay, there's the rub;",
    "For in that sleep of death what dreams may come,",
    "When we have shuffled off this mortal coil,",
    "Must give us pause: there's the respect",
    "That makes calamity of so long life;",
    "For who would bear the whips and scorns of time,",
    "The oppressor's wrong, the proud man's contumely,",
    "The pangs of despis'd love, the law's delay,",
    "The insolence of office, and the spurns",
    "That patient merit of the unworthy takes,",
    "When he himself might his quietus make",
    "With a bare bodkin? who would these fardels bear,",
    "To grunt and sweat under a weary life,",
    "But that the dread of something after death,--",
    "The undiscover'd country, from whose bourn",
    "No traveller returns,--puzzles the will,",
    "And makes us rather bear those ills we have",
    "Than fly to others that we know not of?",
    "Thus conscience does make cowards of us all;",
    "And thus the native hue of resolution",
    "Is sicklied o'er with the pale cast of thought;",
    "And enterprises of great pith and moment,",
    "With this regard, their currents turn awry,",
    "And lose the name of action.--Soft you now!",
    "The fair Ophelia!--Nymph, in thy orisons",
    "Be all my sins remember'd."
    };
    }


Now let's walk through the code step by step.

    01

Obtaining the Execution Environment

    /* Create the execution environment. This is the main entry point for building a Flink application. */
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

Ctrl + click the getExecutionEnvironment() method and you land in the StreamExecutionEnvironment class, in the org.apache.flink.streaming.api.environment package of the flink-streaming-java module.

Looking at getExecutionEnvironment(), we can see that it simply delegates to getExecutionEnvironment(new Configuration()):

    /**
     * Creates an execution environment that represents the context in which the program is
     * currently executed. If the program is invoked standalone, this method returns a local
     * execution environment, as returned by {@link #createLocalEnvironment()}.
     *
     * @return The execution environment of the context in which the program is executed.
     */
    public static StreamExecutionEnvironment getExecutionEnvironment() {
        return getExecutionEnvironment(new Configuration());
    }

Continue into the getExecutionEnvironment(new Configuration()) method:

    /**
     * Creates an execution environment that represents the context in which the program is
     * currently executed. If the program is invoked standalone, this method returns a local
     * execution environment, as returned by {@link #createLocalEnvironment(Configuration)}.
     *
     * <p>When executed from the command line, the given configuration is stacked on top of the
     * global configuration which comes from the {@code flink-conf.yaml}, potentially overriding
     * duplicated options.
     *
     * @param configuration The configuration to instantiate the environment with.
     * @return The execution environment of the context in which the program is executed.
     */
    public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
        return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
                .map(factory -> factory.createExecutionEnvironment(configuration))
                .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
    }

The purpose of this code is to pick a suitable factory for creating the execution environment, based on the given configuration. If a thread-local factory (threadLocalContextEnvironmentFactory) exists it is used; otherwise the global factory (contextEnvironmentFactory) is used. If neither exists, a local execution environment is created. In our case neither factory is set, so a local environment is needed. Click into StreamExecutionEnvironment.createLocalEnvironment(configuration) and you can see that this method creates and returns a LocalStreamEnvironment object.
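The fallback logic is easy to reproduce in isolation. Below is a minimal, self-contained sketch of the same "thread-local factory, then static factory, then local environment" resolution; the names resolveFactory, threadLocalFactory, staticFactory and the String stand-in are illustrative only, not Flink's actual types:

    import java.util.Optional;
    import java.util.function.Supplier;

    public class FactoryFallbackSketch {

        // Mirrors the shape of Utils.resolveFactory: prefer the thread-local factory,
        // fall back to the static one, and report "nothing found" as an empty Optional.
        static Optional<Supplier<String>> resolveFactory(
                Supplier<String> threadLocalFactory, Supplier<String> staticFactory) {
            return Optional.ofNullable(
                    threadLocalFactory != null ? threadLocalFactory : staticFactory);
        }

        public static void main(String[] args) {
            // Standalone run: neither factory is set, so we take the orElseGet branch,
            // just like getExecutionEnvironment() ends up in createLocalEnvironment().
            String env = resolveFactory(null, null)
                    .map(Supplier::get)
                    .orElseGet(() -> "LocalStreamEnvironment");
            System.out.println(env);
        }
    }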

            02

Setting the Runtime Mode

    /* By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources are bounded and STREAMING otherwise. */
    env.setRuntimeMode(params.getExecutionMode());

Click into the setRuntimeMode(params.getExecutionMode()) method; you can see that it calls configuration.set(ExecutionOptions.RUNTIME_MODE, executionMode) and then returns the StreamExecutionEnvironment object:

    /**
     * Sets the runtime execution mode for the application (see {@link RuntimeExecutionMode}).
     * This is equivalent to setting {@code execution.runtime-mode} in your application's
     * configuration file.
     *
     * <p>We recommend users to NOT use this method but, instead, to set the
     * {@code execution.runtime-mode} using the command-line when submitting the application.
     * Keeping the application code configuration-free allows for more flexibility, as the same
     * application can be executed in any execution mode.
     *
     * @param executionMode the desired execution mode.
     * @return The execution environment of your application.
     */
    @PublicEvolving
    public StreamExecutionEnvironment setRuntimeMode(final RuntimeExecutionMode executionMode) {
        checkNotNull(executionMode);
        configuration.set(ExecutionOptions.RUNTIME_MODE, executionMode);
        return this;
    }

Click into the ExecutionOptions.RUNTIME_MODE option; you can see that the default value of RuntimeExecutionMode is STREAMING:

    public static final ConfigOption<RuntimeExecutionMode> RUNTIME_MODE =
            ConfigOptions.key("execution.runtime-mode")
                    .enumType(RuntimeExecutionMode.class)
                    .defaultValue(RuntimeExecutionMode.STREAMING)
                    .withDescription(
                            "Runtime execution mode of DataStream programs. Among other things, "
                                    + "this controls task scheduling, network shuffle behavior, and time semantics.");

                  03

Creating the Data Source

    /* Create the source from the bundled in-memory dataset; each element is one line of text. */
    text = env.fromElements(WordCountData.WORDS).name("in-memory-input");

Click into the fromElements(WordCountData.WORDS) method:

    /**
     * Creates a new data stream that contains the given elements. The elements must all be of
     * the same type, for example, all of the {@link String} or {@link Integer}.
     *
     * <p>The framework will try and determine the exact type from the elements. In case of
     * generic elements, it may be necessary to manually supply the type information via
     * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
     *
     * <p>Note that this operation will result in a non-parallel data stream source, i.e. a data
     * stream source with a degree of parallelism one.
     *
     * @param data The array of elements to create the data stream from.
     * @param <OUT> The type of the returned data stream.
     * @return The data stream representing the given array of elements.
     */
    @SafeVarargs
    public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
        if (data.length == 0) {
            throw new IllegalArgumentException(
                    "fromElements needs at least one element as argument");
        }

        TypeInformation<OUT> typeInfo;
        try {
            typeInfo = TypeExtractor.getForObject(data[0]);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not create TypeInformation for type "
                            + data[0].getClass().getName()
                            + "; please specify the TypeInformation manually via "
                            + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)",
                    e);
        }
        return fromCollection(Arrays.asList(data), typeInfo);
    }
Inside, the element type is extracted, and fromCollection(Arrays.asList(data), typeInfo) is called, which wraps the Collection<OUT> data into a SourceFunction<OUT>:

    /**
     * Creates a data stream from the given non-empty collection.
     *
     * <p>Note that this operation will result in a non-parallel data stream source, i.e. a data
     * stream source with parallelism one.
     *
     * @param data The collection of elements to create the data stream from.
     * @param typeInfo The TypeInformation for the produced data stream.
     * @param <OUT> The type of the returned data stream.
     * @return The data stream representing the given collection.
     */
    public <OUT> DataStreamSource<OUT> fromCollection(
            Collection<OUT> data, TypeInformation<OUT> typeInfo) {
        Preconditions.checkNotNull(data, "Collection must not be null");

        // must not have null elements and mixed elements
        FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());

        SourceFunction<OUT> function = new FromElementsFunction<>(data);
        return addSource(function, "Collection Source", typeInfo, Boundedness.BOUNDED)
                .setParallelism(1);
    }

    public FromElementsFunction(Iterable<T> elements) {
        this.serializer = null;
        this.elements = elements;
        this.numElements =
                elements instanceof Collection
                        ? ((Collection<T>) elements).size()
                        : (int) IterableUtils.toStream(elements).count();
        checkIterable(elements, Object.class);
    }
Finally, addSource(function, "Collection Source", typeInfo, Boundedness.BOUNDED) is called, which combines the type information and the source function and returns a DataStreamSource object.
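As the fromElements Javadoc warned, TypeExtractor.getForObject can fail for generic element types because of type erasure. In that case the type information can be handed to the public fromCollection overload explicitly; a small sketch:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ManualTypeInfoExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            List<Tuple2<String, Integer>> data =
                    Arrays.asList(Tuple2.of("flink", 1), Tuple2.of("streaming", 2));

            // Tuple2<String, Integer> loses its type parameters to erasure, so we pass
            // the TypeInformation to fromCollection ourselves instead of relying on
            // TypeExtractor.
            env.fromCollection(
                            data,
                            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))
                    .print();

            env.execute("manual-type-info");
        }
    }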

                        04

Writing the Data Processing Logic

    DataStream<Tuple2<String, Integer>> counts =
            /* The text lines read from the source are split into words using a user-defined
            function. The tokenizer implemented below outputs each word as a (word, 1) tuple. */
            text.flatMap(new Tokenizer())
                    .name("tokenizer")
                    /* keyBy groups the tuples by field 0, i.e. the word. Using keyBy allows
                    aggregations and other stateful transformations per key. This is similar to a
                    GROUP BY clause in SQL. */
                    .keyBy(value -> value.f0)
                    /* For each key we perform a simple sum over field "1", the count. If the input
                    stream is bounded, sum outputs a final count per word. If it is unbounded, it
                    continuously emits an update every time a new instance of a word is seen. */
                    .sum(1)
                    .name("counter");

    counts.print().name("print-sink");
The important thing to note here is that all operators flow through the DataStream class, so each of them gets wrapped into a Transformation:
    @Public
    public class DataStream<T> {

        protected final StreamExecutionEnvironment environment;

        protected final Transformation<T> transformation;

        /**
         * Create a new {@link DataStream} in the given execution environment with partitioning set to
         * forward by default.
         *
         * @param environment The StreamExecutionEnvironment
         */
        public DataStream(StreamExecutionEnvironment environment, Transformation<T> transformation) {
            this.environment =
                    Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
            this.transformation =
                    Preconditions.checkNotNull(
                            transformation, "Stream Transformation must not be null.");
        }
    }
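To make the "every operator becomes a Transformation" idea concrete, here is a deliberately simplified toy model. None of these classes are Flink's; the sketch only imitates the shape of the mechanism: each operator call wraps its input in a new node and registers it, and nothing runs until the recorded list is handed to an executor.

    import java.util.ArrayList;
    import java.util.List;

    // Toy stand-ins for Transformation and DataStream, to show the lazy DAG building.
    class ToyTransformation {
        final String name;
        final ToyTransformation input; // null for sources

        ToyTransformation(String name, ToyTransformation input) {
            this.name = name;
            this.input = input;
        }
    }

    class ToyStream {
        static final List<ToyTransformation> TRANSFORMATIONS = new ArrayList<>();
        final ToyTransformation transformation;

        ToyStream(ToyTransformation transformation) {
            this.transformation = transformation;
            TRANSFORMATIONS.add(transformation); // like the environment recording operators
        }

        // flatMap/sum/print all follow this shape: wrap the input, return a new stream.
        ToyStream op(String name) {
            return new ToyStream(new ToyTransformation(name, this.transformation));
        }

        public static void main(String[] args) {
            new ToyStream(new ToyTransformation("in-memory-input", null))
                    .op("tokenizer")
                    .op("counter")
                    .op("print-sink");
            // Nothing has executed yet; we have only recorded 4 transformation nodes.
            System.out.println(TRANSFORMATIONS.size() + " transformations recorded");
        }
    }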

                            05

Executing the Job

    /* Apache Flink applications execute lazily. Calling execute submits the job and starts processing. */
    env.execute("WordCount");

Click into env.execute("WordCount") and you can see how the recorded transformations are turned into a StreamGraph:
    /**
     * Triggers the program execution. The environment will execute all parts of the program that
     * resulted in a "sink" operation. Sink operations are for example printing results or
     * forwarding them to a message queue. The program execution will be logged and displayed with
     * the provided name.
     *
     * @param jobName Desired name of the job.
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception which occurs during job execution.
     */
    public JobExecutionResult execute(String jobName) throws Exception {
        final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
        StreamGraph streamGraph = getStreamGraph();
        if (jobName != null) {
            streamGraph.setJobName(jobName);
        }

        try {
            return execute(streamGraph);
        } catch (Throwable t) {
            Optional<ClusterDatasetCorruptedException> clusterDatasetCorruptedException =
                    ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);
            if (!clusterDatasetCorruptedException.isPresent()) {
                throw t;
            }

            // Retry without cache if it is caused by corrupted cluster dataset.
            invalidateCacheTransformations(originalTransformations);
            streamGraph = getStreamGraph(originalTransformations);
            return execute(streamGraph);
        }
    }
    /**
     * Getter of the {@link StreamGraph} of the streaming job. This call clears previously
     * registered {@link Transformation transformations}.
     *
     * @return The stream graph representing the transformations.
     */
    @Internal
    public StreamGraph getStreamGraph() {
        return getStreamGraph(true);
    }

    /**
     * Getter of the {@link StreamGraph} of the streaming job, with the option to clear previously
     * registered {@link Transformation transformations}. Clearing the transformations allows, for
     * example, to not re-execute the same operations when calling {@link #execute()} multiple
     * times.
     *
     * @param clearTransformations Whether or not to clear previously registered transformations.
     * @return The stream graph representing the transformations.
     */
    @Internal
    public StreamGraph getStreamGraph(boolean clearTransformations) {
        final StreamGraph streamGraph = getStreamGraph(transformations);
        if (clearTransformations) {
            transformations.clear();
        }
        return streamGraph;
    }


    private StreamGraph getStreamGraph(List<Transformation<?>> transformations) {
        synchronizeClusterDatasetStatus();
        return getStreamGraphGenerator(transformations).generate();
    }

    private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
        if (transformations.size() <= 0) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }

        // We copy the transformation so that newly added transformations cannot intervene with the
        // stream graph generation.
        return new StreamGraphGenerator(
                        new ArrayList<>(transformations), config, checkpointCfg, configuration)
                .setStateBackend(defaultStateBackend)
                .setChangelogStateBackendEnabled(changelogStateBackendEnabled)
                .setSavepointDir(defaultSavepointDirectory)
                .setChaining(isChainingEnabled)
                .setChainingOfOperatorsWithDifferentMaxParallelism(
                        isChainingOfOperatorsWithDifferentMaxParallelismEnabled)
                .setUserArtifacts(cacheFile)
                .setTimeCharacteristic(timeCharacteristic)
                .setDefaultBufferTimeout(bufferTimeout)
                .setSlotSharingGroupResource(slotSharingGroupResources);
    }
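Incidentally, you do not have to read the generator code to see its result: StreamExecutionEnvironment can print the generated graph as JSON via the public getExecutionPlan() method, and the output can be pasted into the plan visualizer at https://flink.apache.org/visualizer/. A small sketch:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class PlanInspection {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("to be or not to be")
                    .flatMap((String line, Collector<String> out) -> {
                        for (String word : line.split("\\W+")) {
                            out.collect(word);
                        }
                    })
                    .returns(String.class) // lambdas lose generic types, so declare the output type
                    .print();

            // Builds the StreamGraph from the recorded transformations and renders it as
            // JSON -- the same graph that execute() hands to the executor.
            System.out.println(env.getExecutionPlan());
        }
    }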

Then execute(streamGraph) is called, which returns a JobExecutionResult object:
    /**
     * Triggers the program execution. The environment will execute all parts of the program that
     * resulted in a "sink" operation. Sink operations are for example printing results or
     * forwarding them to a message queue.
     *
     * @param streamGraph the stream graph representing the transformations.
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception which occurs during job execution.
     */
    @Internal
    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        final JobClient jobClient = executeAsync(streamGraph);

        try {
            final JobExecutionResult jobExecutionResult;

            if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                jobExecutionResult = jobClient.getJobExecutionResult().get();
            } else {
                jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
            }

            jobListeners.forEach(
                    jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

            return jobExecutionResult;
        } catch (Throwable t) {
            // get() on the JobExecutionResult Future will throw an ExecutionException. This
            // behaviour was largely not there in Flink versions before the PipelineExecutor
            // refactoring so we should strip that exception.
            Throwable strippedException = ExceptionUtils.stripExecutionException(t);

            jobListeners.forEach(
                    jobListener -> {
                        jobListener.onJobExecuted(null, strippedException);
                    });
            ExceptionUtils.rethrowException(strippedException);

            // never reached, only make javac happy
            return null;
        }
    }
Continuing into executeAsync(streamGraph), you can see that the StreamGraph is handed over, as a Pipeline, to the execute method of the PipelineExecutor interface:
    /**
     * Triggers the program execution asynchronously. The environment will execute all parts of
     * the program that resulted in a "sink" operation. Sink operations are for example printing
     * results or forwarding them to a message queue.
     *
     * @param streamGraph the stream graph representing the transformations.
     * @return A {@link JobClient} that can be used to communicate with the submitted job,
     *     completed on submission succeeded.
     * @throws Exception which occurs during job execution.
     */
    @Internal
    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        checkNotNull(streamGraph, "StreamGraph cannot be null.");
        final PipelineExecutor executor = getPipelineExecutor();

        CompletableFuture<JobClient> jobClientFuture =
                executor.execute(streamGraph, configuration, userClassloader);

        try {
            JobClient jobClient = jobClientFuture.get();
            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
            collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));
            collectIterators.clear();
            return jobClient;
        } catch (ExecutionException executionException) {
            final Throwable strippedException =
                    ExceptionUtils.stripExecutionException(executionException);
            jobListeners.forEach(
                    jobListener -> jobListener.onJobSubmitted(null, strippedException));

            throw new FlinkException(
                    String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                    strippedException);
        }
    }


    @Internal
    public interface PipelineExecutor {

        /**
         * Executes a {@link Pipeline} based on the provided configuration and returns a
         * {@link JobClient} which allows to interact with the job being executed, e.g. cancel it
         * or take a savepoint.
         *
         * <p>The caller is responsible for managing the lifecycle of the returned
         * {@link JobClient}. This means that e.g. {@code close()} should be called explicitly at
         * the call-site.
         *
         * @param pipeline the {@link Pipeline} to execute
         * @param configuration the {@link Configuration} with the required execution parameters
         * @param userCodeClassloader the {@link ClassLoader} to deserialize user code
         * @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline.
         */
        CompletableFuture<JobClient> execute(
                final Pipeline pipeline,
                final Configuration configuration,
                final ClassLoader userCodeClassloader)
                throws Exception;
    }
Looking at the implementing class LocalExecutor, its execute() method converts the Pipeline into a JobGraph and then calls submitJob(jobGraph, userCodeClassloader):
    @Override
    public CompletableFuture<JobClient> execute(
            Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
            throws Exception {
        checkNotNull(pipeline);
        checkNotNull(configuration);

        Configuration effectiveConfig = new Configuration();
        effectiveConfig.addAll(this.configuration);
        effectiveConfig.addAll(configuration);

        // we only support attached execution with the local executor.
        checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

        final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);

        return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
                .submitJob(jobGraph, userCodeClassloader);
    }
The submitJob() method starts a local MiniCluster and submits the job. If the source is a bounded data stream, the cluster is also shut down once the program finishes:
    public CompletableFuture<JobClient> submitJob(
            JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
        MiniClusterConfiguration miniClusterConfig =
                getMiniClusterConfig(jobGraph.getMaximumParallelism());
        MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
        miniCluster.start();

        return miniCluster
                .submitJob(jobGraph)
                .thenApplyAsync(
                        FunctionUtils.uncheckedFunction(
                                submissionResult -> {
                                    org.apache.flink.client.ClientUtils
                                            .waitUntilJobInitializationFinished(
                                                    () ->
                                                            miniCluster
                                                                    .getJobStatus(
                                                                            submissionResult.getJobID())
                                                                    .get(),
                                                    () ->
                                                            miniCluster
                                                                    .requestJobResult(
                                                                            submissionResult.getJobID())
                                                                    .get(),
                                                    userCodeClassloader);
                                    return submissionResult;
                                }))
                .thenApply(
                        result ->
                                new MiniClusterJobClient(
                                        result.getJobID(),
                                        miniCluster,
                                        userCodeClassloader,
                                        MiniClusterJobClient.JobFinalizationBehavior
                                                .SHUTDOWN_CLUSTER))
                .whenComplete(
                        (ignored, throwable) -> {
                            if (throwable != null) {
                                // We failed to create the JobClient and must shutdown to ensure
                                // cleanup.
                                shutDownCluster(miniCluster);
                            }
                        })
                .thenApply(Function.identity());
    }
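The MiniCluster used here is an ordinary component that can also be started by hand, which makes the "start cluster, submit job, shut down" sequence above easy to experiment with. A hedged sketch; the builder method names come from flink-runtime and may differ slightly across Flink versions:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.minicluster.MiniCluster;
    import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

    public class MiniClusterSketch {
        public static void main(String[] args) throws Exception {
            MiniClusterConfiguration config =
                    new MiniClusterConfiguration.Builder()
                            .setConfiguration(new Configuration())
                            .setNumTaskManagers(1)
                            .setNumSlotsPerTaskManager(2)
                            .build();

            // The same component PerJobMiniClusterFactory spins up under the hood.
            try (MiniCluster miniCluster = new MiniCluster(config)) {
                miniCluster.start();
                // A JobGraph (e.g. derived from a StreamGraph as shown above) could now
                // be submitted via miniCluster.submitJob(jobGraph); closing the cluster
                // mirrors the SHUTDOWN_CLUSTER finalization behavior.
            }
        }
    }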

With that, the source-level walkthrough of a complete WordCount run is done.

To summarize:

Step 1: Parse the command-line arguments

Step 2: Obtain the execution environment

Step 3: Apply additional settings to the environment

Step 4: Read the data and turn it into a DataStream

Step 5: Write the processing logic, which is recorded as Transformations

Step 6: The Transformations are converted into a StreamGraph

Step 7: The StreamGraph is passed on as a Pipeline

Step 8: The Pipeline is executed

Step 9: The Pipeline is converted into a JobGraph

Step 10: submitJob submits the job and returns the result
