首页 > 其他分享 >dremio ManagedStoragePlugin 简单说明

dremio ManagedStoragePlugin 简单说明

时间:2023-01-05 13:11:24浏览次数:46  
标签:dremio java DremioConfig boolean 简单 ManagedStoragePlugin config final

ManagedStoragePlugin 从字面意思可以看出就是托管存储插件,从目前官方的设计来说就是将
自己开发的存储扩展,包装为dremio 可以管理的插件(统一模型以及统一处理)

ManagedStoragePlugin 提供的能力

  • 数据源信息维护,插件会包含一个关联的数据源
  • 元数据策略
  • 数据集的保存工具类
  • 存储插件的规则
  • 修改数据集
  • 获取数据源的配置信息(连接信息)
  • 权限check (此功能属于企业版,可以自己尝试扩展)

提供的能力参考下图

 

 

处理简单说明

dremio 定义了一套插件开发机制,具体可以参考我以前写的,活着看看官方提供的一个jdbc,mongo ,es 插件的开发
对于开发的插件,整体会包含两大类(系统,以及用户)对于系统的dremio 在服务启动的时候会进行系统存储插件的扫描,具
体是SystemStoragePluginInitializer 处理的,实际内部处理也基于了CatalogService

  • 系统存储扩展处理
    SystemStoragePluginInitializer 类中
 
