首页 > 其他分享 >XXL-JOB完全开发手册(一篇学会XXL-JOB所有知识点)

XXL-JOB完全开发手册(一篇学会XXL-JOB所有知识点)

时间:2024-03-24 20:58:20浏览次数:36  
标签:执行器 知识点 triggerParam job 调度 JOB 任务 xxl XXL

目录

1、什么是XXL-JOB

1.1、XXL-JOB简介

1.2、XXL-JOB构成

调度模块(调度中心):

执行模块(执行器):

任务:

1.3、XXL-JOB总结

​编辑

2、XXL-JOB原理

2.1、执行器的注册和发现

2.2、调度中心调用执行器

调度中心的操作:

执行器的操作:

3、XXL-JOB能够解决哪些问题

4、XXL-JOB优点特性

5、XXL-JOB安装部署

5.1、文档及源码

5.2、调度中心部署

5.2.1、初始化【调度数据库】

5.2.2、部署xxl-job-admin配置(调度中心)

5.2.3、调度中心部署

5.3、执行器配置及部署

5.3.1、引入maven依赖

5.3.2、执行器配置&说明

5.3.3、XxlJobConfig配置类

5.3.4、demo示例

5.3.5、执行器页面配置

5.4、任务创建与执行

5.4.1、创建任务

5.4.2、执行任务

​编辑

5.5、GLUE模式(java)

5.5.1、添加任务

5.5.2、编写代码

6、XXL-JOB集群部署

6.1、调度中心集群

6.1.1、问题概述

6.1.2、启动多个调度中心

6.1.3、配置Nginx负载均衡

6.2、执行器项目集群

6.2.1、启动多个执行器项目

​编辑

6.2.2、配置定时任务

7、XXL-JOB源码解析

7.1、XXL-JOB时序图

​编辑

7.2、XxlJobSpringExecutor 启动

7.2.1、jobHandler 注册

7.2.2、XxlJobSpringExecutor 初始化

7.2.2.1、初始化admin控制台

7.2.2.2、初始化日志清理进程

7.2.2.3、初始化出发回调进程

7.2.2.4、初始化内置事务

7.3、调度平台发布任务

7.4、执行器接收并处理任务

7.4.1、EmbedServer接收调度请求

7.4.2、ExecutorBiz(执行器单元)

7.4.3、JobThread(任务处理线程)

7.4.4、IJobHandler(任务单元)

7.5、心跳检测

8、XXL-JOB改造

8.1、自动装配

8.1.1、XxlJobAutoConfigure

8.1.2、XxlJobSpringExecutorProperties

8.1.3、配置spring.factories

8.2、集成spring-session

8.2.1、引入spring-session相关依赖

8.2.2、添加相关配置

9、几种任务调度平台及对比

9.1、任务调度平台有哪些

9.2、几种任务调度平台对比


1、什么是XXL-JOB

1.1XXL-JOB简介

xxl-job是一个分布式的任务调度平台,其核心设计目标是:学习简单、开发迅速、轻量级、易扩展,现在已经开放源代码并接入多家公司的线上产品线,开箱即用。

1.2XXL-JOB构成

xxl-job框架主要用于处理分布式的定时任务,其主要由调度中心和执行器组成。

调度模块(调度中心):

负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;

支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。

执行模块(执行器):

负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;

接收“调度中心”的执行请求、终止请求和日志请求等。

任务:

负责执行具体的业务处理。

调度中心与执行器之间的工作流程如下:

执行流程:

1.任务执行器根据配置的调度中心的地址,自动注册到调度中心

2.达到任务触发条件,调度中心下发任务

3.执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中

4.执行器消费内存队列中的执行结果,主动上报给调度中心

5.当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情

1.3XXL-JOB总结

调度中心:统一管理任务调度平台上的调度任务,负责触发调度执行,并且提供任务管理平台。

执行器:接收调度中心的调度并且执行,可以直接执行也可以集成到项目中。

​ 调度中心和执行器两个模块分开部署,相互分离,两者之间通过RPC进行通信,其中调度中心主要是提供一个平台,管理调度信息,发送调度请求,自己不承担业务代码,而执行器接受调度中心的调度执行业务逻辑。

2XXL-JOB原理

2.1、执行器的注册和发现

执行器的注册和发现主要是关系两张表:

xxl_job_registry:执行器的实例表,保存实例信息和心跳信息。

xxl_job_group:每个服务注册的实例列表。

执行器启动线程每隔30秒向注册表xxl_job_registry请求一次,更新执行器的心跳信息,调度中心启动线程每隔30秒检测一次xxl_job_registry,将超过90秒还没有收到心跳的实例信息从xxl_job_registry删除,并更新xxl_job_group服务的实例列表信息。

2.2、调度中心调用执行器
调度中心的操作:

调度中心循环不停的执行以下操作:

(1)关闭自动提交事务

(2)利用mysql的悲观锁使其他事务无法进入

select * from xxl_job_lock where lock_name = 'schedule_lock' for update

(3)读取数据库中的xxl_job_info:记录定时任务的相关信息,该表中有trigger_next_time字段表示下一次任务的触发时间。拿到距离当前时间5s内的任务列表。

分为三种情况处理:

1、对于当前时间-任务的下一次触发时间>5,直接调过不执行,重置trigger_next_time的时间。(超过5s)

2、对于任务的下一次触发时间<当前时间<任务的下一次触发时间+5的任务(不超过5s的):

(1)开线程处理执行触发逻辑,根据当前时间更新下一次任务触发时间

(2)如果新的任务下一次触发时间-当前时间<5,放到时间轮中,时间轮是一个map:

private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

(3)根据新的任务下一次触发时间更新下下一次任务触发时间

3、对于任务的下一次触发时间>当前时间,将其放入时间轮中,根据任务下一次触发时间更新下下一次任务触发时间

   (4)commit提交事务,同时释放排他锁

执行器的操作:

1、执行器接收到调度中心的调度信息,将调度信息放到对应的任务的等待队列中

2、执行器的任务处理线程从任务队列中取出调度信息,执行业务逻辑,将结果放入一个公共的等待队列中(每个任务都有一个单独的处理线程和等待队列,任务信息放入该队列中)

3、执行器有一个专门的回调线程定时批量从结果队列中取出任务结果,并且回调告知调度中心

3、XXL-JOB能够解决哪些问题

XXL-JOB是一个分布式任务调度平台,主要用于解决分布式定时任务

解析:

在平时的业务场景中,经常有一些场景需要使用定时任务,比如:

