首页 > 其他分享 >dremio FileSystem 简单说明

dremio FileSystem 简单说明

时间:2024-02-07 09:02:04浏览次数:35  
标签:hdfs dremio fs java FileSystem 简单 metadata

dremio 尽管对于文件系统的使用很多底层都是hdfs 的(s3,发射加速),dremio 为了减少直接依赖hdfs,自己抽象了一个FileSystem 接口
对于不同的实现可以方便进行扩展,当然和刚才说的一样,不少底层依赖的是hdfs 的FileSystem

参考子类

如下图


简单说明:
FilterFileSystem 实现了FileSystem,具体的操作是对于接口的实现,委托给了另外一个FileSystem 实现
AccelerationFileSystem 扩展了FilterFileSystem,主要是对于反射加速的,核心使用在文件系统存储扩展中,同时因为dremio 不建议使用伪分布式系统了
所以进行了check 以及异常处理

  public FileSystem createFS(String userName, OperatorContext operatorContext, boolean metadata) throws IOException {
    FileSystem fs = new AccelerationFileSystem(super.createFS(userName, operatorContext, metadata));
    if (fs.isPdfs()) {
      // Logging to help with debugging DX-54664
      IllegalStateException exception = new IllegalStateException("AccelerationStoragePlugin does not support PDFS.  User: " + userName);
      logger.error(exception.getMessage(), exception);
    }
    return fs;
  }

DistStorageMetadataPathRewritingFileSystem 官方的介绍是为了动态元数据路径的rewriting,核心是iceberg 等的元数据,主要在MetadataStoragePlugin 中使用, 目前RepairKvstoreFromIcebergMetadata 中也有使用到
MetadataStoragePlugin的使用

  @Override
  public FileSystem createFS(String userName, OperatorContext operatorContext, boolean metadata) throws IOException {
    if (getContext().getOptionManager().getOption(ExecConstants.ENABLE_UNLIMITED_SPLITS_DISTRIBUTED_STORAGE_RELOCATION)) {
      FileSystem f = super.createFS(userName, operatorContext, metadata);
      Path configPath = this.getConfig().getPath();
      return new DistStorageMetadataPathRewritingFileSystem(f, configPath);
    } else {
      return super.createFS(userName, operatorContext, metadata);
    }
  }

RepairKvstoreFromIcebergMetadata 的使用,目前看主要是对于iceberg 表的一些修复

  /**
   * re-syncs the unlimited splits metadata by frontloading the last-seen metadata.json file within fs dir, to the HEAD of the commit.
   * this commit will be used at reference point to conduct the repair method.
   * @param icebergTableIdentifier: the icebergTableIdentifier used to locate the internal iceberg table
   */
  private void repairMetadataDesyncIfNecessary(IcebergTableIdentifier icebergTableIdentifier) throws IOException {
    FileSystem fs = this.metaStoragePlugin.getSystemUserFS();
    String icebergMetadataFileLocation = this.datasetConfig.getPhysicalDataset().getIcebergMetadata().getMetadataFileLocation();
    // If feature flag ON, and the file does not exist in the fs, perform metadata re-sync
    if (metaStoragePlugin.getContext().getOptionManager().getOption(ExecConstants.ENABLE_UNLIMITED_SPLITS_DISTRIBUTED_STORAGE_RELOCATION) &&
      !fs.exists(Path.of(icebergMetadataFileLocation))) {
 
      logger.warn(String.format("metadata file: %s was not found within storage %s. Attempting self-heal to re-sync metadata",
          icebergMetadataFileLocation,
        ((DistStorageMetadataPathRewritingFileSystem) fs).getDistStoragePath()));
      Provider<NessieApiV2> api = ((IcebergNessieModel) icebergModel).getNessieApi();
 
      //Construct the internal iceberg's TableOperations
      IcebergNessieTableOperations icebergNessieTableOperations = new IcebergNessieTableOperations(
        null,
        api,
        metaStoragePlugin.createIcebergFileIO(fs, null, null, null, null),
        (IcebergNessieTableIdentifier) icebergTableIdentifier,
        null,
        optionManager);
 
      Path metadataPathLocation = getLatestMetastoreVersionIfPathExists(Path.of(icebergMetadataFileLocation), fs);
 
      //Refresh the object to populate the rest of the table operations.
      //fs.open and fs.getFileAttributes will be called during refresh. Calls to the "current metadata location" will
      //be redirected to point to the latest-existing metastore version found within the fs directory.
      //The table metadata returned represents the contents found by the re-written path.
      icebergNessieTableOperations.doRefreshFromPreviousCommit(metadataPathLocation.toString());
      TableMetadata LatestExistingtableMetadata = icebergNessieTableOperations.current();
 
      //Commit the outdated metadata to Nessie. Committing the latest-existing metadata pushes it to HEAD.
      //The new commit generates a new (temporary) metadata.json file.
      //The new (temporary) commit serves as the "old metadata" reference point during the repair.
      icebergNessieTableOperations.doCommit(null, LatestExistingtableMetadata);
    }
  }