private void pluginsCreation(final BindingProvider provider, final SabotContext sabotContext) throws Exception {
    final DremioConfig config = provider.lookup(DremioConfig.class);
    final CatalogService catalogService = provider.lookup(CatalogService.class);
    final NamespaceService ns = provider.lookup(SabotContext.class).getNamespaceService(SYSTEM_USERNAME);
    final DeferredException deferred = new DeferredException();
    final ProjectConfig projectConfig = provider.lookup(ProjectConfig.class);
 
    final Path supportPath = Paths.get(sabotContext.getOptionManager().getOption(TEMPORARY_SUPPORT_PATH));
    final Path logPath = Paths.get(System.getProperty(DREMIO_LOG_PATH_PROPERTY, "/var/log/dremio"));
 
    final ProjectConfig.DistPathConfig uploadsPathConfig = projectConfig.getUploadsConfig();
    final ProjectConfig.DistPathConfig accelerationPathConfig = projectConfig.getAcceleratorConfig();
    final ProjectConfig.DistPathConfig scratchPathConfig = projectConfig.getScratchConfig();
    final ProjectConfig.DistPathConfig metadataPathConfig = projectConfig.getMetadataConfig();
    final URI downloadPath = config.getURI(DremioConfig.DOWNLOADS_PATH_STRING);
    final URI resultsPath = config.getURI(DremioConfig.RESULTS_PATH_STRING);
    // Do not construct URI simply by concatenating, as it might not be encoded properly
    final URI logsPath = new URI("pdfs", "///" + logPath.toUri().getPath(), null);
    final URI supportURI = supportPath.toUri();
 
    final boolean enableAsyncForUploads = enable(config, DremioConfig.DEBUG_UPLOADS_ASYNC_ENABLED);
    createSafe(catalogService, ns,
      HomeFileConf.create(HomeFileSystemStoragePlugin.HOME_PLUGIN_NAME, uploadsPathConfig.getUri(), config.getThisNode(),
        SchemaMutability.USER_TABLE, CatalogService.NEVER_REFRESH_POLICY,
        enableAsyncForUploads, scratchPathConfig.getDataCredentials()), deferred);
   
    final int maxCacheSpacePercent = config.hasPath(DremioConfig.DEBUG_DIST_MAX_CACHE_SPACE_PERCENT)?
      config.getInt(DremioConfig.DEBUG_DIST_MAX_CACHE_SPACE_PERCENT) : MAX_CACHE_SPACE_PERCENT;
 
    final boolean enableAsyncForAcceleration = enable(config, DremioConfig.DEBUG_DIST_ASYNC_ENABLED);
 
    final boolean enableS3FileStatusCheck = isEnableS3FileStatusCheck(config, accelerationPathConfig);
    boolean enableCachingForAcceleration = isEnableCaching(sabotContext, config, accelerationPathConfig, CLOUD_CACHING_ENABLED);
 
    createSafe(catalogService, ns,
        AccelerationStoragePluginConfig.create(accelerationPathConfig.getUri(), enableAsyncForAcceleration,
            enableCachingForAcceleration, maxCacheSpacePercent, enableS3FileStatusCheck,
          accelerationPathConfig.getDataCredentials()), deferred);
 
    final boolean enableAsyncForJobs = enable(config, DremioConfig.DEBUG_JOBS_ASYNC_ENABLED);
    createSafe(catalogService, ns,
      InternalFileConf.create(DACDaemonModule.JOBS_STORAGEPLUGIN_NAME, resultsPath, SchemaMutability.SYSTEM_TABLE,
        CatalogService.DEFAULT_METADATA_POLICY_WITH_AUTO_PROMOTE, enableAsyncForJobs, null), deferred);
 
    final boolean enableAsyncForScratch = enable(config, DremioConfig.DEBUG_SCRATCH_ASYNC_ENABLED);
    createSafe(catalogService, ns,
      InternalFileConf.create(DACDaemonModule.SCRATCH_STORAGEPLUGIN_NAME, scratchPathConfig.getUri(), SchemaMutability.USER_TABLE,
        CatalogService.NEVER_REFRESH_POLICY_WITH_AUTO_PROMOTE, enableAsyncForScratch, scratchPathConfig.getDataCredentials()), deferred);
 
    final boolean enableAsyncForDownload = enable(config, DremioConfig.DEBUG_DOWNLOAD_ASYNC_ENABLED);
    createSafe(catalogService, ns,
      InternalFileConf.create(DATASET_DOWNLOAD_STORAGE_PLUGIN, downloadPath, SchemaMutability.USER_TABLE,
        CatalogService.NEVER_REFRESH_POLICY, enableAsyncForDownload, null), deferred);
 
    final boolean enableAsyncForMetadata = enable(config, DremioConfig.DEBUG_METADATA_ASYNC_ENABLED);
 
    final boolean enableS3FileStatusCheckForMetadata = isEnableS3FileStatusCheck(config, metadataPathConfig);
    boolean enableCachingForMetadata = isEnableCaching(sabotContext, config, metadataPathConfig, METADATA_CLOUD_CACHING_ENABLED);
 
    createSafe(catalogService, ns,
            MetadataStoragePluginConfig.create(metadataPathConfig.getUri(), enableAsyncForMetadata,
                    enableCachingForMetadata, maxCacheSpacePercent, enableS3FileStatusCheckForMetadata,
                    metadataPathConfig.getDataCredentials()), deferred);
   
    final boolean enableAsyncForLogs = enable(config, DremioConfig.DEBUG_LOGS_ASYNC_ENABLED);
    createSafe(catalogService, ns,
      InternalFileConf.create(LOGS_STORAGE_PLUGIN, logsPath, SchemaMutability.NONE,
        CatalogService.NEVER_REFRESH_POLICY, enableAsyncForLogs, null), deferred);
 
    final boolean enableAsyncForSupport = enable(config, DremioConfig.DEBUG_SUPPORT_ASYNC_ENABLED);
    createSafe(catalogService, ns,
      InternalFileConf.create(LOCAL_STORAGE_PLUGIN, supportURI, SchemaMutability.SYSTEM_TABLE,
        CatalogService.NEVER_REFRESH_POLICY, enableAsyncForSupport, null), deferred);
 
    deferred.throwAndClear();
  }
  • 用户开发的
    这个是有类加载机制,结合用户进行的api 操作创建的,实现上可以参考下SourceCatalog
    之后会统一放到一个集合中
    PluginsManager 类中的方法
 
