首页 > 其他分享 >深入解析 DolphinScheduler 任务调度、拆分与执行全流程

深入解析 DolphinScheduler 任务调度、拆分与执行全流程

时间:2024-10-10 14:46:39浏览次数:7  
标签:任务调度 processInstanceId DolphinScheduler vertex 任务 edge master 拆分 执行

Apache DolphinScheduler介绍

Apache DolphinScheduler 是一个分布式易扩展的可视化DAG工作流任务调度开源系统。适用于企业级场景,提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。

Dag背景知识

摘录了一下Dag的offical定义

A graph is formed by vertices and by edges connecting pairs of vertices, where the
vertices can be any kind of object that is connected in pairs by edges.
In the case of a directed graph, each edge has an orientation, from one vertex to another vertex. A path in a directed graph is a sequence of edges having the property that the ending vertex of each
edge in the sequence is the same as the starting vertex of the next edge in the sequence; a path forms a cycle if the starting vertex of its first edge equals the ending vertex of its last edge.

A directed acyclic graph is a directed graph that has no cycles.[1][2][3]

A vertex v of a directed graph is said to be reachable from another
vertex u when there exists a path that starts at u and ends at v.
As a special case, every vertex is considered to be reachable from itself (by a path with zero edges). If a vertex can reach itself via a nontrivial path (a path with one or more edges), then that path is a cycle, so another way to define directed acyclic graphs is that they are the graphs in which no vertex can reach itself via a nontrivial path.

在offical的定义中,有两对象的集合,集合中的元素是

  1. vertex
    一个实体或者元素,可以是任何抽象的object
  2. edge
    一条有方向直线,包含两个vertex,分别扮演起点和终点
  • Dag约束
  1. 在Dag中,一个edge(a,b)的终点可以作为另一个edge(b,c)的起点,这个链路中所有的vertex都是可到达的, c是从a可达的。
  2. 在Dag中允许vertex不存在于任何一个edge中,这个节点可以从自己到达自己(一个孤岛,不和其他vertex有任何联系)
  3. 如果一个vertex可以从自己到达自己,但是中间经过了其他的vertex,那么这就存在一个环circle
  4. 在Dag中没有环

在DolphinScheduler中表示Dag的数据结构为

public class DAG<Node, NodeInfo, EdgeInfo> {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * node map, key is node, value is node information
     */
    private final Map<Node, NodeInfo> nodesMap;

    /**
     * edge map. key is node of origin;value is Map with key for destination node and value for edge
     */
    private final Map<Node, Map<Node, EdgeInfo>> edgesMap;

    /**
     * reversed edge set,key is node of destination, value is Map with key for origin node and value for edge
     */
    private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
}

其中

  • Node表示任务的id
  • NodeInfo表示任务的详细信息
  • EdgeInfo包含任务id和依赖任务id

数仓建设任务和任务依赖

