首页 > 其他分享 >tsf定时任务迁移到xxl-job

tsf定时任务迁移到xxl-job

时间:2022-11-02 23:04:07浏览次数:96  
标签:执行器 tsf 任务 job import com xxl


tsf定时任务迁移到xxl-job

  • ​​1. 介绍​​
  • ​​2. 原理​​
  • ​​2.1 设计思想​​
  • ​​2.2 系统组成​​
  • ​​2.3 架构图​​
  • ​​3. 迁移方案​​
  • ​​3.1 现状​​
  • ​​3.2 迁移方案​​
  • ​​3.3 xxl-job 配置​​
  • ​​3.4 调度配置​​
  • ​​3.5 执行器配置​​
  • ​​4. 后续开发方案​​
  • ​​4.1 tsf 方式​​
  • ​​4.2 xxl-job 注解​​
  • ​​5. xxl-job 接入手册​​
  • ​​5.1 maven 依赖​​
  • ​​5.2 配置类​​
  • ​​5.3 配置yml​​
  • ​​5.4 增加常量接口​​
  • ​​5.5 增加任务注册​​
  • ​​6. 使用​​
  • ​​7. 总结​​

xxl-job 项目gitee:https://gitee.com/xuxueli0323/xxl-job

xxl-job 项目github:https://github.com/xuxueli/xxl-job/

xxl-job 项目文档:https://www.xuxueli.com/xxl-job/

1. 介绍

XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

xxl-job 是一个国人开发的框架,于15年开源启动,到现在也有7年历史,从最开始的基础功能实现,到现在支持分布式,集群,任务重试,分片等等。基本上已经很全面了,很能打了。

2. 原理

xxl-job 分为两部分,xxl-job-admin,xxl-job-executor。调度中心,执行器。

  • 调度中心提供管理、调度功能
  • 执行器提供任务注册,任务执行功能

执行器启动后,扫描spring 容器中的执行类(​​jobHandler​​),然后将执行类注册到调度中心。调度中心会保存执行类和执行器的关系,当调度触发某个执行类执行时,调度中心就会要求执行器执行指定的执行类,完成任务调度。

在xxl-job框架中,目前主流有两种方案:一是独立的执行器;二是集成的执行器。

独立的执行器:

单独部署spring boot 服务,专门用于执行定时任务,针对执行器服务,支持横向动态扩展,类似于资源池,用于执行任务。

独立执行器适合于定时任务较多,且同时存在任务执行时间差距较大,需要对定时任务做单独资源管理的场景。

因为可能部分定时任务耗费资源较多,一定程度上会争夺资源。

案例:账务模块中账单的生成,结算模块中结算单生成。这两种场景,都是指定时间的大并发场景,如果和其他业务放在一起,那么在指定时间,触发大并发的时候,会影响其他业务,因此抽离出来比较合适。

实现方案:

  1. 使用xxl-job的在线任务配置,将源码直接在在线编辑器中执行。
    这种方式使用特别少,适合与系统依赖非常低的场景。
  2. 使用 spring 远程调用的方式,在执行器中调用其他服务。
    这种方式使用普遍,比如定时任务中用到了其他服务的功能.
    这种是在执行器中开发任务逻辑。
  3. 完全使用 spring 远程调用。
    这种方式与2的区别在于:在执行器外面开发任务逻辑,执行器只是一个调用的作用。

集成的执行器:

不单独部署 spring boot 服务,定时任务与业务接口部署在同一个服务中。

2.1 设计思想

将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。

将任务抽象成分散的JobHandler,交由“执行器”统一管理,“执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。

因此,“调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性;

2.2 系统组成

  • 调度模块(调度中心)
    负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
    支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。
  • 执行模块(执行器)
    负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;
    接收“调度中心”的执行请求、终止请求和日志请求等。

2.3 架构图

tsf定时任务迁移到xxl-job_xxl-job

原理更多资料请看:https://www.xuxueli.com/xxl-job/#%E4%BA%94%E3%80%81%E6%80%BB%E4%BD%93%E8%AE%BE%E8%AE%A1