public ManagedStoragePlugin create(SourceConfig config, String userName, NamespaceAttribute... attributes) throws TimeoutException, Exception {
    if (hasPlugin(config.getName())) {
      throw new SourceAlreadyExistsException();
    }
 
    ManagedStoragePlugin plugin = newPlugin(config);
    try {
      plugin.createSource(config, userName, attributes);
    } catch(UserException e) {
      //The creation of Source can fail due to various reasons.
      //In case of failure, we need to cleanup the in-memory state of the source. Hence closing the plugin.
      logger.error("Exception while creating source.", e);
      try {
        plugin.close();
      } catch (Exception ex) {
        e.addSuppressed(ex);
      }
      throw e;
    }
 
    // use concurrency features of concurrent hash map to avoid locking.
    ManagedStoragePlugin existing = plugins.putIfAbsent(c(config.getName()), plugin);
 
    if (existing == null) {
      return plugin;
    }
 
    // This means it  has been added by a concurrent thread doing create with the same name
    final SourceAlreadyExistsException e = new SourceAlreadyExistsException();
    try {
      // this happened in time with someone else.
      plugin.close();
    } catch (Exception ex) {
      e.addSuppressed(ex);
    }
    throw e;
  }

参考资料

sabot/kernel/src/main/java/com/dremio/exec/catalog/ManagedStoragePlugin.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/PluginsManager.java
dac/backend/src/main/java/com/dremio/dac/daemon/SystemStoragePluginInitializer.java
common/src/main/java/com/dremio/service/InitializerRegistry.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/CatalogServiceImpl.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/SourceCatalog.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/PermissionCheckCache.java

标签:dremio,java,DremioConfig,boolean,简单,ManagedStoragePlugin,config,final
From: https://www.cnblogs.com/rongfengliang/p/17027256.html

相关文章

  • dremio SourceCatalog 服务说明
    SourceCatalog主要进行source的管理,包含了获取信息,创建,更新,删除,包含了不同的实现SourceCatalog服务定义/***Interfacetoperformactionsonsources.......
  • 简单动态规划:最长上升子序列
    什么是最长上升子序列?最长上升子序列是指,从原序列中按顺序取出一些数字排在一起,这些数字是逐渐增大的,这样的序列的最长长度。引自kkksc03校长的题面【狗头保命】看懂......
  • wfp一个简单的动画
    https://www.cnblogs.com/Peter-Luo/p/12431549.html<Windowx:Class="WpfApp1.Window1"xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"......
  • 技术汇总:第十八章:枚举的简单使用
    结合上一章阅读:https://blog.csdn.net/java_wxid/article/details/99168098枚举代码:packagecom.javaliao.backstage;importlombok.Getter;publicenumMyData{......
  • 简单又好用的财务分析工具有哪些?
    什么样的财务分析工具才能算是简单又好用?是能够快速完成组合多变的财务指标运算分析;能够充分发挥企业经营健康晴雨表作用,反映企业财务健康状态;还是能够支持多维度动态分析、......
  • Ansible安装及简单使用
    一、配置免密互通参考地址:https://www.cnblogs.com/qq1035807396/p/16998602.html二、开始安装Ansible1、添加repoyuminstall-yhttps://dl.fedoraproject.org/......
  • Exchanger的简单使用
    Exchanger适用于超过一个线程之间数据交换  代码案例packagecom.java.test.exchanger;importorg.junit.Test;importjava.util.Random;importjava.util.con......
  • 09-简单数据类型和复杂数据类型
    简单数据类型简单数据类型null,返回的是一个空的对象object如果有个变量以后打算存储为对象,暂时没想好放啥,这个时候可以放null简单数据类型是存放在栈里面,里面......
  • 设计模式简单介绍
    注:所有知识来源于《设计模式:可复用软件面向对象的基础》1什么是设计模式ChristopherAlexander说过:“每一个模式描述了一个在我们周围不断重复发生的问题,以及该问题的解......
  • dremio DatasetSaver 服务说明
    我以前简单写过关于元数据处理的说明(基于jprofiler+arthas工具)会依赖namespace服务实际对于数据的操作都是通过SourceMetadataManager执行的DatasetSaver服务提供的......