在企业数仓建设中,普遍的做法是进行数据分层(引用https://juejin.cn/post/6969874734355841031)

file

在生产环境,由于分层的需要,业务逻辑分布广泛,数据存储类型多样,这就造成了数仓建设的任务多,任务之间依赖复杂,dag就成了最佳的任务依赖和调度的存储结构。在Dag结构中每个节点表示一个具体的调度任务,任务之间的连线表示依赖关系,针对Dag结构化数据的遍历过程,就是对数仓任务的执行过程。

一个简单的数仓依赖任务关系(数仓建设中会有很多任务依赖关系和更复杂的任务依赖关系)

file

DolphinScheduler系统角色拆分

Apache DolphinScheduler核心角色包括MasterServer和WorkerServer,这遵循模块化设计,master和worker专注于自己本身的角色和任务,模块遵循高内聚低耦合的设计,大大提高了系统的稳定性和可扩展性,同时也有利于并行开发,缩短系统的研发时间,提高系统的健壮性。

MasterServer主要负责 DAG 任务切分、任务提交监控,并同时监听其它MasterServer和WorkerServer的健康状态。 MasterServer服务启动时向Zookeeper注册临时节点,通过监听Zookeeper临时节点变化来进行容错处理。

WorkerServer主要负责任务的执行和提供日志服务。 WorkerServer服务启动时向Zookeeper注册临时节点,并维持心跳。

DolphinScheduler任务调度流程

参考官网,DolphinScheduler核心任务任务执行流程如下
file

鉴于任务调度的复杂性,一个大的流程可以划分为小的流程,在主线流程之外还附加了支线流程,下面对执行调度流程拆分进行分析一下,这样更容易理解。

file

Command分发流程

处理方式

异步,分布式master server节点。

生产者

api-server将用户的运行工作流http请求封装成command数据,insert到t_ds_command表中
一个启动工作流实例的command样例

{
    "commandType": "START_PROCESS",
    "processDefinitionCode": 14285512555584,
    "executorId": 1,
    "commandParam": "{}",
    "taskDependType": "TASK_POST",
    "failureStrategy": "CONTINUE",
    "warningType": "NONE",
    "startTime": 1723444881372,
    "processInstancePriority": "MEDIUM",
    "updateTime": 1723444881372,
    "workerGroup": "default",
    "tenantCode": "default",
    "environmentCode": -1,
    "dryRun": 0,
    "processInstanceId": 0,
    "processDefinitionVersion": 1,
    "testFlag": 0
}

消费者

master server中的MasterSchedulerBootstrap loop程序, MasterSchedulerBootstrap使用zk分配到自己的slot,从t_ds_command表中select属于slot的command列表处理
查询语句

<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
        select *
        from t_ds_command
        where id % #{masterCount} = #{thisMasterSlot}
        order by process_instance_priority, id asc
            limit #{limit}
</select>

MasterSchedulerBootstrap loop轮训查到待处理的command任务,将command任务和master host生成ProcessInstance,将ProcessInstance对象插入到t_ds_process_instance表中,
同时生成包含运行所需要的上下文信息的可执行任务workflowExecuteRunnable
workflowExecuteRunnablecache到本地cache processInstanceExecCacheManager,同时生产将ProcessInstanceWorkflowEventType.START_WORKFLOW生产到workflowEventQueue队列中。

Dag遍历执行任务

Master本地cache缓冲

cache实现ProcessInstanceExecCacheManagerImpl,提供如下核心功能

public interface ProcessInstanceExecCacheManager {

    /**
     * get WorkflowExecuteThread by process instance id
     *
     * @param processInstanceId processInstanceId
     * @return WorkflowExecuteThread
     */
    WorkflowExecuteRunnable getByProcessInstanceId(int processInstanceId);

    /**
     * judge the process instance does it exist
     *
     * @param processInstanceId processInstanceId
     * @return true - if process instance id exists in cache
     */
    boolean contains(int processInstanceId);

    /**
     * remove cache by process instance id
     *
     * @param processInstanceId processInstanceId
     */
    void removeByProcessInstanceId(int processInstanceId);

    /**
     * cache
     *
     * @param processInstanceId     processInstanceId
     * @param workflowExecuteThread if it is null, will not be cached
     */
    void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread);

    /**
     * get all WorkflowExecuteThread from cache
     *
     * @return all WorkflowExecuteThread in cache
     */
    Collection<WorkflowExecuteRunnable> getAll();

    void clearCache();
}

生产者

MasterSchedulerBootstrap loop将command transform to可以运行的任务,任务对象中包含了要处理的所有上下文信息

消费者

EventExecuteService根据dag信息,拿到第一批没有任何依赖的TaskInstance添加到待执行任务队列standByTaskInstancePriorityQueue中, standByTaskInstancePriorityQueue按照优先级先后顺序执行,处理任务状态,将待执行任务提交到globalTaskDispatchWaitingQueue队列中。

可执行任务Dispatch

Master进城内优先级队列

到了globalTaskDispatchWaitingQueue中,已经是可执行任务的最小单元了

生产者

EventExecuteService根据parent node,对Dag进行广度优先遍历,提交任务到globalTaskDispatchWaitingQueue队列中。

消费者

消费者为GlobalTaskDispatchWaitingQueueLooperGlobalTaskDispatchWaitingQueueLooper消费待dispatch的任务,根据任务类型执行任务调度,对任务的调度是走的rpc接口,目前来看根据任务类型分为两种:

  1. MasterTaskDispatcher
  2. WorkerTaskDispatcher

对于WorkerTaskDispatcher来说,rpc server收到rpc request之后提交任务到了workerTaskExecutorThreadPool执行。所以这是一个异步处理任务的过程,不至于让master server hang在这个地方。对于任务的执行进度,会在关键节点进行回调通知。

任务执行状态回调通知

Worker被dispatch任务,异步提交到线程池中之行,在任务异步执行的节点,调用rpc接口通知master任务的状态。

生产者

Worker异步执行节点,对于任务执行状态回调包括四个

  1. TaskExecutionStatus.FAILURE 执行抛出异常,运行失败
  2. TaskExecutionStatus.RUNNING_EXECUTION 开始执行
  3. TaskExecutionStatus.KILL 被杀死
  4. TaskExecutionStatus.SUCCESS 执行成功

备注:在官方的事件流程中Ack的方向搞错了,Ack不是worker通知给master,而是master通知workerer,我的这个事件状态的处理结束了。

经过校正一下,比较概括性的总结,整体的流程大致如下图

file

消费者

master节点ITaskInstanceExecutionEventListener服务,服务接受rpc请求,并将任务添加到TaskEventService eventQueue队列中。

任务状态处理

缓冲队列

master节点TaskEventService eventQueue队列。

生产者

这个生产者可能会很多

  1. api-server用户行为
  2. master节点任务调度
  3. work节点任务执行
  4. master任务执行

消费者

