首页 > 其他分享 >dremio CTAS STORE AS && WITH SINGLE WRITER 简单说明

dremio CTAS STORE AS && WITH SINGLE WRITER 简单说明

时间:2024-02-09 09:02:21浏览次数:43  
标签:dremio java convertedRelNode addOrGet WRITER RecordWriter SINGLE output

dremio CTAS 支持存储格式以及写入的文件数量(相对分区还说)

参考CTAS格式

CREATE TABLE "s3"."91733d30-d1d2-46bf-8f2b-3c34d587a96c" STORE AS (type => 'text', fieldDelimiter => ',', lineDelimiter => '
') WITH SINGLE WRITER AS SELECT * FROM (
SELECT * FROM "ops-action"
) LIMIT 1000000

store as 配置参考处理

  • CreateTableHandler.java
createStorageOptionsMap(sqlCreateTable.getFormatOptions());
if (CatalogUtil.requestedPluginSupportsVersionedTables(path, catalog)) {
  return doVersionedCtas(config, path, catalog, sql, sqlCreateTable);
}
 
public void createStorageOptionsMap(final SqlNodeList args) {
    if (args == null || args.size() == 0) {
      return;
    }
 
    final ImmutableMap.Builder<String, Object> storageOptions = ImmutableMap.builder();
    for (SqlNode operand : args) {
      if (operand.getKind() != SqlKind.ARGUMENT_ASSIGNMENT) {
        throw UserException.unsupportedError()
          .message("Unsupported argument type. Only assignment arguments (param => value) are supported.")
          .build(logger);
      }
      final List<SqlNode> operandList = ((SqlCall) operand).getOperandList();
 
      final String name = ((SqlIdentifier) operandList.get(1)).getSimple();
      SqlNode literal = operandList.get(0);
      if (!(literal instanceof SqlLiteral)) {
        throw UserException.unsupportedError()
          .message("Only literals are accepted for storage option values")
          .build(logger);
      }
 
      Object value = ((SqlLiteral)literal).getValue();
      if (value instanceof NlsString) {
        value = ((NlsString)value).getValue();
      }
      storageOptions.put(name, value);
    }
   // 存储为一个map
    this.storageOptionsMap = storageOptions.build();
  }

参数使用
DataAdditionCmdHandler.java

 
 内部实际会到对应的存储扩展
tableEntry = datasetCatalog.createNewTable(
  key,
  icebergTableProps,
  options,
  storageOptions);

s3 存储扩展的处理

  public CreateTableEntry createNewTable(
    NamespaceKey tableSchemaPath, SchemaConfig config,
    IcebergTableProps icebergTableProps,
    WriterOptions writerOptions,
    Map<String, Object> storageOptions,
    boolean isResultsTable
  ) {
    Preconditions.checkArgument(tableSchemaPath.size() >= 2, "key must be at least two parts");
    final List<String> resolvedPath = resolveTableNameToValidPath(tableSchemaPath.getPathComponents()); // strips source name
    final String containerName = resolvedPath.get(0);
    if (resolvedPath.size() == 1) {
      throw UserException.validationError()
        .message("Creating buckets is not supported (name: %s)", containerName)
        .build(logger);
    }
   // 调用父类文件系统插件的createNewTable
    final CreateTableEntry entry = super.createNewTable(tableSchemaPath, config,
      icebergTableProps, writerOptions, storageOptions, isResultsTable);
 
    final S3FileSystem fs = getSystemUserFS().unwrap(S3FileSystem.class);
 
    if (!fs.containerExists(containerName)) {
      throw UserException.validationError()
          .message("Cannot create the table because '%s' bucket does not exist", containerName)
          .build(logger);
    }
 
    return entry;
  }

