I wrote a brief note about S3StoragePlugin before; the following uses S3StoragePlugin to walk through how to develop a storage extension.
Some standard patterns
- Choose which storage plugin interface to implement
Since S3 is, in effect, a file system, S3StoragePlugin builds on FileSystemPlugin and can reuse a lot of the existing behavior (table creation, modification, deletion and so on). S3 does have its particularities, though, so S3StoragePlugin overrides a few pieces, roughly as sketched below.
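For orientation, here is roughly what that looks like at the class level. This is a sketch from memory of the Dremio OSS sources, not a verbatim copy: the exact constructor parameters and import packages may differ between Dremio versions.

// Sketch only: constructor parameters and package locations are assumptions
// based on Dremio OSS and may differ in your Dremio version.
// S3PluginConfig lives alongside the plugin in com.dremio.plugins.s3.store.
import javax.inject.Provider;

import com.dremio.exec.catalog.StoragePluginId;
import com.dremio.exec.server.SabotContext;
import com.dremio.exec.store.dfs.FileSystemPlugin;

public class S3StoragePlugin extends FileSystemPlugin<S3PluginConfig> {

  public S3StoragePlugin(S3PluginConfig config, SabotContext context, String name,
                         Provider<StoragePluginId> idProvider) {
    super(config, context, name, idProvider);
  }

  // The real class only overrides the spots where S3 semantics differ from a
  // generic file system (for example how buckets/containers are resolved);
  // table create/alter/drop handling is inherited from FileSystemPlugin.
}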
- getDatasetHandle implementation
This is a standard part of any plugin: it resolves the path information of a dataset inside the storage plugin (the rough shape of these metadata hooks is sketched after this group of items).
- getDatasetMetadata implementation
Similar to getDatasetHandle, also a standard part.
- listPartitionChunks implementation
Similar to getDatasetHandle, also a standard part.
- containerExists implementation
Similar to getDatasetHandle, also a standard part.
- SupportsListingDatasets implementation
Optional; both the mongo and jdbc storage plugins implement this interface.
- SupportsExternalQuery interface implementation
Optional; the jdbc plugin implements it to make external queries convenient.
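To make the roles of the four standard hooks above concrete, here is a skeleton with the signatures as I recall them from the connector module's SourceMetadata interface (see the reference list at the end). Option types and exact signatures can vary slightly between Dremio versions, so treat this as a sketch rather than a contract.

import java.util.Optional;

import com.dremio.connector.ConnectorException;
import com.dremio.connector.metadata.DatasetHandle;
import com.dremio.connector.metadata.DatasetMetadata;
import com.dremio.connector.metadata.EntityPath;
import com.dremio.connector.metadata.GetDatasetOption;
import com.dremio.connector.metadata.GetMetadataOption;
import com.dremio.connector.metadata.ListPartitionChunkOption;
import com.dremio.connector.metadata.PartitionChunkListing;

// Skeleton of the four "standard" metadata hooks a plugin provides.
// Signatures are assumptions based on SourceMetadata and may differ per Dremio version.
public abstract class MetadataHooksSketch {

  // Resolve a path under the source to a dataset handle; empty means "not a dataset".
  public Optional<DatasetHandle> getDatasetHandle(EntityPath datasetPath, GetDatasetOption... options)
      throws ConnectorException {
    return Optional.empty(); // placeholder: a real plugin looks the path up in its backing store
  }

  // Enumerate the splits / partition chunks the scan will be parallelized over.
  public abstract PartitionChunkListing listPartitionChunks(DatasetHandle handle, ListPartitionChunkOption... options)
      throws ConnectorException;

  // Schema, statistics and format details for a previously resolved handle.
  public abstract DatasetMetadata getDatasetMetadata(DatasetHandle handle, PartitionChunkListing chunks, GetMetadataOption... options)
      throws ConnectorException;

  // Whether a container (bucket / folder / schema-like namespace) exists in the source.
  public abstract boolean containerExists(EntityPath containerPath);
}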
- Implement the rules factory method
The FileSystemPlugin implementation is shown below; it is configurable, but comes with a default:
public Class<? extends StoragePluginRulesFactory> getRulesFactoryClass() {
  return context.getConfig().getClass("dremio.plugins.dfs.rulesfactory", StoragePluginRulesFactory.class, FileSystemRulesFactory.class);
}
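A plugin that needs its own planning rules does not have to go through that config key; it can simply override the method and return its own factory class. A minimal sketch (MyPluginRulesFactory is a hypothetical class, sketched after the FileSystemRulesFactory rules below):

@Override
public Class<? extends StoragePluginRulesFactory> getRulesFactoryClass() {
  return MyPluginRulesFactory.class; // hypothetical custom factory, see the sketch further below
}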
The factory returned by getRulesFactoryClass is fairly important in dremio: it influences how the query plan is built and executed.
The core rules provided by FileSystemRulesFactory are shown below. They are mainly optimization rules for the logical and physical plans; see the actual implementation for details.
public Set<RelOptRule> getRules(OptimizerRulesContext optimizerContext, PlannerPhase phase, SourceType pluginType) {
  switch (phase) {
    case LOGICAL:
      ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.builder();
      builder.add(new FileSystemDrule(pluginType));
      if (optimizerContext.getPlannerSettings().isPartitionPruningEnabled()) {
        builder.add(new PruneScanRuleFilterOnProject<>(pluginType, FilesystemScanDrel.class, optimizerContext));
        builder.add(new PruneScanRuleFilterOnScan<>(pluginType, FilesystemScanDrel.class, optimizerContext));
        builder.add(new PruneScanRuleFilterOnSampleScan<>(pluginType, FilesystemScanDrel.class, optimizerContext));
      }
      return builder.build();
    case PHYSICAL:
      return ImmutableSet.<RelOptRule>of(
          new IcebergMetadataFilesystemScanPrule(pluginType, optimizerContext),
          new EasyFilesystemScanPrule(pluginType),
          new ParquetFilesystemScanPrule(pluginType),
          new IcebergFilesystemScanPrule(pluginType, optimizerContext),
          new DeltaLakeFilesystemScanPrule(pluginType, optimizerContext),
          new DeltaLakeFilesystemHistoryScanPrule(pluginType),
          ConvertCountToDirectScan.getAggOnScan(pluginType),
          ConvertCountToDirectScan.getAggProjOnScan(pluginType),
          new TableFilesFunctionScanPrule(pluginType),
          new FileSystemTableOptimizePrule(optimizerContext),
          new FileSystemVacuumTablePrule(optimizerContext),
          new VacuumCatalogPrule());
    default:
      return ImmutableSet.<RelOptRule>of();
  }
}
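As a sketch of what a custom factory might look like: a plugin that is "mostly a file system" could extend FileSystemRulesFactory, keep everything it already contributes, and layer extra rules on top. The class below is hypothetical, and the import package names are assumptions that may differ between Dremio versions.

import java.util.Set;

import org.apache.calcite.plan.RelOptRule;

import com.dremio.exec.catalog.conf.SourceType;   // package assumed
import com.dremio.exec.ops.OptimizerRulesContext; // package assumed
import com.dremio.exec.planner.PlannerPhase;      // package assumed
import com.dremio.exec.store.dfs.FileSystemRulesFactory;
import com.google.common.collect.ImmutableSet;

// Hypothetical factory referenced by the getRulesFactoryClass() override sketched earlier.
public class MyPluginRulesFactory extends FileSystemRulesFactory {

  @Override
  public Set<RelOptRule> getRules(OptimizerRulesContext optimizerContext, PlannerPhase phase, SourceType pluginType) {
    ImmutableSet.Builder<RelOptRule> builder = ImmutableSet.builder();
    // Keep all the logical/physical rules FileSystemRulesFactory already provides for this phase...
    builder.addAll(super.getRules(optimizerContext, phase, pluginType));
    // ...and add plugin-specific rules here (for example an extra pruning rule or a custom scan prule).
    return builder.build();
  }
}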
- subscan implementation
In dremio, every storage extension is ultimately handled as tables, so reading data goes through a scan; different sources simply have different scan implementations and optimization rules. The file system case, for example, has quite a few of them (see the physical-phase prules above).
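Conceptually, a subscan is the serializable piece of the scan that gets shipped to each executor fragment: which splits to read, which columns to project, and through which plugin. The snippet below is purely illustrative (it does not implement the real com.dremio.exec.physical.base.SubScan interface); it only shows the kind of information a file-based subscan such as ParquetSubScan carries.

import java.util.List;

// Purely illustrative: the kind of state a file-based subscan config carries.
// A real subscan implements com.dremio.exec.physical.base.SubScan and is
// serializable so it can be sent to executor fragments.
public class MySubScanSketch {
  private final List<String> tablePath;         // the dataset this scan belongs to
  private final List<String> columns;           // projected columns
  private final List<byte[]> serializedSplits;  // splits / partition chunks assigned to this fragment

  public MySubScanSketch(List<String> tablePath, List<String> columns, List<byte[]> serializedSplits) {
    this.tablePath = tablePath;
    this.columns = columns;
    this.serializedSplits = serializedSplits;
  }

  public List<String> getTablePath() { return tablePath; }
  public List<String> getColumns() { return columns; }
  public List<byte[]> getSerializedSplits() { return serializedSplits; }
}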
- The ProducerOperator.Creator implementation matching the subscan
Because S3 relies heavily on what FileSystemPlugin already provides, the suitable Creator is picked inside PipelineCreator according to the actual plan. Here is a brief look at the parquet reference implementation (ParquetOperatorCreator): it ties into the concrete reader handling and is where dremio actually fetches data from the external source. The file system side has quite a few such implementations; jdbc is comparatively simple, being just jdbc query processing.
public class ParquetOperatorCreator implements Creator<ParquetSubScan> {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetOperatorCreator.class);

  @Override
  public ProducerOperator create(FragmentExecutionContext fragmentExecContext, final OperatorContext context, final ParquetSubScan config) throws ExecutionSetupException {
    final Stopwatch watch = Stopwatch.createStarted();
    try {
      ParquetSplitReaderCreatorIterator creator = new ParquetSplitReaderCreatorIterator(fragmentExecContext, context, config, true);
      logger.debug("Took {} ms to create Parquet Scan.", watch.elapsed(TimeUnit.MILLISECONDS));
      return creator.createScan();
    } catch (Exception ex) {
      throw new ExecutionSetupException("Failed to create scan operator.", ex);
    }
  }

  public RecordReaderIterator getReaders(FragmentExecutionContext fragmentExecContext, final OperatorContext context, final ParquetSubScan config) throws ExecutionSetupException {
    ParquetSplitReaderCreatorIterator creator = new ParquetSplitReaderCreatorIterator(fragmentExecContext, context, config, true);
    return new PrefetchingIterator(creator);
  }
}
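For a source that is not file based, the same pattern applies: the Creator turns the deserialized subscan into a ProducerOperator that reads from the external system. Below is a heavily hedged skeleton of that shape; MySubScan and MyReaders are hypothetical, and the import package names are assumptions based on the Parquet example above, so check them against your Dremio version.

import com.dremio.common.exceptions.ExecutionSetupException;    // package assumed
import com.dremio.sabot.exec.context.OperatorContext;           // package assumed
import com.dremio.sabot.exec.fragment.FragmentExecutionContext; // package assumed
import com.dremio.sabot.op.spi.ProducerOperator;                // package assumed

// Skeleton only: MySubScan (the plugin's subscan config) and MyReaders (a helper that
// builds one reader per split and wraps them in a scan operator) are hypothetical.
// ProducerOperator.Creator appears simply as Creator in the excerpt above via its import.
public class MyOperatorCreator implements ProducerOperator.Creator<MySubScan> {

  @Override
  public ProducerOperator create(FragmentExecutionContext fragmentExecContext, OperatorContext context, MySubScan config)
      throws ExecutionSetupException {
    try {
      // Mirrors the Parquet creator above: build one reader per split of the subscan
      // and expose them as a single producer (scan) operator.
      return MyReaders.createScan(fragmentExecContext, context, config);
    } catch (Exception ex) {
      throw new ExecutionSetupException("Failed to create scan operator.", ex);
    }
  }
}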
Notes
The above uses one concrete implementation to walk through the things a dremio storage plugin should take care of; for the actual internal processing, studying the source code is the best way to learn more.
References
https://www.cnblogs.com/rongfengliang/p/17150854.html
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FileSystemRulesFactory.java
plugins/s3/src/main/java/com/dremio/plugins/s3/store/S3StoragePlugin.java
sabot/kernel/src/main/java/com/dremio/exec/store/StoragePluginRulesFactory.java
sabot/kernel/src/main/java/com/dremio/exec/physical/base/SubScan.java
sabot/kernel/src/main/java/com/dremio/exec/store/parquet/ParquetOperatorCreator.java
connector/src/main/java/com/dremio/connector/metadata/SourceMetadata.java
connector/src/main/java/com/dremio/connector/metadata/extensions/SupportsListingDatasets.java