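The masters in DS (DolphinScheduler) are decentralized, and failover is carried out by the masters. So do several masters perform failover at the same time, or is one master elected to do it?
Let's go back to the source code and analyze it.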
1. The master startup method
```java
@PostConstruct
public void run() throws SchedulerException {
    // ....
    this.failoverExecuteThread.start();
    // ....
}
```
In other words, every master starts its own failoverExecuteThread to handle failover. Let's look at its internal logic.
2. The failoverExecuteThread run method
```java
public void run() {
    // when startup, wait 10s for ready
    ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10);
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            if (!ServerLifeCycleManager.isRunning()) {
                continue;
            }
            // Check for failover here; the failover itself is also performed inside this call
            masterFailoverService.checkMasterFailover();
        } catch (Exception e) {
            log.error("Master failover thread execute error", e);
        } finally {
            // Interval between periodic failover checks
            ThreadUtils.sleep(masterConfig.getFailoverInterval().toMillis());
        }
    }
}
```
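Two details of this loop matter: an exception in one round is only logged, so the thread keeps running, and because the sleep sits in finally, it also runs after continue. The following is a minimal sketch of that polling pattern, not DolphinScheduler code; the Sleeper interface and the BooleanSupplier parameters are hypothetical stand-ins for ThreadUtils.sleep and ServerLifeCycleManager.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the polling pattern in the failover thread's run().
final class FailoverLoopSketch {

    // Hypothetical stand-in for ThreadUtils.sleep, injectable so tests need not wait.
    interface Sleeper {
        void sleep(long millis);
    }

    static void run(BooleanSupplier isStopped,
                    BooleanSupplier isRunning,
                    Runnable checkMasterFailover,
                    Sleeper sleeper,
                    long intervalMillis) {
        while (!isStopped.getAsBoolean()) {
            try {
                if (!isRunning.getAsBoolean()) {
                    continue; // the finally block still sleeps before the next round
                }
                checkMasterFailover.run();
            } catch (Exception e) {
                // A failed round is only logged; the loop (and the thread) keeps going
                System.err.println("Master failover thread execute error: " + e);
            } finally {
                sleeper.sleep(intervalMillis);
            }
        }
    }
}
```

Injecting the sleeper keeps the sketch testable: a fake sleeper can count rounds and flip the stop flag instead of actually waiting.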
3. masterFailoverService.checkMasterFailover()
```java
public void checkMasterFailover() {
    List<String> needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost()
            .stream()
            // failover myself || dead server
            .filter(host -> localAddress.equals(host)
                    || !registryClient.checkNodeExists(host, RegistryNodeType.MASTER))
            .distinct()
            .collect(Collectors.toList());
    if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
        return;
    }
    log.info("Master failover service {} begin to failover hosts:{}", localAddress, needFailoverMasterHosts);
    for (String needFailoverMasterHost : needFailoverMasterHosts) {
        failoverMaster(needFailoverMasterHost);
    }
}
```
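Every master evaluates this filter on its own, so there is no election at this point. A minimal sketch of the same host selection, assuming a plain Set of live master addresses (aliveMasters, hypothetical) in place of registryClient.checkNodeExists:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class FailoverHostFilter {

    /**
     * Mirrors the stream pipeline in checkMasterFailover(): keep the local master's own
     * address plus every host that is no longer registered as a live master.
     * aliveMasters is a hypothetical stand-in for registryClient.checkNodeExists(...).
     */
    static List<String> hostsToFailover(List<String> hostsFromDb,
                                        String localAddress,
                                        Set<String> aliveMasters) {
        return hostsFromDb.stream()
                .filter(host -> localAddress.equals(host) || !aliveMasters.contains(host))
                .distinct()
                .collect(Collectors.toList());
    }
}
```

For example, with hosts m1, m2, m3, m3 from the database, local address m1, and live masters {m1, m2}, the result is [m1, m3]. m1 is kept because it is the local master, which lets a restarted master revisit instances it owned before the restart; the dead m3 appears once thanks to distinct(); m2 is alive and belongs to another master, so it is left alone.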
processService.queryNeedFailoverProcessInstanceHost
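This collects the hosts of all workflow instances whose state is submitted successfully, running, delayed execution, ready to pause, or ready to stop. The filter above then narrows these hosts down before each one is passed to failoverMaster. The states are: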
```java
private static final int[] NEED_FAILOVER_STATES = new int[]{
        SUBMITTED_SUCCESS.getCode(),
        RUNNING_EXECUTION.getCode(),
        DELAY_EXECUTION.getCode(),
        READY_PAUSE.getCode(),
        READY_STOP.getCode()
};
```
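Conceptually the query is a DISTINCT over hosts, restricted to these states. The in-memory sketch below illustrates that idea only; the State enum and Instance record are hypothetical simplifications (the real status enum has more values and numeric codes), and this is not the actual mapper SQL.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class NeedFailoverQuerySketch {

    // Hypothetical subset of the execution-status enum; only the names matter here.
    enum State { SUBMITTED_SUCCESS, RUNNING_EXECUTION, DELAY_EXECUTION, READY_PAUSE, READY_STOP, SUCCESS, FAILURE }

    // Hypothetical, simplified workflow instance row.
    record Instance(int id, String host, State state) {}

    static final Set<State> NEED_FAILOVER_STATES = EnumSet.of(
            State.SUBMITTED_SUCCESS, State.RUNNING_EXECUTION, State.DELAY_EXECUTION,
            State.READY_PAUSE, State.READY_STOP);

    /** Roughly "SELECT DISTINCT host ... WHERE state IN (...)", done in memory. */
    static List<String> queryNeedFailoverHosts(List<Instance> instances) {
        return instances.stream()
                .filter(i -> NEED_FAILOVER_STATES.contains(i.state()))
                .map(Instance::host)
                .distinct()
                .collect(Collectors.toList());
    }
}
```

Finished instances (SUCCESS, FAILURE in the sketch) never contribute a host, so a master that only ran completed workflows is not a failover candidate.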
4. failoverMaster: the entry point that actually checks for and performs failover
```java
public void failoverMaster(String masterHost) {
    String failoverPath = RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath() + "/" + masterHost;
    try {
        // Step 1: acquire the distributed lock
        registryClient.getLock(failoverPath);
        // Holding the lock, do the actual work
        doFailoverMaster(masterHost);
    } catch (Exception e) {
        log.error("Master server failover failed, host:{}", masterHost, e);
    } finally {
        // Release the lock
        registryClient.releaseLock(failoverPath);
    }
}
```
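In other words, no master is elected: every master runs the check, and a ZooKeeper distributed lock ensures that only one master performs the failover at a time. Because the lock path ends with the failed master's host, the lock is per failed host. When the lock is released, the next master to acquire it queries the same host again and finds no remaining instances, since the first master has already reset their host (see step 7).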
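The simulation below sketches that behavior in a single JVM. It assumes a ReentrantLock per lock path as a stand-in for the registry lock, a map of instance id to host standing in for the instance table, and a list standing in for the command table; the lock path string, the NULL_HOST marker, and the runAsMasters helper are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

final class FailoverLockSimulation {
    static final String NULL_HOST = "NULL"; // hypothetical marker for "already failed over"

    // Hypothetical stand-ins: lock path -> lock, instance id -> host, and the command table.
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    final Map<Integer, String> instanceHost = new ConcurrentHashMap<>();
    final List<Integer> recoverCommands = new CopyOnWriteArrayList<>();

    void failoverMaster(String masterHost) {
        String failoverPath = "/hypothetical/failover-lock/" + masterHost;
        ReentrantLock lock = locks.computeIfAbsent(failoverPath, path -> new ReentrantLock());
        lock.lock(); // blocks until no other "master" holds the lock for this host
        try {
            doFailoverMaster(masterHost);
        } finally {
            lock.unlock();
        }
    }

    private void doFailoverMaster(String masterHost) {
        for (Integer id : instanceHost.keySet()) {
            // Check-then-act is safe here only because the per-host lock serializes callers.
            if (masterHost.equals(instanceHost.get(id))) {
                instanceHost.put(id, NULL_HOST); // mark the instance as failed over
                recoverCommands.add(id);         // one recover command per instance
            }
        }
    }

    /** Hypothetical helper: starts n threads ("masters") running the same task and waits for all. */
    static void runAsMasters(int n, Runnable task) {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(task, "master-" + i);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }
}
```

Running four "masters" against host m3 with 100 instances yields exactly 100 recover commands: the first lock holder converts all of them, and the others find nothing left. Without the lock, two threads could both read m3 for the same instance before either writes the marker, and that instance would get two recover commands.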
5. doFailoverMaster
```java
// Some non-core code has been removed here
private void doFailoverMaster(@NonNull String masterHost) {
    // Get the given master's startup time from ZK; if that master is down, the result is empty
    Optional<Date> masterStartupTimeOptional =
            getServerStartupTime(registryClient.getServerList(RegistryNodeType.MASTER),
                    masterHost);
    // Get all workflow instances handled by the given master
    List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(
            masterHost);
    if (CollectionUtils.isEmpty(needFailoverProcessInstanceList)) {
        return;
    }
    for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
        try {
            // Check whether this workflow instance needs to be failed over
            if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) {
                continue;
            }
            ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("failover",
                    processInstance.getProcessDefinitionCode().toString());
            // Handle the instance that needs failover
            processService.processNeedFailoverProcessInstances(processInstance);
        } finally {
            // (non-core code removed)
        }
    }
}
```
6. checkProcessInstanceNeedFailover
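The logic of checkProcessInstanceNeedFailover is simple: it decides whether a given instance needs failover. For example, if the workflow instance's start time is earlier than the corresponding master's startup time, the master has restarted since the instance started, so the instance must be failed over; there are a few other checks of this kind.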
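A sketch of that decision, under stated assumptions: only the start-time comparison comes from the description above. Treating an absent master startup time as "master is down, fail over" is inferred from the comment in doFailoverMaster, and skipping instances whose host already carries the NULL marker is an assumption based on step 7. Instant stands in for Date, and NULL_HOST is a hypothetical constant.

```java
import java.time.Instant;
import java.util.Optional;

final class NeedFailoverCheckSketch {
    static final String NULL_HOST = "NULL"; // hypothetical marker for "already failed over"

    static boolean needFailover(Optional<Instant> masterStartupTime,
                                String instanceHost,
                                Instant instanceStartTime) {
        // Assumption: an instance that was already released by an earlier failover is skipped
        if (instanceHost == null || NULL_HOST.equals(instanceHost)) {
            return false;
        }
        // Assumption: no startup time in the registry means the master is down
        if (masterStartupTime.isEmpty()) {
            return true;
        }
        // From the text: started before the master's (re)start means the master restarted since
        return instanceStartTime.isBefore(masterStartupTime.get());
    }
}
```

An instance started after the master's current startup time belongs to the live, current incarnation of that master, so it is left alone.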
7. processService.processNeedFailoverProcessInstances
```java
public void processNeedFailoverProcessInstances(ProcessInstance processInstance) {
    // updateProcessInstance host is null to mark this processInstance has been failover
    // and insert a failover command
    processInstance.setHost(Constants.NULL);
    processInstanceMapper.updateById(processInstance);
    // 2 insert into recover command
    Command cmd = new Command();
    cmd.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
    cmd.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
    // Note: the current instance's id is written here so the instance can be reused later
    cmd.setProcessInstanceId(processInstance.getId());
    cmd.setCommandParam(JSONUtils.toJsonString(createCommandParams(processInstance)));
    cmd.setExecutorId(processInstance.getExecutorId());
    cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
    cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
    cmd.setTestFlag(processInstance.getTestFlag());
    commandService.createCommand(cmd);
}
```
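This part is simple as well: the instance's host is set to Constants.NULL, and a new Command of type RECOVER_TOLERANCE_FAULT_PROCESS is created. The masters then query and execute this Command to complete the failover. Because the command carries the original instance id, the existing processInstance can be loaded again and reused.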
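The hand-off through the command table can be sketched as follows. This is an in-memory model, not DolphinScheduler code: the queue stands in for the command table, and the ProcessInstance and Command types are simplified, hypothetical versions. It shows why writing the instance id into the command matters: the master that later consumes the command resumes the same processInstance instead of creating a new one.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

final class RecoverCommandSketch {
    static final String NULL_HOST = "NULL"; // hypothetical stand-in for Constants.NULL

    enum CommandType { RECOVER_TOLERANCE_FAULT_PROCESS }

    // Hypothetical, simplified instance: only the id and the owning host.
    static final class ProcessInstance {
        final int id;
        String host;

        ProcessInstance(int id, String host) {
            this.id = id;
            this.host = host;
        }
    }

    record Command(CommandType type, int processInstanceId) {}

    final Map<Integer, ProcessInstance> instanceTable = new HashMap<>();
    final Queue<Command> commandTable = new ArrayDeque<>();

    /** Failover side: release the instance and enqueue a recover command carrying its id. */
    void processNeedFailoverProcessInstance(ProcessInstance instance) {
        instance.host = NULL_HOST;
        commandTable.add(new Command(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS, instance.id));
    }

    /** Consumer side: a master takes the command and looks up the SAME instance by id. */
    ProcessInstance handleNextCommand(String myHost) {
        Command cmd = commandTable.poll();
        if (cmd == null) {
            return null; // nothing to recover
        }
        ProcessInstance instance = instanceTable.get(cmd.processInstanceId());
        instance.host = myHost; // assumption: the recovering master records itself as the new host
        return instance;
    }
}
```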
From: https://www.cnblogs.com/gradyblog/p/18064846