dremio cloud cache 实际上就是对于云文件系统的cache加速(比如hdfs,s3。。。),在处理的时候使用了ce 包装的包,详细源码并没有开源
我们可以通过一些代码整体看下实现
参考处理
dremio-ce-services-cachemanager 中的处理
- ce caache 管理配置
dremio: {
classpath.scanning: {
packages += "com.dremio.service.cachemanager"
}
filesystemwrapper.class = "com.dremio.service.cachemanager.CacheFileSystemWrapper"
}
- 创建
SabotContext 中
// 使用了LoggedFileSystemWrapper 一个包含日志,以及基于动态类加载 (dremio.filesystemwrapper.class 这个config 文件中的key)
this.fileSystemWrapper = new LoggedFileSystemWrapper(
config.getInstance(
FileSystemWrapper.FILE_SYSTEM_WRAPPER_CLASS,
FileSystemWrapper.class,
(fs, storageId, conf, operatorContext, enableAsync, isMetadataEnabled) -> fs,
dremioConfig,
this.optionManager,
allocator,
new ServiceSetDecorator(coord.getServiceSet(Role.EXECUTOR)),
endpoint),
this.optionManager);
- LoggedFileSystemWrapper log包装
LoggedFileSystemWrapper.java 使用的ce 包中的com.dremio.service.cachemanager.CacheFileSystemWrapper
public FileSystem wrap(FileSystem fs, String storageId, AsyncStreamConf conf, OperatorContext context,
boolean enableAsync, boolean isMetadataRefresh) throws IOException {
FileSystem wrappedFs = defaultWrapper.wrap(fs, storageId, conf, context, enableAsync, isMetadataRefresh);
if (LoggedFileSystem.isLoggingEnabled()) {
// use options from the OperatorContext if available, otherwise fall back to global options
OptionResolver options = context != null && context.getOptions() != null ? context.getOptions() : globalOptions;
wrappedFs = new LoggedFileSystem(wrappedFs, options);
}
return wrappedFs;
}
- 使用
核心是FileSystemPlugin,在createFS 部分,实际上还是一个FileSystem 子类
参考代码(使用的SabotContext 的getFileSystemWrapper 方法)
public FileSystem createFS(String userName, OperatorContext operatorContext, boolean metadata) throws IOException {
// newFileSystem 底层实际上是hdfs 的FileSystem
return context.getFileSystemWrapper().wrap(newFileSystem(userName, operatorContext), name, config, operatorContext,
isAsyncEnabledForQuery(operatorContext) && getConfig().isAsyncEnabled(), metadata);
}
newFileSystem 处理
protected FileSystem newFileSystem(String userName, OperatorContext operatorContext) throws IOException {
// Create underlying filesystem
if (Strings.isNullOrEmpty(userName)) {
throw new IllegalArgumentException("Invalid value for user name");
}
final org.apache.hadoop.fs.FileSystem fs;
try {
fs = hadoopFS.get(getFSUser(userName));
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Throwables.propagateIfPossible(cause, IOException.class);
throw new RuntimeException(cause != null ? cause : e);
}
// HadoopFileSystem 是dremio 对于FileSystem 的一个hdfs 实现,实际内部文件读取还是基于了hdfs 的FileSystem
return HadoopFileSystem.get(fs, (operatorContext == null) ? null : operatorContext.getStats(),
isAsyncEnabledForQuery(operatorContext) && getConfig().isAsyncEnabled());
}
- 实际使用配置
services: {
coordinator.enabled: false,
coordinator.master.enabled: false,
executor.enabled: true
executor.cache.path.db : "/mnt/cachemanagerdisk/db",
executor.cache.path.fs : [ "/mnt/cachemanagerdisk/dir1","/mnt/cachemanagerdisk/dir2","/mnt/cachemanagerdisk/dir3","/mnt/cachemanagerdisk/dir4"]
}
说明
以上是一个简单的说明,以及对于代码位置的说明,ce 包中的详细设计可以通过反编译查看,实际上按照此模式我们基于alluxio以及juicefs 也是可以直接
替代的,而且对于大量文件系统的性能会比默认的实现更好
参考资料
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FileSystemWrapper.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/LoggedFileSystemWrapper.java
sabot/kernel/src/main/java/com/dremio/exec/server/SabotContext.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FileSystemPlugin.java
sabot/kernel/src/main/java/com/dremio/exec/hadoop/HadoopFileSystem.java
https://docs.dremio.com/current/get-started/cluster-deployments/customizing-configuration/dremio-conf/cloud-cache-config/
https://www.dremio.com/blog/how-dremio-delivers-fast-queries-on-object-storage-apache-arrow-reflections-and-the-columnar-cloud-cache/
https://community.dremio.com/t/caching-on-prem-implementation/8959/6
https://www.cnblogs.com/rongfengliang/p/16228551.html