LoggedFileSystem 包含日志的信息的FileSystem,dremio 不少扩展都扩展了此类,比如cache 的,当然日志支持基于support key 的配置
DremioHadoopFileSystemWrapper, 目前主要是对于hive 查询的包装,核心是hdfs 那套东西的处理
HadoopFileSystem 是dremio 对于实际内部文件系统的包装,比如dremio 使用的比较多的hdfs
参考处理

// 直接使用的底层hdfs 的getLocal
  public static FileSystem getLocal(Configuration fsConf) throws IOException {
    fsConf.set(HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH, DREMIO_CREDENTIAL_PROVIDER_PATH);
    org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.getLocal(fsConf);
    return get(fs);
  }
// 文件打开
  public FSInputStream open(Path f) throws IOException {
    try (WaitRecorder metaRecorder = OperatorStats.getMetadataWaitRecorder(operatorStats, f)) {
     // newFSDataInputStreamWrapper dremio 包装的hdfs FSDataInputStream 处理,底层还是会到hdfs
      return newFSDataInputStreamWrapper(f, underlyingFs.open(toHadoopPath(f)), operatorStats, true);
    } catch(FSError e) {
      throw propagateFSError(e);
    }
  }

包装类

FileSystemWrapper 是对于FileSystem 的包装,参考实现


简单说明:
LoggedFileSystemWrapper 是FileSystemWrapper 的实现,主要支持日志记录功能,如果开启了log 会使用LoggedFileSystem

  public FileSystem wrap(FileSystem fs, String storageId, AsyncStreamConf conf, OperatorContext context,
      boolean enableAsync, boolean isMetadataRefresh) throws IOException {
    FileSystem wrappedFs = defaultWrapper.wrap(fs, storageId, conf, context, enableAsync, isMetadataRefresh);
    if (LoggedFileSystem.isLoggingEnabled()) {
      // use options from the OperatorContext if available, otherwise fall back to global options
      OptionResolver options = context != null && context.getOptions() != null ? context.getOptions() : globalOptions;
      wrappedFs = new LoggedFileSystem(wrappedFs, options);
    }
    return wrappedFs;
  }

