
Apache DolphinScheduler 3.1.0 Task Scheduling: Source Code Analysis

DolphinScheduler (海豚调度) 2022-12-22

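Contents (this article is based on version 3.1.0)

1. Database tables

2. Overall execution flow

3. Source code analysis

3.1 apiserver: entry point for running a workflow

3.2 master: scheduling tasks

  • 3.2.1 Master startup
  • 3.2.2 Command scanning
  • 3.2.3 WorkflowEvent consumption
  • 3.2.4 Workflow event handling logic
  • 3.2.5 WorkflowExecuteRunnable run logic
  • 3.2.6 Task consumption
  • 3.2.7 Task dispatch

3.3 worker: executing tasks

  • 3.3.1 Worker startup

  • 3.3.2 Worker handles the task-start command

  • 3.3.3 workerManager consumption

  • 3.3.4 Task execution

3.4 master: receiving task feedback

  • 3.4.1 Master receives the feedback message

  • 3.4.2 taskEventService handles the TaskEvent

  • 3.4.3 TaskResultEventHandler handles the TaskEvent

3.5 master: closing the loop and submitting downstream tasks

  • 3.5.1 EventExecuteService handles the StateEvent

  • 3.5.2 workflowExecuteThread handles the StateEvent

  • 3.5.3 TaskStateEventHandler handles the StateEvent

  • 3.5.4 workflowExecuteThread schedules downstream tasks

4. Summary

4.1 Role of each component
4.2 Role of each thread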


01

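Database tables

t_ds_process_definition: workflow definition table

Stores workflow definitions, one row per workflow.

t_ds_process_instance: workflow instance table. One row is created each time the workflow is scheduled, for example by its crontab.

t_ds_task_definition: task definition table

Stores the nodes drawn on the canvas, one row per node.

t_ds_process_task_relation: task relation table

One row for each edge that connects two tasks.

t_ds_task_instance: task instance table

One row is created each time a task runs.

t_ds_command: command table. To start a workflow run, an HTTP request is sent to the apiserver, and the API then writes the information about the workflow to run into this table.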

02

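Overall execution flow


  • The user clicks the start-workflow button in the web UI.

  • The apiserver wraps the request into a command and persists it (one row is inserted into the t_ds_command table).

  • The master scans the command, builds the DAG, initializes it, and submits the source tasks to the priority queue.

  • The taskConsumer takes a task from the queue and picks a worker to assign it to.

  • The worker receives the dispatch message and starts the task.

  • The worker returns the result to the master, and the master updates the task information in the database.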
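
The "build the DAG, then submit the source tasks to the priority queue" step in the flow above can be illustrated with a small toy model. This is only a sketch under stated assumptions: `DagSourceSketch`, `QueuedTask`, `sourceTasks` and `submit` are hypothetical names made up for illustration, and the rule "lower number means higher priority" is an assumption of the sketch, not a statement about DolphinScheduler's own classes.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.PriorityBlockingQueue;

/** Toy model: find the source tasks of a DAG and queue them by priority. */
final class DagSourceSketch {

    /** Hypothetical queue entry; in this sketch a lower number means a higher priority. */
    static final class QueuedTask {
        final String name;
        final int priority;

        QueuedTask(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    /** Source tasks are the nodes that never appear as the downstream end of an edge. */
    static Set<String> sourceTasks(Set<String> nodes, Map<String, List<String>> downstreamOf) {
        Set<String> sources = new TreeSet<>(nodes);
        for (List<String> downstream : downstreamOf.values()) {
            sources.removeAll(downstream);
        }
        return sources;
    }

    /** Puts the source tasks into a priority queue, ordered by priority and then by name. */
    static PriorityBlockingQueue<QueuedTask> submit(Set<String> sources, Map<String, Integer> priorityOf) {
        PriorityBlockingQueue<QueuedTask> queue = new PriorityBlockingQueue<>(
                11, Comparator.<QueuedTask>comparingInt(t -> t.priority).thenComparing(t -> t.name));
        for (String name : sources) {
            // Tasks without an explicit priority get a default of 2 (an arbitrary choice here).
            queue.put(new QueuedTask(name, priorityOf.getOrDefault(name, 2)));
        }
        return queue;
    }
}
```

For a graph with edges A→C, B→C and C→D, only A and B are queued at first; C and D would be submitted later, once their upstream tasks have finished.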

03

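DolphinScheduler source code analysis


3.1 apiserver: entry point for running a workflow

When the user clicks the run button in the front end, a request is sent to the DolphinScheduler API and is ultimately handled by the startProcessInstance method of ExecutorController.


ExecutorController.startProcessInstance() method.


It ends up inserting one row into the MySQL table t_ds_command, recording the information about the workflow to run.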


    @PostMapping(value = "start-process-instance")
    @ResponseStatus(HttpStatus.OK)
    @ApiException(START_PROCESS_INSTANCE_ERROR)
    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
    public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
    @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
    @RequestParam(value = "processDefinitionCode") long processDefinitionCode,
    @RequestParam(value = "scheduleTime") String scheduleTime,
    @RequestParam(value = "failureStrategy") FailureStrategy failureStrategy,
    @RequestParam(value = "startNodeList", required = false) String startNodeList,
    @RequestParam(value = "taskDependType", required = false) TaskDependType taskDependType,
    @RequestParam(value = "execType", required = false) CommandType execType,
    @RequestParam(value = "warningType") WarningType warningType,
    @RequestParam(value = "warningGroupId", required = false, defaultValue = "0") Integer warningGroupId,
    @RequestParam(value = "runMode", required = false) RunMode runMode,
    @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
    @RequestParam(value = "workerGroup", required = false, defaultValue = "default") String workerGroup,
    @RequestParam(value = "environmentCode", required = false, defaultValue = "-1") Long environmentCode,
    @RequestParam(value = "timeout", required = false) Integer timeout,
    @RequestParam(value = "startParams", required = false) String startParams,
    @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
    @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
    @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) {


    if (timeout == null) {
    timeout = Constants.MAX_TASK_TIMEOUT;
    }
    Map<String, String> startParamMap = null;
    if (startParams != null) {
    startParamMap = JSONUtils.toMap(startParams);
    }


    if (complementDependentMode == null) {
    complementDependentMode = ComplementDependentMode.OFF_MODE;
    }
    // build the command and persist it to the database
    Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
    scheduleTime, execType, failureStrategy,
    startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
    workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, complementDependentMode);
    return returnDataList(result);
    }


3.2 master: scheduling tasks


3.2.1 Master startup

MasterServer.run() method

Starts the master's worker threads.

      public void run() throws SchedulerException {
          // init rpc server: start the netty RPC server used to communicate with workers
          this.masterRPCServer.start();

          // install task plugin: load the task plugins
          this.taskPluginManager.loadPlugin();

          // self tolerant: load the registry information used for high availability
          this.masterRegistryClient.init();
          this.masterRegistryClient.start();
          this.masterRegistryClient.setRegistryStoppable(this);
          // command scanning thread
          this.masterSchedulerBootstrap.init();
          this.masterSchedulerBootstrap.start();
          // event handling threads
          this.eventExecuteService.start();
          this.failoverExecuteThread.start();
          // timed scheduling
          this.schedulerApi.start();

          Runtime.getRuntime().addShutdownHook(new Thread(() -> {
              if (Stopper.isRunning()) {
                  close("MasterServer shutdownHook");
              }
          }));
      }


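3.2.2 Command scanning

MasterSchedulerBootstrap.run() method


This thread is started in 3.2.1. Once started, it loops and keeps scanning the command table. Each command it finds is converted into a ProcessInstance and persisted. It then creates a WorkflowExecuteRunnable (an object used in many places later), caches it, and adds a START_WORKFLOW event to the workflowEventQueue.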

        public void run() {
        while (Stopper.isRunning()) {
        try {
        // todo: if the workflow event queue is much, we need to handle the back pressure
        boolean isOverload =
        OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
        if (isOverload) {
        MasterServerMetrics.incMasterOverload();
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        continue;
        }
        List<Command> commands = findCommands();
        if (CollectionUtils.isEmpty(commands)) {
        // indicate that no command ,sleep for 1s
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        continue;
        }
        // convert the commands into ProcessInstances and persist them
        List<ProcessInstance> processInstances = command2ProcessInstance(commands);
        if (CollectionUtils.isEmpty(processInstances)) {
        // indicate that the command transform to processInstance error, sleep for 1s
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        continue;
        }
        MasterServerMetrics.incMasterConsumeCommand(commands.size());


        processInstances.forEach(processInstance -> {
        try {
        LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
        if (processInstanceExecCacheManager.contains(processInstance.getId())) {
        logger.error("The workflow instance is already been cached, this case shouldn't be happened");
        }
        WorkflowExecuteRunnable workflowRunnable = new WorkflowExecuteRunnable(processInstance,
        processService,
        nettyExecutorManager,
        processAlertManager,
        masterConfig,
        stateWheelExecuteThread,
        curingGlobalParamsService);
        processInstanceExecCacheManager.cache(processInstance.getId(), workflowRunnable); // cache the runnable so that it can be fetched when the workflow event loop handles the event
        workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
        processInstance.getId()));
        } finally {
        LoggerUtils.removeWorkflowInstanceIdMDC();
        }
        });
        } catch (InterruptedException interruptedException) {
        logger.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
        Thread.currentThread().interrupt();
        break;
        } catch (Exception e) {
        logger.error("Master schedule workflow error", e);
        // sleep for 1s here to avoid the database down cause the exception boom
        ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        }
        }
        }
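
To make the shape of one pass of this loop easier to see, here is a minimal, self-contained sketch. It is not the DolphinScheduler implementation: `CommandScanSketch` and its fields are hypothetical stand-ins (an in-memory queue plays the role of the t_ds_command table, a map plays the role of processInstanceExecCacheManager, and a queue of ids plays the role of workflowEventQueue).

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Toy model of one pass of the command scan loop; every name here is hypothetical. */
final class CommandScanSketch {
    /** Stand-in for the t_ds_command table. */
    final Queue<String> commandTable = new ConcurrentLinkedQueue<>();
    /** Stand-in for the runnable cache: instance id -> runnable. */
    final Map<Integer, Runnable> cache = new ConcurrentHashMap<>();
    /** Stand-in for the workflow event queue: ids of the instances to start. */
    final Queue<Integer> startEvents = new ConcurrentLinkedQueue<>();
    private int nextInstanceId = 1;

    /**
     * Drains the pending commands, turns each one into an "instance", caches a
     * runnable for it and queues a start event. Returns the number of commands
     * handled; 0 means the caller should sleep before scanning again.
     */
    int scanOnce(boolean overloaded) {
        if (overloaded) {
            return 0; // back off and leave the commands where they are
        }
        int handled = 0;
        String command;
        while ((command = commandTable.poll()) != null) {
            final int instanceId = nextInstanceId++;
            final String workflow = command;
            cache.put(instanceId, () -> System.out.println("start " + workflow));
            startEvents.add(instanceId);
            handled++;
        }
        return handled;
    }
}
```

The point of the design is the decoupling: the scanning side only caches a runnable and emits an event, while a separate consumer decides when the runnable actually starts.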


3.2.3 WorkflowEvent consumption

The command scanning thread also starts the workflowEventLooper thread, which consumes WorkflowEvents.

MasterSchedulerBootstrap.start() method





          @Override
          public synchronized void start() {
              logger.info("Master schedule bootstrap starting..");
              super.start();
              workflowEventLooper.start(); // start the workflow event loop thread
              logger.info("Master schedule bootstrap started...");
          }


The looper pulls a WorkflowEvent from the workflowEventQueue and calls the matching WorkflowEventHandler to handle it.


WorkflowEventLooper.run() method

            public void run() {
            WorkflowEvent workflowEvent = null;
            while (Stopper.isRunning()) {
            try {
            workflowEvent = workflowEventQueue.poolEvent(); // pull the next workflow event
            LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
            logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
            WorkflowEventHandler workflowEventHandler =
            workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType()); // look up the handler registered for this event type
            workflowEventHandler.handleWorkflowEvent(workflowEvent);
            } catch (InterruptedException e) {
            logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
            Thread.currentThread().interrupt();
            break;
            } catch (WorkflowEventHandleException workflowEventHandleException) {
            logger.error("Handle workflow event failed, will add this event to event queue again, event: {}",
            workflowEvent, workflowEventHandleException);
            workflowEventQueue.addEvent(workflowEvent);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
            } catch (WorkflowEventHandleError workflowEventHandleError) {
            logger.error("Handle workflow event error, will drop this event, event: {}",
            workflowEvent,
            workflowEventHandleError);
            } catch (Exception unknownException) {
            logger.error(
            "Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
            workflowEvent, unknownException);
            workflowEventQueue.addEvent(workflowEvent);
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
            } finally {
            LoggerUtils.removeWorkflowInstanceIdMDC();
            }
            }
            }
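
The loop above distinguishes two kinds of failure: a recoverable one, where the event is put back into the queue, and a fatal one, where the event is dropped. The sketch below shows that pattern in isolation. All names (`EventLoopSketch`, `RetryableException`, `FatalException`, `drain`) are hypothetical and only mirror the structure of the code above; the one-second sleep is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy event loop: dispatch by event type, requeue on recoverable failure, drop on fatal error. */
final class EventLoopSketch {
    enum EventType { START_WORKFLOW }

    /** Recoverable failure: the event goes back to the queue. */
    static class RetryableException extends Exception { }

    /** Fatal failure: the event is dropped. */
    static class FatalException extends Exception { }

    interface Handler {
        void handle(int instanceId) throws RetryableException, FatalException;
    }

    static final class Event {
        final EventType type;
        final int instanceId;

        Event(EventType type, int instanceId) {
            this.type = type;
            this.instanceId = instanceId;
        }
    }

    final Deque<Event> queue = new ArrayDeque<>();
    final Map<EventType, Handler> handlers = new HashMap<>();
    final List<Integer> dropped = new ArrayList<>();

    /** Processes at most maxSteps events and returns how many were taken from the queue. */
    int drain(int maxSteps) {
        int steps = 0;
        while (!queue.isEmpty() && steps < maxSteps) {
            Event event = queue.pollFirst();
            steps++;
            Handler handler = handlers.get(event.type);
            if (handler == null) {
                dropped.add(event.instanceId); // nobody can handle this type
                continue;
            }
            try {
                handler.handle(event.instanceId);
            } catch (RetryableException e) {
                queue.addLast(event); // try again later
            } catch (FatalException e) {
                dropped.add(event.instanceId); // give up on this event
            }
        }
        return steps;
    }
}
```

The `maxSteps` bound exists only so that the sketch terminates even if a handler keeps failing; the real loop runs until the server stops.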

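3.2.4 Workflow event handling logic

Because the event type is START_WORKFLOW, the handler obtained is WorkflowStartEventHandler, and its handleWorkflowEvent() method handles the event.


This method gets the WorkflowExecuteRunnable and asynchronously invokes its call method.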

              @Override
              public void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
              logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent);
              // get the WorkflowExecuteRunnable from the cache
              WorkflowExecuteRunnable workflowExecuteRunnable =
              processInstanceExecCacheManager.getByProcessInstanceId(workflowEvent.getWorkflowInstanceId());
              if (workflowExecuteRunnable == null) {
              throw new WorkflowEventHandleError(
              "The workflow start event is invalid, cannot find the workflow instance from cache");
              }
              ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();


              ProcessInstanceMetrics.incProcessInstanceSubmit();
              // asynchronously invoke call() to run the workflow start logic
              CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture =
              CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool);
              workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
              if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
              // submit failed will resend the event to workflow event queue
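              logger.info("Success submit the workflow instance"); // the submission succeeded
              if (processInstance.getTimeout() > 0) { // a timeout is configured: register the instance for timeout checking
              stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
              }
              } else { // the submission failed: requeue the start event so that call() runs again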
              logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}",
              workflowEvent);
              workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
              processInstance.getId()));
              }
              });
              }
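
The asynchronous "submit, then requeue on failure" pattern used here can be reduced to a few lines with `CompletableFuture`. The following is a minimal sketch, assuming a hypothetical `AsyncSubmitSketch` class whose `retryQueue` stands in for the workflow event queue; it is not the handler's actual code.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

/** Toy version of "run the start logic on a pool, requeue the instance if it fails". */
final class AsyncSubmitSketch {
    enum SubmitStatus { SUCCESS, FAILED }

    /** Stand-in for the workflow event queue: ids that need another start attempt. */
    final Queue<Integer> retryQueue = new ConcurrentLinkedQueue<>();

    /**
     * Runs the submit step on the pool and registers a callback that requeues the
     * instance id when the step reports a failure. The callback future is
     * returned so that callers (or tests) can wait for it if they want to.
     */
    CompletableFuture<Void> submit(int instanceId, Supplier<SubmitStatus> call, ExecutorService pool) {
        return CompletableFuture.supplyAsync(call, pool).thenAccept(status -> {
            if (status != SubmitStatus.SUCCESS) {
                retryQueue.add(instanceId);
            }
        });
    }
}
```

Because the work runs on a separate pool, the thread that consumes events is not blocked while a workflow is being started; it only pays the cost of scheduling the future.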


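3.2.5 WorkflowExecuteRunnable run logic

WorkflowExecuteRunnable.call()


  • Initialize the workflow's directed acyclic graph (DAG).

  • Initialize the task scheduling configuration.

  • Submit the source tasks to the task priority queue.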

                @Override
                public WorkflowSubmitStatue call() {
                if (isStart()) {
                // This case should not been happened
                logger.warn("[WorkflowInstance-{}] The workflow has already been started", processInstance.getId());
                return WorkflowSubmitStatue.DUPLICATED_SUBMITTED;
                }


                try {
                LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
                if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
                buildFlowDag(); // build the DAG
                workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
                logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
                }
                if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
                initTaskQueue(); // initialize the task scheduling configuration
                workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
                logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
                }
                if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
                submitPostNode(null); // submit tasks to the queue: source nodes first; their downstream nodes are submitted only after the source nodes finish
                workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
                logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
                }
                return WorkflowSubmitStatue.SUCCESS;
                } catch (Exception e) {
                logger.error("Start workflow error", e);
                return WorkflowSubmitStatue.FAILED;
                } finally {
                LoggerUtils.removeWorkflowInstanceIdMDC();
                }
                }
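
One consequence of recording workflowRunnableStatus after each stage is that a retried call() can pick up at the stage that failed instead of repeating the completed ones. The sketch below demonstrates that behaviour with a hypothetical `ResumableStartSketch` class; the `failInitQueueOnce` flag is a test hook invented for the example, and the already-started check of the real method is omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy staged start-up whose status lets a retry resume at the failed stage. */
final class ResumableStartSketch {
    enum Status { CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED }

    Status status = Status.CREATED;
    /** Records which stages actually ran, in order. */
    final List<String> log = new ArrayList<>();
    /** Hypothetical test hook: makes the queue initialisation fail exactly once. */
    boolean failInitQueueOnce = false;

    /** Returns true when every stage has completed, false when a stage failed. */
    boolean call() {
        try {
            if (status == Status.CREATED) {
                log.add("buildDag");
                status = Status.INITIALIZE_DAG;
            }
            if (status == Status.INITIALIZE_DAG) {
                if (failInitQueueOnce) {
                    failInitQueueOnce = false;
                    throw new IllegalStateException("simulated failure");
                }
                log.add("initQueue");
                status = Status.INITIALIZE_QUEUE;
            }
            if (status == Status.INITIALIZE_QUEUE) {
                log.add("submitSources");
                status = Status.STARTED;
            }
            return true;
        } catch (RuntimeException e) {
            return false; // the status keeps the last completed stage
        }
    }
}
```

If the first call fails during queue initialisation, a second call skips the DAG stage and continues from there, so the DAG is built only once.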

3.2.6 Task consumption

TaskPriorityQueueConsumer.run() method. This thread is started through an annotation (@PostConstruct on init()).

                  // started via the @PostConstruct annotation
                  @PostConstruct
                  public void init() {
                  this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
                  logger.info("Task priority queue consume thread staring");
                  super.start();
                  logger.info("Task priority queue consume thread started");
                  }


                  @Override
                  public void run() {
                  int fetchTaskNum = masterConfig.getDispatchTaskNumber();
                  while (Stopper.isRunning()) {
                  try {
                  List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);


                  if (CollectionUtils.isNotEmpty(failedDispatchTasks)) {
                  TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
                  for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                  taskPriorityQueue.put(dispatchFailedTask);
                  }
                  // If the all task dispatch failed, will sleep for 1s to avoid the master cpu higher.
                  if (fetchTaskNum == failedDispatchTasks.size()) {
                  TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                  }
                  }
                  } catch (Exception e) {
                  TaskMetrics.incTaskDispatchError();
                  logger.error("dispatcher task error", e);
                  }
                  }
                  }

Dispatching tasks in batches

TaskPriorityQueueConsumer.batchDispatch()

                    /**
                    * batch dispatch with thread pool
                    */
                    public List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
                    List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList<>());
                    CountDownLatch latch = new CountDownLatch(fetchTaskNum);


                    for (int i = 0; i < fetchTaskNum; i++) { // fetch tasks from the queue
                    TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
                    if (Objects.isNull(taskPriority)) {
                    latch.countDown();
                    continue;
                    }
                    consumerThreadPoolExecutor.submit(() -> { // dispatch the task asynchronously on the thread pool
                    try {
                    boolean dispatchResult = this.dispatchTask(taskPriority);
                    if (!dispatchResult) {
                    failedDispatchTasks.add(taskPriority);
                    }
                    } finally {
                    // make sure the latch countDown
                    latch.countDown();
                    }
                    });
                    }
                    latch.await();
                    return failedDispatchTasks;
                    }
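
The batch logic above relies on a CountDownLatch sized to the batch: every slot counts down exactly once, whether it was empty, succeeded or failed, so the caller can wait for the whole batch and then collect the failures. Here is a generic, self-contained sketch of that technique; `BatchDispatchSketch` and its parameters are hypothetical, and the 10 ms poll timeout is an arbitrary value chosen for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Toy batch dispatcher: dispatch up to batchSize items in parallel and report the failures. */
final class BatchDispatchSketch {

    /**
     * Takes up to batchSize items from the queue, runs dispatch on each of them in
     * the pool, waits for all of them and returns the items whose dispatch
     * returned false.
     */
    static <T> List<T> batchDispatch(BlockingQueue<T> queue, int batchSize,
                                     ExecutorService pool, Predicate<T> dispatch)
            throws InterruptedException {
        List<T> failed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(batchSize);
        for (int i = 0; i < batchSize; i++) {
            T item = queue.poll(10, TimeUnit.MILLISECONDS);
            if (item == null) {
                latch.countDown(); // an empty slot still has to release the latch
                continue;
            }
            pool.submit(() -> {
                try {
                    if (!dispatch.test(item)) {
                        failed.add(item);
                    }
                } finally {
                    latch.countDown(); // always count down, even if dispatch throws
                }
            });
        }
        latch.await();
        return failed;
    }
}
```

The caller can then put the returned items back into the queue, which is what the run() loop above does with failedDispatchTasks.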


3.2.7 Task dispatch

TaskPriorityQueueConsumer.dispatchTask()



                      /**
                      * Dispatch task to worker.
                      *
                      * @param taskPriority taskPriority
                      * @return dispatch result, return true if dispatch success, return false if dispatch failed.
                      */
                      protected boolean dispatchTask(TaskPriority taskPriority) {
                      TaskMetrics.incTaskDispatch();
                      boolean result = false;
                      try {
                      WorkflowExecuteRunnable workflowExecuteRunnable = // get the WorkflowExecuteRunnable from the cache
                      processInstanceExecCacheManager.getByProcessInstanceId(taskPriority.getProcessInstanceId());
                      if (workflowExecuteRunnable == null) {
                      logger.error("Cannot find the related processInstance of the task, taskPriority: {}", taskPriority);
                      return true;
                      }
                      Optional<TaskInstance> taskInstanceOptional = // get the task instance
                      workflowExecuteRunnable.getTaskInstance(taskPriority.getTaskId());
                      if (!taskInstanceOptional.isPresent()) {
                      logger.error("Cannot find the task instance from related processInstance, taskPriority: {}",
                      taskPriority);
                      // we return true, so that we will drop this task.
                      return true;
                      }
                      TaskInstance taskInstance = taskInstanceOptional.get();
                      TaskExecutionContext context = taskPriority.getTaskExecutionContext();
                      ExecutionContext executionContext = // build the execution context
                      new ExecutionContext(toCommand(context), ExecutorType.WORKER, context.getWorkerGroup(), taskInstance);


                      if (isTaskNeedToCheck(taskPriority)) {
                      if (taskInstanceIsFinalState(taskPriority.getTaskId())) {
                      // when task finish, ignore this task, there is no need to dispatch anymore
                      return true;
                      }
                      }
                      // dispatch the task
                      result = dispatcher.dispatch(executionContext);


                      if (result) {
                      logger.info("Master success dispatch task to worker, taskInstanceId: {}, worker: {}",
                      taskPriority.getTaskId(),
                      executionContext.getHost());
                      addDispatchEvent(context, executionContext);
                      } else {
                      logger.info("Master failed to dispatch task to worker, taskInstanceId: {}, worker: {}",
                      taskPriority.getTaskId(),
                      executionContext.getHost());
                      }
                      } catch (RuntimeException | ExecuteException e) {
                      logger.error("Master dispatch task to worker error, taskPriority: {}", taskPriority, e);
                      }
                      return result;
                      }


A worker node is selected, and the command is sent to it over netty so that the worker runs the task.


                        /**
                        * task dispatch
                        *
                        * @param context context
                        * @return result
                        * @throws ExecuteException if error throws ExecuteException
                        */
                        public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
                        // get executor manager
                        ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
                        if (executorManager == null) {
                        throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
                        }


                        // host select: the host manager picks a worker node
                        Host host = hostManager.select(context);
                        if (StringUtils.isEmpty(host.getAddress())) {
                        logger.warn("fail to execute : {} due to no suitable worker, current task needs worker group {} to execute",
                        context.getCommand(), context.getWorkerGroup());
                        return false;
                        }
                        context.setHost(host); // store the selected host in the context
                        executorManager.beforeExecute(context);
                        try {
                        // task execute: send a message to the worker over netty, telling it to run this task
                        return executorManager.execute(context);
                        } finally {
                        executorManager.afterExecute(context);
                        }
                        }


                          /**
                          * execute logic
                          *
                          * @param context context
                          * @return result
                          * @throws ExecuteException if error throws ExecuteException
                          */
                          @Override
                          public Boolean execute(ExecutionContext context) throws ExecuteException {
                          // all nodes
                          Set<String> allNodes = getAllNodes(context);
                          // fail nodes
                          Set<String> failNodeSet = new HashSet<>();
                          // build command accord executeContext
                          Command command = context.getCommand();
                          // execute task host
                          Host host = context.getHost();
                          boolean success = false;
                          while (!success) {
                          try {
                          doExecute(host, command);
                          success = true;
                          context.setHost(host);
                          // We set the host to taskInstance to avoid when the worker down, this taskInstance may not be failovered, due to the taskInstance's host
                          // is not belongs to the down worker ISSUE-10842.
                          context.getTaskInstance().setHost(host.getAddress());
                          } catch (ExecuteException ex) {
                          logger.error(String.format("execute command : %s error", command), ex);
                          try {
                          failNodeSet.add(host.getAddress());
                          Set<String> tmpAllIps = new HashSet<>(allNodes);
                          Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                          if (remained != null && remained.size() > 0) {
                          host = Host.of(remained.iterator().next());
                          logger.error("retry execute command : {} host : {}", command, host);
                          } else {
                          throw new ExecuteException("fail after try all nodes");
                          }
                          } catch (Throwable t) {
                          throw new ExecuteException("fail after try all nodes");
                          }
                          }
                          }


                          return success;
                          }
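
The retry logic in execute() amounts to "try the selected host, and on failure move on to any node that has not failed yet". The sketch below isolates that idea. It is a simplification under stated assumptions: `FailoverSketch` and `executeWithFailover` are hypothetical names, and a boolean-returning `send` predicate replaces the exception that signals a failed send in the code above.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Predicate;

/** Toy failover: try the preferred host, then every remaining candidate once. */
final class FailoverSketch {

    /**
     * Returns the host that accepted the command.
     *
     * @throws IllegalStateException when every candidate has failed
     */
    static String executeWithFailover(String preferred, Set<String> allNodes, Predicate<String> send) {
        Set<String> failed = new LinkedHashSet<>();
        String host = preferred;
        while (true) {
            if (send.test(host)) {
                return host;
            }
            failed.add(host);
            // Candidates are all known nodes minus the ones that already failed.
            Set<String> remaining = new LinkedHashSet<>(allNodes);
            remaining.removeAll(failed);
            if (remaining.isEmpty()) {
                throw new IllegalStateException("fail after try all nodes");
            }
            host = remaining.iterator().next();
        }
    }
}
```

Since each failed host is recorded and excluded, the loop makes at most one attempt per node and is guaranteed to end.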


3.3 worker: executing tasks


3.3.1 Worker startup

WorkerServer.run()

                            public void run() {
                            this.workerRpcServer.start(); // start the RPC communication components
                            this.workerRpcClient.start();
                            this.taskPluginManager.loadPlugin();


                            this.workerRegistryClient.registry();
                            this.workerRegistryClient.setRegistryStoppable(this);
                            this.workerRegistryClient.handleDeadServer();
                            // start the workerManager thread, which consumes the submitted tasks and runs them
                            this.workerManagerThread.start();


                            this.messageRetryRunner.start();


                            /*
                            * registry hooks, which are called before the process exits
                            */
                            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                            if (Stopper.isRunning()) {
                            close("WorkerServer shutdown hook");
                            }
                            }));
                            }


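3.3.2 Worker handles the task-start command

When the worker receives the message sent by the master, it ends up calling TaskDispatchProcessor.process (a handler registered when the netty RPC component was started in 3.3.1) to handle the command message.


TaskDispatchProcessor.process()


It extracts the task execution context from the command, wraps it in a TaskExecuteThread, and submits it to the workerManager.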

                              @Counted(value = "ds.task.execution.count", description = "task execute total count")
                              @Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
                              @Override
                              public void process(Channel channel, Command command) {
                              Preconditions.checkArgument(CommandType.TASK_DISPATCH_REQUEST == command.getType(),
                              String.format("invalid command type : %s", command.getType()));


                              TaskDispatchCommand taskDispatchCommand = JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class);


                              if (taskDispatchCommand == null) {
                              logger.error("task execute request command content is null");
                              return;
                              }
                              final String masterAddress = taskDispatchCommand.getMessageSenderAddress();
                              logger.info("task execute request message: {}", taskDispatchCommand);


                              TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext(); // get the task execution context


                              if (taskExecutionContext == null) {
                              logger.error("task execution context is null");
                              return;
                              }
                              try {
                              LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                              taskExecutionContext.getTaskInstanceId());


                              TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());


                              // set cache, it will be used when kill task
                              TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);


                              // todo custom logger


                              taskExecutionContext.setHost(workerConfig.getWorkerAddress());
                              taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));


                              if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) { // not a dry run
                              boolean osUserExistFlag;
                              //if Using distributed is true and Currently supported systems are linux,Should not let it automatically
                              //create tenants,so TenantAutoCreate has no effect
                              if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
                              //use the id command to judge in linux
                              osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode());
                              } else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
                              // if not exists this user, then create
                              OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
                              osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
                              } else {
                              osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
                              }


                              // check if the OS user exists
                              if (!osUserExistFlag) { // the tenant's OS user does not exist
                              logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
                              taskExecutionContext.getTenantCode(),
                              taskExecutionContext.getTaskInstanceId());
                              TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
                              taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
                              taskExecutionContext.setEndTime(new Date());
                              workerMessageSender.sendMessageWithRetry(taskExecutionContext, // report the failure to the master
                              masterAddress,
                              CommandType.TASK_EXECUTE_RESULT);
                              return;
                              }


                              // local execute path
                              String execLocalPath = getExecLocalPath(taskExecutionContext);
                              logger.info("task instance local execute path : {}", execLocalPath);
                              taskExecutionContext.setExecutePath(execLocalPath);


                              try {
                              FileUtils.createWorkDirIfAbsent(execLocalPath);
                              } catch (Throwable ex) { // failed to create the working directory
                              logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
                              execLocalPath,
                              taskExecutionContext.getTaskInstanceId(),
                              ex);
                              TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
                              taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
                              workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                              masterAddress,
                              CommandType.TASK_EXECUTE_RESULT);
                              return;
                              }
                              }


                              // delay task process
                              long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
                              taskExecutionContext.getDelayTime() * 60L);
                              if (remainTime > 0) {
                              logger.info("delay the execution of task instance {}, delay time: {} s",
                              taskExecutionContext.getTaskInstanceId(),
                              remainTime);
                              taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
                              taskExecutionContext.setStartTime(null);
                              workerMessageSender.sendMessage(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
                              }


                              // submit the task to the workerManager wait queue
                              boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
                              masterAddress,
                              workerMessageSender,
                              alertClientService,
                              taskPluginManager,
                              storageOperate));
                              if (!offer) {
                              logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}",
                              workerManager.getWaitSubmitQueueSize(),
                              taskExecutionContext.getTaskInstanceId());
                              workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_REJECT);
                              }
                              } finally {
                              LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                              }
                              }
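The delay check above depends on how much of the configured delay is still left. The arithmetic can be sketched as follows, assuming the remaining time is the delay in seconds minus the seconds elapsed since the first submission. The class and method names are illustrative and are not DolphinScheduler's DateUtils.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative helper, not the DolphinScheduler DateUtils implementation.
final class DelaySketch {

    // Seconds still to wait: configured delay minus time elapsed since first submission.
    static long remainSeconds(Instant firstSubmitTime, long delayMinutes, Instant now) {
        if (firstSubmitTime == null) {
            return 0L; // assumption: no submit time means no delay
        }
        long elapsed = Duration.between(firstSubmitTime, now).getSeconds();
        return delayMinutes * 60L - elapsed;
    }

    public static void main(String[] args) {
        Instant submitted = Instant.parse("2022-12-22T10:00:00Z");
        Instant now = Instant.parse("2022-12-22T10:01:30Z");
        // 2-minute delay, 90 seconds elapsed: 30 seconds remain, so the task is still delayed.
        System.out.println(remainSeconds(submitted, 2, now)); // prints 30
        // 1-minute delay, 90 seconds elapsed: the result is negative, so the task is not delayed.
        System.out.println(remainSeconds(submitted, 1, now)); // prints -30
    }
}
```

A positive result corresponds to the `remainTime > 0` branch, where the worker reports DELAY_EXECUTION to the master.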


3.3.3 workerManager consumes tasks


                                public void start() {
                                logger.info("Worker manager thread starting");
                                Thread thread = new Thread(this, this.getClass().getName());
                                thread.setDaemon(true);
                                thread.start();
                                logger.info("Worker manager thread started");
                                }


                                public void submit(TaskExecuteThread taskExecuteThread) {
                                // record the task in the map of running tasks
                                taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
                                // submit the task to the thread pool for execution
                                ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
                                // register a callback that removes the task from the map once it finishes
                                FutureCallback futureCallback = new FutureCallback() {
                                @Override
                                public void onSuccess(Object o) {
                                taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
                                }


                                @Override
                                public void onFailure(Throwable throwable) {
                                logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}",
                                taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
                                taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
                                throwable);
                                taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId()); // remove from the running-task map
                                }
                                };
                                Futures.addCallback(future, futureCallback, this.listeningExecutorService);
                                }
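The offer call in 3.3.2 and the submit method above are two ends of a wait queue: offer rejects a task when the queue is full, and a daemon thread takes tasks off the queue and hands them to the execution pool. A minimal sketch of that pattern using only java.util.concurrent follows. The class name, queue capacity, and pool size are assumptions for illustration, not the actual fields of the worker manager.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for the worker manager; names and sizes are hypothetical.
final class WaitQueueSketch implements Runnable {
    private final BlockingQueue<Runnable> waitSubmitQueue;
    private final ExecutorService execPool;

    WaitQueueSketch(int queueCapacity, int execThreads) {
        this.waitSubmitQueue = new ArrayBlockingQueue<>(queueCapacity);
        this.execPool = Executors.newFixedThreadPool(execThreads);
    }

    // Non-blocking: returns false when the queue is full so the caller can reject the task.
    boolean offer(Runnable task) {
        return waitSubmitQueue.offer(task);
    }

    void start() {
        Thread thread = new Thread(this, "wait-queue-consumer");
        thread.setDaemon(true);
        thread.start();
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // Blocks until a task is available, then hands it to the execution pool.
                execPool.submit(waitSubmitQueue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void shutdown() {
        execPool.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        WaitQueueSketch manager = new WaitQueueSketch(2, 1);
        CountDownLatch done = new CountDownLatch(2);
        boolean first = manager.offer(done::countDown);
        boolean second = manager.offer(done::countDown);
        boolean third = manager.offer(done::countDown); // queue is full, consumer not started yet
        System.out.println(first + " " + second + " " + third); // prints true true false
        manager.start();
        done.await(5, TimeUnit.SECONDS);
        manager.shutdown();
    }
}
```

The rejected third offer corresponds to the `!offer` branch in 3.3.2, where the worker sends TASK_REJECT back to the master.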


3.3.4 Task execution

                                  @Override
                                  public void run() {
                                  try {
                                  LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                                  taskExecutionContext.getTaskInstanceId());
                                  if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) { // check whether this is a dry run
                                  taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
                                  taskExecutionContext.setStartTime(new Date());
                                  taskExecutionContext.setEndTime(new Date());
                                  TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
                                  workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                                  masterAddress,
                                  CommandType.TASK_EXECUTE_RESULT);
                                  logger.info("Task dry run success");
                                  return;
                                  }
                                  } finally {
                                  LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                                  }
                                  try {
                                  LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                                  taskExecutionContext.getTaskInstanceId());
                                  logger.info("script path : {}", taskExecutionContext.getExecutePath());
                                  if (taskExecutionContext.getStartTime() == null) {
                                  taskExecutionContext.setStartTime(new Date());
                                  }
                                  logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());


                                  // callback task execute running
                                  taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
                                  workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                                  masterAddress,
                                  CommandType.TASK_EXECUTE_RUNNING);


                                  // copy hdfs/minio file to local
                                  List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(),
                                  taskExecutionContext.getResources());
                                  if (!fileDownloads.isEmpty()) {
                                  downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
                                  }


                                  taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());


                                  taskExecutionContext.setTaskAppId(String.format("%s_%s",
                                  taskExecutionContext.getProcessInstanceId(),
                                  taskExecutionContext.getTaskInstanceId()));


                                  TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()); // look up the task factory for this task type
                                  if (null == taskChannel) {
                                  throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
                                  }
                                  String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
                                  taskExecutionContext.getProcessDefineCode(),
                                  taskExecutionContext.getProcessDefineVersion(),
                                  taskExecutionContext.getProcessInstanceId(),
                                  taskExecutionContext.getTaskInstanceId());
                                  taskExecutionContext.setTaskLogName(taskLogName);


                                  // set the name of the current thread
                                  Thread.currentThread().setName(taskLogName);


                                  task = taskChannel.createTask(taskExecutionContext); // create the task


                                  // task init
                                  this.task.init(); // initialize the task


                                  //init varPool
                                  this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());


                                  // task handle
                                  this.task.handle(); // the actual task logic; each task type has its own handle() implementation


                                  // task result process
                                  if (this.task.getNeedAlert()) {
                                  sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
                                  }


                                  taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode())); // record the task's exit status
                                  taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
                                  taskExecutionContext.setProcessId(this.task.getProcessId());
                                  taskExecutionContext.setAppIds(this.task.getAppIds());
                                  taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
                                  logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
                                  } catch (Throwable e) {
                                  logger.error("task scheduler failure", e);
                                  kill();
                                  taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); // report failure
                                  taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
                                  taskExecutionContext.setProcessId(this.task.getProcessId());
                                  taskExecutionContext.setAppIds(this.task.getAppIds());
                                  } finally {
                                  TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
                                  // send the task execution result back to the master
                                  workerMessageSender.sendMessageWithRetry(taskExecutionContext,
                                  masterAddress,
                                  CommandType.TASK_EXECUTE_RESULT);
                                  clearTaskExecPath();
                                  LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                                  }
                                  }
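The core of run() is a lookup by task type followed by createTask, init, and handle. A minimal self-contained sketch of that dispatch follows. The interfaces and the SHELL entry are simplified stand-ins defined for this example, not the real TaskChannel or task signatures.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative plugin dispatch; all types here are hypothetical simplifications.
final class TaskPluginSketch {

    // Simplified stand-in for a task: init prepares it, handle runs it and returns an exit code.
    interface Task {
        void init();
        int handle();
    }

    // Simplified stand-in for a task channel: a factory for one task type.
    interface TaskChannel {
        Task createTask(String taskParams);
    }

    static final class EchoTask implements Task {
        private final String params;
        private boolean initialized;

        EchoTask(String params) {
            this.params = params;
        }

        @Override
        public void init() {
            initialized = true;
        }

        @Override
        public int handle() {
            if (!initialized) {
                throw new IllegalStateException("init() must run before handle()");
            }
            System.out.println("running: " + params);
            return 0;
        }
    }

    private final Map<String, TaskChannel> channels = new HashMap<>();

    void register(String taskType, TaskChannel channel) {
        channels.put(taskType, channel);
    }

    // Mirrors the order in run(): look up the channel, create the task, init, then handle.
    int execute(String taskType, String taskParams) {
        TaskChannel channel = channels.get(taskType);
        if (channel == null) {
            throw new IllegalArgumentException(taskType + " task plugin not found");
        }
        Task task = channel.createTask(taskParams);
        task.init();
        return task.handle();
    }

    public static void main(String[] args) {
        TaskPluginSketch manager = new TaskPluginSketch();
        manager.register("SHELL", EchoTask::new);
        int exitCode = manager.execute("SHELL", "echo hello"); // prints "running: echo hello"
        System.out.println(exitCode); // prints 0
    }
}
```

Because the caller only sees the Task interface, supporting a new task type means registering one more factory under a new key.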


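3.4 Master receives task feedback


3.4.1 Master receives the feedback message


When a worker finishes a task, it reports the result to the master over Netty. The master also starts Netty at startup and registers a set of handlers, with one handler per message type.

The worker's task result feedback uses the TASK_EXECUTE_RESULT message type.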


                                  MasterRPCServer.init()
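The idea of one handler per message type can be sketched as a registry keyed by command type. This is a simplified illustration rather than the actual MasterRPCServer code: the enum values mirror command types that appear in this article, while the Processor interface and the registry class are hypothetical.

```java
import java.util.EnumMap;
import java.util.Map;

// Illustrative registry; not the MasterRPCServer or Netty server implementation.
final class ProcessorRegistrySketch {

    enum CommandType { TASK_EXECUTE_RUNNING, TASK_EXECUTE_RESULT, TASK_REJECT }

    // Hypothetical handler contract: one implementation per command type.
    interface Processor {
        String process(String body);
    }

    private final Map<CommandType, Processor> processors = new EnumMap<>(CommandType.class);

    void registerProcessor(CommandType type, Processor processor) {
        processors.put(type, processor);
    }

    // Routes an incoming message to the processor registered for its type.
    String dispatch(CommandType type, String body) {
        Processor processor = processors.get(type);
        if (processor == null) {
            throw new IllegalStateException("no processor registered for " + type);
        }
        return processor.process(body);
    }

    public static void main(String[] args) {
        ProcessorRegistrySketch server = new ProcessorRegistrySketch();
        server.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, body -> "running: " + body);
        server.registerProcessor(CommandType.TASK_EXECUTE_RESULT, body -> "result: " + body);
        // prints "result: taskInstanceId=1"
        System.out.println(server.dispatch(CommandType.TASK_EXECUTE_RESULT, "taskInstanceId=1"));
    }
}
```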



The message is wrapped into a taskResultEvent and submitted to taskEventService.


TaskExecuteResponseProcessor.process()

                                      @Override
                                      public void process(Channel channel, Command command) {
                                      Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT == command.getType(),
                                      String.format("invalid command type : %s", command.getType()));


                                      TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(),
                                      TaskExecuteResultCommand.class);
                                      TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage,
                                      channel,
                                      taskExecuteResultMessage.getMessageSenderAddress());
                                      try {
                                      LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),
                                      taskResultEvent.getTaskInstanceId());
                                      logger.info("Received task execute result, event: {}", taskResultEvent);
                                      // submit the event to taskEventService
                                      taskEventService.addEvent(taskResultEvent);
                                      } finally {
                                      LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                                      }
                                      }


3.4.2 taskEventService handles task events

TaskEventService startup code: start() launches the task event dispatcher and the task event handler.


TaskEventService is made up of these two components.


TaskEventService.start() method






Continuing from 3.4.1, the submitted event is processed as follows.



Next, the submitTaskEvent method:

                                            public void submitTaskEvent(TaskEvent taskEvent) {
                                            if (!processInstanceExecCacheManager.contains(taskEvent.getProcessInstanceId())) {
                                            logger.warn("Cannot find workflowExecuteThread from cacheManager, event: {}", taskEvent);
                                            return;
                                            }
                                            // create a TaskExecuteRunnable for the process instance if absent and add the event to it;
                                            // that runnable then handles every TaskEvent submitted for this process instance
                                            TaskExecuteRunnable taskExecuteRunnable = taskExecuteThreadMap.computeIfAbsent(taskEvent.getProcessInstanceId(),
                                            (processInstanceId) -> new TaskExecuteRunnable(processInstanceId, taskEventHandlerMap));
                                            taskExecuteRunnable.addEvent(taskEvent);
                                            }


Handling the task event: the handler thread ultimately calls taskExecuteThread.run() (a TaskExecuteRunnable) to process it.

                                              class TaskEventHandlerThread extends BaseDaemonThread {


                                              protected TaskEventHandlerThread() {
                                              super("TaskEventHandlerThread");
                                              }


                                              @Override
                                              public void run() {
                                              logger.info("event handler thread started");
                                              while (Stopper.isRunning()) {
                                              try { // handle pending events
                                              taskExecuteThreadPool.eventHandler();
                                              TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                                              } catch (InterruptedException e) {
                                              Thread.currentThread().interrupt();
                                              logger.warn("TaskEvent handle thread interrupted, will return this loop");
                                              break;
                                              } catch (Exception e) {
                                              logger.error("event handler thread error", e);
                                              }
                                              }
                                              }
                                              }




                                              // the eventHandler method
                                              public void eventHandler() {
                                              for (TaskExecuteRunnable taskExecuteThread : taskExecuteThreadMap.values()) {
                                              executeEvent(taskExecuteThread);
                                              }
                                              }




                                              // execute the events
                                              public void executeEvent(TaskExecuteRunnable taskExecuteThread) {
                                              if (taskExecuteThread.isEmpty()) {
                                              return;
                                              }
                                              if (multiThreadFilterMap.containsKey(taskExecuteThread.getKey())) {
                                              return;
                                              }
                                              // mark the runnable as in flight so it is not submitted twice, then run it asynchronously
                                              multiThreadFilterMap.put(taskExecuteThread.getKey(), taskExecuteThread);
                                              ListenableFuture future = this.submitListenable(taskExecuteThread::run);
                                              future.addCallback(new ListenableFutureCallback() { // register the callback
                                              @Override
                                              public void onFailure(Throwable ex) {
                                              Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
                                              logger.error("[WorkflowInstance-{}] persist event failed", processInstanceId, ex);
                                              if (!processInstanceExecCacheManager.contains(processInstanceId)) {
                                              taskExecuteThreadMap.remove(processInstanceId);
                                              logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap",
                                              processInstanceId);
                                              }
                                              multiThreadFilterMap.remove(taskExecuteThread.getKey());
                                              }


                                              @Override
                                              public void onSuccess(Object result) {
                                              Integer processInstanceId = taskExecuteThread.getProcessInstanceId();
                                              logger.info("[WorkflowInstance-{}] persist events succeeded", processInstanceId);
                                              if (!processInstanceExecCacheManager.contains(processInstanceId)) {
                                              taskExecuteThreadMap.remove(processInstanceId);
                                              logger.info("[WorkflowInstance-{}] Cannot find processInstance from cacheManager, remove process instance from threadMap",
                                              processInstanceId);
                                              }
                                              multiThreadFilterMap.remove(taskExecuteThread.getKey());
                                              }
                                              });
                                              }
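executeEvent uses multiThreadFilterMap to make sure a runnable that is already running is not submitted again, and it clears the entry in both callbacks. A minimal sketch of the same guard follows, assuming an atomic putIfAbsent in place of the containsKey/put pair and CompletableFuture in place of Spring's ListenableFuture. All names are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative guard; not the TaskExecuteThreadPool implementation.
final class InFlightFilterSketch {
    private final ConcurrentHashMap<String, Boolean> inFlight = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    // Submits the work unless work for the same key is still running; returns whether it was submitted.
    boolean executeOnce(String key, Runnable work) {
        if (inFlight.putIfAbsent(key, Boolean.TRUE) != null) {
            return false;
        }
        // whenComplete runs on success and on failure, so the key is always released.
        CompletableFuture.runAsync(work, pool)
                .whenComplete((ignored, error) -> inFlight.remove(key));
        return true;
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        InFlightFilterSketch filter = new InFlightFilterSketch();
        CountDownLatch release = new CountDownLatch(1);
        boolean first = filter.executeOnce("process-instance-1", () -> {
            try {
                release.await(); // keep the first run busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        boolean second = filter.executeOnce("process-instance-1", () -> { });
        boolean other = filter.executeOnce("process-instance-2", () -> { });
        System.out.println(first + " " + second + " " + other); // prints true false true
        release.countDown();
        filter.shutdown();
    }
}
```

Keying the guard by process instance keeps events of one workflow strictly serial while different workflows still run in parallel.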






The asynchronously submitted TaskExecuteRunnable finally has its run method invoked:


                                              TaskExecuteRunnable.run()


                                              public void run() {
                                              while (!this.events.isEmpty()) {
                                              // we handle the task event belongs to one task serial, so if the event comes in wrong order,
                                              TaskEvent event = this.events.peek();
                                              try {
                                              LoggerUtils.setWorkflowAndTaskInstanceIDMDC(event.getProcessInstanceId(), event.getTaskInstanceId());
                                              logger.info("Handle task event begin: {}", event);
                                              // look up the handler by event type and handle the event
                                              taskEventHandlerMap.get(event.getEvent()).handleTaskEvent(event);
                                              events.remove(event);
                                              logger.info("Handle task event finished: {}", event);
                                              } catch (TaskEventHandleException taskEventHandleException) {
                                              // we don't need to resubmit this event, since the worker will resubmit this event
                                              logger.error("Handle task event failed, this event will be retry later, event: {}", event,
                                              taskEventHandleException);
                                              } catch (TaskEventHandleError taskEventHandleError) {
                                              logger.error("Handle task event error, this event will be removed, event: {}", event,
                                              taskEventHandleError);
                                              events.remove(event);
                                              } catch (Exception unknownException) {
                                              logger.error("Handle task event error, get a unknown exception, this event will be removed, event: {}",
                                              event, unknownException);
                                              events.remove(event);
                                              } finally {
                                              LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                                              }
                                              }
                                              }
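The run method above picks a handler from a map keyed by event type and processes the events of one task in arrival order. The following is a minimal sketch of that dispatch pattern only. Every name in it (`TaskEventDispatchSketch`, `EventType`, `Event`, `Handler`) is hypothetical and not a DolphinScheduler class, and unlike the original it always removes an event after one attempt, so the retry branch is deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch of "one handler per event type"; not DolphinScheduler code. */
class TaskEventDispatchSketch {

    enum EventType { RUNNING, RESULT }

    /** Hypothetical event: a type plus the task instance it belongs to. */
    static final class Event {
        final EventType type;
        final int taskInstanceId;

        Event(EventType type, int taskInstanceId) {
            this.type = type;
            this.taskInstanceId = taskInstanceId;
        }
    }

    interface Handler {
        void handle(Event event);
    }

    private final Map<EventType, Handler> handlers = new EnumMap<>(EventType.class);
    private final Queue<Event> events = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    TaskEventDispatchSketch() {
        // Each event type is bound to exactly one handler.
        handlers.put(EventType.RUNNING, e -> log.add("running:" + e.taskInstanceId));
        handlers.put(EventType.RESULT, e -> log.add("result:" + e.taskInstanceId));
    }

    void addEvent(Event event) {
        events.add(event);
    }

    /** Drains the queue in arrival order: peek, dispatch by type, then remove. */
    void run() {
        while (!events.isEmpty()) {
            Event event = events.peek();
            try {
                handlers.get(event.type).handle(event);
            } catch (RuntimeException ex) {
                // Simplification: a failed event is dropped instead of retried.
                log.add("dropped:" + event.taskInstanceId);
            } finally {
                events.remove(event);
            }
        }
    }
}
```

Because the queue is drained by a single loop, the events of one task are handled strictly in the order they were added.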


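3.4.3 TaskResultEventHandler handles the taskEvent

TaskResultEventHandler.handleTaskEvent()

The handler is reached through the chain shown in 3.4.2: TaskEventHandlerThread.run() calls taskExecuteThreadPool.eventHandler(), which submits each TaskExecuteRunnable; its run() method looks up the handler for the event type in taskEventHandlerMap and calls handleTaskEvent(event). Task results are processed by TaskResultEventHandler.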




3.5 The master closes the loop and submits downstream tasks

3.5.1 EventExecuteService handles the stateEvent

EventExecuteService.run()

This service thread is started during master startup (see 3.2.1).

    @Override
    public void run() {
        while (Stopper.isRunning()) {
            try {
                // process pending events
                eventHandler();
                TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
            } catch (InterruptedException interruptedException) {
                logger.warn("Master event service interrupted, will exit this loop", interruptedException);
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                logger.error("Master event execute service error", e);
            }
        }
    }

Step into the eventHandler method:

    private void eventHandler() {
        for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
            try {
                LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
                workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
            } finally {
                LoggerUtils.removeWorkflowInstanceIdMDC();
            }
        }
    }

Finally, executeEvent submits the handleEvents method of workflowExecuteThread to the thread pool to process the events.

    /**
     * Handle the events belong to the given workflow.
     */
    public void executeEvent(final WorkflowExecuteRunnable workflowExecuteThread) {
        if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
            return;
        }
        if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
            logger.warn("The workflow has been executed by another thread");
            return;
        }
        multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
        int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
        ListenableFuture<?> future = this.submitListenable(workflowExecuteThread::handleEvents);
        future.addCallback(new ListenableFutureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                LoggerUtils.setWorkflowInstanceIdMDC(processInstanceId);
                try {
                    logger.error("Workflow instance events handle failed", ex);
                    multiThreadFilterMap.remove(workflowExecuteThread.getKey());
                } finally {
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            }

            @Override
            public void onSuccess(Object result) {
                try {
                    LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
                    if (workflowExecuteThread.workFlowFinish()) {
                        stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance().getId());
                        processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
                        notifyProcessChanged(workflowExecuteThread.getProcessInstance());
                        logger.info("Workflow instance is finished.");
                    }
                } catch (Exception e) {
                    logger.error("Workflow instance is finished, but notify changed error", e);
                } finally {
                    // make sure the process has been removed from multiThreadFilterMap
                    multiThreadFilterMap.remove(workflowExecuteThread.getKey());
                    LoggerUtils.removeWorkflowInstanceIdMDC();
                }
            }
        });
    }
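The multiThreadFilterMap in executeEvent acts as a guard so that the events of one workflow are handled by at most one pooled thread at a time: the key is registered before submission and removed in both callbacks. A minimal sketch of that guard follows. The class `SingleFlightSketch` and its methods are hypothetical, it uses the JDK's `CompletableFuture` rather than Spring's `ListenableFuture`, and it swaps the containsKey/put pair for `putIfAbsent` so that the check and the registration form a single atomic step.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

/** Hypothetical sketch of an "at most one in-flight job per key" guard. */
class SingleFlightSketch {

    private final ConcurrentHashMap<String, Boolean> inFlight = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    SingleFlightSketch(ExecutorService pool) {
        this.pool = pool;
    }

    /**
     * Submits the work unless the key is already being processed.
     * Returns the future of the accepted job, or null if it was rejected.
     */
    CompletableFuture<Void> submit(String key, Runnable work) {
        if (inFlight.putIfAbsent(key, Boolean.TRUE) != null) {
            return null; // another thread is already handling this key
        }
        // Release the key on success and on failure alike.
        return CompletableFuture.runAsync(work, pool)
                .whenComplete((result, error) -> inFlight.remove(key));
    }

    boolean isInFlight(String key) {
        return inFlight.containsKey(key);
    }
}
```

Releasing the key in a completion stage that runs on both outcomes mirrors what the original does by calling multiThreadFilterMap.remove in onFailure and in the finally block of onSuccess.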

3.5.2 workflowExecuteThread handles the stateEvent

WorkflowExecuteRunnable.handleEvents()

    /**
     * handle event
     */
    public void handleEvents() {
        if (!isStart()) {
            logger.info(
                    "The workflow instance is not started, will not handle its state event, current state event size: {}",
                    stateEvents);
            return;
        }
        StateEvent stateEvent = null;
        while (!this.stateEvents.isEmpty()) {
            try {
                stateEvent = this.stateEvents.peek();
                LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),
                        stateEvent.getTaskInstanceId());
                // if state handle success then will remove this state, otherwise will retry this state next time.
                // The state should always handle success except database error.
                checkProcessInstance(stateEvent);

                StateEventHandler stateEventHandler =
                        StateEventHandlerManager.getStateEventHandler(stateEvent.getType())
                                .orElseThrow(() -> new StateEventHandleError("Cannot find handler for the given state event"));
                logger.info("Begin to handle state event, {}", stateEvent);
                // delegate to the stateEventHandler registered for this event type
                if (stateEventHandler.handleStateEvent(this, stateEvent)) {
                    this.stateEvents.remove(stateEvent);
                }
            } catch (StateEventHandleError stateEventHandleError) {
                logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
                this.stateEvents.remove(stateEvent);
                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
            } catch (StateEventHandleException stateEventHandleException) {
                logger.error("State event handle error, will retry this event: {}",
                        stateEvent,
                        stateEventHandleException);
                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
            } catch (Exception e) {
                // we catch the exception here, since if the state event handle failed, the state event will still keep in the stateEvents queue.
                logger.error("State event handle error, get a unknown exception, will retry this event: {}",
                        stateEvent,
                        e);
                ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
            } finally {
                LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
            }
        }
    }
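The loop in handleEvents treats failures in two ways: a StateEventHandleError removes the event from the queue, while any other exception leaves it at the head so that it is retried. The sketch below isolates that peek-then-remove idea. The names `StateEventLoopSketch`, `FatalError`, `RetryableException` and `Handler` are hypothetical, and the `maxAttempts` bound is an assumption added so that the example always terminates; the original loop sleeps between attempts instead.

```java
import java.util.Queue;

/** Hypothetical sketch of a queue loop that drops fatal events and retries the rest. */
class StateEventLoopSketch {

    /** Hypothetical: the event can never succeed, so it is dropped. */
    static class FatalError extends Exception {
    }

    /** Hypothetical: a transient problem, the event stays queued. */
    static class RetryableException extends Exception {
    }

    interface Handler {
        boolean handle(String event) throws FatalError, RetryableException;
    }

    /** Processes events in order and returns how many are still queued. */
    static int drain(Queue<String> events, Handler handler, int maxAttempts) {
        int attempts = 0;
        while (!events.isEmpty() && attempts < maxAttempts) {
            attempts++;
            String event = events.peek(); // look at the head without removing it
            try {
                if (handler.handle(event)) {
                    events.remove(event); // remove only after a successful handle
                }
            } catch (FatalError e) {
                events.remove(event); // unrecoverable: drop the event
            } catch (RetryableException e) {
                // keep the event at the head; the next iteration retries it
            }
        }
        return events.size();
    }
}
```

Keeping a failed event at the head of the queue preserves ordering: later events of the same workflow are not processed until the earlier one has either succeeded or been dropped.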


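3.5.3 TaskStateEventHandler handles the stateEvent

The handler checks whether the event reports a finished task. If it does, it calls the taskFinished method of workflowExecuteThread, which submits the downstream tasks.

TaskStateEventHandler.handleStateEvent()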


    @Override
    public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
            throws StateEventHandleException, StateEventHandleError {
        measureTaskState(stateEvent);
        workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent);

        Optional<TaskInstance> taskInstanceOptional =
                workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId());

        TaskInstance task = taskInstanceOptional.orElseThrow(() -> new StateEventHandleError(
                "Cannot find task instance from taskMap by task instance id: " + stateEvent.getTaskInstanceId()));

        if (task.getState() == null) {
            throw new StateEventHandleError("Task state event handle error due to task state is null");
        }

        Map<Long, Integer> completeTaskMap = workflowExecuteRunnable.getCompleteTaskMap();

        if (task.getState().typeIsFinished()) {
            if (completeTaskMap.containsKey(task.getTaskCode())
                    && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
                logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
                return true;
            }
            // call the taskFinished method of workflowExecuteRunnable
            workflowExecuteRunnable.taskFinished(task);
            if (task.getTaskGroupId() > 0) {
                workflowExecuteRunnable.releaseTaskGroup(task);
            }
            return true;
        }
        Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
        if (activeTaskProcessMap.containsKey(task.getTaskCode())) {
            ITaskProcessor iTaskProcessor = activeTaskProcessMap.get(task.getTaskCode());
            iTaskProcessor.action(TaskAction.RUN);

            if (iTaskProcessor.taskInstance().getState().typeIsFinished()) {
                if (iTaskProcessor.taskInstance().getState() != task.getState()) {
                    task.setState(iTaskProcessor.taskInstance().getState());
                }
                workflowExecuteRunnable.taskFinished(task);
            }
            return true;
        }
        throw new StateEventHandleException(
                "Task state event handle error, due to the task is not in activeTaskProcessorMaps");
    }


3.5.4 workflowExecuteThread schedules the downstream tasks

WorkflowExecuteRunnable.taskFinished()
    public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException {
        logger.info("TaskInstance finished task code:{} state:{}", taskInstance.getTaskCode(), taskInstance.getState());
        try {

            activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
            stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
            stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance);
            stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance);

            if (taskInstance.getState().typeIsSuccess()) {
                completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
                // todo: merge the last taskInstance
                processInstance.setVarPool(taskInstance.getVarPool());
                processService.saveProcessInstance(processInstance);
                if (!processInstance.isBlocked()) {
                    submitPostNode(Long.toString(taskInstance.getTaskCode()));
                }
            } else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
                // retry task
                logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
                retryTaskInstance(taskInstance);
            } else if (taskInstance.getState().typeIsFailure()) {
                completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
                // There are child nodes and the failure policy is: CONTINUE
                if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
                        Long.toString(taskInstance.getTaskCode()),
                        dag)) {
                    submitPostNode(Long.toString(taskInstance.getTaskCode()));
                } else {
                    errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
                    if (processInstance.getFailureStrategy() == FailureStrategy.END) {
                        killAllTasks();
                    }
                }
            } else if (taskInstance.getState().typeIsFinished()) {
                // todo: when the task instance type is pause, then it should not in completeTaskMap
                completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
            }
            logger.info("TaskInstance finished will try to update the workflow instance state, task code:{} state:{}",
                    taskInstance.getTaskCode(),
                    taskInstance.getState());
            this.updateProcessInstanceState();
        } catch (Exception ex) {
            logger.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskMap", ex);
            // remove the task from complete map, so that we can finish in the next time.
            completeTaskMap.remove(taskInstance.getTaskCode());
            throw ex;
        }
    }
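The branches of taskFinished can be read as a small decision table: success submits the downstream nodes unless the workflow is blocked, a retryable task is retried unless the workflow is stopping, and a failure either continues downstream or is recorded as an error depending on the failure strategy. The sketch below restates that table as a pure function. All names in it (`TaskFinishedDecisionSketch`, `TaskState`, `Action`, `decide`) are hypothetical, and `hasDownstream` stands in for the DagHelper.haveAllNodeAfterNode check on the assumption, taken from the comment in the source, that it means the failed node has child nodes.

```java
/** Hypothetical restatement of the taskFinished branches as a pure decision function. */
class TaskFinishedDecisionSketch {

    enum TaskState { SUCCESS, FAILURE, KILL }

    enum FailureStrategy { CONTINUE, END }

    enum Action { SUBMIT_DOWNSTREAM, RETRY, RECORD_ERROR, RECORD_ERROR_AND_KILL_ALL, MARK_COMPLETE }

    static Action decide(TaskState state, boolean canRetry, boolean workflowStopping,
                         boolean workflowBlocked, FailureStrategy strategy, boolean hasDownstream) {
        if (state == TaskState.SUCCESS) {
            // A blocked workflow records the task as complete but submits nothing new.
            return workflowBlocked ? Action.MARK_COMPLETE : Action.SUBMIT_DOWNSTREAM;
        }
        if (canRetry && !workflowStopping) {
            return Action.RETRY;
        }
        if (state == TaskState.FAILURE) {
            if (strategy == FailureStrategy.CONTINUE && hasDownstream) {
                return Action.SUBMIT_DOWNSTREAM;
            }
            return strategy == FailureStrategy.END
                    ? Action.RECORD_ERROR_AND_KILL_ALL
                    : Action.RECORD_ERROR;
        }
        // Any other finished state (for example a killed task) is only marked complete.
        return Action.MARK_COMPLETE;
    }
}
```

For example, `decide(TaskState.FAILURE, false, false, false, FailureStrategy.END, true)` yields `RECORD_ERROR_AND_KILL_ALL`, which corresponds to the errorTaskMap.put followed by killAllTasks() in the original method.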


                                                          04

                                                          总结


                                                          4.1 各个组件的使用

                                                          DS

                                                          • ApiServer负责前端的接口请求。

                                                          • Master负责工作流的任务调度,根据策略选择一台合适的worker执行任务,并更新任务状态到db。

                                                          • Worker负责接受到master的任务,然后运行,并报告运行结果给master

                                                          4.2 线程作用

                                                          DS

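• MasterSchedulerBootstrap: the command-scanning thread. It scans the commands written by the apiserver, creates a processInstance for each one, and persists it to the db.

• WorkflowEventLooper: the WorkflowEvent-processing thread. It handles the WorkflowEvents written by MasterSchedulerBootstrap.

• WorkflowExecuteRunnable: holds the workflow start-up logic. It builds the DAG (directed acyclic graph), initializes the scheduling configuration, and submits tasks to the task queue.

• TaskPriorityQueueConsumer: consumes tasks from the priority queue, asynchronously selects a worker through the dispatcher, and sends that worker the command to start the task.

• TaskDispatchProcessor: runs on the worker node and handles the commands sent by the master.

• WorkerManagerThread: runs on the worker node and manages the tasks that are currently running.

• TaskExecuteThread: the task execution thread. It runs the task logic and returns the execution result to the master.

• TaskExecuteResponseProcessor: handles the task results returned by the worker and wraps them into a taskEvent.

• TaskEventService: processes taskEvents.

• TaskResultEventHandler: called by TaskEventService; handles the task result.

• EventExecuteService: handles the events produced by a workflow.

• TaskStateEventHandler: handles workflowExecute events of type TaskStateEvent. If the task state indicates that the task has finished, it also submits the downstream tasks for scheduling.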
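
The hand-off between these threads can be sketched as a small closed loop: ready tasks sit in a priority queue, a consumer dispatches them to a worker, the worker reports a result event, and the event handler submits the downstream tasks. This is a simplified illustration under stated assumptions, not the project's implementation: `SchedulingLoopSketch`, `Task` and `run` are hypothetical names, a smaller number is assumed to mean higher priority, the "worker" is a single local thread, and each downstream task is assumed to have exactly one upstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical sketch of the scheduling loop; NOT DolphinScheduler code.
class SchedulingLoopSketch {

    /** A task with a priority; smaller value = dispatched first (assumption). */
    static final class Task implements Comparable<Task> {
        final String name;
        final int priority;

        Task(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }

        @Override
        public int compareTo(Task other) {
            return Integer.compare(priority, other.priority);
        }
    }

    /** Runs the source tasks and their downstream tasks; returns the finish order. */
    static List<String> run(List<Task> sources, Map<String, List<Task>> downstream) {
        PriorityBlockingQueue<Task> readyQueue = new PriorityBlockingQueue<>(sources);
        BlockingQueue<String> resultEvents = new LinkedBlockingQueue<>();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        List<String> finished = new ArrayList<>();
        int inFlight = 0;
        try {
            while (!readyQueue.isEmpty() || inFlight > 0) {
                // Consumer role: drain the priority queue and dispatch to the worker.
                Task task;
                while ((task = readyQueue.poll()) != null) {
                    final String name = task.name;
                    // Worker role: "run" the task, then report the result event.
                    worker.execute(() -> resultEvents.add(name));
                    inFlight++;
                }
                // Event-handler role: take one result, then submit downstream tasks.
                String done = resultEvents.take();
                inFlight--;
                finished.add(done);
                readyQueue.addAll(downstream.getOrDefault(done, Collections.<Task>emptyList()));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a result", e);
        } finally {
            worker.shutdown();
        }
        return finished;
    }

    public static void main(String[] args) {
        Map<String, List<Task>> downstream = new HashMap<>();
        downstream.put("a", Arrays.asList(new Task("c", 0)));
        List<Task> sources = Arrays.asList(new Task("b", 2), new Task("a", 1));
        System.out.println(run(sources, downstream)); // prints [a, b, c]
    }
}
```

In the real system these roles live in separate threads and processes and state is persisted to the db between steps; the sketch collapses them into one loop only to show the order in which work is handed over.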

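This article records my personal understanding from reading the DolphinScheduler source code. Feel free to get in touch and discuss it with me, and please point out any mistakes you find.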


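Contributing


With the rapid rise of open source in China, the Apache DolphinScheduler community is growing quickly. To build a better and easier-to-use scheduler, we sincerely welcome everyone who loves open source to join the community, contribute to the rise of open source in China, and help home-grown open source go global.


There are many ways to take part in and contribute to the DolphinScheduler community.


For a first PR (documentation or code), we hope it is a simple one: the first PR is meant for getting familiar with the submission process and with community collaboration, and for experiencing how friendly the community is.


The community maintains a list of issues suitable for newcomers: https://github.com/apache/dolphinscheduler/issues/5689


Issues for more experienced contributors: https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22


How to contribute: https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html


The DolphinScheduler open source community needs your participation. Even a small contribution matters, and together they add up to a great deal.


Taking part in open source lets you learn from skilled developers at close range and improve your own skills quickly. If you would like to contribute, we have a contributor incubation group: add the community assistant on WeChat (Leonard-ds) and you will be guided step by step (contributors of every level are welcome and all questions get answered; what matters is the willingness to contribute).

When adding the assistant on WeChat, please mention that you would like to contribute.


The open source community is looking forward to your participation.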




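This article is reposted from 海豚调度. If you believe it infringes your rights, please send an email to contact@modb.pro with supporting evidence; once verified, 墨天轮 will remove the content immediately.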