时间驱动的场景:某个时间点发送优惠券,发送短信等等。批量处理数据:批量统计上个月的账单,统计上个月销售数据等等。固定频率的场景:每隔5分钟需要执行一次。所以定时任务在平时开发中并不少见,而且对于现在快速消费的时代,每天都需要发送各种推送,消息都需要依赖定时任务去完成

4、XXL-JOB优点特性

 1、简单:支持通过Web页面对任务进行CRUD操作,操作简单,一分钟上手;

​ 2、动态:支持动态修改任务状态、启动/停止任务,以及终止运行中任务,即时生效;

​ 3、调度中心HA(中心式):调度采用中心式设计,“调度中心”自研调度组件并支持集群部署,可保证调度中心HA;

​ 4、执行器HA(分布式):任务分布式执行,任务"执行器"支持集群部署,可保证任务执行HA;

​ 5、注册中心:执行器会周期性自动注册任务, 调度中心将会自动发现注册的任务并触发执行。同时,也支持手动录入执行器地址;

​ 6、弹性扩容缩容:一旦有新执行器机器上线或者下线,下次调度时将会重新分配任务;

​ 7、路由策略:执行器集群部署时提供丰富的路由策略,包括:第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移、忙碌转移等;

​ 8、故障转移:任务路由策略选择"故障转移"情况下,如果执行器集群中某一台机器故障,将会自动Failover切换到一台正常的执行器发送调度请求。

​ 9、阻塞处理策略:调度过于密集执行器来不及处理时的处理策略,策略包括:单机串行(默认)、丢弃后续调度、覆盖之前调度;

​ 10、任务超时控制:支持自定义任务超时时间,任务运行超时将会主动中断任务;

​ 11、任务失败重试:支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;其中分片任务支持分片粒度的失败重试;

​ 12、任务失败告警:默认提供邮件方式失败告警,同时预留扩展接口,可方便的扩展短信、钉钉等告警方式;

​ 13、分片广播任务:执行器集群部署时,任务路由策略选择"分片广播"情况下,一次任务调度将会广播触发集群中所有执行器执行一次任务,可根据分片参数开发分片任务;

​ 14、动态分片:分片广播任务以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。

​ 15、事件触发:除了"Cron方式"和"任务依赖方式"触发任务执行之外,支持基于事件的触发任务方式。调度中心提供触发任务单次执行的API服务,可根据业务事件灵活触发;

5、XXL-JOB安装部署

5.1、文档及源码

文档地址:https://www.xuxueli.com/xxl-job/

源码地址:https://github.com/xuxueli/xxl-job

xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

代码结构如下:

doc 文档,即SQL脚本所在目录

db : “调度数据库”建表脚本

xxl-job-admin : 调度中心项目源码

xxl-job-core : 核心模块,公共Jar依赖

xxl-job-executor-samples : 执行器,Sample示例项目(大家可以在该项目上进行开发,也可以将现有项目改造生成执行器项目)

5.2、调度中心部署
5.2.1、初始化【调度数据库】

打开项目代码,获取 “调度数据库初始化SQL脚本” 并执行即可。

“调度数据库初始化SQL脚本” 位置为: /xxl-job/doc/db/tables_xxl_job.sql ,数据库名:xxl_job

数据库如下:

数据库表解析:

xxl_job_lock任务调度锁表;

xxl_job_group执行器信息表,维护任务执行器信息;

xxl_job_info调度扩展信息表:用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;

xxl_job_log调度日志表:用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;

xxl_job_log_report调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;

xxl_job_logglue任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;

xxl_job_registry执行器注册表,维护在线的执行器和调度中心机器地址信息;

xxl_job_user系统用户表;

调度中心支持集群部署,集群情况下各节点务必连接同一个mysql实例

如果mysql做主从,调度中心集群节点务必强制走主库

5.2.2、部署xxl-job-admin配置(调度中心)

调度中心统一管理任务调度平台上调度任务,负责出发调度执行,并且提供任务管理平台系统

打开 xxl-job-admin 的配置文件,/xxl-job/xxl-job-admin/src/main/resources/application.properties 对调度中心进行配置

调度中心配置如下(根据自己情况修改,切勿直接复制):

### 调度中心JDBC链接
spring.datasource.url=jdbc:mysql://database_ip:port/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=your_username
spring.datasource.password=your_password
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
### 报警邮箱
spring.mail.host=smtp.qq.com
spring.mail.port=25
[email protected]
spring.mail.password=xxx
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
### 调度中心通讯TOKEN:非空时启用;
xxl.job.accessToken=
### 调度中心国际化配置:"zh_CN"/中文简体, "zh_TC"/中文繁体 and "en"/英文;
xxl.job.i18n=zh_CN
## 调度线程池最大线程配置
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
### 调度中心日志表数据保存天数:过期日志自动清理;限制大于等于7时生效,否则, 如-1,关闭自动清理功能;
xxl.job.logretentiondays=30

其中重要配置如下:

server.port : 根据情况修改端口

spring.datasource.url 指向刚才准备的数据库

spring.datasource.password : 记得修改成自己的数据库密码

spring.mail.username 配置自己的邮件账号

spring.mail.password 邮件的授权码

5.2.3、调度中心部署

如果已经正确进行上述配置,可将项目编译打包部署

启动之后,浏览器访问 http://localhost:18080/xxl-job-admin (该地址执行器将会使用到,作为回调地址)

默认登录账号 “admin/123456”, 登录后运行界面如下图所示:

 

调度中心集群部署时,需要注意以下事项:

1、DB配置保持一致

2、集群机器时钟保持一致(单机集群可以忽视)

3、建议:推荐通过nginx为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置,调用API服务等操作均通过该域名进行。

5.3、执行器配置及部署
5.3.1、引入maven依赖
 <dependency>

            <groupId>com.xuxueli</groupId>

            <artifactId>xxl-job-core</artifactId>

            <version>2.4.0-SNAPSHOT</version>

  </dependency>
5.3.2、执行器配置&说明
### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;

xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

### 执行器通讯TOKEN [选填]:非空时启用;

xxl.job.accessToken=

### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册

xxl.job.executor.appname=xxl-job-demo

### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。

xxl.job.executor.address=

### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";

xxl.job.executor.ip=

### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;

xxl.job.executor.port=9999

### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;

xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler

### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;

xxl.job.executor.logretentiondays=30
5.3.3XxlJobConfig配置类

执行器组件配置,现已自动装配,执行器中的XxlJobConfig将根据配置文件中配置生成XxlJobSpringExecuto

@Configuration

@Slf4j

public class XxlJobConfig {



