首页 > 编程语言 >Dolphinscheduler DAG核心源码剖析

Dolphinscheduler DAG核心源码剖析

时间:2024-12-05 10:20:59浏览次数:10  
标签:DAG Dolphinscheduler fromNode toNode 源码 processDag TODO 节点

背景描述

file

注意 : 在 Dolphinscheduler 中,离线任务是有完整的声明周期的,比如说停止、暂停、暂停恢复、重跑等等,都是以DAG(有向无环图的形式进行任务组织)T+1离线任务的。

Dolphinscheduler DAG实现

org.apache.dolphinscheduler.common.graph.DAG

DAG三个重要的数据结构 :

// 顶点信息
private final Map<Node, NodeInfo> nodesMap;

// 边关联信息,作用是记录顶点和边的关系,可以找到叶子节点,也可以获取下游节点
private final Map<Node, Map<Node, EdgeInfo>> edgesMap;

// 反向边关联信息,作用是可以快速找到入度为0的节点(起始节点),也可以获取上游节点
private final Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;

如下示例 :

DAG<String, String, String> graph = new DAG<>();
graph.addNode("A", "A");
graph.addNode("B", "B");
graph.addNode("C", "C");

// 添加一个B -> C的边,当前A还飘着呢
graph.addEdge("B", "C");

// 如果添加A -> B,其实就是会从B开始一直到子节点,看有没有可连接的线到A,如果有,说明这个A -> B的边添加不得,因为会形成环,否则就可以添加
graph.addEdge("A", "B");

源码分析 :
org.apache.dolphinscheduler.common.graph.DAG#addEdge

public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) {
    lock.writeLock().lock();

    try {
        // TODO 是否可以添加该边
        if (!isLegalAddEdge(fromNode, toNode, createNode)) {
            log.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode);
            return false;
        }

        // TODO 添加节点
        addNodeIfAbsent(fromNode, null);
        addNodeIfAbsent(toNode, null);

        // TODO 添加边
        addEdge(fromNode, toNode, edge, edgesMap);
        addEdge(toNode, fromNode, edge, reverseEdgesMap);

        return true;
    } finally {
        lock.writeLock().unlock();
    }
}


private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) {
    // TODO 如果fromNode和toNode两个是同一个顶点,这个边是不能添加的
    if (fromNode.equals(toNode)) {
        log.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode);
        return false;
    }

    // TODO 这里其实就是想说,不是创建节点,也就是说要求fromNode和toNode是需要存在的顶点
    if (!createNode) {
        if (!containsNode(fromNode) || !containsNode(toNode)) {
            log.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode);
            return false;
        }
    }

    // Whether an edge can be successfully added(fromNode -> toNode),need to determine whether the
    // DAG has cycle!
    // TODO 这里获取节点的数量
    int verticesCount = getNodesCount();

    Queue<Node> queue = new LinkedList<>();

    // TODO 将toNode放入到queue中
    queue.add(toNode);

    // if DAG doesn't find fromNode, it's not has cycle!
    // TODO 当queue不为空,这里肯定就不为空了
    while (!queue.isEmpty() && (--verticesCount > 0)) {
        // TODO 获取队列里面的元素
        Node key = queue.poll();

        for (Node subsequentNode : getSubsequentNodes(key)) {
            // TODO 其实这里判断的是比如说A -> B 有连接的DAG图,传入的是节点B,看B节点的边是不是有A,如果有A说明已经有B -> A的关联了,
            // TODO 就不能添加了。如果比如说B的下游节点,比如说 A -> B -> C,这样的话,B的下游节点就是C,C是需要放入queue中的
            // TODO 核心思想其实就是要找到它要添加的目标节点的连线,是否有目标节点到源节点的连线存在(这样来判断是否存在环)
            if (subsequentNode.equals(fromNode)) {
                return false;
            }

            queue.add(subsequentNode);
        }
    }

    return true;
}

Dolphinscheduler DagHelper解说

DAG类是一个基础通用的DAG工具类,而DagHelper是任务定义、任务定义直接的关系组装成DAG的一个业务工具类。

org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory#createWorkflowGraph

public IWorkflowGraph createWorkflowGraph(ProcessInstance workflowInstance) throws Exception {

    // TODO 这里其实就是获取的流程实例对应的任务数和之间的关系
    List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(
            workflowInstance.getProcessDefinitionCode(),
            workflowInstance.getProcessDefinitionVersion());

    // TODO 获取对应的任务定义log
    List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations);

    // TODO 获取TaskNode
    List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);

    // generate process to get DAG info
    // TODO 这里其实解析的是是否自己手动指定的启动节点列表,默认不会
    List<Long> recoveryTaskNodeCodeList = getRecoveryTaskNodeCodeList(workflowInstance.getCommandParam());

    // TODO 如果 默认startNodeNameList为空
    List<Long> startNodeNameList = parseStartNodeName(workflowInstance.getCommandParam());

    // TODO 构建ProcessDag对象实例
    ProcessDag processDag = DagHelper.generateFlowDag(
            taskNodeList,
            startNodeNameList,
            recoveryTaskNodeCodeList,
            workflowInstance.getTaskDependType());

    if (processDag == null) {
        log.error("ProcessDag is null");
        throw new IllegalArgumentException("Create WorkflowGraph failed, ProcessDag is null");
    }

    // TODO 生成DAG
    DAG<Long, TaskNode, TaskNodeRelation> dagGraph = DagHelper.buildDagGraph(processDag);
    log.debug("Build dag success, dag: {}", dagGraph);

    // TODO 使用WorkflowGraph来封装任务节点列表和dagGraph
    return new WorkflowGraph(taskNodeList, dagGraph);
}