文件系统插件的createNewTable的处理(此处是使用到定义的选项)

  public CreateTableEntry createNewTable(NamespaceKey tableSchemaPath, SchemaConfig config, IcebergTableProps icebergTableProps,
                                         WriterOptions writerOptions, Map<String, Object> storageOptions,
                                         boolean isResultsTable) {
    if(!getMutability().hasMutationCapability(MutationType.TABLE, config.isSystemUser())) {
      throw UserException.parseError()
        .message("Unable to create table. Schema [%s] is immutable for this user.", tableSchemaPath.getParent())
        .build(logger);
    }
 
    final String tableName = getTableName(tableSchemaPath);
 
    final FormatPlugin formatPlugin;
    // 默认的处理,配置为parquet 格式的
    if (storageOptions == null || storageOptions.isEmpty() || !storageOptions.containsKey("type")) {
      final String storage = config.getOptions().getOption(ExecConstants.OUTPUT_FORMAT_VALIDATOR);
      formatPlugin = getFormatPlugin(storage);
      if (formatPlugin == null) {
        throw new UnsupportedOperationException(String.format("Unsupported format '%s' in '%s'", storage, tableSchemaPath));
      }
    } else {
     // 通过配置查找到的
      final FormatPluginConfig formatConfig = createConfigForTable(tableName, storageOptions);
      formatPlugin = getFormatPlugin(formatConfig);
    }

后续执行计划处理(逻辑计划)

  if (!isCreate()) {
      BatchSchema partSchemaWithSelectedFields = tableSchemaFromKVStore.subset(fieldNames).orElse(tableSchemaFromKVStore);
      queryRowType = CalciteArrowHelper.wrap(partSchemaWithSelectedFields)
          .toCalciteRecordType(convertedRelNode.getCluster().getTypeFactory(), PrelUtil.getPlannerSettings(convertedRelNode.getCluster()).isFullNestedSchemaSupport());
      logger.debug("Inserting into table with schema : '{}' ", tableSchemaFromKVStore.toString());
    }
 
    // DX-54255: Don't add cast projection, if inserting values from another table
    if (RelOptUtil.findTables(convertedRelNode).isEmpty() && !(sqlCmd instanceof SqlCopyIntoTable)) {
      convertedRelNode = addCastProject(convertedRelNode, queryRowType);
    }
 
    // skip writer and display DML results on UI only
    if (!config.getContext().getOptions().getOption(ExecConstants.ENABLE_DML_DISPLAY_RESULT_ONLY) || !(sqlCmd instanceof SqlCopyIntoTable)) {
      convertedRelNode = new WriterRel(convertedRelNode.getCluster(),
        convertedRelNode.getCluster().traitSet().plus(Rel.LOGICAL),
        convertedRelNode, tableEntry, queryRowType);
    }
 
    convertedRelNode = SqlHandlerUtil.storeQueryResultsIfNeeded(config.getConverter().getParserConfig(),
      config.getContext(), convertedRelNode);
 
    return new ScreenRel(convertedRelNode.getCluster(), convertedRelNode.getTraitSet(), convertedRelNode);

后变就是物理计划的处理了

WITH SINGLE WRITER

这个实际上就是一个sql 字面量,转化为一个true 或者false 的标记,之后对于不同的writer 作为参数传递,对于实际执行会使用
dremio 的不同Writer 实现(比如parquet的)

  • parquet writer 的operator 处理
 
public class ParquetWriterBatchCreator implements SingleInputOperator.Creator<ParquetWriter>{
  @Override
  public SingleInputOperator create(OperatorContext context, ParquetWriter config) throws ExecutionSetupException {
    ParquetRecordWriter writer = new ParquetRecordWriter(context, config, new ParquetFormatConfig());
    return new WriterOperator(context, config.getOptions(), writer);
  }
}

WriterOperator 处理

public VectorAccessible setup(VectorAccessible incoming) throws Exception {
    state.is(State.NEEDS_SETUP);
     // 此处会结合实际的配置进行处理是否包含了分片
    if(options.hasPartitions() || options.hasDistributions()){
      partitionManager = new PartitionWriteManager(options, incoming, options.getTableFormatOptions().isTableFormatWriter());
      this.maskedContainer = partitionManager.getMaskedContainer();
      recordWriter.setup(maskedContainer, listener, statsListener);
    } else {
      // 单文件的处理
      recordWriter.setup(incoming, listener, statsListener);
    }
    // Create the RecordWriter.SCHEMA vectors.
    fragmentIdVector = output.addOrGet(RecordWriter.FRAGMENT);
    pathVector = output.addOrGet(RecordWriter.PATH);
    summaryVector = output.addOrGet(RecordWriter.RECORDS);
    fileSizeVector = output.addOrGet(RecordWriter.FILESIZE);
    metadataVector = output.addOrGet(RecordWriter.METADATA);
    partitionNumberVector = output.addOrGet(RecordWriter.PARTITION);
    icebergMetadataVector = output.addOrGet(RecordWriter.ICEBERG_METADATA);
    schemaVector = output.addOrGet(RecordWriter.FILE_SCHEMA);
    partitionDataVector  = output.addOrGet(RecordWriter.PARTITION_DATA);
    operationTypeVector = output.addOrGet(RecordWriter.OPERATION_TYPE);
    partitionValueVector = output.addOrGet(RecordWriter.PARTITION_VALUE);
    rejectedRecordVector = output.addOrGet(RecordWriter.REJECTED_RECORDS);
    output.buildSchema();
    output.setInitialCapacity(context.getTargetBatchSize());
    state = State.CAN_CONSUME;
    return output;
  }

说明

以上是对于dremio CTAS 几个配置参数的一个简单说明,实际上用好这些配置配置可以简化一些处理,比如dremio 的下载功能,就是结合了上边说的东西

参考资料

sabot/grammar/src/main/codegen/includes/parserImpls.ftl
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/query/CreateTableHandler.java
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/handlers/query/DataAdditionCmdHandler.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FileSystemPlugin.java
plugins/s3/src/main/java/com/dremio/plugins/s3/store/S3StoragePlugin.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FormatPluginOptionExtractor.java
common/legacy/src/main/java/com/dremio/common/logical/FormatPluginConfig.java
sabot/kernel/src/main/java/com/dremio/sabot/op/writer/WriterOperator.java
sabot/kernel/src/main/java/com/dremio/exec/store/parquet/ParquetWriterBatchCreator.java
https://docs.dremio.com/current/reference/sql/commands/tables#create-table-as
https://www.cnblogs.com/rongfengliang/p/15954623.html
https://www.cnblogs.com/rongfengliang/p/17039839.html
https://www.cnblogs.com/rongfengliang/p/18008209

标签:dremio,java,convertedRelNode,addOrGet,WRITER,RecordWriter,SINGLE,output
From: https://www.cnblogs.com/rongfengliang/p/18009827

相关文章

  • dremio SchedulerService 服务简单说明
    SchedulerService内部调度服务算是一个比较重要的模块,比如dremio的功能都依赖此模块(元数据获取,一些数据清理任务,反射加速)参考实现子类SchedulerService实现也比较多,因为dremio集群中的节点有多种角色,为了保证数据的一致性会对于不同集群角色的节点进行不同的处理如下图......
  • dremio FileSystem 简单说明
    dremio尽管对于文件系统的使用很多底层都是hdfs的(s3,发射加速),dremio为了减少直接依赖hdfs,自己抽象了一个FileSystem接口对于不同的实现可以方便进行扩展,当然和刚才说的一样,不少底层依赖的是hdfs的FileSystem参考子类如下图简单说明:FilterFileSystem实现了FileSy......
  • Unity Mono单例(MonoSingleton)C#脚本
    什么是单例模式?单例模式是设计模式的一种,一般来说,使用单例模式的类,在程序中全局只会存在一个实例,并且一般来讲其是全局可被访问的。在unity游戏开发中,单例模式广泛应用于GameManager和各种Controller这种只需要一个实例的脚本。以下就是UnityMono单例的泛用脚本usingSystem.......
  • dremio cloud cache 简单说明
    dremiocloudcache实际上就是对于云文件系统的cache加速(比如hdfs,s3。。。),在处理的时候使用了ce包装的包,详细源码并没有开源我们可以通过一些代码整体看下实现参考处理dremio-ce-services-cachemanager中的处理cecaache管理配置dremio:{classpath.scan......
  • PHP导出Excel,从xlswriter到golang的进化是2分缩减到5秒
    先看图 一、介绍xlswriter是一个高效处理excel文件的PHP扩展,底层以C语言实现;处理速度是PHPExcel几十倍甚至几百倍的效率。官方链接:https://gitee.com/viest/php-ext-xlswriter缺点:更深入的功能(例如读取excel图片)健全;导出excel样式不够丰富导出excel文件,xlswriter绝对是效......
  • WIP: SLM-DB:Single-Level Key-Value Store with Persistent Memory
    论文原文:https://www.usenix.org/system/files/fast19-kaiyrakhmet.pdf摘要:本文调查了如何利用新出现的可按照字节寻址的持久化内存(PersistentMemory)来增强KV存储的性能。我们充分利用PM,提出了一种新型的KV存储,SLM-DB,这种存储同时利用到了B+树索引和LSM-tree的优点。我们提出......
  • dremio 官方一篇关于使用了到的技术进行对象存储查询加速的博客
    dremio官方发布了一篇新博客是关于如何实现对象存储的快速查询的使用的技术主要是apchearrow,reflections,columnarcloudcache(c3)说明博客内容很简单,主要是进行了一个介绍,详细的可以看看里边的内容,同时里边包含了不少链接值得学习下参考资料https://docs.dremio.com/......
  • dremio 服务暴露的一些端口
    对于运行态的dremio我们可以看到服务开启的监听,同时也可以通过官方提供的配置文件看到dremio协调节点如果协调节点同时是提供执行,会暴露以下端口zk(可能) 如果使用了内嵌zk的2181client-endpoint 31010,主要是老遗留模式的jdbc端口flightserver 目前推荐的jdbc访问协......
  • dremio LivenessService 服务简单说明
    LivenessService是dremiobackend提供的一个http服务,提供了live(存活)以及metrics服务此服务在dremio集群中的每个节点上都会运行,以下是一些说明一些特点服务使用了jetty与官方dac的backend是不太一样,默认使用了jersey框架默认基于本地回环地址bind同时bind的端口是......
  • dremio cluster docker-compose 运行
    dremio社区版,集群安装比较简单,核心就是一个配置(zk,分布式存储),为了方便本地环境的测试我基于docker-compose提供了一个方便部署的环境,可以使用环境配置docker-compose version:"3"services:zk:image:zookeeperports:-2181:21......