    @Value("${xxl.job.admin.addresses}")

    private String adminAddresses;



    @Value("${xxl.job.executor.appname}")

    private String appName;



    /*@Value("${xxl.job.executor.ip}")

    private String ip;*/



    @Value("${xxl.job.executor.port}")

    private int port;



    @Value("${xxl.job.accessToken}")

    private String accessToken;



    @Value("${xxl.job.executor.logpath}")

    private String logPath;



    @Value("${xxl.job.executor.logretentiondays}")

    private int logRetentionDays;



    @Bean(initMethod = "start", destroyMethod = "destroy")

    public XxlJobSpringExecutor xxlJobExecutor() {

        log.info(">>>>>>>>>>> xxl-job config init.");

        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();

        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);

        xxlJobSpringExecutor.setAppName(appName);

        //xxlJobSpringExecutor.setIp(ip);

        xxlJobSpringExecutor.setPort(port);

        xxlJobSpringExecutor.setAccessToken(accessToken);

        xxlJobSpringExecutor.setLogPath(logPath);

        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;

    }


}
5.3.4demo示例

在com.xxl.job.executor.service.jobhandler.SampleXxlJob中提供了简单的定时任务实例,为方便用户参考与快速实用,示例执行器内原生提供多个Bean模式任务Handler,可以直接配置实用,如下:

        demoJobHandler:简单示例任务,任务内部模拟耗时任务逻辑,用户可在线体验Rolling Log等功能;

        shardingJobHandler:分片示例任务,任务内部模拟处理分片参数,可参考熟悉分片任务;

        httpJobHandler:通用HTTP任务Handler;业务方只需要提供HTTP链接等信息即可,不限制语言、平台;

demo示例:

/**

 * XxlJob开发示例(Bean模式)

 *

 * 开发步骤:

 * 1、在Spring Bean实例中,开发Job方法,方式格式要求为 "public ReturnT<String> execute(String param)"

 * 2、为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。

 * 3、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;

 *

 * @author xuxueli 2019-12-11 21:52:51

 */

@Component

public class SampleXxlJob {

    private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);

 /**

     * 1、简单任务示例(Bean模式)

     */

    @XxlJob("demoJobHandler")

    public ReturnT<String> demoJobHandler(String param) throws Exception {

        logger.info("XXL-JOB, Hello World. param={}",param);

        return ReturnT.SUCCESS;

    }

    //...省略...

}

【重要】 如果我们要写自己的定时任务,参照上面方法,在方法上注解一个@XxlJob("任务名字") ,方法可以接受一个字符串参数,方法需要返回ReturnT格式

5.3.5、执行器页面配置

进入调度中心:http://localhost:18080/xxl-job-admin

进入执行器管理,点击操作 -> 编辑,进入执行器编辑页面

appName : 执行器的名字,可以任意填写

名称:任意填写

注册方式:调度中心是通过RPC的方式对执行器发起调度,所以这里需要的是执行器项目的ip:port ,注意,该端口不是执行器项目的server.port ,而是:xxl.job.executor.port 端口。你可以选择自动注册,也可以手动录入。

5.4、任务创建与执行
5.4.1、创建任务

进入调度中心:http://localhost:18080/xxl-job-admin

进入任务管理,点击新增,进入新增任务页面

执行器:任务的绑定的执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能; 另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器, 可在 "执行器管理" 进行设置;

任务描述:任务的描述信息,便于任务管理

路由策略:

FIRST(第一个):固定选择第一个机器;

LAST(最后一个):固定选择最后一个机器;

ROUND(轮询):轮询;

RANDOM(随机):随机选择在线的机器;

CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上;

LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;

LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;

FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;

BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;

SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

Cron定时任务的执行时间规则,时间表达式

运行模式:BEAN模式:任务以JobHandler方式维护在执行器端;需要结合 "JobHandler" 属性匹配执行器中任务;

GLUE模式(Java):任务以源码方式维护在调度中心;该模式的任务实际上是一段继承自IJobHandler的Java类代码并 "groovy" 源码方式维护,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务;