否则是sabotcontext 创建的,通过 FileSystemWrapper.FILE_SYSTEM_WRAPPER_CLASS 动态加载的class

    this.fileSystemWrapper = new LoggedFileSystemWrapper(
        config.getInstance(
            FileSystemWrapper.FILE_SYSTEM_WRAPPER_CLASS,
            FileSystemWrapper.class,
            (fs, storageId, conf, operatorContext, enableAsync, isMetadataEnabled) -> fs,
            dremioConfig,
            this.optionManager,
            allocator,
            new ServiceSetDecorator(coord.getServiceSet(Role.EXECUTOR)),
            endpoint),

当然dremio 说的c3 cache 也对于FileSystemWrapper 进行了实现 (CacheFileSystemWrapper.class)

说明

以上是一个简单的FileSystem 说明,总的来说dremio 内部实际对于文件系统的处理使用的是hdfs 的,但是自己进行了包装,不直接依赖hdfs 的FileSystem,默认dremio 不少文件系统的查询会直接用到dremio 说的c3 文件系统(ce 版本的,没有开源,就是通过上边说的 FileSystemWrapper.FILE_SYSTEM_WRAPPER_CLASS key 查找)

参考资料

common/legacy/src/main/java/com/dremio/io/file/FileSystem.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FileSystemWrapper.java
services/accelerator/src/main/java/com/dremio/service/reflection/materialization/AccelerationFileSystem.java
services/accelerator/src/main/java/com/dremio/service/reflection/materialization/AccelerationStoragePlugin.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/MetadataStoragePlugin.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/RepairKvstoreFromIcebergMetadata.java

标签:hdfs,dremio,fs,java,FileSystem,简单,metadata
From: https://www.cnblogs.com/rongfengliang/p/18004958

相关文章

  • 超简单!手把手实现axum简易中间件
    axum是Rust语言tokio生态中的重要一环,以轻量、模块化、易用而闻名于世。它的中间件系统集成自另一个叫tower的框架,这就意味着如果我们要写axum的中间件的话,就得了解一下这个tower的各个核心概念,并学习它的用法。但是,很多时候我们可能只是想写一点简单的小工具,为了小需求去学习一个......
  • harmonyOS基础(二)-简单认识UIAbility
    大家好!我是黑臂麒麟,一位6年的前端工程师;随着鸿蒙4.0的发布。鸿蒙的社区壮大,而且市场越来越对harmonyOS认可度越来越高。现很多大公司开始需要招聘鸿蒙应用开发工程师,待遇都非常好。以后中心厂跟进,也可以赶上红利;之前一直想入坑鸿蒙,但犹豫徘徊,2024不在等待,只争朝夕学,勇往直前。系统......
  • 用Java编译一个简单计算器
    作业写一个计算器,要求实现加减乘除功能,并且能够循环接收新的数据,通过用户交互实现。思路推荐:写4个方法,加减乘除利用循环+switch进行用户交互传递需要操作的两个数输出结构packagecom.hongyi.method;importjava.util.Scanner;//写一个计算器,要实现加减乘除功能,......
  • 【CPL-2023】W2笔记-变量、类型、简单IO
    int类型范围-2^31~2^31-1UB未定义行为2^31-1+1的话会发生未定义行为,产生溢出时时未定义行为编译器的开发者可以以任意的行为来应对c标准中的未定义行为int型默认保持32比特/0或者%0会产生UB(未定义行为)/0为了兼容多个厂商的除法器而存在UB,有些除法器抛出错误,有......
  • python简单加解密
    有的内容并不怕别人看,但仍想简单加解密一下,可以考虑以下代码:defencrypt(text):encrypted_text=""forcharintext:unicode_value=ord(char)+10#在原有的Unicode值上加上10encrypted_text+=chr(unicode_value)returnencrypted_tex......
  • Ubuntu环境下安装并简单测试ros2
    1.设置编码aptupdate&&aptinstalllocaleslocale-genen_USen_US.UTF-8update-localeLC_ALL=en_US.UTF-8LANG=en_US.UTF-8exportLANG=en_US.UTF-82.添加源aptupdate&&aptinstallcurlgnupglsb-releasecurl-sSLhttps://raw.githubuserconte......
  • dremio cloud cache 简单说明
    dremiocloudcache实际上就是对于云文件系统的cache加速(比如hdfs,s3。。。),在处理的时候使用了ce包装的包,详细源码并没有开源我们可以通过一些代码整体看下实现参考处理dremio-ce-services-cachemanager中的处理cecaache管理配置dremio:{classpath.scan......
  • 48从零开始用Rust编写nginx,搭建一个简单又好看官方网站
    wmproxywmproxy已用Rust实现http/https代理,socks5代理,反向代理,负载均衡,静态文件服务器,websocket代理,四层TCP/UDP转发,内网穿透等,会将实现过程分享出来,感兴趣的可以一起造个轮子项目地址国内:https://gitee.com/tickbh/wmproxygithub:https://github.com/tickbh/wmpro......
  • 2.2 如何把一道简单思维题变难
    今天是搞笑场,符合“精神状况记录”的tag。方法一:将\(O(n)\)甚至\(O(\logn)\)的需要一定思维的题目,将\(n\)开到\(1000\),\(100\)等两级。ARC108DAB给定\(c_{A/B,A/B}\in\{A,B\}\),每次可以在\(XY\)间插入\(c_{X,Y}\),问可以得到多少种长\(n\)序列。重要:数据范......
  • SRS实现网页和手机端简单直播
    一.SRS简介SRS官方网站:https://github.com/ossrs/srs/wiki/v3_CN_Home,https://ossrs.net/lts/zh-cn/SRS是一个开源的(MIT协议)简单高效的实时视频服务器,支持RTMP、WebRTC、HLS、HTTP-FLV、SRT、MPEG-DASH和GB28181等协议。SRS媒体服务器和FFmpeg、OBS、VLC、 WebRTC等客户端配合......