首页 > 其他分享 >dremio FormatMatcher 简单说明

dremio FormatMatcher 简单说明

时间:2023-01-10 12:12:04浏览次数:36  
标签:dremio fs false defaultFormats FormatMatcher return 简单 put new

FormatMatcher 核心是对于文件系统进行进行格式匹配,方便查询以及执行引擎了解具体支持的数据格式,进行实际数据的处理
每个FormatPlugin 都需要包含一个格式化匹配器

参考类图

 

 

IcebergFormatMatcher 参考处理

  • 参考iceberg table 格式

 

 

  • 参考代码
 
/**
  iceberg 特征判断内容
 * Matcher for iceberg format. We expect :
 *
 * a. directory with name "metadata",
 *  (and)
 * b. file with pattern v\d*.metadata.json in (a)
 *  (and)
 * c. file with name "version-hint.text" in (a)
 *
 */
public class IcebergFormatMatcher extends FormatMatcher {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IcebergFormatMatcher.class);
  public static final String METADATA_DIR_NAME = "metadata";
  private static final Pattern METADATA_FILE_PATTERN = Pattern.compile("v\\\\d*\\.metadata\\.json$");
  private static final String VERSION_HINT_FILE_NAME = "version-hint.text";
  private final FormatPlugin plugin;
 
  public IcebergFormatMatcher(FormatPlugin plugin) {
    this.plugin = plugin;
  }
 
  @Override
  public FormatPlugin getFormatPlugin()  {
    return this.plugin;
  }
 
  @Override
  public boolean matches(FileSystem fs, FileSelection fileSelection, CompressionCodecFactory codecFactory) throws IOException {
    return isIcebergTable(fs, fileSelection.getSelectionRoot());
  }
  // 提供的工具类,方便格式化插件使用
  public boolean isFileSystemSupportedIcebergTable(FileSystem fs, String tableRootPath) throws IOException {
    if (!isIcebergTable(fs, tableRootPath)) {
      return false;
    }
 
    Path rootDir = Path.of(tableRootPath);
    Path metaDir = rootDir.resolve(METADATA_DIR_NAME);
    Path versionHintPath = metaDir.resolve(VERSION_HINT_FILE_NAME);
    if (!fs.exists(versionHintPath) || !fs.isFile(versionHintPath)) {
      return false;
    }
 
    for (FileAttributes file : fs.list(metaDir)) {
      if (METADATA_FILE_PATTERN.matcher(file.getPath().getName()).matches()) {
        return true;
      }
    }
    return false;
  }
 // 简单格式判断,此处只进行了目录以及明明的判断,实际上isFileSystemSupportedIcebergTable 才是一个比较完整的判定
  private boolean isIcebergTable(FileSystem fs, String tableRootPath) throws IOException {
    Path rootDir = Path.of(tableRootPath);
    Path metaDir = rootDir.resolve(METADATA_DIR_NAME);
    return fs.isDirectory(rootDir) && fs.exists(metaDir) && fs.isDirectory(metaDir);
  }
}

实际使用

当前实际使用FormatMatcher 的地方包含了格式化插件的创建管理FormatCreator(主要是一个工具类)以及FileSystemPlugin(利用了FormatCreator)
FormatCreator 主要是进行格式化插件的创建,

  • FileSystemPlugin的使用
 
@Override
  public void start() throws IOException {
    List<Property> properties = getProperties();
    if (properties != null) {
      for (Property prop : properties) {
        fsConf.set(prop.name, prop.value);
      }
    }
 
    if (!Strings.isNullOrEmpty(config.getConnection())) {
      org.apache.hadoop.fs.FileSystem.setDefaultUri(fsConf, config.getConnection());
    }
 
    Map<String,String> map =  ImmutableMap.of(
            "fs.classpath.impl", ClassPathFileSystem.class.getName(),
            "fs.dremio-local.impl", LocalSyncableFileSystem.class.getName()
    );
    for(Entry<String, String> prop : map.entrySet()) {
      fsConf.set(prop.getKey(), prop.getValue());
    }
 
    this.optionExtractor = new FormatPluginOptionExtractor(context.getClasspathScan());
    this.matchers = Lists.newArrayList();
    this.layeredMatchers = Lists.newArrayList();
    this.formatCreator = new FormatCreator(context, config, context.getClasspathScan(), this);
    // Use default Hadoop implementation
    this.codecFactory = HadoopCompressionCodecFactory.DEFAULT;
    // 通过formatCreator 创建格式化匹配器
    matchers.addAll(formatCreator.getFormatMatchers());
    layeredMatchers.addAll(formatCreator.getLayeredFormatMatchers());
 
//    boolean footerNoSeek = contetMutext.getOptionManager().getOption(ExecConstants.PARQUET_FOOTER_NOSEEK);
    // NOTE: Add fallback format matcher if given in the configuration. Make sure fileMatchers is an order-preserving list.
    this.systemUserFS = createFS(SYSTEM_USERNAME);
    dropFileMatchers = matchers.subList(0, matchers.size());
    this.fsHealthChecker = FSHealthChecker.getInstance(config.getPath(), config.getConnection(), getFsConf()).orElse((p,m) -> healthCheck(p, m));
 
    createIfNecessary();
  }
  • dremio 目前支持的格式
