
Worker Module Source Code in Practice: A 10,000-Word Deep Dive into How DolphinScheduler Schedules Hundreds of Millions of Tasks

海豚调度 (DolphinScheduler) · 5 days ago


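The Worker module of Apache DolphinScheduler is one of the core components of its distributed scheduling system. It is responsible for task execution, resource management and dynamic cluster scheduling. This article walks through the source code to reveal its design ideas and implementation details.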

1

Architecture: How the Worker Receives RPC Requests from the Master



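For how the Worker exposes its Netty service and how the Master calls the interface through JDK dynamic proxies, see the earlier article on the DolphinScheduler alert module; it is not repeated here.
In brief: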
org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator
@RpcService
public interface ITaskInstanceOperator {

@RpcMethod
TaskInstanceDispatchResponse dispatchTask(TaskInstanceDispatchRequest taskInstanceDispatchRequest);

@RpcMethod
TaskInstanceKillResponse killTask(TaskInstanceKillRequest taskInstanceKillRequest);

@RpcMethod
TaskInstancePauseResponse pauseTask(TaskInstancePauseRequest taskPauseRequest);

@RpcMethod
UpdateWorkflowHostResponse updateWorkflowInstanceHost(UpdateWorkflowHostRequest updateWorkflowHostRequest);
}

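For every interface annotated with @RpcService and every method annotated with @RpcMethod, a Netty handler is registered on the Worker side and a dynamic-proxy implementation is generated on the Master side.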
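To make the mechanism concrete, here is a minimal JDK-only sketch of the idea. The annotations are local stand-ins named after DolphinScheduler's, and the in-memory `handlers` map is a hypothetical substitute for the Netty transport: the "Worker" side registers one handler per @RpcMethod, and the "Master" side gets a `java.lang.reflect.Proxy` that routes calls to those handlers. It is an illustration under these assumptions, not the project's actual RPC code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Local stand-ins for the real annotations; RUNTIME retention makes them visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface RpcService {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RpcMethod {}

// Hypothetical example service.
@RpcService
interface EchoOperator {
    @RpcMethod
    String echo(String message);
}

class RpcSketch {
    // Hypothetical "transport": method key -> handler (plays the role of the Worker's Netty handlers).
    private final Map<String, Function<Object[], Object>> handlers = new HashMap<>();

    // Worker side: register every @RpcMethod of an @RpcService implementation.
    <T> void registerService(Class<T> iface, T impl) {
        if (!iface.isAnnotationPresent(RpcService.class)) {
            throw new IllegalArgumentException(iface + " is not an @RpcService");
        }
        for (Method m : iface.getMethods()) {
            if (m.isAnnotationPresent(RpcMethod.class)) {
                handlers.put(key(iface, m), args -> {
                    try {
                        return m.invoke(impl, args);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                });
            }
        }
    }

    // Master side: a JDK dynamic proxy that forwards each call to the registered handler.
    <T> T createProxy(Class<T> iface) {
        Object proxy = Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface},
                (p, method, args) -> {
                    Function<Object[], Object> handler = handlers.get(key(iface, method));
                    if (handler == null) {
                        throw new UnsupportedOperationException("Not an @RpcMethod: " + method.getName());
                    }
                    return handler.apply(args);
                });
        return iface.cast(proxy);
    }

    private static String key(Class<?> iface, Method m) {
        return iface.getName() + "#" + m.getName();
    }
}
```

In the real system the handler lookup happens on another machine after the request is serialized over Netty; keeping both sides in one map isolates the annotation-driven registration and proxy generation.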

2

Dispatching Tasks



(TaskInstanceDispatchOperationFunction)

2.1 WorkerConfig

WorkerConfig: simply the configuration entries starting with `worker` that are read from application.yaml in the Worker module.
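In a Spring Boot application this kind of prefix binding is normally done with `@ConfigurationProperties(prefix = "worker")`. As a dependency-free illustration of the idea only, the sketch below selects the keys under the `worker.` prefix from a flat property map. The key names used in the usage check (`worker.exec-threads`, `worker.listen-port`) are illustrative assumptions, not a verified list of WorkerConfig fields.

```java
import java.util.Map;
import java.util.TreeMap;

class PrefixConfigSketch {
    // Returns entries under the prefix with the prefix stripped, e.g. "worker.exec-threads" -> "exec-threads".
    static Map<String, String> bindPrefix(Map<String, String> flatProps, String prefix) {
        String p = prefix.endsWith(".") ? prefix : prefix + ".";
        Map<String, String> bound = new TreeMap<>();
        for (Map.Entry<String, String> e : flatProps.entrySet()) {
            if (e.getKey().startsWith(p)) {
                bound.put(e.getKey().substring(p.length()), e.getValue());
            }
        }
        return bound;
    }

    // Reads an int setting with a default, the way a typed config field would be populated.
    static int getInt(Map<String, String> bound, String key, int defaultValue) {
        String v = bound.get(key);
        return v == null ? defaultValue : Integer.parseInt(v.trim());
    }
}
```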

2.2 WorkerTaskExecutorFactoryBuilder

WorkerTaskExecutorFactoryBuilder: the builder of task executor factories. It wraps DefaultWorkerTaskExecutorFactory (the default Worker task executor factory), which in turn wraps the creation of DefaultWorkerTaskExecutor. The parent class of DefaultWorkerTaskExecutor is WorkerTaskExecutor, and WorkerTaskExecutor is itself a Runnable, i.e. something a thread executes. Fun, isn't it?
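The layering above (builder, then factory, then a Runnable executor) can be sketched as follows. All class names here are simplified, hypothetical stand-ins rather than DolphinScheduler's real signatures; the point is only that the builder chooses a factory per task context and the factory produces a Runnable that a thread pool can run.

```java
import java.util.List;

// Hypothetical, simplified stand-ins: not DolphinScheduler's real classes or signatures.
class TaskContext {
    final String taskType;
    TaskContext(String taskType) { this.taskType = taskType; }
}

// Plays the role of WorkerTaskExecutor: an abstract unit of work a thread pool can run.
abstract class TaskExecutor implements Runnable {
    protected final TaskContext context;
    TaskExecutor(TaskContext context) { this.context = context; }
}

class DefaultTaskExecutor extends TaskExecutor {
    private final List<String> events;

    DefaultTaskExecutor(TaskContext context, List<String> events) {
        super(context);
        this.events = events;
    }

    @Override
    public void run() {
        events.add("running " + context.taskType);
    }
}

interface TaskExecutorFactory {
    TaskExecutor createWorkerTaskExecutor();
}

class TaskExecutorFactoryBuilder {
    private final List<String> events;

    TaskExecutorFactoryBuilder(List<String> events) { this.events = events; }

    // Picks a factory for the given context; this sketch always returns the default one.
    TaskExecutorFactory createWorkerTaskExecutorFactory(TaskContext context) {
        return () -> new DefaultTaskExecutor(context, events);
    }
}
```

The extra factory layer pays off when different task contexts need different executor implementations: only the builder's selection logic changes, while callers keep the same two-step call chain.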

2.3 WorkerTaskExecutorThreadPool

WorkerTaskExecutorThreadPool: essentially just a wrapper around a fixed-size thread pool.
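A fixed-size pool whose submit method reports overload as `false` instead of throwing can be sketched with `ThreadPoolExecutor`. The bounded queue and the mapping of `RejectedExecutionException` to `false` are assumptions for illustration; the real WorkerTaskExecutorThreadPool may decide "full" differently (for example by comparing running tasks with the configured thread count).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedTaskPool {
    private final ThreadPoolExecutor executor;

    BoundedTaskPool(int threads, int queueCapacity) {
        // core == max gives fixed-pool behaviour; the bounded queue makes "full" observable.
        this.executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), new ThreadPoolExecutor.AbortPolicy());
    }

    // Returns false instead of throwing when neither a thread nor a queue slot is free.
    boolean submit(Runnable task) {
        try {
            executor.execute(task);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Returning a boolean lets the RPC handler answer the Master with a "full" response rather than propagating an exception.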

2.4 Starting from operate()

public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInstanceDispatchRequest) {
log.info("Receive TaskInstanceDispatchRequest: {}", taskInstanceDispatchRequest);
// TODO task execution context
TaskExecutionContext taskExecutionContext = taskInstanceDispatchRequest.getTaskExecutionContext();
try {
// TODO set the worker address
taskExecutionContext.setHost(workerConfig.getWorkerAddress());
// TODO set the path where the task log is stored
taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext));

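// TODO put the workflow instance id and task instance id into MDC (only put here, never read explicitly; MDC values are normally consumed by the log pattern)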
LogUtils.setWorkflowAndTaskInstanceIDMDC(
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());

// check server status, if server is not running, return failed to reject this task
if (!ServerLifeCycleManager.isRunning()) {
log.error("server is not running. reject task: {}", taskExecutionContext.getProcessInstanceId());
return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
"server is not running");
}

TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());

// TODO create a WorkerTaskExecutor via WorkerTaskExecutorFactoryBuilder
WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
.createWorkerTaskExecutorFactory(taskExecutionContext)
.createWorkerTaskExecutor();
// todo: hold the workerTaskExecutor
// TODO submit the task directly
if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
"WorkerManagerThread is full");
} else {
log.info("Submit task: {} to wait queue success", taskExecutionContext.getTaskName());
return TaskInstanceDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
}
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
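Stripped down, operate() tags the current thread with the task ids, rejects the task if the server is not running, otherwise tries to enqueue it, and always clears the tags in `finally`. The dependency-free sketch below mirrors that shape; a `ThreadLocal` map stands in for SLF4J's MDC, and the `running` flag and `submitter` predicate are hypothetical placeholders for ServerLifeCycleManager and the thread pool.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

class DispatchSketch {
    // Stand-in for SLF4J MDC: a per-thread diagnostic context.
    static final ThreadLocal<Map<String, String>> MDC = ThreadLocal.withInitial(HashMap::new);

    static String dispatch(int workflowInstanceId, int taskInstanceId, boolean running,
                           Predicate<Integer> submitter) {
        MDC.get().put("workflowInstanceId", String.valueOf(workflowInstanceId));
        MDC.get().put("taskInstanceId", String.valueOf(taskInstanceId));
        try {
            if (!running) {
                return "failed: server is not running";
            }
            return submitter.test(taskInstanceId) ? "success" : "failed: WorkerManagerThread is full";
        } finally {
            // Always clear: a pooled RPC thread would otherwise leak these ids into the next request's logs.
            MDC.get().remove("workflowInstanceId");
            MDC.get().remove("taskInstanceId");
        }
    }
}
```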

Analysis of LogUtils.getTaskInstanceLogFullPath(taskExecutionContext)
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogFullPath: gets the full path of the task log
/**
* Get task instance log full path.
*
* @param taskExecutionContext task execution context.
* @return task instance log full path.
*/
public static String getTaskInstanceLogFullPath(TaskExecutionContext taskExecutionContext) {
return getTaskInstanceLogFullPath(
DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
}

org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogFullPath: concatenates the parts into the full path of the task log
/**
* todo: Remove the submitTime parameter?
* The task instance log full path, the path is like: {log.base}/{taskSubmitTime}/{workflowDefinitionCode}/{workflowDefinitionVersion}/{workflowInstance}/{taskInstance}.log
*
* @param taskFirstSubmitTime task first submit time
* @param workflowDefinitionCode workflow definition code
* @param workflowDefinitionVersion workflow definition version
* @param workflowInstanceId workflow instance id
* @param taskInstanceId task instance id.
* @return task instance log full path.
*/
public static String getTaskInstanceLogFullPath(Date taskFirstSubmitTime,
Long workflowDefinitionCode,
int workflowDefinitionVersion,
int workflowInstanceId,
int taskInstanceId) {
if (TASK_INSTANCE_LOG_BASE_PATH == null) {
throw new IllegalArgumentException(
"Cannot find the task instance log base path, please check your logback.xml file");
}
final String taskLogFileName = Paths.get(
String.valueOf(workflowDefinitionCode),
String.valueOf(workflowDefinitionVersion),
String.valueOf(workflowInstanceId),
String.format("%s.log", taskInstanceId)).toString();
return TASK_INSTANCE_LOG_BASE_PATH
.resolve(DateUtils.format(taskFirstSubmitTime, DateConstants.YYYYMMDD, null))
.resolve(taskLogFileName)
.toString();
}
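The same path construction can be reproduced standalone with `java.nio.file` and `java.time` instead of DolphinScheduler's DateUtils. The `logs` base directory mirrors the logback property shown below; the method name is hypothetical, and the time zone is passed explicitly here as an assumption (the original formats the date with its own DateUtils defaults).

```java
import java.nio.file.Path;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class TaskLogPathSketch {
    private static final DateTimeFormatter YYYYMMDD = DateTimeFormatter.ofPattern("yyyyMMdd");

    // {base}/{yyyyMMdd of first submit}/{definitionCode}/{definitionVersion}/{workflowInstanceId}/{taskInstanceId}.log
    static Path taskLogPath(Path base, long firstSubmitMillis, ZoneId zone, long definitionCode,
                            int definitionVersion, int workflowInstanceId, int taskInstanceId) {
        String day = YYYYMMDD.format(Instant.ofEpochMilli(firstSubmitMillis).atZone(zone));
        return base.resolve(day)
                .resolve(String.valueOf(definitionCode))
                .resolve(String.valueOf(definitionVersion))
                .resolve(String.valueOf(workflowInstanceId))
                .resolve(taskInstanceId + ".log");
    }
}
```

With a base of `logs`, a first submit time on 2024-06-15, definition code 13929490938784, version 1, workflow instance 1815 and task instance 1202, this yields `logs/20240615/13929490938784/1/1815/1202.log`.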

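org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils#getTaskInstanceLogBasePath: reads the configuration in logback-spring.xml to get the base path of task instance logs; in effect this is the `logs` directory under the Worker's root (working) directory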
/**
* Get task instance log base absolute path, this is defined in logback.xml
*
* @return
*/
public static Path getTaskInstanceLogBasePath() {
return Optional.of(LoggerFactory.getILoggerFactory())
.map(e -> (AppenderAttachable<ILoggingEvent>) (e.getLogger("ROOT")))
.map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE")))
.map(e -> ((TaskLogDiscriminator) (e.getDiscriminator())))
.map(TaskLogDiscriminator::getLogBase)
.map(e -> Paths.get(e).toAbsolutePath())
.orElse(null);
}
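The Optional chain above turns any missing link (no ROOT logger, no TASKLOGFILE appender, no logBase) into `null` instead of a NullPointerException, and then resolves the relative `logs` value against the process's working directory. The sketch below reproduces that pattern over a plain nested map; the map layout is a hypothetical stand-in for logback's logger and appender objects.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;

class LogBaseLookupSketch {
    // loggers: loggerName -> (appenderName -> logBase); a stand-in for logback's logger/appender graph.
    static Path findLogBase(Map<String, Map<String, String>> loggers) {
        return Optional.ofNullable(loggers)
                .map(l -> l.get("ROOT"))                              // empty if there is no ROOT logger
                .map(appenders -> appenders.get("TASKLOGFILE"))       // empty if the appender is missing
                .map(logBase -> Paths.get(logBase).toAbsolutePath())  // "logs" -> <working dir>/logs
                .orElse(null);
    }
}
```

If the Worker is started from /opt/dolphinscheduler/worker-server, the relative value `logs` resolves to /opt/dolphinscheduler/worker-server/logs, which is consistent with the final path shown at the end of this subsection.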

The Worker's logback-spring.xml:
<configuration scan="true" scanPeriod="120 seconds">
<property name="log.base" value="logs"/>
...
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.plugin.task.api.log.TaskLogDiscriminator">
<key>taskInstanceLogFullPath</key>
<logBase>${log.base}</logBase>
</Discriminator>
<sift>
<appender name="FILE-${taskInstanceLogFullPath}" class="ch.qos.logback.core.FileAppender">
<file>${taskInstanceLogFullPath}</file>
<encoder>
<pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS Z} - %message%n
</pattern>
<charset>UTF-8</charset>
</encoder>
<append>true</append>
</appender>
</sift>
</appender>
...
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="TASKLOGFILE"/>
</root>

</configuration>

The resulting path is:
/opt/dolphinscheduler/worker-server/logs/20240615/13929490938784/1/1815/1202.log

2.5 DefaultWorkerTaskExecutor Explained

org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceDispatchOperationFunction#operate
...
// TODO create a WorkerTaskExecutor via WorkerTaskExecutorFactoryBuilder
WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
.createWorkerTaskExecutorFactory(taskExecutionContext)
.createWorkerTaskExecutor();
// todo: hold the workerTaskExecutor
// TODO submit the task directly
if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
"WorkerManagerThread is full");
} else {
log.info("Submit task: {} to wait queue success", taskExecutionContext.getTaskName());
return TaskInstanceDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
}
...

The task is submitted directly with workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor).
WorkerTaskExecutor is a Runnable, so the next thing to look at is its run() method:
public void run() {
try {
// TODO put the workflow instance and task instance ids into MDC; this works much like a ThreadLocal
LogUtils.setWorkflowAndTaskInstanceIDMDC(
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());

// TODO put the task log path into MDC
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());

// TODO print the task header
TaskInstanceLogHeader.printInitializeTaskContextHeader();

// TODO initialize the task: this only sets the start time and the taskAppId (workflow instance id + task instance id)
initializeTask();

// TODO DRY_RUN means a dry run: the status is simply set to SUCCESS
if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
taskExecutionContext.setEndTime(System.currentTimeMillis());
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
// TODO send the result via the worker message sender
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
log.info(
"The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
return;
}
// TODO print the task plugin header
TaskInstanceLogHeader.printLoadTaskInstancePluginHeader();

// TODO before execution
beforeExecute();

// TODO callback
TaskCallBack taskCallBack = TaskCallbackImpl.builder()
.workerMessageSender(workerMessageSender)
.taskExecutionContext(taskExecutionContext)
.build();

TaskInstanceLogHeader.printExecuteTaskHeader();
// TODO execute
executeTask(taskCallBack);

TaskInstanceLogHeader.printFinalizeTaskHeader();

// TODO after execution
afterExecute();

closeLogAppender();
} catch (Throwable ex) {
log.error("Task execute failed, due to meet an exception", ex);
afterThrowing(ex);
closeLogAppender();
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
LogUtils.removeTaskInstanceLogFullPathMDC();
}
}
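Without the logging, run() is a template method: initialize, short-circuit on a dry run, then run before/execute/after, route any failure to afterThrowing, and clear the per-thread log context in `finally`. The sketch below records the order of the steps so it can be checked. The step names follow the source, but the class itself is a hypothetical simplification (for example, it omits closeLogAppender and the real message sending).

```java
import java.util.ArrayList;
import java.util.List;

abstract class TemplateTaskRunner implements Runnable {
    final List<String> steps = new ArrayList<>();
    private final boolean dryRun;

    TemplateTaskRunner(boolean dryRun) { this.dryRun = dryRun; }

    // The plugin-specific part, like task.handle(...) in the real executor.
    protected abstract void executeTask() throws Exception;

    @Override
    public void run() {
        try {
            steps.add("initializeTask");
            if (dryRun) {
                // Dry run: report SUCCESS without executing anything.
                steps.add("sendFinish(SUCCESS)");
                return;
            }
            steps.add("beforeExecute");
            executeTask();
            steps.add("afterExecute");
        } catch (Throwable ex) {
            steps.add("afterThrowing");
        } finally {
            // Runs on every path, including the early dry-run return.
            steps.add("clearMdc");
        }
    }
}
```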

Key points:
  • 2.5.1 Dry run
    In dry-run mode the task is marked successful immediately and is not executed.
// TODO DRY_RUN means a dry run: the status is simply set to SUCCESS
if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
taskExecutionContext.setEndTime(System.currentTimeMillis());
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
// TODO send the result via the worker message sender
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
log.info(
"The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
return;
}

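  • 2.5.2 beforeExecute()
Preparation before execution: reporting to the Master that the task is running, creating the tenant (a Linux user), creating the working directory, downloading the required resource files, and initializing the task.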
protected void beforeExecute() {
// TODO first set the status to RUNNING
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.RUNNING_EXECUTION);
// TODO notify the Master that this task is running
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING);
log.info("Send task status {} master: {}", TaskExecutionStatus.RUNNING_EXECUTION.name(),
taskExecutionContext.getWorkflowInstanceHost());

// In most cases the origin tenant is the same as the current tenant,
// except for the `default` tenant. The originTenant is used to download the resources
// TODO tenant information
String originTenant = taskExecutionContext.getTenantCode();
String tenant = TaskExecutionContextUtils.getOrCreateTenant(workerConfig, taskExecutionContext);
taskExecutionContext.setTenantCode(tenant);
log.info("TenantCode: {} check successfully", taskExecutionContext.getTenantCode());

// TODO create the working directory
TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext);
log.info("WorkflowInstanceExecDir: {} check successfully", taskExecutionContext.getExecutePath());

TaskChannel taskChannel =
Optional.ofNullable(taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType()))
.orElseThrow(() -> new TaskPluginException(taskExecutionContext.getTaskType()
+ " task plugin not found, please check the task type is correct."));

log.info("Create TaskChannel: {} successfully", taskChannel.getClass().getName());

// TODO download resources
ResourceContext resourceContext = TaskExecutionContextUtils.downloadResourcesIfNeeded(originTenant, taskChannel,
storageOperate, taskExecutionContext);

taskExecutionContext.setResourceContext(resourceContext);
log.info("Download resources successfully: \n{}", taskExecutionContext.getResourceContext());

TaskFilesTransferUtils.downloadUpstreamFiles(taskExecutionContext, storageOperate);
log.info("Download upstream files: {} successfully",
TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.IN));

// TODO create the task
task = taskChannel.createTask(taskExecutionContext);
log.info("Task plugin instance: {} create successfully", taskExecutionContext.getTaskType());

// todo: remove the init method, this should initialize in constructor method
// TODO initialize the task
task.init();
log.info("Success initialized task plugin instance successfully");

task.getParameters().setVarPool(taskExecutionContext.getVarPool());
log.info("Set taskVarPool: {} successfully", taskExecutionContext.getVarPool());
}

1. Log output
log.info("Send task status {} master: {}", TaskExecutionStatus.RUNNING_EXECUTION.name(),
taskExecutionContext.getWorkflowInstanceHost());
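What should be printed here is taskExecutionContext.getWorkflowInstanceHost(), not taskExecutionContext.getHost(). The Worker is reporting to the Master, so there is little point in logging the Worker's own host (it obviously knows that); what is useful is which Master node this Worker reports its task status to.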

2. Creating the tenant
org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils#getOrCreateTenant
public static String getOrCreateTenant(WorkerConfig workerConfig, TaskExecutionContext taskExecutionContext) {
try {
TenantConfig tenantConfig = workerConfig.getTenantConfig();

String tenantCode = taskExecutionContext.getTenantCode();
if (TenantConstants.DEFAULT_TENANT_CODE.equals(tenantCode) && tenantConfig.isDefaultTenantEnabled()) {
log.info("Current tenant is default tenant, will use bootstrap user: {} to execute the task",
TenantConstants.BOOTSTRAPT_SYSTEM_USER);
return TenantConstants.BOOTSTRAPT_SYSTEM_USER;
}
boolean osUserExistFlag;
// If distributed tenants are enabled and the system is Linux (currently the only supported one),
// tenants should not be created automatically,
// so TenantAutoCreate has no effect
if (tenantConfig.isDistributedTenantEnabled() && SystemUtils.IS_OS_LINUX) {
// use the id command to check on Linux
osUserExistFlag = OSUtils.existTenantCodeInLinux(tenantCode);
} else if (OSUtils.isSudoEnable() && tenantConfig.isAutoCreateTenantEnabled()) {
// if this user does not exist, create it
// TODO this branch is taken by default: the user is created directly with sudo useradd -g %s %s
OSUtils.createUserIfAbsent(tenantCode);
osUserExistFlag = OSUtils.getUserList().contains(tenantCode);
} else {
osUserExistFlag = OSUtils.getUserList().contains(tenantCode);
}
if (!osUserExistFlag) {
throw new TaskException(String.format("TenantCode: %s doesn't exist", tenantCode));
}
return tenantCode;
} catch (TaskException ex) {
throw ex;
} catch (Exception ex) {
throw new TaskException(
String.format("TenantCode: %s doesn't exist", taskExecutionContext.getTenantCode()));
}
}
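The branching in getOrCreateTenant can be read as a small decision table. The pure function below mirrors it with booleans in place of OS calls; `userExists` and `createUser` are hypothetical hooks standing in for OSUtils, and the bootstrap user is passed in rather than read from TenantConstants. As a simplification, one existence check covers both the `id`-based check and the user-list check of the original.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

class TenantDecisionSketch {
    static String resolveTenant(String tenantCode, boolean isDefaultTenant, boolean defaultTenantEnabled,
                                boolean distributedTenantEnabled, boolean isLinux,
                                boolean sudoEnabled, boolean autoCreateEnabled,
                                String bootstrapUser,
                                Predicate<String> userExists, Consumer<String> createUser) {
        if (isDefaultTenant && defaultTenantEnabled) {
            // Run as the user that started the Worker.
            return bootstrapUser;
        }
        boolean distributedMode = distributedTenantEnabled && isLinux;
        if (!distributedMode && sudoEnabled && autoCreateEnabled) {
            // Auto-create (sudo useradd in the source) only outside distributed mode.
            createUser.accept(tenantCode);
        }
        if (!userExists.test(tenantCode)) {
            throw new IllegalStateException("TenantCode: " + tenantCode + " doesn't exist");
        }
        return tenantCode;
    }
}
```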

3. TaskChannel
TaskPluginManager: when the Master starts, task plugins are registered through SPI using Google's @AutoService.
TaskPluginManager is initialized during Master startup:
org.apache.dolphinscheduler.server.master.MasterServer#run
@PostConstruct
public void run() throws SchedulerException {
......

// install task plugin
// TODO registration is done through SPI with Google's @AutoService
this.taskPluginManager.loadPlugin();

......
}

org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager#loadPlugin
public void loadPlugin() {
if (!loadedFlag.compareAndSet(false, true)) {
log.warn("The task plugin has already been loaded");
return;
}

// TODO instances are loaded through SPI
PrioritySPIFactory<TaskChannelFactory> prioritySPIFactory = new PrioritySPIFactory<>(TaskChannelFactory.class);
for (Map.Entry<String, TaskChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
String factoryName = entry.getKey();
TaskChannelFactory factory = entry.getValue();

log.info("Registering task plugin: {} - {}", factoryName, factory.getClass().getSimpleName());

taskChannelFactoryMap.put(factoryName, factory);
taskChannelMap.put(factoryName, factory.create());

log.info("Registered task plugin: {} - {}", factoryName, factory.getClass().getSimpleName());
}

}
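loadPlugin combines two ingredients: a `compareAndSet` guard so plugins are loaded only once, and a name-to-channel registry filled from the discovered factories. In production the factories come from SPI (`java.util.ServiceLoader` reads the META-INF/services entries that @AutoService generates); in this sketch they are passed in as any `Iterable` so the example runs without a classpath setup. `Channel` and `ChannelFactory` are simplified, hypothetical stand-ins for TaskChannel and TaskChannelFactory.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for TaskChannel / TaskChannelFactory.
interface Channel {
    String describe();
}

interface ChannelFactory {
    String getName();
    Channel create();
}

class PluginRegistrySketch {
    private final AtomicBoolean loadedFlag = new AtomicBoolean(false);
    private final Map<String, Channel> channelMap = new LinkedHashMap<>();

    // Returns false if plugins were already loaded, mirroring the early return in loadPlugin.
    boolean loadPlugins(Iterable<ChannelFactory> discovered) {
        if (!loadedFlag.compareAndSet(false, true)) {
            return false;
        }
        for (ChannelFactory factory : discovered) {
            channelMap.put(factory.getName(), factory.create());
        }
        return true;
    }

    Map<String, Channel> getChannelMap() {
        return Collections.unmodifiableMap(channelMap);
    }
}
```

Because `ServiceLoader` implements `Iterable`, a production caller could pass `ServiceLoader.load(ChannelFactory.class)` directly.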

The core logic is really just
the TaskChannelFactory interface:
public interface TaskChannelFactory extends UiChannelFactory, PrioritySPI {

TaskChannel create();

default SPIIdentify getIdentify() {
return SPIIdentify.builder().name(getName()).build();
}
}

All task plugins implement the TaskChannelFactory interface and are annotated with @AutoService.
Take ShellTaskChannelFactory as an example:
@AutoService(TaskChannelFactory.class)
public class ShellTaskChannelFactory implements TaskChannelFactory {

@Override
public TaskChannel create() {
return new ShellTaskChannel();
}

@Override
public String getName() {
return "SHELL";
}

@Override
public List<PluginParams> getParams() {
List<PluginParams> paramsList = new ArrayList<>();

InputParam nodeName = InputParam.newBuilder("name", "$t('Node name')")
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();

RadioParam runFlag = RadioParam.newBuilder("runFlag", "RUN_FLAG")
.addParamsOptions(new ParamsOptions("NORMAL", "NORMAL", false))
.addParamsOptions(new ParamsOptions("FORBIDDEN", "FORBIDDEN", false))
.build();

paramsList.add(nodeName);
paramsList.add(runFlag);
return paramsList;
}
}

This is where ShellTaskChannel, i.e. the TaskChannel, is created.
4. Downloading the required resources
org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils#downloadResourcesIfNeeded
public static ResourceContext downloadResourcesIfNeeded(String tenant,
TaskChannel taskChannel,
StorageOperate storageOperate,
TaskExecutionContext taskExecutionContext) {
AbstractParameters abstractParameters = taskChannel.parseParameters(
ParametersNode.builder()
.taskType(taskExecutionContext.getTaskType())
.taskParams(taskExecutionContext.getTaskParams())
.build());

// TODO for a SQL task, for example, this is just an empty ArrayList<>(), so nothing below runs
List<ResourceInfo> resourceFilesList = abstractParameters.getResourceFilesList();
if (CollectionUtils.isEmpty(resourceFilesList)) {
log.debug("There is no resource file need to download");
return new ResourceContext();
}

ResourceContext resourceContext = new ResourceContext();
String taskWorkingDirectory = taskExecutionContext.getExecutePath();

for (ResourceInfo resourceInfo : resourceFilesList) {
// TODO path in storage, e.g. the file path on HDFS
String resourceAbsolutePathInStorage = resourceInfo.getResourceName();
// TODO file name
String resourceRelativePath = storageOperate.getResourceFileName(tenant, resourceAbsolutePathInStorage);
// TODO absolute local path
String resourceAbsolutePathInLocal = Paths.get(taskWorkingDirectory, resourceRelativePath).toString();
File file = new File(resourceAbsolutePathInLocal);
if (!file.exists()) {
try {
long resourceDownloadStartTime = System.currentTimeMillis();
// TODO download the resource
storageOperate.download(resourceAbsolutePathInStorage, resourceAbsolutePathInLocal, true);
log.debug("Download resource file {} under: {} successfully", resourceAbsolutePathInStorage,
resourceAbsolutePathInLocal);
FileUtils.setFileTo755(file);
WorkerServerMetrics
.recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
WorkerServerMetrics
.recordWorkerResourceDownloadSize(Files.size(Paths.get(resourceAbsolutePathInLocal)));
WorkerServerMetrics.incWorkerResourceDownloadSuccessCount();
} catch (Exception ex) {
WorkerServerMetrics.incWorkerResourceDownloadFailureCount();
throw new TaskException(
String.format("Download resource file: %s error", resourceAbsolutePathInStorage), ex);
}
}

// TODO build the resourceContext
ResourceContext.ResourceItem resourceItem = ResourceContext.ResourceItem.builder()
.resourceAbsolutePathInStorage(resourceAbsolutePathInStorage)
.resourceRelativePath(resourceRelativePath)
.resourceAbsolutePathInLocal(resourceAbsolutePathInLocal)
.build();
resourceContext.addResourceItem(resourceItem);
}
return resourceContext;
}
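At its core, downloadResourcesIfNeeded resolves each resource to a local path under the task's working directory and downloads it only when the file is not already present. The sketch keeps that skip-if-present logic. The `Storage` interface is a hypothetical stand-in for StorageOperate, taking the last path segment as the local name is a simplification of getResourceFileName, and the 755 permissions are applied with `PosixFilePermissions` only when the file system supports POSIX attributes.

```java
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StorageOperate.
interface Storage {
    // Copies the remote resource to the given local file.
    void download(String remotePath, Path localFile) throws IOException;
}

class ResourceDownloadSketch {
    // Returns the local paths of all resources; only files that are missing locally are downloaded.
    static List<Path> downloadIfNeeded(Path workingDir, List<String> remotePaths, Storage storage) throws IOException {
        List<Path> local = new ArrayList<>();
        for (String remote : remotePaths) {
            // Simplification: use the last path segment as the relative local name.
            String relative = remote.substring(remote.lastIndexOf('/') + 1);
            Path target = workingDir.resolve(relative);
            if (!Files.exists(target)) {
                storage.download(remote, target);
                if (FileSystems.getDefault().supportedFileAttributeViews().contains("posix")) {
                    Files.setPosixFilePermissions(target, PosixFilePermissions.fromString("rwxr-xr-x")); // 755
                }
            }
            local.add(target);
        }
        return local;
    }
}
```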

5. Downloading upstream files (passing files from upstream to downstream tasks). Example:

upTask:

downTask:

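Core logic: passing files between upstream and downstream tasks is actually simple. The current node generates the file locally and uploads it to a resource center such as HDFS; the downstream node then downloads the specified resource-center file based on the upstream output variable (taskName.variableName).
The downloadUpstreamFiles logic in downTask: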
org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils#downloadUpstreamFiles
public static void downloadUpstreamFiles(TaskExecutionContext taskExecutionContext, StorageOperate storageOperate) {
// TODO variable pool passed down from upstream
List<Property> varPools = getVarPools(taskExecutionContext);

// get map of varPools for quick search
Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));

// get "IN FILE" parameters
// TODO check whether localParams contains any FILE-type local parameters with direction IN
List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.IN);

// TODO in most cases the method returns here
if (localParamsProperty.isEmpty()) {
return;
}

String executePath = taskExecutionContext.getExecutePath();
// data path to download packaged data
// TODO temporary download directory
String downloadTmpPath = String.format("%s/%s", executePath, DOWNLOAD_TMP);

log.info("Download upstream files...");
for (Property property : localParamsProperty) {
// TODO what is looked up here:
/**
 * varPoolsMap looks like:
 * {"prop":"upTask.file-data","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240624/13978983404960/2_1893/upTask_1320_text.txt"}
 * {"prop":"upTask.dir-data","direct":"IN","type":"FILE","value":"DATA_TRANSFER/20240624/13978983404960/2_1893/upTask_1320_data_ds_pack.zip"}
 *
 * localParamsProperty looks like:
 * {"prop":"input_dir","direct":"IN","type":"FILE","value":"upTask.dir-data"}
 */
// TODO so this is not null here
Property inVarPool = varPoolsMap.get(property.getValue());
if (inVarPool == null) {
log.error("{} not in {}", property.getValue(), varPoolsMap.keySet());
throw new TaskException(String.format("Can not find upstream file using %s, please check the key",
property.getValue()));
}

String resourcePath = inVarPool.getValue();
// TODO build the local path
// TODO note: in a script such as `cat input_dir/test1/text.txt`, input_dir is the local parameter name joined onto the execute path
String targetPath = String.format("%s/%s", executePath, property.getProp());

String downloadPath;
// If the data is packaged, download it to a special directory (DOWNLOAD_TMP) and unpack it to the
// targetPath
// TODO check whether it is a zip package
boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
if (isPack) {
downloadPath = String.format("%s/%s", downloadTmpPath, new File(resourcePath).getName());
} else {
downloadPath = targetPath;
}

try {
// TODO path in the resource center
String resourceWholePath =
storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath);
log.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
// TODO download to local
storageOperate.download(resourceWholePath, downloadPath, true);
} catch (IOException ex) {
throw new TaskException("Download file from storage error", ex);
}

// unpack if the data is packaged
if (isPack) {
File downloadFile = new File(downloadPath);
log.info("Unpack {} to {}", downloadPath, targetPath);
// TODO for a zip, unpack the archive from the local temp directory into the target path
ZipUtil.unpack(downloadFile, new File(targetPath));
}
}

// delete DownloadTmp Folder if DownloadTmpPath exists
try {
// TODO delete the files in the temp directory
org.apache.commons.io.FileUtils.deleteDirectory(new File(downloadTmpPath));
} catch (IOException e) {
log.error("Delete DownloadTmpPath {} failed, this will not affect the task status", downloadTmpPath, e);
}
}
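The lookup-and-route part of downloadUpstreamFiles can be isolated as a pure function: each IN/FILE local parameter names an upstream variable (`upTask.dir-data`), the variable pool maps that to a resource path, and packaged data goes to the temporary directory before being unpacked into `<executePath>/<paramName>`. In this sketch the `_ds_pack.zip` suffix and the `download_tmp` directory name are assumptions inferred from the sample values above, not verified values of PACK_SUFFIX and DOWNLOAD_TMP; the actual download and unzip are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class UpstreamFilePlanSketch {
    static final String PACK_SUFFIX = "_ds_pack.zip";   // assumed value
    static final String DOWNLOAD_TMP = "download_tmp";  // assumed value

    // One planned download: source resource, where it is saved, where it finally lives.
    static final class Step {
        final String resourcePath, downloadPath, targetPath;
        final boolean unpack;

        Step(String resourcePath, String downloadPath, String targetPath, boolean unpack) {
            this.resourcePath = resourcePath;
            this.downloadPath = downloadPath;
            this.targetPath = targetPath;
            this.unpack = unpack;
        }
    }

    // inFileParams: local param name (e.g. "input_dir") -> upstream variable (e.g. "upTask.dir-data")
    static List<Step> plan(String executePath, Map<String, String> inFileParams, Map<String, String> varPool) {
        List<Step> steps = new ArrayList<>();
        for (Map.Entry<String, String> param : inFileParams.entrySet()) {
            String resourcePath = varPool.get(param.getValue());
            if (resourcePath == null) {
                throw new IllegalArgumentException("Can not find upstream file using " + param.getValue());
            }
            String targetPath = executePath + "/" + param.getKey();
            boolean isPack = resourcePath.endsWith(PACK_SUFFIX);
            String fileName = resourcePath.substring(resourcePath.lastIndexOf('/') + 1);
            // Packed data lands in the temp dir first; plain files go straight to the target path.
            String downloadPath = isPack ? executePath + "/" + DOWNLOAD_TMP + "/" + fileName : targetPath;
            steps.add(new Step(resourcePath, downloadPath, targetPath, isPack));
        }
        return steps;
    }
}
```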

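6. Creating and initializing the task
This continues from step 3: once the TaskChannel has been created, createTask is called, which returns an AbstractTask, and then its init method is called.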
......
// TODO create the task
task = taskChannel.createTask(taskExecutionContext);
log.info("Task plugin instance: {} create successfully", taskExecutionContext.getTaskType());

// todo: remove the init method, this should initialize in constructor method
// TODO initialize the task
task.init();
log.info("Success initialized task plugin instance successfully");
......

7. Setting the variable pool on AbstractParameters
// TODO set the variable pool on the task
// TODO normally taskExecutionContext.getVarPool() is null here
task.getParameters().setVarPool(taskExecutionContext.getVarPool());
log.info("Set taskVarPool: {} successfully", taskExecutionContext.getVarPool());

Note: by default taskExecutionContext.getVarPool() is empty unless an upstream task has OUT variables.
  • 2.5.3 Task execution
// TODO the callback is quite important: workerMessageSender and taskExecutionContext are passed into TaskCallBack when it is built
// TODO so taskExecutionContext already carries everything set earlier
TaskCallBack taskCallBack = TaskCallbackImpl.builder()
.workerMessageSender(workerMessageSender)
.taskExecutionContext(taskExecutionContext)
.build();
.......

// TODO execute
executeTask(taskCallBack);

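executeTask(taskCallBack) is the core code: it wraps the Worker's actual task execution logic. The TaskCallBack parameter is used to report task status back to the Master.
Now let's look at the logic of executeTask(taskCallBack) in detail: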
public void executeTask(TaskCallBack taskCallBack) throws TaskException {
if (task == null) {
throw new IllegalArgumentException("The task plugin instance is not initialized");
}

// TODO the actual task processing happens here
task.handle(taskCallBack);
}

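Here task is an AbstractTask, created by taskChannel.createTask in beforeExecute; AbstractTask is the abstract parent class of all tasks. ShellTask is used as the example below; other task types work similarly.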
org.apache.dolphinscheduler.plugin.task.shell.ShellTask#handle
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {

IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap())) // TODO this is where variable substitution comes in
.appendScript(shellParameters.getRawScript());

// TODO execute the shell
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
// TODO execution result: the exit status code
setExitStatusCode(commandExecuteResult.getExitStatusCode());
// TODO set the process ID
setProcessId(commandExecuteResult.getProcessId());
// TODO shellCommandExecutor.getTaskOutputParams() returns something like output -> 123
shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current Shell task has been interrupted", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("The current Shell task has been interrupted", e);
} catch (Exception e) {
log.error("shell task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("Execute shell task error", e);
}
}

org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory
BashShellInterceptorBuilder is used by default:
public class ShellInterceptorBuilderFactory {
private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");

@SuppressWarnings("unchecked")
public static IShellInterceptorBuilder newBuilder() {
// TODO this branch is taken by default
if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
return new BashShellInterceptorBuilder();
}
if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
return new ShShellInterceptorBuilder();
}
if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
return new CmdShellInterceptorBuilder();
}
throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
}
}

.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))

sets the parameters from taskExecutionContext.getPrepareParamsMap() into the propertyMap of BaseShellInterceptorBuilder (note: taskExecutionContext.getPrepareParamsMap() is assembled on the Master side).
.appendScript(shellParameters.getRawScript())

adds the raw script to the scripts of BaseShellInterceptorBuilder.
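Conceptually, the builder accumulates environment files to source, a property map, and the raw script, and later renders them into one script file with `${name}` placeholders replaced by property values. The sketch below shows that rendering step only. The exact header lines and placeholder rules DolphinScheduler emits may differ, so treat the output format, and the choice to leave unknown placeholders untouched, as assumptions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ShellScriptSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    private final List<String> systemEnvs = new ArrayList<>();
    private final Map<String, String> properties = new LinkedHashMap<>();
    private final List<String> scripts = new ArrayList<>();

    ShellScriptSketch appendSystemEnv(String envFile) { systemEnvs.add(envFile); return this; }
    ShellScriptSketch properties(Map<String, String> props) { properties.putAll(props); return this; }
    ShellScriptSketch appendScript(String script) { scripts.add(script); return this; }

    // Replaces ${name} with its property value; unknown placeholders are kept as-is.
    String replacePlaceholders(String text) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = properties.getOrDefault(m.group(1), m.group(0));
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Renders: shebang, one "source" line per env file, then the substituted script lines.
    String render() {
        StringBuilder sb = new StringBuilder("#!/bin/bash\n");
        for (String env : systemEnvs) {
            sb.append("source ").append(env).append('\n');
        }
        for (String script : scripts) {
            sb.append(replacePlaceholders(script)).append('\n');
        }
        return sb.toString();
    }
}
```

For example, with `dt=20240615` and the env file `/etc/profile`, the script `echo ${dt}` renders as a shebang line, `source /etc/profile`, and `echo 20240615`.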
org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor#run
public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
TaskCallBack taskCallBack) throws Exception {
TaskResponse result = new TaskResponse();
// todo: we need to use state like JDK Thread to make sure the killed task should not be executed
iShellInterceptorBuilder = iShellInterceptorBuilder
// TODO set the execute path
.shellDirectory(taskRequest.getExecutePath())
// TODO set the shell name here
.shellName(taskRequest.getTaskAppId());

// Set system env
// TODO the defaults are set here, e.g. /etc/profile
if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
// TODO this adds the env files configured in ENV_SOURCE_LIST to systemEnvs
ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
}

// Set custom env
// TODO set the custom env
if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
// TODO add it to customEnvScripts
iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
}

// Set k8s config (This only works on Linux)
if (taskRequest.getK8sTaskExecutionContext() != null) {
iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml());
}

// Set sudo (This only works on Linux)
// TODO set sudo mode (enabled when sudo is available)
iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());

// Set tenant (This only works on Linux)
// TODO set the tenant
iShellInterceptorBuilder.runUser(taskRequest.getTenantCode());

// Set CPU Quota (This only works on Linux)
if (taskRequest.getCpuQuota() != null) {
iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota());
}

// Set memory Quota (This only works on Linux)
if (taskRequest.getMemoryMax() != null) {
iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax());
}

// TODO this is the key part
IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
// TODO executed with ProcessBuilder; supports both sudo mode and bash mode
process = iShellInterceptor.execute();

// parse process output
// TODO parse the process output here
parseProcessOutput(this.process);

// collect pod log
collectPodLogIfNeeded();

int processId = getProcessId(this.process);

result.setProcessId(processId);

// cache processId
taskRequest.setProcessId(processId);

// print process id
log.info("process start, process id is: {}", processId);

// if timeout occurs, exit directly
long remainTime = getRemainTime();

// update pid before waiting for the run to finish
if (null != taskCallBack) {
// TODO update the task instance info
taskCallBack.updateTaskInstanceInfo(processId);
}

// waiting for the run to finish
boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);

TaskExecutionStatus kubernetesStatus =
ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());

if (taskOutputFuture != null) {
try {
// Wait for the task log processing to finish.
taskOutputFuture.get();
} catch (ExecutionException e) {
log.error("Handle task log error", e);
}
}

if (podLogOutputFuture != null) {
try {
// Wait for Kubernetes pod log collection to finish
podLogOutputFuture.get();
// delete pod after successful execution and log collection
ProcessUtils.cancelApplication(taskRequest);
} catch (ExecutionException e) {
log.error("Handle pod log error", e);
}
}

// if SHELL task exit
if (status && kubernetesStatus.isSuccess()) {

// SHELL task state
result.setExitStatusCode(this.process.exitValue());

} else {
log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
taskRequest.getTaskTimeout());
result.setExitStatusCode(EXIT_CODE_FAILURE);
cancelApplication();
}
int exitCode = this.process.exitValue();
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
return result;
}
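The tail of run() reduces to two small rules: the wait budget is whatever is left of the task timeout, and the exit code is the process's own only if it finished within that budget and the Kubernetes status is successful; otherwise the task is marked failed and cancelled. A pure-function sketch with hypothetical names; `EXIT_CODE_FAILURE = -1` is an assumed value for illustration, and clamping a negative budget to zero is a choice made here, not necessarily the original's behaviour.

```java
class ExitCodeSketch {
    static final int EXIT_CODE_FAILURE = -1; // assumed value for illustration

    // Seconds left of the task timeout given the start time; clamped to zero in this sketch.
    static long remainSeconds(long timeoutSeconds, long startMillis, long nowMillis) {
        long usedSeconds = (nowMillis - startMillis) / 1000;
        return Math.max(0, timeoutSeconds - usedSeconds);
    }

    // finished: result of process.waitFor(remain, SECONDS);
    // k8sSuccess: expected to be true for tasks that do not run on Kubernetes.
    static int resolveExitCode(boolean finished, boolean k8sSuccess, int processExitValue) {
        return (finished && k8sSuccess) ? processExitValue : EXIT_CODE_FAILURE;
    }
}
```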

Setting the default environment variables:
// Set system env
// TODO the defaults are set here, e.g. /etc/profile
if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
// TODO this adds the env files configured in ENV_SOURCE_LIST to systemEnvs
ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
}

org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils
public List<String> ENV_SOURCE_LIST = Arrays.stream(
Optional.ofNullable(PropertyUtils.getString("shell.env_source_list")).map(s -> s.split(","))
.orElse(new String[0]))
.map(String::trim)
.filter(StringUtils::isNotBlank)
.collect(Collectors.toList());

This reads common.properties, where the default environment files can be configured:
# The default env list will be loaded by Shell task, e.g. /etc/profile,~/.bash_profile
# Empty by default; for example:
shell.env_source_list=/etc/profile
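The ENV_SOURCE_LIST initializer splits the comma-separated `shell.env_source_list` value, trims each entry and drops blanks. The same logic as a testable static method (the class and method names are hypothetical; a plain `isEmpty` check after `trim` replaces StringUtils::isNotBlank):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

class EnvSourceListSketch {
    // "a, b ,," -> [a, b]; null -> []
    static List<String> parse(String rawValue) {
        return Arrays.stream(Optional.ofNullable(rawValue).map(s -> s.split(",")).orElse(new String[0]))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }
}
```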

// TODO this is the key part
IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();

org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder#build
public BashShellInterceptor build() throws FileOperateException, IOException {
// TODO the core of shell-script generation: the script is written to the specified directory
generateShellScript();
// TODO build the command
List<String> bootstrapCommand = generateBootstrapCommand();
// TODO instantiate BashShellInterceptor
return new BashShellInterceptor(bootstrapCommand, shellDirectory);
}

org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder#bootstrapCommandInSudoMode
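Note: this method has two paths. If resource limiting is enabled, it goes to bootstrapCommandInResourceLimitMode,
which actually hides a big bug (I only fixed it for ShellTask): for other task types wrapped in a shell, such as MR, Spark and Flink, resource limiting breaks, because the CPU and memory quotas of these tasks cannot be set in the UI. Otherwise the command is sudo -u <tenant> -i opt/xx.sh.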
private List<String> bootstrapCommandInSudoMode() {
// TODO If task.resource.limit.state is false, this branch is skipped and no CPU or memory limit is applied
if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE, false)) {
return bootstrapCommandInResourceLimitMode();
}
List<String> bootstrapCommand = new ArrayList<>();
bootstrapCommand.add("sudo");
if (StringUtils.isNotBlank(runUser)) {
bootstrapCommand.add("-u");
bootstrapCommand.add(runUser);
}
bootstrapCommand.add("-i");
bootstrapCommand.add(shellAbsolutePath().toString());
return bootstrapCommand;
}
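The non-resource-limit path can be reproduced as a pure function. This is a hypothetical sketch: the class `SudoBootstrap` and the `resourceLimitEnabled` flag are illustrative stand-ins for reading `task.resource.limit.state`, and the resource-limit branch is deliberately not modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reconstruction of the sudo branch of bootstrapCommandInSudoMode
final class SudoBootstrap {

    private SudoBootstrap() {
    }

    static List<String> build(String runUser, String shellAbsolutePath, boolean resourceLimitEnabled) {
        if (resourceLimitEnabled) {
            // The real code delegates to bootstrapCommandInResourceLimitMode(), not modelled here
            throw new UnsupportedOperationException("resource-limit mode is not modelled in this sketch");
        }
        List<String> command = new ArrayList<>();
        command.add("sudo");
        // Only switch user when a tenant is configured (same check as StringUtils.isNotBlank)
        if (runUser != null && !runUser.isBlank()) {
            command.add("-u");
            command.add(runUser);
        }
        command.add("-i"); // login shell, so the tenant's own profile is loaded
        command.add(shellAbsolutePath);
        return command;
    }
}
```

Keeping the command as a list of separate arguments, rather than one string, means ProcessBuilder hands the tenant name and script path to `sudo` without any shell re-parsing.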

// TODO Execute via ProcessBuilder; both sudo mode and bash mode are supported
process = iShellInterceptor.execute();

org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor#execute
public Process execute() throws IOException {
// init process builder
ProcessBuilder processBuilder = new ProcessBuilder();
// setting up a working directory
// TODO Set the working directory so that, while the script runs, files such as jar packages can be loaded relative to it
processBuilder.directory(new File(workingDirectory));
// merge error information to standard output stream
processBuilder.redirectErrorStream(true);
processBuilder.command(executeCommands);
log.info("Executing shell command : {}", String.join(" ", executeCommands));
return processBuilder.start();
}

In other words, the task is simply submitted with ProcessBuilder.
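The same ProcessBuilder pattern as a self-contained sketch, assuming a POSIX `sh` on the `PATH`. It merges stderr into stdout as `BaseShellInterceptor#execute` does, reads the output on a separate thread (as `parseProcessOutput` does), and bounds the wait with `waitFor(timeout, unit)`. `RunScript` and its `Result` type are hypothetical names, not DolphinScheduler APIs.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: run a command, collect merged stdout/stderr, enforce a timeout
final class RunScript {

    static final class Result {
        final List<String> lines;
        final int exitCode; // -1 when the timeout was hit and the process was destroyed

        Result(List<String> lines, int exitCode) {
            this.lines = lines;
            this.exitCode = exitCode;
        }
    }

    private RunScript() {
    }

    static Result run(File workingDirectory, List<String> command, long timeoutSeconds)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.directory(workingDirectory); // relative paths in the script resolve against this directory
        pb.redirectErrorStream(true);   // stderr is merged into stdout
        Process process = pb.start();

        List<String> lines = Collections.synchronizedList(new ArrayList<>());
        Thread reader = new Thread(() -> {
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    lines.add(line);
                }
            } catch (IOException ignored) {
                // the stream is closed when the process is destroyed
            }
        });
        reader.setDaemon(true);
        reader.start();

        boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
        if (!finished) {
            process.destroyForcibly().waitFor(); // timed out: kill and wait until it is gone
        }
        reader.join();
        return new Result(new ArrayList<>(lines), finished ? process.exitValue() : -1);
    }
}
```

Reading on a separate thread matters: if the output were read on the calling thread until EOF, a process that hangs without printing would block the reader and the timeout would never be checked.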
org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor#parseProcessOutput
// TODO Parse the output
private void parseProcessOutput(Process process) {
// todo: remove this thread pool.
ExecutorService getOutputLogService = ThreadUtils
.newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + taskRequest.getTaskName());
getOutputLogService.execute(() -> {
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
// TODO Read the process output via process.getInputStream()
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
// TODO Set the task log path (MDC) here
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
String line;
while ((line = inReader.readLine()) != null) {
// TODO Log buffer
logBuffer.add(line);
// TODO Parse taskOutputParams here, e.g. from echo '${setValue(output=1)}', which simply prints the string ${setValue(output=1)}
taskOutputParameterParser.appendParseLog(line);
}
processLogOutputIsSuccess = true;
} catch (Exception e) {
log.error("Parse var pool error", e);
processLogOutputIsSuccess = true;
} finally {
// TODO taskInstanceLogFullPath is removed from the MDC here
LogUtils.removeTaskInstanceLogFullPathMDC();
}
taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
});

getOutputLogService.shutdown();

ExecutorService parseProcessOutputExecutorService = ThreadUtils
.newSingleDaemonScheduledExecutorService("TaskInstanceLogOutput-thread-" + taskRequest.getTaskName());
taskOutputFuture = parseProcessOutputExecutorService.submit(() -> {
try {
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
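// TODO For non-pod (non-k8s) tasks this effectively depends on processLogOutputIsSuccess, which is set above once the output has been read to the end, i.e. the task has finished running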
while (logBuffer.size() > 1 || !processLogOutputIsSuccess || !podLogOutputIsFinished) {
if (logBuffer.size() > 1) {
logHandler.accept(logBuffer);
logBuffer.clear();
logBuffer.add(EMPTY_STRING);
} else {
// TODO If there is no new log output, wait 1 s by default
Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
}
}
} catch (Exception e) {
log.error("Output task log error", e);
} finally {
LogUtils.removeTaskInstanceLogFullPathMDC();
}
});
parseProcessOutputExecutorService.shutdown();
}

There are two core pieces of logic here:
1. Printing the result log
protected LinkedBlockingQueue<String> logBuffer;

public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskExecutionContext taskRequest) {
this.logHandler = logHandler;
this.taskRequest = taskRequest;
this.logBuffer = new LinkedBlockingQueue<>();
this.logBuffer.add(EMPTY_STRING);

if (this.taskRequest != null) {
// set logBufferEnable=true if the task uses logHandler and logBuffer to buffer log messages
this.taskRequest.setLogBufferEnable(true);
}
}

logBuffer temporarily holds the log lines until the parseProcessOutputExecutorService thread consumes them.
The producer side:

while ((line = inReader.readLine()) != null) {
// TODO Log buffer
logBuffer.add(line);
// TODO Parse taskOutputParams here, e.g. from echo '${setValue(output=1)}', which simply prints the string ${setValue(output=1)}
taskOutputParameterParser.appendParseLog(line);
}

The consumer side:

this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);

public void logHandle(LinkedBlockingQueue<String> logs) {

StringJoiner joiner = new StringJoiner("\n\t");
while (!logs.isEmpty()) {
joiner.add(logs.poll());
}
log.info(" -> {}", joiner);
}

while (logBuffer.size() > 1 || !processLogOutputIsSuccess || !podLogOutputIsFinished) {
if (logBuffer.size() > 1) {
logHandler.accept(logBuffer);
logBuffer.clear();
logBuffer.add(EMPTY_STRING);
} else {
// TODO If there is no new log output, wait 1 s by default
Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
}
}
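The producer/consumer hand-off can be reduced to a small standalone sketch. It is a simplification rather than the DolphinScheduler code: instead of the empty-string sentinel, the `size() > 1` check and a fixed 1 s sleep, the hypothetical `LogPump` class batches lines with `BlockingQueue.drainTo` and stops once a `volatile` flag, set by the producer after the last line, is true and the queue is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified log pump: one producer thread, one consumer thread
final class LogPump {
    private final LinkedBlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private volatile boolean producerDone = false;

    // Producer side: called for every line read from the process output
    void produce(String line) {
        buffer.add(line);
    }

    // Producer side: called once the process output reached EOF
    void finish() {
        producerDone = true;
    }

    // Consumer side: flushes batches until the producer finished and the buffer is drained
    void consume(Consumer<String> batchHandler) throws InterruptedException {
        while (!producerDone || !buffer.isEmpty()) {
            String first = buffer.poll(100, TimeUnit.MILLISECONDS); // wait briefly when there is no output
            if (first == null) {
                continue;
            }
            List<String> batch = new ArrayList<>();
            batch.add(first);
            buffer.drainTo(batch); // take everything else that is already buffered
            // Same formatting idea as logHandle(): lines joined with "\n\t" after a " -> " prefix
            StringJoiner joiner = new StringJoiner("\n\t");
            batch.forEach(joiner::add);
            batchHandler.accept(" -> " + joiner);
        }
    }
}
```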

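So in the task log, the output of a Shell task appears after a "->" prefix. Because logBuffer always starts with the EMPTY_STRING sentinel, the first entry of each batch is empty and the actual output begins on the line after "->", for example: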
[INFO] 2024-06-24 09:35:44.678 +0800 - ->
.
├── 1893_1321.sh
└── input_dir
├── test1
│ └── text.txt
└── test2
└── text.txt

3 directories, 3 files
test1 message
test2 message

2. Parsing the var pool
while ((line = inReader.readLine()) != null) {
// TODO Log buffer
logBuffer.add(line);
// TODO Parse taskOutputParams here, e.g. from echo '${setValue(output=1)}', which simply prints the string ${setValue(output=1)}
taskOutputParameterParser.appendParseLog(line);
}

org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser#appendParseLog
public void appendParseLog(String logLine) {
if (logLine == null) {
return;
}

// TODO On the first call (no expression open yet) this branch is skipped
if (currentTaskOutputParam != null) {
if (currentTaskOutputParam.size() > maxOneParameterRows
|| currentTaskOutputParamLength > maxOneParameterLength) {
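// Note: the last two arguments appear to be in the opposite order to the "max rows"/"max length" placeholders
log.warn(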
"The output param expression '{}' is too long, the max rows is {}, max length is {}, will skip this param",
String.join("\n", currentTaskOutputParam), maxOneParameterLength, maxOneParameterRows);
currentTaskOutputParam = null;
currentTaskOutputParamLength = 0;
return;
}
// continue to parse the rest of line
int i = logLine.indexOf(")}");
if (i == -1) {
// the end of var pool not found
currentTaskOutputParam.add(logLine);
currentTaskOutputParamLength += logLine.length();
} else {
// the end of var pool found
currentTaskOutputParam.add(logLine.substring(0, i + 2));
Pair<String, String> keyValue = parseOutputParam(String.join("\n", currentTaskOutputParam));
if (keyValue.getKey() != null && keyValue.getValue() != null) {
// TODO Once parsed, put the pair into taskOutputParams
taskOutputParams.put(keyValue.getKey(), keyValue.getValue());
}
currentTaskOutputParam = null;
currentTaskOutputParamLength = 0;
// continue to parse the rest of line
if (i + 2 != logLine.length()) {
appendParseLog(logLine.substring(i + 2));
}
}
return;
}

int indexOfVarPoolBegin = logLine.indexOf("${setValue(");
if (indexOfVarPoolBegin == -1) {
indexOfVarPoolBegin = logLine.indexOf("#{setValue(");
}
if (indexOfVarPoolBegin == -1) {
return;
}
currentTaskOutputParam = new ArrayList<>();
appendParseLog(logLine.substring(indexOfVarPoolBegin));
}
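To see the `${setValue(key=value)}` protocol end to end, here is a simplified, hypothetical re-implementation (`SetValueParser` is not the DolphinScheduler class). It follows the same structure as `appendParseLog` (find `${setValue(` or `#{setValue(`, buffer lines until `)}`, then parse the rest of the line recursively), but omits the row and length limits, and splitting on the first `=` with trimming is an assumption about what `parseOutputParam` does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified parser for ${setValue(key=value)} / #{setValue(key=value)} in log lines
final class SetValueParser {
    private static final int PREFIX_LENGTH = "${setValue(".length(); // same length as "#{setValue("
    private final Map<String, String> outputParams = new LinkedHashMap<>();
    private List<String> current; // non-null while an expression spans several lines

    void append(String line) {
        if (line == null) {
            return;
        }
        if (current != null) {
            int end = line.indexOf(")}");
            if (end == -1) {
                current.add(line); // the expression continues on the next line
                return;
            }
            current.add(line.substring(0, end + 2));
            parse(String.join("\n", current));
            current = null;
            if (end + 2 < line.length()) {
                append(line.substring(end + 2)); // the rest of the line may hold another expression
            }
            return;
        }
        int begin = line.indexOf("${setValue(");
        if (begin == -1) {
            begin = line.indexOf("#{setValue(");
        }
        if (begin == -1) {
            return;
        }
        current = new ArrayList<>();
        append(line.substring(begin));
    }

    private void parse(String expression) {
        // Strip the prefix and the ")}" suffix, then split on the first '='
        String body = expression.substring(PREFIX_LENGTH, expression.length() - 2);
        int eq = body.indexOf('=');
        if (eq > 0) {
            outputParams.put(body.substring(0, eq).trim(), body.substring(eq + 1).trim());
        }
    }

    Map<String, String> getOutputParams() {
        return outputParams;
    }
}
```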

Once parsed, the values are stored in taskOutputParams.
Updating the PID (reported to the Master):
// update pid before waiting for the run to finish
if (null != taskCallBack) {
// TODO Update the task instance information
taskCallBack.updateTaskInstanceInfo(processId);
}

Timeout check:
long remainTime = getRemainTime();

private long getRemainTime() {
long usedTime = (System.currentTimeMillis() - taskRequest.getStartTime()) / 1000;
long remainTime = taskRequest.getTaskTimeout() - usedTime;

if (remainTime < 0) {
throw new RuntimeException("task execution time out");
}

return remainTime;
}
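The only subtle part of `getRemainTime` is the units: the start time is compared with `System.currentTimeMillis()`, so it is in milliseconds, while the result is passed to `waitFor(remainTime, TimeUnit.SECONDS)`, so the timeout is in seconds. A standalone sketch with explicit parameters (the class and method names are hypothetical):

```java
// Hypothetical standalone version of getRemainTime with explicit inputs
final class RemainTime {

    private RemainTime() {
    }

    /**
     * @param startMillis    task start time, epoch milliseconds
     * @param nowMillis      current time, epoch milliseconds
     * @param timeoutSeconds configured task timeout in seconds
     * @return seconds left before the timeout
     * @throws IllegalStateException if the timeout has already been exceeded
     */
    static long remainSeconds(long startMillis, long nowMillis, long timeoutSeconds) {
        long usedSeconds = (nowMillis - startMillis) / 1000; // integer division truncates
        long remain = timeoutSeconds - usedSeconds;
        if (remain < 0) {
            throw new IllegalStateException("task execution time out");
        }
        return remain;
    }
}
```

For example, a task that started 90.5 s ago with a 600 s timeout has 510 s left, because the used time is truncated to 90 s.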

......
// waiting for the run to finish
// TODO This is a timed wait; if no timeout is configured, it effectively waits indefinitely
boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);

// TODO Set the exit code
// if SHELL task exit
if (status && kubernetesStatus.isSuccess()) {

// SHELL task state
result.setExitStatusCode(this.process.exitValue());

} else {
log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
taskRequest.getTaskTimeout());
result.setExitStatusCode(EXIT_CODE_FAILURE);
cancelApplication();
}
int exitCode = this.process.exitValue();
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);

// TODO Execution result: the exit status code
setExitStatusCode(commandExecuteResult.getExitStatusCode());
// TODO Set the process ID
setProcessId(commandExecuteResult.getProcessId());
// TODO shellCommandExecutor.getTaskOutputParams() returns, for example, output -> 123
shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());

org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters#dealOutParam
public void dealOutParam(Map<String, String> taskOutputParams) {
// TODO If localParams is empty, setting output values has no effect at all
if (CollectionUtils.isEmpty(localParams)) {
return;
}

// TODO Keep only the localParams whose direction is OUT
List<Property> outProperty = getOutProperty(localParams);
if (CollectionUtils.isEmpty(outProperty)) {
return;
}


// TODO If taskOutputParams is empty, the declared OUT parameters are put into the varPool as-is
if (MapUtils.isEmpty(taskOutputParams)) {
outProperty.forEach(this::addPropertyToValPool);
return;
}

// TODO For each key present in both outProperty and taskOutputParams, replace the value with the one from taskOutputParams
// TODO and finally put it into the var pool
for (Property info : outProperty) {
String propValue = taskOutputParams.get(info.getProp());
if (StringUtils.isNotEmpty(propValue)) {
info.setValue(propValue);
addPropertyToValPool(info);
} else {
log.warn("Cannot find the output parameter {} in the task output parameters", info.getProp());
}
}
}

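In short: for every key that appears in both outProperty and taskOutputParams, the value is replaced with the one from taskOutputParams and the parameter is kept in the var pool, which is later reported to the Master and stored in the TaskInstance.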
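The merge rule can be illustrated with plain maps. In this hypothetical sketch an OUT parameter is reduced to a name and a declared value (the real `Property` also carries a direction and a data type), and the returned map stands in for the entries that `addPropertyToValPool` would add.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the dealOutParam merge rule
final class OutParamMerge {

    private OutParamMerge() {
    }

    /**
     * @param declaredOut OUT parameters declared on the task (name -> declared value)
     * @param parsed      values parsed from ${setValue(...)} lines (name -> value)
     * @return the entries that would be added to the var pool
     */
    static Map<String, String> merge(Map<String, String> declaredOut, Map<String, String> parsed) {
        Map<String, String> varPool = new LinkedHashMap<>();
        if (declaredOut.isEmpty()) {
            return varPool; // no OUT parameters declared: the task output is ignored
        }
        if (parsed.isEmpty()) {
            varPool.putAll(declaredOut); // nothing parsed: declared values go in unchanged
            return varPool;
        }
        for (Map.Entry<String, String> out : declaredOut.entrySet()) {
            String value = parsed.get(out.getKey());
            if (value != null && !value.isEmpty()) {
                varPool.put(out.getKey(), value); // parsed value replaces the declared one
            }
            // otherwise the real code only logs a warning and skips the parameter
        }
        return varPool;
    }
}
```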
2.5.4 After task execution (cleanup)
protected void afterExecute() throws TaskException {
if (task == null) {
throw new TaskException("The current task instance is null");
}
// TODO Decide whether to send an alert; this uses a JDK dynamic proxy RPC call to AlertBootstrapService in the alert module
sendAlertIfNeeded();

// TODO Send the result
sendTaskResult();

WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());

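// TODO development.state in common.properties defaults to false. If it is set to true,
// TODO development mode is enabled, which means the scripts and jar packages that DolphinScheduler generates are not cleaned up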
log.info("Remove the current task execute context from worker cache");
clearTaskExecPathIfNeeded();
}

Sending the result:
protected void sendTaskResult() {
taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
taskExecutionContext.setProcessId(task.getProcessId());
taskExecutionContext.setAppIds(task.getAppIds());

// TODO This is where the var pool is attached to the result
taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
taskExecutionContext.setEndTime(System.currentTimeMillis());

// upload out files and modify the "OUT FILE" property in VarPool
// TODO Upload the output files and update the corresponding entries in the var pool
TaskFilesTransferUtils.uploadOutputFiles(taskExecutionContext, storageOperate);
log.info("Upload output files: {} successfully",
TaskFilesTransferUtils.getFileLocalParams(taskExecutionContext, Direct.OUT));

// TODO Send the task result
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
log.info("Send task execute status: {} to master : {}", taskExecutionContext.getCurrentExecutionStatus().name(),
taskExecutionContext.getWorkflowInstanceHost());
}

An aside on shell exit codes:
[root@node opt]# vim test.sh
[root@node opt]# sh test.sh
me is journey
[root@node opt]# echo $?
0
[root@node opt]# vim test.sh
[root@node opt]# sh test.sh
test.sh: line 2: echo1: command not found
[root@node opt]# echo $?
127
[root@node opt]# vim test.sh
[root@node opt]# sh test.sh
me is 10.253.26.85
Killed
[root@node opt]# echo $?
137

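Summary: a SHELL task that exits normally returns 0, one that has been killed returns 137, and any other value indicates an error.
How the task status is derived from the exit code: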
taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());

org.apache.dolphinscheduler.plugin.task.api.AbstractTask#getExitStatus
// In short: exit code 0 means SUCCESS, 137 means KILL, and anything else is FAILURE. The exit code behind task.getExitStatus() is set in executeTask
public TaskExecutionStatus getExitStatus() {
switch (getExitStatusCode()) {
case TaskConstants.EXIT_CODE_SUCCESS:
return TaskExecutionStatus.SUCCESS;
case TaskConstants.EXIT_CODE_KILL:
return TaskExecutionStatus.KILL;
default:
return TaskExecutionStatus.FAILURE;
}
}
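Why 137: POSIX shells report a process terminated by signal N with exit status 128 + N, so `kill -9` (SIGKILL, signal 9) gives 128 + 9 = 137, while 127 is the shell's "command not found" status. OpenJDK's `Process.exitValue()` on Unix-like systems typically reports a signal-terminated child the same way. The sketch below combines that convention with the mapping in `getExitStatus`; `ExitStatusMapping` and its `Status` enum are hypothetical stand-ins for `TaskConstants` and `TaskExecutionStatus`.

```java
// Hypothetical stand-in for the exit-code mapping in AbstractTask#getExitStatus
final class ExitStatusMapping {

    enum Status { SUCCESS, KILL, FAILURE }

    static final int EXIT_CODE_SUCCESS = 0;
    static final int SIGKILL = 9;
    // Shells report "terminated by signal N" as 128 + N, so SIGKILL shows up as 137
    static final int EXIT_CODE_KILL = 128 + SIGKILL;

    private ExitStatusMapping() {
    }

    static Status fromExitCode(int exitCode) {
        switch (exitCode) {
            case EXIT_CODE_SUCCESS:
                return Status.SUCCESS;
            case EXIT_CODE_KILL:
                return Status.KILL;
            default:
                return Status.FAILURE; // e.g. 127 (command not found) or 1
        }
    }
}
```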

Uploading output files to the resource center:
org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils#uploadOutputFiles
public static void uploadOutputFiles(TaskExecutionContext taskExecutionContext,
StorageOperate storageOperate) throws TaskException {
List<Property> varPools = getVarPools(taskExecutionContext);
// get map of varPools for quick search
Map<String, Property> varPoolsMap = varPools.stream().collect(Collectors.toMap(Property::getProp, x -> x));

// get OUTPUT FILE parameters
List<Property> localParamsProperty = getFileLocalParams(taskExecutionContext, Direct.OUT);

if (localParamsProperty.isEmpty()) {
return;
}

log.info("Upload output files ...");
for (Property property : localParamsProperty) {
// get local file path
String path = String.format("%s/%s", taskExecutionContext.getExecutePath(), property.getValue());

// TODO packIfDir + CRC: a directory is first packed into a zip, a plain file is used as-is, and a CRC checksum file is then generated
String srcPath = packIfDir(path);

// get crc file path
String srcCRCPath = srcPath + CRC_SUFFIX;
try {
FileUtils.writeContent2File(FileUtils.getFileChecksum(path), srcCRCPath);
} catch (IOException ex) {
throw new TaskException(ex.getMessage(), ex);
}

// get remote file path
// TODO DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
String resourcePath = getResourcePath(taskExecutionContext, new File(srcPath).getName());
String resourceCRCPath = resourcePath + CRC_SUFFIX;
try {
// upload file to storage
// TODO Taking HDFS as an example, the full path is:
// TODO <HDFS root path>/tenantCode/resources/DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
String resourceWholePath =
storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath);
String resourceCRCWholePath =
storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourceCRCPath);
log.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
log.info("{} --- Local:{} to Remote:{}", "CRC file", srcCRCPath, resourceCRCWholePath);
storageOperate.upload(taskExecutionContext.getTenantCode(), srcCRCPath, resourceCRCWholePath, false,
true);
} catch (IOException ex) {
throw new TaskException("Upload file to storage error", ex);
}

// update varPool
Property oriProperty;
// if the property is not in varPool, add it
if (varPoolsMap.containsKey(property.getProp())) { // in theory this branch is never taken
oriProperty = varPoolsMap.get(property.getProp());
} else {
oriProperty = new Property(property.getProp(), Direct.OUT, DataType.FILE, property.getValue());
// TODO Add it to the var pool
varPools.add(oriProperty);
}

// TODO The name becomes taskName.propertyName
oriProperty.setProp(String.format("%s.%s", taskExecutionContext.getTaskName(), oriProperty.getProp()));
// TODO Key step: the relative resource path becomes the value of the var pool entry
oriProperty.setValue(resourcePath);
}

// TODO Write back the var pool including the FILE entries
taskExecutionContext.setVarPool(JSONUtils.toJsonString(varPools));
}
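The two var-pool changes at the end of the loop (the name gets the task name as a prefix, the value becomes the remote relative path) are easiest to see with concrete values. This hypothetical sketch builds the relative path from the layout given in the `DATA_TRANSFER/...` comment above; the date segment is taken as an already formatted string because the exact format used by `getResourcePath` is not shown here, and all values in the test are made up.

```java
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical illustration of the OUT FILE var-pool rewrite in uploadOutputFiles
final class OutputFileVarPool {

    private OutputFileVarPool() {
    }

    // DATA_TRANSFER/DATE/ProcessDefineCode/ProcessDefineVersion_ProcessInstanceID/TaskName_TaskInstanceID_FileName
    static String resourcePath(String date, long processDefineCode, int processDefineVersion,
                               int processInstanceId, String taskName, int taskInstanceId, String fileName) {
        return String.format("DATA_TRANSFER/%s/%d/%d_%d/%s_%d_%s", date, processDefineCode,
                processDefineVersion, processInstanceId, taskName, taskInstanceId, fileName);
    }

    // The var pool entry: name prefixed with the task name, value replaced by the resource path
    static Map.Entry<String, String> varPoolEntry(String taskName, String propName, String resourcePath) {
        return new AbstractMap.SimpleEntry<>(taskName + "." + propName, resourcePath);
    }
}
```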

Sending the task result:
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);

3

The role of the WorkerMessageSender component



4

Killing a task



org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#operate
public TaskInstanceKillResponse operate(TaskInstanceKillRequest taskInstanceKillRequest) {
log.info("Receive TaskInstanceKillRequest: {}", taskInstanceKillRequest);

// TODO Task instance
int taskInstanceId = taskInstanceKillRequest.getTaskInstanceId();
try {
LogUtils.setTaskInstanceIdMDC(taskInstanceId);
// TODO Worker task executor
WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId);
if (workerTaskExecutor == null) {
log.error("Cannot find WorkerTaskExecutor for taskInstance: {}", taskInstanceId);
return TaskInstanceKillResponse.fail("Cannot find WorkerTaskExecutor");
}

// TODO Task execution context
TaskExecutionContext taskExecutionContext = workerTaskExecutor.getTaskExecutionContext();

LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());

// TODO The kill happens here
boolean result = doKill(taskExecutionContext);

// TODO Uses Process.destroy(), the method of Java's Process class that destroys the subprocess associated with a Process object
this.cancelApplication(workerTaskExecutor);

int processId = taskExecutionContext.getProcessId();
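// TODO If processId is 0, the task status is set directly to KILL, and that KILL status is reported when the Worker reports task information
// TODO Note: this does not necessarily mean the task was really killed; it only keeps the status inside DS correct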
if (processId == 0) {
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL);
// todo: the task might be executed, but the processId is 0
WorkerTaskExecutorHolder.remove(taskInstanceId);
log.info("The task has not been executed and has been cancelled, task id:{}", taskInstanceId);
return TaskInstanceKillResponse.success(taskExecutionContext);
}

// TODO This records whether the kill succeeded. The Worker then notices that the task was killed and reports it
// TODO when it sends the FINISH result
taskExecutionContext
.setCurrentExecutionStatus(result ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);

WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
// TODO Remove pending retry messages
messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
return TaskInstanceKillResponse.success(taskExecutionContext);
} finally {
LogUtils.removeTaskInstanceIdMDC();
LogUtils.removeTaskInstanceLogFullPathMDC();
}
}

Killing the process and the YARN application:
// TODO The kill happens here
boolean result = doKill(taskExecutionContext);

org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#doKill
private boolean doKill(TaskExecutionContext taskExecutionContext) {
// kill system process
// TODO Kill the processes associated with the shell
boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId());

// TODO kill yarn or k8s application
try {
ProcessUtils.cancelApplication(taskExecutionContext);
} catch (TaskException e) {
return false;
}
return processFlag;
}

org.apache.dolphinscheduler.server.worker.runner.operator.TaskInstanceKillOperationFunction#killProcess
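Killing the process and its child processes. Note: this differs from the official upstream code; if an exception occurs, only a warning is logged, because sometimes the Worker does not have permission to kill all of the processes.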
protected boolean killProcess(String tenantCode, Integer processId) {
// todo: directly interrupt the process
if (processId == null || processId.equals(0)) {
return true;
}

try {
String pidsStr = ProcessUtils.getPidsStr(processId);
if (!Strings.isNullOrEmpty(pidsStr)) {
String cmd = String.format("kill -9 %s", pidsStr);
cmd = OSUtils.getSudoCmd(tenantCode, cmd);
log.info("process id:{}, cmd:{}", processId, cmd);
OSUtils.exeCmd(cmd);
}
} catch (Exception e) {
log.warn("kill task error", e);
}
return true;
}
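`ProcessUtils.getPidsStr` gathers the process and its descendants so that `kill -9` also reaches the children started by the script. For comparison, here is a sketch of the same idea using the JDK 9+ `ProcessHandle` API instead of an external `kill` command. This is a swapped-in technique, not what DolphinScheduler does: it runs as the Worker's own OS user, so unlike the `sudo -u <tenant>` path it can only signal processes that user is allowed to kill. The class name `KillTree` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical process-tree kill using ProcessHandle (not the DolphinScheduler implementation)
final class KillTree {

    private KillTree() {
    }

    // Forcibly kills the process with the given pid and all of its descendants; returns the pids signalled
    static List<Long> killTree(long pid) {
        Optional<ProcessHandle> root = ProcessHandle.of(pid);
        if (root.isEmpty()) {
            return List.of(); // already gone, nothing to kill
        }
        // Snapshot descendants first: once the parent dies its children are re-parented
        List<ProcessHandle> targets = root.get().descendants()
                .collect(Collectors.toCollection(ArrayList::new));
        targets.add(root.get()); // kill the children first, the root last
        List<Long> killed = new ArrayList<>();
        for (ProcessHandle handle : targets) {
            if (handle.destroyForcibly()) { // true if termination was requested (SIGKILL on Unix)
                killed.add(handle.pid());
            }
        }
        return killed;
    }
}
```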

Killing the application on YARN:
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils#cancelApplication
public static void cancelApplication(TaskExecutionContext taskExecutionContext) {
try {
// TODO k8s
if (Objects.nonNull(taskExecutionContext.getK8sTaskExecutionContext())) {
if (!TASK_TYPE_SET_K8S.contains(taskExecutionContext.getTaskType())) {
// Set empty container name for Spark on K8S task
applicationManagerMap.get(ResourceManagerType.KUBERNETES)
.killApplication(new KubernetesApplicationManagerContext(
taskExecutionContext.getK8sTaskExecutionContext(),
taskExecutionContext.getTaskAppId(), ""));
}
} else {
// TODO YARN
String host = taskExecutionContext.getHost();
String executePath = taskExecutionContext.getExecutePath();
String tenantCode = taskExecutionContext.getTenantCode();
List<String> appIds;
// TODO The failover case takes this branch
if (StringUtils.isNotEmpty(taskExecutionContext.getAppIds())) {
// is failover
appIds = Arrays.asList(taskExecutionContext.getAppIds().split(COMMA));
} else {
String logPath = taskExecutionContext.getLogPath();
String appInfoPath = taskExecutionContext.getAppInfoPath();
if (logPath == null || appInfoPath == null || executePath == null || tenantCode == null) {
log.error(
"Kill yarn job error, the input params is illegal, host: {}, logPath: {}, appInfoPath: {}, executePath: {}, tenantCode: {}",
host, logPath, appInfoPath, executePath, tenantCode);
throw new TaskException("Cancel application failed!");
}

log.info("Get appIds from worker {}, taskLogPath: {}", host, logPath);
// TODO Extract the appIds from the log with a regular expression
appIds = LogUtils.getAppIds(logPath, appInfoPath,
PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds));
}

// TODO Reaching this point means there are no appIds
if (CollectionUtils.isEmpty(appIds)) {
log.info("The appId is empty");
return;
}

ApplicationManager applicationManager = applicationManagerMap.get(ResourceManagerType.YARN);
applicationManager.killApplication(new YarnApplicationManagerContext(executePath, tenantCode, appIds));
}
} catch (Exception e) {
log.error("Cancel application failed.", e);
}
}

The appIds are parsed from the task log with a regular expression. By default the log fetch way is used rather than aop.
appIds = LogUtils.getAppIds(logPath, appInfoPath,
PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));

public List<String> getAppIds(String logPath, String appInfoPath, String fetchWay) {
if (!StringUtils.isEmpty(fetchWay) && fetchWay.equals("aop")) {
log.info("Start finding appId in {}, fetch way: {} ", appInfoPath, fetchWay);
// TODO With aop, read them from the file written by the AOP interceptor
return getAppIdsFromAppInfoFile(appInfoPath);
} else {
log.info("Start finding appId in {}, fetch way: {} ", logPath, fetchWay);
// TODO Regex-match the task log
return getAppIdsFromLogFile(logPath);
}
}
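With the default `log` fetch way, the application ids are recovered by regex-matching the task log. The sketch below assumes the standard YARN id format `application_<clusterTimestamp>_<sequence>`; the exact pattern constant DolphinScheduler uses is not shown in this section, so `application_\d+_\d+` is an assumption. Ids are de-duplicated while keeping first-seen order, and `AppIdExtractor` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical extraction of YARN application ids from task log lines
final class AppIdExtractor {
    // Assumed id format: application_<clusterTimestamp>_<sequence>
    private static final Pattern YARN_APP_ID = Pattern.compile("application_\\d+_\\d+");

    private AppIdExtractor() {
    }

    static List<String> extract(List<String> logLines) {
        Set<String> ids = new LinkedHashSet<>(); // de-duplicate, keep first-seen order
        for (String line : logLines) {
            Matcher m = YARN_APP_ID.matcher(line);
            while (m.find()) {
                ids.add(m.group());
            }
        }
        return new ArrayList<>(ids);
    }
}
```

The resulting list corresponds to what `cancelApplication` joins with commas into `appIds` and eventually passes to `yarn application -kill`.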

Actually killing the application on YARN:
applicationManager.killApplication(new YarnApplicationManagerContext(executePath, tenantCode, appIds));
org.apache.dolphinscheduler.plugin.task.api.am.YarnApplicationManager#killApplication

public boolean killApplication(ApplicationManagerContext applicationManagerContext) throws TaskException {
YarnApplicationManagerContext yarnApplicationManagerContext =
(YarnApplicationManagerContext) applicationManagerContext;
String executePath = yarnApplicationManagerContext.getExecutePath();
String tenantCode = yarnApplicationManagerContext.getTenantCode();
List<String> appIds = yarnApplicationManagerContext.getAppIds();

try {
String commandFile = String.format("%s/%s.kill", executePath, String.join(Constants.UNDERLINE, appIds));
String cmd = getKerberosInitCommand() + "yarn application -kill " + String.join(Constants.SPACE, appIds);
execYarnKillCommand(tenantCode, commandFile, cmd);
} catch (Exception e) {
log.warn("Kill yarn application {} failed", appIds, e);
}
return true;
}
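Watch out for execYarnKillCommand: it runs yarn application -kill, and the yarn command may not be available in that shell's environment. That is why the ENV_SOURCE_LIST handling was added here.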

private void execYarnKillCommand(String tenantCode, String commandFile,
String cmd) throws Exception {
StringBuilder sb = new StringBuilder();
sb.append("#!/bin/sh\n");
sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
sb.append("cd $BASEDIR\n");

// TODO These are the defaults, e.g. /etc/profile
if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
// TODO Append a source line for every environment file configured in ENV_SOURCE_LIST
ShellUtils.ENV_SOURCE_LIST.forEach(env -> sb.append("source " + env + "\n"));
}

sb.append("\n\n");
sb.append(cmd);

File f = new File(commandFile);

if (!f.exists()) {
org.apache.commons.io.FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
StandardCharsets.UTF_8);
}

String runCmd = String.format("%s %s", Constants.SH, commandFile);
runCmd = org.apache.dolphinscheduler.common.utils.OSUtils.getSudoCmd(tenantCode, runCmd);
log.info("kill cmd:{}", runCmd);
org.apache.dolphinscheduler.common.utils.OSUtils.exeCmd(runCmd);
}
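A standalone sketch of the script text that `execYarnKillCommand` assembles, to make the role of `shell.env_source_list` visible. `YarnKillScript` is hypothetical, and the Kerberos prefix from `getKerberosInitCommand()` is left out.

```java
import java.util.List;

// Hypothetical builder for the text of the YARN kill script
final class YarnKillScript {

    private YarnKillScript() {
    }

    static String build(List<String> envSourceList, List<String> appIds) {
        StringBuilder sb = new StringBuilder();
        sb.append("#!/bin/sh\n");
        sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n"); // run relative to the script's own directory
        sb.append("cd $BASEDIR\n");
        // Load the configured environment files so that `yarn` is on the PATH
        envSourceList.forEach(env -> sb.append("source ").append(env).append('\n'));
        sb.append("\n\n");
        sb.append("yarn application -kill ").append(String.join(" ", appIds));
        return sb.toString();
    }
}
```

One caveat about the generated script: `source` is a bash builtin, so on systems where `/bin/sh` is a minimal POSIX shell such as dash it may not be available; the portable spelling is `.`.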

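Summary: if the task is killed successfully, the WorkerTaskExecutor notices it and sends a FINISH report with the KILL status. If the PID is 0 (normally meaning the task has not started executing yet), the status in the task execution context is set to KILL, the WorkerTaskExecutor is removed from the thread pool, and its entry is removed from the WorkerTaskExecutorHolder cache.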


5

Pausing



public class TaskInstancePauseOperationFunction
implements
ITaskInstanceOperationFunction<TaskInstancePauseRequest, TaskInstancePauseResponse> {

@Override
public TaskInstancePauseResponse operate(TaskInstancePauseRequest taskInstancePauseRequest) {
try {
LogUtils.setTaskInstanceIdMDC(taskInstancePauseRequest.getTaskInstanceId());
log.info("Receive TaskInstancePauseRequest: {}", taskInstancePauseRequest);
log.warn("TaskInstancePauseOperationFunction is not support for worker task yet!");
return TaskInstancePauseResponse.success();
} finally {
LogUtils.removeTaskInstanceIdMDC();
}
}
}

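Key point:
For the Worker, pausing does nothing at all, and it cannot do anything: could every task really be paused? Only if the engine program itself supports it; jobs such as MR, Spark and Flink cannot be paused. The core of pausing is to notify the workflow instance that it should be paused, so that the task after the currently running one is paused instead. That is why pausing has no effect in several cases: if the workflow has only one task, that task cannot be paused and will eventually succeed; if the running task is the last one, it cannot be paused either; and if a task runs so fast that, at the moment you pause, the workflow is about to move on but there are no downstream tasks left, nothing is paused.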


6

Updating the workflow instance host



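This belongs to fault tolerance and will be covered in detail in the fault-tolerance chapter. Stay tuned.
Reprinted from Journey.
Original article: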
https://segmentfault.com/a/1190000044966573





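Join the community

There are many ways to take part in and contribute to the Apache DolphinScheduler community, falling into two main categories: code and non-code contributions.

📂 Non-code contributions include:

Improving and translating documentation; translating technical and hands-on articles; contributing hands-on or internals articles; becoming an evangelist; community management and answering questions; speaking at meetups; test feedback; user feedback; and more.

👩‍💻 Code contributions include:

Finding bugs; writing fixes; developing new features; submitting code contributions; taking part in code review; and more.

We hope your first PR (docs or code) is a simple one: use it to get familiar with the submission process and community collaboration, and to experience how friendly the community is.

The community has compiled the following list of issues suitable for newcomers: https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3A%22first+time+contributor%22

List of high-priority issues: https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3Apriority%3Ahigh

How to contribute: https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2/%E8%B4%A1%E7%8C%AE%E6%8C%87%E5%8D%97_menu/%E5%A6%82%E4%BD%95%E5%8F%82%E4%B8%8E_menu

If you ❤️ the little dolphin, please give us a Star!

https://github.com/apache/dolphinscheduler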



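This article is reprinted from 海豚调度 (the DolphinScheduler official account). If you suspect infringement, please email contact@modb.pro with supporting evidence; once verified, 墨天轮 (modb.pro) will remove the content immediately.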