3. 迁移方案

3.1 现状

现在我们使用的是 tsf 的任务框架。

tsf 的任务要求我们实现 ​​com.tencent.cloud.task.sdk.client.spi.ExecutableTask​​ 接口。

源码如下:

package com.tencent.cloud.task.sdk.client.spi;

import com.tencent.cloud.task.sdk.client.model.ExecutableTaskData;
import com.tencent.cloud.task.sdk.client.model.ProcessResult;

public interface ExecutableTask {
ProcessResult execute(ExecutableTaskData var1);
}

很简单,就是实现一个​​execute​​方法。

其参数是封装了一些任务信息,包括任务调度,任务重试,任务超时,任务参数等信息。

返回值封装了任务执行结果,成功失败,成功失败信息,以及任务元数据信息。

3.2 迁移方案

因为我们切换了新的任务调度框架,结合 tsf 的任务现状,迁移方案设计如下:

首先增加一个 spring 上下文容器属性操作,扫描 srping 容器中 ​​com.tencent.cloud.task.sdk.client.spi.ExecutableTask​​ 接口的实现类,然后将实现类注册到xxl-job。

在注册到xxl-job的时候,新建xxl-job的执行类,做 tsf 的任务的实现代理。

具体源码如下:

import com.tencent.cloud.task.sdk.client.model.ExecutableTaskData;
import com.tencent.cloud.task.sdk.client.model.ProcessResult;
import com.tencent.cloud.task.sdk.client.model.ProcessResultCode;
import com.tencent.cloud.task.sdk.client.spi.ExecutableTask;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
* xxl-job registry
*
* @author
* @date 2022/10/17 15:27
**/
@Slf4j
@Component
public class XxlJobRegistry implements ApplicationContextAware {

@Value("${spring.application.name:}")
private String appName;

@Override
public void setApplicationContext (ApplicationContext applicationContext) throws BeansException {
// 1. 获取全部的 tsf 的任务
Map<String, ExecutableTask> beansOfType = applicationContext.getBeansOfType(ExecutableTask.class);
beansOfType.forEach((key, value) -> {
// 2. 注册到 xxl-job
// name = appName # className # execute
// name = appName # 任务 method 全引用路径
// 使用 method 的全引用路径,防止 beanName 重复的任务
// 使用 appName 前缀,防止跨项目 method 全引用路径重复
// 使用 匿名实现类 做 tsf 到 xxl-job 的转换
XxlJobExecutor.registJobHandler(appName + "#" + value.getClass().getName() + "#" + "execute", new IJobHandler() {
@Override
public void execute () throws Exception {
// XxlJobHelper.log 是 xxl-job 的 日志打印方式
XxlJobHelper.log(" job : {} start , param : {} ", key, XxlJobHelper.getJobParam());
// log.info 是 tsf 的日志打印方式,也是我们统一日志集成方式
log.info(" job : {} start , param : {} ", key, XxlJobHelper.getJobParam());
// 构造 tsf 任务触发请求参数
ExecutableTaskData executableTaskData = new ExecutableTaskData();
// 将 xxl-job 的任务参数 转发 到 tsf 任务请求参数中
// 兼容现有的 tsf 任务,同时支持后续任务开发继续使用 tsf 模式
executableTaskData.setTaskArgument(XxlJobHelper.getJobParam());
// 调用 stf 任务
ProcessResult execute = value.execute(executableTaskData);
// 对 tsf 任务做结果转换
if (ProcessResultCode.SUCCESS == execute.getResultCode()) {
XxlJobHelper.handleSuccess(execute.getResultMsg());
} else {
XxlJobHelper.handleFail(execute.getResultMsg());
}
XxlJobHelper.log("job : {} , executor : {} , message : {} ", key, execute.getResultCode(), execute.getResultMsg());
log.info("job : {} , executor : {} , message : {} ", key, execute.getResultCode(), execute.getResultMsg());
}
});
log.info(" xxl-job registry {} success!", key);
});
}

}