GLUE模式(Shell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "shell" 脚本;

GLUE模式(Python):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "python" 脚本;

GLUE模式(PHP):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "php" 脚本;

GLUE模式(NodeJS):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "nodejs" 脚本;

GLUE模式(PowerShell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "PowerShell" 脚本;

JobHandler行模式为 "BEAN模式" 时生效,对应执行器中新开发的 JobHandler 类 “@JobHandler” 注解自定义的 value 值

阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;

       单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;

       丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;

       覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队      列,然后运行本地调度任务;

子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度

任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务

失败重试次数:支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试

报警邮件:任务调度失败时邮件通知的邮箱地址,支持配置多邮箱地址,配置多个邮箱地址时用逗号分隔

负责人:任务负责人

任务参数:任务执行所需的参数

5.4.2、执行任务

配置任务后,右击选择执行一次进行任务调度测试

任务执行后进入调度日志,可以查看任务执行日志

5.5GLUE模式(java
5.5.1、添加任务

该模式支持在线编辑定时任务的内容,立刻执行,无需再开发工具中编辑代码,也无需重启项目。

请点击任务右侧 “GLUE” 按钮,进入 “GLUE编辑器开发界面” ,见下图。“GLUE模式(Java)” 运行模式的任务默认已经初始化了示例任务代码,即打印Hello World。

任务以源码方式维护在调度中心,支持通过Web IDE在线更新,实时编译和生效,因此不需要指定JobHandler

5.5.2、编写代码

保存之后可以在操作按钮里面去编写任务

(“GLUE模式(Java)” 运行模式的任务实际上是一段继承自IJobHandler的Java类代码,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务),比如我的定时任务如下,编辑好之后点击保存

保存好之后,启动定时任务,效果如下:

 

6、XXL-JOB集群部署

6.1、调度中心集群
6.1.1、问题概述

调度中心支持集群部署,提升调度系统容灾和可用性。调度中心集群部署时,几点要求和建议:

(1)DB配置保持一致;

(2)集群机器时钟保持一致(单机集群忽视);

(3)当启动多个调度器时,执行器配置调度中心部署跟地址可以用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;

建议:推荐通过nginx为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置、调用API服务等操作均通过该域名进行。

6.1.2、启动多个调度中心

修改调度中心端口,启动多个调度中心,我这里启动两个如

6.1.3、配置Nginx负载均衡

当启动多个调度器时,执行器配置调度中心部署跟地址可以用逗号分隔。执行器将会使用该地址进行“执行器心跳注册”和“任务结果回调”;为空则关闭自动注册;

建议:推荐通过nginx为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置、调用API服务等操作均通过该域名进行。

我们启动了2个调度中心,那么我的执行器项目该注册到哪个调度中心呢?我们通过Nginx来解决这个问题,原理如下图:

我们再hosts配置 www.jobs.com 作为nginx的主机域名,然后反向代理到多个调度中心,这样一来执行器就只需要注册到 www.jobs.com Nginx即可。

修改 C:\Windows\System32\drivers\etc\hosts增加配置如下:

127.0.0.1  www.jobs.com

Nginx配置如下:

​
#调度中心

upstream jobs{

 server localhost:18080;

 server localhost:18081;

}

server {

      listen       80;

      #使用域名

      server_name  www.jobs.com;

      #charset koi8-r;

      #access_log  logs/host.access.log  main;

      location / {

       #调度中心反向代理配置

          proxy_pass   http://jobs/;

      }

      #error_page  404              /404.html;

      # redirect server error pages to the static page /50x.html

      #

      error_page   500 502 503 504  /50x.html;

      location = /50x.html {

          root   html;

      }

  }

​

启动Nginx,通过浏览器访问 Jobs - Employment - Career Search, Jobs Near You | Jobs.com ,可以访问到调度中心的管理界面

6.2、执行器项目集群

执行器支持集群部署,提升调度系统可用性,同时提升任务处理能力。

执行器集群部署时,几点要求和建议:

(1)执行器回调地址(xxl.job.admin.addresses)需要保持一致;执行器根据该配置进行执行器自动注册等操作。

(2)同一个执行器集群内AppName(xxl.job.executor.appname)需要保持一致;调度中心根据该配置动态发现不同集群的在线执行器列表。

6.2.1、启动多个执行器项目

现在对执行器项目做集群,修改xxl-job-executor-sample-springboot配置文件application.properties

server.port : 既然是做集群,项目端口需要修改

xxl.job.admin.addresses : 调度中心地址需要修改成www.jobs.com ,多个执行器配置同一个地址。

xxl.job.executor.port : RPC通信端口也要修改,多个执行器该端口需要不一样

第一个实例配置:

server.port=19090

xxl.job.admin.addresses=http://www.jobs.com/xxl-job-admin #对应Nginx地址

xxl.job.executor.port=9999

第二个实例配置:

server.port=19091

xxl.job.admin.addresses=http://www.jobs.com/xxl-job-admin

xxl.job.executor.port=9998

在 Configurations中配置,允许启动多个实例

启动实例如下:

6.2.2、配置定时任务

通过http://www.jobs.com/xxl-job-admin 访问调度中心管理界面,在执行器管理中可以看到多台执行器实例

在任务管理中,可以编辑任务,然后选择路由策略,比如:选择轮询,然后启动任务,就会看到两个执行器项目轮着执行定时任务

7、XXL-JOB源码解析

7.1XXL-JOB时序图
7.2XxlJobSpringExecutor 启动

项目启动后获取配置文件中的属性值并实例化配置类 XxlJobSpringExecutor,XxlJobSpringExecutor 实现了 SmartInitializingSingleton 接口的 afterSingletonsInstantiated 方法,在实例化 XxlJobSpringExecutor 后触发执行,XxlJobSpringExecutor 在该方法里做了两件事:

  1. 调用 initJobHandlerMethodRepository 方法扫描项目中带 @XxlJob 注解的方法(即 jobHandler)并注册;
  2. 调用父类 XxlJobExecutor 的 start 方法启动 Executor,并初始化核心组件:
    • 初始化 admin 控制台:initAdminBizList(adminAddresses, accessToken)
    • 初始化日志清理进程 JobLogFileCleanThread:JobLogFileCleanThread.getInstance().start(logRetentionDays)
    • 初始化触发回调进程 TriggerCallbackThread:TriggerCallbackThread.getInstance().start()
    • 初始化内置服务 executor-server:initEmbedServer(address, ip, port, appname, accessToken)
7.2.1、jobHandler 注册

在项目启动时,执行器会通过 “@JobHandler” 识别 Spring 容器中 “Bean模式任务”,以注解的 value 属性为 key 管理起来,保存至 jobHandlerRepository。

1  initJobHandlerMethodRepository(ApplicationContext applicationContext)

该方法扫描项目中的类,将带有 @XxlJob 注解的 job 进行解析,并调用 registJobHandler(String name, IJobHandler jobHandler) 进行注册:

// ---------------------- job handler repository ----------------------
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();

// registry jobhandler
/**
 * name: @XxlJob 的 value, jobHandler 的名字
 * bean:带 @XxlJob 方法对应的类名
 * method: 带 @XxlJob 注解的方法名
 * initMethod: @XxlJob 的 init 值对应方法的全限定名,即 execute 方法调用前执行的方法
 * destroyMethod: @XxlJob 的 destroy 值对应方法的全限定名,即 execute 方法调用后执行的方法
 */
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));

2  MethodJobHandler

public class MethodJobHandler extends IJobHandler {

private final Object target;
    private final Method method;
    private Method initMethod;
    private Method destroyMethod;

public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
        this.target = target;
        this.method = method;

this.initMethod =initMethod;
        this.destroyMethod =destroyMethod;
    }

@Override
    public ReturnT<String> execute(String param) throws Exception {
        return (ReturnT<String>) method.invoke(target, new Object[]{param});
    }

@Override
    public void init() throws InvocationTargetException, IllegalAccessException {
        if(initMethod != null) {
            initMethod.invoke(target);
        }
    }

@Override
    public void destroy() throws InvocationTargetException, IllegalAccessException {
        if(destroyMethod != null) {
            destroyMethod.invoke(target);
        }
    }
}
7.2.2、XxlJobSpringExecutor 初始化
7.2.2.1、初始化admin控制台

initAdminBizList 实例化 AdminBizClient,并将其存在 adminBizList 中

private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    if (adminAddresses!=null && adminAddresses.trim().length()>0) {
        // 多个 admin 控制台地址以 "," 分隔
        for (String address: adminAddresses.trim().split(",")) {
            if (address!=null && address.trim().length()>0) {
                AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
                if (adminBizList == null) {
                    adminBizList = new ArrayList<AdminBiz>();
                }
                adminBizList.add(adminBiz);
            }
        }
    }
}
7.2.2.2、初始化日志清理进程
7.2.2.3、初始化出发回调进程
7.2.2.4、初始化内置事务

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}

 

initEmbedServer 方法内部直接调用 EmbedServer#start 方法