public static Map<String, FormatPluginConfig> getDefaultFormats() {
  Map<String, FormatPluginConfig> defaultFormats = new TreeMap<>();
  defaultFormats.put("csv", createTextFormatPlugin(false, ',', Lists.newArrayList("csv")));
  defaultFormats.put("csvh", createTextFormatPlugin(true, ',', Lists.newArrayList("csvh")));
  defaultFormats.put("tsv", createTextFormatPlugin(false, '\t', Lists.newArrayList("tsv")));
  defaultFormats.put("psv", createTextFormatPlugin(false, '|', Lists.newArrayList("psv", "tbl")));
  defaultFormats.put("txt", createTextFormatPlugin(false, '\u0000', Lists.newArrayList("txt")));
  TextFormatConfig psva = createTextFormatPlugin(false, '|', Lists.newArrayList("psva", "tbla"));
  psva.autoGenerateColumnNames = true;
  defaultFormats.put("psva", psva);
 
  defaultFormats.put("parquet", new ParquetFormatConfig());
  defaultFormats.put("json", new JSONFormatPlugin.JSONFormatConfig());
  defaultFormats.put("dremarrow1", new ArrowFormatPluginConfig());
  defaultFormats.put("iceberg", new IcebergFormatConfig());
  defaultFormats.put("delta", new DeltaLakeFormatConfig());
  defaultFormats.put("xls", new ExcelFormatPluginConfig(true));
  defaultFormats.put("excel", new ExcelFormatPluginConfig(false));
  return defaultFormats;
}

说明

以上只是一个简单的说明,dremio 实际上还是复用了apache drill 的easy plugin 套路进行格式化的处理,同时基于此包装了不少,后边详细说明下

参考资料

sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FormatMatcher.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FormatPlugin.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FormatCreator.java

标签:dremio,fs,false,defaultFormats,FormatMatcher,return,简单,put,new
From: https://www.cnblogs.com/rongfengliang/p/17039764.html

相关文章

  • 面向对象程序设计 第二章 C++简单的程序设计
    目录C++语言的特点1.兼容C语言·它保持了C的简洁、高效和接近汇编语言等特点。·对C的类型系统进行了改革和扩充。·C++也支持面向过程的程序设计,不是一个纯正的面......
  • VUE 防抖简单实现
    防抖代码实现:exportclassDebounce{staticcurrent:Debounce=newDebounce();staticasyncInvoke<T>(func:()=>Promise<T>,timeout:number=300)......
  • leetcode简单:[1, 9, 13, 14, 20, 21, 26, 27, 35, 58]
    目录1.两数之和9.回文数13.罗马数字转整数14.最长公共前缀20.有效的括号21.合并两个有序链表26.删除有序数组中的重复项27.移除元素35.搜索插入位置58.最后一个......
  • leetcode简单:[66, 67, 70, 83, 121, 141, 160, 169, ,206, 338]
    目录66.加一67.二进制求和70.爬楼梯83.删除排序链表中的重复元素121.买卖股票的最佳时机141.环形链表160.相交链表169.多数元素206.反转链表338.比特位计数66.......
  • HIVE简单操作命令
    beelinebeeline>!connectjdbc:hive2://192.168.2.2:10000hdfs回车回车直接用默认表,不需要配置权限createtablest(idint,namestring)rowformatdelimitedfieldster......
  • Redis 数据结构-简单动态字符串
    Redis数据结构-简单动态字符串 无边落木萧萧下,不尽长江滚滚来。 1、简介Redis之所以快主要得益于它的数据结构、操作内存数据库、单线程和多路I/O......
  • 硬盘检测工具 Victoria 简单使用教程
    一、注意事项1.下载完成后不要在压缩包内运行软件直接使用,先解压;2.如果软件无法正常打开,请右键使用管理员模式运行。3.为确保检测结果准确(避免卡深灰块),运行Victo......
  • 简单聊聊数据脱敏与数据加密
    数据的脱敏与加密在数据安全领域极为重要,也是大数据平台、数据中台建设中不可缺少的一环,我将以数据脱敏与数据加密的特征以及区别、方法论来进行研究学习。一、数据脱敏与......
  • 论文查重|三种免费查重方法、五种论文降重技巧,论文查重就是这么简单!
    三种免费论文降重方法方法一:论文查重软件这款查重软件可以帮助大家免费论文查重,并得出重复率以及检测报告,按照检测报告的要求修改论文,重复率降到20%,论文就合规了。查重软件......
  • 倒排索引的 JAVA 简单实现
      倒排索引的简单JAVA实现,当玩具其实都很粗糙,简单实现下原理:publicclassIntertedIndex{//倒排索引privateMap<String,List<String>>indexMa......