目录(本文章基于3.1.0版本研究)
1、数据库表介绍
2、 整体流程运行
3、 源码剖析
3.1 apiserver任务执行入口
3.2 master调度任务
3.2.1 master启动 3.2.2 command扫描 3.2.3 workerFlowEvent消费 3.2.4 workerflow事件处理逻辑 3.2.5 workerflowRunnable运行逻辑 3.2.6 任务消费 3.2.7 任务分派
3.3. worker执行任务
3.3.1 Worker启动
3.3.2 Worker启动command
3.3.3 workerManager消费
3.3.4 任务运行
3.4 master接受任务反馈
3.4.1 master接受反馈消息
3.4.2 taskEventService处理taskevent
3.4.3 TaskResultEventHandler处理taskevent
3.5. master闭环提交下游任务
3.5.1 EventExecuteService处理stateEvent
3.5.2 workflowExecuteThread处理stateEvent事件
3.5.3 TaskStateEventHandler处理stateEvent事件
3.5.4 wokerflowExecuteThread调度下游任务
4、总结
01
数据库表介绍
02
整体流程运行

03
DolphinScheduler源码剖析
3.1 apiserver任务执行入口

3.2 master 调度任务
3.2.1 master启动
DS
3.2.2 command扫描
DS
3.2.3 workerFlowEvent消费
DS
MasterSchedulerBootstrap.start() 方法
@Override
public synchronized void start() {
logger.info("Master schedule bootstrap starting..");
super.start();
workflowEventLooper.start();//工作流调度线程启动
logger.info("Master schedule bootstrap started...");
}
WorkflowEventLooper.run()方法
public void run() {
WorkflowEvent workflowEvent = null;
while (Stopper.isRunning()) {
try {
workflowEvent = workflowEventQueue.poolEvent();//拉取workflowevent
LoggerUtils.setWorkflowInstanceIdMDC(workflowEvent.getWorkflowInstanceId());
logger.info("Workflow event looper receive a workflow event: {}, will handle this", workflowEvent);
WorkflowEventHandler workflowEventHandler =
workflowEventHandlerMap.get(workflowEvent.getWorkflowEventType());//获取workflowevent,处理workflowevent事件
workflowEventHandler.handleWorkflowEvent(workflowEvent);
} catch (InterruptedException e) {
logger.warn("WorkflowEventLooper thread is interrupted, will close this loop", e);
Thread.currentThread().interrupt();
break;
} catch (WorkflowEventHandleException workflowEventHandleException) {
logger.error("Handle workflow event failed, will add this event to event queue again, event: {}",
workflowEvent, workflowEventHandleException);
workflowEventQueue.addEvent(workflowEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (WorkflowEventHandleError workflowEventHandleError) {
logger.error("Handle workflow event error, will drop this event, event: {}",
workflowEvent,
workflowEventHandleError);
} catch (Exception unknownException) {
logger.error(
"Handle workflow event failed, get a unknown exception, will add this event to event queue again, event: {}",
workflowEvent, unknownException);
workflowEventQueue.addEvent(workflowEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
}
}
3.2.4 workerflow事件处理逻辑
DS
因为是START_WORKFLOW类型的所以获取到 WorkflowStartEventHandler.handleWorkflowEvent() 来处理该事件。
@Override
public void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
logger.info("Handle workflow start event, begin to start a workflow, event: {}", workflowEvent);
//获取WorkflowExecuteRunnable
WorkflowExecuteRunnable workflowExecuteRunnable =
processInstanceExecCacheManager.getByProcessInstanceId(workflowEvent.getWorkflowInstanceId());
if (workflowExecuteRunnable == null) {
throw new WorkflowEventHandleError(
"The workflow start event is invalid, cannot find the workflow instance from cache");
}
ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
ProcessInstanceMetrics.incProcessInstanceSubmit();
//异步调用call方法执行workflowExecute运行逻辑。
CompletableFuture<WorkflowSubmitStatue> workflowSubmitFuture =
CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool);
workflowSubmitFuture.thenAccept(workflowSubmitStatue -> {
if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
// submit failed will resend the event to workflow event queue
logger.info("Success submit the workflow instance");//监听返回状态是否成功
if (processInstance.getTimeout() > 0) {//是否超时
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
} else {//出现异常,重试,重新进入队列,调用call方法
logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}",
workflowEvent);
workflowEventQueue.addEvent(new WorkflowEvent(WorkflowEventType.START_WORKFLOW,
processInstance.getId()));
}
});
}
3.2.5 workerflowRunnable运行逻辑
DS
3.2.6 任务消费
DS
//通过注解启动
@PostConstruct
public void init() {
this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
logger.info("Task priority queue consume thread staring");
super.start();
logger.info("Task priority queue consume thread started");
}
@Override
public void run() {
int fetchTaskNum = masterConfig.getDispatchTaskNumber();
while (Stopper.isRunning()) {
try {
List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
if (CollectionUtils.isNotEmpty(failedDispatchTasks)) {
TaskMetrics.incTaskDispatchFailed(failedDispatchTasks.size());
for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask);
}
// If the all task dispatch failed, will sleep for 1s to avoid the master cpu higher.
if (fetchTaskNum == failedDispatchTasks.size()) {
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
} catch (Exception e) {
TaskMetrics.incTaskDispatchError();
logger.error("dispatcher task error", e);
}
}
}
批量调度任务
TaskPriorityQueueConsumer.batchDispatch()
/**
* batch dispatch with thread pool
*/
public List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
List<TaskPriority> failedDispatchTasks = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(fetchTaskNum);
for (int i = 0; i < fetchTaskNum; i++) {//拉取任务
TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (Objects.isNull(taskPriority)) {
latch.countDown();
continue;
}
consumerThreadPoolExecutor.submit(() -> {//创建异步线程分派任务
try {
boolean dispatchResult = this.dispatchTask(taskPriority);
if (!dispatchResult) {
failedDispatchTasks.add(taskPriority);
}
} finally {
// make sure the latch countDown
latch.countDown();
}
});
}
latch.await();
return failedDispatchTasks;
}
3.2.7 任务分派
DS
/**
* task dispatch
*
* @param context context
* @return result
* @throws ExecuteException if error throws ExecuteException
*/
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
// get executor manager 获取
ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
if (executorManager == null) {
throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
}
// host select worke节点选择器,选择worker节点
Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) {
logger.warn("fail to execute : {} due to no suitable worker, current task needs worker group {} to execute",
context.getCommand(), context.getWorkerGroup());
return false;
}
context.setHost(host);//设置host进上下文
executorManager.beforeExecute(context);
try {
// task execute 通过netty发送消息给worker,告知worker要执行该任务
return executorManager.execute(context);
} finally {
executorManager.afterExecute(context);
}
}
/**
* execute logic
*
* @param context context
* @return result
* @throws ExecuteException if error throws ExecuteException
*/
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
// all nodes
Set<String> allNodes = getAllNodes(context);
// fail nodes
Set<String> failNodeSet = new HashSet<>();
// build command accord executeContext
Command command = context.getCommand();
// execute task host
Host host = context.getHost();
boolean success = false;
while (!success) {
try {
doExecute(host, command);
success = true;
context.setHost(host);
// We set the host to taskInstance to avoid when the worker down, this taskInstance may not be failovered, due to the taskInstance's host
// is not belongs to the down worker ISSUE-10842.
context.getTaskInstance().setHost(host.getAddress());
} catch (ExecuteException ex) {
logger.error(String.format("execute command : %s error", command), ex);
try {
failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes);
Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
if (remained != null && remained.size() > 0) {
host = Host.of(remained.iterator().next());
logger.error("retry execute command : {} host : {}", command, host);
} else {
throw new ExecuteException("fail after try all nodes");
}
} catch (Throwable t) {
throw new ExecuteException("fail after try all nodes");
}
}
}
return success;
}
3.3 worker执行任务
3.3.1 worker启动
DS
WorkerServer.run()
public void run() {
this.workerRpcServer.start();//启动rpc通信组件
this.workerRpcClient.start();
this.taskPluginManager.loadPlugin();
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
this.workerRegistryClient.handleDeadServer();
//启动workermaanager线程,消费command执行任务。
this.workerManagerThread.start();
this.messageRetryRunner.start();
/*
* registry hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("WorkerServer shutdown hook");
}
}));
}
3.3.2 worker消费任务启动command
DS
3.3.3 workerManager 消费任务
DS
3.3.4 任务运行
DS
@Override
public void run() {
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {//判断是不是测试运行
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
logger.info("Task dry run success");
return;
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
logger.info("script path : {}", taskExecutionContext.getExecutePath());
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
}
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
// callback task execute running
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RUNNING);
// copy hdfs/minio file to local
List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources());
if (!fileDownloads.isEmpty()) {
downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
}
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setTaskAppId(String.format("%s_%s",
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());//获取task构造器
if (null == taskChannel) {
throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
}
String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setTaskLogName(taskLogName);
// set the name of the current thread
Thread.currentThread().setName(taskLogName);
task = taskChannel.createTask(taskExecutionContext);//创建task
// task init
this.task.init();//初始化task
//init varPool
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
// task handle
this.task.handle();//task处理的真正逻辑,每一种task对应自己的handle逻辑
// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
}
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));//返回任务的返回信息
taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
taskExecutionContext.setProcessId(this.task.getProcessId());
taskExecutionContext.setAppIds(this.task.getAppIds());
taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
} catch (Throwable e) {
logger.error("task scheduler failure", e);
kill();
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);//返回失败
taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
taskExecutionContext.setProcessId(this.task.getProcessId());
taskExecutionContext.setAppIds(this.task.getAppIds());
} finally {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());//返回任务运行结果
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
clearTaskExecPath();
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
3.4 master接受任务反馈
3.4.1 master接受反馈消息
DS
Worker 反馈任务运行消息则是 Task_EXECUTE_RESULT 消息类型
MasterRPCServer.init()
@Override
public void run() {
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {//判断是不是测试运行
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
logger.info("Task dry run success");
return;
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
logger.info("script path : {}", taskExecutionContext.getExecutePath());
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
}
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
// callback task execute running
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RUNNING);
// copy hdfs/minio file to local
List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources());
if (!fileDownloads.isEmpty()) {
downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
}
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setTaskAppId(String.format("%s_%s",
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());//获取task构造器
if (null == taskChannel) {
throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
}
String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setTaskLogName(taskLogName);
// set the name of the current thread
Thread.currentThread().setName(taskLogName);
task = taskChannel.createTask(taskExecutionContext);//创建task
// task init
this.task.init();//初始化task
//init varPool
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
// task handle
this.task.handle();//task处理的真正逻辑,每一种task对应自己的handle逻辑
// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
}
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));//返回任务的返回信息
taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
taskExecutionContext.setProcessId(this.task.getProcessId());
taskExecutionContext.setAppIds(this.task.getAppIds());
taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
} catch (Throwable e) {
logger.error("task scheduler failure", e);
kill();
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);//返回失败
taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
taskExecutionContext.setProcessId(this.task.getProcessId());
taskExecutionContext.setAppIds(this.task.getAppIds());
} finally {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());//返回任务运行结果
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
clearTaskExecPath();
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
将消息封装成 taskResultEvent 提交给taskEventService
TaskExecuteResponseProcessor.run()
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT == command.getType(),
String.format("invalid command type : %s", command.getType()));
TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(),
TaskExecuteResultCommand.class);
TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage,
channel,
taskExecuteResultMessage.getMessageSenderAddress());
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),
taskResultEvent.getTaskInstanceId());
logger.info("Received task execute result, event: {}", taskResultEvent);
//提交事件给taskEventService
taskEventService.addEvent(taskResultEvent);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
3.4.2 taskEventService处理taskevent
DS
3.4.3 TaskResultEventHandler 处理 taskevent
DS
3.5.1 EventExecuteService 处理 stateEvent
DS
3.5.3 TaskStateEventHandler处理stateEvent事件
DS
判断是否是任务完成事件如果是完成事件,调用 workflowExecuteThread 的 taskFinished 方法,提交下游任务。
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent)
throws StateEventHandleException, StateEventHandleError {
measureTaskState(stateEvent);
workflowExecuteRunnable.checkTaskInstanceByStateEvent(stateEvent);
Optional<TaskInstance> taskInstanceOptional =
workflowExecuteRunnable.getTaskInstance(stateEvent.getTaskInstanceId());
TaskInstance task = taskInstanceOptional.orElseThrow(() -> new StateEventHandleError(
"Cannot find task instance from taskMap by task instance id: " + stateEvent.getTaskInstanceId()));
if (task.getState() == null) {
throw new StateEventHandleError("Task state event handle error due to task state is null");
}
Map<Long, Integer> completeTaskMap = workflowExecuteRunnable.getCompleteTaskMap();
if (task.getState().typeIsFinished()) {
if (completeTaskMap.containsKey(task.getTaskCode())
&& completeTaskMap.get(task.getTaskCode()) == task.getId()) {
logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
return true;
}
//调用workflowexecuteRunnabletaskFinished方法。
workflowExecuteRunnable.taskFinished(task);
if (task.getTaskGroupId() > 0) {
workflowExecuteRunnable.releaseTaskGroup(task);
}
return true;
}
Map<Long, ITaskProcessor> activeTaskProcessMap = workflowExecuteRunnable.getActiveTaskProcessMap();
if (activeTaskProcessMap.containsKey(task.getTaskCode())) {
ITaskProcessor iTaskProcessor = activeTaskProcessMap.get(task.getTaskCode());
iTaskProcessor.action(TaskAction.RUN);
if (iTaskProcessor.taskInstance().getState().typeIsFinished()) {
if (iTaskProcessor.taskInstance().getState() != task.getState()) {
task.setState(iTaskProcessor.taskInstance().getState());
}
workflowExecuteRunnable.taskFinished(task);
}
return true;
}
throw new StateEventHandleException(
"Task state event handle error, due to the task is not in activeTaskProcessorMaps");
}
3.5.4 wokerflowExecuteThread调度下游任务
DS
4.1 各个组件的使用
DS
4.2 线程作用
DS
参与贡献
随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真诚欢迎热爱开源的伙伴加入到开源社区中来,为中国开源崛起献上一份自己的力量,让本土开源走向全球。

参与 DolphinScheduler 社区有非常多的参与贡献的方式,包括:

贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。
社区汇总了以下适合新手的问题列表:https://github.com/apache/dolphinscheduler/issues/5689
非新手问题列表:https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3A%22volunteer+wanted%22
如何参与贡献链接:https://dolphinscheduler.apache.org/zh-cn/community/development/contribute.html
来吧,DolphinScheduler开源社区需要您的参与,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是巨大的。
参与开源可以近距离与各路高手切磋,迅速提升自己的技能,如果您想参与贡献,我们有个贡献者种子孵化群,可以添加社区小助手微信(Leonard-ds) ,手把手教会您( 贡献者不分水平高低,有问必答,关键是有一颗愿意贡献的心 )。
添加社区小助手微信(Leonard-ds)
添加小助手微信时请说明想参与贡献。
来吧,开源社区非常期待您的参与。
☞DolphinScheduler 登陆 AWS AMI 应用市场!
☞DolphinScheduler 机器学习工作流预测今年 FIFA 世界杯冠军大概率是荷兰!
☞手把手教你上手Apache DolphinScheduler机器学习工作流
☞DolphinScheduler 快速构建 Hugging Face 文本分类工作流,基于工作流的机器学习训练部署太强了!
☞最佳实践 | 如何基于GitHub Actions创建 DolphinScheduler Python API的CI/CD?
