
A brief explanation of how Dremio CTAS works internally


From the user's point of view, CTAS is just a SQL statement. Dremio generates a logical plan from the SQL, then a physical plan, then an execution plan, which finally turns into write operations against a data source that supports writes (currently the Parquet and Iceberg formats). At the moment, CTAS is mainly supported on distributed storage (NAS, object storage, etc.). As I briefly explained before, a data source that supports CTAS must implement the MutablePlugin interface, and CTAS can only go through a format plugin; it cannot call the file-system plugin's getWriter method directly.
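
To make the entry point concrete, below is a minimal sketch of submitting a CTAS over JDBC. The coordinator address, credentials, and the s3.demo / s3.raw paths are assumptions for illustration; the target source must be backed by a plugin that implements MutablePlugin:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Requires the Dremio JDBC driver on the classpath.
public class CtasExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical coordinator address and credentials.
    String url = "jdbc:dremio:direct=localhost:31010";
    try (Connection conn = DriverManager.getConnection(url, "dremio", "dremio123");
         Statement stmt = conn.createStatement()) {
      // CTAS is plain SQL; "s3" stands for a writable (MutablePlugin-backed) source.
      stmt.execute(
          "CREATE TABLE s3.demo.orders_copy "
          + "PARTITION BY (order_date) "
          + "AS SELECT * FROM s3.raw.orders");
    }
  }
}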

Physical plan creation

DataAdditionCmdHandler (getPlan)

 
public PhysicalPlan getPlan(DatasetCatalog datasetCatalog,
                              NamespaceKey path,
                              SqlHandlerConfig config,
                              String sql,
                              DataAdditionCmdCall sqlCmd,
                              ResolvedVersionContext version
  ) throws Exception {
    try {
      final ConvertedRelNode convertedRelNode = PrelTransformer.validateAndConvert(config, sqlCmd.getQuery());
      final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
 
      long maxColumnCount = config.getContext().getOptions().getOption(CatalogOptions.METADATA_LEAF_COLUMN_MAX);
      if (validatedRowType.getFieldCount() > maxColumnCount) {
        throw new ColumnCountTooLargeException((int) maxColumnCount);
      }
 
      // Get and cache table info (if not already retrieved) to avoid multiple retrievals
      getDremioTable(datasetCatalog, path);
      final RelNode queryRelNode = convertedRelNode.getConvertedNode();
      ViewAccessEvaluator viewAccessEvaluator = null;
      if (config.getConverter().getSubstitutionProvider().isDefaultRawReflectionEnabled()) {
        final RelNode convertedRelWithExpansionNodes = ((DremioVolcanoPlanner) queryRelNode.getCluster().getPlanner()).getOriginalRoot();
        viewAccessEvaluator = new ViewAccessEvaluator(convertedRelWithExpansionNodes, config);
        config.getContext().getExecutorService().submit(viewAccessEvaluator);
      }
      final RelNode newTblRelNode = SqlHandlerUtil.resolveNewTableRel(false, sqlCmd.getFieldNames(),
          validatedRowType, queryRelNode, !isCreate());
 
      final long ringCount = config.getContext().getOptions().getOption(PlannerSettings.RING_COUNT);
 
      ByteString extendedByteString = null;
      if (!isCreate()) {
        extendedByteString = cachedDremioTable.getDatasetConfig().getReadDefinition().getExtendedProperty();
      }
 
      // For an INSERT, the query schema must match the table schema exactly
      // (including partition columns), so no separate check is needed here.
      final RelNode newTblRelNodeWithPCol = SqlHandlerUtil.qualifyPartitionCol(newTblRelNode,
        isCreate() ?
          sqlCmd.getPartitionColumns(null /*param is unused in this interface for create */) :
          Lists.newArrayList());
 
      PrelTransformer.log("Calcite", newTblRelNodeWithPCol, logger, null);
 
      final List<String> partitionFieldNames = sqlCmd.getPartitionColumns(cachedDremioTable);
      final Set<String> fieldNames = validatedRowType.getFieldNames().stream().collect(Collectors.toSet());
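      // Assemble the writer options: parallelism (ring count), partition/sort/
      // distribution columns, target location, and the Iceberg writer operation.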
      final WriterOptions options = new WriterOptions(
        (int) ringCount,
        partitionFieldNames,
        sqlCmd.getSortColumns(),
        sqlCmd.getDistributionColumns(),
        sqlCmd.getPartitionDistributionStrategy(config, partitionFieldNames, fieldNames),
        sqlCmd.getLocation(),
        sqlCmd.isSingleWriter(),
        Long.MAX_VALUE,
        getIcebergWriterOperation(),
        extendedByteString,
        version
      );
 
      // Convert the query to Dremio Logical plan and insert a writer operator on top.
      Rel drel = this.convertToDrel(
        config,
        newTblRelNodeWithPCol,
        datasetCatalog,
        path,
        options,
        newTblRelNode.getRowType(),
        storageOptionsMap, sqlCmd.getFieldNames(), sqlCmd);
 
      final Pair<Prel, String> convertToPrel = PrelTransformer.convertToPrel(config, drel);
      final Prel prel = convertToPrel.getKey();
      textPlan = convertToPrel.getValue();
      PhysicalOperator pop = PrelTransformer.convertToPop(config, prel);
 
      PhysicalPlan plan = PrelTransformer.convertToPlan(config, pop,
        isIcebergTable() && !isVersionedTable() ?
          () -> refreshDataset(datasetCatalog, path, isCreate())
          : null,
        () -> cleanUp(datasetCatalog, path));
 
      PrelTransformer.log(config, "Dremio Plan", plan, logger);
 
      if (viewAccessEvaluator != null) {
        viewAccessEvaluator.getLatch().await(config.getContext().getPlannerSettings().getMaxPlanningPerPhaseMS(), TimeUnit.MILLISECONDS);
        if (viewAccessEvaluator.getException() != null) {
          throw viewAccessEvaluator.getException();
        }
      }
 
      return plan;
 
    } catch(Exception ex){
      throw SqlExceptionHelper.coerceException(logger, sql, ex, true);
    }
  }
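
Note the two Runnables handed to PrelTransformer.convertToPlan above: for a non-versioned Iceberg table, the first refreshes the dataset's metadata once the job succeeds, while the second cleans up the target table on failure. The following is a rough, non-Dremio illustration of that commit/cleanup hook pattern; every name in it is invented:

import java.util.concurrent.Callable;

// A stand-in for a plan that carries success/failure hooks, loosely mirroring
// how getPlan wires refreshDataset (commit) and cleanUp (abort).
final class PlanWithHooks<T> {
  private final Callable<T> body;   // the actual query execution
  private final Runnable onSuccess; // e.g. refresh the dataset's metadata
  private final Runnable onFailure; // e.g. drop the half-written table

  PlanWithHooks(Callable<T> body, Runnable onSuccess, Runnable onFailure) {
    this.body = body;
    this.onSuccess = onSuccess;
    this.onFailure = onFailure;
  }

  T run() throws Exception {
    try {
      T result = body.call();
      if (onSuccess != null) {
        onSuccess.run();
      }
      return result;
    } catch (Exception e) {
      if (onFailure != null) {
        onFailure.run();
      }
      throw e;
    }
  }
}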

DataAdditionCmdHandler (convertToDrel)

private Rel convertToDrel(
    SqlHandlerConfig config,
    RelNode relNode,
    DatasetCatalog datasetCatalog,
    NamespaceKey key,
    WriterOptions options,
    RelDataType queryRowType,
    final Map<String, Object> storageOptions,
    final List<String> fieldNames,
    DataAdditionCmdCall sqlCmd)
      throws SqlUnsupportedException {
    Rel convertedRelNode = PrelTransformer.convertToDrel(config, relNode);
 
    // Insert a non-trivial top project when necessary so the final output field
    // names are preserved; it is only added when the child's field count matches
    // that of queryRowType.
 
    String queryId = "";
    IcebergTableProps icebergTableProps = null;
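    // For an Iceberg target, capture the schema and partition spec that the
    // writer and metadata-commit operators will need downstream.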
    if (isIcebergTable()) {
      queryId = QueryIdHelper.getQueryId(config.getContext().getQueryId());
      ByteString partitionSpec = null;
      BatchSchema tableSchema = null;
      String icebergSchema = null;
      if (!isCreate()) {
        tableSchemaFromKVStore = cachedDremioTable.getSchema();
        partitionColumns = cachedDremioTable.getDatasetConfig().getReadDefinition().getPartitionColumnsList();
        partitionSpec = IcebergUtils.getCurrentPartitionSpec(cachedDremioTable.getDatasetConfig().getPhysicalDataset(), cachedDremioTable.getSchema(), options.getPartitionColumns());
        tableSchema = cachedDremioTable.getSchema();
        icebergSchema = IcebergUtils.getCurrentIcebergSchema(cachedDremioTable.getDatasetConfig().getPhysicalDataset(), cachedDremioTable.getSchema());
        // This is an INSERT statement: update the key to use the existing table from the catalog
        DremioTable table = datasetCatalog.getTable(key);
        if(table != null) {
          key = table.getPath();
        }
      } else {
        tableSchema = CalciteArrowHelper.fromCalciteRowType(queryRowType);
        PartitionSpec partitionSpecBytes = IcebergUtils.getIcebergPartitionSpecFromTransforms(tableSchema, sqlCmd.getPartitionTransforms(null), null);
        partitionSpec = ByteString.copyFrom(IcebergSerDe.serializePartitionSpec(partitionSpecBytes));
        icebergSchema = IcebergSerDe.serializedSchemaAsJson(partitionSpecBytes.schema());
      }
      icebergTableProps = new IcebergTableProps(null, queryId,
        null,
        options.getPartitionColumns() ,
        isCreate() ? IcebergCommandType.CREATE : IcebergCommandType.INSERT,
        null, key.getName(), null, options.getVersion(), partitionSpec, icebergSchema); // TODO: DX-43311 Should we allow null version?
      icebergTableProps.setPersistedFullSchema(tableSchema);
      options.setIcebergTableProps(icebergTableProps);
    }
 
    logger.debug("Creating new table with WriterOptions : '{}' icebergTableProps : '{}' ",
      options,
      icebergTableProps);
 
    tableEntry = datasetCatalog.createNewTable(
      key,
      icebergTableProps,
      options,
      storageOptions);
    if (isIcebergTable()) {
      Preconditions.checkState(tableEntry.getIcebergTableProps().getTableLocation() != null &&
            !tableEntry.getIcebergTableProps().getTableLocation().isEmpty(),
        "Table folder location must not be empty");
    }
 
    if (!isCreate()) {
      BatchSchema partSchemaWithSelectedFields = tableSchemaFromKVStore.subset(fieldNames).orElse(tableSchemaFromKVStore);
      queryRowType = CalciteArrowHelper.wrap(partSchemaWithSelectedFields)
          .toCalciteRecordType(convertedRelNode.getCluster().getTypeFactory(), PrelUtil.getPlannerSettings(convertedRelNode.getCluster()).isFullNestedSchemaSupport());
      logger.debug("Inserting into table with schema : '{}' ", tableSchemaFromKVStore.toString());
    }
 
    // DX-54255: Don't add cast projection, if inserting values from another table
    if (RelOptUtil.findTables(convertedRelNode).isEmpty()) {
      convertedRelNode = addCastProject(convertedRelNode, queryRowType);
    }
    convertedRelNode = new WriterRel(convertedRelNode.getCluster(),
      convertedRelNode.getCluster().traitSet().plus(Rel.LOGICAL),
      convertedRelNode, tableEntry, queryRowType);
 
    convertedRelNode = SqlHandlerUtil.storeQueryResultsIfNeeded(config.getConverter().getParserConfig(),
      config.getContext(), convertedRelNode);
 
    return new ScreenRel(convertedRelNode.getCluster(), convertedRelNode.getTraitSet(), convertedRelNode);
  }
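
Putting it together, the Rel tree convertToDrel returns for a CTAS roughly has the following shape (storeQueryResultsIfNeeded may wrap one more level in between):

ScreenRel
 └─ WriterRel          (carries tableEntry and the resolved queryRowType)
     └─ cast Project   (optional; skipped when inserting from another table)
         └─ converted query plan (Rel.LOGICAL)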

Execution pipeline handling

This part of the execution plan includes the Iceberg writer as well as the Nessie-based metadata handling. The reference diagrams from the original post are not reproduced here.
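
This is not Dremio's internal code path, but a plain Iceberg-API sketch of what a Nessie-backed table creation and data commit look like; the endpoint, branch, warehouse path, and table name are all assumptions:

import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.nessie.NessieCatalog;
import org.apache.iceberg.types.Types;

public class NessieCtasSketch {
  public static void main(String[] args) {
    // Hypothetical Nessie endpoint, branch, and warehouse location.
    Map<String, String> props = new HashMap<>();
    props.put("uri", "http://localhost:19120/api/v1");
    props.put("ref", "main");
    props.put("warehouse", "/tmp/warehouse");

    NessieCatalog catalog = new NessieCatalog();
    catalog.initialize("nessie", props);

    // Schema and partition spec play the role of IcebergTableProps above.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("id").build();

    // CREATE: the new table metadata is committed through Nessie.
    Table table = catalog.createTable(TableIdentifier.parse("demo.t1"), schema, spec);

    // The writer operators then add data files and commit, conceptually:
    // table.newAppend().appendFile(dataFile).commit();
  }
}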
Notes

Compared with vanilla Apache Drill, Dremio's CTAS provides quite a few additional capabilities, such as support for Iceberg's powerful feature set.

References

sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/query/CreateTableHandler.java
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/parser/DataAdditionCmdCall.java
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/query/DataAdditionCmdHandler.java
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/parser/SqlCreateTable.java
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/ViewAccessEvaluator.java
https://www.cnblogs.com/rongfengliang/p/15954623.html
sabot/kernel/src/main/java/com/dremio/exec/store/iceberg/model/IcebergCommand.java