为master节点的TaskInstanceListenerImpl服务,TaskInstanceListenerImplTaskEvent transform to TaskExecuteRunnable,并且提交到线程池执行taskExecuteThreadMap待执行,在线程池中修改任务的执行状态。

本文由 白鲸开源 提供发布支持!

标签:任务调度,processInstanceId,DolphinScheduler,vertex,任务,edge,master,拆分,执行
From: https://www.cnblogs.com/DolphinScheduler/p/18456339

相关文章

  • Apache DolphinScheduler-1.3.9源码分析(二)
    引言随着大数据的发展,任务调度系统成为了数据处理和管理中至关重要的部分。ApacheDolphinScheduler是一款优秀的开源分布式工作流调度平台,在大数据场景中得到广泛应用。在本文中,我们将对ApacheDolphinScheduler1.3.9版本的源码进行深入分析,主要分析一下Master和Worker的......
  • Apache DolphinScheduler社区9月进展记录
    各位热爱ApacheDolphinScheduler的小伙伴们,社区9月月报更新啦!这里将记录ApacheDolphinScheduler社区每月的重要更新,欢迎关注!月度MergeStar感谢以下小伙伴上个月为ApacheDolphinScheduler做的精彩贡献(排名不分先后):@Mighten,@ChaoquanTao,@wangxj3,@Xuxiaotuan,@sd......
  • 总奖金高达10万元!华为算法精英实战营“亲和任务调度系统”来啦!
    随着物联网、大数据、AI时代的到来,时延、可靠性等指标要求越来越高,海量的数据分析、大量复杂的运算对CPU的算力要求越来越高。CPU内部的大部分资源用于缓存和逻辑控制,适合运行具有分支跳转、逻辑复杂、数据结构不规则、递归等特点的串行程序。在集成电路工艺制程将要达到极限,摩尔......
  • leetcode 刷题day37动态规划Part06背包问题( 322. 零钱兑换、279.完全平方数、139.单词
    322.零钱兑换思路:每种硬币的数量是无限的,是典型的完全背包问题。但是题目要求等于目标值的最小硬币个数。所以这里需要对动规五部曲进行分析。动规五部曲:1、确定dp数组以及下标的含义dp[j]:凑足总额为j所需钱币的最少个数为dp[j]2、确定递推公式凑足总额为j-coins[i......
  • pycharm 拆分窗口, 取消分屏; VS code 分屏
    SplitVertically或者SplitHorizontally可以把当前编辑窗口垂直或者水平拆分成两个。 SplitVertically或者SplitHorizontally可以把当前编辑窗口垂直或者水平拆分成两个。 取消拆分窗口: VScode分屏: ......
  • leetcode刷题day33|动态规划Part02(62.不同路径、63. 不同路径 II、 343.整数拆分、96.
    62.不同路径机器人从(0,0)位置出发,到(m-1,n-1)终点。动规五部曲1、确定dp数组(dptable)以及下标的含义dp[i][j]:表示从(0,0)出发,到(i,j)有dp[i][j]条不同的路径。2、确定递推公式想要求dp[i][j],只能有两个方向来推导出来,即dp[i-1][j]和dp[i][j-1]。dp[i]......
  • 信息学奥赛复赛复习05-CSP-J2020-01优秀的拆分-对数函数、自然对数、以2为底的对数、
    PDF文档回复:2024092712020CSP-J题目1优秀的拆分[题目描述]一般来说,一个正整数可以拆分成若干个正整数的和例如,1=1,10=1+2+3+4等。对于正整数n的一种特定拆分,我们称它为“优秀的”,当且仅当在这种拆分下,n被分解为了若干个不同的2的正整数次幂。注意,一个数x能......
  • 按照分页预览页码拆分工作表并在打印标题区域写入相应页码信息
    适用于具备水平分页,无垂直分页的Excel工作表的情况。尽量保持原工作表格式而采用建立副本的方法,代码存在优化的可能。本文假设打印标题不低于两行。SubSplitWorkbookByPrintPages()DimwsSourceAsWorksheetDimwsDestAsWorksheetDimiAsLongDimlas......
  • ERROR:start workflow error,dolphinscheduler log重复刷屏(死循环)直至磁盘存满
    在使用ds过后发现,我虚拟机中的磁盘内存全部沾满了查看目录下大于100M的文件:find/-size+100M查看后发现问题在于ds产生的日志文件特别大而且多,查看日志后发现日志中一直都在死循环错误:startworkflowerror 等其中文件下的目录可以直接全部删除:cd /opt/install......
  • Apache DolphinScheduler-1.3.9源码分析(一)
    引言随着大数据的发展,任务调度系统成为了数据处理和管理中至关重要的部分。ApacheDolphinScheduler是一款优秀的开源分布式工作流调度平台,在大数据场景中得到广泛应用。在本文中,我们将对ApacheDolphinScheduler1.3.9版本的源码进行深入分析,介绍Master启动以及调度流程......