
DolphinScheduler Master Decentralization: A Source Code Analysis

Date: 2024-03-07 22:55:58
Tags: error log dolphinscheduler command SlotId source code Master master centralization

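The DolphinScheduler Master service is decentralized: there is no master/slave distinction, and every Master takes part in the work. So when the Masters fetch tasks to execute, how does each one get different tasks, with none missed and none fetched twice? Let's answer this from the source code.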

1. Entry point: MasterServer.java

/**
 * run master server
 */
@PostConstruct
public void run() throws SchedulerException {
    ......

    this.masterSlotManager.start();

    // self tolerant

    ......
    
    this.masterSchedulerBootstrap.start();
    ......
}

DS (short for DolphinScheduler from here on) introduces the concept of a Slot. Each Master owns its own SlotId, and this Id is recomputed whenever Masters are added or removed.

2. How the SlotId is generated

this.masterSlotManager.start();
public void start() {
    serverNodeManager.addMasterInfoChangeListener(new MasterSlotManager.SlotChangeListener());
}

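This registers a listener through addMasterInfoChangeListener. As the name suggests, it listens for changes to the Master information written to ZooKeeper. Let's look at SlotChangeListener first.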
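The listener mechanism itself is a plain observer pattern. The sketch below is a minimal illustration under assumptions, not DolphinScheduler's code: `SimpleNodeRegistry`, `MasterChangeListener`, `onMasterAdded` and `onMasterRemoved` are hypothetical names, and a master is reduced to its address and createTime. As with the real `notify(Map<String, MasterHeartBeat>)`, each callback receives the full current set of masters rather than a delta.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical listener, modeled on MasterInfoChangeListener:
// it receives a snapshot of all currently registered masters.
interface MasterChangeListener {
    void notify(Map<String, Long> mastersByAddress); // address -> createTime (ms)
}

// Hypothetical stand-in for ServerNodeManager. In DolphinScheduler the
// membership comes from the registry (ZooKeeper); here it is fed by hand.
class SimpleNodeRegistry {
    private final List<MasterChangeListener> listeners = new CopyOnWriteArrayList<>();
    private final Map<String, Long> masters = new HashMap<>();

    void addMasterChangeListener(MasterChangeListener listener) {
        listeners.add(listener);
    }

    // Would be triggered when a master's ephemeral node appears.
    synchronized void onMasterAdded(String address, long createTime) {
        masters.put(address, createTime);
        fireChange();
    }

    // Would be triggered when a master's ephemeral node disappears.
    synchronized void onMasterRemoved(String address) {
        masters.remove(address);
        fireChange();
    }

    private void fireChange() {
        // Every listener gets the full, up-to-date membership, not a delta.
        Map<String, Long> snapshot = Collections.unmodifiableMap(new HashMap<>(masters));
        for (MasterChangeListener listener : listeners) {
            listener.notify(snapshot);
        }
    }
}
```

Passing a full snapshot means a listener never has to replay history: it can rebuild its state from scratch on every call, which is what syncMasterNodes below does with clear() followed by putAll().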

public class SlotChangeListener implements MasterInfoChangeListener {

    private final Lock slotLock = new ReentrantLock();

    private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();

    @Override
    public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {
        // Called whenever master information changes; masterNodeInfo holds the latest info of all masters
        List<Server> serverList = masterNodeInfo.values().stream()
                .filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.BUSY))
                .map(this::convertHeartBeatToServer).collect(Collectors.toList());
        syncMasterNodes(serverList);
    }

    /**
     * sync master nodes
     */
    private void syncMasterNodes(List<Server> masterNodes) {
        slotLock.lock();
        try {
            // masterPriorityQueue is a blocking priority queue
            this.masterPriorityQueue.clear();
            // Put all masterNodes in; the queue orders them by createTime, so every master node ends up with the same fixed ordering
            this.masterPriorityQueue.putAll(masterNodes);
            // Look up the current node's index in the queue by its address; that index is used as the SlotId
            int tempCurrentSlot = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
            int tempTotalSlot = masterNodes.size();
            if (tempCurrentSlot < 0) {
                totalSlot = 0;
                currentSlot = 0;
                log.warn("Current master is not in active master list");
            } else if (tempCurrentSlot != currentSlot || tempTotalSlot != totalSlot) {
                totalSlot = tempTotalSlot;
                currentSlot = tempCurrentSlot;
                log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);
            }
        } finally {
            slotLock.unlock();
        }
    }
    ......
}

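The SlotChangeListener source shows where the SlotId comes from: changes to the masterInfo data in ZooKeeper. Each master registers its masterInfo itself at startup, as an ephemeral node that is deleted automatically when the master goes offline. Because every master receives the same membership list and sorts it the same way, each master gets a distinct SlotId between 0 and totalSlot - 1. Masters reporting BUSY status are filtered out in notify(), so they are left out of the slot assignment.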
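The slot derivation can be condensed into a few lines. This is a simplified sketch, not the actual MasterPriorityQueue: `MasterInfo` and `SlotCalculator` are hypothetical, and breaking ties on equal createTime by address is an assumption added here so that the ordering is total.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified view of a master's heartbeat.
final class MasterInfo {
    final String address;
    final long createTime; // when the master registered itself
    final boolean busy;

    MasterInfo(String address, long createTime, boolean busy) {
        this.address = address;
        this.createTime = createTime;
        this.busy = busy;
    }
}

// Every master sorts the same membership list with the same comparator,
// so all of them derive the same ordering and read off their own index.
final class SlotCalculator {
    // Assumption: ties on createTime are broken by address.
    static final Comparator<MasterInfo> ORDER =
            Comparator.<MasterInfo>comparingLong(m -> m.createTime)
                    .thenComparing(m -> m.address);

    final int currentSlot;
    final int totalSlot;

    SlotCalculator(List<MasterInfo> allMasters, String selfAddress) {
        List<MasterInfo> active = new ArrayList<>();
        for (MasterInfo m : allMasters) {
            if (!m.busy) { // BUSY masters are excluded, as in SlotChangeListener.notify
                active.add(m);
            }
        }
        active.sort(ORDER);
        int index = -1;
        for (int i = 0; i < active.size(); i++) {
            if (active.get(i).address.equals(selfAddress)) {
                index = i;
                break;
            }
        }
        if (index < 0) {
            // Not in the active list: report 0 masters, so no commands are fetched.
            this.totalSlot = 0;
            this.currentSlot = 0;
        } else {
            this.totalSlot = active.size();
            this.currentSlot = index;
        }
    }
}
```

With three registered masters of which one is BUSY, the two remaining masters compute slots 0 and 1 out of 2, and the BUSY master computes totalSlot = 0.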

3. How the SlotId is used to fetch Commands

MasterSchedulerBootstrap.start()

This method eventually runs MasterSchedulerBootstrap's run() method:

@Override
public void run() {
    MasterServerLoadProtection serverLoadProtection = masterConfig.getServerLoadProtection();
    while (!ServerLifeCycleManager.isStopped()) {
        try {
            ......
            // findCommands() is where the SlotId makes different masters fetch different commands
            List<Command> commands = findCommands();
            if (CollectionUtils.isEmpty(commands)) {
                // no command found, sleep for 1s
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }

            commands.parallelStream()
                    .forEach(command -> {
                        try {
                            Optional<WorkflowExecuteRunnable> workflowExecuteRunnableOptional =
                                    workflowExecuteRunnableFactory.createWorkflowExecuteRunnable(command);
                            if (!workflowExecuteRunnableOptional.isPresent()) {
                                log.warn(
                                        "The command execute success, will not trigger a WorkflowExecuteRunnable, this workflowInstance might be in serial mode");
                                return;
                            }
                            WorkflowExecuteRunnable workflowExecuteRunnable = workflowExecuteRunnableOptional.get();
                            ProcessInstance processInstance = workflowExecuteRunnable
                                    .getWorkflowExecuteContext().getWorkflowInstance();
                            if (processInstanceExecCacheManager.contains(processInstance.getId())) {
                                log.error(
                                        "The workflow instance is already been cached, this case shouldn't be happened");
                            }
                            processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteRunnable);
                            workflowEventQueue.addEvent(
                                    new WorkflowEvent(WorkflowEventType.START_WORKFLOW, processInstance.getId()));
                        } catch (WorkflowCreateException workflowCreateException) {
                            log.error("Master handle command {} error ", command.getId(), workflowCreateException);
                            commandService.moveToErrorCommand(command, workflowCreateException.toString());
                        }
                    });
            MasterServerMetrics.incMasterConsumeCommand(commands.size());
        } catch (InterruptedException interruptedException) {
            log.warn("Master schedule bootstrap interrupted, close the loop", interruptedException);
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            log.error("Master schedule workflow error", e);
            // sleep for 1s here to avoid the database down cause the exception boom
            ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    }
}
private List<Command> findCommands() throws MasterException {
    try {
        long scheduleStartTime = System.currentTimeMillis();
        // Get this master's own SlotId
        int thisMasterSlot = masterSlotManager.getSlot();
        int masterCount = masterSlotManager.getMasterSize();
        if (masterCount <= 0) {
            log.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
            return Collections.emptyList();
        }
        int pageSize = masterConfig.getFetchCommandNum();
        // Fetch commands by SlotId
        final List<Command> result =
                commandService.findCommandPageBySlot(pageSize, masterCount, thisMasterSlot);
        if (CollectionUtils.isNotEmpty(result)) {
            long cost = System.currentTimeMillis() - scheduleStartTime;
            log.info(
                    "Master schedule bootstrap loop command success, fetch command size: {}, cost: {}ms, current slot: {}, total slot size: {}",
                    result.size(), cost, thisMasterSlot, masterCount);
            ProcessInstanceMetrics.recordCommandQueryTime(cost);
        }
        return result;
    } catch (Exception ex) {
        throw new MasterException("Master loop command from database error", ex);
    }
}
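The control flow of run() follows a common polling pattern: fetch a batch, sleep when there is nothing to do, back off after an error, and exit cleanly on interrupt. The sketch below isolates that pattern under assumptions: `PollingLoop`, the `Supplier` fetcher and the `Consumer` handler are hypothetical stand-ins for findCommands() and the per-command handling, and it processes a batch sequentially instead of with parallelStream().

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of the loop shape in MasterSchedulerBootstrap.run().
final class PollingLoop<T> implements Runnable {
    private final Supplier<List<T>> fetcher; // stands in for findCommands()
    private final Consumer<T> handler;       // stands in for per-command handling
    private final long sleepMillis;          // stands in for Constants.SLEEP_TIME_MILLIS
    private volatile boolean stopped = false;

    PollingLoop(Supplier<List<T>> fetcher, Consumer<T> handler, long sleepMillis) {
        this.fetcher = fetcher;
        this.handler = handler;
        this.sleepMillis = sleepMillis;
    }

    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                List<T> batch = fetcher.get();
                if (batch.isEmpty()) {
                    // Nothing to do: wait before polling again.
                    Thread.sleep(sleepMillis);
                    continue;
                }
                batch.forEach(handler);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and leave the loop.
                Thread.currentThread().interrupt();
                break;
            } catch (RuntimeException e) {
                // Back off so a failing database does not cause a tight error loop.
                if (!sleepQuietly()) {
                    break;
                }
            }
        }
    }

    // Returns false if the back-off sleep was interrupted.
    private boolean sleepQuietly() {
        try {
            Thread.sleep(sleepMillis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Catching InterruptedException separately from other exceptions matters here: swallowing it in the generic handler would keep the loop running after a shutdown request.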

This eventually reaches the Mapper:

<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
    select *
    from t_ds_command
    where id % #{masterCount} = #{thisMasterSlot}
    order by process_instance_priority, id asc
    limit #{limit}
</select>

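Here the command id is taken modulo the total number of masters, and a row is returned only when the remainder equals the current master's SlotId. Every id leaves exactly one remainder in the range 0 to masterCount - 1, and each slot in that range belongs to exactly one master, so every command is picked up by one and only one master. This is how multiple masters fetch commands from the same table without conflicting with one another.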
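The partitioning can be checked in memory. The sketch below mimics queryCommandPageBySlot with Java streams; `CommandRow` and `SlotQuery` are hypothetical, and it assumes command ids are non-negative (for example auto-increment ids), since Java's `%` returns a negative remainder for a negative left operand.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical in-memory stand-in for a row of t_ds_command.
final class CommandRow {
    final long id;
    final int priority; // plays the role of process_instance_priority

    CommandRow(long id, int priority) {
        this.id = id;
        this.priority = priority;
    }
}

// Simulates the mapper query:
//   where id % masterCount = thisMasterSlot
//   order by process_instance_priority, id asc
//   limit pageSize
final class SlotQuery {
    static List<CommandRow> findCommandPageBySlot(List<CommandRow> table, int pageSize,
                                                  int masterCount, int thisMasterSlot) {
        return table.stream()
                .filter(c -> c.id % masterCount == thisMasterSlot)
                .sorted(Comparator.comparingInt((CommandRow c) -> c.priority)
                        .thenComparingLong(c -> c.id))
                .limit(pageSize)
                .collect(Collectors.toList());
    }
}
```

Running the query for every slot from 0 to masterCount - 1 over the same table returns each id exactly once; the limit only decides how many of a slot's commands are taken per loop, not which master owns them.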

From: https://www.cnblogs.com/gradyblog/p/18059971
