ManagedStoragePlugin 从字面意思可以看出就是托管存储插件。从目前官方的设计来看,它的作用是将
自己开发的存储扩展包装为 Dremio 可以管理的插件(统一模型以及统一处理)。
ManagedStoragePlugin 提供的能力
- 数据源信息维护,插件会包含一个关联的数据源
- 元数据策略
- 数据集的保存工具类
- 存储插件的规则
- 修改数据集
- 获取数据源的配置信息(连接信息)
- 权限检查(此功能属于企业版,可以自己尝试扩展)
提供的能力参考下图
处理简单说明
Dremio 定义了一套插件开发机制,具体可以参考我以前写的文章,或者看看官方提供的 JDBC、MongoDB、Elasticsearch 插件的开发。
开发的插件整体分为两大类:系统插件以及用户插件。对于系统插件,Dremio 在服务启动时会进行系统存储插件的扫描与创建,
具体由 SystemStoragePluginInitializer 处理,其内部实现也基于 CatalogService。
- 系统存储扩展处理
SystemStoragePluginInitializer 类中的 pluginsCreation 方法:
/**
 * Registers Dremio's built-in (system) storage plugins with the catalog.
 *
 * <p>Builds one source config per system plugin and hands each to {@code createSafe}, in this
 * order: home (uploads), acceleration, jobs results, scratch, dataset download, metadata, logs,
 * and local support. All calls share a single {@link DeferredException}, and
 * {@code deferred.throwAndClear()} is invoked once after every plugin has been attempted.
 * NOTE(review): presumably {@code createSafe} records failures in {@code deferred} instead of
 * throwing, so one failing plugin does not block the others — confirm against createSafe.
 *
 * @param provider binding provider used to look up DremioConfig, CatalogService, SabotContext
 *     and ProjectConfig
 * @param sabotContext supplies the option manager (support path option) and is passed to the
 *     caching checks
 * @throws Exception if building the logs URI fails ({@code new URI} declares
 *     URISyntaxException) or, presumably, if any plugin creation failure was collected
 */
