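For larger workloads, Dremio may spill to disk (aggregation operations, for example, can trigger a spill) and write the data to temporary files, so frequently executed query jobs can leave a large number of temporary files behind.
TemporaryFolderManager exists to manage these files: creating them, cleaning them up, and handling files left behind by executor nodes that are temporarily unhealthy.
For cleanup, Dremio does not use its own task scheduler; it simply uses CloseableSchedulerThreadPool, an extension of Java's ScheduledThreadPoolExecutor.
To guard against accidental deletion, temporary folders are also created with a dedicated naming format. Some brief notes follow.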
Creating temporary files
The createTmpDirectory method of DefaultTemporaryFolderManager
public Path createTmpDirectory(Path rootPath) throws IOException {
  if (closed) {
    throw new IOException("Temporary Folder Manager already closed");
  }
  if (thisExecutor == null || thisExecutor.get() == null) {
    // retain old behaviour
    return rootPath;
  }
  final String prefix = thisExecutor.get().toPrefix(purpose);
  logger.info("Registering path '{}' for temporary file monitoring and cleanup", rootPath.toString());
  List<Path> pathsToDelete = new ArrayList<>();
  fsWrapper.visitDirectory(rootPath, prefix, fileStatus -> {
    if (fileStatus.isDirectory()) {
      try {
        final long incarnation = Long.parseLong(fileStatus.getPath().getName());
        if (incarnation > Instant.EPOCH.toEpochMilli() && incarnation < Instant.now().toEpochMilli()) {
          pathsToDelete.add(fileStatus.getPath());
        } else {
          logger.warn("Not deleting old incarnation {} as it is too recent or invalid", incarnation);
        }
      } catch (NumberFormatException ignored) {
        logger.debug("Ignoring directory {} as it is not a valid incarnation", fileStatus.getPath().getName());
      }
    }
  });
  if (!pathsToDelete.isEmpty()) {
    // One-shot task: compares the last access time and modification time
    // against the allowed staleness limit; the default is 90s
    this.oneShotTask = executorService.schedule(() -> doCleanupOneShot(pathsToDelete),
      cleanupConfig.getOneShotCleanupDelaySeconds(), TimeUnit.SECONDS);
  }
  // Create the temporary folder
  final Path newIncarnation = fsWrapper.createTmpDirectory(rootPath, prefix);
  if (folderMonitor != null) {
    // Start monitoring the folder
    folderMonitor.startMonitoring(rootPath);
  }
  return newIncarnation;
}
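The one-shot task does not delete old incarnations blindly: according to the comment in the listing, it compares each path's last access and modification times against an allowed staleness limit. doCleanupOneShot itself is not shown here, so the following is only a sketch of that comparison as a pure function. StalenessCheck and isStale are hypothetical names, and using the more recent of the two timestamps as the reference point is an assumption.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of Dremio.
final class StalenessCheck {
  private StalenessCheck() {}

  // A path is treated as stale when neither an access nor a modification
  // has happened within the last stalenessLimitSeconds.
  static boolean isStale(long lastAccessMillis, long lastModifiedMillis,
                         long nowMillis, long stalenessLimitSeconds) {
    // Assumption: the most recent of the two timestamps decides.
    long lastTouched = Math.max(lastAccessMillis, lastModifiedMillis);
    return nowMillis - lastTouched > TimeUnit.SECONDS.toMillis(stalenessLimitSeconds);
  }
}
```

Keeping the check free of file-system calls makes it easy to test; a caller would feed it timestamps read from the file status.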
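The temporary folder name uses the following format, which is effectively a purpose_hostname_port pattern.
The purpose is the reason the folder was created, which is mainly spilling.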
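To make the naming scheme concrete, here is a minimal sketch of how such a path could be assembled and how an old incarnation directory might be recognized. It assumes the prefix is simply the purpose, hostname, and port joined with underscores, and that each incarnation is a subdirectory named after a timestamp in epoch milliseconds (as the Long.parseLong check in createTmpDirectory suggests). TmpFolderNaming and its methods are hypothetical names, not Dremio APIs, and the example values are made up.

```java
import java.nio.file.Path;

// Hypothetical helper for illustration; not part of Dremio.
final class TmpFolderNaming {
  private TmpFolderNaming() {}

  // Assumed prefix layout: purpose_hostname_port, e.g. "spilling_node1_45678".
  static String toPrefix(String purpose, String hostName, int port) {
    return purpose + "_" + hostName + "_" + port;
  }

  // Assumed directory layout: <root>/<prefix>/<incarnation millis>.
  static Path incarnationPath(Path root, String prefix, long incarnationMillis) {
    return root.resolve(prefix).resolve(Long.toString(incarnationMillis));
  }

  // Mirrors the check in createTmpDirectory: a directory name counts as an
  // old incarnation only if it parses as a long strictly between the epoch
  // and the current time.
  static boolean isOldIncarnation(String dirName, long nowMillis) {
    try {
      long incarnation = Long.parseLong(dirName);
      return incarnation > 0 && incarnation < nowMillis;
    } catch (NumberFormatException e) {
      return false; // not an incarnation directory, leave it alone
    }
  }
}
```

Because only names that parse as a plausible timestamp under the executor's own prefix are candidates, unrelated files that happen to live in the spill root are never selected for deletion.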
File monitoring
In the DefaultTemporaryFolderManager constructor
if (availableExecutors != null) {
  // if available executors can be monitored, start a background task to clean up on dead executors.
  //
  this.folderMonitor = new TemporaryFolderMonitor(thisExecutor, availableExecutors, purpose,
    cleanupConfig.getStalenessLimitSeconds(), cleanupConfig.getMinUnhealthyCyclesBeforeDelete(), fsWrapper);
} else {
  this.folderMonitor = null;
}
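The constructor hands the monitor a staleness limit and a minimum number of unhealthy cycles. How TemporaryFolderMonitor uses them is not shown here, so the following is only a sketch of one plausible policy: a folder becomes a deletion candidate only after its owning executor has been missing from the available set for several consecutive checks. UnhealthyCycleTracker and its methods are hypothetical, and folders are identified by plain strings for simplicity.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; not the actual TemporaryFolderMonitor logic.
final class UnhealthyCycleTracker {
  private final int minUnhealthyCycles;
  private final Map<String, Integer> missingCycles = new HashMap<>();

  UnhealthyCycleTracker(int minUnhealthyCycles) {
    this.minUnhealthyCycles = minUnhealthyCycles;
  }

  // Called once per monitoring cycle. Returns the folders whose owners have
  // been absent for at least minUnhealthyCycles consecutive cycles.
  Set<String> check(Set<String> foldersOnDisk, Set<String> aliveExecutors) {
    Set<String> deletable = new HashSet<>();
    // Forget counters for folders that no longer exist on disk.
    missingCycles.keySet().retainAll(foldersOnDisk);
    for (String folder : foldersOnDisk) {
      if (aliveExecutors.contains(folder)) {
        missingCycles.remove(folder); // owner is back, reset the counter
      } else {
        int cycles = missingCycles.merge(folder, 1, Integer::sum);
        if (cycles >= minUnhealthyCycles) {
          deletable.add(folder);
        }
      }
    }
    return deletable;
  }
}
```

Requiring several consecutive misses means a brief network hiccup or a slow restart does not cost a live executor its spill data.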
Temporary file cleanup
This lives in the DefaultTemporaryFolderManager class and is invoked by SpillService
- Reference implementation
public void startMonitoring() {
  if (folderMonitor != null) {
    final int increment = (cleanupConfig.getCleanupDelayMaxVariationSeconds() > 0) ?
      (int) (Instant.now().toEpochMilli() * Thread.currentThread().getId())
        % cleanupConfig.getCleanupDelayMaxVariationSeconds() : 0;
    final int delay = cleanupConfig.getCleanupDelaySeconds() + increment;
    logger.debug("Starting folder monitoring for cleanup with {} seconds as interval", delay);
    // Schedule the cleanup at a fixed rate
    this.monitorTask = executorService.scheduleAtFixedRate(folderMonitor::doCleanupOther, delay, delay,
      TimeUnit.SECONDS);
  }
}
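The increment term adds a per-node variation to the delay, presumably so that executors do not all run their cleanup at the same moment. Below is a minimal standalone sketch of the same idea on a plain ScheduledExecutorService. JitteredSchedule and its methods are hypothetical names. One deliberate difference from the listing: in Java a cast binds tighter than %, so truncating a long product to int can produce a negative value; the sketch uses Math.floorMod so the jitter always falls in [0, maxVariation).

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of Dremio.
final class JitteredSchedule {
  private JitteredSchedule() {}

  // Returns baseDelay plus a jitter in [0, maxVariation), derived from seed.
  static int jitteredDelaySeconds(int baseDelay, int maxVariation, long seed) {
    int jitter = (maxVariation > 0)
        ? (int) Math.floorMod(seed, (long) maxVariation)
        : 0;
    return baseDelay + jitter;
  }

  // Schedules cleanup at a fixed rate, using the jittered delay both as the
  // initial delay and as the period, like the listing does.
  static ScheduledFuture<?> start(ScheduledExecutorService pool, Runnable cleanup,
                                  int baseDelay, int maxVariation) {
    long seed = System.currentTimeMillis() * Thread.currentThread().getId();
    int delay = jitteredDelaySeconds(baseDelay, maxVariation, seed);
    return pool.scheduleAtFixedRate(cleanup, delay, delay, TimeUnit.SECONDS);
  }
}
```

A caller could pass Executors.newSingleThreadScheduledExecutor() as the pool and cancel the returned future on shutdown.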
- Invocation from SpillService
This is handled in the start method of SpillService, which covers creation, cleanup, and health checking
public void start() throws Exception {
  // TODO: Implement the following:
  // TODO: 1. global pool of compression buffers
  // TODO: 2. pool of I/O completion threads (Note: for local FS only)
  // TODO: 3. create the spill filesystem adapter
  for (String spillDir : this.spillDirs) {
    try {
      final Path spillDirPath = new Path(spillDir);
      final FileSystem fileSystem = spillDirPath.getFileSystem(getSpillingConfig());
      healthCheckEnabled = healthCheckEnabled || isHealthCheckEnabled(fileSystem.getUri().getScheme());
    } catch (Exception ignored) {}
  }
  // healthySpillDirs set at start()
  this.healthySpillDirs = Lists.newArrayList();
  this.monitoredSpillDirectoryMap = new ConcurrentHashMap<>();
  final Supplier<Set<ExecutorId>> nodesConverter =
    (nodesProvider == null) ? null : () -> convertEndpointsToId(nodesProvider);
  final Supplier<ExecutorId> identityConverter =
    (identityProvider == null) ? null : () -> convertEndpointToId(identityProvider);
  this.folderManager = new DefaultTemporaryFolderManager(identityConverter, getSpillingConfig(), nodesConverter,
    TEMP_FOLDER_PURPOSE);
  minDiskSpace = options.minDiskSpace();
  minDiskSpacePercentage = options.minDiskSpacePercentage();
  healthCheckInterval = options.healthCheckInterval();
  healthCheckEnabled = healthCheckEnabled && options.enableHealthCheck();
  spillSweepInterval = options.spillSweepInterval();
  spillSweepThreshold = options.spillSweepThreshold();
  // Start the cleanup task
  folderManager.startMonitoring();
  // Create spill directories, in case it doesn't already exist
  assert healthySpillDirs.isEmpty();
  for (String spillDir : this.spillDirs) {
    try {
      final Path spillDirPath = new Path(spillDir);
      final FileSystem fileSystem = spillDirPath.getFileSystem(getSpillingConfig());
      if (fileSystem.exists(spillDirPath) || fileSystem.mkdirs(spillDirPath, PERMISSIONS)) {
        // Create the temporary directory
        monitoredSpillDirectoryMap.put(spillDir, folderManager.createTmpDirectory(spillDirPath));
        if (healthCheckEnabled) {
          healthySpillDirs.add(spillDir);
        }
      } else {
        logger.warn("Unable to find or create spill directory {} due to lack of permissions", spillDir);
      }
    } catch (Exception e) {
      logger.info("Sub directory creation in spill directory {} hit a temporary error `{}` " +
        "and is not added to healthy list. Will monitor periodically", spillDir, e.getMessage());
    }
  }
  // Health check; the data maintained by this task is mainly used by SpillManager
  if (healthCheckEnabled) {
    healthCheckTask = schedulerService.get()
      .schedule(Schedule.Builder
          .everyMillis(healthCheckInterval)
          .startingAt(Instant.now())
          .build(),
        new SpillHealthCheckTask()
      );
  }
}
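The options minDiskSpace and minDiskSpacePercentage suggest that a spill directory counts as healthy only while enough disk space remains. SpillHealthCheckTask itself is not shown here, so the following is a sketch under that assumption rather than Dremio's implementation. SpillDirHealth and its methods are hypothetical, and requiring both thresholds to hold is also an assumption.

```java
import java.io.File;

// Hypothetical helper for illustration; not part of Dremio.
final class SpillDirHealth {
  private SpillDirHealth() {}

  // Pure threshold check: both the absolute and the percentage minimum
  // must be satisfied (assumed policy).
  static boolean hasEnoughSpace(long usableBytes, long totalBytes,
                                long minBytes, double minPercent) {
    if (totalBytes <= 0) {
      return false; // size unknown or unreadable, treat as unhealthy
    }
    double usablePercent = 100.0 * usableBytes / totalBytes;
    return usableBytes >= minBytes && usablePercent >= minPercent;
  }

  // Applies the check to a local directory using java.io.File space queries.
  static boolean isHealthy(File dir, long minBytes, double minPercent) {
    return dir.isDirectory()
        && hasEnoughSpace(dir.getUsableSpace(), dir.getTotalSpace(), minBytes, minPercent);
  }
}
```

A periodic task could run isHealthy over every configured spill directory and rebuild the healthy list from the results.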
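Notes
The above is only a brief overview. Direct operations on temporary files go through SpillManager, which calls the file operations wrapped by SpillService
(there is in fact also a BoostBufferManager, which mainly handles the Arrow cache). The figure below shows some of these operations (to be covered later;
the internals are fairly complex and involve flow control as well as details of data execution)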
References
services/spill/src/main/java/com/dremio/common/io/TemporaryFolderManager.java
services/spill/src/main/java/com/dremio/common/io/DefaultTemporaryFolderManager.java
services/spill/src/main/java/com/dremio/service/spill/SpillService.java
sabot/kernel/src/main/java/com/dremio/sabot/op/sort/external/SpillManager.java
services/spill/src/main/java/com/dremio/exec/store/LocalSyncableFileSystem.java
services/spill/src/main/java/com/dremio/common/io/TemporaryFolderMonitor.java
common/legacy/src/main/java/com/dremio/common/concurrent/CloseableSchedulerThreadPool.java
services/spill/src/main/java/com/dremio/common/io/ExecutorId.java
sabot/kernel/src/main/java/com/dremio/exec/store/parquet/BoostBufferManager.java