暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

纯干货 | Dolphinscheduler Master模块源码剖析

海豚调度 2025-04-01
32

点击蓝字



关注我们


此前我们曾用万字长文解释了Apache DolphinScheduler的Worker模块源码,今天,我们再来一起看看Master模块源码的原理。


1

Master Slot计算

核心代码逻辑
org.apache.dolphinscheduler.server.master.registry.MasterSlotManager.SlotChangeListener#notify

public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {
    List<Server> serverList = masterNodeInfo.values().stream()
            TODO 这里其实就是过滤掉buzy的master节点
            .filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.BUSY))
            .map(this::convertHeartBeatToServer).collect(Collectors.toList());
    TODO 同步master节点
    syncMasterNodes(serverList);
}

复制

计算 totalSlot和currentSlot

private void syncMasterNodes(List<Server> masterNodes) {
    slotLock.lock();
    try {
        this.masterPriorityQueue.clear();
        TODO 这里会把所有的master节点都放入到masterPriorityQueue中,比如说 [192.168.220.1:12345,192.168.220.2:12345]
        this.masterPriorityQueue.putAll(masterNodes);
        TODO 就是获取到本地ip的在队列中的位置
        int tempCurrentSlot = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
        TODO 所有节点数量
        int tempTotalSlot = masterNodes.size();
        TODO 正常情况下不会小于0
        if (tempCurrentSlot < 0) {
            totalSlot = 0;
            currentSlot = 0;
            log.warn("Current master is not in active master list");
        } else if (tempCurrentSlot != currentSlot || tempTotalSlot != totalSlot) {
            TODO 这里其实就是记录的是比如说一共有两个slot,我的slot是0或者1
            totalSlot = tempTotalSlot;
            currentSlot = tempCurrentSlot;
            log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);
        }
    } finally {
        slotLock.unlock();
    }
}

复制

this.masterPriorityQueue.putAll(masterNodes); 会计算索引

public void putAll(Collection<Server> serverList) {
    for (Server server : serverList) {
        this.queue.put(server);
    }
    TODO 这里更新了hostIndexMap,存放的是 <host:port> -> 索引
    refreshMasterList();
}


private void refreshMasterList() {
    hostIndexMap.clear();
    Iterator<Server> iterator = queue.iterator();
    int index = 0;
    while (iterator.hasNext()) {
        Server server = iterator.next();
        String addr = NetUtils.getAddr(server.getHost(), server.getPort());
        hostIndexMap.put(addr, index);
        index += 1;
    }

}

复制


2

Master消费Command生成流程实例

command最终的获取逻辑:

比如说两个Master节点 : 
masterCount=2 thisMasterSlot=0  master1
masterCount=2 thisMasterSlot=1  master2

command中的数据如下 :
1 master2
2 master1
3 master2
4 master1

select *
        from t_ds_command
        where id % #{masterCount} = #{thisMasterSlot}
        order by process_instance_priority, id asc
            limit #{limit}

复制

有没有感到疑惑,就是如果一个master更新到的最新的,一个没有更新到,怎么办?

比如说,master1节点是这样的
1  master2
2  master1
3  master2
4  master1

比如说,master2节点是这样的,是不是发现master2节点都是他的,都可以拉取消费?那就导致重复消费,比如说1这个command
1 master1
2 master1
3 master1
4 master1

复制

org.apache.dolphinscheduler.service.process.ProcessServiceImpl#handleCommand

@Transactional
public @Nullable ProcessInstance handleCommand(String host,
                                                   Command command) throws CronParseException, CodeGenerateException {
    TODO 创建流程实例
    ProcessInstance processInstance = constructProcessInstance(command, host);
    cannot construct process instance, return null
    if (processInstance == null) {
        log.error("scan command, command parameter is error: {}", command);
        commandService.moveToErrorCommand(command, "process instance is null");
        return null;
    }
    processInstance.setCommandType(command.getCommandType());
    processInstance.addHistoryCmd(command.getCommandType());
    processInstance.setTestFlag(command.getTestFlag());
    if the processDefinition is serial
    ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(),
            processInstance.getProcessDefinitionVersion());
    TODO 是否是串行执行
    if (processDefinition.getExecutionType().typeIsSerial()) {
        saveSerialProcess(processInstance, processDefinition);
        if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) {
            setSubProcessParam(processInstance);
            triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
            deleteCommandWithCheck(command.getId());
            todo: this is a bad design to return null here, whether trigger the task
            return null;
        }
    } else {
        TODO 并行执行
        processInstanceDao.upsertProcessInstance(processInstance);
    }

    TODO 这里其实还会向triggerRelation表中插入一条数据,是流程实例和triggerCode的关系
    triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
    TODO 设置子流程参数
    setSubProcessParam(processInstance);
    TODO 删除command
    deleteCommandWithCheck(command.getId());
    return processInstance;
}

