首页 > 其他分享 >dremio CatalogMaintenanceService 服务简单说明

dremio CatalogMaintenanceService 服务简单说明

时间:2024-05-10 09:05:00浏览次数:27  
标签:pathsWithCounts dremio java dac versions getKey CatalogMaintenanceService 简单

说明此服务是从25.0 开始包含的,同时在release note 中也有说明,以下主要说明下内部实现

release 信息

如下,具体就不翻译了,主要是添加了一个每个任务进行每个view最大保留50个历史信息

Added daily catalog maintenance tasks to trim history of views to a maximum of 50 records per view. This limits the storage needed for datasetVersions records in the KV store.

内部处理

是由CatalogMaintenanceService 服务启动的 CatalogMaintenanceRunnableProvider 任务

  • 服务注册

可以看到此服务是在协调节点执行的,也比较符合dremio的套路

private void registerCatalogMaintenanceService(
      SingletonRegistry registry, boolean isCoordinator) {
    if (isCoordinator) {
      registry.bindSelf(
          new CatalogMaintenanceService(
              registry.provider(SchedulerService.class),
              registry.provider(java.util.concurrent.ExecutorService.class),
              new CatalogMaintenanceRunnableProvider(
                      registry.provider(OptionManager.class),
                      registry.provider(KVStoreProvider.class).get())
                  .get(0)));
    }
  }
  • CatalogMaintenanceRunnableProvider 内部处理
CatalogMaintenanceRunnable.builder()
    .setName("TrimVersions")
    .setSchedule(makeDailySchedule(trimVersionsTime))
    .setRunnable(
        () ->
            DatasetVersionTrimmer.trimHistory(
                Clock.systemUTC(),
                storeProvider.getStore(DatasetVersionMutator.VersionStoreCreator.class),
               // 此值是50
                (int) optionManager.getOption(NamespaceOptions.DATASET_VERSIONS_LIMIT),
                minAgeInDays))
    .build());
  • DatasetVersionTrimmer.trimHistory 处理

实际处理,代码注释应该都说明了,可以结合分析下

private void trimHistory(int maxVersionsToKeep, int minAgeInDays) {
    Preconditions.checkArgument(maxVersionsToKeep > 0, "maxVersionsToKeep must be positive");
    Preconditions.checkArgument(minAgeInDays > 0, "minAgeInDays must be positive");
 
    // Assume number of datasets is somewhat small compared to number of versions.
    // First pass: count versions per dataset.
    Map<DatasetPath, Integer> counts = Maps.newHashMap();
    for (Document<DatasetVersionMutator.VersionDatasetKey, VirtualDatasetVersion> entry :
        datasetVersionsStore.find()) {
      counts.compute(entry.getKey().getPath(), (key, count) -> count != null ? count + 1 : 1);
    }
 
    // Collect and order paths with more than requested number of versions.
    ImmutableList<Map.Entry<DatasetPath, Integer>> pathsWithCounts =
        counts.entrySet().stream()
            .sorted(Comparator.comparing(e -> e.getKey().toPathString()))
            .collect(ImmutableList.toImmutableList());
    ImmutableSet<DatasetPath> pathsSet =
        pathsWithCounts.stream()
            .filter(e -> e.getValue() > maxVersionsToKeep)
            .map(Map.Entry::getKey)
            .collect(ImmutableSet.toImmutableSet());
 
    if (!pathsSet.isEmpty()) {
      // Second pass: get versions to delete (past the maxVersionsToKeep) and update (set previous
      // version to null in the last element of the kept history).
      ArrayList<DatasetVersionMutator.VersionDatasetKey> keysToDelete = new ArrayList<>();
      Map<DatasetVersionMutator.VersionDatasetKey, VirtualDatasetVersion> versionsToUpdate =
          Maps.newHashMap();
      DatasetPath startPath = pathsWithCounts.get(0).getKey();
      int versionsInRange = 0;
      for (int index = 0; index < pathsWithCounts.size(); index++) {
        Map.Entry<DatasetPath, Integer> pathAndCount = pathsWithCounts.get(index);
        DatasetPath endPath = pathAndCount.getKey();
        versionsInRange += pathAndCount.getValue();
        if (versionsInRange < MAX_VERSIONS_IN_RANGE && index + 1 < pathsWithCounts.size()) {
          continue;
        }
 
        // Collect versions to trim/update in the range.
        logger.info("Collecting records to trim, batch: s: {} e: {}", startPath, endPath);
        keysToDelete.clear();
        versionsToUpdate.clear();
        findVersionKeysToTrim(
            startPath,
            endPath,
            pathsSet,
            maxVersionsToKeep,
            minAgeInDays,
            keysToDelete,
            versionsToUpdate);
 
        // Update versions first, for any partial updates due to errors/conflicts etc, next run will
        // fix it.
        logger.info("Updating batch of {} older dataset versions", versionsToUpdate.size());
        for (Map.Entry<DatasetVersionMutator.VersionDatasetKey, VirtualDatasetVersion> entry :
            versionsToUpdate.entrySet()) {
          datasetVersionsStore.put(entry.getKey(), entry.getValue());
        }
        for (List<DatasetVersionMutator.VersionDatasetKey> keysRange :
            Lists.partition(keysToDelete, MAX_VERSIONS_TO_DELETE)) {
          logger.info("Deleting batch of {} older dataset versions", keysRange.size());
          datasetVersionsStore.bulkDelete(keysRange);
        }
 
        // Reset range.
        startPath = endPath;
        versionsInRange = 0;
      }
    }
  }