public void start(final String address, final int port, final String appname, final String accessToken) {
    // 1 初始化 executorBiz 执行单元和线程池
    executorBiz = new ExecutorBizImpl();
    ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
        0,
        200,
        60L,
        TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(2000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
            }
        },
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
            }
        });
    // 2 启动 ServerBootstrap 并绑定端口号
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                            .addLast(new HttpServerCodec())
                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                }
            })
            .childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture future = bootstrap.bind(port).sync();
   
    // 3 将执行器信息注册到调度中心(admin 控制台)
    startRegistry(appname, address);
   
    // 4 wait util stop
    future.channel().closeFuture().sync();
}

EmbedServer 的 start 方法主要处理以下几件事:

  1. 初始化 executorBiz 和线程池 bizThreadPool
  2. 启动 ServerBootstrap 并绑定端口号
    调度中心实际的调度请求由 EmbedHttpServerHandler 处理
  3. 将执行器信息注册到调度中心(admin 控制台)

public void startRegistry(final String appname, final String address) {
    // start registry
    ExecutorRegistryThread.getInstance().start(appname, address);
}

 

ExecutorRegistryThread#start 方法封装执行器信息,并调用 AdminBizClient#registry 方法将执行器信息注册于调度中心,核心代码如下:

RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
    try {
        // AdminBizClient 实现 AdminBiz 接口,将执行器信息进行注册
        ReturnT<String> registryResult = adminBiz.registry(registryParam);
        if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
            registryResult = ReturnT.SUCCESS;
            logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
            break;
        } else {
            logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
        }
    } catch (Exception e) {
        logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
    }
}

 

7.3、调度平台发布任务

前端页面处罚具体任务后,转发至 JobInfoController 的 triggerJob 方法

/**
 * 处理触发请求
 *
 * @param id                任务 id
 * @param executorParam     执行器参数
 * @param addressList       执行器地址
 * @return
 */
@RequestMapping("/trigger")
@ResponseBody
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
    JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
    return ReturnT.SUCCESS;
}

JobTriggerPoolHelper 的 trigger 方法调用 addTrigger 方法,addTrigger 方法将调度请求交给线程池处理,线程池中通过 XxlJobTrigger 的 trigger 方法处理实际请求

public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
    // 获取任务详细信息
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    // 设置任务执行参数
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
    // 获取执行器详细信息
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

// 如果手动设置执行任务机器的地址,则覆盖执行器原有的地址(即新增执行器时自动注册或手动录入的地址)
    if (addressList!=null && addressList.trim().length()>0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

// sharding param
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    // 如果是分片广播任务,则根据执行器数量将任务发布至各个机器地址
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else { // 其他路由策略的任务发布一条即可
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }
}

processTrigger 方法主要处理以下几件事:

  1. 初始化触发参数:TriggerParam
  2. 根据不同路由策略获取执行器地址
    根据不同的 executorRouteStrategy 策略获取 ExecutorRouter,并调用 ExecutorRouter 的 route 方法选择一个执行器地址

// 根据不同路由策略获取执行器 ip 地址

routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());

   3.   发送调度请求至远程执行器

runExecutor 方法根据执行器地址获取执行单元 ExecutorBizClient,并调用 ExecutorBizClient 的 run 方法将调度请求以 HTTP POST 形式发送,由执行器的 EmbedServer 接受

public ReturnT<String> run(TriggerParam triggerParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
  1. 保存任务执行信息
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

// param
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

// 1、save log-id
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

// 2、init trigger-param
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

// 3、init address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            // 根据不同路由策略获取执行器 ip 地址
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

// 4、trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

// 5、collection trigger info
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

// 6、save log trigger-info
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
7.4、执行器接收并处理任务

执行器接收到调度中心的调度请求时,如果任务类型为 “Bean模式”,将会匹配 Spring 容器中的 “Bean模式任务”,然后调用其 execute 方法,执行任务逻辑。如果任务类型为 “GLUE模式”,将会加载 GLue 代码,实例化 Java 对象,注入依赖的 Spring 服务(注意:Glue代码中注入的Spring服务,必须存在与该“执行器”项目的Spring容器中),然后调用execute方法,执行任务逻辑。

7.4.1EmbedServer接收调度请求

EmbedHttpServerHandler 调用 channelRead0 方法处理任务调度请求,每个请求通过线程池 bizThreadPool 新开一个线程进行处理

@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    // invoke
    bizThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // do invoke
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
            // to json
            String responseJson = GsonTool.toJson(responseObj);
            // write response
            writeResponse(ctx, keepAlive, responseJson);
        }
    });
}

process 方法根据 uri 的不同取值调用 ExecutorBiz 不同的处理方法:

if ("/beat".equals(uri)) {
    return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
    IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
    return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
    TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
    return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
    KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
    return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
    LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
    return executorBiz.log(logParam);
} else {
    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}

7.4.2ExecutorBiz(执行器单元)

ExecutorBiz#run(TriggerParam triggerParam) 方法处理任务请求,源码如下

public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

// valid:jobHandler + jobThread
    // 1 根据任务模式(GlueTypeEnum)的不同进行不同的任务处理逻辑
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) { // “Bean模式” 任务

// 根据 triggerParam.getExecutorHandler() 加载任务处理器,此处 triggerParam.getExecutorHandler() 等于 @XxlJob 注解中的 value 值
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

// valid old jobThread
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

jobThread = null;
            jobHandler = null;
        }

// valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { // “GLUE模式(Java)” 任务

// valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

jobThread = null;
            jobHandler = null;
        }

// valid handler
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { // 其他脚本模式任务

// valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof ScriptJobHandler
                        && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change script or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

jobThread = null;
            jobHandler = null;
        }

// valid handler
        if (jobHandler == null) {
            jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
        }
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

// 2 处理不同的阻塞策略
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        // 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

// 新建或替换任务已有的 jobHandler
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

// 3 将调度请求直接放入 jobThread 的触发队列进行异步处理
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

