
Understanding Dremio's basic query processing through a unit test

Posted: 2022-12-24 13:23:12
Tags: dremio, join, business, query processing, unit test, id, queryContext, new, final

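Dremio is a fairly complex system made up of many modules. Its source tree also contains a useful unit test that shows the basic path a query takes: from the user session to the query context, then SQL parsing, conversion to relational nodes, the logical planner, the physical plan, and finally the execution plan.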
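To make the order of these stages concrete, here is a small, self-contained Java model of the pipeline. The `QueryPipelineSketch` class and its `Stage` enum are hypothetical and exist only for illustration; each constant just records, as a string, the Dremio call that the test below uses for that step. Nothing in the sketch calls Dremio itself.

```java
/**
 * Hypothetical model of the query-processing stages walked through by the test.
 * Stage names are illustrative; each string records which call in the test
 * performs that step. No Dremio classes are used here.
 */
public class QueryPipelineSketch {

    enum Stage {
        SESSION("UserSession.Builder.newBuilder() ... build()"),
        QUERY_CONTEXT("new QueryContext(session, sabotContext, queryId)"),
        SQL_PARSE("SqlConverter.parse(sql) -> SqlNode"),
        VALIDATE_AND_CONVERT("PrelTransformer.validateAndConvert(config, node) -> RelNode"),
        LOGICAL_PLAN("PrelTransformer.convertToDrel(config, relNode, rowType) -> Rel"),
        PHYSICAL_PREL("PrelTransformer.convertToPrel(config, drel) -> Prel"),
        PHYSICAL_OPERATORS("PrelTransformer.convertToPop(config, prel) -> PhysicalOperator"),
        PHYSICAL_PLAN("PrelTransformer.convertToPlan(config, pop) -> PhysicalPlan"),
        EXECUTION_PLAN("ExecutionPlanCreator.getExecutionPlan(...) -> fragments");

        final String callInTest;

        Stage(String callInTest) {
            this.callInTest = callInTest;
        }
    }

    public static void main(String[] args) {
        // Enum constants are declared in pipeline order, so values() yields the stages in sequence.
        for (Stage stage : Stage.values()) {
            System.out.printf("%d. %-20s %s%n", stage.ordinal() + 1, stage, stage.callInTest);
        }
    }
}
```

Each stage consumes the output of the previous one, which is why the test can assert on the intermediate text plans before moving on to the next step.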

Reference code

 
public class Limit0LogicalToPhysicalTest extends BaseTestQuery {
 
  private static String TEST_PATH = TestTools.getWorkingPath() + "/src/test/resources";
  private static File tblPath = null;
 
  @BeforeClass
  public static void createTable() throws Exception {
    tblPath = new File(getDfsTestTmpSchemaLocation(), "yelp");
    FileUtils.deleteQuietly(tblPath);
    FileUtils.copyFileToDirectory(new File(TEST_PATH + "/yelp_business.json"), tblPath);
    FileUtils.moveFile(new File(tblPath + "/yelp_business.json"), new File(tblPath + "/1.json"));
    FileUtils.copyFile(new File(tblPath + "/1.json"), new File(tblPath + "/2.json"));
  }
 
  @AfterClass
  public static void cleanUpTable() throws Exception {
    FileUtils.deleteQuietly(tblPath);
  }
 
  @Test
  @Ignore // remove @Ignore to actually run this test
  public void ExchangesKeepTest() throws Exception {
 
    final String yelpTable = TEMP_SCHEMA + ".\"yelp\"";
    final String sql = "SELECT nested_0.review_id AS review_id, nested_0.user_id AS user_id, nested_0.votes AS votes," +
      " nested_0.stars AS stars, join_business.business_id AS business_id0, join_business.neighborhoods AS neighborhoods, join_business.city AS city, join_business.latitude AS latitude, join_business.review_count AS review_count, join_business.full_address AS full_address, join_business.stars AS stars0, join_business.categories AS categories, join_business.state AS state, join_business.longitude AS longitude\n" +
      "FROM (\n" +
      "  SELECT review_id, user_id, votes, stars, business_id\n" +
      "  FROM cp.\"yelp_review.json\" where 1 = 0\n" +
      ") nested_0\n" +
      " FULL JOIN " + yelpTable + " AS join_business ON nested_0.business_id = join_business.business_id";
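    // Context configuration. These options push the planner toward a parallel plan:
    // slice_target=1 makes even tiny inputs eligible for parallelization, max_per_node
    // caps the width per node, and enable_mux_exchange allows the UnorderedMuxExchange
    // that the assertions below look for.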
    final SabotContext context = getSabotContext();
    context.getOptionManager().setOption(
      OptionValue.createLong(OptionValue.OptionType.SYSTEM, "planner.slice_target", 1)
    );
    context.getOptionManager().setOption(
      OptionValue.createLong(OptionValue.OptionType.SYSTEM, "planner.width.max_per_node", 10)
    );
    context.getOptionManager().setOption(
      OptionValue.createBoolean(OptionValue.OptionType.SYSTEM, "planner.enable_mux_exchange", true)
    );
    // Query context built from the session, the SabotContext and a default query ID
    final QueryContext queryContext = new QueryContext(session(), context, UserBitShared.QueryId.getDefaultInstance());
    // AttemptObserver that passes planning events through to a mock client connection
    final AttemptObserver observer = new PassthroughQueryObserver(ExecTest.mockUserClientConnection(null));
    // SqlConverter handles SQL parsing and conversion
    final SqlConverter converter = new SqlConverter(
        queryContext.getPlannerSettings(),
        queryContext.getOperatorTable(),
        queryContext,
        queryContext.getMaterializationProvider(),
        queryContext.getFunctionRegistry(),
        queryContext.getSession(),
        observer,
        queryContext.getCatalog(),
        queryContext.getSubstitutionProviderFactory(),
        queryContext.getConfig(),
        queryContext.getScanResult(),
        queryContext.getRelMetadataQuerySupplier());
    // Parse the SQL text into a SqlNode
    final SqlNode node = converter.parse(sql);
    // SQL handler configuration
    final SqlHandlerConfig config = new SqlHandlerConfig(queryContext, converter, observer, null);
    // Validate the SQL and convert it to a RelNode; this depends on the catalog and is built on Calcite
    final ConvertedRelNode convertedRelNode = PrelTransformer.validateAndConvert(config, node);
    final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
    final RelNode queryRelNode = convertedRelNode.getConvertedNode();
 
    // Convert the RelNode into Dremio's logical (Drel) plan
    final Rel drel = PrelTransformer.convertToDrel(config, queryRelNode, validatedRowType);
    // Get the physical (Prel) plan together with its text form
    final Pair<Prel, String> convertToPrel = PrelTransformer.convertToPrel(config, drel);
    final Prel prel = convertToPrel.getKey();
    final String prePhysicaltextPlan = convertToPrel.getValue();
 
    assertThat(prePhysicaltextPlan).contains("HashToRandomExchange");
    assertThat(prePhysicaltextPlan).contains("UnorderedMuxExchange");
    assertThat(prePhysicaltextPlan).contains("Empty");
    assertThat(prePhysicaltextPlan).contains("EasyScan");
    // Physical operator tree
    final PhysicalOperator pop = PrelTransformer.convertToPop(config, prel);
    // Physical plan
    final PhysicalPlan plan = PrelTransformer.convertToPlan(config, pop);
    final String postPhysicaltextPlan = plan.unparse(config.getContext().getLpPersistence().getMapper().writer());
 
    assertThat(postPhysicaltextPlan).contains("EmptyValues");
    assertThat(postPhysicaltextPlan).contains("EasyGroupScan");
    assertThat(postPhysicaltextPlan).contains("unordered-mux-exchange");
    assertThat(postPhysicaltextPlan).contains("hash-to-random-exchange");
   
    PhysicalPlanReader pPlanReader = new PhysicalPlanReader(
      DEFAULT_SABOT_CONFIG, CLASSPATH_SCAN_RESULT, new LogicalPlanPersistence(DEFAULT_SABOT_CONFIG, CLASSPATH_SCAN_RESULT),
      CoordinationProtos.NodeEndpoint.getDefaultInstance(),
      DirectProvider.wrap(Mockito.mock(CatalogService.class)), context);
    // Execution plan: the physical plan split into fragments
    ExecutionPlan exec = ExecutionPlanCreator
      .getExecutionPlan(queryContext, pPlanReader, AbstractMaestroObserver.NOOP, plan,
      QueueType.SMALL);
    List<PlanFragmentFull> fragments  = exec.getFragments();
 
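    // Count the fragments whose major-fragment JSON contains a scan operator ("easy-sub-scan")
    int scanFrags = 0;
    for (PlanFragmentFull fragment : fragments) {
      // Decode explicitly as UTF-8 instead of relying on the platform default charset
      String fragmentJson = new String(fragment.getMajor().getFragmentJson().toByteArray(),
          java.nio.charset.StandardCharsets.UTF_8);
      if (fragmentJson.contains("easy-sub-scan")) {
        scanFrags++;
      }
    }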
    assertEquals(2, scanFrags);
 
  }
  // Builds the mock user session used by the test (user "foo", complex types enabled)
  private static UserSession session() {
    return UserSession.Builder.newBuilder()
      .withSessionOptionManager(
        new SessionOptionManagerImpl(getSabotContext().getOptionValidatorListing()),
        getSabotContext().getOptionManager())
      .withUserProperties(UserProtos.UserProperties.getDefaultInstance())
      .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
      .setSupportComplexTypes(true)
      .build();
  }
 
}
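The last part of the test counts the execution-plan fragments whose serialized JSON mentions `easy-sub-scan`. The Dremio-free sketch below isolates that check. The class name `ScanFragmentCountSketch` and the JSON snippets are made up for illustration. The expected count of 2 mirrors the test's `assertEquals(2, scanFrags)`, which presumably corresponds to the two copies of the Yelp business file (`1.json` and `2.json`) created in `createTable()`, while the `where 1 = 0` side of the join is reduced to an empty input and needs no scan.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Dremio-free sketch of the check at the end of ExchangesKeepTest: count the
 * fragments whose serialized JSON mentions the "easy-sub-scan" operator.
 * The JSON snippets in main() are hypothetical.
 */
public class ScanFragmentCountSketch {

    static long countScanFragments(List<byte[]> fragmentJson) {
        return fragmentJson.stream()
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            .map(bytes -> new String(bytes, StandardCharsets.UTF_8))
            .filter(json -> json.contains("easy-sub-scan"))
            .count();
    }

    public static void main(String[] args) {
        // Hypothetical fragments: two scan fragments (one per JSON file) and one exchange-only fragment.
        List<byte[]> fragments = List.of(
            "{\"pop\":\"easy-sub-scan\",\"files\":[\"1.json\"]}".getBytes(StandardCharsets.UTF_8),
            "{\"pop\":\"easy-sub-scan\",\"files\":[\"2.json\"]}".getBytes(StandardCharsets.UTF_8),
            "{\"pop\":\"hash-to-random-exchange\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(countScanFragments(fragments)); // prints 2
    }
}
```

Substring matching on the serialized plan is a coarse check, but it keeps the test independent of the exact JSON layout of each fragment.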
Notes

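The above is a simplified walk-through. Real query execution in Dremio is considerably more complex: it uses several different optimizers and also involves materialization handling, query rewrite rules, reading from the underlying data storage, and metadata handling.
Because this is only a unit test, it also leaves out what actual execution adds on top, such as handling of the specific command and thread scheduling. Even so, the test gives a rough picture of how a query is processed and is a worthwhile reference.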

References

sabot/kernel/src/test/java/com/dremio/exec/planner/sql/handlers/commands/Limit0LogicalToPhysicalTest.java

From: https://www.cnblogs.com/rongfengliang/p/17002782.html