首页 > 其他分享 >DolphinScheduler心脏:Quartz的定时任务调度框架深度解析

DolphinScheduler心脏:Quartz的定时任务调度框架深度解析

时间:2024-11-18 14:30:29浏览次数:1  
标签:Quartz schedule DolphinScheduler scheduleObj command QRTZ 任务调度 TODO

Quartz是一个开源的Java作业调度框架,它提供了强大的定时任务调度功能。在DolphinScheduler中,Quartz用于实现定时任务的调度和管理。DolphinScheduler通过QuartzExecutorImpl类与Quartz集成,将工作流及其定时管理操作与Quartz调度框架相结合,实现任务的调度执行。

本文将详细剖析Quartz的原理机制,以及在Dolphinscheduler中使用Quartz的原理。

Quartz ER图

file

  1. QRTZ_JOB_DETAILSQRTZ_TRIGGERS 是中心表,定义了任务与触发器之间的关系;
  2. QRTZ_TRIGGERS 表通过外键关联了多个触发器类型表,如 QRTZ_SIMPLE_TRIGGERSQRTZ_CRON_TRIGGERS,用于实现不同类型的触发方式;
  3. QRTZ_FIRED_TRIGGERS 用于记录每次任务执行的历史,与任务和触发器表都有关联;
  4. QRTZ_CALENDARS 用于定义触发器的日历排除规则,QRTZ_PAUSED_TRIGGER_GRPS 用于管理触发器组的暂停状态;
  5. QRTZ_SCHEDULER_STATEQRTZ_LOCKS 主要用于集群环境中的任务调度协调,确保高可用性。

Dolphinscheduler Quartz使用

新建SHELL任务

file

流程定义上线并配置调度

file
file

定时上线

file

流程实例运行结果

file

原理剖析

创建调度

org.apache.dolphinscheduler.api.controller.SchedulerController#createSchedule
--org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#insertSchedule
....
Schedule scheduleObj = new Schedule();
Date now = new Date();
scheduleObj.setTenantCode(tenantCode);
scheduleObj.setProjectName(project.getName());
scheduleObj.setProcessDefinitionCode(processDefineCode);
scheduleObj.setProcessDefinitionName(processDefinition.getName());
ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
scheduleObj.setCrontab(scheduleParam.getCrontab());
scheduleObj.setTimezoneId(scheduleParam.getTimezoneId());
scheduleObj.setWarningType(warningType);
scheduleObj.setWarningGroupId(warningGroupId);
scheduleObj.setFailureStrategy(failureStrategy);
scheduleObj.setCreateTime(now);
scheduleObj.setUpdateTime(now);
scheduleObj.setUserId(loginUser.getId());
scheduleObj.setUserName(loginUser.getUserName());
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
scheduleObj.setProcessInstancePriority(processInstancePriority);
scheduleObj.setWorkerGroup(workerGroup);
scheduleObj.setEnvironmentCode(environmentCode);
scheduleMapper.insert(scheduleObj);
....

核心其实就是向 schedule 表中插入了一条数据而已,如下 :
file

调度上线

org.apache.dolphinscheduler.api.controller.SchedulerController#publishScheduleOnline
--org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#onlineScheduler
----org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl#doOnlineScheduler
------org.apache.dolphinscheduler.scheduler.quartz.QuartzScheduler#insertOrUpdateScheduleTask

精简代码如下 : 
// TODO 使用schedule id和projectId封装 JobKey,比如jobName=job_25(schedulerId),jobGroup=jobgroup_1(projectId)
JobKey jobKey = QuartzTaskUtils.getJobKey(schedule.getId(), projectId);
// TODO 使用projectId和schedule封装jobDataMap,里面封装的是projectId、scheduleId和schedule(JSON存储)
Map<String, Object> jobDataMap = QuartzTaskUtils.buildDataMap(projectId, schedule);
// TODO 获取cron表达式
String cronExpression = schedule.getCrontab();
// TODO 获取时区
String timezoneId = schedule.getTimezoneId();

// TODO 定时调度的开启时间
Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
// TODO 定时调度的结束时间
Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);
jobDetail jobDetail = newJob(ProcessScheduleTask.class).withIdentity(jobKey).build();
jobDetail.getJobDataMap().putAll(jobDataMap);
// TODO 创建一个Job
scheduler.addJob(jobDetail, false, true);

// TODO 封装Trigger
TriggerKey triggerKey = new TriggerKey(jobKey.getName(), jobKey.getGroup());
CronTrigger cronTrigger = newTrigger()
                    .withIdentity(triggerKey)
                    .startAt(startDate)
                    .endAt(endDate)
                    .withSchedule(
                            cronSchedule(cronExpression)
                                    .withMisfireHandlingInstructionIgnoreMisfires()
                                    .inTimeZone(DateUtils.getTimezone(timezoneId)))
                    .forJob(jobDetail).build();

// TODO 开始调度
scheduler.scheduleJob(cronTrigger);

对应的表

存储每个任务的详细信息
file

存储触发器的基本信息,是所有触发器类型的父表
file

存储 Cron 表达式触发器(Cron Trigger)的信息
file

调度执行

org.apache.dolphinscheduler.scheduler.quartz.ProcessScheduleTask,这个类是 qrtz_job_details 中的 JOB_CLASS_NAME 字段

