首页 > 其他分享 >presto 查询调度流程 (Coordinator Scheduler)

presto 查询调度流程 (Coordinator Scheduler)

时间:2024-06-11 11:33:07浏览次数:15  
标签:schedule fragment graph presto 调度 Coordinator Scheduler new stage

based on tag: 0.287

presto的 scheduler 是 SqlQueryScheduler 这个类控制的, 这个class主要是负责调度物理执行计划。调度具体的每个SqlStageExecution. 这个Stage可以当成Fragment的一个概念

他会先把所有的stage创建一个schedule计划。一共有两种schedule计划,一个是all-at-once,另一个是phased,还有一个 adaptive-phased (其实就是根据stage数量来决定到底是all-at-once还是phased).

        // AdaptivePhasedExecutionPolicy.java createExecutionSchedule
        if (stages.size() > getMaxStageCountForEagerScheduling(session)) {
            return new PhasedExecutionSchedule(stages);
        }
        else {
            return new AllAtOnceExecutionSchedule(stages);
        }

SqlQueryScheduler 的主要流程是什么呢?

  scheduler:
  schedules = // 每个 stages (section 可以理解为单个查询的所有SQL的fragment) 创建一个schedules 
  while (schedules.all_finish()) {
      stage_wait_schedule = schedule.get_next_stages();
      stages.addAll(stage_wait_schedule)
      for stage in stages:
        stage.scheduler.schedule() // (这里是stage的scheduler,比如发async rpc, 分配split/计算节点)
        collect_block_schedule_result() // 收集block schedule result
      wait_if_has_block_events() // 这里是等block result 的 future 最多等1s
  }

可以看到其实SQLSchedule的流程就是先获取下次要调度哪些stage,然后执行每个stage中的scheduler的schedule()方法,
那么这个 ExecutionSchedule (schedule计划) 里面是如何提供这个顺序的呢?
AllAtOnceExecutionSchedule:
给出一个调度顺序,然后把所有的stage都拿出来调度。
PhasedExecutionSchedule:
获取一个拓扑执行序,每次返回一部分来执行, 如果存在join这类的fragment,会优先调度 build side,再调度probe side。对于union这类的会一个一个执行
他的大概算法就是在plan中添加一个 build 到 probe 的 edge

对于这样的一个plan (里面每个节点代表一个fragment),会先调度 build side,然后再调度 probe side。
但是对于broadcast join 这个算法会出现环,所以他这里做了一个处理:

@VisibleForTesting
    static List<Set<PlanFragmentId>> extractPhases(Collection<PlanFragment> fragments)
    {
        // Build a graph where the plan fragments are vertexes and the edges represent
        // a before -> after relationship.  For example, a join hash build has an edge
        // to the join probe.
        // 先构建这个 graph
        Graph<PlanFragmentId, DefaultEdge> graph = new DefaultDirectedGraph<>(DefaultEdge.class);
        fragments.forEach(fragment -> graph.addVertex(fragment.getId()));

        Visitor visitor = new Visitor(fragments, graph);
        for (PlanFragment fragment : fragments) {
            visitor.processFragment(fragment.getId());
        }

        // Computes all the strongly connected components of the directed graph.
        // These are the "phases" which hold the set of fragments that must be started
        // at the same time to avoid deadlock.
        // 找到所有强关系集合 如果有A->B B->A 这两个就是强关系 
        List<Set<PlanFragmentId>> components = new KosarajuStrongConnectivityInspector<>(graph).stronglyConnectedSets();

        Map<PlanFragmentId, Set<PlanFragmentId>> componentMembership = new HashMap<>();
        for (Set<PlanFragmentId> component : components) {
            for (PlanFragmentId planFragmentId : component) {
                componentMembership.put(planFragmentId, component);
            }
        }

        // build graph of components (phases)
        // 只有两个边不是强关系才会添加
        Graph<Set<PlanFragmentId>, DefaultEdge> componentGraph = new DefaultDirectedGraph<>(DefaultEdge.class);
        components.forEach(componentGraph::addVertex);
        for (DefaultEdge edge : graph.edgeSet()) {
            PlanFragmentId source = graph.getEdgeSource(edge);
            PlanFragmentId target = graph.getEdgeTarget(edge);

            Set<PlanFragmentId> from = componentMembership.get(source);
            Set<PlanFragmentId> to = componentMembership.get(target);
            if (!from.equals(to)) { // the topological order iterator below doesn't include vertices that have self-edges, so don't add them
                componentGraph.addEdge(from, to);
            }
        }

        // 所以对于broadcast来说Join的fragment和 build side的fragment会同时被调度, 因为他们是一组强关系
        List<Set<PlanFragmentId>> schedulePhases = ImmutableList.copyOf(new TopologicalOrderIterator<>(componentGraph));
        return schedulePhases;
    }