run() 方法主要做了以下几件事:

  1. 根据任务模式(GlueTypeEnum)的不同进行不同的任务处理逻辑
    • Bean模式
      每个Bean模式任务都是一个Spring的Bean类实例,它被维护在“执行器”项目的Spring容器中。任务类需要加“@JobHandler(value=”名称”)”注解,因为“执行器”会根据该注解识别Spring容器中的任务。任务类需要继承统一接口“IJobHandler”,任务逻辑在execute方法中开发,因为“执行器”在接收到调度中心的调度请求时,将会调用“IJobHandler”的execute方法,执行任务逻辑
    • GLUE模式(Java)
      每个 “GLUE模式(Java)” 任务的代码,实际上是“一个继承自“IJobHandler”的实现类的类代码”,“执行器”接收到“调度中心”的调度请求时,会通过Groovy类加载器加载此代码,实例化成Java对象,同时注入此代码中声明的Spring服务(请确保Glue代码中的服务和类引用在“执行器”项目中存在),然后调用该对象的execute方法,执行任务逻辑
    • 其他脚本任务
      脚本任务的源码托管在调度中心,脚本逻辑在执行器运行。当触发脚本任务时,执行器会加载脚本源码在执行器机器上生成一份脚本文件,然后通过Java代码调用该脚本;并且实时将脚本输出日志写到任务日志文件中,从而在调度中心可以实时监控脚本运行情况
  2. 根据 blockStrategy 处理不同的阻塞策略
    • 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败
    • 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
 /* 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败 */
    if (jobThread.isRunningOrHasQueue()) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
    }
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
 /*
     覆盖之前调度:若当前任务的处理线程(jobThread)有正在运行的任务或调度请求队列不为空,则清空当前任务处理线程
     然后创建一个新的 jobThread 处理当前调度请求
 */
    if (jobThread.isRunningOrHasQueue()) {
        removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
        jobThread = null;
    }
} else {
    // just queue trigger
}

if (jobThread == null) {
    // registJobThread 创建新的任务处理进程并将已存在的任务处理进程清除
    jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
  1. 将调度请求放入 JobThread 的阻塞队列 triggerQueue,等待后续处理
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);

LinkedBlockingQueue<TriggerParam> triggerQueue

public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
 // avoid repeat
 if (triggerLogIdSet.contains(triggerParam.getLogId())) {
     logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
     return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
 }

// avoid repeat trigger for the same TRIGGER_LOG_ID
 triggerLogIdSet.add(triggerParam.getLogId());
 triggerQueue.add(triggerParam);
 return ReturnT.SUCCESS;
}

JobThread 通过 run() 方法循环处理请求,具体处理细节参见下节

7.4.3JobThread(任务处理线程)
public void run() {
    // init
    handler.init();
   
    // 1 从阻塞队列获取调度请求并处理
    while(!toStop){
        // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
        TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
   
        // execute
        XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
        // 若任务设置超时时间,则在限定时间内异步获取执行结果
        if (triggerParam.getExecutorTimeout() > 0) {
            // limit timeout
            final TriggerParam triggerParamTmp = triggerParam;
            FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                @Override
                public ReturnT<String> call() throws Exception {
                    return handler.execute(triggerParamTmp.getExecutorParams());
                }
            });
            futureThread = new Thread(futureTask);
            futureThread.start();
            // 异步获取执行结果
            executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
        } else {
            // 若没有设置超时时间,则同步获取任务执行结果
            executeResult = handler.execute(triggerParam.getExecutorParams());
        }
   
        if (executeResult == null) {
            executeResult = IJobHandler.FAIL;
        } else {
            executeResult.setMsg(
                    (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
                            ?executeResult.getMsg().substring(0, 50000).concat("...")
                            :executeResult.getMsg());
            executeResult.setContent(null); // limit obj size
        }
        XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
   
        // 2 任务处理结果回调
        if(triggerParam != null) {
            // callback handler info
            if (!toStop) {
                // commonm
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
            } else {
                // is killed
                ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
            }
        }
    }
   
    // 3 任务处理进行被杀死,队列中剩余的调度请求进行失败处理,并回调任务处理结果
    while(triggerQueue !=null && triggerQueue.size()>0){
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam!=null) {
            // is killed
            ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
        }
    }
   
    // destroy
    handler.destroy();
}

 

run() 方法主要做了以下几件事:

1、从阻塞队列获取调度请求并处理

// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
// 若任务设置超时时间,则在限定时间内异步获取执行结果
if (triggerParam.getExecutorTimeout() > 0) {
    // limit timeout
    final TriggerParam triggerParamTmp = triggerParam;
    FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
        @Override
        public ReturnT<String> call() throws Exception {
            return handler.execute(triggerParamTmp.getExecutorParams());
        }
    });
    futureThread = new Thread(futureTask);
    futureThread.start();
    // 异步获取执行结果
    executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} else {
    // 若没有设置超时时间,则同步获取任务执行结果
    executeResult = handler.execute(triggerParam.getExecutorParams());
}

2、任务处理结果回调

if (!toStop) {
    // commonm
    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
} else {
    // is killed
    ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}

将任务执行结果 executeResult 放入 TriggerCallbackThread 的任务结果队列 callBackQueue

/**
 * job results callback queue
 */
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
public static void pushCallBack(HandleCallbackParam callback){
    getInstance().callBackQueue.add(callback);
}

TriggerCallbackThread 创建 triggerCallbackThread 线程从 callBackQueue 中获取 executeResult 并将其以 HTTP POST 形式发送给调度中心,核心处理方法见 doCallback()

private void doCallback(List<HandleCallbackParam> callbackParamList){
    // callback, will retry if error
    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
        ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
    }
}

public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
    return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}

调度中心获取任务结果参数后,调用AdminBizImpl#callback(com.xxl.job.core.biz.model.HandleCallbackParam) 方法进行处理

private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
    // valid log item
    XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId());
    }

// trigger success, to trigger child job
    String callbackMsg = null;
    if (IJobHandler.SUCCESS.getCode() == handleCallbackParam.getExecuteResult().getCode()) {
        XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId());
        if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) {
            callbackMsg = "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>";

String[] childJobIds = xxlJobInfo.getChildJobId().split(",");
            for (int i = 0; i < childJobIds.length; i++) {
                int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1;
                if (childJobId > 0) {

JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null, null);
                    ReturnT<String> triggerChildResult = ReturnT.SUCCESS;

// add msg
                    callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"),
                            (i+1),
                            childJobIds.length,
                            childJobIds[i],
                            (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")),
                            triggerChildResult.getMsg());
                } else {
                    callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"),
                            (i+1),
                            childJobIds.length,
                            childJobIds[i]);
                }
            }

}
    }

// handle msg
    StringBuffer handleMsg = new StringBuffer();
    if (log.getHandleMsg()!=null) {
        handleMsg.append(log.getHandleMsg()).append("<br>");
    }
    if (handleCallbackParam.getExecuteResult().getMsg() != null) {
        handleMsg.append(handleCallbackParam.getExecuteResult().getMsg());
    }
    if (callbackMsg != null) {
        handleMsg.append(callbackMsg);
    }

