应用背景
提供配置功能,用于固定周期的执行某个动作;如在基金交易的每个交易日结束时,需要根据当天交易量计算基金的收益,可以提供定时任务,在每天晚上固定的时间计算收益数据。
功能设计
提供任务数据表task_info和任务执行记录表task_log_info;通过扫描task_info表中所有的任务配置数据,将任务发送的RocketMQ,当消费者消费任务后按照指定的周期时间向RocketMQ发送延时执行任务,同时在task_log_info记录当此任务的消费记录。
数据表
task_info
CREATE TABLE `task_info` (
`id` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`code` char(8) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`version` int NOT NULL,
`create_time` datetime NOT NULL,
`content` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '消息内容,可传递自定义参数',
`title` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '消息标题',
`topic` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '消息主题',
`tag` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '消息tag',
`execute_time` datetime NOT NULL COMMENT '待执行时间',
`interval_time` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '周期间隔时间,单位:秒',
`send_status` varchar(1) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '发送状态 0-待发送 1-发送成功 2-发送失败',
PRIMARY KEY (`id`) USING BTREE,
INDEX `topic`(`topic`, `tag`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
task_log_info
CREATE TABLE `task_log_info` (
`id` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
`code` char(8) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
`version` int NOT NULL,
`create_time` datetime NOT NULL,
`key` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '消息唯一键',
`topic` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '消息主题',
`tag` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '消息tag',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
功能逻辑
自动任务核心功能
- 在task_info表中插入自动任务功能的基本任务-任务表扫描,并将这个任务初始化发送到RocketMQ(仅需要初始化一次),设置间隔执行时间为5分钟。
- 提供“任务表扫描”的消费者,消费者查询任务表中当天和当天之前所有待发送到RocketMQ的任务,并根据配置的任务执行时间向RocketMQ发送定时消息
- “任务表扫描”消息每次消费结束时,需要向RocketMQ重新发送延时的“任务表扫描”消息,实现周期性执行的效果。
自动任务业务功能
在核心功能“任务表扫描”周期执行完成的基础上,按照如下方式添加业务类任务:
- 在task_info表中插入业务任务
- 一对一提供业务任务的消费监听代码
- 消费时按照校验重复消费、业务处理、消息重新延时发送到RocketMQ三个步骤进行
功能实现
核心功能部分
- 基本抽象类提供
将任务的消费分成消费前幂等性校验(通过task_log_info判断消息是否已消费)、业务处理、消费后向RocketMQ重新发送延时消息三个部分
/**
* 自动任务基本实现
*/
public abstract class ATaskAutoHandle implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
//前置任务,避免重复消费
if (!before(messageExt)) {
return;
}
//处理业务
handle(messageExt);
//后置任务
after(messageExt);
}
public boolean before(MessageExt messageExt) {
//判断当前消息是否已经消费
TaskLogInfoEntity entity = getTaskCenterService().queryTaskLogByKey(messageExt.getKeys());
return entity == null;
}
abstract void handle(MessageExt messageExt);
public void after(MessageExt msgExt) {
//添加消息的消费记录
getTaskCenterService().addTaskSendLog(new TaskLogInfoEntity(msgExt.getTopic(), msgExt.getTags(), msgExt.getKeys()));
//重新发送消息
// 获取间隔时间
long intervalTime = Long.parseLong(msgExt.getProperty(TaskCenterConstants.INTERVAL_TIME_KEY));
SendResult sendResult = getRocketMQTemplate().syncSendDelayTimeSeconds(String.format("%s:%s", msgExt.getTopic(), msgExt.getTags()), MessageBuilder
.withPayload(msgExt.getBody())
.setHeader(RocketMQHeaders.KEYS, msgExt.getTopic() + UUID.randomUUID())
.setHeader(TaskCenterConstants.INTERVAL_TIME_KEY, intervalTime)
.build(), intervalTime);
if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)){
throw new RuntimeException("消息重新发送失败,key: " + msgExt.getKeys());
}
}
abstract RocketMQTemplate getRocketMQTemplate();
abstract TaskCenterService getTaskCenterService();
}
- 提供“任务表扫描”消息的消费实现
继承“基本抽象类”,实现业务部分-查询当日及当日之前未发送的任务、执行时间小于当前时间的任务立即发送到RocketMQ、执行时间晚于当前时间的任务发送定时消息到RocketMQ
@Component
@RocketMQMessageListener(
topic = "taskAutoTopic",
consumerGroup = "taskAutoConsumerGroup"
)
public class TaskAutoListener extends ATaskAutoHandle {
private static final Logger logger = LoggerFactory.getLogger(TaskAutoListener.class);
@Autowired
private TaskCenterService taskCenterService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void handle(MessageExt messageExt) {
Date now = new Date();
logger.info("待执行任务扫描开始:{}", DateFormatUtils.format(now, DateFormatUtils.ISO_DATETIME_FORMAT.getPattern()));
//查询当天需要执行的所有任务
List<TaskInfoDTO> taskInfoDTOS = taskCenterService.queryNeedSendTask();
if (CollectionUtils.isEmpty(taskInfoDTOS)) {
logger.info("待执行任务扫描结束");
return;
}
//执行时间在当前时间之前的立即发送
taskInfoDTOS.stream().filter(task -> task.getExecuteTime().before(now) || task.getExecuteTime().equals(now)).forEach(task -> {
SendResult sendResult = rocketMQTemplate.syncSend(String.format("%s:%s", task.getTopic(), task.getTag()), MessageBuilder
.withPayload(task.getContent())
.setHeader(RocketMQHeaders.KEYS, task.getTopic() + UUID.randomUUID())
.setHeader(TaskCenterConstants.INTERVAL_TIME_KEY, task.getIntervalTime())
.build());
task.setSendStatus(sendResult.getSendStatus().equals(SendStatus.SEND_OK) ? SendStatusEnum.SEND_SUCCESS.getCode() : SendStatusEnum.SEND_FAIL.getCode());
//更新任务执行状态
taskCenterService.modifyTaskSendStatus(task);
});
//执行时间在当前时间之后的定时发送
taskInfoDTOS.stream().filter(task -> task.getExecuteTime().after(now)).forEach(task -> {
SendResult sendResult = rocketMQTemplate.syncSendDeliverTimeMills(String.format("%s:%s", task.getTopic(), task.getTag()), MessageBuilder
.withPayload(task.getContent())
.setHeader(RocketMQHeaders.KEYS, task.getTopic() + UUID.randomUUID())
.setHeader(TaskCenterConstants.INTERVAL_TIME_KEY, task.getIntervalTime())
.build(), task.getExecuteTime().getTime());
task.setSendStatus(sendResult.getSendStatus().equals(SendStatus.SEND_OK) ? SendStatusEnum.SEND_SUCCESS.getCode() : SendStatusEnum.SEND_FAIL.getCode());
//更新任务执行状态
taskCenterService.modifyTaskSendStatus(task);
});
logger.info("待执行任务扫描结束");
}
@Override
public RocketMQTemplate getRocketMQTemplate() {
return this.rocketMQTemplate;
}
@Override
public TaskCenterService getTaskCenterService() {
return this.taskCenterService;
}
}
自定义业务任务
- 向task_info插入自定义业务任务配置
- 继承“基本抽象类”,实现业务处理部分即可
@Component
@RocketMQMessageListener(
topic = "fundIncomeCalTopic",
consumerGroup = "fundIncomeCalConsumerGroup"
)
public class FundIncomeCalTaskListener extends ATaskAutoHandle {
private static final Logger logger = LoggerFactory.getLogger(FundIncomeCalTaskListener.class);
@Autowired
private TaskCenterService taskCenterService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void handle(MessageExt messageExt) {
Date now = new Date();
logger.info("基金收益计算开始:{}", DateFormatUtils.format(now, DateFormatUtils.ISO_DATETIME_FORMAT.getPattern()));
logger.info("基金收益计算结束");
}
@Override
public RocketMQTemplate getRocketMQTemplate() {
return rocketMQTemplate;
}
@Override
public TaskCenterService getTaskCenterService() {
return taskCenterService;
}
}
标签:info,task,utf8mb4,周期性,任务,SET,自动,NULL,RocketMQ
From: https://www.cnblogs.com/zly1015/p/17995542