复制

注意:这个方法是加@Transactional的,所以说创建流程实例和删除Command是在一个事物里面的,如果不同的Master消费到同一个Command。肯定会有一个删除Command失败,这时会抛出一个异常,这样就会让数据库进行回滚。


3

工作流启动流程


4

DAG切分&任务提交


5

Master事件状态流转


图连接 :Master事件状态流转(https://www.processon.com/v/66820e565f927303a1701917)

TaskEventService组件中的TaskEventDispatchThread(线程)和TaskEventHandlerThread(线程)解析

其实就是Master自己状态(DISPATCH)和Worker汇报上来的状态(RUNNING、UPDATE_PID、RESULT)都会放入到eventQueue,TaskEventDispatchThread(线程)会阻塞的方式进行获取,然后放入到对应的TaskExecuteRunnable中(注意 : 不执行的),只有通过TaskEventHandlerThread(线程)才会使用TaskExecuteThreadPool线程进行TaskExecuteRunnable的提交。

转载自Journey
原文链接:https://segmentfault.com/a/1190000044992842





用户案例



每日互动 惠生工程  作业帮 博世智驾
蔚来汽车 长城汽车集度长安汽车
思科网讯食行生鲜联通医疗联想
新网银行唯品富邦消费金融 
自如有赞伊利当贝大数据
珍岛集团传智教育Bigo
YY直播  三合一太美医疗
Cisco Webex兴业证券




迁移实战



Azkaban   Ooize(当贝迁移案例)
Airflow (有赞迁移案例)
Air2phin(迁移工具)
Airflow迁移实践



发版消息




Apache DolphinScheduler 3.2.2版本正式发布!
Apache DolphinScheduler 3.2.1 版本发布:增强功能与安全性的全面升级
Apache DolphinScheduler 3.1.9 版本发布:提升系统的稳定性和性能




加入社区



关注社区的方式有很多:

  • GitHub: https://github.com/apache/dolphinscheduler
  • 官网:https://dolphinscheduler.apache.org/en-us
  • 订阅开发者邮件:dev@dolphinscheduler@apache.org
  • X.com:@DolphinSchedule
  • YouTube:https://www.youtube.com/@apachedolphinscheduler
  • Slack:https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-1cmrxsio1-nJHxRJa44jfkrNL_Nsy9Qg

同样地,参与Apache DolphinScheduler 有非常多的参与贡献的方式,主要分为代码方式和非代码方式两种。

📂非代码方式包括:

完善文档、翻译文档;翻译技术性、实践性文章;投稿实践性、原理性文章;成为布道师;社区管理、答疑;会议分享;测试反馈;用户反馈等。

👩‍💻代码方式包括:

查找Bug;编写修复代码;开发新功能;提交代码贡献;参与代码审查等。

贡献第一个PR(文档、代码) 我们也希望是简单的,第一个PR用于熟悉提交的流程和社区协作以及感受社区的友好度。

社区汇总了以下适合新手的问题列表https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3A%22first+time+contributor%22

优先级问题列表https://github.com/apache/dolphinscheduler/pulls?q=is%3Apr+is%3Aopen+label%3Apriority%3Ahigh

如何参与贡献链接https://dolphinscheduler.apache.org/zh-cn/docs/3.2.2/%E8%B4%A1%E7%8C%AE%E6%8C%87%E5%8D%97_menu/%E5%A6%82%E4%BD%95%E5%8F%82%E4%B8%8E_menu

如果你❤️小海豚,就来为我点亮Star吧!

https://github.com/apache/dolphinscheduler


你的好友秀秀子拍了拍你

并请你帮她点一下“分享”


文章转载自海豚调度,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论