说明

CatalogMaintenanceService 是新添加的服务模块,对于release note 的信息集合源码看会更加清晰

参考资料

dac/backend/src/main/java/com/dremio/dac/service/catalog/CatalogMaintenanceRunnableProvider.java
services/catalog/src/main/java/com/dremio/catalog/CatalogMaintenanceService.java
dac/backend/src/main/java/com/dremio/dac/service/datasets/DatasetVersionMutator.java
dac/backend/src/main/java/com/dremio/dac/service/datasets/DatasetVersionTrimmer.java
dac/backend/src/main/java/com/dremio/dac/daemon/DACDaemonModule.java

标签:pathsWithCounts,dremio,java,dac,versions,getKey,CatalogMaintenanceService,简单
From: https://www.cnblogs.com/rongfengliang/p/18132254

相关文章

  • react + antd + js 简单Cron组件,支持国际化
    Cron.jsimportReact,{Fragment,useState,useCallback,useRef,useEffect}from'react';import{Select,TimePicker,Input}from'antd';constOption=Select.Option;constmwidth80={minWidth:80,marginRight:10};constwidt......
  • 前端导出简单的Excel
    //报表导出exportProjectCount:asyncfunction(){letthat=this;awaitthat.getProjectCount().then(()=>{console.log("日志输出",that.dataCount.length)letdataLi......
  • PCIE思考:简单路由
    上电:主机设备上电,BIOS通过扫描下游设备的BAR,为其注册响应的空间,当需要对这些空间进行操作的时候,就会转换成TLP包的形式进行访问,当然直接和PCIE设备交互的还是RC;其中BAR的低位(具体情况具体分析)作为寻址其的地址;简单DMA读步骤(PCIE设备发起读):1.下游设备发起请求;2.CPU把数据写到......
  • .net core web项目在docker中简单部署方法
    #这是我的dockerFile文件FROMmcr.microsoft.com/dotnet/aspnet:8.0ASbaseUSERappWORKDIR/appEXPOSE8080EXPOSE8081#设置入口点CMD["dotnet","ImageCreate.dll"]#ImageCreate.dll代表你的应用 docker-compose.yml version:'3'servic......
  • nicegui:Python 图形界面库,简单好用
    前言在现代计算机应用程序开发中,图形用户界面(GUI)是用户与程序交互的重要组成部分。然而,GUI开发往往需要大量的代码和复杂的布局,给开发者带来了一定的挑战。在本篇博文中,将介绍nicegui,它是一个简单易用的图形用户界面库,提供了一种简化GUI开发的方式,使开发者能够更快速地构建吸......
  • dremio-stress dremio 压力测试工具
    dremio-stress是基于rest埃及jdbc的dremio压力测试工具,实现上相对简单,并不是比较复杂的东西,可以结合一些业务场景使用同时建议和dremio-diagnostic-collector配合起来说明此工具应该并非官方的,实际如果需要进行大规模测试基于tpc-h会更加标准参考资料https://github.com......
  • dbt plugin 系统简单说明
    dbt实际上提供了一个plugin架构(属于扩展与adapter的plugin机制是不一样的)只是目前官方缺少文档的说明以下是一些简单说明内部处理插件接口定义目前相对简单,只提供了核心是3个方法initialize,get_nodes,get_manifest_artifactsclassdbtPlugin:"""......
  • nicegui:Python 图形界面库,简单好用
    前言在现代计算机应用程序开发中,图形用户界面(GUI)是用户与程序交互的重要组成部分。然而,GUI开发往往需要大量的代码和复杂的布局,给开发者带来了一定的挑战。在本篇博文中,将介绍nicegui,它是一个简单易用的图形用户界面库,提供了一种简化GUI开发的方式,使开发者能够更快速地构建吸......
  • Altium PCB添加平衡铜/盗铜的方法(依旧是简单粗暴)
    最近画的板子遇到了PCB残铜率不足的问题,一般想法也是用整板覆铜的方法来填满空旷的区域,但是这个会带来很多碎铜,特别是表层有元器件,覆铜会产生更多碎铜,但是不覆铜又会导致残铜率低,板厂的说法是残铜率过低会导致PCB外层电镀时电流不均衡,后果就是铜箔厚度不均匀,内层残铜率过低会影响......
  • 矩阵之间的关系简单整理
    等价:可以通过初等变换互相转化的矩阵。当A和B为同型矩阵,且r(A)=r(B)时,A,B一定等价。相似:\(P^{-1}AP=B\)。本质是基坐标转换,表示在不同坐标系下效果相同的线性变换过程。P为基坐标转换矩阵,是新基向量按列排列形成的矩阵。重要性质(原理):A与B相似,则A与B有相同的特征值(亦有相同的迹与......