if (handleMsg.length() > 15000) {
        handleMsg = new StringBuffer(handleMsg.substring(0, 15000));  // text最大64kb 避免长度过长
    }

// success, save log
    log.setHandleTime(new Date());
    log.setHandleCode(handleCallbackParam.getExecuteResult().getCode());
    log.setHandleMsg(handleMsg.toString());
    xxlJobLogDao.updateHandleInfo(log);

return ReturnT.SUCCESS;
}

3、JobThread 停止后处理队列遗留调度请求

while(triggerQueue !=null && triggerQueue.size()>0){
    TriggerParam triggerParam = triggerQueue.poll();
    if (triggerParam!=null) {
        // is killed
        ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
    }
}

7.4.4IJobHandler(任务单元)
public abstract class IJobHandler {
   
    /** success */
    public static final ReturnT<String> SUCCESS = new ReturnT<String>(200, null);
    /** fail */
    public static final ReturnT<String> FAIL = new ReturnT<String>(500, null);
    /** fail timeout */
    public static final ReturnT<String> FAIL_TIMEOUT = new ReturnT<String>(502, null);

/**
     * execute handler, invoked when executor receives a scheduling request
     *
     * @param param
     * @return
     * @throws Exception
     */
    public abstract ReturnT<String> execute(String param) throws Exception;

/**
     * init handler, invoked when JobThread init
     */
    public void init() throws InvocationTargetException, IllegalAccessException {
        // do something
    }

/**
     * destroy handler, invoked when JobThread destroy
     */
    public void destroy() throws InvocationTargetException, IllegalAccessException {
        // do something
    }
}

IJobHandler 有四个实现类:

  • MethodJobHandler:负责处理 BEAN 模式的任务
  • GlueJobHandler:负责处理 Glue 模式的任务
  • ScriptJobHandler:负责处理脚本模式的任务
  • CommandJobHandler:负责处理命令行任务

7.5、心跳检测

任务注册 Beat 周期默认30s

执行器以一倍 Beat 进行执行器注册

调度中心以一倍 Beat 进行动态任务发现

执行器注册信息的失效时间为三倍 Beat

执行器销毁,主动上报调度中心并摘除对应的执行器机器信息

 

8XXL-JOB改造

8.1、自动装配
8.1.1XxlJobAutoConfigure

XxlJobAutoConfigure 自动装配 XxlJobSpringExecutor 到项目的 Spring 容器中

/**
 * matchIfMissing 属性为true时,配置文件中缺少对应的value或name的对应的属性值,也会注入成功
 */
@Configuration
@EnableConfigurationProperties(XxlJobSpringExecutorProperties.class)
@ConditionalOnClass(XxlJobSpringExecutor.class)
@ConditionalOnProperty(prefix = "xxl-job", value = "enabled", matchIfMissing = true)
public class XxlJobAutoConfigure {

private static Logger LOGGER = LoggerFactory.getLogger(XxlJobAutoConfigure.class);

@Autowired
    private XxlJobSpringExecutorProperties properties;

@Bean
    @ConditionalOnMissingBean(XxlJobSpringExecutor.class)
    public XxlJobSpringExecutor xxlJobExecutor() {
        LOGGER.info(">>>>>>>>>>> xxl-job config auto init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(properties.getAdmin().getAddresses());
        xxlJobSpringExecutor.setAppname(properties.getExecutor().getAppname());
        xxlJobSpringExecutor.setAddress(properties.getExecutor().getAddress());
        xxlJobSpringExecutor.setIp(properties.getExecutor().getIp());
        xxlJobSpringExecutor.setPort(properties.getExecutor().getPort());
        xxlJobSpringExecutor.setAccessToken(properties.getAccessToken());
        xxlJobSpringExecutor.setLogPath(properties.getExecutor().getLogPath());
        xxlJobSpringExecutor.setLogRetentionDays(properties.getExecutor().getLogRetentionDays());
        xxlJobSpringExecutor.setApplicationContext(SpringUtils.getApplicationContext());

LOGGER.info(">>>>>>>>>>> XxlJobSpringExecutor: {}", xxlJobSpringExecutor.toString());
        LOGGER.info(">>>>>>>>>>> xxl-job config auto init end.");
        return xxlJobSpringExecutor;
    }
}
8.1.2、XxlJobSpringExecutorProperties

XxlJobSpringExecutor 的属性全部放在 XxlJobSpringExecutorProperties 中

@ConfigurationProperties(prefix = "xxl-job")
public class XxlJobSpringExecutorProperties {

private String accessToken;

@NestedConfigurationProperty
    private Admin admin = new Admin();

@NestedConfigurationProperty
    private Executor executor = new Executor();

public String getAccessToken() {
        return accessToken;
    }

public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }

public Admin getAdmin() {
        return admin;
    }

public void setAdmin(Admin admin) {
        this.admin = admin;
    }

public Executor getExecutor() {
        return executor;
    }

public void setExecutor(Executor executor) {
        this.executor = executor;
    }

public static class Admin {
        private String addresses;

public String getAddresses() {
            return addresses;
        }

public void setAddresses(String addresses) {
            this.addresses = addresses;
        }
    }

public static class Executor {
        private String appname;
        private String address;
        private String ip;
        private int port;
        private String logPath;
        private int logRetentionDays;

public String getAppname() {
            return appname;
        }

public void setAppname(String appname) {
            this.appname = appname;
        }

public String getAddress() {
            return address;
        }

public void setAddress(String address) {
            this.address = address;
        }

public String getIp() {
            return ip;
        }

public void setIp(String ip) {
            this.ip = ip;
        }

public int getPort() {
            return port;
        }

public void setPort(int port) {
            this.port = port;
        }

public String getLogPath() {
            return logPath;
        }

public void setLogPath(String logPath) {
            this.logPath = logPath;
        }

public int getLogRetentionDays() {
            return logRetentionDays;
        }

public void setLogRetentionDays(int logRetentionDays) {
            this.logRetentionDays = logRetentionDays;
        }
    }
}
8.1.3、配置spring.factories

在项目的 resources\META-INF 目录新建 spring.factories 文件,并配置自动配置类 XxlJobAutoConfigure

org.springframework.boot.autoconfigure.EnableAutoConfiguration = com.xxl.job.core.properties.auto.configure.XxlJobAutoConfigure
8.2、集成spring-session
8.2.1、引入spring-session相关依赖
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.session</groupId>
      <artifactId>spring-session-bom</artifactId>
      <version>Corn-SR2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<!-- Spring Session core -->
<dependency>
    <groupId>org.springframework.session</groupId>
    <artifactId>spring-session-core</artifactId>
