Although much of Dremio's file system usage is ultimately backed by Hadoop's HDFS APIs (S3 access, reflection acceleration, and so on), Dremio abstracts its own FileSystem interface to avoid depending on HDFS directly.
This makes it easy to extend with different implementations, though, as noted, many of those implementations still delegate to Hadoop's FileSystem underneath.
Reference subclasses are shown in the figure below.
Brief notes:
FilterFileSystem implements FileSystem; each interface method is implemented by delegating to another wrapped FileSystem instance.
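This is plain delegation. Below is a minimal sketch of the pattern using a trimmed-down stand-in interface (the real com.dremio.io.file.FileSystem has far more methods; MiniFileSystem and MiniFilterFileSystem are illustrative names, not Dremio classes):

import java.io.IOException;
import java.io.InputStream;

// Trimmed-down stand-in for the real FileSystem interface, for illustration only.
interface MiniFileSystem {
  InputStream open(String path) throws IOException;
  boolean exists(String path) throws IOException;
}

// The FilterFileSystem pattern: every call is forwarded to a wrapped
// MiniFileSystem, so subclasses only override the methods they care about.
class MiniFilterFileSystem implements MiniFileSystem {
  private final MiniFileSystem delegate;

  MiniFilterFileSystem(MiniFileSystem delegate) {
    this.delegate = delegate;
  }

  @Override
  public InputStream open(String path) throws IOException {
    return delegate.open(path);
  }

  @Override
  public boolean exists(String path) throws IOException {
    return delegate.exists(path);
  }
}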
AccelerationFileSystem extends FilterFileSystem and serves reflection acceleration; it is used primarily by the acceleration storage plugin. Because Dremio no longer recommends the pseudo-distributed file system (PDFS), a check with exception handling was added:
public FileSystem createFS(String userName, OperatorContext operatorContext, boolean metadata) throws IOException {
  FileSystem fs = new AccelerationFileSystem(super.createFS(userName, operatorContext, metadata));
  if (fs.isPdfs()) {
    // Logging to help with debugging DX-54664
    IllegalStateException exception = new IllegalStateException("AccelerationStoragePlugin does not support PDFS. User: " + userName);
    logger.error(exception.getMessage(), exception);
  }
  return fs;
}
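Note that in this snippet the IllegalStateException is constructed and logged (for DX-54664 debugging) but not thrown, so a PDFS file system still leaves createFS with a loud diagnostic rather than a hard failure.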
DistStorageMetadataPathRewritingFileSystem is officially described as rewriting dist-storage metadata paths dynamically; its core concern is Iceberg (and similar) metadata. It is used mainly in MetadataStoragePlugin, and currently also in RepairKvstoreFromIcebergMetadata.
Usage in MetadataStoragePlugin:
@Override
public FileSystem createFS(String userName, OperatorContext operatorContext, boolean metadata) throws IOException {
  if (getContext().getOptionManager().getOption(ExecConstants.ENABLE_UNLIMITED_SPLITS_DISTRIBUTED_STORAGE_RELOCATION)) {
    FileSystem f = super.createFS(userName, operatorContext, metadata);
    Path configPath = this.getConfig().getPath();
    return new DistStorageMetadataPathRewritingFileSystem(f, configPath);
  } else {
    return super.createFS(userName, operatorContext, metadata);
  }
}
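The rewriting itself amounts to re-rooting paths recorded under an old dist-storage location onto the currently configured config path. A hypothetical sketch of that idea (the class, method names, and matching rules here are my assumptions, not Dremio's actual implementation):

// Hypothetical illustration of dist-storage path rewriting; the real
// DistStorageMetadataPathRewritingFileSystem differs in detail.
class PathRewriteSketch {
  private final String currentDistStoragePath;

  PathRewriteSketch(String currentDistStoragePath) {
    this.currentDistStoragePath = currentDistStoragePath;
  }

  // Re-root a path recorded under an old dist-storage root onto the
  // currently configured root, keeping the relative part (e.g. the
  // metadata/ subtree of an internal Iceberg table) intact.
  String rewrite(String recordedPath, String recordedDistStorageRoot) {
    if (recordedPath.startsWith(recordedDistStorageRoot)) {
      String relativePart = recordedPath.substring(recordedDistStorageRoot.length());
      return currentDistStoragePath + relativePart;
    }
    return recordedPath; // not under the old root, leave untouched
  }
}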
Usage in RepairKvstoreFromIcebergMetadata, which from what I can see mainly repairs Iceberg tables:
/**
 * Re-syncs the unlimited splits metadata by front-loading the last-seen metadata.json file within the fs dir to the HEAD of the commit.
 * This commit will be used as the reference point to conduct the repair method.
 * @param icebergTableIdentifier the icebergTableIdentifier used to locate the internal iceberg table
 */
private void repairMetadataDesyncIfNecessary(IcebergTableIdentifier icebergTableIdentifier) throws IOException {
  FileSystem fs = this.metaStoragePlugin.getSystemUserFS();
  String icebergMetadataFileLocation = this.datasetConfig.getPhysicalDataset().getIcebergMetadata().getMetadataFileLocation();
  // If feature flag ON, and the file does not exist in the fs, perform metadata re-sync
  if (metaStoragePlugin.getContext().getOptionManager().getOption(ExecConstants.ENABLE_UNLIMITED_SPLITS_DISTRIBUTED_STORAGE_RELOCATION) &&
      !fs.exists(Path.of(icebergMetadataFileLocation))) {
    logger.warn(String.format("metadata file: %s was not found within storage %s. Attempting self-heal to re-sync metadata",
        icebergMetadataFileLocation,
        ((DistStorageMetadataPathRewritingFileSystem) fs).getDistStoragePath()));
    Provider<NessieApiV2> api = ((IcebergNessieModel) icebergModel).getNessieApi();
    // Construct the internal iceberg's TableOperations
    IcebergNessieTableOperations icebergNessieTableOperations = new IcebergNessieTableOperations(
        null,
        api,
        metaStoragePlugin.createIcebergFileIO(fs, null, null, null, null),
        (IcebergNessieTableIdentifier) icebergTableIdentifier,
        null,
        optionManager);
    Path metadataPathLocation = getLatestMetastoreVersionIfPathExists(Path.of(icebergMetadataFileLocation), fs);
    // Refresh the object to populate the rest of the table operations.
    // fs.open and fs.getFileAttributes will be called during refresh. Calls to the "current metadata location" will
    // be redirected to point to the latest-existing metastore version found within the fs directory.
    // The table metadata returned represents the contents found by the re-written path.
    icebergNessieTableOperations.doRefreshFromPreviousCommit(metadataPathLocation.toString());
    TableMetadata latestExistingTableMetadata = icebergNessieTableOperations.current();
    // Commit the outdated metadata to Nessie. Committing the latest-existing metadata pushes it to HEAD.
    // The new commit generates a new (temporary) metadata.json file.
    // The new (temporary) commit serves as the "old metadata" reference point during the repair.
    icebergNessieTableOperations.doCommit(null, latestExistingTableMetadata);
  }
}
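So the repair sequence is: detect that the metadata.json recorded in the KV store no longer exists in dist storage, find the latest metadata version that does still exist in the directory, refresh the table operations from that commit, and commit it back to Nessie so it becomes HEAD and can serve as the reference point for the rest of the repair.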
LoggedFileSystem is a FileSystem that adds logging; quite a few Dremio extensions build on it (for example the cache one), and the logging is enabled via a support-key option.
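The shape is again a delegating wrapper with logging layered on top. A rough sketch, building on the MiniFilterFileSystem stand-in above with the same imports (the timing and output format are mine; the real LoggedFileSystem logs far more detail via slf4j and reads its enablement from a support key):

// Rough sketch in the style of LoggedFileSystem; not the actual Dremio code.
class MiniLoggedFileSystem extends MiniFilterFileSystem {
  MiniLoggedFileSystem(MiniFileSystem delegate) {
    super(delegate);
  }

  @Override
  public InputStream open(String path) throws IOException {
    long start = System.nanoTime();
    try {
      return super.open(path);
    } finally {
      long elapsedMs = (System.nanoTime() - start) / 1_000_000;
      // The real implementation logs through slf4j, gated by the support-key option.
      System.out.printf("open(%s) took %d ms%n", path, elapsedMs);
    }
  }
}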
DremioHadoopFileSystemWrapper currently wraps file system access mainly for Hive queries; at its core it handles the HDFS machinery.
HadoopFileSystem is Dremio's wrapper around the actual underlying file system, for example HDFS, which Dremio uses heavily.
Reference handling:
// Directly uses the underlying HDFS getLocal
public static FileSystem getLocal(Configuration fsConf) throws IOException {
  fsConf.set(HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH, DREMIO_CREDENTIAL_PROVIDER_PATH);
  org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.getLocal(fsConf);
  return get(fs);
}

// Opening a file
public FSInputStream open(Path f) throws IOException {
  try (WaitRecorder metaRecorder = OperatorStats.getMetadataWaitRecorder(operatorStats, f)) {
    // newFSDataInputStreamWrapper is Dremio's wrapper around Hadoop's FSDataInputStream; it still goes down to HDFS
    return newFSDataInputStreamWrapper(f, underlyingFs.open(toHadoopPath(f)), operatorStats, true);
  } catch (FSError e) {
    throw propagateFSError(e);
  }
}
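As a usage sketch, reading a local file through this wrapper looks roughly like the following (the import path for HadoopFileSystem is my best guess at its location in the OSS tree, and /tmp/example.txt is a placeholder):

import org.apache.hadoop.conf.Configuration;
import com.dremio.exec.hadoop.HadoopFileSystem;
import com.dremio.io.FSInputStream;
import com.dremio.io.file.FileSystem;
import com.dremio.io.file.Path;

// Illustrative only: get the local file system through Dremio's wrapper
// and read through the wrapped FSInputStream.
public class LocalFsExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = HadoopFileSystem.getLocal(new Configuration());
    try (FSInputStream in = fs.open(Path.of("/tmp/example.txt"))) {
      byte[] buf = new byte[4096];
      int read = in.read(buf);
      System.out.println("read " + read + " bytes");
    }
  }
}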
Wrapper classes
FileSystemWrapper wraps a FileSystem; reference implementations below.
Brief notes:
LoggedFileSystemWrapper is an implementation of FileSystemWrapper that mainly adds logging support; if logging is enabled, it wraps the file system in a LoggedFileSystem:
public FileSystem wrap(FileSystem fs, String storageId, AsyncStreamConf conf, OperatorContext context,
    boolean enableAsync, boolean isMetadataRefresh) throws IOException {
  FileSystem wrappedFs = defaultWrapper.wrap(fs, storageId, conf, context, enableAsync, isMetadataRefresh);
  if (LoggedFileSystem.isLoggingEnabled()) {
    // use options from the OperatorContext if available, otherwise fall back to global options
    OptionResolver options = context != null && context.getOptions() != null ? context.getOptions() : globalOptions;
    wrappedFs = new LoggedFileSystem(wrappedFs, options);
  }
  return wrappedFs;
}
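Note the wrapping order: defaultWrapper runs first, so when logging is enabled the LoggedFileSystem ends up as the outermost decorator and observes every call entering the chain.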
Otherwise the file system comes through the delegate created in SabotContext, a class dynamically loaded via the FileSystemWrapper.FILE_SYSTEM_WRAPPER_CLASS key:
this.fileSystemWrapper = new LoggedFileSystemWrapper(
    config.getInstance(
        FileSystemWrapper.FILE_SYSTEM_WRAPPER_CLASS,
        FileSystemWrapper.class,
        (fs, storageId, conf, operatorContext, enableAsync, isMetadataEnabled) -> fs,
        dremioConfig,
        this.optionManager,
        allocator,
        new ServiceSetDecorator(coord.getServiceSet(Role.EXECUTOR)),
        endpoint),
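The lambda passed to config.getInstance is the fallback: if no FILE_SYSTEM_WRAPPER_CLASS is configured, the inner wrapper is a no-op that returns the file system unchanged, and only the logging decoration from LoggedFileSystemWrapper applies.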
Dremio's C3 cache also provides a FileSystemWrapper implementation (CacheFileSystemWrapper.class).
Summary
The above is a quick tour of FileSystem. Overall, Dremio's internal file system handling builds on HDFS, but wraps it so that it does not depend on Hadoop's FileSystem directly. By default, many of Dremio's file system reads go through the C3 file system mentioned above (not part of the open-source build; it is the class looked up via the FileSystemWrapper.FILE_SYSTEM_WRAPPER_CLASS key described earlier).
References
common/legacy/src/main/java/com/dremio/io/file/FileSystem.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FileSystemWrapper.java
services/accelerator/src/main/java/com/dremio/service/reflection/materialization/AccelerationFileSystem.java
services/accelerator/src/main/java/com/dremio/service/reflection/materialization/AccelerationStoragePlugin.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/MetadataStoragePlugin.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/RepairKvstoreFromIcebergMetadata.java