org.apache.dolphinscheduler.service.utils.DagHelper#generateFlowDag

public static ProcessDag generateFlowDag(
                                             List<TaskNode> totalTaskNodeList,
                                             List<Long> startNodeNameList,
                                             List<Long> recoveryNodeCodeList,
                                             TaskDependType depNodeType) throws Exception {

    // TODO 其实就是拿到所有的节点
    List<TaskNode> destTaskNodeList =
            generateFlowNodeListByStartNode(
                    totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);

    if (destTaskNodeList.isEmpty()) {
        return null;
    }

    // TODO 获取任务节点之前的关系
    List<TaskNodeRelation> taskNodeRelations = generateRelationListByFlowNodes(destTaskNodeList);

    // TODO 其实就是实例化一个ProcessDag
    ProcessDag processDag = new ProcessDag();
    // TODO 设置DAG的边
    processDag.setEdges(taskNodeRelations);
    // TODO 设置DAG的顶点
    processDag.setNodes(destTaskNodeList);
    return processDag;
}

设置了destTaskNodeList和taskNodeRelations

org.apache.dolphinscheduler.service.utils.DagHelper#buildDagGraph

public static DAG<Long, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {

    DAG<Long, TaskNode, TaskNodeRelation> dag = new DAG<>();

    // TODO 添加顶点
    if (CollectionUtils.isNotEmpty(processDag.getNodes())) {
        for (TaskNode node : processDag.getNodes()) {
            dag.addNode(node.getCode(), node);
        }
    }

    // TODO 添加边
    if (CollectionUtils.isNotEmpty(processDag.getEdges())) {
        for (TaskNodeRelation edge : processDag.getEdges()) {
            dag.addEdge(edge.getStartNode(), edge.getEndNode());
        }
    }
    return dag;
}

转载自 Journey

原文链接:https://segmentfault.com/a/1190000045117764

本文由 白鲸开源 提供发布支持!

标签:DAG,Dolphinscheduler,fromNode,toNode,源码,processDag,TODO,节点
From: https://www.cnblogs.com/DolphinScheduler/p/18587974

相关文章

  • 连锁管理系统:进销存、收银系统、线上商城和会员营销设计与源码分享
    随着电商和线下零售市场的蓬勃发展,越来越多的连锁企业开始寻求一体化的管理解决方案,以提高运营效率、提升客户体验和优化财务流程。一个功能齐全的连锁管理系统可以为企业提供从库存管理到订单处理,从收银系统到线上商城的全方位支持。本文商淘云介绍如何设计一个集进销存、收银......
  • 基于大数据的滴滴出行数据分析与可视化系统(源码+vue+可视化大屏展示+爬虫分析+讲解等
    收藏关注不迷路!!......
  • YOLO11: 目标检测原理与源码
    视频链接:YOLO11:目标检测原理与源码_哔哩哔哩_bilibili 《YOLO11:目标检测原理与源码》课程致力于帮助学生学习YOLO11目标检测算法的原理与源码。常心老师将手把手从0开始解读YOLO11工程目录结构,解读YOLO11的Backbone,Neck,Head网络结构原理与源码,解读训练全流程的原理与源码......
  • springboot/ssm二手书籍交易系统Java二手书商城购物系统web书籍源码
    springboot/ssm二手书籍交易系统Java二手书商城购物系统web书籍源码基于springboot(可改ssm)+vue项目开发语言:Java框架:springboot/可改ssm+vueJDK版本:JDK1.8(或11)服务器:tomcat数据库:mysql5.7(或8.0)数据库工具:Navicat/sqlyog开发软件:eclipse/idea依赖管理包:Maven......
  • springboot/ssm线上教育培训办公系统Java代码web项目在线课程作业源码
    springboot/ssm线上教育培训办公系统Java代码web项目在线课程作业源码基于springboot(可改ssm)+html+vue项目开发语言:Java框架:springboot/可改ssm+vueJDK版本:JDK1.8(或11)服务器:tomcat数据库:mysql5.7(或8.0)数据库工具:Navicat/sqlyog开发软件:eclipse/idea依赖管理......
  • 【免费毕设文档】自习室预约选座与门禁系统平台小程序uniapp源码开题报告
       博主介绍:......
  • flask框架毕业生信息管理毕设源码+论文
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于毕业生信息管理的研究,现有研究主要以学校整体的信息管理系统为主,专门针对毕业生这一特定群体信息管理的研究较少。在国内外的教育......
  • flask框架宠物之家电子商务网站毕设源码+论文
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于宠物之家电子商务网站的研究,现有研究主要以传统电子商务模式为主,专门针对宠物领域电子商务网站的研究较少。在当前社会,宠物行业蓬......
  • flask框架大学生社团管理系统毕设源码+论文
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于大学生社团管理系统的研究,现有研究主要以社团基本管理功能为主,如成员管理、活动管理等,专门针对社团内部细致的部门结构相关功能,像......
  • flask框架宠物领养系统毕设源码+论文
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容一、选题背景关于宠物领养相关系统的研究,现有研究主要以宠物交易系统或者单纯的宠物信息展示平台为主,专门针对宠物领养系统全方位功能(如包含用户、......