</dependency>

<!-- Spring Session Data Redis -->
<dependency>
    <groupId>org.springframework.session</groupId>
    <artifactId>spring-session-data-redis</artifactId>
</dependency>

<!-- Spring Boot Redis Data Starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.3.1.RELEASE</version>
</dependency>
8.2.2、添加相关配置

在 application.properties 文件中添加一下配置

# spring session
## equivalent to manually adding @EnableRedisHttpSession annotation
spring.session.store-type=redis
## flush-mode 有两个参数:ON_SAVE(表示在response commit前刷新缓存),IMMEDIATE(表示只要有更新,就刷新缓存)
spring.session.redis.flush-mode=on_save
## 添加后,redis中的key为spring:session:xxl-job
spring.session.redis.namespace=xxl-job

# redis config
## Redis server host.
## Redis 要开启事件通知 redis-cli config set notify-keyspace-events Egx
spring.redis.host=192.168.99.100
## Login password of the redis server.spring.redis.password=
## Redis server port.spring.redis.port=16379

9、几种任务调度平台及对比

9.1、任务调度平台有哪些

quartz、elastic-job、xxl-job

9.2、几种任务调度平台对比

 

quartzxxl-job对比:

quartz采用api的方式调用任务,不方便,但是xxl-job使用的是管理界面。

quartz比xxl-job代码侵入更强

quartz调度逻辑和QuartzJobBean耦合在一个项目中,当任务增多,逻辑复杂的时候,性能会受到影响

quartz底层以抢占式获取db锁并且由抢占成功的节点运行,导致节点负载悬殊非常大;xxl-job通过执行器实现协同分配式运行任务,各个节点比较均衡。

elastic-jobxxl-job对比:

elastic-job是无中心化的,通过zookeeper的选举机制选出主服务器,如果主服务器挂了,重新选举出主服务器,因此elastic-job的扩展性和可用性较好,但是使用有一定的复杂度。使用于业务复杂,业务量大,服务器多。

xxl-job是中心式的调度平台调度执行器执行任务,使用的是DB锁来保证集群分布式调用的一致性,学习简单,操作容易,成本不高。

相对来说,xxl-job中心式的调度平台轻量级,开箱即用,操作简易,上手快,与SpringBoot有非常好的集成,而且监控界面就集成在调度中心,界面又简洁,对于企业维护起来成本不高,还有失败的邮件告警等等。这就使很多企业选择xxl-job做调度平台。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

标签:执行器,知识点,triggerParam,job,调度,JOB,任务,xxl,XXL
From: https://blog.csdn.net/hao_kkkkk/article/details/136853431

相关文章

  • python知识点总结(八)
    python知识点总结八1、数组与链表的区别?2、函数中全局变量和局部变量例1:函数中使用global,将改变原变量例2:函数中不使用global,函数外部不会改变原变量例3例4例53、函数传参爬坑:例1例2例3例4例5例64、字符串常用方法a、split:如果括号中不加任何内容,按照空格进行拆分b、......
  • 2023年红蓝对抗-HW蓝队基本知识点
    1.Linux排查思路(1)首先检测用户账号安全,如新增用户和可疑用户以及高权限用户。(2)history命令查看历史linux指令,uptime查看用户登录信息(3)检查端口(netstat命令)和进程(ps命令),重点观察资源占用率高的进程(4)检查日志信息,var/log文件夹内的一些系统日志和安全日志。(5)利用自动......
  • transfomer知识点梳理
    1:自注意力机制的结构是什么?答:输入是一个序列,初始化三个权重矩阵;对于序列中的每个元素,分别经过这些权重矩阵生成QKV,然后Q与序列中的所有元素的K进行点积乘法,得到当前元素与序列中每个元素的注意力得分,这样得到了所有元素关于序列中所有元素的注意力得分;再对每个元素与序列......
  • kafka知识点
    传统的消息队列的主要应用场景包括:缓存/消峰解耦异步通信消息队列的两种模式点对点模式发布/订阅模式基础架构1.为方便扩展,并提高吞吐量,一个topic分为多个partition2.配合分区的设计,提出消费者组的概念,组内每个消费者并行消费3.为提高可用性,为每个partition增加若干副......
  • (Java)猛刷LeetCode——数组知识点篇
    数组Array在连续的内存空间中,存储一组相同类型的元素元素:值索引:数组的下标数组访问(Access)和数组搜索(Search)●数组访问:索引●数组搜索:找2这个元素数组中有没有以下是数组的常规操作:数组创建、添加元素、访问元素、修改元素、删除元素、遍历数组、查找元素、数组......
  • (Python)知识点——数组篇
    在连续的内存空间中,存储一组相同类型的元素元素:值索引:数组的下标数组访问(Access)和数组搜索(Search)●数组访问:索引●数组搜索:找2这个元素数组中有没有常规操作数组的代码如下:#-*-coding:utf-8-*-#@Time:2024-03-2022:14#@Author:Lindand#@Fil......
  • 基础篇--Python重要知识点总结
    Python语言不同于Java和C,它属于高层次的脚本语言,简单易学。但是如果你去找python教程,那真的是要好好啃很久,但是其实最常用的和最重点的东西没有那么多,最近看了北大的一个老师讲的课程进行了以一些总结,手动Run一Run以下知识点,基本上就可以上手撕数据分析或者人工智能相关的编......
  • python3最全知识点,从入门到开车就是这么简单(建议收藏)
    前言:此文是自己学习python过程中的笔记和总结.适合有语言基础的人快速了解python3和没基础的作为学习的大纲了解学习的方向、知识点;笔记是从多本书和视频上学习后的整合版。(一)初识python1、一般文件以.py为格式;用#作注释.2、一般对象不用特别声明,python会自动识别;一......
  • Java回溯知识点(含面试大厂题和源码)
    回溯算法是一种通过遍历所有可能的候选解来寻找所有解的算法,如果候选解被确认不是一个解(或至少不是最后一个解),回溯算法会通过在上一步进行一些变化来丢弃这个解,即“回溯”并尝试另一个候选解。回溯法通常用递归方法来实现,在解决排列、组合、选择问题时非常有效。回溯算法的......
  • SpringBoot项目集成XXL-job
    文章目录首先引入依赖配置信息配置类定义定时任务执行方法配置任务执行器配置任务执行计划在集成XXL-job前,首先确保部署了XXL-job的admin服务,如果还没有部署的话请参照Docker安装部署XXL-Job将XXL-job部署起来.此时,XXL-job已经部署好了,下来一......