使用这种方式,可以同时支持 tsf 和 xxl-job ,一方面是 xxl-job 试用阶段的双框架支持,另一方面是 现有 任务的 xxl-job 支持。

3.3 xxl-job 配置

xxl-job的配置主要是 xxl-job 执行器的配置。

### 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### 执行器通讯TOKEN [选填]:非空时启用;
xxl.job.accessToken=
### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
xxl.job.executor.appname=xxl-job-executor-sample
### 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
xxl.job.executor.address=
### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
xxl.job.executor.ip=
### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
xxl.job.executor.port=9999
### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
xxl.job.executor.logretentiondays=30

对应的配置类:

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* xxl-job config
*
* @author jiayongqi
* @date 2022/10/17 14:27
**/
@Slf4j
@RefreshScope
@Configuration
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses:}")
private String adminAddresses;
@Value("${xxl.job.executor.appname:}")
private String appName;
@Value("${xxl.job.accessToken:}")
private String accessToken;
@Value("${xxl.job.executor.address:}")
private String address;
@Value("${xxl.job.executor.ip:}")
private String ip;
@Value("${xxl.job.executor.port:0}")
private int port;
@Value("${xxl.job.executor.logpath:}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays:-1}")
private int logRetentionDays;

@Bean
public XxlJobSpringExecutor xxlJobExecutor () {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setAppname(appName);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
log.info("xxl-job config init : {}", xxlJobSpringExecutor);
return xxlJobSpringExecutor;
}

}

3.4 调度配置

首先登陆到xxl-job的调度平台

tsf定时任务迁移到xxl-job_定时任务迁移_02

现有任务:

现有任务:​​com.boss.communal.xxl.job.task.TsfTestTask​

tsf定时任务迁移到xxl-job_1024程序员节_03

根据规则​​appName # className # methodName​​​拼接出Task的唯一标识​​boss-communal#com.boss.communal.xxl.job.task.TsfTestTask#execute​

然后在任务管理中创建任务

tsf定时任务迁移到xxl-job_tsf_04

创建成功后,就可以执行了

tsf定时任务迁移到xxl-job_1024程序员节_05

如果有参数,那么就设置参数,和 tsf 的任务管理控制台任务参数相同,需要和任务逻辑中参数解析的格式保持一致

tsf定时任务迁移到xxl-job_xxl-job_06

然后就可以查看执行日志了

tsf定时任务迁移到xxl-job_定时任务迁移_07

如下

tsf定时任务迁移到xxl-job_tsf_08

3.5 执行器配置

执行器只需要配置一次即可,与项目同步即可,与​​applicationName​​相同即可

tsf定时任务迁移到xxl-job_tsf_09

spring 服务要注册到那个执行器,由配置项配置

tsf定时任务迁移到xxl-job_1024程序员节_10

4. 后续开发方案

服务同时支持 tsf 和 xxl-job 后,后续开发定时任务,有两种方式:tsf 方式、xxl-job 注解。

4.1 tsf 方式

继续实现​​com.tencent.cloud.task.sdk.client.spi.ExecutableTask​​接口,别忘记将接口的实现类让spring 管理。

例子:

tsf定时任务迁移到xxl-job_tsf_11

4.2 xxl-job 注解

对定时任务的逻辑上增加​​@XxlJob​​​的注解即可,​​value​​​就是我们在新增任务时的唯一标识。请注意规则​​appName # className # methodName​

为了方便统一管理已将 ​​appName​​ 封装为接口常量

tsf定时任务迁移到xxl-job_xxl-job_12

后面写定时任务的时候,实现接口即可(接口没有需要实现的方法,只是提供常量)

然后在​​@XxlJob​​注解中使用即可

tsf定时任务迁移到xxl-job_1024程序员节_13

5. xxl-job 接入手册

5.1 maven 依赖

在需要接入的项目的最外层 ​​pom.xml​​​ 中的​​dependencies​​标签下增加依赖

<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${xxl-job.version}</version>
</dependency>

别忘记在​​properties​​标签下增加版本号