protected void executeInternal(JobExecutionContext context) {
    JobDataMap dataMap = context.getJobDetail().getJobDataMap();

    int projectId = dataMap.getInt(QuartzTaskUtils.PROJECT_ID);
    int scheduleId = dataMap.getInt(QuartzTaskUtils.SCHEDULE_ID);

    Date scheduledFireTime = context.getScheduledFireTime();

    Date fireTime = context.getFireTime();

    Command command = new Command();
    command.setCommandType(CommandType.SCHEDULER);
    command.setExecutorId(schedule.getUserId());
    command.setFailureStrategy(schedule.getFailureStrategy());
    command.setProcessDefinitionCode(schedule.getProcessDefinitionCode());
    command.setScheduleTime(scheduledFireTime);
    command.setStartTime(fireTime);
    command.setWarningGroupId(schedule.getWarningGroupId());
    String workerGroup = StringUtils.isEmpty(schedule.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP
            : schedule.getWorkerGroup();
    command.setWorkerGroup(workerGroup);
    command.setTenantCode(schedule.getTenantCode());
    command.setEnvironmentCode(schedule.getEnvironmentCode());
    command.setWarningType(schedule.getWarningType());
    command.setProcessInstancePriority(schedule.getProcessInstancePriority());
    command.setProcessDefinitionVersion(processDefinition.getVersion());

    commandService.createCommand(command);
}

说白了,这个就是quartz的一个回调函数,最终生成Command。

转载自Journey
原文链接:https://segmentfault.com/a/1190000045471756

本文由 白鲸开源 提供发布支持!

标签:Quartz,schedule,DolphinScheduler,scheduleObj,command,QRTZ,任务调度,TODO
From: https://www.cnblogs.com/DolphinScheduler/p/18552582

相关文章

  • 10月月报 | Apache DolphinScheduler进展总结
    各位热爱ApacheDolphinScheduler的小伙伴们,社区10月份月报更新啦!这里将记录DolphinScheduler社区每月的重要更新,欢迎关注!月度Merge之星感谢以下小伙伴10月份为ApacheDolphinScheduler所做的精彩贡献(排名不分先后):@shouwangyw,@liunaijie,@binitshrest,@wangxj3,@Sblood......
  • Java学习之定时任务框架Quartz
    Quartz的核心类有以下三部分:Job:需要实现的任务类,实现execute()方法,执行后完成任务。Trigger:包括SimpleTrigger和CronTrigger。Scheduler:任务调度器,负责基于Trigger触发器,来执行Job任务。简单示例1.pom配置 2.创建JobpublicclassEplJobimplementsJob{......
  • Quartz集群增强版_01.集群及缺火处理(ClusterMisfireHandler)
    Quartz集群增强版_01.集群及缺火处理(ClusterMisfireHandler)转载请著名出处https://www.cnblogs.com/funnyzpc/p/18542452主要目的应用(app)与节点(node)状态同步不管是node还是app,都可以通过对应state来控制节点及整个应用的启停,这是很重要的功能,同时对于集群/缺......
  • Quartz集群增强版_00.How to use?(如何使用)
    Quartz集群增强版_00.Howtouse?(如何使用)转载请著名出处https://www.cnblogs.com/funnyzpc/p/18540378开源地址https://github.com/funnyzpc/quartz表的基本结构    总的来说任务的配置及开发基本遵从上图的表的基本关系,除app以及node之外均需要手动手动配置......
  • SpringBoot+ElasticJob实现分布式任务调度
    目录1相关简介2Zookeeper的Docker安装3Zookeeper的Windows版本安装4Zookeeper图形化客户端prettyZoo5示例代码6添加任务监听器7参考资料(感谢)1相关简介zookeeper:开源分布式应用程序协调服务下载地址:https://archive.apache.org/dist/zookeeper/2Zookeeper......
  • quartz集群增强版
    quartz集群增强版......
  • Apache DolphinScheduler将上游Task执行结果传递给下游
    01背景公司的数据开发平台需要用到DolphinScheduler做任务调度,其中一个场景是:上游任务执行结束后,需要将任务执行结果传递给下游任务。DolphinScheduler肯定是能实现任务之间的传参的,具体的可以看:DolphinScheduler|文档中心(https://dolphinscheduler.apache.org/zh-cn/docs/......
  • Apache DolphinScheduler + OceanBase,搭建分布式大数据调度平台的实践
    本文整理自白鲸开源联合创始人,ApacheDolphinSchedulerPMCChair,ApacheFoundationMember代立冬的演讲。主要介绍了DolphinScheduler及其架构、DolphinScheduler与OceanBase的联合大数据方案。DolphinScheduler是什么?ApacheDolphinScheduler,作为一款云原生且配备强大的可视......
  • 克服并发与容错:分布式任务调度系统实践
    5万字长文趁还没火赶紧收藏,比知识星球的项目质量高还不付费,你去哪儿找?基于spring环境开发,拒绝CRUD,让你体验后端的美我们为什么需要任务定时调度系统?从后端角度讲,任务定时调度系统广泛应用于数据备份、日志清理、系统维护等场景,能够显著提高系统的自动化程度和运......
  • Quartz的使用
    1.准备工作建立Maven工程2.引入Quartz的jar包 <dependencies> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.3.0</version> </dependency> </dependenc......