Based on Presto tag 0.287.
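Presto's scheduling is driven by the SqlQueryScheduler class. It schedules the physical execution plan by scheduling each individual SqlStageExecution. A stage here corresponds roughly to a plan fragment.
SqlQueryScheduler first creates an execution schedule covering all stages. There are two kinds of schedule: all-at-once and phased. There is also an adaptive-phased policy, which simply chooses between all-at-once and phased based on the number of stages: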
// AdaptivePhasedExecutionPolicy.java createExecutionSchedule
if (stages.size() > getMaxStageCountForEagerScheduling(session)) {
    return new PhasedExecutionSchedule(stages);
}
else {
    return new AllAtOnceExecutionSchedule(stages);
}
What does the main flow of SqlQueryScheduler look like? In simplified pseudocode:
scheduler:
    schedule = // one ExecutionSchedule over all stages (the stages are all the fragments of this single query)
    while (!schedule.is_finished()) {
        stages = schedule.get_stages_to_schedule();
        for stage in stages:
            result = stage.scheduler.schedule()   // the stage's own scheduler: e.g. sends async RPCs, assigns splits / worker nodes
            collect_blocked_futures(result)       // collect the "blocked" futures from the schedule results
        wait_if_any_blocked()                     // wait on the blocked futures, for at most 1 s
    }
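The loop above can be sketched in self-contained Java, assuming simplified types. `SketchStage`, `SketchSchedule`, `SplitStage`, and `EagerSchedule` are hypothetical names, not Presto classes. The JDK's `CompletableFuture` stands in for the blocked futures. `EagerSchedule` keeps offering every stage that still has unassigned splits, roughly in the spirit of all-at-once scheduling.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a stage together with its stage-level scheduler.
interface SketchStage {
    CompletableFuture<Void> schedule(); // an already-completed future means "not blocked"
    boolean isFullyScheduled();
}

// Hypothetical stand-in for ExecutionSchedule.
interface SketchSchedule {
    Set<SketchStage> getStagesToSchedule();
    boolean isFinished();
}

// Toy stage: assigns one split per schedule() call and never blocks.
final class SplitStage implements SketchStage {
    private int remainingSplits;

    SplitStage(int splits) { this.remainingSplits = splits; }

    @Override
    public CompletableFuture<Void> schedule() {
        if (remainingSplits > 0) {
            remainingSplits--;
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public boolean isFullyScheduled() { return remainingSplits == 0; }
}

// Toy all-at-once schedule: every stage that still has work is offered each round.
final class EagerSchedule implements SketchSchedule {
    private final Set<SketchStage> pending;

    EagerSchedule(List<? extends SketchStage> stages) { this.pending = new LinkedHashSet<>(stages); }

    @Override
    public Set<SketchStage> getStagesToSchedule() {
        pending.removeIf(SketchStage::isFullyScheduled);
        return pending;
    }

    @Override
    public boolean isFinished() {
        pending.removeIf(SketchStage::isFullyScheduled);
        return pending.isEmpty();
    }
}

final class SchedulerLoopSketch {
    // Runs the outer scheduling loop and returns how many rounds it took.
    static int run(SketchSchedule schedule) {
        int rounds = 0;
        while (!schedule.isFinished()) {
            rounds++;
            List<CompletableFuture<Void>> blocked = new ArrayList<>();
            for (SketchStage stage : schedule.getStagesToSchedule()) {
                CompletableFuture<Void> result = stage.schedule();
                if (!result.isDone()) {
                    blocked.add(result);
                }
            }
            if (!blocked.isEmpty()) {
                try {
                    // Wait until any blocked stage can make progress, but at most 1 second.
                    CompletableFuture.anyOf(blocked.toArray(new CompletableFuture<?>[0])).get(1, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return rounds;
                }
                catch (ExecutionException | TimeoutException e) {
                    // Either way, go around the loop and ask the schedule again.
                }
            }
        }
        return rounds;
    }

    public static void main(String[] args) {
        System.out.println(run(new EagerSchedule(List.of(new SplitStage(2), new SplitStage(3))))); // prints 3
    }
}
```

The example uses two toy stages that need 2 and 3 split assignments. The loop finishes after 3 rounds, because a stage that is not yet fully scheduled is simply offered again in the next round.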
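So the SqlQueryScheduler flow is simple. It first asks which stages should be scheduled next, then calls schedule() on each of those stages' own scheduler.
How, then, does the ExecutionSchedule (the schedule plan) determine that order?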
AllAtOnceExecutionSchedule:
Computes a single scheduling order and then hands out all stages for scheduling at once.
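PhasedExecutionSchedule:
Computes a topological execution order and hands out one part of it at a time. If the plan contains join fragments, the build side is scheduled first, then the probe side. For union-like fragments, the sources are executed one after another.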
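Roughly, the algorithm adds an edge from build to probe in the plan graph, where each node represents a fragment. As a result, the build side is scheduled before the probe side.
A broadcast join, however, produces a cycle in this graph, so extractPhases handles that case as follows: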
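The ordering idea can be sketched with Kahn's topological sort. This is a simplification under stated assumptions:

- Fragments are plain `int`s.
- `BuildBeforeProbeSketch` is a hypothetical class.
- Only "before -> after" edges such as build -> probe (or one union source -> the next) are modeled. How Presto's Visitor actually derives its edges is not reproduced.

The sort fails when the edges form a cycle, which is the broadcast-join problem discussed next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: fragments are ints, an edge means "schedule before".
final class BuildBeforeProbeSketch {
    private final Map<Integer, Set<Integer>> successors = new TreeMap<>();

    // Edge "before -> after", e.g. join build side -> join probe side,
    // or one union source -> the next union source.
    void addEdge(int before, int after) {
        successors.computeIfAbsent(before, k -> new TreeSet<>()).add(after);
        successors.computeIfAbsent(after, k -> new TreeSet<>());
    }

    // Kahn's algorithm; ties are broken by the smallest id so the order is deterministic.
    List<Integer> scheduleOrder() {
        Map<Integer, Integer> inDegree = new TreeMap<>();
        successors.keySet().forEach(id -> inDegree.put(id, 0));
        successors.values().forEach(targets -> targets.forEach(t -> inDegree.merge(t, 1, Integer::sum)));

        PriorityQueue<Integer> ready = new PriorityQueue<>();
        inDegree.forEach((id, degree) -> {
            if (degree == 0) {
                ready.add(id);
            }
        });

        List<Integer> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            int id = ready.poll();
            order.add(id);
            for (int next : successors.get(id)) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != successors.size()) {
            throw new IllegalStateException("fragments form a cycle: no topological order");
        }
        return order;
    }

    public static void main(String[] args) {
        BuildBeforeProbeSketch plan = new BuildBeforeProbeSketch();
        plan.addEdge(2, 1); // build fragment 2 before probe fragment 1
        plan.addEdge(3, 4); // union: source 3 before source 4
        System.out.println(plan.scheduleOrder()); // prints [2, 1, 3, 4]
    }
}
```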
@VisibleForTesting
static List<Set<PlanFragmentId>> extractPhases(Collection<PlanFragment> fragments)
{
    // Build a graph where the plan fragments are vertexes and the edges represent
    // a before -> after relationship. For example, a join hash build has an edge
    // to the join probe.
    // Step 1: build this graph.
    Graph<PlanFragmentId, DefaultEdge> graph = new DefaultDirectedGraph<>(DefaultEdge.class);
    fragments.forEach(fragment -> graph.addVertex(fragment.getId()));
    Visitor visitor = new Visitor(fragments, graph);
    for (PlanFragment fragment : fragments) {
        visitor.processFragment(fragment.getId());
    }
    // Computes all the strongly connected components of the directed graph.
    // These are the "phases" which hold the set of fragments that must be started
    // at the same time to avoid deadlock.
    // Find all strongly connected sets: if A -> B and B -> A, then A and B are strongly connected.
    List<Set<PlanFragmentId>> components = new KosarajuStrongConnectivityInspector<>(graph).stronglyConnectedSets();
    Map<PlanFragmentId, Set<PlanFragmentId>> componentMembership = new HashMap<>();
    for (Set<PlanFragmentId> component : components) {
        for (PlanFragmentId planFragmentId : component) {
            componentMembership.put(planFragmentId, component);
        }
    }
    // build graph of components (phases)
    // An edge is added only when its two endpoints lie in different components.
    Graph<Set<PlanFragmentId>, DefaultEdge> componentGraph = new DefaultDirectedGraph<>(DefaultEdge.class);
    components.forEach(componentGraph::addVertex);
    for (DefaultEdge edge : graph.edgeSet()) {
        PlanFragmentId source = graph.getEdgeSource(edge);
        PlanFragmentId target = graph.getEdgeTarget(edge);
        Set<PlanFragmentId> from = componentMembership.get(source);
        Set<PlanFragmentId> to = componentMembership.get(target);
        if (!from.equals(to)) { // the topological order iterator below doesn't include vertices that have self-edges, so don't add them
            componentGraph.addEdge(from, to);
        }
    }
    // So for a broadcast join, the join fragment and the build-side fragment are scheduled
    // together, because they form one strongly connected component.
    List<Set<PlanFragmentId>> schedulePhases = ImmutableList.copyOf(new TopologicalOrderIterator<>(componentGraph));
    return schedulePhases;
}
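The same idea can be sketched without JGraphT by running Kosaraju's algorithm directly. This is a minimal sketch under stated assumptions:

- `PhaseExtractionSketch` is hypothetical, and fragments are `int`s.
- The example edges 2 -> 1 and 1 -> 2 are assumed to stand for the build/join cycle of a broadcast join.

The Presto code above builds an explicit component graph and walks it with `TopologicalOrderIterator`. This sketch instead relies on a property of Kosaraju's algorithm: its second pass already discovers components in topological order of the condensed graph.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Dependency-free sketch of the extractPhases idea (not Presto's code): edges mean
// "before -> after", and each strongly connected component becomes one phase.
final class PhaseExtractionSketch {
    private final Map<Integer, Set<Integer>> out = new TreeMap<>(); // forward edges
    private final Map<Integer, Set<Integer>> in = new TreeMap<>();  // reversed edges

    void addEdge(int before, int after) {
        out.computeIfAbsent(before, k -> new TreeSet<>()).add(after);
        out.computeIfAbsent(after, k -> new TreeSet<>());
        in.computeIfAbsent(after, k -> new TreeSet<>()).add(before);
        in.computeIfAbsent(before, k -> new TreeSet<>());
    }

    // Kosaraju: pass 1 records DFS finish order on the graph; pass 2 runs DFS on the
    // reversed graph in decreasing finish order, and each DFS tree is one component.
    List<Set<Integer>> extractPhases() {
        List<Integer> finishOrder = new ArrayList<>();
        Set<Integer> visited = new TreeSet<>();
        for (int v : out.keySet()) {
            dfs(v, out, visited, finishOrder);
        }
        List<Set<Integer>> phases = new ArrayList<>();
        Set<Integer> assigned = new TreeSet<>();
        for (int i = finishOrder.size() - 1; i >= 0; i--) {
            int v = finishOrder.get(i);
            if (!assigned.contains(v)) {
                List<Integer> members = new ArrayList<>();
                dfs(v, in, assigned, members);
                phases.add(new TreeSet<>(members));
            }
        }
        return phases;
    }

    // Post-order DFS: a vertex is appended after every unvisited vertex reachable from it.
    private static void dfs(int v, Map<Integer, Set<Integer>> adj, Set<Integer> visited, List<Integer> postOrder) {
        if (!visited.add(v)) {
            return;
        }
        for (int w : adj.get(v)) {
            dfs(w, adj, visited, postOrder);
        }
        postOrder.add(v);
    }

    public static void main(String[] args) {
        PhaseExtractionSketch g = new PhaseExtractionSketch();
        g.addEdge(2, 1); // build fragment 2 before join/probe fragment 1
        g.addEdge(1, 2); // assumed reverse edge closing the broadcast-join cycle
        g.addEdge(1, 3); // fragment 3 must come after the join
        System.out.println(g.extractPhases()); // prints [[1, 2], [3]]
    }
}
```

Fragments 1 and 2 end up in the same phase and are started together, while fragment 3 forms a later phase of its own.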
Can the same stage (and therefore its tasks) be scheduled more than once?
File: PhasedExecutionSchedule.java
private void removeCompletedStages()
{
    for (Iterator<StageExecutionAndScheduler> stageIterator = activeSources.iterator(); stageIterator.hasNext(); ) {
        StageExecutionState state = stageIterator.next().getStageExecution().getState();
        // Possible states include PLANNED, SCHEDULING, SCHEDULED, RUNNING, and the isDone() states (CANCELED, FINISHED, etc.).
        // After one round a stage may still be SCHEDULING (e.g. some splits are not assigned yet).
        // It has not finished scheduling, so it stays active and is scheduled again in the next round.
        if (state == SCHEDULED || state == RUNNING || state.isDone()) {
            stageIterator.remove();
        }
    }
}
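A small sketch of this rule follows, assuming a reduced state enum. `SketchStageState` lists only the states named above, and Presto's real enum may contain more. `ActiveStagesSketch` and `StageHandle` are hypothetical. The sketch shows a stage that is still SCHEDULING surviving one pruning pass and being dropped only once it reaches SCHEDULED.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Reduced state set: only the states named in the comment above (an assumption).
enum SketchStageState {
    PLANNED, SCHEDULING, SCHEDULED, RUNNING, FINISHED, CANCELED;

    boolean isDone() {
        return this == FINISHED || this == CANCELED;
    }
}

final class ActiveStagesSketch {
    // Hypothetical stage handle; only its state matters here.
    static final class StageHandle {
        final String name;
        SketchStageState state;

        StageHandle(String name, SketchStageState state) {
            this.name = name;
            this.state = state;
        }
    }

    // Same rule as removeCompletedStages(): drop stages that are SCHEDULED, RUNNING or done;
    // PLANNED and SCHEDULING stages stay active and are offered again next round.
    static void removeCompletedStages(List<StageHandle> activeSources) {
        for (Iterator<StageHandle> it = activeSources.iterator(); it.hasNext(); ) {
            SketchStageState state = it.next().state;
            if (state == SketchStageState.SCHEDULED || state == SketchStageState.RUNNING || state.isDone()) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        StageHandle join = new StageHandle("join", SketchStageState.SCHEDULING);
        List<StageHandle> active = new ArrayList<>(List.of(
                join,
                new StageHandle("scan", SketchStageState.RUNNING),
                new StageHandle("output", SketchStageState.FINISHED)));
        removeCompletedStages(active);
        System.out.println(active.size()); // prints 1: "join" is still SCHEDULING
        join.state = SketchStageState.SCHEDULED; // e.g. its remaining splits were assigned
        removeCompletedStages(active);
        System.out.println(active.isEmpty()); // prints true
    }
}
```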
From: https://www.cnblogs.com/stdpain/p/18241695