<xxl-job.version>2.3.1</xxl-job.version>

刷新maven依赖

5.2 配置类

在配置包中,增加配置类(因为xxl-job需要指定执行器,加上xxl-job需要负责注册和监听任务执行,还是比较耗费资源的,所以,不建议将配置类和依赖集成到​​common​​中,后期如果全部接入后,可以考虑)

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* xxl-job config
*
* @author
* @date 2022/10/17 14:27
**/
@Slf4j
@RefreshScope
@Configuration
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses:}")
private String adminAddresses;
@Value("${xxl.job.executor.appname:}")
private String appName;
@Value("${xxl.job.accessToken:}")
private String accessToken;
@Value("${xxl.job.executor.address:}")
private String address;
@Value("${xxl.job.executor.ip:}")
private String ip;
@Value("${xxl.job.executor.port:0}")
private int port;
@Value("${xxl.job.executor.logpath:}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays:-1}")
private int logRetentionDays;

@Bean
public XxlJobSpringExecutor xxlJobExecutor () {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setAppname(appName);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
log.info("xxl-job config init : {}", xxlJobSpringExecutor);
return xxlJobSpringExecutor;
}

}

5.3 配置yml

在yml中增加配置

xxl:
job:
accessToken: test
admin:
addresses: http://slaver:18080/xxl-job-admin
executor:
appname: boss-communal
# address: http://master
ip: 172.16.10.251
port: 9999 # 默认 9999
logpath: ./xxl-job/
logretentiondays: -1

应用程序只能获取到k8s容器内的ip,在访问的时候需要注意,如果调度中心与执行器处于同一个k8s网络中,那么使用ip可以,否则就需要使用 address。

需要保证 执行器 和 调度器 之间能相互访问。

5.4 增加常量接口

在配置包或者工具包中增加常量接口

/**
* @author
* @date 2022/10/20 10:47
**/
public interface ApplicationName {

String APP_NAME = "boss-communal";

}

5.5 增加任务注册

在应用初始化的包中增加xxl-job任务注册类

import com.boss.communal.xxl.job.common.ApplicationName;
import com.tencent.cloud.task.sdk.client.model.ExecutableTaskData;
import com.tencent.cloud.task.sdk.client.model.ProcessResult;
import com.tencent.cloud.task.sdk.client.model.ProcessResultCode;
import com.tencent.cloud.task.sdk.client.spi.ExecutableTask;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
* xxl-job registry
*
* @author
* @date 2022/10/17 15:27
**/
@Slf4j
@Component
public class XxlJobRegistry implements ApplicationContextAware, ApplicationName {
@Override
public void setApplicationContext (ApplicationContext applicationContext) throws BeansException {
// 1. 获取全部的 tsf 的任务
Map<String, ExecutableTask> beansOfType = applicationContext.getBeansOfType(ExecutableTask.class);
beansOfType.forEach((key, value) -> {
// 2. 注册到 xxl-job
// name = appName # className # execute
// 使用 method 的全引用路径,防止 beanName 重复的任务
// 使用 appName 前缀,防止跨项目 method 全引用路径重复
// name = appName # 任务 method 全引用路径
XxlJobExecutor.registJobHandler(APP_NAME + "#" + value.getClass().getName() + "#" + "execute", new IJobHandler() {
@Override
public void execute () throws Exception {
// XxlJobHelper.log 是 xxl-job 的 日志打印方式
XxlJobHelper.log(" job : {} start , param : {} ", key, XxlJobHelper.getJobParam());
// log.info 是 tsf 的日志打印方式,也是我们统一日志集成方式
log.info(" job : {} start , param : {} ", key, XxlJobHelper.getJobParam());
// 构造 tsf 任务触发请求参数
ExecutableTaskData executableTaskData = new ExecutableTaskData();
// 将 xxl-job 的任务参数 转发 到 tsf 任务请求参数中
// 兼容现有的 tsf 任务,同时支持后续任务开发继续使用 tsf 模式
executableTaskData.setTaskArgument(XxlJobHelper.getJobParam());
// 调用 stf 任务
ProcessResult execute = value.execute(executableTaskData);
// 对 tsf 任务做结果转换
if (ProcessResultCode.SUCCESS == execute.getResultCode()) {
XxlJobHelper.handleSuccess(execute.getResultMsg());
} else {
XxlJobHelper.handleFail(execute.getResultMsg());
}
XxlJobHelper.log("job : {} , executor : {} , message : {} ", key, execute.getResultCode(), execute.getResultMsg());
log.info("job : {} , executor : {} , message : {} ", key, execute.getResultCode(), execute.getResultMsg());
}
});
log.info(" xxl-job registry {} success!", key);
});
}

}