那么对于一个task,是否可以被调度多次呢?

    file: PhasedExecutionSchedule.java
    private void removeCompletedStages()
    {
        for (Iterator<StageExecutionAndScheduler> stageIterator = activeSources.iterator(); stageIterator.hasNext(); ) {
            StageExecutionState state = stageIterator.next().getStageExecution().getState();
            // state 的状态有 PLANNED SCHEDULING SCHEDULED RUNNING 以及 isDone(CANCELED FINISHED 等)
            // 所以一个一次调度之后有可能是SCHEDUING (比如还有split没有分配),但是没调度完成,所以下次调度还要调度这些 task
            if (state == SCHEDULED || state == RUNNING || state.isDone()) {
                stageIterator.remove();
            }
        }
    }

标签:schedule,fragment,graph,presto,调度,Coordinator,Scheduler,new,stage
From: https://www.cnblogs.com/stdpain/p/18241695

相关文章

  • PrestoUDF故障排除与恢复:快速解决问题
    PrestoUDF故障排除与恢复:快速解决问题1.背景介绍Presto是一种开源的大数据分析引擎,由Facebook开发和维护。它旨在快速高效地查询来自不同数据源的大型分布式数据集。Presto支持使用SQL语言进行查询,并支持用户定义函数(UDF)的扩展功能。UDF(UserDefinedFunction)允许......
  • 【Django技术深潜】揭秘Django定时任务利器:django_apscheduler全面解析与实战
    在现代Web开发中,定时任务是不可或缺的一部分,无论是定期数据分析、定时发送邮件、还是系统维护脚本,都需要精准的定时调度。Django作为Python世界中强大的Web框架,其对定时任务的支持自然也是开发者关注的重点。本文将深入探讨Django定时任务解决方案,特别是聚焦于django_apscheduler......
  • Django中使用Celery和APScheduler实现定时任务
    在之前的文章我们已经学习了Celery和APScheduler的基本使用,下面让我们来了解一下如何在Django中使用Celery和APSchedulerCelery1.前提工作python3.7pipinstallcelerypipinstalleventlet#5.0版本以下pipinstallimportlib-metadata==4.8.3(python3.7下可能会出现报......
  • 制作dolphinscheduler+spark+hadoop镜像
    项目需要在ds中执行spark集群任务,并且交付方式是提供一个镜像,所以要把这3者做成一个镜像配置进行相应配置。 1.准备基础镜像有大神已经制作好了spark+hadoop镜像,参考链接:https://zhuanlan.zhihu.com/p/421375012我们下载此镜像dockerpulls1mplecc/spark-hadoop:3然后准......
  • Dolphinscheduler不重启加载Oracle驱动
    转载自刘茫茫看山问题背景某天我们的租户反馈数据库连接缺少必要的驱动,我们通过日志查看确实是缺少部分数据库的驱动,因为DolphinScheduler默认只带了Oracle和MySQL的驱动,并且需要将pom文件中的test模式去掉才可以在打包的时候引入。我们的任务量比较大,在3.0存在容错机制的情况下......
  • Apache DolphinScheduler(2.x和3.x版本) 本地环境搭建教程一览
    在迅速变化的技术领域,本地环境的搭建和调试对于软件开发的效率和效果至关重要。本文将详细介绍如何为ApacheDolphinScheduler搭建一个高效的本地开发环境,包括2.x和3.x版本的设置方法。无论您是初学者还是有经验的开发者,本指南都将帮助您快速启动并运行,有效地进行本地代码调试。......
  • DolphinScheduler 3.3.0版本更新一览
    ApacheDolphinScheduler即将迎来3.3.0版本的发布,届时将有一系列重要的更新和改进。在近期的社区5月份用户线上分享会上,项目PMC阮文俊为大家介绍了3.3.0版本将带来的主要更新和改进,并为大家指出了如何参与社区的方式。什么是DolphinScheduler?DolphinScheduler是一个开源的项目,......
  • dolphinscheduler 租户不存在问题 记录
    ds1、创建租户(在数据库中)test-20240522,此时linux服务上并未创建目录和用户test-202405222、在配置好工作流,真正执行的时候,在创建linux/home文件夹下创建相应用户目录和和登录用户test-20240522case1:1、如果删除服务器上userdel-rtest-20240522删除test-20......
  • APScheduler的基本使用
    第一步:安装APSchedulerpipinstallapscheduler第二步:配置APScheduler#导入模块fromapscheduler.schedulers.backgroundimportBackgroundScheduler#自定义定时启动的任务defmy_job():print("HelloWorld")#创建调度器实例scheduler=BackgroundScheduler......
  • Dolphinscheduler-3.2.0集群部署安装
    一、下载二进制安装包集群配置主机名IP部署服务hadoop101192.168.12.101MasterServer、WorkServer、ApiServer、AlertServerhadoop102192.168.12.102WorkServerhadoop103192.168.12.103WorkServer 二、配置环境1. 2. 3. 4.#Licensedto......