我以前简单写过关于元数据处理的说明(基于jprofiler+arthas 工具)会依赖namespace 服务
实际对于数据的操作都是通过SourceMetadataManager 执行的
DatasetSaver 服务提供的能力
- 核心提供的能力
主要是包装DatasetSaver 处理元数据的保存,将获取到的元数据通过namespace 服务保存到底层存储中(由datastore 服务提供的),但是因为保存需要一个逻辑流程
所以dremio 包装了一个私有的saveUsingV1Flow 方法,同时还需要依赖DatasetMetadataSaver 进行实际元数据的存储处理,SourceMetadata 是基本上所有存储插件都会
实现的,比如jdbc 就实现了,间接的会调用插件提供的元数据处理(jdbc 一般会基于information_schema 处理),但是有些存储插件会再实现SupportsListingDatasets
进行数据集的处理(此类在非关系数据库使用的比较多,比如mongo,es 扩展中)
- 参考类图
SourceMetadataManager 提供的功能
- 定时调度,基于60s,唤醒任务,进行插件元数据的获取
- 支持后台任务刷新以及对于即席查询任务的刷新(即席任务主要是首次添加数据源的时候以及测试的时候)
- 基于每个存储扩展插件的刷新策略进行任务处理
参考调用链
实际内部执行的处理
private void wakeup() {
monitor.onWakeup();
// if we've never refreshed, initialize the refresh start times. We do this on wakeup since that will happen if this
// node gets assigned refresh responsibilities much later than the node initially comes up. It does leave the gap
// where we may refresh early if we do a refresh and then the task immediately migrates but that is probably okay
// for now.
// 判断是否是刷新过
if (!initialized) {
initializeRefresh();
// on first wakeup, we'll skip work so we can avoid a bunch of distracting exceptions when a plugin is first starting.
return;
}
try {
bridge.refreshState();
} catch (TimeoutException ex) {
logger.debug("Source '{}' timed out while refreshing state, skipping refresh.", sourceKey, ex);
return;
} catch (Exception ex) {
logger.debug("Source '{}' refresh failed as we were unable to retrieve refresh it's state.", sourceKey, ex);
return;
}
if (!runLock.tryLock()) {
logger.info("Source '{}' delaying refresh since an adhoc refresh is currently active.", sourceKey);
return;
}
// 基于ManagedStoragePlugin.MetadataBridge 的刷新策略获取
try (Closeable c = AutoCloseableLock.ofAlreadyOpen(runLock, true)) {
if ( !(fullRefresh.shouldRun() || namesRefresh.shouldRun()) ) {
return;
}
final SourceState sourceState = bridge.getState();
if (sourceState == null || sourceState.getStatus() == SourceStatus.bad) {
logger.info("Source '{}' skipping metadata refresh since it is currently in a bad state of {}.",
sourceKey, sourceState);
return;
}
// 后台任务处理
final BackgroundRefresh refresh;
if(fullRefresh.shouldRun()) {
refresh = new BackgroundRefresh(fullRefresh, true);
} else {
refresh = new BackgroundRefresh(namesRefresh, false);
}
refresh.run();
} catch (RuntimeException e) {
logger.warn("Source '{}' metadata refresh failed to complete due to an exception.", sourceKey, e);
}
}
soure插件对于DatasetSaver 的调用
dremio 包含了一个PluginsManager,可以管理所有的存储扩展,存储扩展为转换为ManagedStoragePlugin 包装类型的
ManagedStoragePlugin 依赖SourceMetadataManager ,每个SourceMetadataManager包含一个定时任务扫描处理(60s)
然后在内部基于此WakeupWorker 进行元数据的刷新(包含full 以及其他模式的)
- 部分调用链
@com.dremio.service.namespace.NamespaceServiceImpl$DatasetMetadataSaverImpl.savePartitionChunk()
at com.dremio.exec.catalog.SafeNamespaceService$1.lambda$savePartitionChunk$0(SafeNamespaceService.java:342)
at com.dremio.exec.catalog.ManagedStoragePlugin$SafeRunner.doSafe(ManagedStoragePlugin.java:1233)
at com.dremio.exec.catalog.SafeNamespaceService$1.savePartitionChunk(SafeNamespaceService.java:342)
at com.dremio.exec.catalog.CatalogUtil.savePartitionChunksInSplitsStores(CatalogUtil.java:62)
at com.dremio.exec.catalog.DatasetSaverImpl.saveUsingV1Flow(DatasetSaverImpl.java:252)
at com.dremio.exec.catalog.DatasetSaverImpl.save(DatasetSaverImpl.java:121)
at com.dremio.exec.catalog.DatasetSaverImpl.save(DatasetSaverImpl.java:137)
at com.dremio.exec.catalog.MetadataSynchronizer.tryHandleExistingDataset(MetadataSynchronizer.java:316)
at com.dremio.exec.catalog.MetadataSynchronizer.handleExistingDataset(MetadataSynchronizer.java:234)
at com.dremio.exec.catalog.MetadataSynchronizer.synchronizeDatasets(MetadataSynchronizer.java:206)
at com.dremio.exec.catalog.MetadataSynchronizer.go(MetadataSynchronizer.java:136)
at com.dremio.exec.catalog.SourceMetadataManager$RefreshRunner.refreshFull(SourceMetadataManager.java:441)
at com.dremio.exec.catalog.SourceMetadataManager$BackgroundRefresh.run(SourceMetadataManager.java:555)
at com.dremio.exec.catalog.SourceMetadataManager.wakeup(SourceMetadataManager.java:264)
at com.dremio.exec.catalog.SourceMetadataManager.access$300(SourceMetadataManager.java:96)
at com.dremio.exec.catalog.SourceMetadataManager$WakeupWorker.run(SourceMetadataManager.java:203)
注意定义唤醒处理是在master 节点执行的, 构造函数可以看出来
if(isMaster) {
// we can schedule on all nodes since this is a clustered singleton and will only run on a single node.
// 基于了可修改modifiableScheduler
this.wakeupTask = modifiableScheduler.schedule(
Schedule.Builder.everyMillis(WAKEUP_FREQUENCY_MS)
.asClusteredSingleton("metadata-refresh-" + sourceKey)
.build(),
new WakeupWorker()); // 进行刷新,内部会DatasetSaverImpl 以及namespace 服务进行数据处理以及实际元数据的写入存储
} else {
wakeupTask = null;
}
参考资料
sabot/kernel/src/main/java/com/dremio/exec/catalog/DatasetSaver.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/DatasetSaverImpl.java
https://www.cnblogs.com/rongfengliang/p/16795124.html
services/namespace/src/main/java/com/dremio/service/namespace/NamespaceService.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/SourceMetadataManager.java
sabot/kernel/src/test/java/com/dremio/exec/catalog/TestDatasetSaverImpl.java
sabot/kernel/src/test/java/com/dremio/exec/catalog/TestSourceMetadataManager.java
connector/src/main/java/com/dremio/connector/metadata/DatasetMetadata.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/ManagedStoragePlugin.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/PluginsManager.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/CatalogServiceImpl.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/MetadataSynchronizer.java
connector/src/main/java/com/dremio/connector/metadata/SourceMetadata.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/DatasetManager.java