datastore 实际上是进行数据存储的实现(主要是配置以及元数据相关的)不少服务都使用到了此功能(namespace,catalog,user,job )
实际上dremio 官方对于dremio 的部署(软件版,尤其是是社区版)有比较明确的说明,需要zk 以及本地存储(或者nas)对于协调节点
的元数据进行存储,同时如果需要实现ha 模式,可以支持对于协调节点配置主从抢占模式,从节点可以进行数据查询处理,但是不进行
元数据操作(比如比较典型的反射处理,元数据刷新。。。。),通过基本的阅读datastore
datastore 提供的能力
- key,value 存储
- key,value 序列化,反序列化处理,当然部分上就有了protobuf
- 索引能力提供,这部分实际上应该是加速key,value 检索的,实现上基于了lucene,同时自己包装了,实际上我们实际使用的kv存储,基本都会包含存储以及索引
- rpc 能力,提供通用rpc 包装,方便不通client 使用DatastoreRpcService,DatastoreRpcClient,rpc 整体是基于FabricProtocol 协议的,dremio 通用的rpc 处理
参加实现图
从下图可以看出面向不同场景实现的还是不少的
KVStoreProviderHelper 对于kv 存储的初始化
KVStoreProviderHelper 包含了不少判断,基于系统配置 ,以及基于动态类创建机制,在DACDaemonModule 模块中
private static KVStoreProvider internalKVStoreProvider(DACConfig dacConfig,
BootStrapContext bootstrap,
Provider<FabricService> fabricService,
Provider<NodeEndpoint> endPoint) {
DremioConfig dremioConfig = dacConfig.getConfig();
Map<String, Object> config = new HashMap<>();
String thisNode = dremioConfig.getThisNode();
// instantiate NoopKVStoreProvider on all non-coordinator nodes.
boolean isCoordinator = dremioConfig.getBoolean(DremioConfig.ENABLE_COORDINATOR_BOOL);
if (!isCoordinator) {
return new NoopKVStoreProvider(bootstrap.getClasspathScan(), fabricService, endPoint, bootstrap.getAllocator(), config);
}
// Configure the default KVStore , 通过配置约定
String datastoreType = System.getProperty(KVSTORE_TYPE_PROPERTY_NAME, DEFAULT_DB);
config.put(DremioConfig.DEBUG_USE_MEMORY_STRORAGE_BOOL, dacConfig.inMemoryStorage);
config.put(LocalKVStoreProvider.CONFIG_DISABLEOCC, "false");
config.put(LocalKVStoreProvider.CONFIG_VALIDATEOCC, "true");
config.put(LocalKVStoreProvider.CONFIG_TIMED, "true");
config.put(LocalKVStoreProvider.CONFIG_BASEDIRECTORY, dremioConfig.getString(DremioConfig.DB_PATH_STRING));
config.put(LocalKVStoreProvider.CONFIG_HOSTNAME, System.getProperty(KVSTORE_HOSTNAME_PROPERTY_NAME, thisNode));
config.put(RemoteKVStoreProvider.HOSTNAME, thisNode);
config.put(DremioConfig.REMOTE_DATASTORE_RPC_TIMEOUT_SECS, dremioConfig.getLong(DremioConfig.REMOTE_DATASTORE_RPC_TIMEOUT_SECS));
// find the appropriate KVStoreProvider from path
// first check for the default behavior (if services.datastore.type is set to "default")
// if services.datastore.type is set, check ClassPath for associated KVStoreProvider type
Class<? extends KVStoreProvider> cls = null;
switch (datastoreType) {
// 默认因为就没有配置KVSTORE_TYPE_PROPERTY_NAME 的定义,所以就只能是本地LocalKVStoreProvider 的
case DEFAULT_DB:
config.put(LocalKVStoreProvider.CONFIG_HOSTNAME, thisNode);
// fall through to TEST_CLUSTER_DB
// 从命名上主要是测试使用的
case TEST_CLUSTER_DB:
boolean isMasterless = dremioConfig.isMasterlessEnabled();
boolean isMaster = (!isMasterless && dremioConfig.getBoolean(DremioConfig.ENABLE_MASTER_BOOL));
boolean needsLocalKVStore = (isMasterless && thisNode.equals(config.get(LocalKVStoreProvider.CONFIG_HOSTNAME)));
cls = (isMaster || needsLocalKVStore)? LocalKVStoreProvider.class : RemoteKVStoreProvider.class;
break;
default:
// 基于ClassPathScanner 查找实现,所以我们可以自己扩展
final ScanResult results = ClassPathScanner.fromPrescan(dremioConfig.getSabotConfig());
final Set<Class<? extends KVStoreProvider>> classes = results.getImplementations(KVStoreProvider.class);
for (Class<? extends KVStoreProvider> it : classes) {
try {
KVStoreProviderType anno = it.getAnnotation(KVStoreProviderType.class);
if (anno != null && anno.type().equals(datastoreType)) {
cls = it;
break;
}
} catch (Exception e) {
logger.info(String.format("Unable to find KVStoreProviderType annotation in %s during search, skipping", cls.getName()));
continue;
}
}
break;
}
// not able to find a KVStoreProvider for the requested services.datastore.type
if (cls == null) {
throw new RuntimeException("Unable to find appropriate KVStoreProvider for " + datastoreType);
}
try {
// 动态创建KVStoreProvider 的定义
final Constructor<? extends KVStoreProvider> con = cls.getDeclaredConstructor(
ScanResult.class,
Provider.class,
Provider.class,
BufferAllocator.class,
Map.class
);
return con.newInstance(bootstrap.getClasspathScan(),
fabricService,
endPoint,
bootstrap.getAllocator(),
config
);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
历史遗留问题
从官方代码上可以看出,官方包含了一个遗留实现,具体通过的了一个adapter 进行适配解决
DACDaemonModule 参考
registry.bind(
LegacyKVStoreProvider.class,
new LegacyKVStoreProviderAdapter(
registry.provider(KVStoreProvider.class).get()) // i此处就使用到了上边的初始化定义了
);
说明
以上只是一个简单的说明,具体会通过namespaceservice 以及其他模块进行集成说明,关于datastore服务的一些配置实际上官方文档并没有介绍
但是通过学习源码,我们可以了解一些内部处理,方便自己扩展以及研究
参考资料
dac/backend/src/main/java/com/dremio/dac/daemon/KVStoreProviderHelper.java
services/datastore/src/main/java/com/dremio/datastore/RemoteKVStoreProvider.java
services/datastore/src/main/java/com/dremio/datastore/DatastoreRpcClient.java
services/datastore/src/main/java/com/dremio/datastore/DatastoreRpcService.java