private void pluginsCreation(final BindingProvider provider, final SabotContext sabotContext) throws Exception {
final DremioConfig config = provider.lookup(DremioConfig.class);
final CatalogService catalogService = provider.lookup(CatalogService.class);
// NOTE(review): the namespace service comes from the SabotContext looked up through the
// provider, not from the sabotContext parameter; presumably the same instance — confirm.
final NamespaceService ns = provider.lookup(SabotContext.class).getNamespaceService(SYSTEM_USERNAME);
// Collects failures across all createSafe calls; rethrown at the end via throwAndClear().
final DeferredException deferred = new DeferredException();
final ProjectConfig projectConfig = provider.lookup(ProjectConfig.class);
// Local support directory, taken from the TEMPORARY_SUPPORT_PATH option.
final Path supportPath = Paths.get(sabotContext.getOptionManager().getOption(TEMPORARY_SUPPORT_PATH));
// Log directory: the DREMIO_LOG_PATH_PROPERTY system property, defaulting to /var/log/dremio.
final Path logPath = Paths.get(System.getProperty(DREMIO_LOG_PATH_PROPERTY, "/var/log/dremio"));
// Distributed storage locations (URI + data credentials) declared in the project config.
final ProjectConfig.DistPathConfig uploadsPathConfig = projectConfig.getUploadsConfig();
final ProjectConfig.DistPathConfig accelerationPathConfig = projectConfig.getAcceleratorConfig();
final ProjectConfig.DistPathConfig scratchPathConfig = projectConfig.getScratchConfig();
final ProjectConfig.DistPathConfig metadataPathConfig = projectConfig.getMetadataConfig();
// Download and job-results locations come straight from DremioConfig.
final URI downloadPath = config.getURI(DremioConfig.DOWNLOADS_PATH_STRING);
final URI resultsPath = config.getURI(DremioConfig.RESULTS_PATH_STRING);
// Do not construct URI simply by concatenating, as it might not be encoded properly
final URI logsPath = new URI("pdfs", "///" + logPath.toUri().getPath(), null);
final URI supportURI = supportPath.toUri();
// Home space plugin (user uploads): USER_TABLE mutability, never refreshed.
final boolean enableAsyncForUploads = enable(config, DremioConfig.DEBUG_UPLOADS_ASYNC_ENABLED);
// NOTE(review): the home plugin is given scratchPathConfig's data credentials although its
// location is uploadsPathConfig; verify this is intentional.
createSafe(catalogService, ns,
HomeFileConf.create(HomeFileSystemStoragePlugin.HOME_PLUGIN_NAME, uploadsPathConfig.getUri(), config.getThisNode(),
SchemaMutability.USER_TABLE, CatalogService.NEVER_REFRESH_POLICY,
enableAsyncForUploads, scratchPathConfig.getDataCredentials()), deferred);
// Debug override for the cache-space percentage; otherwise the MAX_CACHE_SPACE_PERCENT default.
// Shared by the acceleration and metadata plugins below.
final int maxCacheSpacePercent = config.hasPath(DremioConfig.DEBUG_DIST_MAX_CACHE_SPACE_PERCENT)?
config.getInt(DremioConfig.DEBUG_DIST_MAX_CACHE_SPACE_PERCENT) : MAX_CACHE_SPACE_PERCENT;
// Acceleration (reflections) plugin.
final boolean enableAsyncForAcceleration = enable(config, DremioConfig.DEBUG_DIST_ASYNC_ENABLED);
final boolean enableS3FileStatusCheck = isEnableS3FileStatusCheck(config, accelerationPathConfig);
boolean enableCachingForAcceleration = isEnableCaching(sabotContext, config, accelerationPathConfig, CLOUD_CACHING_ENABLED);
createSafe(catalogService, ns,
AccelerationStoragePluginConfig.create(accelerationPathConfig.getUri(), enableAsyncForAcceleration,
enableCachingForAcceleration, maxCacheSpacePercent, enableS3FileStatusCheck,
accelerationPathConfig.getDataCredentials()), deferred);
// Job results plugin: SYSTEM_TABLE mutability, default metadata policy with auto-promote,
// no data credentials (null).
final boolean enableAsyncForJobs = enable(config, DremioConfig.DEBUG_JOBS_ASYNC_ENABLED);
createSafe(catalogService, ns,
InternalFileConf.create(DACDaemonModule.JOBS_STORAGEPLUGIN_NAME, resultsPath, SchemaMutability.SYSTEM_TABLE,
CatalogService.DEFAULT_METADATA_POLICY_WITH_AUTO_PROMOTE, enableAsyncForJobs, null), deferred);
// Scratch plugin: USER_TABLE mutability, never refreshed but auto-promoted.
final boolean enableAsyncForScratch = enable(config, DremioConfig.DEBUG_SCRATCH_ASYNC_ENABLED);
createSafe(catalogService, ns,
InternalFileConf.create(DACDaemonModule.SCRATCH_STORAGEPLUGIN_NAME, scratchPathConfig.getUri(), SchemaMutability.USER_TABLE,
CatalogService.NEVER_REFRESH_POLICY_WITH_AUTO_PROMOTE, enableAsyncForScratch, scratchPathConfig.getDataCredentials()), deferred);
// Dataset download plugin.
final boolean enableAsyncForDownload = enable(config, DremioConfig.DEBUG_DOWNLOAD_ASYNC_ENABLED);
createSafe(catalogService, ns,
InternalFileConf.create(DATASET_DOWNLOAD_STORAGE_PLUGIN, downloadPath, SchemaMutability.USER_TABLE,
CatalogService.NEVER_REFRESH_POLICY, enableAsyncForDownload, null), deferred);
// Metadata plugin: same options as acceleration, but keyed off the metadata path config.
final boolean enableAsyncForMetadata = enable(config, DremioConfig.DEBUG_METADATA_ASYNC_ENABLED);
final boolean enableS3FileStatusCheckForMetadata = isEnableS3FileStatusCheck(config, metadataPathConfig);
boolean enableCachingForMetadata = isEnableCaching(sabotContext, config, metadataPathConfig, METADATA_CLOUD_CACHING_ENABLED);
createSafe(catalogService, ns,
MetadataStoragePluginConfig.create(metadataPathConfig.getUri(), enableAsyncForMetadata,
enableCachingForMetadata, maxCacheSpacePercent, enableS3FileStatusCheckForMetadata,
metadataPathConfig.getDataCredentials()), deferred);
// Logs plugin over the pdfs logs URI; SchemaMutability.NONE.
final boolean enableAsyncForLogs = enable(config, DremioConfig.DEBUG_LOGS_ASYNC_ENABLED);
createSafe(catalogService, ns,
InternalFileConf.create(LOGS_STORAGE_PLUGIN, logsPath, SchemaMutability.NONE,
CatalogService.NEVER_REFRESH_POLICY, enableAsyncForLogs, null), deferred);
// Local support plugin over the support directory URI.
final boolean enableAsyncForSupport = enable(config, DremioConfig.DEBUG_SUPPORT_ASYNC_ENABLED);
createSafe(catalogService, ns,
InternalFileConf.create(LOCAL_STORAGE_PLUGIN, supportURI, SchemaMutability.SYSTEM_TABLE,
CatalogService.NEVER_REFRESH_POLICY, enableAsyncForSupport, null), deferred);
// Surface any collected failure only after every plugin has been attempted.
deferred.throwAndClear();
}
- 用户开发的存储扩展处理
这类插件基于类加载机制,结合用户进行的 API 操作创建,具体实现可以参考 SourceCatalog。
创建后的插件会统一放到一个集合中(即 PluginsManager 中的 plugins),
参考 PluginsManager 类中的 create 方法:
/**
 * Creates the managed plugin for a new source and registers it in {@code plugins}.
 *
 * <p>The plugin is built and its source created first; only on success is it published via
 * {@code putIfAbsent}. If source creation fails for any reason, the unpublished plugin is
 * closed so its in-memory state does not leak, and the original exception is rethrown with
 * any close failure attached as a suppressed exception.
 *
 * @param config source configuration; its name identifies the plugin
 * @param userName user performing the creation, forwarded to {@code createSource}
 * @param attributes namespace attributes, forwarded to {@code createSource}
 * @return the newly created and registered plugin
 * @throws SourceAlreadyExistsException if a plugin with this name already exists, or was
 *     registered by a concurrent create while this one was being built
 * @throws TimeoutException if source creation times out
 * @throws Exception any other failure raised while creating the source
 */
