
Source code analysis: how DolphinScheduler fails over a crashed master

Published: 2024-03-10 21:22:18
Tags: dolphinscheduler, crash, failover, cmd, fault, processInstance, source code, master, transfer

Masters in DS (DolphinScheduler) are decentralized, and failover is carried out by the masters themselves. Do several masters run failover at the same time, or is one master elected to do it?

Let's go back to the source code to find out.

1. The master startup method


@PostConstruct
public void run() throws SchedulerException {
    // ...

    this.failoverExecuteThread.start();

    // ...
}

So every master starts its own failoverExecuteThread to handle failover. Here is what that thread does internally.

2. The run method of failoverExecuteThread

public void run() {
    // when startup, wait 10s for ready
    ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10);

    while (!ServerLifeCycleManager.isStopped()) {
        try {
            if (!ServerLifeCycleManager.isRunning()) {
                continue;
            }
            // The failover check happens here, and the failover itself is also performed from here
            masterFailoverService.checkMasterFailover();
        } catch (Exception e) {
            log.error("Master failover thread execute error", e);
        } finally {
            // Wait for the configured failover check interval before the next round
            ThreadUtils.sleep(masterConfig.getFailoverInterval().toMillis());
        }
    }
}
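The loop pattern above can be sketched in plain Java. It uses a one-off startup delay, then runs a check on a fixed interval. The sleep sits in finally, so a round that throws still waits before the next one. FailoverLoopSketch is a hypothetical stand-in for FailoverExecuteThread, and its check callback stands in for masterFailoverService.checkMasterFailover(). A simple stop flag replaces ServerLifeCycleManager. This is a minimal sketch under those assumptions, not the DolphinScheduler class.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for FailoverExecuteThread: one startup delay, then a check on a fixed interval.
class FailoverLoopSketch extends Thread {
    private final Duration startupDelay;
    private final Duration interval;
    private final Runnable check; // stands in for masterFailoverService.checkMasterFailover()
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    FailoverLoopSketch(Duration startupDelay, Duration interval, Runnable check) {
        this.startupDelay = startupDelay;
        this.interval = interval;
        this.check = check;
        setDaemon(true);
    }

    void shutdown() {
        stopped.set(true);
        interrupt(); // wake the thread if it is sleeping
    }

    @Override
    public void run() {
        try {
            Thread.sleep(startupDelay.toMillis()); // wait once at startup before the first check
            while (!stopped.get()) {
                try {
                    check.run();
                } catch (RuntimeException e) {
                    // a failed round is reported and must not end the loop
                    System.err.println("failover check failed: " + e.getMessage());
                } finally {
                    // sleeping in finally keeps the interval even after an error
                    Thread.sleep(interval.toMillis());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // interrupted by shutdown(): leave the loop
        }
    }

    // Runs the loop for runMillis; the first check throws on purpose.
    // Returns how many checks were attempted.
    static int demo(long runMillis) {
        AtomicInteger rounds = new AtomicInteger();
        FailoverLoopSketch t = new FailoverLoopSketch(Duration.ofMillis(5), Duration.ofMillis(10), () -> {
            if (rounds.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated failure");
            }
        });
        t.start();
        try {
            Thread.sleep(runMillis);
            t.shutdown();
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rounds.get();
    }

    public static void main(String[] args) {
        System.out.println("checks attempted: " + demo(200));
    }
}
```

Because the first check throws and later checks still run, the attempt count ends up well above one. This mirrors why the original places the sleep in finally: a broken round never turns into a busy loop.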

3. masterFailoverService.checkMasterFailover()

public void checkMasterFailover() {
    List<String> needFailoverMasterHosts = processService.queryNeedFailoverProcessInstanceHost()
            .stream()
            // failover myself || dead server
            .filter(host -> localAddress.equals(host)
                    || !registryClient.checkNodeExists(host, RegistryNodeType.MASTER))
            .distinct()
            .collect(Collectors.toList());
    if (CollectionUtils.isEmpty(needFailoverMasterHosts)) {
        return;
    }
    log.info("Master failover service {} begin to failover hosts:{}", localAddress, needFailoverMasterHosts);

    for (String needFailoverMasterHost : needFailoverMasterHosts) {
        failoverMaster(needFailoverMasterHost);
    }
}

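processService.queryNeedFailoverProcessInstanceHost returns the host of every workflow instance in one of the states below: submitted, running, delayed execution, ready to pause, or ready to stop. checkMasterFailover then keeps only two kinds of host: the current master itself, or a master that no longer exists in the registry. Each of these hosts is passed to failoverMaster.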

private static final int[] NEED_FAILOVER_STATES = new int[]{
        SUBMITTED_SUCCESS.getCode(),
        RUNNING_EXECUTION.getCode(),
        DELAY_EXECUTION.getCode(),
        READY_PAUSE.getCode(),
        READY_STOP.getCode()
};
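The selection in checkMasterFailover can be modeled with plain collections. First keep instances in a "need failover" state. Then take their hosts and keep a host if it is the local master or is missing from the registry. Here the registry is a hypothetical Set of live master addresses, and states are compared by name rather than by code. In the real code the state filter is a database query. Including the local host covers a master that restarted on the same address. Whether each of its instances is actually failed over is decided later by checkProcessInstanceNeedFailover.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical simplified model of the host selection in checkMasterFailover.
class NeedFailoverHostsSketch {
    // Same states as NEED_FAILOVER_STATES above, by name instead of code
    static final Set<String> NEED_FAILOVER_STATES = Set.of(
            "SUBMITTED_SUCCESS", "RUNNING_EXECUTION", "DELAY_EXECUTION", "READY_PAUSE", "READY_STOP");

    record Instance(String host, String state) {}

    static List<String> needFailoverHosts(List<Instance> instances, String localAddress, Set<String> liveMasters) {
        return instances.stream()
                // roughly what queryNeedFailoverProcessInstanceHost does in SQL
                .filter(i -> NEED_FAILOVER_STATES.contains(i.state()))
                .map(Instance::host)
                // failover myself || dead server
                .filter(host -> localAddress.equals(host) || !liveMasters.contains(host))
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Instance> instances = List.of(
                new Instance("m1:5678", "RUNNING_EXECUTION"),
                new Instance("m2:5678", "RUNNING_EXECUTION"),
                new Instance("m2:5678", "READY_STOP"),
                new Instance("m3:5678", "SUCCESS"),
                new Instance("m4:5678", "SUBMITTED_SUCCESS"));
        // m1 is this master, m2 is dead, m3's only instance is finished, m4 is alive
        System.out.println(needFailoverHosts(instances, "m1:5678", Set.of("m1:5678", "m4:5678")));
        // prints [m1:5678, m2:5678]
    }
}
```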

4. failoverMaster: the entry point that actually checks for and performs failover

public void failoverMaster(String masterHost) {
    String failoverPath = RegistryNodeType.MASTER_FAILOVER_LOCK.getRegistryPath() + "/" + masterHost;
    try {
        // Step 1: acquire the distributed lock for this master host
        registryClient.getLock(failoverPath);
        // With the lock held, do the actual work
        doFailoverMaster(masterHost);
    } catch (Exception e) {
        log.error("Master server failover failed, host:{}", masterHost, e);
    } finally {
        // Release the lock
        registryClient.releaseLock(failoverPath);
    }
}

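So no master is elected. Every master runs the check, and mutual exclusion comes from a distributed lock in the registry (ZooKeeper here). The lock path contains the failed master's host, so for any given host only one master performs the failover at a time. Step 7 sets each failed-over instance's host to NULL. A master that acquires the same lock afterwards therefore finds nothing left to fail over for that host.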
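The "many masters, one lock per failed host" behavior can be simulated inside a single JVM. This is only an illustration. A ReentrantLock per lock path stands in for the registry's distributed lock, and a map of instance id to host stands in for the workflow instance table. The lock path string and the class name PerHostLockSketch are hypothetical. Several threads ("masters") all try to fail over the same dead host m2. Whoever gets the lock first marks m2's instances and creates the recovery commands. Later lock holders find no rows still owned by m2.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Single-JVM simulation: ReentrantLock per path ~ registry distributed lock,
// instanceHost map ~ workflow instance table. All names are hypothetical.
class PerHostLockSketch {
    static final String NULL_HOST = "NULL";

    final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    final ConcurrentHashMap<Integer, String> instanceHost = new ConcurrentHashMap<>();
    final List<Integer> recoverCommands = Collections.synchronizedList(new ArrayList<>());

    void failoverMaster(String masterHost) {
        String failoverPath = "/lock/failover/master/" + masterHost; // hypothetical path
        ReentrantLock lock = locks.computeIfAbsent(failoverPath, k -> new ReentrantLock());
        lock.lock(); // plays the role of registryClient.getLock(failoverPath)
        try {
            for (Integer id : instanceHost.keySet()) {
                // only rows still owned by masterHost, like queryNeedFailoverProcessInstances(masterHost)
                if (masterHost.equals(instanceHost.get(id))) {
                    instanceHost.put(id, NULL_HOST); // mark as failed over
                    recoverCommands.add(id);         // one recovery command per instance
                }
            }
        } finally {
            lock.unlock(); // plays the role of registryClient.releaseLock(failoverPath)
        }
    }

    // Lets `masters` threads all try to fail over "m2"; returns how many commands were created.
    static int simulate(int masters) {
        PerHostLockSketch db = new PerHostLockSketch();
        db.instanceHost.put(1, "m2");
        db.instanceHost.put(2, "m2");
        db.instanceHost.put(3, "m1");
        ExecutorService pool = Executors.newFixedThreadPool(masters);
        for (int i = 0; i < masters; i++) {
            pool.execute(() -> db.failoverMaster("m2"));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return db.recoverCommands.size();
    }

    public static void main(String[] args) {
        System.out.println(simulate(1) + " " + simulate(5)); // prints "2 2"
    }
}
```

The command count is the same whether one master or five compete. The lock serializes the work, and the NULL marker makes the repeated attempts harmless.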

5. doFailoverMaster

// Some non-core code has been removed here
private void doFailoverMaster(@NonNull String masterHost) {
    // Get masterHost's startup time from the registry; if that master is dead, the Optional is empty
    Optional<Date> masterStartupTimeOptional =
            getServerStartupTime(registryClient.getServerList(RegistryNodeType.MASTER),
                    masterHost);
    // Get all workflow instances handled by masterHost that may need failover
    List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(
            masterHost);
    if (CollectionUtils.isEmpty(needFailoverProcessInstanceList)) {
        return;
    }


    for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
        try {
            // Check whether this workflow instance actually needs to be failed over
            if (!checkProcessInstanceNeedFailover(masterStartupTimeOptional, processInstance)) {
                continue;
            }

            ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("failover",
                    processInstance.getProcessDefinitionCode().toString());
            // Fail over the instance
            processService.processNeedFailoverProcessInstances(processInstance);
        } finally {
            // non-core cleanup omitted
        }
    }

}

6. checkProcessInstanceNeedFailover

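The logic of checkProcessInstanceNeedFailover is simple: it decides whether an instance really needs failover. One example is an instance that started before the current startup time of its master. That master has restarted since the instance was launched, so the instance must be failed over. The method contains a few other checks of the same kind.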
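The decision can be approximated as a small pure function. This is a simplified sketch built from the description above, not the actual method body. It uses three rules. An instance whose host is already NULL was failed over earlier, based on the marker written in step 7. An empty startup time means the master is gone from the registry. Otherwise, only instances started before the master's latest startup are treated as orphaned. The function name and the use of Instant instead of Date are assumptions made for illustration.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical simplified version of the check; the real method has more cases.
class NeedFailoverCheckSketch {
    static final String NULL_HOST = "NULL";

    static boolean needFailover(Optional<Instant> masterStartupTime, String instanceHost, Instant instanceStartTime) {
        if (NULL_HOST.equals(instanceHost)) {
            return false; // host already set to NULL: another master has failed it over
        }
        if (masterStartupTime.isEmpty()) {
            return true;  // the master is no longer in the registry, so its instances are orphaned
        }
        // The master is alive: only instances started before its latest startup
        // belong to a previous run that died with the old process.
        return instanceStartTime.isBefore(masterStartupTime.get());
    }

    public static void main(String[] args) {
        Instant masterUp = Instant.parse("2024-03-10T12:00:00Z");
        System.out.println(needFailover(Optional.of(masterUp), "m1", Instant.parse("2024-03-10T11:00:00Z"))); // true
        System.out.println(needFailover(Optional.of(masterUp), "m1", Instant.parse("2024-03-10T13:00:00Z"))); // false
        System.out.println(needFailover(Optional.empty(), "m2", Instant.parse("2024-03-10T13:00:00Z")));      // true
    }
}
```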

7. processService.processNeedFailoverProcessInstances

public void processNeedFailoverProcessInstances(ProcessInstance processInstance) {
    // 1. set the host to NULL to mark this processInstance as failed over
    processInstance.setHost(Constants.NULL);
    processInstanceMapper.updateById(processInstance);

    // 2. insert a recovery command
    Command cmd = new Command();
    cmd.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
    cmd.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
    // Note: the current instance id is written into the command so the instance can be reused later
    cmd.setProcessInstanceId(processInstance.getId());
    cmd.setCommandParam(JSONUtils.toJsonString(createCommandParams(processInstance)));
    cmd.setExecutorId(processInstance.getExecutorId());
    cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
    cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
    cmd.setTestFlag(processInstance.getTestFlag());
    commandService.createCommand(cmd);
}

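The logic is simple: set the instance's host to NULL, then create a new Command of type RECOVER_TOLERANCE_FAULT_PROCESS. The masters query the commands, and whichever master picks this one up executes it, which completes the failover. Because the command carries the original instance id, that processInstance can be looked up again and reused.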
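The hand-off can be sketched end to end in memory. The failover side does the same two steps as processNeedFailoverProcessInstances: mark the host as NULL, then enqueue a RECOVER_TOLERANCE_FAULT_PROCESS command that carries the instance id. The consumer side is hypothetical, because the article does not show how the command is consumed. It only illustrates the point made above: the stored id lets the surviving master reuse the existing instance instead of creating a new one. In DolphinScheduler the commands live in a database table. Here an ArrayDeque stands in for it, and all class and method names are assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the failover hand-off; the real Command lives in a database table.
class RecoverCommandSketch {
    static final String NULL_HOST = "NULL";

    enum CommandType { START_PROCESS, RECOVER_TOLERANCE_FAULT_PROCESS }

    record Command(CommandType type, long processDefinitionCode, int processInstanceId) {}

    static final class ProcessInstance {
        final int id;
        final long processDefinitionCode;
        String host;

        ProcessInstance(int id, long processDefinitionCode, String host) {
            this.id = id;
            this.processDefinitionCode = processDefinitionCode;
            this.host = host;
        }
    }

    final Map<Integer, ProcessInstance> instances = new HashMap<>();
    final Deque<Command> commandTable = new ArrayDeque<>();

    // Failover side: the same two steps as processNeedFailoverProcessInstances
    void processNeedFailover(ProcessInstance pi) {
        pi.host = NULL_HOST; // 1. mark as failed over
        commandTable.add(new Command(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS,
                pi.processDefinitionCode, pi.id)); // 2. recovery command carrying the instance id
    }

    // Consumer side (hypothetical): a surviving master takes the command and reuses the instance
    ProcessInstance handleNextCommand(String myHost) {
        Command cmd = commandTable.poll();
        if (cmd == null || cmd.type() != CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) {
            return null; // other command types would create a new instance; not modeled here
        }
        ProcessInstance pi = instances.get(cmd.processInstanceId()); // look up by the stored id
        pi.host = myHost;
        return pi;
    }

    public static void main(String[] args) {
        RecoverCommandSketch s = new RecoverCommandSketch();
        ProcessInstance pi = new ProcessInstance(42, 1001L, "m2:5678");
        s.instances.put(pi.id, pi);
        s.processNeedFailover(pi);
        ProcessInstance taken = s.handleNextCommand("m1:5678");
        System.out.println(taken.id + " " + taken.host); // prints "42 m1:5678"
    }
}
```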

From: https://www.cnblogs.com/gradyblog/p/18064846
