
A brief look at Dremio fragment execution

Published: 2022-12-27 17:56:17
Tags: dremio java observer queryContext fragment getExecutionControls context simple INJEC

Internally, Dremio executes queries in much the same way as Apache Drill, although Dremio has added quite a few optimizations of its own.

Call flow overview

From the official documentation

 

 

  • Reference call chain
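The order of calls that the code excerpts in this post walk through can be summarized with a small runnable model. Everything in it is hypothetical: the classes only borrow the names AttemptManager, MaestroService and QueryTracker to show the sequence run() -> plan() -> executeQuery() -> allocateResources() -> planExecution() -> startFragments(). They are not Dremio's real classes, and the real methods take many more parameters.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the call chain; not Dremio's real classes or signatures.
final class CallChainDemo {
  // Records each step so the call order can be inspected.
  static final List<String> trace = new ArrayList<>();

  static final class QueryTracker {
    void allocateResources() { trace.add("QueryTracker.allocateResources"); }
    void planExecution()     { trace.add("QueryTracker.planExecution"); }
    void startFragments()    { trace.add("QueryTracker.startFragments"); }
  }

  static final class MaestroService {
    void executeQuery(String queryId) {
      trace.add("MaestroService.executeQuery(" + queryId + ")");
      QueryTracker tracker = new QueryTracker();
      tracker.allocateResources(); // may block until resources are granted
      tracker.planExecution();     // parallelization info, then the execution plan
      tracker.startFragments();    // ship fragments to the executors
    }
  }

  static final class AttemptManager {
    private final MaestroService maestro = new MaestroService();

    void run(String queryId) {
      trace.add("AttemptManager.run");
      plan();                        // SQL -> logical plan -> physical plan
      maestro.executeQuery(queryId); // hand the physical plan over for execution
    }

    private void plan() { trace.add("AttemptManager.plan"); }
  }

  // Runs one simulated query and returns the recorded call order.
  static List<String> runOnce(String queryId) {
    trace.clear();
    new AttemptManager().run(queryId);
    return new ArrayList<>(trace);
  }

  public static void main(String[] args) {
    runOnce("q1").forEach(System.out::println);
  }
}
```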

 

 

The class that actually manages execution:

AttemptManager

 
 @Override
  public void run() {
    // rename the thread we're using for debugging purposes
    final Thread currentThread = Thread.currentThread();
    final String originalName = currentThread.getName();
    currentThread.setName(queryIdString + ":foreman");
   
    try {
      injector.injectChecked(queryContext.getExecutionControls(), INJECTOR_TRY_BEGINNING_ERROR,
        ForemanException.class);
 
      observer.beginState(AttemptObserver.toEvent(AttemptEvent.State.PENDING));
 
      observer.queryStarted(queryRequest, queryContext.getSession().getCredentials().getUserName());
 
      String ruleSetEngine = ruleBasedEngineSelector.resolveAndUpdateEngine(queryContext);
      ResourceSchedulingProperties resourceSchedulingProperties = new ResourceSchedulingProperties();
      resourceSchedulingProperties.setRoutingEngine(queryContext.getSession().getRoutingEngine());
      resourceSchedulingProperties.setRuleSetEngine(ruleSetEngine);
      final GroupResourceInformation groupResourceInformation =
        maestroService.getGroupResourceInformation(queryContext.getOptions(), resourceSchedulingProperties);
      queryContext.setGroupResourceInformation(groupResourceInformation);
 
      // Checks for Run Query privileges for the selected Engine
      checkRunQueryAccessPrivilege(groupResourceInformation);
 
      // planning is done in the command pool
      commandPool.submit(CommandPool.Priority.LOW, attemptId.toString() + ":foreman-planning",
        (waitInMillis) -> {
          observer.commandPoolWait(waitInMillis);
 
          injector.injectPause(queryContext.getExecutionControls(), INJECTOR_PENDING_PAUSE, logger);
          injector.injectChecked(queryContext.getExecutionControls(), INJECTOR_PENDING_ERROR,
            ForemanException.class);
 
          plan();
          injector.injectPause(queryContext.getExecutionControls(), INJECTOR_PLAN_PAUSE, logger);
          injector.injectChecked(queryContext.getExecutionControls(), INJECTOR_PLAN_ERROR,
            ForemanException.class);
          return null;
        }, runInSameThread).get();
   
      if (command.getCommandType() == CommandType.ASYNC_QUERY) {
        AsyncCommand asyncCommand = (AsyncCommand) command;
        committer = asyncCommand.getPhysicalPlan().getCommitter();
        queryCleaner = asyncCommand.getPhysicalPlan().getCleaner();
 
        moveToState(QueryState.STARTING, null);
        // execute the query through maestroService
        maestroService.executeQuery(queryId, queryContext, asyncCommand.getPhysicalPlan(), runInSameThread,
          new MaestroObserverWrapper(observer), new CompletionListenerImpl());
        asyncCommand.executionStarted();
      }
 
      observer.beginState(AttemptObserver.toEvent(AttemptEvent.State.RUNNING));
      moveToState(QueryState.RUNNING, null);
 
      injector.injectChecked(queryContext.getExecutionControls(), INJECTOR_TRY_END_ERROR,
        ForemanException.class);
    } catch (ResourceUnavailableException e) {
      // resource allocation failure is treated as a cancellation and not a failure
      try {
        // the caller (JobEventCollatingObserver) expects metadata event before a cancel/complete event.
        observer.planCompleted(null);
      } catch (Exception ignore) {
      }
      profileTracker.setCancelReason(e.getMessage());
      moveToState(QueryState.CANCELED, null); // ENQUEUED/STARTING -> CANCELED transition
    } catch (final UserException | ForemanException e) {
      moveToState(QueryState.FAILED, e);
    } catch (final OutOfMemoryError e) {
      if (ErrorHelper.isDirectMemoryException(e)) {
        moveToState(QueryState.FAILED, UserException.memoryError(e).build(logger));
      } else {
        /*
         * FragmentExecutors use a NodeStatusListener to watch out for the death of their query's AttemptManager. So, if we
         * die here, they should get notified about that, and cancel themselves; we don't have to attempt to notify
         * them, which might not work under these conditions.
         */
        ProcessExit.exitHeap(e);
      }
    } catch (Throwable ex) {
      UserCancellationException t = ErrorHelper.findWrappedCause(ex, UserCancellationException.class);
      if (t != null) {
        moveToState(QueryState.CANCELED, null);
      } else {
        moveToState(QueryState.FAILED,
          new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
      }
 
    } finally {
      /*
       * Begin accepting external events.
       *
       * Doing this here in the finally clause will guarantee that it occurs. Otherwise, if there
       * is an exception anywhere during setup, it wouldn't occur, and any events that are generated
       * as a result of any partial setup that was done (such as the FragmentSubmitListener,
       * the ResponseSendListener, or an external call to cancel()), will hang the thread that makes the
       * event delivery call.
       *
       * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
       * make sure that we can't make things any worse as those events are delivered, but allow
       * any necessary remaining cleanup to proceed.
       *
       * Note that cancellations cannot be simulated before this point, i.e. pauses can be injected, because AttemptManager
       * would wait on the cancelling thread to signal a resume and the cancelling thread would wait on the AttemptManager
       * to accept events.
       */
      try {
        stateSwitch.start();
      } catch (Exception e) {
        moveToState(QueryState.FAILED, e);
      }
 
      // restore the thread's original name
      currentThread.setName(originalName);
    }
 
    /*
     * Note that despite the run() completing, the AttemptManager could continue to exist, and receives
     * events about fragment completions. It won't go away until everything is completed, failed, or cancelled.
     */
  }
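The catch blocks in run() decide the attempt's final state. A minimal sketch of that mapping, assuming simplified stand-in exception classes defined inside the sketch (they are not Dremio's real types, and ForemanException is folded into the UserException branch): ResourceUnavailableException becomes CANCELED, UserException becomes FAILED, any other throwable becomes CANCELED only if a UserCancellationException appears in its cause chain, and FAILED otherwise. The OutOfMemoryError branch (fail on direct-memory OOM, exit the process on heap OOM) is deliberately left out.

```java
// Stand-in types for this sketch only; Dremio's real exception classes differ.
enum QueryState { RUNNING, CANCELED, FAILED }

class ResourceUnavailableException extends Exception {
  ResourceUnavailableException(String msg) { super(msg); }
}

class UserException extends RuntimeException {
  UserException(String msg) { super(msg); }
}

class UserCancellationException extends RuntimeException {
  UserCancellationException(String msg) { super(msg); }
}

final class AttemptStateMapper {
  // Walks the cause chain, similar in spirit to ErrorHelper.findWrappedCause in run().
  static <T extends Throwable> T findWrappedCause(Throwable t, Class<T> type) {
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      if (type.isInstance(cur)) {
        return type.cast(cur);
      }
    }
    return null;
  }

  // Maps the outcome of the setup phase to a final state; null means setup succeeded.
  // The OutOfMemoryError handling of the real code is intentionally not modeled.
  static QueryState finalState(Throwable failure) {
    if (failure == null) {
      return QueryState.RUNNING;
    }
    if (failure instanceof ResourceUnavailableException) {
      return QueryState.CANCELED; // resource allocation failure counts as a cancellation
    }
    if (failure instanceof UserException) {
      return QueryState.FAILED;   // matched before the generic Throwable branch
    }
    if (findWrappedCause(failure, UserCancellationException.class) != null) {
      return QueryState.CANCELED; // a user cancel surfaced during setup
    }
    return QueryState.FAILED;     // any other unexpected setup failure
  }
}
```

The order of the checks mirrors the order of the catch clauses: because UserException is tested first, a UserException that wraps a cancellation still ends as FAILED, just as Java's first-matching-catch rule would produce in run().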

MaestroServiceImpl

@Override
  public void executeQuery(
    QueryId queryId,
    QueryContext context,
    PhysicalPlan physicalPlan,
    boolean runInSameThread,
    MaestroObserver observer,
    CompletionListener listener) throws ExecutionSetupException, ResourceAllocationException {
 
    injector.injectChecked(context.getExecutionControls(), INJECTOR_EXECUTE_QUERY_BEGIN_ERROR,
      ExecutionSetupException.class);
 
    // Set up the active query.
    QueryTracker queryTracker = new QueryTrackerImpl(queryId, context, physicalPlan, reader,
      resourceAllocator.get(), executorSetService.get(), executorSelectionService.get(),
      executorServiceClientFactory.get(), jobTelemetryClient.get(), observer,
      listener,
      () -> closeQuery(queryId), closeableSchedulerThreadPool);
    Preconditions.checkState(activeQueryMap.putIfAbsent(queryId, queryTracker) == null,
    "query already queued for execution " + QueryIdHelper.getQueryId(queryId));
 
    // allocate execution resources on the calling thread, as this will most likely block
    queryTracker.allocateResources();
 
    try {
      observer.beginState(AttemptObserver.toEvent(AttemptEvent.State.EXECUTION_PLANNING));
 
      // do execution planning in the bound pool
      commandPool.get().submit(CommandPool.Priority.MEDIUM,
        QueryIdHelper.getQueryId(queryId) + ":execution-planning",
        (waitInMillis) -> {
          injector.injectChecked(context.getExecutionControls(),
            INJECTOR_COMMAND_POOL_SUBMIT_ERROR, ExecutionSetupException.class);
 
          observer.commandPoolWait(waitInMillis);
          queryTracker.planExecution();
          return null;
        }, runInSameThread).get();
    } catch (ExecutionException|InterruptedException e) {
      throw new ExecutionSetupException("failure during execution planning", e);
    }
 
    observer.beginState(AttemptObserver.toEvent(AttemptEvent.State.STARTING));
    // propagate the fragments.
    queryTracker.startFragments();
 
    injector.injectChecked(context.getExecutionControls(), INJECTOR_EXECUTE_QUERY_END_ERROR,
      ExecutionSetupException.class);
  }
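Two patterns in executeQuery() are worth isolating: the putIfAbsent check that rejects a query already present in activeQueryMap, and submitting execution planning to a pool while the calling thread blocks on get(), converting ExecutionException and InterruptedException into a setup exception. The sketch below shows both, with a plain java.util.concurrent ExecutorService standing in for Dremio's CommandPool; PlanningSubmitter and SetupException are hypothetical names, and the runInSameThread option is not modeled.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: ExecutorService stands in for CommandPool,
// SetupException for ExecutionSetupException.
final class PlanningSubmitter {
  static final class SetupException extends Exception {
    SetupException(String msg, Throwable cause) { super(msg, cause); }
  }

  private final ConcurrentMap<String, String> activeQueries = new ConcurrentHashMap<>();
  private final ExecutorService planningPool;

  PlanningSubmitter(ExecutorService planningPool) {
    this.planningPool = planningPool;
  }

  String executeQuery(String queryId, Callable<String> planning) throws SetupException {
    // Register the query; a second registration under the same id is rejected,
    // like Preconditions.checkState(activeQueryMap.putIfAbsent(...) == null, ...).
    if (activeQueries.putIfAbsent(queryId, "PLANNING") != null) {
      throw new IllegalStateException("query already queued for execution " + queryId);
    }
    try {
      // Planning runs on the pool; the calling thread blocks until it finishes.
      Future<String> plan = planningPool.submit(planning);
      return plan.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt visible to callers
      throw new SetupException("failure during execution planning", e);
    } catch (ExecutionException e) {
      throw new SetupException("failure during execution planning", e);
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      PlanningSubmitter submitter = new PlanningSubmitter(pool);
      System.out.println(submitter.executeQuery("q1", () -> "plan for q1"));
    } finally {
      pool.shutdown();
    }
  }
}
```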

Execution planning

QueryTrackerImpl

@Override
public void planExecution() throws ExecutionSetupException {
  executionPlanningResources = ExecutionPlanCreator.getParallelizationInfo(context, observer,
    physicalPlan, executorSelectionService, resourceTracker.getResourceSchedulingDecisionInfo());
 
  injector.injectChecked(context.getExecutionControls(),
    INJECTOR_EXECUTION_PLANNING_ERROR, ExecutionSetupException.class);
  injector.injectPause(context.getExecutionControls(),
    INJECTOR_EXECUTION_PLANNING_PAUSE, logger);
 
  executionPlan = ExecutionPlanCreator.getExecutionPlan(context, reader, observer, physicalPlan,
    resourceTracker.getResources(),
    executionPlanningResources.getPlanningSet(), executorSelectionService,
    resourceTracker.getResourceSchedulingDecisionInfo(),
    executionPlanningResources.getGroupResourceInformation());
  observer.planCompleted(executionPlan);
  physicalPlan = null; // no longer needed
}
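planExecution() first computes parallelization information and only then builds the execution plan. Dremio's exact width rules are not shown in this post; as an assumption, the sketch below follows the slice-target approach used by Drill's parallelizer: a fragment's width is its cost divided by a slice target, rounded up, then capped by a per-node limit times the number of executors and by a per-query limit, and never below 1. The method and parameter names are illustrative and are not read from Dremio's options.

```java
// Hypothetical width calculation loosely modeled on Drill's slice-target idea.
final class WidthSketch {
  static int fragmentWidth(double cost, long sliceTarget, int executors,
                           int maxWidthPerNode, int maxWidthPerQuery) {
    if (sliceTarget <= 0 || executors <= 0) {
      throw new IllegalArgumentException("sliceTarget and executors must be positive");
    }
    // One minor fragment per "slice" of estimated work.
    long ideal = (long) Math.ceil(cost / sliceTarget);
    // Cluster-wide cap: per-node limit times executor count, bounded by the per-query limit.
    long cap = Math.min((long) maxWidthPerNode * executors, maxWidthPerQuery);
    // At least one minor fragment, never more than the cap.
    return (int) Math.max(1, Math.min(ideal, cap));
  }

  public static void main(String[] args) {
    // Cost 1,000,000 with a 100,000 slice target wants 10 fragments,
    // but 4 executors x 2 per node caps it at 8.
    System.out.println(fragmentWidth(1_000_000, 100_000, 4, 2, 1000)); // prints 8
  }
}
```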

Fragment execution

QueryTrackerImpl

@Override
  public void startFragments() throws ExecutionSetupException {
    Preconditions.checkNotNull(executionPlan, "execution plan required");
 
    // Populate fragments before sending the query fragments.
    fragmentTracker.populate(executionPlan.getFragments(), resourceTracker.getResourceSchedulingDecisionInfo());
 
    AbstractMaestroObserver fragmentActivateObserver  = new AbstractMaestroObserver() {
      @Override
      public void activateFragmentFailed(Exception ex) {
        fragmentTracker.sendOrActivateFragmentsFailed(ex);
      }
    };
 
    injector.injectPause(context.getExecutionControls(),
      INJECTOR_STARTING_PAUSE, logger);
    try {
      // FragmentStarter calls ExecutorService over RPC here; this includes invoking the FragmentExecutors service
      FragmentStarter starter = new FragmentStarter(executorServiceClientFactory,
        resourceTracker.getResourceSchedulingDecisionInfo(),
        context.getExecutionControls(),context.getOptions());
      starter.start(executionPlan, MaestroObservers.of(observer, fragmentActivateObserver));
      executionPlan = null; // no longer needed
 
      progressTracker = new ProgressTracker(queryId, jobTelemetryClient, observer);
    } catch (Exception ex) {
      fragmentTracker.sendOrActivateFragmentsFailed(ex);
      throw ex;
    }
  }
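startFragments() hands the execution plan to FragmentStarter, which reaches the executors over RPC. The activateFragmentFailed callback in the code above suggests that sending fragments is followed by a separate activation step. A minimal sketch, assuming a two-phase start-then-activate protocol, fragments grouped per executor endpoint, and an in-memory ExecutorClient interface in place of the real RPC client (all names hypothetical). As in the catch block of startFragments(), a failure is reported to a callback and then rethrown.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical two-phase fragment start; not Dremio's FragmentStarter.
final class FragmentStarterSketch {
  static final class Fragment {
    final int majorId;
    final int minorId;
    final String endpoint; // executor this fragment was assigned to during planning

    Fragment(int majorId, int minorId, String endpoint) {
      this.majorId = majorId;
      this.minorId = minorId;
      this.endpoint = endpoint;
    }
  }

  // In-memory stand-in for the RPC client used to reach each executor.
  interface ExecutorClient {
    void startFragments(String endpoint, List<Fragment> fragments) throws Exception;
    void activateFragments(String endpoint) throws Exception;
  }

  // Groups fragments per executor, keeping the first-seen order of endpoints.
  static Map<String, List<Fragment>> groupByEndpoint(List<Fragment> fragments) {
    Map<String, List<Fragment>> byEndpoint = new LinkedHashMap<>();
    for (Fragment f : fragments) {
      byEndpoint.computeIfAbsent(f.endpoint, k -> new ArrayList<>()).add(f);
    }
    return byEndpoint;
  }

  static void start(List<Fragment> fragments, ExecutorClient client,
                    Consumer<Exception> onFailure) throws Exception {
    Map<String, List<Fragment>> byEndpoint = groupByEndpoint(fragments);
    try {
      // Phase 1: send each executor its share of the fragments.
      for (Map.Entry<String, List<Fragment>> entry : byEndpoint.entrySet()) {
        client.startFragments(entry.getKey(), entry.getValue());
      }
      // Phase 2: once every send succeeded, tell each executor to activate them.
      for (String endpoint : byEndpoint.keySet()) {
        client.activateFragments(endpoint);
      }
    } catch (Exception ex) {
      onFailure.accept(ex); // like fragmentTracker.sendOrActivateFragmentsFailed(ex)
      throw ex;
    }
  }
}
```

Under this assumption, no executor is told to activate until every executor has accepted its fragments, so a failed send can be cleaned up before any fragment runs. Whether Dremio's real protocol works exactly this way should be checked against FragmentStarter itself.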

Notes

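The above is only a brief walkthrough of execution: it covers just the query invocation path, not how the data produced by the fragments is aggregated. That part will be covered in detail in a later post.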

References

sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/query/NormalHandler.java
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/commands/HandlerToPreparePlan.java
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/commands/HandlerToPrepareArrowPlan.java
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/commands/HandlerToExec.java
sabot/kernel/src/main/java/com/dremio/exec/work/foreman/AttemptManager.java
sabot/kernel/src/main/java/com/dremio/exec/maestro/MaestroService.java
sabot/kernel/src/main/java/com/dremio/exec/planner/fragment/SimpleParallelizer.java
https://drill.apache.org/docs/drill-query-execution/

From: https://www.cnblogs.com/rongfengliang/p/17008654.html
