目录
7.2.2、XxlJobSpringExecutor 初始化
8.1.2、XxlJobSpringExecutorProperties
1、什么是XXL-JOB
1.1、XXL-JOB简介
xxl-job是一个分布式的任务调度平台,其核心设计目标是:学习简单、开发迅速、轻量级、易扩展,现在已经开放源代码并接入多家公司的线上产品线,开箱即用。
1.2、XXL-JOB构成
xxl-job框架主要用于处理分布式的定时任务,其主要由调度中心和执行器组成。
调度模块(调度中心):
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。
执行模块(执行器):
负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;
接收“调度中心”的执行请求、终止请求和日志请求等。
任务:
负责执行具体的业务处理。
调度中心与执行器之间的工作流程如下:
执行流程:
1.任务执行器根据配置的调度中心的地址,自动注册到调度中心
2.达到任务触发条件,调度中心下发任务
3.执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
4.执行器消费内存队列中的执行结果,主动上报给调度中心
5.当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情
1.3、XXL-JOB总结
调度中心:统一管理任务调度平台上的调度任务,负责触发调度执行,并且提供任务管理平台。
执行器:接收调度中心的调度并且执行,可以直接执行也可以集成到项目中。
调度中心和执行器两个模块分开部署,相互分离,两者之间通过RPC进行通信,其中调度中心主要是提供一个平台,管理调度信息,发送调度请求,自己不承担业务代码,而执行器接受调度中心的调度执行业务逻辑。
2、XXL-JOB原理
2.1、执行器的注册和发现
执行器的注册和发现主要是关系两张表:
xxl_job_registry:执行器的实例表,保存实例信息和心跳信息。
xxl_job_group:每个服务注册的实例列表。
执行器启动线程每隔30秒向注册表xxl_job_registry请求一次,更新执行器的心跳信息,调度中心启动线程每隔30秒检测一次xxl_job_registry,将超过90秒还没有收到心跳的实例信息从xxl_job_registry删除,并更新xxl_job_group服务的实例列表信息。
2.2、调度中心调用执行器
调度中心的操作:
调度中心循环不停的执行以下操作:
(1)关闭自动提交事务
(2)利用mysql的悲观锁使其他事务无法进入
select * from xxl_job_lock where lock_name = 'schedule_lock' for update
(3)读取数据库中的xxl_job_info:记录定时任务的相关信息,该表中有trigger_next_time字段表示下一次任务的触发时间。拿到距离当前时间5s内的任务列表。
分为三种情况处理:
1、对于当前时间-任务的下一次触发时间>5,直接调过不执行,重置trigger_next_time的时间。(超过5s)
2、对于任务的下一次触发时间<当前时间<任务的下一次触发时间+5的任务(不超过5s的):
(1)开线程处理执行触发逻辑,根据当前时间更新下一次任务触发时间
(2)如果新的任务下一次触发时间-当前时间<5,放到时间轮中,时间轮是一个map:
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
(3)根据新的任务下一次触发时间更新下下一次任务触发时间
3、对于任务的下一次触发时间>当前时间,将其放入时间轮中,根据任务下一次触发时间更新下下一次任务触发时间
(4)commit提交事务,同时释放排他锁
执行器的操作:
1、执行器接收到调度中心的调度信息,将调度信息放到对应的任务的等待队列中
2、执行器的任务处理线程从任务队列中取出调度信息,执行业务逻辑,将结果放入一个公共的等待队列中(每个任务都有一个单独的处理线程和等待队列,任务信息放入该队列中)
3、执行器有一个专门的回调线程定时批量从结果队列中取出任务结果,并且回调告知调度中心
3、XXL-JOB能够解决哪些问题
XXL-JOB是一个分布式任务调度平台,主要用于解决分布式定时任务
解析:
在平时的业务场景中,经常有一些场景需要使用定时任务,比如:
时间驱动的场景:某个时间点发送优惠券,发送短信等等。批量处理数据:批量统计上个月的账单,统计上个月销售数据等等。固定频率的场景:每隔5分钟需要执行一次。所以定时任务在平时开发中并不少见,而且对于现在快速消费的时代,每天都需要发送各种推送,消息都需要依赖定时任务去完成
4、XXL-JOB优点特性
1、简单:支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手;
2、动态:支持动态修改任务状态、启动/停止任务,以及终止运行中任务,即时生效;
3、调度中心HA(中心式):调度采用中心式设计,“调度中心”自研调度组件并支持集群部署,可保证调度中心HA;
4、执行器HA(分布式):任务分布式执行,任务"执行器"支持集群部署,可保证任务执行HA;
5、注册中心:执行器会周期性自动注册任务, 调度中心将会自动发现注册的任务并触发执行。同时,也支持手动录入执行器地址;
6、弹性扩容缩容:一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务;
7、路由策略:执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;
8、故障转移:任务路由策略选择"故障转移"情况下,如果执行器集群中某一台机器故障,将会自动Failover切换到一台正常的执行器发送调度请求。
9、阻塞处理策略:调度过于密集执行器来不及处理时的处理策略,策略包括:单机串行(默认)、丢弃后续调度、覆盖之前调度;
10、任务超时控制:支持自定义任务超时时间,任务运行超时将会主动中断任务;
11、任务失败重试:支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;其中分片任务支持分片粒度的失败重试;
12、任务失败告警:默认提供邮件方式失败告警,同时预留扩展接口,可方便的扩展短信、钉钉等告警方式;
13、分片广播任务:执行器集群部署时,任务路由策略选择"分片广播"情况下,一次任务调度将会广播触发集群中所有执行器执行一次任务,可根据分片参数开发分片任务;
14、动态分片:分片广播任务以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。
15、事件触发:除了"Cron方式"和"任务依赖方式"触发任务执行之外,支持基于事件的触发任务方式。调度中心提供触发任务单次执行的API服务,可根据业务事件灵活触发;
5、XXL-JOB安装部署
5.1、文档及源码
文档地址:https://www.xuxueli.com/xxl-job/
源码地址:https://github.com/xuxueli/xxl-job
xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
代码结构如下:
doc :文档,即SQL脚本所在目录
db : “调度数据库”建表脚本
xxl-job-admin : 调度中心项目源码
xxl-job-core : 核心模块,公共Jar依赖
xxl-job-executor-samples : 执行器,Sample示例项目(大家可以在该项目上进行开发,也可以将现有项目改造生成执行器项目)
5.2、调度中心部署
5.2.1、初始化【调度数据库】
打开项目代码,获取 “调度数据库初始化SQL脚本” 并执行即可。
“调度数据库初始化SQL脚本” 位置为: /xxl-job/doc/db/tables_xxl_job.sql ,数据库名:xxl_job
数据库如下:
数据库表解析:
xxl_job_lock:任务调度锁表;
xxl_job_group:执行器信息表,维护任务执行器信息;
xxl_job_info:调度扩展信息表:用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
xxl_job_log:调度日志表:用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;
xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;
xxl_job_user:系统用户表;
调度中心支持集群部署,集群情况下各节点务必连接同一个mysql实例
如果mysql做主从,调度中心集群节点务必强制走主库
5.2.2、部署xxl-job-admin配置(调度中心)
调度中心统一管理任务调度平台上调度任务,负责出发调度执行,并且提供任务管理平台系统
打开 xxl-job-admin 的配置文件,/xxl-job/xxl-job-admin/src/main/resources/application.properties 对调度中心进行配置
调度中心配置如下(根据自己情况修改,切勿直接复制):
### 调度中心JDBC链接
spring.datasource.url=jdbc:mysql://database_ip:port/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=your_username
spring.datasource.password=your_password
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
### 报警邮箱
spring.mail.host=smtp.qq.com
spring.mail.port=25
[email protected]
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
### 调度中心通讯TOKEN:非空时启用;
xxl.job.accessToken=
### 调度中心国际化配置:"zh_CN"/中文简体, "zh_TC"/中文繁体 and "en"/英文;
xxl.job.i18n=zh_CN
## 调度线程池最大线程配置
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
### 调度中心日志表数据保存天数:过期日志自动清理;限制大于等于7时生效,否则, 如-1,关闭自动清理功能;
xxl.job.logretentiondays=30
其中重要配置如下:
server.port : 根据情况修改端口
spring.datasource.url :指向刚才准备的数据库
spring.datasource.password : 记得修改成自己的数据库密码
spring.mail.username :配置自己的邮件账号
spring.mail.password :邮件的授权码
5.2.3、调度中心部署
如果已经正确进行上述配置,可将项目编译打包部署
启动之后,浏览器访问 http://localhost:18080/xxl-job-admin (该地址执行器将会使用到,作为回调地址)
默认登录账号 “admin/123456”, 登录后运行界面如下图所示:
调度中心集群部署时,需要注意以下事项:
1、DB配置保持一致
2、集群机器时钟保持一致(单机集群可以忽视)
3、建议:推荐通过nginx为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置,调用API服务等操作均通过该域名进行。
5.3、执行器配置及部署
5.3.1、引入maven依赖
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.4.0-SNAPSHOT</version>
</dependency>
5.3.2、执行器配置&说明
### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-demo
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
xxl.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9999
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=30
5.3.3、XxlJobConfig配置类
执行器组件配置,现已自动装配,执行器中的XxlJobConfig将根据配置文件中配置生成XxlJobSpringExecuto
@Configuration
@Slf4j
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.executor.appname}")
private String appName;
/*@Value("${xxl.job.executor.ip}")
private String ip;*/
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppName(appName);
//xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}
5.3.4、demo示例
在com.xxl.job.executor.service.jobhandler.SampleXxlJob中提供了简单的定时任务实例,为方便用户参考与快速实用,示例执行器内原生提供多个Bean模式任务Handler,可以直接配置实用,如下:
demoJobHandler:简单示例任务,任务内部模拟耗时任务逻辑,用户可在线体验Rolling Log等功能;
shardingJobHandler:分片示例任务,任务内部模拟处理分片参数,可参考熟悉分片任务;
httpJobHandler:通用HTTP任务Handler;业务方只需要提供HTTP链接等信息即可,不限制语言、平台;
demo示例:
/**
* XxlJob开发示例(Bean模式)
*
* 开发步骤:
* 1、在Spring Bean实例中,开发Job方法,方式格式要求为 "public ReturnT<String> execute(String param)"
* 2、为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
* 3、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;
*
* @author xuxueli 2019-12-11 21:52:51
*/
@Component
public class SampleXxlJob {
private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public ReturnT<String> demoJobHandler(String param) throws Exception {
logger.info("XXL-JOB, Hello World. param={}",param);
return ReturnT.SUCCESS;
}
//...省略...
}
【重要】 如果我们要写自己的定时任务,参照上面方法,在方法上注解一个@XxlJob("任务名字") ,方法可以接受一个字符串参数,方法需要返回ReturnT格式
5.3.5、执行器页面配置
进入调度中心:http://localhost:18080/xxl-job-admin
进入执行器管理,点击操作 -> 编辑,进入执行器编辑页面
appName : 执行器的名字,可以任意填写
名称:任意填写
注册方式:调度中心是通过RPC的方式对执行器发起调度,所以这里需要的是执行器项目的ip:port ,注意,该端口不是执行器项目的server.port ,而是:xxl.job.executor.port 端口。你可以选择自动注册,也可以手动录入。
5.4、任务创建与执行
5.4.1、创建任务
进入调度中心:http://localhost:18080/xxl-job-admin
进入任务管理,点击新增,进入新增任务页面
执行器:任务的绑定的执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能; 另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器, 可在 "执行器管理" 进行设置;
任务描述:任务的描述信息,便于任务管理
路由策略:
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):轮询;
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上;
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
Cron:定时任务的执行时间规则,时间表达式
运行模式:BEAN模式:任务以JobHandler方式维护在执行器端;需要结合 "JobHandler" 属性匹配执行器中任务;
GLUE模式(Java):任务以源码方式维护在调度中心;该模式的任务实际上是一段继承自IJobHandler的Java类代码并 "groovy" 源码方式维护,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务;
GLUE模式(Shell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "shell" 脚本;
GLUE模式(Python):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "python" 脚本;
GLUE模式(PHP):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "php" 脚本;
GLUE模式(NodeJS):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "nodejs" 脚本;
GLUE模式(PowerShell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "PowerShell" 脚本;
JobHandler:行模式为 "BEAN模式" 时生效,对应执行器中新开发的 JobHandler 类 “@JobHandler” 注解自定义的 value 值
阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队 列,然后运行本地调度任务;
子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度
任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务
失败重试次数:支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试
报警邮件:任务调度失败时邮件通知的邮箱地址,支持配置多邮箱地址,配置多个邮箱地址时用逗号分隔
负责人:任务负责人
任务参数:任务执行所需的参数
5.4.2、执行任务
配置任务后,右击选择执行一次进行任务调度测试
任务执行后进入调度日志,可以查看任务执行日志
5.5、GLUE模式(java)
5.5.1、添加任务
该模式支持在线编辑定时任务的内容,立刻执行,无需再开发工具中编辑代码,也无需重启项目。
请点击任务右侧 “GLUE” 按钮,进入 “GLUE编辑器开发界面” ,见下图。“GLUE模式(Java)” 运行模式的任务默认已经初始化了示例任务代码,即打印Hello World。
任务以源码方式维护在调度中心,支持通过Web IDE在线更新,实时编译和生效,因此不需要指定JobHandler
5.5.2、编写代码
保存之后可以在操作按钮里面去编写任务
(“GLUE模式(Java)” 运行模式的任务实际上是一段继承自IJobHandler的Java类代码,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务),比如我的定时任务如下,编辑好之后点击保存
保存好之后,启动定时任务,效果如下:
6、XXL-JOB集群部署
6.1、调度中心集群
6.1.1、问题概述
调度中心支持集群部署,提升调度系统容灾和可用性。调度中心集群部署时,几点要求和建议:
(1)DB配置保持一致;
(2)集群机器时钟保持一致(单机集群忽视);
(3)当启动多个调度器时,执行器配置调度中心部署跟地址可以用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
建议:推荐通过nginx为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置、调用API服务等操作均通过该域名进行。
6.1.2、启动多个调度中心
修改调度中心端口,启动多个调度中心,我这里启动两个如
6.1.3、配置Nginx负载均衡
当启动多个调度器时,执行器配置调度中心部署跟地址可以用逗号分隔。执行器将会使用该地址进行“执行器心跳注册”和“任务结果回调”;为空则关闭自动注册;
建议:推荐通过nginx为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置、调用API服务等操作均通过该域名进行。
我们启动了2个调度中心,那么我的执行器项目该注册到哪个调度中心呢?我们通过Nginx来解决这个问题,原理如下图:
我们再hosts配置 www.jobs.com 作为nginx的主机域名,然后反向代理到多个调度中心,这样一来执行器就只需要注册到 www.jobs.com Nginx即可。
修改 C:\Windows\System32\drivers\etc\hosts增加配置如下:
127.0.0.1 www.jobs.com
Nginx配置如下:
#调度中心
upstream jobs{
server localhost:18080;
server localhost:18081;
}
server {
listen 80;
#使用域名
server_name www.jobs.com;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
#调度中心反向代理配置
proxy_pass http://jobs/;
}
#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
启动Nginx,通过浏览器访问 Jobs - Employment - Career Search, Jobs Near You | Jobs.com ,可以访问到调度中心的管理界面
6.2、执行器项目集群
执行器支持集群部署,提升调度系统可用性,同时提升任务处理能力。
执行器集群部署时,几点要求和建议:
(1)执行器回调地址(xxl.job.admin.addresses)需要保持一致;执行器根据该配置进行执行器自动注册等操作。
(2)同一个执行器集群内AppName(xxl.job.executor.appname)需要保持一致;调度中心根据该配置动态发现不同集群的在线执行器列表。
6.2.1、启动多个执行器项目
现在对执行器项目做集群,修改xxl-job-executor-sample-springboot配置文件application.properties
server.port : 既然是做集群,项目端口需要修改
xxl.job.admin.addresses : 调度中心地址需要修改成www.jobs.com ,多个执行器配置同一个地址。
xxl.job.executor.port : RPC通信端口也要修改,多个执行器该端口需要不一样
第一个实例配置:
server.port=19090
xxl.job.admin.addresses=http://www.jobs.com/xxl-job-admin #对应Nginx地址
xxl.job.executor.port=9999
第二个实例配置:
server.port=19091
xxl.job.admin.addresses=http://www.jobs.com/xxl-job-admin
xxl.job.executor.port=9998
在 Configurations中配置,允许启动多个实例
启动实例如下:
6.2.2、配置定时任务
通过http://www.jobs.com/xxl-job-admin 访问调度中心管理界面,在执行器管理中可以看到多台执行器实例
在任务管理中,可以编辑任务,然后选择路由策略,比如:选择轮询,然后启动任务,就会看到两个执行器项目轮着执行定时任务
7、XXL-JOB源码解析
7.1、XXL-JOB时序图
7.2、XxlJobSpringExecutor 启动
项目启动后获取配置文件中的属性值并实例化配置类 XxlJobSpringExecutor,XxlJobSpringExecutor 实现了 SmartInitializingSingleton 接口的 afterSingletonsInstantiated 方法,在实例化 XxlJobSpringExecutor 后触发执行,XxlJobSpringExecutor 在该方法里做了两件事:
- 调用 initJobHandlerMethodRepository 方法扫描项目中带 @XxlJob 注解的方法(即 jobHandler)并注册;
- 调用父类 XxlJobExecutor 的 start 方法启动 Executor,并初始化核心组件:
- 初始化 admin 控制台:initAdminBizList(adminAddresses, accessToken)
- 初始化日志清理进程 JobLogFileCleanThread:JobLogFileCleanThread.getInstance().start(logRetentionDays)
- 初始化触发回调进程 TriggerCallbackThread:TriggerCallbackThread.getInstance().start()
- 初始化内置服务 executor-server:initEmbedServer(address, ip, port, appname, accessToken)
7.2.1、jobHandler 注册
在项目启动时,执行器会通过 “@JobHandler” 识别 Spring 容器中 “Bean模式任务”,以注解的 value 属性为 key 管理起来,保存至 jobHandlerRepository。
1 initJobHandlerMethodRepository(ApplicationContext applicationContext)
该方法扫描项目中的类,将带有 @XxlJob 注解的 job 进行解析,并调用 registJobHandler(String name, IJobHandler jobHandler) 进行注册:
// ---------------------- job handler repository ----------------------
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
// registry jobhandler
/**
* name: @XxlJob 的 value, jobHandler 的名字
* bean:带 @XxlJob 方法对应的类名
* method: 带 @XxlJob 注解的方法名
* initMethod: @XxlJob 的 init 值对应方法的全限定名,即 execute 方法调用前执行的方法
* destroyMethod: @XxlJob 的 destroy 值对应方法的全限定名,即 execute 方法调用后执行的方法
*/
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
2 MethodJobHandler
public class MethodJobHandler extends IJobHandler {
private final Object target;
private final Method method;
private Method initMethod;
private Method destroyMethod;
public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
this.target = target;
this.method = method;
this.initMethod =initMethod;
this.destroyMethod =destroyMethod;
}
@Override
public ReturnT<String> execute(String param) throws Exception {
return (ReturnT<String>) method.invoke(target, new Object[]{param});
}
@Override
public void init() throws InvocationTargetException, IllegalAccessException {
if(initMethod != null) {
initMethod.invoke(target);
}
}
@Override
public void destroy() throws InvocationTargetException, IllegalAccessException {
if(destroyMethod != null) {
destroyMethod.invoke(target);
}
}
}
7.2.2、XxlJobSpringExecutor 初始化
7.2.2.1、初始化admin控制台
initAdminBizList 实例化 AdminBizClient,并将其存在 adminBizList 中
private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
// 多个 admin 控制台地址以 "," 分隔
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}
7.2.2.2、初始化日志清理进程
7.2.2.3、初始化出发回调进程
7.2.2.4、初始化内置事务
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
initEmbedServer 方法内部直接调用 EmbedServer#start 方法
public void start(final String address, final int port, final String appname, final String accessToken) {
// 1 初始化 executorBiz 执行单元和线程池
executorBiz = new ExecutorBizImpl();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});
// 2 启动 ServerBootstrap 并绑定端口号
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(port).sync();
// 3 将执行器信息注册到调度中心(admin 控制台)
startRegistry(appname, address);
// 4 wait util stop
future.channel().closeFuture().sync();
}
EmbedServer 的 start 方法主要处理以下几件事:
- 初始化 executorBiz 和线程池 bizThreadPool
- 启动 ServerBootstrap 并绑定端口号
调度中心实际的调度请求由 EmbedHttpServerHandler 处理 - 将执行器信息注册到调度中心(admin 控制台)
public void startRegistry(final String appname, final String address) {
// start registry
ExecutorRegistryThread.getInstance().start(appname, address);
}
ExecutorRegistryThread#start 方法封装执行器信息,并调用 AdminBizClient#registry 方法将执行器信息注册于调度中心,核心代码如下:
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
// AdminBizClient 实现 AdminBiz 接口,将执行器信息进行注册
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
7.3、调度平台发布任务
前端页面处罚具体任务后,转发至 JobInfoController 的 triggerJob 方法
/**
* 处理触发请求
*
* @param id 任务 id
* @param executorParam 执行器参数
* @param addressList 执行器地址
* @return
*/
@RequestMapping("/trigger")
@ResponseBody
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
return ReturnT.SUCCESS;
}
JobTriggerPoolHelper 的 trigger 方法调用 addTrigger 方法,addTrigger 方法将调度请求交给线程池处理,线程池中通过 XxlJobTrigger 的 trigger 方法处理实际请求
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
// 获取任务详细信息
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
return;
}
// 设置任务执行参数
if (executorParam != null) {
jobInfo.setExecutorParam(executorParam);
}
int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
// 获取执行器详细信息
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
// 如果手动设置执行任务机器的地址,则覆盖执行器原有的地址(即新增执行器时自动注册或手动录入的地址)
if (addressList!=null && addressList.trim().length()>0) {
group.setAddressType(1);
group.setAddressList(addressList.trim());
}
// sharding param
int[] shardingParam = null;
if (executorShardingParam!=null){
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
shardingParam = new int[2];
shardingParam[0] = Integer.valueOf(shardingArr[0]);
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
// 如果是分片广播任务,则根据执行器数量将任务发布至各个机器地址
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
} else { // 其他路由策略的任务发布一条即可
if (shardingParam == null) {
shardingParam = new int[]{0, 1};
}
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
}
processTrigger 方法主要处理以下几件事:
- 初始化触发参数:TriggerParam
- 根据不同路由策略获取执行器地址
根据不同的 executorRouteStrategy 策略获取 ExecutorRouter,并调用 ExecutorRouter 的 route 方法选择一个执行器地址
// 根据不同路由策略获取执行器 ip 地址
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
3. 发送调度请求至远程执行器
runExecutor 方法根据执行器地址获取执行单元 ExecutorBizClient,并调用 ExecutorBizClient 的 run 方法将调度请求以 HTTP POST 形式发送,由执行器的 EmbedServer 接受
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
- 保存任务执行信息
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// param
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、init trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
// 3、init address
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
// 根据不同路由策略获取执行器 ip 地址
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}
// 4、trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null) {
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
}
// 5、collection trigger info
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
.append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
if (shardingParam != null) {
triggerMsgSb.append("("+shardingParam+")");
}
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
.append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
// 6、save log trigger-info
jobLog.setExecutorAddress(address);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setExecutorShardingParam(shardingParam);
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
//jobLog.setTriggerTime();
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
7.4、执行器接收并处理任务
执行器接收到调度中心的调度请求时,如果任务类型为 “Bean模式”,将会匹配 Spring 容器中的 “Bean模式任务”,然后调用其 execute 方法,执行任务逻辑。如果任务类型为 “GLUE模式”,将会加载 GLue 代码,实例化 Java 对象,注入依赖的 Spring 服务(注意:Glue代码中注入的Spring服务,必须存在与该“执行器”项目的Spring容器中),然后调用execute方法,执行任务逻辑。
7.4.1、EmbedServer接收调度请求
EmbedHttpServerHandler 调用 channelRead0 方法处理任务调度请求,每个请求通过线程池 bizThreadPool 新开一个线程进行处理
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
// invoke
bizThreadPool.execute(new Runnable() {
@Override
public void run() {
// do invoke
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
// to json
String responseJson = GsonTool.toJson(responseObj);
// write response
writeResponse(ctx, keepAlive, responseJson);
}
});
}
process 方法根据 uri 的不同取值调用 ExecutorBiz 不同的处理方法:
if ("/beat".equals(uri)) {
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
7.4.2、ExecutorBiz(执行器单元)
ExecutorBiz#run(TriggerParam triggerParam) 方法处理任务请求,源码如下
public ReturnT<String> run(TriggerParam triggerParam) {
// load old:jobHandler + jobThread
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// valid:jobHandler + jobThread
// 1 根据任务模式(GlueTypeEnum)的不同进行不同的任务处理逻辑
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) { // “Bean模式” 任务
// 根据 triggerParam.getExecutorHandler() 加载任务处理器,此处 triggerParam.getExecutorHandler() 等于 @XxlJob 注解中的 value 值
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// valid old jobThread
if (jobThread!=null && jobHandler != newJobHandler) {
// change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { // “GLUE模式(Java)” 任务
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change handler or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
try {
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { // 其他脚本模式任务
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof ScriptJobHandler
&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change script or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
}
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
}
// 2 处理不同的阻塞策略
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
// 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
}
// 新建或替换任务已有的 jobHandler
if (jobThread == null) {
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// 3 将调度请求直接放入 jobThread 的触发队列进行异步处理
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
run() 方法主要做了以下几件事:
- 根据任务模式(GlueTypeEnum)的不同进行不同的任务处理逻辑
- Bean模式
每个Bean模式任务都是一个Spring的Bean类实例,它被维护在“执行器”项目的Spring容器中。任务类需要加“@JobHandler(value=”名称”)”注解,因为“执行器”会根据该注解识别Spring容器中的任务。任务类需要继承统一接口“IJobHandler”,任务逻辑在execute方法中开发,因为“执行器”在接收到调度中心的调度请求时,将会调用“IJobHandler”的execute方法,执行任务逻辑 - GLUE模式(Java)
每个 “GLUE模式(Java)” 任务的代码,实际上是“一个继承自“IJobHandler”的实现类的类代码”,“执行器”接收到“调度中心”的调度请求时,会通过Groovy类加载器加载此代码,实例化成Java对象,同时注入此代码中声明的Spring服务(请确保Glue代码中的服务和类引用在“执行器”项目中存在),然后调用该对象的execute方法,执行任务逻辑 - 其他脚本任务
脚本任务的源码托管在调度中心,脚本逻辑在执行器运行。当触发脚本任务时,执行器会加载脚本源码在执行器机器上生成一份脚本文件,然后通过Java代码调用该脚本;并且实时将脚本输出日志写到任务日志文件中,从而在调度中心可以实时监控脚本运行情况
- Bean模式
- 根据 blockStrategy 处理不同的阻塞策略
- 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败
- 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
/* 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败 */
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
/*
覆盖之前调度:若当前任务的处理线程(jobThread)有正在运行的任务或调度请求队列不为空,则清空当前任务处理线程
然后创建一个新的 jobThread 处理当前调度请求
*/
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
if (jobThread == null) {
// registJobThread 创建新的任务处理进程并将已存在的任务处理进程清除
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
- 将调度请求放入 JobThread 的阻塞队列 triggerQueue,等待后续处理
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
LinkedBlockingQueue<TriggerParam> triggerQueue
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
// avoid repeat
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}
// avoid repeat trigger for the same TRIGGER_LOG_ID
triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(triggerParam);
return ReturnT.SUCCESS;
}
JobThread 通过 run() 方法循环处理请求,具体处理细节参见下节
7.4.3、JobThread(任务处理线程)
public void run() {
// init
handler.init();
// 1 从阻塞队列获取调度请求并处理
while(!toStop){
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
// execute
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
// 若任务设置超时时间,则在限定时间内异步获取执行结果
if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout
final TriggerParam triggerParamTmp = triggerParam;
FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
@Override
public ReturnT<String> call() throws Exception {
return handler.execute(triggerParamTmp.getExecutorParams());
}
});
futureThread = new Thread(futureTask);
futureThread.start();
// 异步获取执行结果
executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} else {
// 若没有设置超时时间,则同步获取任务执行结果
executeResult = handler.execute(triggerParam.getExecutorParams());
}
if (executeResult == null) {
executeResult = IJobHandler.FAIL;
} else {
executeResult.setMsg(
(executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
?executeResult.getMsg().substring(0, 50000).concat("...")
:executeResult.getMsg());
executeResult.setContent(null); // limit obj size
}
XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
// 2 任务处理结果回调
if(triggerParam != null) {
// callback handler info
if (!toStop) {
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
} else {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}
}
// 3 任务处理进行被杀死,队列中剩余的调度请求进行失败处理,并回调任务处理结果
while(triggerQueue !=null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}
// destroy
handler.destroy();
}
run() 方法主要做了以下几件事:
1、从阻塞队列获取调度请求并处理
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
// 若任务设置超时时间,则在限定时间内异步获取执行结果
if (triggerParam.getExecutorTimeout() > 0) {
// limit timeout
final TriggerParam triggerParamTmp = triggerParam;
FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
@Override
public ReturnT<String> call() throws Exception {
return handler.execute(triggerParamTmp.getExecutorParams());
}
});
futureThread = new Thread(futureTask);
futureThread.start();
// 异步获取执行结果
executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} else {
// 若没有设置超时时间,则同步获取任务执行结果
executeResult = handler.execute(triggerParam.getExecutorParams());
}
2、任务处理结果回调
if (!toStop) {
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
} else {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
将任务执行结果 executeResult 放入 TriggerCallbackThread 的任务结果队列 callBackQueue
/**
* job results callback queue
*/
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
public static void pushCallBack(HandleCallbackParam callback){
getInstance().callBackQueue.add(callback);
}
TriggerCallbackThread 创建 triggerCallbackThread 线程从 callBackQueue 中获取 executeResult 并将其以 HTTP POST 形式发送给调度中心,核心处理方法见 doCallback()
private void doCallback(List<HandleCallbackParam> callbackParamList){
// callback, will retry if error
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
}
}
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}
调度中心获取任务结果参数后,调用AdminBizImpl#callback(com.xxl.job.core.biz.model.HandleCallbackParam) 方法进行处理
private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
// valid log item
XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId());
}
// trigger success, to trigger child job
String callbackMsg = null;
if (IJobHandler.SUCCESS.getCode() == handleCallbackParam.getExecuteResult().getCode()) {
XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId());
if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) {
callbackMsg = "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>";
String[] childJobIds = xxlJobInfo.getChildJobId().split(",");
for (int i = 0; i < childJobIds.length; i++) {
int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1;
if (childJobId > 0) {
JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null);
ReturnT<String> triggerChildResult = ReturnT.SUCCESS;
// add msg
callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"),
(i+1),
childJobIds.length,
childJobIds[i],
(triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")),
triggerChildResult.getMsg());
} else {
callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"),
(i+1),
childJobIds.length,
childJobIds[i]);
}
}
}
}
// handle msg
StringBuffer handleMsg = new StringBuffer();
if (log.getHandleMsg()!=null) {
handleMsg.append(log.getHandleMsg()).append("<br>");
}
if (handleCallbackParam.getExecuteResult().getMsg() != null) {
handleMsg.append(handleCallbackParam.getExecuteResult().getMsg());
}
if (callbackMsg != null) {
handleMsg.append(callbackMsg);
}
if (handleMsg.length() > 15000) {
handleMsg = new StringBuffer(handleMsg.substring(0, 15000)); // text最大64kb 避免长度过长
}
// success, save log
log.setHandleTime(new Date());
log.setHandleCode(handleCallbackParam.getExecuteResult().getCode());
log.setHandleMsg(handleMsg.toString());
xxlJobLogDao.updateHandleInfo(log);
return ReturnT.SUCCESS;
}
3、JobThread 停止后处理队列遗留调度请求
while(triggerQueue !=null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}
7.4.4、IJobHandler(任务单元)
public abstract class IJobHandler {
/** success */
public static final ReturnT<String> SUCCESS = new ReturnT<String>(200, null);
/** fail */
public static final ReturnT<String> FAIL = new ReturnT<String>(500, null);
/** fail timeout */
public static final ReturnT<String> FAIL_TIMEOUT = new ReturnT<String>(502, null);
/**
* execute handler, invoked when executor receives a scheduling request
*
* @param param
* @return
* @throws Exception
*/
public abstract ReturnT<String> execute(String param) throws Exception;
/**
* init handler, invoked when JobThread init
*/
public void init() throws InvocationTargetException, IllegalAccessException {
// do something
}
/**
* destroy handler, invoked when JobThread destroy
*/
public void destroy() throws InvocationTargetException, IllegalAccessException {
// do something
}
}
IJobHandler 有四个实现类:
- MethodJobHandler:负责处理 BEAN 模式的任务
- GlueJobHandler:负责处理 Glue 模式的任务
- ScriptJobHandler:负责处理脚本模式的任务
- CommandJobHandler:负责处理命令行任务
7.5、心跳检测
任务注册 Beat 周期默认30s
执行器以一倍 Beat 进行执行器注册
调度中心以一倍 Beat 进行动态任务发现
执行器注册信息的失效时间为三倍 Beat
执行器销毁,主动上报调度中心并摘除对应的执行器机器信息
8、XXL-JOB改造
8.1、自动装配
8.1.1、XxlJobAutoConfigure
XxlJobAutoConfigure 自动装配 XxlJobSpringExecutor 到项目的 Spring 容器中
/**
* matchIfMissing 属性为true时,配置文件中缺少对应的value或name的对应的属性值,也会注入成功
*/
@Configuration
@EnableConfigurationProperties(XxlJobSpringExecutorProperties.class)
@ConditionalOnClass(XxlJobSpringExecutor.class)
@ConditionalOnProperty(prefix = "xxl-job", value = "enabled", matchIfMissing = true)
public class XxlJobAutoConfigure {
private static Logger LOGGER = LoggerFactory.getLogger(XxlJobAutoConfigure.class);
@Autowired
private XxlJobSpringExecutorProperties properties;
@Bean
@ConditionalOnMissingBean(XxlJobSpringExecutor.class)
public XxlJobSpringExecutor xxlJobExecutor() {
LOGGER.info(">>>>>>>>>>> xxl-job config auto init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(properties.getAdmin().getAddresses());
xxlJobSpringExecutor.setAppname(properties.getExecutor().getAppname());
xxlJobSpringExecutor.setAddress(properties.getExecutor().getAddress());
xxlJobSpringExecutor.setIp(properties.getExecutor().getIp());
xxlJobSpringExecutor.setPort(properties.getExecutor().getPort());
xxlJobSpringExecutor.setAccessToken(properties.getAccessToken());
xxlJobSpringExecutor.setLogPath(properties.getExecutor().getLogPath());
xxlJobSpringExecutor.setLogRetentionDays(properties.getExecutor().getLogRetentionDays());
xxlJobSpringExecutor.setApplicationContext(SpringUtils.getApplicationContext());
LOGGER.info(">>>>>>>>>>> XxlJobSpringExecutor: {}", xxlJobSpringExecutor.toString());
LOGGER.info(">>>>>>>>>>> xxl-job config auto init end.");
return xxlJobSpringExecutor;
}
}
8.1.2、XxlJobSpringExecutorProperties
XxlJobSpringExecutor 的属性全部放在 XxlJobSpringExecutorProperties 中
@ConfigurationProperties(prefix = "xxl-job")
public class XxlJobSpringExecutorProperties {
private String accessToken;
@NestedConfigurationProperty
private Admin admin = new Admin();
@NestedConfigurationProperty
private Executor executor = new Executor();
public String getAccessToken() {
return accessToken;
}
public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}
public Admin getAdmin() {
return admin;
}
public void setAdmin(Admin admin) {
this.admin = admin;
}
public Executor getExecutor() {
return executor;
}
public void setExecutor(Executor executor) {
this.executor = executor;
}
public static class Admin {
private String addresses;
public String getAddresses() {
return addresses;
}
public void setAddresses(String addresses) {
this.addresses = addresses;
}
}
public static class Executor {
private String appname;
private String address;
private String ip;
private int port;
private String logPath;
private int logRetentionDays;
public String getAppname() {
return appname;
}
public void setAppname(String appname) {
this.appname = appname;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public int getLogRetentionDays() {
return logRetentionDays;
}
public void setLogRetentionDays(int logRetentionDays) {
this.logRetentionDays = logRetentionDays;
}
}
}
8.1.3、配置spring.factories
在项目的 resources\META-INF 目录新建 spring.factories 文件,并配置自动配置类 XxlJobAutoConfigure
org.springframework.boot.autoconfigure.EnableAutoConfiguration = com.xxl.job.core.properties.auto.configure.XxlJobAutoConfigure
8.2、集成spring-session
8.2.1、引入spring-session相关依赖
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-bom</artifactId>
<version>Corn-SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- Spring Session core -->
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-core</artifactId>
</dependency>
<!-- Spring Session Data Redis -->
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
</dependency>
<!-- Spring Boot Redis Data Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.3.1.RELEASE</version>
</dependency>
8.2.2、添加相关配置
在 application.properties 文件中添加一下配置
# spring session
## equivalent to manually adding @EnableRedisHttpSession annotation
spring.session.store-type=redis
## flush-mode 有两个参数:ON_SAVE(表示在response commit前刷新缓存),IMMEDIATE(表示只要有更新,就刷新缓存)
spring.session.redis.flush-mode=on_save
## 添加后,redis中的key为spring:session:xxl-job
spring.session.redis.namespace=xxl-job
# redis config
## Redis server host.
## Redis 要开启事件通知 redis-cli config set notify-keyspace-events Egx
spring.redis.host=192.168.99.100
## Login password of the redis server.spring.redis.password=
## Redis server port.spring.redis.port=16379
9、几种任务调度平台及对比
9.1、任务调度平台有哪些
quartz、elastic-job、xxl-job
9.2、几种任务调度平台对比
quartz和xxl-job对比:
quartz采用api的方式调用任务,不方便,但是xxl-job使用的是管理界面。
quartz比xxl-job代码侵入更强
quartz调度逻辑和QuartzJobBean耦合在一个项目中,当任务增多,逻辑复杂的时候,性能会受到影响
quartz底层以抢占式获取db锁并且由抢占成功的节点运行,导致节点负载悬殊非常大;xxl-job通过执行器实现协同分配式运行任务,各个节点比较均衡。
elastic-job和xxl-job对比:
elastic-job是无中心化的,通过zookeeper的选举机制选出主服务器,如果主服务器挂了,重新选举出主服务器,因此elastic-job的扩展性和可用性较好,但是使用有一定的复杂度。使用于业务复杂,业务量大,服务器多。
xxl-job是中心式的调度平台调度执行器执行任务,使用的是DB锁来保证集群分布式调用的一致性,学习简单,操作容易,成本不高。
相对来说,xxl-job中心式的调度平台轻量级,开箱即用,操作简易,上手快,与SpringBoot有非常好的集成,而且监控界面就集成在调度中心,界面又简洁,对于企业维护起来成本不高,还有失败的邮件告警等等。这就使很多企业选择xxl-job做调度平台。
标签:执行器,知识点,triggerParam,job,调度,JOB,任务,xxl,XXL From: https://blog.csdn.net/hao_kkkkk/article/details/136853431