6. 使用

要使用xxl-job,就需要先将执行器注册到调度中心。

如果执行器还未在调度中心创建,需要先创建,然后在创建基于执行器的任务,最后配置定时任务或者手动任务。

具体使用教程见文档:https://www.xuxueli.com/xxl-job/#%E5%9B%9B%E3%80%81%E6%93%8D%E4%BD%9C%E6%8C%87%E5%8D%97

7. 总结

xxl-job毕竟是国人开发的框架,其文档非常适合国人习惯,而且非常通俗易懂。遇到的问题在文档中都能找到。

​https://www.xuxueli.com/xxl-job/​


标签:执行器,tsf,任务,job,import,com,xxl
From: https://blog.51cto.com/jiayq/5818444

相关文章

  • effective cmake 和 effective job
    effectivecmake的思想是面向target,影响我target是有编译选项(debug信息),include的头文件,关联的lib,第三方包,log系统。相当与写代码的逆向过程。工作也是一项,你的......
  • xxl-job 报:java.net.BindException: Address already in use: bind
    在低版本的xxl-job中,初始化XxlJobSpringExecutor执行器需要在@Bean中加上initMethod="start",destroyMethod="destroy",但是在高版本的xxl-job(如2.1.2)则需......
  • xxl-job配置及使用
    源码源码仓库地址ReleaseDownloadhttps://github.com/xuxueli/xxl-jobDownloadxxl-job-gitee:一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单......
  • xxl-job 源码初探
    xxl-job客户端把JobHandle的value和method映射关系存储到map中1.1启动入口1.2在该文件getBean后被调用1.3xxl-job实际映射位置1.4放置到map中......
  • tensorflow:OOM when allocating tensor with shape[225,256,256,36] and type float
    运行TensorFlow(GPU)报错如下:OOMwhenallocatingtensorwithshape[225,256,256,36]andtypefloaton/job:localhost/replica:0/tas原因是GPUOOM内存不够,因此可尝试改成......
  • 使用 oceanbase做为xxljob 以及nacos 的mysql存储
    如果我们的xxljob以及nacos集群比较多,同时需要提供ha以及灵活的mysql管理能力,基于tidb以及oceanbase是一个不错的选择oceanbase多租户能力上是比较方便的,可以更好......
  • bladex-xxl-job使用
    1.整合bladex,配置docker,install 后编辑DockerFile.. build..push到HaBor,并且安装jdknacossentinelredisminio等等组件,这里只展示dockercomposeyml内容vers......
  • 14. Kubernetes - Job 和 CronJob
    JobJob负责处理任务,主要为仅执行一次的任务,它保证批处理任务的一个或多个Pod成功结束。Job的资源清单如下所示:apiVersion:batch/v1kind:Jobmetadata:name:j......
  • Newtonsoft.Json笔记 -JToken、JObject、JArray详解
    在原来解析json数据是,一般都是用反序列化来实现json数据的解读,这需要首先知道json数据的结构并且建立相应的类才能反序列化,一旦遇到动态的json数据,这种方法就不使用。为了......
  • 自动化测试 - php调用jenkins-api构建job
    背景:前一节搭建jenkins并创建了job任务后,用户可以在jenkins平台手动点击构建来执行pytest自动化任务,想实现一个从web界面去触发构建jenkins中的job任务,在这选择使用php来调......