
A Brief Look at dremio TemporaryFolderManager

Date: 2024-03-13 09:03:36

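For larger workloads, dremio may spill to disk (aggregation operations, for example, can trigger a spill, and the spilled data is written to temporary files), so frequently run job queries can leave behind quite a few temporary files.
TemporaryFolderManager exists to manage these temporary files: creating them, cleaning them up, and handling the files left behind by executor nodes that went down unexpectedly. For the cleanup,
dremio does not use its own task scheduler; it simply uses CloseableSchedulerThreadPool, an extension of Java's ScheduledThreadPoolExecutor.
To guard against accidental deletion, temporary folders are created with dremio's own naming format. Some brief notes follow.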
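
To make the scheduling approach concrete, here is a minimal sketch of a closeable pool built on ScheduledThreadPoolExecutor. It is not dremio's CloseableSchedulerThreadPool: the class name CloseablePool, the helper runDelayedTask, and the choice of shutdownNow() on close are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CloseableSchedulerSketch {

  /** Hypothetical stand-in for a closeable scheduler pool; not dremio's actual class. */
  static final class CloseablePool extends ScheduledThreadPoolExecutor implements AutoCloseable {
    CloseablePool(int threads) {
      super(threads);
    }

    @Override
    public void close() {
      // Cancel pending tasks and interrupt running ones.
      shutdownNow();
    }
  }

  /** Schedules one delayed task, waits for it, and reports whether it ran. */
  static boolean runDelayedTask(long delayMillis) {
    CountDownLatch done = new CountDownLatch(1);
    try (CloseablePool pool = new CloseablePool(1)) {
      pool.schedule(done::countDown, delayMillis, TimeUnit.MILLISECONDS);
      return done.await(delayMillis + 2000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("task ran: " + runDelayedTask(50));
  }
}
```

Making the pool AutoCloseable lets it sit in a try-with-resources block, so any pending cleanup tasks are cancelled when the owner shuts down.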

Creating temporary files

The createTmpDirectory method of DefaultTemporaryFolderManager

public Path createTmpDirectory(Path rootPath) throws IOException {
    if (closed) {
      throw new IOException("Temporary Folder Manager already closed");
    }
    if (thisExecutor == null || thisExecutor.get() == null) {
      // retain old behaviour
      return rootPath;
    }
    final String prefix = thisExecutor.get().toPrefix(purpose);
    logger.info("Registering path '{}' for temporary file monitoring and cleanup", rootPath.toString());
    List<Path> pathsToDelete = new ArrayList<>();
    fsWrapper.visitDirectory(rootPath, prefix, fileStatus -> {
      if (fileStatus.isDirectory()) {
        try {
          final long incarnation = Long.parseLong(fileStatus.getPath().getName());
          if (incarnation > Instant.EPOCH.toEpochMilli() && incarnation < Instant.now().toEpochMilli()) {
            pathsToDelete.add(fileStatus.getPath());
          } else {
            logger.warn("Not deleting old incarnation {} as it is too recent or invalid", incarnation);
          }
        } catch (NumberFormatException ignored) {
          logger.debug("Ignoring directory {} as it is not a valid incarnation", fileStatus.getPath().getName());
        }
      }
    });
    if (!pathsToDelete.isEmpty()) {
      // One-shot task: checks last-access and modification times against the allowed staleness limit (90s by default)
      this.oneShotTask = executorService.schedule(() -> doCleanupOneShot(pathsToDelete),
        cleanupConfig.getOneShotCleanupDelaySeconds(), TimeUnit.SECONDS);
    }
    // Create the temporary folder
    final Path newIncarnation = fsWrapper.createTmpDirectory(rootPath, prefix);
    if (folderMonitor != null) {
      // Start monitoring the folder
      folderMonitor.startMonitoring(rootPath);
    }
    return newIncarnation;
  }

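Temporary folders are named using the pattern purpose_hostname_port, where purpose is the reason the folder was created (mostly spilling). As the code above shows, each incarnation underneath it is a directory whose name parses as a timestamp in epoch milliseconds.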
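
The naming rule and the incarnation check can be sketched in isolation as follows. This is only an illustration: the underscore separator follows the purpose_hostname_port pattern described above, while the purpose string "spilling", the host name, the port, and the helper names are hypothetical. The real toPrefix implementation is not shown in this post.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

final class IncarnationSketch {

  /** Builds a folder prefix in the purpose_hostname_port pattern (exact format is an assumption). */
  static String toPrefix(String purpose, String hostname, int port) {
    return purpose + "_" + hostname + "_" + port;
  }

  /** Returns the names that parse as a timestamp strictly between the epoch and nowMillis. */
  static List<String> oldIncarnations(List<String> dirNames, long nowMillis) {
    List<String> result = new ArrayList<>();
    for (String name : dirNames) {
      try {
        long incarnation = Long.parseLong(name);
        if (incarnation > Instant.EPOCH.toEpochMilli() && incarnation < nowMillis) {
          result.add(name);
        }
      } catch (NumberFormatException ignored) {
        // Not an incarnation directory, so it is never a deletion candidate.
      }
    }
    return result;
  }

  public static void main(String[] args) {
    long now = 1_710_000_000_000L;
    System.out.println(toPrefix("spilling", "executor-1", 45678));
    System.out.println(oldIncarnations(List.of("1709999000000", "notes", "-5", "1710000000001"), now));
  }
}
```

Only directories whose names look like past timestamps become deletion candidates, which is what protects unrelated folders under the same root from being removed by mistake.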

File monitoring
In the DefaultTemporaryFolderManager constructor:

if (availableExecutors != null) {
  // if available executors can be monitored, start a background task to clean up on dead executors.
  this.folderMonitor = new TemporaryFolderMonitor(thisExecutor, availableExecutors, purpose,
    cleanupConfig.getStalenessLimitSeconds(), cleanupConfig.getMinUnhealthyCyclesBeforeDelete(), fsWrapper);
} else {
  this.folderMonitor = null;
}
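
The body of TemporaryFolderMonitor is not shown here, but the parameter name getMinUnhealthyCyclesBeforeDelete suggests that an executor's folders are only removed after it has been missing for several consecutive monitoring cycles. The sketch below shows one way such a counter could work. The class, the method names, and the reset-on-recovery behaviour are assumptions, not dremio's implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class UnhealthyCycleSketch {
  private final int minUnhealthyCycles;
  // Number of consecutive cycles in which each folder owner was not reported as available.
  private final Map<String, Integer> missedCycles = new HashMap<>();

  UnhealthyCycleSketch(int minUnhealthyCycles) {
    this.minUnhealthyCycles = minUnhealthyCycles;
  }

  /**
   * Runs one monitoring cycle. knownOwners are executors that own folders on disk,
   * available are the executors currently reported alive. Returns the owners whose
   * folders may now be cleaned up.
   */
  Set<String> cycle(Set<String> knownOwners, Set<String> available) {
    Set<String> toDelete = new HashSet<>();
    for (String owner : knownOwners) {
      if (available.contains(owner)) {
        missedCycles.remove(owner); // healthy again: reset the counter
      } else if (missedCycles.merge(owner, 1, Integer::sum) >= minUnhealthyCycles) {
        toDelete.add(owner);
      }
    }
    missedCycles.keySet().removeAll(toDelete);
    return toDelete;
  }

  public static void main(String[] args) {
    UnhealthyCycleSketch monitor = new UnhealthyCycleSketch(2);
    Set<String> owners = Set.of("exec-a", "exec-b");
    System.out.println(monitor.cycle(owners, Set.of("exec-a"))); // first miss for exec-b
    System.out.println(monitor.cycle(owners, Set.of("exec-a"))); // second miss for exec-b
  }
}
```

Requiring several missed cycles avoids deleting the files of an executor that only dropped out of the membership list briefly.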

Temporary file cleanup

The cleanup entry point lives in the DefaultTemporaryFolderManager class and is invoked by SpillService.

  • Reference implementation
public void startMonitoring() {
  if (folderMonitor != null) {
    final int increment = (cleanupConfig.getCleanupDelayMaxVariationSeconds() > 0) ?
      (int) (Instant.now().toEpochMilli() * Thread.currentThread().getId())
        % cleanupConfig.getCleanupDelayMaxVariationSeconds() : 0;
    final int delay = cleanupConfig.getCleanupDelaySeconds() + increment;
    logger.debug("Starting folder monitoring for cleanup with {} seconds as interval", delay);
    // Schedule the cleanup at a fixed rate
    this.monitorTask = executorService.scheduleAtFixedRate(folderMonitor::doCleanupOther, delay, delay,
      TimeUnit.SECONDS);
  }
}
  • Invocation from SpillService
    This is handled in the start method of SpillService, which covers creation, cleanup, and health checks.
    public void start() throws Exception {
    // TODO: Implement the following:
    // TODO: 1. global pool of compression buffers
    // TODO: 2. pool of I/O completion threads (Note: for local FS only)
    // TODO: 3. create the spill filesystem adapter
     
    for (String spillDir : this.spillDirs) {
      try {
        final Path spillDirPath = new Path(spillDir);
        final FileSystem fileSystem = spillDirPath.getFileSystem(getSpillingConfig());
        healthCheckEnabled = healthCheckEnabled || isHealthCheckEnabled(fileSystem.getUri().getScheme());
      } catch (Exception ignored) {}
    }
     
    // healthySpillDirs set at start()
    this.healthySpillDirs = Lists.newArrayList();
    this.monitoredSpillDirectoryMap = new ConcurrentHashMap<>();
    final Supplier<Set<ExecutorId>> nodesConverter =
      (nodesProvider == null) ? null : () -> convertEndpointsToId(nodesProvider);
    final Supplier<ExecutorId> identityConverter =
      (identityProvider == null) ? null : () -> convertEndpointToId(identityProvider);
    this.folderManager = new DefaultTemporaryFolderManager(identityConverter, getSpillingConfig(), nodesConverter,
      TEMP_FOLDER_PURPOSE);
     
    minDiskSpace = options.minDiskSpace();
    minDiskSpacePercentage = options.minDiskSpacePercentage();
    healthCheckInterval = options.healthCheckInterval();
    healthCheckEnabled = healthCheckEnabled && options.enableHealthCheck();
    spillSweepInterval = options.spillSweepInterval();
    spillSweepThreshold = options.spillSweepThreshold();
    // Start the cleanup task
    folderManager.startMonitoring();
     
    // Create spill directories, in case it doesn't already exist
    assert healthySpillDirs.isEmpty();
    for (String spillDir : this.spillDirs) {
      try {
        final Path spillDirPath = new Path(spillDir);
        final FileSystem fileSystem = spillDirPath.getFileSystem(getSpillingConfig());
        if (fileSystem.exists(spillDirPath) || fileSystem.mkdirs(spillDirPath, PERMISSIONS)) {
          // Create the temporary directory
          monitoredSpillDirectoryMap.put(spillDir, folderManager.createTmpDirectory(spillDirPath));
          if (healthCheckEnabled) {
            healthySpillDirs.add(spillDir);
          }
        } else {
          logger.warn("Unable to find or create spill directory {} due to lack of permissions", spillDir);
        }
      } catch (Exception e) {
        logger.info("Sub directory creation in spill directory {} hit a temporary error `{}` " +
          "and is not added to healthy list. Will monitor periodically", spillDir, e.getMessage());
      }
    }
    // Health check; the data collected here is mainly consumed by SpillManager
    if (healthCheckEnabled) {
      healthCheckTask = schedulerService.get()
        .schedule(Schedule.Builder
            .everyMillis(healthCheckInterval)
            .startingAt(Instant.now())
            .build(),
          new SpillHealthCheckTask()
        );
    }
    }
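
The delay arithmetic in startMonitoring deserves a closer look. The increment adds a per-node variation so that executors do not all run their cleanup at the same moment. Because the cast to int is applied before the remainder, the increment computed this way can be negative when the low 32 bits of the product have the sign bit set. The sketch below reproduces that arithmetic next to a non-negative variant based on Math.floorMod. Both helper names are hypothetical, and the variant is an alternative for comparison, not what dremio does.

```java
final class CleanupDelaySketch {

  /** Same arithmetic as the excerpt: cast the product to int, then take the remainder. */
  static int incrementAsInExcerpt(long nowMillis, long threadId, int maxVariationSeconds) {
    return (maxVariationSeconds > 0) ? (int) (nowMillis * threadId) % maxVariationSeconds : 0;
  }

  /** Variant that keeps the variation in [0, maxVariationSeconds) by using floorMod on the long product. */
  static int nonNegativeIncrement(long nowMillis, long threadId, int maxVariationSeconds) {
    return (maxVariationSeconds > 0)
      ? (int) Math.floorMod(nowMillis * threadId, (long) maxVariationSeconds)
      : 0;
  }

  public static void main(String[] args) {
    long nowMillis = 2_147_483_648L; // chosen so that the int cast overflows to a negative value
    System.out.println(incrementAsInExcerpt(nowMillis, 1, 60));
    System.out.println(nonNegativeIncrement(nowMillis, 1, 60));
  }
}
```

With a negative increment the effective interval is simply shorter than getCleanupDelaySeconds(); whether that matters depends on how the two configuration values are chosen.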

Notes

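The above is only a brief overview. Direct operations on temporary files go through SpillManager, which calls the file operations wrapped by SpillService
(there is also a BoostBufferManager, which mainly handles the arrow cache). These operations will be covered in a later post: they are fairly complex and touch on
a good deal of flow control as well as details of data execution.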

References

services/spill/src/main/java/com/dremio/common/io/TemporaryFolderManager.java
services/spill/src/main/java/com/dremio/common/io/DefaultTemporaryFolderManager.java
services/spill/src/main/java/com/dremio/service/spill/SpillService.java
sabot/kernel/src/main/java/com/dremio/sabot/op/sort/external/SpillManager.java
services/spill/src/main/java/com/dremio/exec/store/LocalSyncableFileSystem.java
services/spill/src/main/java/com/dremio/common/io/TemporaryFolderMonitor.java
common/legacy/src/main/java/com/dremio/common/concurrent/CloseableSchedulerThreadPool.java
services/spill/src/main/java/com/dremio/common/io/ExecutorId.java
sabot/kernel/src/main/java/com/dremio/exec/store/parquet/BoostBufferManager.java

From: https://www.cnblogs.com/rongfengliang/p/18025551
