SourceCatalog is responsible for managing sources: looking them up, creating, updating, and deleting them. It has several implementations.
SourceCatalog service definition
/**
* Interface to perform actions on sources.
*/
// PrivilegeCatalog handles privilege processing; the capability behind it is what SqlGrant exposes
public interface SourceCatalog extends PrivilegeCatalog {
SourceState refreshSourceStatus(NamespaceKey key) throws Exception;
/**
* Get a source based on the provided name. If the source doesn't exist, synchronize with the
* KVStore to confirm creation status.
*
* @param name
* @return A StoragePlugin casted to the expected output.
*/
<T extends StoragePlugin> T getSource(String name);
/**
* Create a source based on the provided configuration. Includes both the creation as well the
* startup of the source. If the source fails to start, no creation will be done. The provided
* configuration should have a null version. Failure to create or failure to start with throw an
* exception. Additionally, if "store.plugin.check_state" is enabled, a plugin that starts but
* then reveals a bad state, will also result in exception.
*
* @param config Configuration for the source.
* @param attributes Optional namespace attributes to pass to namespace entity creation
*/
void createSource(SourceConfig config, NamespaceAttribute... attributes);
/**
* Update an existing source with the given config. The config version must be the same as the
* currently active source. If it isn't, this call will fail with an exception.
*
* @param config Configuration for the source.
* @param attributes Optional namespace attributes to pass to namespace entity creation
*/
void updateSource(SourceConfig config, NamespaceAttribute... attributes);
/**
* Delete a source with the provided config. If the source doesn't exist or the config doesn't
* match, the method with throw an exception.
*
* @param config
*/
void deleteSource(SourceConfig config);
}
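The version rules in the Javadoc above (a null version on create, a matching version on update and delete) amount to optimistic concurrency control. The following is a minimal in-memory sketch of that contract only. `SketchConfig` and `InMemorySourceCatalogSketch` are hypothetical names, not Dremio classes, and a plain `ConcurrentModificationException` is used here as a simplification of the exceptions Dremio actually throws.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SourceConfig: only a name and a version tag.
final class SketchConfig {
  final String name;
  final String version; // null means "not created yet"

  SketchConfig(String name, String version) {
    this.name = name;
    this.version = version;
  }
}

// Hypothetical in-memory catalog; an assumption for illustration, not CatalogImpl.
final class InMemorySourceCatalogSketch {
  private final Map<String, SketchConfig> sources = new HashMap<>();
  private long counter = 0;

  // Create: the incoming config must carry a null version and the name must be free.
  synchronized SketchConfig createSource(SketchConfig config) {
    if (config.version != null) {
      throw new IllegalArgumentException("New source must have a null version");
    }
    if (sources.containsKey(config.name)) {
      throw new ConcurrentModificationException("Source already exists: " + config.name);
    }
    return store(config.name);
  }

  // Update: the incoming version must equal the currently stored version.
  synchronized SketchConfig updateSource(SketchConfig config) {
    requireMatchingVersion(config);
    return store(config.name);
  }

  // Delete: same version check as update, then remove the entry.
  synchronized void deleteSource(SketchConfig config) {
    requireMatchingVersion(config);
    sources.remove(config.name);
  }

  synchronized SketchConfig getSource(String name) {
    return sources.get(name);
  }

  private void requireMatchingVersion(SketchConfig config) {
    SketchConfig current = sources.get(config.name);
    if (current == null || !current.version.equals(config.version)) {
      throw new ConcurrentModificationException("Missing source or stale version: " + config.name);
    }
  }

  // Store the config under a fresh version tag and return the stored copy.
  private SketchConfig store(String name) {
    SketchConfig stored = new SketchConfig(name, "v" + (++counter));
    sources.put(name, stored);
    return stored;
  }
}
```

A caller that holds on to an old config after someone else has updated the source gets an exception instead of silently overwriting the newer state, which is the behavior the `updateSource` Javadoc describes.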
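Implementing subclasses
See the diagram below. The hierarchy is fairly complex; the actual implementation is CatalogImpl, which internally calls CatalogServiceImpl.
Source creation handling
Reference code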
private void createSource(SourceConfig config, CatalogIdentity subject, NamespaceAttribute... attributes) {
boolean afterUnknownEx = false;
try(final AutoCloseable sourceDistributedLock = getDistributedLock(config.getName())) {
logger.debug("Obtained distributed lock for source {}", "-source-"+config.getName());
setInfluxSource(config.getName());
// The actual source creation is delegated to PluginsManager
getPlugins().create(config, subject.getName(), attributes);
communicateChange(config, RpcType.REQ_SOURCE_CONFIG);
} catch (SourceAlreadyExistsException e) {
throw UserException.concurrentModificationError(e).message("Source already exists with name %s.", config.getName()).buildSilently();
} catch (ConcurrentModificationException ex) {
throw ex;
} catch (UserException ue) {
// If it's a UserException, message is probably helpful, so rethrow
throw ue;
} catch (IllegalArgumentException e) {
throw e;
} catch (Exception ex) {
afterUnknownEx = true;
logger.error("Exception encountered: {}", ex.getMessage(), ex);
throw UserException.validationError(ex).message("Failed to create source with name %s.", config.getName()).buildSilently();
} finally {
logger.debug("Releasing distributed lock for source {}", "-source-"+config.getName());
removeInfluxSource(config.getName(), afterUnknownEx);
}
}
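The shape of the method above (take a lock as an `AutoCloseable`, do the work, rethrow meaningful exceptions, wrap unknown ones, clean up in `finally`) can be tried in isolation. The sketch below is an assumption-laden miniature: `LockedCreateSketch` is a hypothetical class, a local `ReentrantLock` stands in for Dremio's distributed lock, and `IllegalStateException` stands in for `UserException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; a local ReentrantLock replaces the distributed lock.
final class LockedCreateSketch {
  private final ReentrantLock lock = new ReentrantLock();
  final List<String> events = new ArrayList<>(); // records the order of steps

  // Acquire the lock and return a handle whose close() releases it.
  private AutoCloseable acquire(String name) {
    lock.lock();
    events.add("lock:" + name);
    return () -> {
      events.add("unlock:" + name);
      lock.unlock();
    };
  }

  void create(String name, Runnable creator) {
    boolean afterUnknownEx = false;
    try (AutoCloseable held = acquire(name)) {
      creator.run();
      events.add("created:" + name);
    } catch (IllegalArgumentException e) {
      throw e; // already meaningful to the caller, rethrow unchanged
    } catch (Exception e) {
      afterUnknownEx = true; // remember that cleanup follows an unknown failure
      throw new IllegalStateException("Failed to create source with name " + name, e);
    } finally {
      events.add("cleanup:" + name + ":" + afterUnknownEx);
    }
  }
}
```

One detail worth noticing: Java closes a try-with-resources resource before any `catch` or `finally` block of the same statement runs. In this sketch the recorded order is therefore lock, work, unlock, cleanup, so the cleanup step executes after the lock has already been released.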
In its start phase, PluginsManager loads the already-registered sources; the lookup is based on NamespaceService.Factory
```java
ImmutableMap.Builder<String, CompletableFuture<SourceState>> futuresBuilder = ImmutableMap.builder();
// Fetch the registered sources here and wrap each one in a Dremio-managed storage plugin
for (SourceConfig source : datasetListing.getSources(SystemUser.SYSTEM_USERNAME)) {
  ManagedStoragePlugin plugin = newPlugin(source);
  futuresBuilder.put(source.getName(), plugin.startAsync());
  plugins.put(c(source.getName()), plugin);
}
```
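The loop above starts every plugin asynchronously and keeps one future per source name. The excerpt does not show what happens to those futures afterwards, so the sketch below simply assumes they are joined and that a failed start is recorded as a bad state. `AsyncStartSketch`, its `State` enum, and the rule that a name containing "bad" fails are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "start every registered source asynchronously, then wait".
final class AsyncStartSketch {
  enum State { GOOD, BAD }

  // Stand-in for ManagedStoragePlugin.startAsync(); a name containing "bad" fails to start.
  static CompletableFuture<State> startAsync(String sourceName) {
    return CompletableFuture.supplyAsync(() -> {
      if (sourceName.contains("bad")) {
        throw new IllegalStateException("cannot start " + sourceName);
      }
      return State.GOOD;
    });
  }

  static Map<String, State> startAll(Iterable<String> sourceNames) {
    // First pass: kick off all starts without waiting on any of them.
    Map<String, CompletableFuture<State>> futures = new LinkedHashMap<>();
    for (String name : sourceNames) {
      futures.put(name, startAsync(name));
    }
    // Second pass: wait for each result; a failed start becomes State.BAD (an assumption).
    Map<String, State> states = new LinkedHashMap<>();
    for (Map.Entry<String, CompletableFuture<State>> entry : futures.entrySet()) {
      states.put(entry.getKey(), entry.getValue().exceptionally(t -> State.BAD).join());
    }
    return states;
  }
}
```

Separating "submit everything" from "wait for everything" lets slow sources start in parallel, so one unreachable source does not delay the start of the others.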
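ConnectionReader collects the plugin (source type) information
The logic lives in ConnectionReaderImpl and makes it easy for the UI to load the available source types
Reference code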
protected static Collection<Class<? extends ConnectionConf<?, ?>>> getCandidateSources(ScanResult scanResult) {
ImmutableList.Builder<Class<? extends ConnectionConf<?, ?>>> candidates = new ImmutableList.Builder<>();
for(Class<?> input : scanResult.getAnnotatedClasses(SourceType.class)) {
try {
if (Modifier.isAbstract(input.getModifiers())
|| Modifier.isInterface(input.getModifiers())
|| !ConnectionConf.class.isAssignableFrom(input)) {
logger.warn("Failure trying to recognize SourceConf for {}. Expected a concrete implementation of SourceConf.", input.getName());
continue;
}
} catch (Exception e) {
logger.warn("Failure trying to recognize SourceConf for {}", input.getName(), e);
continue;
}
// Check done just above
candidates.add((Class<? extends ConnectionConf<?, ?>>) input);
}
return candidates.build();
}
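The filter in `getCandidateSources` keeps only annotated, concrete classes that implement the expected configuration type. A self-contained miniature of the same check follows. `SketchSourceType`, `SketchConf`, and the three sample classes are hypothetical stand-ins for `@SourceType`, `ConnectionConf`, and real source configurations, and an `isAnnotationPresent` check replaces the classpath scan.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Dremio's @SourceType and ConnectionConf.
@Retention(RetentionPolicy.RUNTIME)
@interface SketchSourceType { String value(); }

abstract class SketchConf {}

@SketchSourceType("S3")
final class S3Conf extends SketchConf {}       // concrete and assignable: kept

@SketchSourceType("BASE")
abstract class AbstractJdbcConf extends SketchConf {} // abstract: skipped

@SketchSourceType("OTHER")
final class NotAConf {}                         // wrong type: skipped

final class CandidateFilterSketch {
  static List<Class<? extends SketchConf>> candidates(List<Class<?>> scanned) {
    List<Class<? extends SketchConf>> result = new ArrayList<>();
    for (Class<?> input : scanned) {
      if (!input.isAnnotationPresent(SketchSourceType.class)) {
        continue; // stands in for scanResult.getAnnotatedClasses(...)
      }
      int mods = input.getModifiers();
      if (Modifier.isAbstract(mods) || Modifier.isInterface(mods)
          || !SketchConf.class.isAssignableFrom(input)) {
        continue; // not a concrete SketchConf implementation
      }
      // asSubclass performs a checked narrowing instead of an unchecked cast.
      result.add(input.asSubclass(SketchConf.class));
    }
    return result;
  }
}
```

Using `Class.asSubclass` here is a design choice of the sketch: it gives the same narrowed type as the cast in the original code but fails with a `ClassCastException` if the preceding check is ever removed.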
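Notes
The source types covered here are mainly the storage extensions that are visible in the UI. Dremio also ships quite a few built-in storage extensions and system storage extensions (for example acceleration and home).
Reading the code also shows that the official code base provides PrivilegeCatalog as an extension point for privilege support; by extending it yourself you can implement some of the enterprise-edition features.
References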
sabot/kernel/src/main/java/com/dremio/exec/catalog/SourceCatalog.java
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/parser/SqlGrant.java
dac/backend/src/main/java/com/dremio/dac/api/SourceResource.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/ConnectionReader.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/ConnectionReaderImpl.java