SchedulerService内部调度服务算是一个比较重要的模块,比如dremio的功能都依赖此模块(元数据获取,一些数据清理任务,反射加速)
参考实现子类
SchedulerService 实现也比较多,因为dremio 集群中的节点有多种角色,为了保证数据的一致性会对于不同集群角色的节点进行不同的处理
如下图
简单说明:
ModifiableSchedulerService: 字面意思也很清晰,就是支持调度任务的修改,支持添加任务组,以及对于任务组进行修改
LocalSchedulerService: 实际上就是统一由一个节点进行调度的,为了支持修改也扩展了ModifiableSchedulerService
参考初始化,可以看出就是我们实际日常部署的dremio 集群模式
if (isLeaderlessScheduler) {
final ScheduleTaskGroup defaultTaskGroup = ScheduleTaskGroup.create("clustered-singleton-default", capacity);
ModifiableSchedulerService distributedSchedulerService = null;
if (isDistributedCoordinator) {
distributedSchedulerService = new ClusteredSingletonTaskScheduler(defaultTaskGroup, getRootName(),
clusterCoordinatorProvider, currentEndPoint);
}
final SchedulerService localSchedulerService = new SimpleLocalSchedulerService(capacity);
ModifiableSchedulerService scheduler;
scheduler = new RoutingSchedulerService(clusterServiceSetManagerProvider, localSchedulerService,
distributedSchedulerService);
registry.bind(SchedulerService.class, scheduler);
registry.bind(ModifiableSchedulerService.class, scheduler);
} else {
registry.bind(SchedulerService.class, new LocalSchedulerService(
capacity, clusterServiceSetManagerProvider, clusterElectionManagerProvider, currentEndPoint,
isDistributedCoordinator));
}
ClusteredSingletonTaskScheduler、ModifiableLocalSchedulerService、ModifiableWrappedSchedulerService、RoutingSchedulerService 等都实现了此接口
ClusteredSingletonTaskScheduler: 核心是对于一个任务只能在集群级别一个服务实例运行,主要应该场景是对于部署的dremio 环境不master的分布式模式
参考处理
if (isLeaderlessScheduler) {
final ScheduleTaskGroup defaultTaskGroup = ScheduleTaskGroup.create("clustered-singleton-default", capacity);
ModifiableSchedulerService distributedSchedulerService = null;
if (isDistributedCoordinator) {
distributedSchedulerService = new ClusteredSingletonTaskScheduler(defaultTaskGroup, getRootName(),
clusterCoordinatorProvider, currentEndPoint);
}
final SchedulerService localSchedulerService = new SimpleLocalSchedulerService(capacity);
ModifiableSchedulerService scheduler;
scheduler = new RoutingSchedulerService(clusterServiceSetManagerProvider, localSchedulerService,
distributedSchedulerService);
registry.bind(SchedulerService.class, scheduler);
registry.bind(ModifiableSchedulerService.class, scheduler);
} else {
registry.bind(SchedulerService.class, new LocalSchedulerService(
capacity, clusterServiceSetManagerProvider, clusterElectionManagerProvider, currentEndPoint,
isDistributedCoordinator));
}
ModifiableLocalSchedulerService: 实现了ModifiableLocalSchedulerService以及LocalSchedulerService 支持基于配置的线程池修改
主要是在ModifiableWrappedSchedulerService 中使用
RoutingSchedulerService:就是支持路由的,可以路由到不同调度服务,本地或者集群单一的调度服务
SimpleLocalSchedulerService:任务的执行调度只能在本地,从上边ClusteredSingletonTaskScheduler部分的使用以及RoutingSchedulerService 创建可以看出是统一基于RoutingSchedulerService管理的
ModifiableWrappedSchedulerService: 主要是一个包装
使用的地方,可以看出主要是元数据刷新调度的
protected ModifiableSchedulerService getMetadataRefreshModifiableScheduler(SingletonRegistry registry,
boolean isDistributedCoordinator,
boolean isLeaderless) {
return new ModifiableWrappedSchedulerService(
ExecConstants.MAX_CONCURRENT_METADATA_REFRESHES.getDefault().getNumVal().intValue(),
isDistributedCoordinator,
"metadata-refresh-modifiable-scheduler-",
() -> registry.provider(ClusterCoordinator.class).get(),
() -> registry.provider(ClusterCoordinator.class).get(),
() -> registry.provider(SabotContext.class).get().getEndpoint(),
ExecConstants.MAX_CONCURRENT_METADATA_REFRESHES,
registry.provider(OptionManager.class),
() -> registry.provider(ModifiableSchedulerService.class).get(),
isLeaderless);
}
内部对于使用的调度是基于集群是支持leader的选举配置创建的
private ModifiableSchedulerService createModifiableSchedulerService() {
if (leaderlessEnabled) {
LOGGER.info("Using leaderless Clustered singleton");
final ModifiableSchedulerService service = schedulerServiceProvider.get();
service.addTaskGroup(ScheduleTaskGroup.create(taskGroupName, initialCapacity));
return service;
} else {
LOGGER.info("Using leader election based Clustered singleton");
return new ModifiableLocalSchedulerService(
initialCapacity,
taskGroupName,
clusterServiceSetManagerProvider,
clusterElectionManagerProvider,
currentNodeProvider,
isDistributedCoordinator,
option,
optionManagerProvider);
}
}
调度任务的执行
调度服务的核心是对于定义的调度策略对于特定任务的执行
- Schedule
Schedule 的定义主要包含了调度的执行模式,比如定时周期执行类的任务,以及在什么啥时候触发执行(前置条件) - 主要使用调度的模块
如下图,可以看到是不少模块都使用了,比如反射加速,namespace,catalog 服务,source 的元数据管理,清理job 结果,系统配置同步
本地job 制定状态的监控,nessie repo的维护,sql 运行session 清理,系统统计服务,系统iceberg 表清理,移除获取token 任务
说明
对于任务执行部分,我只是简单说明了下,可以结合上图,结合实际源码了解不同服务模块是如何使用调度服务的,通过以上简单说明,我们可以看出
dremio的调度服务还是很重要的,不少服务都有使用
参考资料
services/scheduler/src/main/java/com/dremio/service/scheduler/SchedulerService.java
dac/backend/src/main/java/com/dremio/dac/daemon/DACDaemonModule.java
services/scheduler/src/main/java/com/dremio/service/scheduler/Schedule.java