定时任务实现方式
- quartz 定时调用http 请求
- quertz 定时调用 openfegin
- quartz 定时调用普通定时任务
springboot 集成quartz
pom.xml 添加配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
</dependency>
数据库设计
CREATE TABLE `wxapp_dispatch` (
`id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'id',
`name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '任务名称',
`sys_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '系统名称',
`user_id` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '登录用户id',
`cron` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'corn表达式',
`url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '请求地址',
`type` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '请求类型',
`code` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '代码',
`header` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '头部参数',
`param` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '请求参数',
`weight` int DEFAULT NULL COMMENT '权重',
`request_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '请求类型',
`open` tinyint(1) DEFAULT NULL COMMENT '是否开启',
`counts` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT '执行次数',
`last_time` datetime DEFAULT NULL COMMENT '最后一次执行时间',
`stop_time` datetime DEFAULT NULL COMMENT '停止时间',
`dispatch_group` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'group',
`class_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '类名',
`dispatch_status` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '强求状态',
`remark` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '备注',
`status` tinyint(1) DEFAULT NULL COMMENT '状态;true 存在 false 不存在',
`create_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '创建人',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '更新人',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='定时调度表';
CREATE TABLE `wxapp_dispatch_logs` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'id',
`dispatch_code` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '调度id',
`run_time` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci COMMENT '运行时间',
`type` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '失败类型',
`logs` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '日志',
`status` tinyint(1) DEFAULT NULL COMMENT '状态;true 存在 false 不存在',
`create_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '创建人',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '更新人',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1684530168993525763 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='调度日志';
quartz 配置
package com.wxapp.dispatch.service.config;
import org.quartz.spi.JobFactory;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import java.io.IOException;
import java.util.Properties;
@Configuration
public class SchedulerConfig {
@Bean
public JobFactory jobFactory(ApplicationContext applicationContext) {
SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
/**
* 调度工厂bean
*
* @param jobFactory
* @throws IOException
*/
@Bean
public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory) throws IOException {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setJobFactory(jobFactory);
//QuartzScheduler 延时启动,应用启动完5秒后 QuartzScheduler 再启动
factory.setStartupDelay(1);
// this allows to update triggers in DB when updating settings in config file:
//用于quartz集群,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了
factory.setOverwriteExistingJobs(true);
//用于quartz集群,加载quartz数据源配置
factory.setQuartzProperties(quartzProperties());
factory.setAutoStartup(true);
return factory;
}
/**
* 加载quartz数据源配置,quartz集群时用到
*/
@Bean
public Properties quartzProperties() throws IOException {
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
Properties properties = new Properties();
properties.put("org.quartz.scheduler.instanceName", "DefaultQuartzScheduler");
properties.put("org.quartz.scheduler.instanceId", "AUTO");
properties.put("org.quartz.scheduler.makeSchedulerThreadDaemon", "true");
properties.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
properties.put("org.quartz.threadPool.makeThreadsDaemons", "true");
properties.put("org.quartz.threadPool.threadCount", "20");
properties.put("org.quartz.threadPool.threadPriority", "5");
properties.put("org.quartz.jobStore.class", "org.quartz.simpl.RAMJobStore");
properties.put("org.quartz.jobStore.misfireThreshold", "60000");
propertiesFactoryBean.setProperties(properties);
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
}
quartz util
package com.wxapp.dispatch.service.util;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.wxapp.common.base.util.jackson.JacksonUtil;
import com.wxapp.common.util.check.CheckUtil;
import com.wxapp.common.util.copy.CopyUtil;
import com.wxapp.dispatch.api.pojo.entity.DispatchDO;
import com.wxapp.dispatch.api.pojo.vm.dispatch.DispatchVm;
import com.wxapp.dispatch.api.pojo.vo.dispatch.DispatchVo;
import com.wxapp.dispatch.service.listener.TimerSchedulerLister;
import com.wxapp.dispatch.service.mapper.DispatchLogsMapper;
import com.wxapp.dispatch.service.mapper.DispatchMapper;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
/**
* 定时任务增删改查
* CommandLineRunner 项目启动时调用run 方法把已开启的定时任务开启
*/
@Component
public class JobUtil implements CommandLineRunner {
private final Logger log = LoggerFactory.getLogger(JobUtil.class);
private final DispatchMapper dispatchMapper;
private final DispatchLogsMapper dispatchLogsMapper;
public JobUtil(DispatchMapper dispatchMapper, DispatchLogsMapper dispatchLogsMapper) {
this.dispatchMapper = dispatchMapper;
this.dispatchLogsMapper = dispatchLogsMapper;
}
@Override
public void run(String... args) throws Exception {
var query = new LambdaQueryWrapper<DispatchDO>();
query.eq(DispatchDO::getStatus, true).eq(DispatchDO::getOpen,true)
.orderByAsc(DispatchDO::getWeight);
var dispatchDOS = dispatchMapper.selectList(query);
if (CheckUtil.isNotNullList(dispatchDOS)) {
addOrUpdateJob(CopyUtil.copyListNew(dispatchDOS, DispatchVm::new, null));
}
}
/**
* 添加或者更新job
*/
public void addOrUpdateJob(List<DispatchVm> dispatchVm) throws SchedulerException {
if (CheckUtil.isNullList(dispatchVm)) {
return;
}
Map<JobDetail, Set<? extends Trigger>> triggersAndJobs = new HashMap<>(0);
var scheduler = getScheduler();
dispatchVm.forEach(st -> {
//新增定时任务
try {
//获去调度器实例
TriggerKey triggerKey = TriggerKey.triggerKey(st.getCode(), st.getDispatchGroup());
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
Class clazz = Class.forName(st.getClassName());
clazz.getDeclaredConstructor().newInstance();
// 构建job信息
JobDetail jobDetail = JobBuilder.newJob(clazz).usingJobData(dealData(st)).withIdentity(st.getCode(), st.getDispatchGroup()).build();
// 表达式调度构建器(即任务执行的时间)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(st.getCron());
TriggerBuilder triggerBuilder;
if (trigger == null) {
// 按新的cronExpression表达式构建一个新的trigger
triggerBuilder = TriggerBuilder.newTrigger().withIdentity(st.getCode(), st.getDispatchGroup()).withSchedule(scheduleBuilder).endAt(st.getStopTime());
} else {
triggerBuilder = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).endAt(st.getStopTime());
}
if (st.getOpen()){
trigger= (CronTrigger) triggerBuilder.startNow().build();
}else {
trigger= (CronTrigger) triggerBuilder.build();
}
//设置job执行
triggersAndJobs.put(jobDetail, Set.of(trigger));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
if (scheduler != null) {
scheduler.scheduleJobs(triggersAndJobs, true);
scheduler.start();
}
}
/**
* 暂停job
*/
public void removeOrStopJob(DispatchVm dispatchVm) {
try {
var scheduler = getScheduler();
JobKey jobKey = JobKey.jobKey(dispatchVm.getCode(), dispatchVm.getDispatchGroup());
scheduler.deleteJob(jobKey);
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}
/**
* 立即执行
*/
public void runAJobNow(DispatchVm dispatchVm) {
try {
var scheduler = getScheduler();
JobKey jobKey = JobKey.jobKey(dispatchVm.getCode(), dispatchVm.getDispatchGroup());
scheduler.triggerJob(jobKey);
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}
/**
* 获取所有的执行job
*/
public List<DispatchVo> getAllJob() {
try {
var scheduler = getScheduler();
List<DispatchVo> jobList = new ArrayList<DispatchVo>();
GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
jobKeys.forEach(jobKey -> {
List<? extends Trigger> triggers = null;
try {
triggers = scheduler.getTriggersOfJob(jobKey);
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
if (triggers != null) {
triggers.forEach(trigger -> {
DispatchVo job = new DispatchVo();
job.setName(jobKey.getName());
job.setDispatchGroup(jobKey.getGroup());
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCron(cronExpression);
}
jobList.add(job);
});
}
});
return jobList;
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}
/**
* 获取正在运行的job
*/
public List<DispatchVo> getRunningJob() {
try {
var scheduler = getScheduler();
List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
var jobList = new ArrayList<DispatchVo>();
executingJobs.forEach(s -> {
DispatchVo job = new DispatchVo();
JobDetail jobDetail = s.getJobDetail();
JobKey jobKey = jobDetail.getKey();
job.setCode(jobKey.getName());
job.setDispatchGroup(jobKey.getGroup());
Trigger trigger = s.getTrigger();
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
String cronExpression = cronTrigger.getCronExpression();
job.setCron(cronExpression);
}
jobList.add(job);
});
return jobList;
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}
/**
* 暂停所有job
*/
public void pauseAllJobs() {
try {
var scheduler = getScheduler();
scheduler.pauseAll();
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}
/**
* 恢复所有job
*/
public void resumeAllJobs() {
try {
var scheduler = getScheduler();
scheduler.resumeAll();
} catch (SchedulerException e) {
throw new RuntimeException(e);
}
}
/**
* 设置参数
*
* @param dispatchVm
* @return
*/
private JobDataMap dealData(DispatchVm dispatchVm) {
JobDataMap jobDataMap = new JobDataMap();
if (CheckUtil.isNotNullObject(dispatchVm.getParam())) {
jobDataMap.putAll(JacksonUtil.parseObject(dispatchVm, Map.class));
}
log.info("调度参数" + JacksonUtil.toJSONStringPretty(jobDataMap));
return jobDataMap;
}
private Scheduler getScheduler() {
try {
var scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.getListenerManager().addSchedulerListener(new TimerSchedulerLister(dispatchLogsMapper));
return scheduler;
} catch (SchedulerException e) {
e.printStackTrace();
}
return null;
}
}
quartz 定时任务增删改查
- controller
package com.wxapp.dispatch.service.controller;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.wxapp.common.constant.base.IBaseController;
import com.wxapp.common.constant.vo.ResultVo;
import com.wxapp.dispatch.api.pojo.vm.dispatch.AddVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.PageVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.UpdateVm;
import com.wxapp.dispatch.api.pojo.vo.dispatch.DetailVo;
import com.wxapp.dispatch.api.pojo.vo.dispatch.PageVo;
import com.wxapp.dispatch.service.service.DispatchService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.quartz.SchedulerException;
import org.springframework.web.bind.annotation.*;
/**
* @Description 调度管理
* @Datetime 2023/5/26 17:58
* @Modified By
*/
@RestController
@Api(tags = "调度管理")
@RequestMapping("dispatch")
public class DispatchController extends IBaseController {
private DispatchService dispatchService;
public DispatchController(DispatchService dispatchService) {
this.dispatchService = dispatchService;
}
@PostMapping("page")
@ApiOperation("定时任务分页")
public ResultVo<Page<PageVo>> page(@RequestBody PageVm vm) {
return dispatchService.page(vm);
}
@GetMapping("detail/{id}")
@ApiOperation("详情")
public ResultVo<DetailVo> detail(@PathVariable String id) {
return dispatchService.detail(id);
}
@PostMapping("add")
@ApiOperation("添加")
public ResultVo<String> add(@RequestBody AddVm vm) throws SchedulerException {
return dispatchService.add(vm);
}
@PutMapping("update")
@ApiOperation("更新")
public ResultVo<String> update(@RequestBody UpdateVm vm) throws SchedulerException {
return dispatchService.update(vm);
}
@DeleteMapping("delete/{id}")
@ApiOperation("删除定时任务")
public ResultVo<String> delete(@PathVariable String id) {
return dispatchService.delete(id);
}
@PutMapping("pauseOrRes/{id}")
@ApiOperation("暂停或恢复执行定时任务")
public ResultVo<String> pauseOrRes(@PathVariable String id, @RequestBody Boolean isPause) throws SchedulerException {
return dispatchService.pauseOrRes(id, isPause);
}
}
- service
package com.wxapp.dispatch.service.service;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.wxapp.common.constant.vo.ResultVo;
import com.wxapp.dispatch.api.pojo.vm.dispatch.AddVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.PageVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.UpdateVm;
import com.wxapp.dispatch.api.pojo.vo.dispatch.DetailVo;
import com.wxapp.dispatch.api.pojo.vo.dispatch.PageVo;
import org.quartz.SchedulerException;
/**
* @Description
* @Datetime 2023/5/26 18:01
* @Modified By
*/
public interface DispatchService {
ResultVo<String> add(AddVm vm) throws SchedulerException;
ResultVo<DetailVo> detail(String id);
ResultVo<Page<PageVo>> page(PageVm vm);
ResultVo<String> update(UpdateVm vm) throws SchedulerException;
ResultVo<String> delete(String id);
ResultVo<String> pauseOrRes(String id, Boolean isPause) throws SchedulerException;
}
- serviceimpl
package com.wxapp.dispatch.service.service.impl;
import com.baomidou.mybatisplus.core.enums.SqlKeyword;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.wxapp.common.constant.base.IBaseService;
import com.wxapp.common.constant.vo.ResultVo;
import com.wxapp.common.util.bean.SpringUtils;
import com.wxapp.common.util.condition.ConditionUtil;
import com.wxapp.common.util.copy.CopyUtil;
import com.wxapp.common.util.random.RandomUtil;
import com.wxapp.dispatch.api.pojo.entity.DispatchDO;
import com.wxapp.dispatch.api.enums.DispatchStatusEnum;
import com.wxapp.dispatch.api.pojo.vm.dispatch.AddVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.DispatchVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.PageVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.UpdateVm;
import com.wxapp.dispatch.api.pojo.vo.dispatch.DetailVo;
import com.wxapp.dispatch.api.pojo.vo.dispatch.PageVo;
import com.wxapp.dispatch.service.mapper.DispatchMapper;
import com.wxapp.dispatch.service.service.DispatchService;
import com.wxapp.dispatch.service.util.JobUtil;
import org.quartz.SchedulerException;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @Description
* @Datetime 2023/5/26 18:01
* @Modified By
*/
@Service
public class DispatchServiceImpl extends IBaseService implements DispatchService {
private DispatchMapper dispatchMapper;
private JobUtil jobUtil;
public DispatchServiceImpl(DispatchMapper dispatchMapper, JobUtil jobUtil) {
this.dispatchMapper = dispatchMapper;
this.jobUtil = jobUtil;
}
@Override
public ResultVo<String> add(AddVm vm) throws SchedulerException {
try {
var dispatch = CopyUtil.copyObjectNew(vm, DispatchDO::new, (s, r) -> {
r.setCode(RandomUtil.getUUID());
r.setOpen(false);
r.setDispatchStatus(DispatchStatusEnum.PAUSE);
});
Object classBean = SpringUtils.getBean(vm.getClassName());
dispatch.setClassName(classBean.getClass().getName());
dispatchMapper.insert(dispatch);
var dispatchVm = CopyUtil.copyObjectNew(dispatch, DispatchVm::new, null);
if (dispatch.getOpen()) {
jobUtil.addOrUpdateJob(List.of(dispatchVm));
}
} catch (Exception e) {
return error("没有找到类信息");
}
return success();
}
@Override
public ResultVo<DetailVo> detail(String id) {
var dispatchDO = dispatchMapper.selectById(id);
return success(CopyUtil.copyObjectNew(dispatchDO, DetailVo::new, null));
}
@Override
public ResultVo<Page<PageVo>> page(PageVm vm) {
Map<SFunction<DispatchDO, ?>, SqlKeyword> condition = new HashMap<>();
condition.put(DispatchDO::getName, SqlKeyword.LIKE);
condition.put(DispatchDO::getLastTime, SqlKeyword.DESC);
var page = ConditionUtil.getPage(vm, DispatchDO.class);
var conditions = ConditionUtil.getSCondition(DispatchDO::new, vm, PageVo.class, condition);
var doPage = dispatchMapper.selectPage(page, conditions);
return success(CopyUtil.copyPage(doPage, PageVo::new, null));
}
@Override
public ResultVo<String> update(UpdateVm vm) throws SchedulerException {
var dispatchDO = dispatchMapper.selectById(vm.getId());
CopyUtil.copyObject(vm, dispatchDO, (s, r) -> {
r.setClassName(SpringUtils.getBean(s.getClassName()).getClass().getName());
});
dispatchMapper.updateById(dispatchDO);
var dispatchVm = CopyUtil.copyObjectNew(dispatchDO, DispatchVm::new, null);
jobUtil.addOrUpdateJob(List.of(dispatchVm));
return success();
}
@Override
public ResultVo<String> delete(String id) {
var dispatchDO = dispatchMapper.selectById(id);
dispatchDO.setStatus(false);
dispatchMapper.updateById(dispatchDO);
var dispatchVm = CopyUtil.copyObjectNew(dispatchDO, DispatchVm::new, null);
jobUtil.removeOrStopJob(dispatchVm);
return success();
}
@Override
public ResultVo<String> pauseOrRes(String id, Boolean isPause) throws SchedulerException {
var dispatchDO = dispatchMapper.selectById(id);
dispatchDO.setOpen(!isPause);
if (isPause) {
dispatchDO.setDispatchStatus(DispatchStatusEnum.PAUSE);
} else {
dispatchDO.setDispatchStatus(DispatchStatusEnum.RUN);
}
dispatchMapper.updateById(dispatchDO);
var dispatchVm = CopyUtil.copyObjectNew(dispatchDO, DispatchVm::new, null);
if (isPause) {
jobUtil.removeOrStopJob(dispatchVm);
} else {
jobUtil.addOrUpdateJob(List.of(dispatchVm));
}
return success();
}
}
SchedulerLister 定时任务监听
package com.wxapp.dispatch.service.listener;
import com.wxapp.common.base.util.jackson.JacksonUtil;
import com.wxapp.common.util.bean.SpringUtils;
import com.wxapp.dispatch.api.pojo.entity.DispatchLogsDO;
import com.wxapp.dispatch.service.mapper.DispatchLogsMapper;
import com.wxapp.dispatch.service.util.JobUtil;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class TimerSchedulerLister implements SchedulerListener {
private static final Logger log = LoggerFactory.getLogger(JobUtil.class);
private DispatchLogsMapper dispatchLogsMapper;
private Map map = new ConcurrentHashMap();
public TimerSchedulerLister(DispatchLogsMapper dispatchLogsMapper) {
this.dispatchLogsMapper = dispatchLogsMapper;
}
@Override
public void jobScheduled(Trigger trigger) {
map.put("name", trigger.getKey().getName());
map.put("group", trigger.getKey().getGroup());
System.out.println("调用时参数:" + JacksonUtil.toJSONString(map));
}
@Override
public void jobUnscheduled(TriggerKey triggerKey) {
}
@Override
public void triggerFinalized(Trigger trigger) {
}
@Override
public void triggerPaused(TriggerKey triggerKey) {
log.info("暂停:" + triggerKey.getName());
}
@Override
public void triggersPaused(String triggerGroup) {
}
@Override
public void triggerResumed(TriggerKey triggerKey) {
log.info("取消暂停:" + triggerKey.getName());
}
@Override
public void triggersResumed(String triggerGroup) {
log.info("取消暂停:" + triggerGroup);
}
@Override
public void jobAdded(JobDetail jobDetail) {
}
@Override
public void jobDeleted(JobKey jobKey) {
}
@Override
public void jobPaused(JobKey jobKey) {
}
@Override
public void jobsPaused(String jobGroup) {
}
@Override
public void jobResumed(JobKey jobKey) {
}
@Override
public void jobsResumed(String jobGroup) {
}
@Override
public void schedulerError(String msg, SchedulerException cause) {
log.error("定时任务保存:" + msg);
DispatchLogsDO dispatchLogsDO = new DispatchLogsDO();
dispatchLogsDO.setDispatchCode((String) map.get("code"));
dispatchLogsDO.setLogs(cause.getMessage());
dispatchLogsMapper.insert(dispatchLogsDO);
}
@Override
public void schedulerInStandbyMode() {
}
@Override
public void schedulerStarted() {
}
@Override
public void schedulerStarting() {
}
@Override
public void schedulerShutdown() {
}
@Override
public void schedulerShuttingdown() {
}
@Override
public void schedulingDataCleared() {
}
}
job 实现
我自己目录如下
- BaseJob
package com.wxapp.dispatch.service.job;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.wxapp.common.base.util.jackson.JacksonUtil;
import com.wxapp.common.constant.vo.ResultVo;
import com.wxapp.common.util.bean.SpringUtils;
import com.wxapp.common.util.check.CheckUtil;
import com.wxapp.common.util.redisson.RedissonUtil;
import com.wxapp.dispatch.api.feign.LoginFeign;
import com.wxapp.dispatch.api.pojo.entity.DispatchDO;
import com.wxapp.dispatch.api.pojo.po.vm.PLoginVm;
import com.wxapp.dispatch.api.pojo.vm.dispatch.DispatchVm;
import com.wxapp.dispatch.service.job.openfegin.OpenFeginJob;
import com.wxapp.dispatch.service.mapper.DispatchMapper;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.time.LocalDateTime;
import java.util.Date;
public abstract class BaseJob extends QuartzJobBean {
private static final Logger log = LoggerFactory.getLogger(OpenFeginJob.class);
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
// 分布式事务
if (!RedissonUtil.tryLock(context.getJobDetail().getKey().toString())) {
log.error("*****************调度任务" + context.getJobDetail().getKey().toString() + "正在执行*********************");
return;
}
log.info("**********执行定时任务调用开始******************");
var startTime = System.currentTimeMillis();
try {
//获取请求参数
var dispatchVm = JacksonUtil.parseObject(JacksonUtil.toJSONString(context.getMergedJobDataMap()), DispatchVm.class);
//获取token
if (CheckUtil.isNotNullString(dispatchVm.getUserId())) {
ResultVo<PLoginVm> resultVo;
String token = "";
var loginFeign = SpringUtils.getBean(LoginFeign.class);
if (context.getNextFireTime() != null) {
resultVo = loginFeign.remoteLogin(dispatchVm.getUserId(), (new Date().getTime() - context.getNextFireTime().getTime()) / 1000 - 1);
} else {
resultVo = loginFeign.remoteLogin(dispatchVm.getUserId(), null);
}
if (resultVo.getData() != null) {
token = resultVo.getData().getToken();
}
dispatchVm.setToken(token);
log.info("登录获取的token" + token);
}
log.info("**********执行结果:" + JacksonUtil.toJSONString(toDealJob(context, dispatchVm)) + "******************");
} catch (Exception e) {
log.error(e.getMessage());
} finally {
saveDispatch(context.getJobDetail().getKey().getName());
RedissonUtil.unLock(context.getJobDetail().getKey().toString());
log.info("**********执行定时任务" + context.getJobDetail().getKey().toString() + "结束******************");
log.info("************************执行时间:" + (System.currentTimeMillis() - startTime) + "ms ******************************");
}
}
@Async
protected void saveDispatch(String code) {
var wrapper = new LambdaUpdateWrapper<DispatchDO>();
wrapper.eq(DispatchDO::getCode, code)
.set(DispatchDO::getLastTime, LocalDateTime.now())
.setSql("`counts`=`counts`+1");
SpringUtils.getBean(DispatchMapper.class).update(null, wrapper);
}
protected abstract Object toDealJob(JobExecutionContext context, DispatchVm dispatchVm);
}
- OpenFeginJob quartz 调用OpenFegin
package com.wxapp.dispatch.service.job.openfegin;
import com.wxapp.common.constant.vo.ResultVo;
import com.wxapp.dispatch.api.pojo.vm.dispatch.DispatchVm;
import com.wxapp.dispatch.service.job.BaseJob;
import com.wxapp.dispatch.service.service.DynamicClientService;
import lombok.NoArgsConstructor;
import org.quartz.JobExecutionContext;
import org.springframework.stereotype.Component;
@Component(value = "OpenFeginJob")
@NoArgsConstructor
public class OpenFeginJob extends BaseJob {
private DynamicClientService dynamicClientService;
public OpenFeginJob(DynamicClientService dynamicClientService) {
this.dynamicClientService = dynamicClientService;
}
@Override
public Object toDealJob(JobExecutionContext context, DispatchVm dispatchVm) {
ResultVo<Object> res;
//请求
switch (dispatchVm.getType().getValue()) {
case "POST":
res = dynamicClientService.executePostApi(dispatchVm.getSysName().getValue(), dispatchVm.getUrl(), dispatchVm.getToken(), dispatchVm.getParam());
break;
case "DELETE":
res = dynamicClientService.executeDeleteApi(dispatchVm.getSysName().getValue(), dispatchVm.getUrl(), dispatchVm.getToken(), dispatchVm.getParam());
break;
case "PUT":
res = dynamicClientService.executePutApi(dispatchVm.getSysName().getValue(), dispatchVm.getUrl(), dispatchVm.getToken(), dispatchVm.getParam());
break;
case "GET":
default:
res = dynamicClientService.executeGetApi(dispatchVm.getSysName().getValue(), dispatchVm.getUrl(), dispatchVm.getToken(), dispatchVm.getParam());
break;
}
return res;
}
}
- HttpRequestJob quartz http 调用
package com.wxapp.dispatch.service.job.http;
import com.wxapp.common.base.util.jackson.JacksonUtil;
import com.wxapp.common.constant.vo.HttpRequestVo;
import com.wxapp.common.util.http.HttpRequestUtil;
import com.wxapp.dispatch.api.pojo.vm.dispatch.DispatchVm;
import com.wxapp.dispatch.service.job.BaseJob;
import lombok.NoArgsConstructor;
import org.quartz.JobExecutionContext;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component(value = "HttpJob")
@NoArgsConstructor
public class HttpRequestJob extends BaseJob {
@Override
public Object toDealJob(JobExecutionContext context, DispatchVm dispatchVm) {
Map<String, Object> data = dispatchVm.getParam() == null ? null : JacksonUtil.parseObject(dispatchVm.getParam(), Map.class);
HttpRequestVo<Map<String, Object>> request = new HttpRequestVo<>(dispatchVm.getUrl(), null, data, dispatchVm.getMapHearder(), data, "200", "code", "定时任务调用失败");
Map res;
//请求
switch (dispatchVm.getRequestType().getValue()) {
case "POST":
request.setMethod(HttpMethod.POST);
break;
case "DELETE":
request.setMethod(HttpMethod.DELETE);
break;
case "PUT":
request.setMethod(HttpMethod.PUT);
break;
case "GET":
default:
request.setMethod(HttpMethod.GET);
break;
}
return HttpRequestUtil.request(request);
}
}
- CommonTextJob 普通的quartz 调用
package com.wxapp.dispatch.service.job.common;
import com.wxapp.dispatch.api.pojo.vm.dispatch.DispatchVm;
import com.wxapp.dispatch.service.job.BaseJob;
import lombok.NoArgsConstructor;
import org.quartz.JobExecutionContext;
import org.springframework.stereotype.Component;
/**
* @Description
* @Datetime 2023/6/2 9:28
* @Modified By
*/
@Component(value = "CommonTextJob")
@NoArgsConstructor
public class CommonTextJob extends BaseJob {
@Override
protected Object toDealJob(JobExecutionContext context, DispatchVm dispatchVm) {
// 处理业务
return null;
}
}
openfegin 调用是用代理模式开发的
- openfegin service
package com.wxapp.dispatch.service.service;
import com.wxapp.common.constant.vo.ResultVo;
/**
* @Description
* @Datetime 2023/5/25 16:18
* @Modified By
*/
public interface DynamicClientService {
/**
* post 请求
*/
public ResultVo<Object> executePostApi(String feignName, String url, String token, Object params);
/**
* get请求
*/
public ResultVo<Object> executeGetApi(String feignName, String url, String token, Object params);
/**
* delete 请求
*/
public ResultVo<Object> executeDeleteApi(String feignName, String url, String token, Object params);
/**
* put 请求
*/
public ResultVo<Object> executePutApi(String feignName, String url, String token, Object params);
}
- openfegin serviceimpl
package com.wxapp.dispatch.service.service.impl;
import com.wxapp.common.constant.vo.ResultVo;
import com.wxapp.dispatch.api.feign.DynamicFeign;
import com.wxapp.dispatch.service.service.DynamicClientService;
import com.wxapp.dispatch.service.service.FeignClientFactoryService;
import org.springframework.stereotype.Service;
@Service
public class DynamicClientServiceImpl implements DynamicClientService {
private FeignClientFactoryService<DynamicFeign> feignFeignClientFactory;
public DynamicClientServiceImpl(FeignClientFactoryService<DynamicFeign> feignFeignClientFactory) {
this.feignFeignClientFactory = feignFeignClientFactory;
}
@Override
public ResultVo<Object> executePostApi(String feignName, String url, String token, Object params) {
DynamicFeign dynamicService = feignFeignClientFactory.getFeignClient(DynamicFeign.class, feignName);
return dynamicService.executePostApi(url, token, params);
}
@Override
public ResultVo<Object> executeGetApi(String feignName, String url, String token, Object params) {
DynamicFeign dynamicService = feignFeignClientFactory.getFeignClient(DynamicFeign.class, feignName);
return dynamicService.executeGetApi(url, token, params);
}
@Override
public ResultVo<Object> executeDeleteApi(String feignName, String url, String token, Object params) {
DynamicFeign dynamicService = feignFeignClientFactory.getFeignClient(DynamicFeign.class, feignName);
return dynamicService.executeDeleteApi(url, token, params);
}
@Override
public ResultVo<Object> executePutApi(String feignName, String url, String token, Object params) {
DynamicFeign dynamicService = feignFeignClientFactory.getFeignClient(DynamicFeign.class, feignName);
return dynamicService.executePutApi(url, token, params);
}
}
- openFeign 调用接口
package com.wxapp.dispatch.api.feign;
import com.wxapp.common.constant.constant.LoginConstant;
import com.wxapp.common.constant.vo.ResultVo;
import org.springframework.cloud.openfeign.SpringQueryMap;
import org.springframework.web.bind.annotation.*;
public interface DynamicFeign {
@PostMapping("{url}")
ResultVo<Object> executePostApi(@PathVariable("url") String url, @RequestHeader(LoginConstant.INTERNAL_TOKEN_KEY) String token, @RequestBody Object params);
@GetMapping("{url}")
ResultVo<Object> executeGetApi(@PathVariable("url") String url, @RequestHeader(LoginConstant.INTERNAL_TOKEN_KEY) String token, @SpringQueryMap Object params);
@DeleteMapping("{url}")
ResultVo<Object> executeDeleteApi(@PathVariable("url") String url, @RequestHeader(LoginConstant.INTERNAL_TOKEN_KEY) String token, @RequestBody Object params);
@PutMapping("{url}")
ResultVo<Object> executePutApi(@PathVariable("url") String url, @RequestHeader(LoginConstant.INTERNAL_TOKEN_KEY) String token, @RequestBody Object params);
}
FeignClientFactory 设计
- FeignClientFactoryService
package com.wxapp.dispatch.service.service;
/**
* @Description
* @Datetime 2023/5/25 16:13
* @Modified By
*/
public interface FeignClientFactoryService<T> {
public T getFeignClient(final Class<T> type, String serviceId);
}
- FeignClientFactoryServiceImpl
package com.wxapp.dispatch.service.service.impl;
import com.wxapp.dispatch.service.service.FeignClientFactoryService;
import org.springframework.cloud.openfeign.FeignClientBuilder;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
@Service
public class FeignClientFactoryServiceImpl<T> implements FeignClientFactoryService<T> {
private FeignClientBuilder feignClientBuilder;
public FeignClientFactoryServiceImpl(ApplicationContext appContext) {
this.feignClientBuilder = new FeignClientBuilder(appContext);
}
@Override
public T getFeignClient(final Class<T> type, String serviceId) {
return this.feignClientBuilder.forType(type, serviceId).build();
}
}
http 调用工具类
package com.wxapp.common.util.http;
import com.wxapp.common.base.util.jackson.JacksonUtil;
import com.wxapp.common.constant.vo.HttpRequestVo;
import com.wxapp.common.core.filter.except.ThrowMyException;
import com.wxapp.common.util.check.CheckUtil;
import org.springframework.http.HttpEntity;
import org.springframework.web.client.RestTemplate;
import java.lang.reflect.Field;
import java.util.Map;
/**
* @Description http请求工具包
* @Datetime 2022/4/23 11:56
* @Modified By
*/
public class HttpRequestUtil {
private static final RestTemplate restTemplate = new RestTemplate();
/**
* TODO get请求
*/
public static <T> Map request(HttpRequestVo<T> requestVo) {
try {
var url = urlAppendValue(requestVo.getUrl(), requestVo.getUrlAppend());
url = urlAppendValue(url, requestVo.getData());
String exchange;
HttpEntity<T> httpEntity = new HttpEntity<>(requestVo.getData(), requestVo.getHeader());
if (CheckUtil.isNotNullObject(requestVo.getUrlAppend())) {
exchange = (String) restTemplate.exchange(url, requestVo.getMethod(), httpEntity, String.class, requestVo.getUrlAppend()).getBody();
} else {
exchange = restTemplate.exchange(url, requestVo.getMethod(), httpEntity, String.class).getBody();
}
var map = JacksonUtil.parseObject(exchange, Map.class);
if (valiResult(map, requestVo)) {
throw new ThrowMyException(requestVo.getErrMassage());
}
return map;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* TODO: url 处理
*/
private static <T> String urlAppendValue(String url, T urlAppend) {
StringBuffer urlApendValue = new StringBuffer();
if (CheckUtil.isNotNullObject(urlAppend)) {
if (urlAppend instanceof Map) {
Map uls = (Map) urlAppend;
var set = uls.keySet();
for (var en : set) {
urlApendValue.append(en).append("=").append("{").append(en).append("}&");
}
return url + "?" + (urlApendValue.toString()).substring(0, urlApendValue.length() - 1);
}
var keySet = urlAppend.getClass();
var fields = keySet.getFields();
for (Field field : fields) {
var name = field.getName();
urlApendValue.append(name).append("=").append("{").append(name).append("}&");
}
return url + "?" + (urlApendValue.toString()).substring(0, urlApendValue.length() - 1);
}
return url;
}
public static <T> Boolean valiResult(Map map, HttpRequestVo<T> requestVo) {
String code = (String) map.get(requestVo.getStatusName());
return CheckUtil.isNullString(code) || !code.equals(requestVo.getSuccCode());
}
}
http 请求工具类封装
package com.wxapp.common.constant.vo;
import com.wxapp.common.base.util.jackson.JacksonUtil;
import lombok.Data;
import org.springframework.http.HttpMethod;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import java.util.Map;
/**
* @Description 第三方请求封装类
* @Datetime 2023/4/7 11:17
* @Modified By
*/
@Data
public class HttpRequestVo<T> {
/**
* 请求url
*/
private String url;
/**
* 请求数据
*/
private T Data;
/**
* 请求头
*/
private MultiValueMap<String, String> header;
/**
* url 拼接参数 key 参数名称 value 参数值
*/
private Map urlAppend;
/**
* 成功码
*/
private String succCode;
/**
* 状态码字段
*/
private String statusName;
/**
* 失败信息
*/
private String errMassage;
/**
* 请求方式
*/
private HttpMethod method;
public HttpRequestVo() {
}
public HttpRequestVo(String url, HttpMethod method, T data, Object header, Object urlAppend, String succCode, String statusName, String errMassage) {
this.url = url;
this.method = method;
Data = data;
this.succCode = succCode;
this.statusName = statusName;
this.errMassage = errMassage;
if (urlAppend != null && !"".equals(String.valueOf(urlAppend))) {
try {
this.urlAppend = JacksonUtil.parseObject(urlAppend, Map.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
if (header != null && !"".equals(String.valueOf(header))) {
try {
this.header = JacksonUtil.parseObject(JacksonUtil.toJSONString(header), MultiValueMap.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public MultiValueMap getHeader() {
if (this.header == null) {
return new LinkedMultiValueMap<>();
}
return this.header;
}
public void setHeader(String header) throws Exception {
if (header == null || "".equals(header)) {
return;
}
this.header = JacksonUtil.parseObject(header, MultiValueMap.class);
}
public void setUrlAppend(Map urlAppend) {
this.urlAppend = urlAppend;
}
public void setUrlAppend(String urlAppend) throws Exception {
if (urlAppend == null || "".equals(urlAppend)) {
return;
}
this.urlAppend = JacksonUtil.parseObject(urlAppend, Map.class);
}
}
有什么改进的欢迎评论
标签:quartz,springboot,dispatch,wxapp,import,定时,com,public,String From: https://www.cnblogs.com/wyxqlgl/p/17621169.html