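The core job of FormatMatcher is to match data formats on a file system, so that the query and execution engines know which data formats are supported and can process the actual data accordingly.
Every FormatPlugin must provide a format matcher.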
Class diagram
IcebergFormatMatcher as a reference implementation
- Reference: the Iceberg table format
- Reference code
/**
 * Criteria used to recognize an Iceberg table.
 * Matcher for iceberg format. We expect :
 *
 * a. directory with name "metadata",
 * (and)
 * b. file with pattern v\d*.metadata.json in (a)
 * (and)
 * c. file with name "version-hint.text" in (a)
 *
 */
public class IcebergFormatMatcher extends FormatMatcher {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IcebergFormatMatcher.class);

  public static final String METADATA_DIR_NAME = "metadata";
  private static final Pattern METADATA_FILE_PATTERN = Pattern.compile("v\\d*\\.metadata\\.json$");
  private static final String VERSION_HINT_FILE_NAME = "version-hint.text";

  private final FormatPlugin plugin;

  public IcebergFormatMatcher(FormatPlugin plugin) {
    this.plugin = plugin;
  }

  @Override
  public FormatPlugin getFormatPlugin() {
    return this.plugin;
  }

  @Override
  public boolean matches(FileSystem fs, FileSelection fileSelection, CompressionCodecFactory codecFactory) throws IOException {
    return isIcebergTable(fs, fileSelection.getSelectionRoot());
  }

  // Helper exposed for use by format plugins
  public boolean isFileSystemSupportedIcebergTable(FileSystem fs, String tableRootPath) throws IOException {
    if (!isIcebergTable(fs, tableRootPath)) {
      return false;
    }

    Path rootDir = Path.of(tableRootPath);
    Path metaDir = rootDir.resolve(METADATA_DIR_NAME);
    Path versionHintPath = metaDir.resolve(VERSION_HINT_FILE_NAME);
    if (!fs.exists(versionHintPath) || !fs.isFile(versionHintPath)) {
      return false;
    }

    for (FileAttributes file : fs.list(metaDir)) {
      if (METADATA_FILE_PATTERN.matcher(file.getPath().getName()).matches()) {
        return true;
      }
    }
    return false;
  }

  // Lightweight check: only the directory layout and names are verified here;
  // isFileSystemSupportedIcebergTable performs the more complete check.
  private boolean isIcebergTable(FileSystem fs, String tableRootPath) throws IOException {
    Path rootDir = Path.of(tableRootPath);
    Path metaDir = rootDir.resolve(METADATA_DIR_NAME);
    return fs.isDirectory(rootDir) && fs.exists(metaDir) && fs.isDirectory(metaDir);
  }
}
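The three conditions listed in the Javadoc can be tried outside Dremio against a local directory. The following is a minimal sketch, assuming plain `java.nio.file` in place of Dremio's `FileSystem` abstraction; the class name `IcebergLayoutCheck` and the method `looksLikeIcebergTable` are hypothetical and are not part of Dremio.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.regex.Pattern;

// Hypothetical stand-alone check, not Dremio code.
public class IcebergLayoutCheck {
    // Same pattern as described in the Javadoc: v<digits>.metadata.json
    private static final Pattern METADATA_FILE_PATTERN =
        Pattern.compile("v\\d*\\.metadata\\.json$");

    public static boolean looksLikeIcebergTable(Path tableRoot) throws IOException {
        // (a) a directory named "metadata" under the table root
        Path metaDir = tableRoot.resolve("metadata");
        if (!Files.isDirectory(metaDir)) {
            return false;
        }
        // (c) a regular file named "version-hint.text" inside (a)
        if (!Files.isRegularFile(metaDir.resolve("version-hint.text"))) {
            return false;
        }
        // (b) at least one file matching v\d*.metadata.json inside (a)
        try (DirectoryStream<Path> files = Files.newDirectoryStream(metaDir)) {
            for (Path file : files) {
                String name = file.getFileName().toString();
                if (METADATA_FILE_PATTERN.matcher(name).matches()) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        // Checks the directory given as the first argument, or the current directory.
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        System.out.println(looksLikeIcebergTable(root));
    }
}
```

Note that the sketch closes the directory stream with try-with-resources; whether the Dremio listing needs the same treatment depends on the return type of `fs.list`, which is not shown in the excerpt.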
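Actual usage
FormatMatcher is currently used in two places: FormatCreator, which creates and manages format plugins (it is mainly a utility class), and FileSystemPlugin, which relies on FormatCreator.
FormatCreator is mainly responsible for creating format plugins.
- Usage in FileSystemPlugin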
@Override
public void start() throws IOException {
  List<Property> properties = getProperties();
  if (properties != null) {
    for (Property prop : properties) {
      fsConf.set(prop.name, prop.value);
    }
  }

  if (!Strings.isNullOrEmpty(config.getConnection())) {
    org.apache.hadoop.fs.FileSystem.setDefaultUri(fsConf, config.getConnection());
  }

  Map<String,String> map = ImmutableMap.of(
    "fs.classpath.impl", ClassPathFileSystem.class.getName(),
    "fs.dremio-local.impl", LocalSyncableFileSystem.class.getName()
  );
  for (Entry<String, String> prop : map.entrySet()) {
    fsConf.set(prop.getKey(), prop.getValue());
  }

  this.optionExtractor = new FormatPluginOptionExtractor(context.getClasspathScan());
  this.matchers = Lists.newArrayList();
  this.layeredMatchers = Lists.newArrayList();
  this.formatCreator = new FormatCreator(context, config, context.getClasspathScan(), this);
  // Use default Hadoop implementation
  this.codecFactory = HadoopCompressionCodecFactory.DEFAULT;

  // Obtain the format matchers from formatCreator
  matchers.addAll(formatCreator.getFormatMatchers());
  layeredMatchers.addAll(formatCreator.getLayeredFormatMatchers());

  // boolean footerNoSeek = contetMutext.getOptionManager().getOption(ExecConstants.PARQUET_FOOTER_NOSEEK);
  // NOTE: Add fallback format matcher if given in the configuration. Make sure fileMatchers is an order-preserving list.
  this.systemUserFS = createFS(SYSTEM_USERNAME);
  dropFileMatchers = matchers.subList(0, matchers.size());
  this.fsHealthChecker = FSHealthChecker.getInstance(config.getPath(), config.getConnection(), getFsConf()).orElse((p,m) -> healthCheck(p, m));
  createIfNecessary();
}
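The NOTE in start() asks for an order-preserving list of matchers, which suggests that the order in which matchers are tried matters. One common way to consume such a list is first-match-wins. The following is a minimal sketch of that idea under that assumption, not the actual lookup logic in FileSystemPlugin; `MatcherOrderSketch`, `NamedMatcher` and `firstMatch` are hypothetical names introduced only for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical illustration of trying matchers in list order; not Dremio code.
public class MatcherOrderSketch {

    /** Stand-in for a format matcher: a format name plus a test on a path string. */
    public static final class NamedMatcher {
        final String formatName;
        final Predicate<String> test;

        public NamedMatcher(String formatName, Predicate<String> test) {
            this.formatName = formatName;
            this.test = test;
        }
    }

    /** Returns the name of the first matcher in list order that accepts the path. */
    public static Optional<String> firstMatch(List<NamedMatcher> matchers, String path) {
        for (NamedMatcher matcher : matchers) {
            if (matcher.test.test(path)) {
                return Optional.of(matcher.formatName);
            }
        }
        return Optional.empty();
    }
}
```

With this shape, a catch-all fallback matcher only behaves as a fallback if it sits at the end of the list, which is one reason an order-preserving list would be required.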
- Formats currently supported by dremio
public static Map<String, FormatPluginConfig> getDefaultFormats() {
  Map<String, FormatPluginConfig> defaultFormats = new TreeMap<>();
  defaultFormats.put("csv", createTextFormatPlugin(false, ',', Lists.newArrayList("csv")));
  defaultFormats.put("csvh", createTextFormatPlugin(true, ',', Lists.newArrayList("csvh")));
  defaultFormats.put("tsv", createTextFormatPlugin(false, '\t', Lists.newArrayList("tsv")));
  defaultFormats.put("psv", createTextFormatPlugin(false, '|', Lists.newArrayList("psv", "tbl")));
  defaultFormats.put("txt", createTextFormatPlugin(false, '\u0000', Lists.newArrayList("txt")));
  TextFormatConfig psva = createTextFormatPlugin(false, '|', Lists.newArrayList("psva", "tbla"));
  psva.autoGenerateColumnNames = true;
  defaultFormats.put("psva", psva);
  defaultFormats.put("parquet", new ParquetFormatConfig());
  defaultFormats.put("json", new JSONFormatPlugin.JSONFormatConfig());
  defaultFormats.put("dremarrow1", new ArrowFormatPluginConfig());
  defaultFormats.put("iceberg", new IcebergFormatConfig());
  defaultFormats.put("delta", new DeltaLakeFormatConfig());
  defaultFormats.put("xls", new ExcelFormatPluginConfig(true));
  defaultFormats.put("excel", new ExcelFormatPluginConfig(false));
  return defaultFormats;
}
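The text formats above are registered together with a list of file extensions (for example `psv` carries both `psv` and `tbl`). A rough sketch of how such an extension list could be used to pick a format for a file name follows. It is an illustration only: `ExtensionLookupSketch` and its methods are hypothetical, the comparison is a plain exact match on the extension, and it says nothing about how Dremio's text matchers are actually implemented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical illustration of extension-based format lookup; not Dremio code.
public class ExtensionLookupSketch {

    /** Format name to extensions, copied from the text formats in getDefaultFormats(). */
    public static Map<String, List<String>> textFormatExtensions() {
        Map<String, List<String>> formats = new TreeMap<>();
        formats.put("csv", Arrays.asList("csv"));
        formats.put("csvh", Arrays.asList("csvh"));
        formats.put("tsv", Arrays.asList("tsv"));
        formats.put("psv", Arrays.asList("psv", "tbl"));
        formats.put("txt", Arrays.asList("txt"));
        formats.put("psva", Arrays.asList("psva", "tbla"));
        return formats;
    }

    /** Returns the format whose extension list contains the file's extension, if any. */
    public static Optional<String> formatForFile(String fileName) {
        int dot = fileName.lastIndexOf('.');
        if (dot < 0 || dot == fileName.length() - 1) {
            return Optional.empty(); // no extension to look up
        }
        String extension = fileName.substring(dot + 1);
        for (Map.Entry<String, List<String>> entry : textFormatExtensions().entrySet()) {
            if (entry.getValue().contains(extension)) {
                return Optional.of(entry.getKey());
            }
        }
        return Optional.empty();
    }
}
```

Formats such as iceberg cannot be recognized this way, which is why IcebergFormatMatcher inspects the directory layout instead of a file extension.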
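Notes
The above is only a brief overview. Dremio in fact still reuses the easy plugin approach from Apache Drill for format handling and wraps a good deal of its own logic on top of it; a later post will cover this in detail.
References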
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FormatMatcher.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FormatPlugin.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FormatCreator.java