点击蓝字
关注我们
此前我们曾用万字长文解释了Apache DolphinScheduler的Worker模块源码,今天,我们再来一起看看Master模块源码的原理。
1
Master Slot计算

核心代码逻辑:
org.apache.dolphinscheduler.server.master.registry.MasterSlotManager.SlotChangeListener#notify
public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {
List<Server> serverList = masterNodeInfo.values().stream()
TODO 这里其实就是过滤掉buzy的master节点
.filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.BUSY))
.map(this::convertHeartBeatToServer).collect(Collectors.toList());
TODO 同步master节点
syncMasterNodes(serverList);
}复制
计算 totalSlot和currentSlot
private void syncMasterNodes(List<Server> masterNodes) {
slotLock.lock();
try {
this.masterPriorityQueue.clear();
TODO 这里会把所有的master节点都放入到masterPriorityQueue中,比如说 [192.168.220.1:12345,192.168.220.2:12345]
this.masterPriorityQueue.putAll(masterNodes);
TODO 就是获取到本地ip的在队列中的位置
int tempCurrentSlot = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
TODO 所有节点数量
int tempTotalSlot = masterNodes.size();
TODO 正常情况下不会小于0
if (tempCurrentSlot < 0) {
totalSlot = 0;
currentSlot = 0;
log.warn("Current master is not in active master list");
} else if (tempCurrentSlot != currentSlot || tempTotalSlot != totalSlot) {
TODO 这里其实就是记录的是比如说一共有两个slot,我的slot是0或者1
totalSlot = tempTotalSlot;
currentSlot = tempCurrentSlot;
log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);
}
} finally {
slotLock.unlock();
}
}复制
this.masterPriorityQueue.putAll(masterNodes); 会计算索引
public void putAll(Collection<Server> serverList) {
for (Server server : serverList) {
this.queue.put(server);
}
TODO 这里更新了hostIndexMap,存放的是 <host:port> -> 索引
refreshMasterList();
}
private void refreshMasterList() {
hostIndexMap.clear();
Iterator<Server> iterator = queue.iterator();
int index = 0;
while (iterator.hasNext()) {
Server server = iterator.next();
String addr = NetUtils.getAddr(server.getHost(), server.getPort());
hostIndexMap.put(addr, index);
index += 1;
}
}复制
2
Master消费Command生成流程实例

command最终的获取逻辑:
比如说两个Master节点 :
masterCount=2 thisMasterSlot=0 master1
masterCount=2 thisMasterSlot=1 master2
command中的数据如下 :
1 master2
2 master1
3 master2
4 master1
select *
from t_ds_command
where id % #{masterCount} = #{thisMasterSlot}
order by process_instance_priority, id asc
limit #{limit}复制
有没有感到疑惑,就是如果一个master更新到的最新的,一个没有更新到,怎么办?
比如说,master1节点是这样的
1 master2
2 master1
3 master2
4 master1
比如说,master2节点是这样的,是不是发现master2节点都是他的,都可以拉取消费?那就导致重复消费,比如说1这个command
1 master1
2 master1
3 master1
4 master1复制
org.apache.dolphinscheduler.service.process.ProcessServiceImpl#handleCommand
@Transactional
public @Nullable ProcessInstance handleCommand(String host,
Command command) throws CronParseException, CodeGenerateException {
TODO 创建流程实例
ProcessInstance processInstance = constructProcessInstance(command, host);
cannot construct process instance, return null
if (processInstance == null) {
log.error("scan command, command parameter is error: {}", command);
commandService.moveToErrorCommand(command, "process instance is null");
return null;
}
processInstance.setCommandType(command.getCommandType());
processInstance.addHistoryCmd(command.getCommandType());
processInstance.setTestFlag(command.getTestFlag());
if the processDefinition is serial
ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
TODO 是否是串行执行
if (processDefinition.getExecutionType().typeIsSerial()) {
saveSerialProcess(processInstance, processDefinition);
if (processInstance.getState() != WorkflowExecutionStatus.RUNNING_EXECUTION) {
setSubProcessParam(processInstance);
triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
deleteCommandWithCheck(command.getId());
todo: this is a bad design to return null here, whether trigger the task
return null;
}
} else {
TODO 并行执行
processInstanceDao.upsertProcessInstance(processInstance);
}
TODO 这里其实还会向triggerRelation表中插入一条数据,是流程实例和triggerCode的关系
triggerRelationService.saveProcessInstanceTrigger(command.getId(), processInstance.getId());
TODO 设置子流程参数
setSubProcessParam(processInstance);
TODO 删除command
deleteCommandWithCheck(command.getId());
return processInstance;
}复制
注意:这个方法是加@Transactional的,所以说创建流程实例和删除Command是在一个事物里面的,如果不同的Master消费到同一个Command。肯定会有一个删除Command失败,这时会抛出一个异常,这样就会让数据库进行回滚。
3
工作流启动流程

4
DAG切分&任务提交

5
Master事件状态流转

图连接 :Master事件状态流转(https://www.processon.com/v/66820e565f927303a1701917)
TaskEventService组件中的TaskEventDispatchThread(线程)和TaskEventHandlerThread(线程)解析

其实就是Master自己状态(DISPATCH)和Worker汇报上来的状态(RUNNING、UPDATE_PID、RESULT)都会放入到eventQueue,TaskEventDispatchThread(线程)会阻塞的方式进行获取,然后放入到对应的TaskExecuteRunnable中(注意 : 不执行的),只有通过TaskEventHandlerThread(线程)才会使用TaskExecuteThreadPool线程进行TaskExecuteRunnable的提交。
转载自Journey
原文链接:https://segmentfault.com/a/1190000044992842

用户案例
迁移实战
发版消息
加入社区
关注社区的方式有很多:
GitHub: https://github.com/apache/dolphinscheduler 官网:https://dolphinscheduler.apache.org/en-us 订阅开发者邮件:dev@dolphinscheduler@apache.org X.com:@DolphinSchedule YouTube:https://www.youtube.com/@apachedolphinscheduler Slack:https://join.slack.com/t/asf-dolphinscheduler/shared_invite/zt-1cmrxsio1-nJHxRJa44jfkrNL_Nsy9Qg
同样地,参与Apache DolphinScheduler 有非常多的参与贡献的方式,主要分为代码方式和非代码方式两种。
📂非代码方式包括:
完善文档、翻译文档;翻译技术性、实践性文章;投稿实践性、原理性文章;成为布道师;社区管理、答疑;会议分享;测试反馈;用户反馈等。
👩💻代码方式包括:
查找Bug;编写修复代码;开发新功能;提交代码贡献;参与代码审查等。


你的好友秀秀子拍了拍你
并请你帮她点一下“分享”