public ManagedStoragePlugin create(SourceConfig config, String userName, NamespaceAttribute... attributes) throws TimeoutException, Exception {
  if (hasPlugin(config.getName())) {
    throw new SourceAlreadyExistsException();
  }

  final ManagedStoragePlugin plugin = newPlugin(config);
  try {
    plugin.createSource(config, userName, attributes);
  } catch (Exception e) {
    // The creation of Source can fail due to various reasons, not only UserException
    // (e.g. a timeout or a runtime failure). The plugin has not been published to `plugins`
    // yet, so nobody else can hold it: always close it to clean up its in-memory state.
    logger.error("Exception while creating source.", e);
    try {
      plugin.close();
    } catch (Exception ex) {
      e.addSuppressed(ex);
    }
    // Precise rethrow: the original exception type reaches the caller unchanged.
    throw e;
  }

  // use concurrency features of concurrent hash map to avoid locking.
  final ManagedStoragePlugin existing = plugins.putIfAbsent(c(config.getName()), plugin);
  if (existing == null) {
    return plugin;
  }

  // This means it has been added by a concurrent thread doing create with the same name;
  // discard our copy and report the conflict.
  final SourceAlreadyExistsException e = new SourceAlreadyExistsException();
  try {
    plugin.close();
  } catch (Exception ex) {
    e.addSuppressed(ex);
  }
  throw e;
}
参考资料
sabot/kernel/src/main/java/com/dremio/exec/catalog/ManagedStoragePlugin.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/PluginsManager.java
dac/backend/src/main/java/com/dremio/dac/daemon/SystemStoragePluginInitializer.java
common/src/main/java/com/dremio/service/InitializerRegistry.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/CatalogServiceImpl.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/SourceCatalog.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/PermissionCheckCache.java