首页 > 其他分享 >RocketMQ应用-实现周期性自动任务

RocketMQ应用-实现周期性自动任务

时间:2024-01-29 23:02:24浏览次数:28  
标签:info task utf8mb4 周期性 任务 SET 自动 NULL RocketMQ

应用背景

提供配置功能,用于固定周期的执行某个动作;如在基金交易的每个交易日结束时,需要根据当天交易量计算基金的收益,可以提供定时任务,在每天晚上固定的时间计算收益数据。

功能设计

提供任务数据表task_info和任务执行记录表task_log_info;通过扫描task_info表中所有的任务配置数据,将任务发送的RocketMQ,当消费者消费任务后按照指定的周期时间向RocketMQ发送延时执行任务,同时在task_log_info记录当此任务的消费记录。

数据表

task_info

CREATE TABLE `task_info`  (
  `id` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `code` char(8) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `version` int NOT NULL,
  `create_time` datetime NOT NULL,
  `content` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '消息内容,可传递自定义参数',
  `title` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '消息标题',
  `topic` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '消息主题',
  `tag` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '消息tag',
  `execute_time` datetime NOT NULL COMMENT '待执行时间',
  `interval_time` mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '周期间隔时间,单位:秒',
  `send_status` varchar(1) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '发送状态 0-待发送 1-发送成功 2-发送失败',
  PRIMARY KEY (`id`) USING BTREE,
  INDEX `topic`(`topic`, `tag`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

task_log_info

CREATE TABLE `task_log_info`  (
  `id` char(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `code` char(8) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `version` int NOT NULL,
  `create_time` datetime NOT NULL,
  `key` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '消息唯一键',
  `topic` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '消息主题',
  `tag` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '消息tag',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

功能逻辑

自动任务核心功能

  1. 在task_info表中插入自动任务功能的基本任务-任务表扫描,并将这个任务初始化发送到RocketMQ(仅需要初始化一次),设置间隔执行时间为5分钟。
  2. 提供“任务表扫描”的消费者,消费者查询任务表中当天和当天之前所有待发送到RocketMQ的任务,并根据配置的任务执行时间向RocketMQ发送定时消息
  3. “任务表扫描”消息每次消费结束时,需要向RocketMQ重新发送延时的“任务表扫描”消息,实现周期性执行的效果。

自动任务业务功能

在核心功能“任务表扫描”周期执行完成的基础上,按照如下方式添加业务类任务:

  1. 在task_info表中插入业务任务
  2. 一对一提供业务任务的消费监听代码
  3. 消费时按照校验重复消费、业务处理、消息重新延时发送到RocketMQ三个步骤进行

功能实现

核心功能部分

  1. 基本抽象类提供
    将任务的消费分成消费前幂等性校验(通过task_log_info判断消息是否已消费)、业务处理、消费后向RocketMQ重新发送延时消息三个部分
/**
 * 自动任务基本实现
 */
public abstract class ATaskAutoHandle implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        //前置任务,避免重复消费
        if (!before(messageExt)) {
            return;
        }
        //处理业务
        handle(messageExt);
        //后置任务
        after(messageExt);
    }

    public boolean before(MessageExt messageExt) {
        //判断当前消息是否已经消费
        TaskLogInfoEntity entity = getTaskCenterService().queryTaskLogByKey(messageExt.getKeys());
        return entity == null;
    }

    abstract void handle(MessageExt messageExt);

    public void after(MessageExt msgExt) {
        //添加消息的消费记录
        getTaskCenterService().addTaskSendLog(new TaskLogInfoEntity(msgExt.getTopic(), msgExt.getTags(), msgExt.getKeys()));
        //重新发送消息
        // 获取间隔时间
        long intervalTime = Long.parseLong(msgExt.getProperty(TaskCenterConstants.INTERVAL_TIME_KEY));
        SendResult sendResult = getRocketMQTemplate().syncSendDelayTimeSeconds(String.format("%s:%s", msgExt.getTopic(), msgExt.getTags()), MessageBuilder
                .withPayload(msgExt.getBody())
                .setHeader(RocketMQHeaders.KEYS, msgExt.getTopic() + UUID.randomUUID())
                .setHeader(TaskCenterConstants.INTERVAL_TIME_KEY, intervalTime)
                .build(), intervalTime);
        if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)){
            throw new RuntimeException("消息重新发送失败,key: " + msgExt.getKeys());
        }
    }

    abstract RocketMQTemplate getRocketMQTemplate();

    abstract TaskCenterService getTaskCenterService();
}
  1. 提供“任务表扫描”消息的消费实现
    继承“基本抽象类”,实现业务部分-查询当日及当日之前未发送的任务、执行时间小于当前时间的任务立即发送到RocketMQ、执行时间晚于当前时间的任务发送定时消息到RocketMQ
@Component
@RocketMQMessageListener(
    topic = "taskAutoTopic",
    consumerGroup = "taskAutoConsumerGroup"
)
public class TaskAutoListener extends ATaskAutoHandle {

    private static final Logger logger = LoggerFactory.getLogger(TaskAutoListener.class);

    @Autowired
    private TaskCenterService taskCenterService;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void handle(MessageExt messageExt) {
        Date now = new Date();
        logger.info("待执行任务扫描开始:{}", DateFormatUtils.format(now, DateFormatUtils.ISO_DATETIME_FORMAT.getPattern()));
        //查询当天需要执行的所有任务
        List<TaskInfoDTO> taskInfoDTOS = taskCenterService.queryNeedSendTask();
        if (CollectionUtils.isEmpty(taskInfoDTOS)) {
            logger.info("待执行任务扫描结束");
            return;
        }
        //执行时间在当前时间之前的立即发送
        taskInfoDTOS.stream().filter(task -> task.getExecuteTime().before(now) || task.getExecuteTime().equals(now)).forEach(task -> {
            SendResult sendResult = rocketMQTemplate.syncSend(String.format("%s:%s", task.getTopic(), task.getTag()), MessageBuilder
                    .withPayload(task.getContent())
                    .setHeader(RocketMQHeaders.KEYS, task.getTopic() + UUID.randomUUID())
                    .setHeader(TaskCenterConstants.INTERVAL_TIME_KEY, task.getIntervalTime())
                    .build());
            task.setSendStatus(sendResult.getSendStatus().equals(SendStatus.SEND_OK) ? SendStatusEnum.SEND_SUCCESS.getCode() : SendStatusEnum.SEND_FAIL.getCode());
            //更新任务执行状态
            taskCenterService.modifyTaskSendStatus(task);
        });

        //执行时间在当前时间之后的定时发送
        taskInfoDTOS.stream().filter(task -> task.getExecuteTime().after(now)).forEach(task -> {
            SendResult sendResult = rocketMQTemplate.syncSendDeliverTimeMills(String.format("%s:%s", task.getTopic(), task.getTag()), MessageBuilder
                    .withPayload(task.getContent())
                    .setHeader(RocketMQHeaders.KEYS, task.getTopic() + UUID.randomUUID())
                    .setHeader(TaskCenterConstants.INTERVAL_TIME_KEY, task.getIntervalTime())
                    .build(), task.getExecuteTime().getTime());
            task.setSendStatus(sendResult.getSendStatus().equals(SendStatus.SEND_OK) ? SendStatusEnum.SEND_SUCCESS.getCode() : SendStatusEnum.SEND_FAIL.getCode());
            //更新任务执行状态
            taskCenterService.modifyTaskSendStatus(task);
        });

        logger.info("待执行任务扫描结束");
    }

    @Override
    public RocketMQTemplate getRocketMQTemplate() {
        return this.rocketMQTemplate;
    }

    @Override
    public TaskCenterService getTaskCenterService() {
        return this.taskCenterService;
    }
}

自定义业务任务

  1. 向task_info插入自定义业务任务配置
  2. 继承“基本抽象类”,实现业务处理部分即可
@Component
@RocketMQMessageListener(
    topic = "fundIncomeCalTopic",
    consumerGroup = "fundIncomeCalConsumerGroup"
)
public class FundIncomeCalTaskListener extends ATaskAutoHandle {

    private static final Logger logger = LoggerFactory.getLogger(FundIncomeCalTaskListener.class);

    @Autowired
    private TaskCenterService taskCenterService;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void handle(MessageExt messageExt) {
        Date now = new Date();
        logger.info("基金收益计算开始:{}", DateFormatUtils.format(now, DateFormatUtils.ISO_DATETIME_FORMAT.getPattern()));
        logger.info("基金收益计算结束");
    }

    @Override
    public RocketMQTemplate getRocketMQTemplate() {
        return rocketMQTemplate;
    }

    @Override
    public TaskCenterService getTaskCenterService() {
        return taskCenterService;
    }
}

标签:info,task,utf8mb4,周期性,任务,SET,自动,NULL,RocketMQ
From: https://www.cnblogs.com/zly1015/p/17995542

相关文章

  • linux利用shell+crontab自动备份mysql数据库
    一.准备条件,得先安装一个mysql数据库mysql5.7数据库安装教程https://www.cnblogs.com/magepi/p/16983507.html 二.然后准备编写shell脚本编写备份单个库shell脚本#!/bin/bash#保存备份个数,备份3天数据number=3#备份保存路径backup_dir=/opt/mysqlbackup#日期dd=......
  • 物流平台如何与电商平台进行自动化流程管理
    为什么要实现物流与电商平台进行自动化管理实现物流平台与电商平台的自动化流程管理对企业和消费者都有着重要的意义,比如以下几点:提高效率:自动化流程管理可以减少人为操作的错误和延误,提高订单处理和物流配送的效率。通过定义清晰的流程图和自动化工具,可以快速而准确地完成订单处理......
  • pycharm新建文档自动显示编码格式和作者信息等信息
    1.设置方法setting--editor--fileandcodetemplates--选择pythonscript--输入信息模版--apply2.信息模版--coding:utf-8--"""Author:wrTime:${DATE}${TIME}File:${NAME}.pySoftware:${PRODUCT_NAME}"""从上到下依次是:作者:时间:文件名:ide名称:还......
  • AI智能视频监控系统自动检测、识别工服工装,准确率更高!
    工服工装的识别在很多行业场景中都具有重要的意义,如公共场所监控、工地管理、人员安全、明厨亮灶、医疗卫生等,员工穿戴特定的工服工装已然成为各个行业的安全要求与标准。随着AI+人工智能技术的快速发展,工服工装识别技术得以应用于实际项目现场中,实现自动化、高效化的管理。方......
  • Jenkins + Gitlab 前后端项目自动化构建部署
    Jenkins+Gitlab前后端项目自动化构建部署:https://blog.csdn.net/IT_ZRS/article/details/115032509?spm=1001.2014.3001.5501Docker+Jenkins+Gitlab自动化构建部署:https://blog.csdn.net/IT_ZRS/article/details/117533847?spm=1001.2101.3001.6650.1&utm_medium=distrib......
  • web自动化测试复习巩固第二天
    定位方法selenium4:driver.find_element(定位策略用By类,定位表达式)---返回表达式匹配的第一个元素,如果找不到元素,会报错。driver.find_elements(定位策略用By类,定位表达式)---返回表达式匹配的所有元素,返回的是列表类型。By类:管理8种定位方式,每一种定位方......
  • Cisco Catalyst Center 2.3.7.4-VA - 网络管理和自动化
    CiscoCatalystCenter2.3.7.4-VA-网络管理和自动化CiscoCatalystCenter-NetworkManagementandAutomation请访问原文链接:https://sysin.org/blog/cisco-catalyst-center/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.orgCiscoCatalystCenter节约时间,不再......
  • 解锁亚马逊鲲鹏系统奥秘:自动生成AI角色,随心定制养号计划
    亚马逊鲲鹏系统是一款强大的软件,是一款可以批量注册亚马逊买家账号并能自动化养号的软件。其独特的功能和操作流程使得用户能够轻松管理多个账号,实现高效的运营和管理。首先,该软件提供了AI智能养号功能,用户可以自动生成具有真实特征的AI角色,包括姓名、性别、年龄、职业及爱好等信息......
  • 局域网电脑自动监控软件,微信泄密|QQ泄密|邮件泄密通通一网打尽
    本文一共:1035字|预计阅读时间:5分钟局域网电脑自动监控软件1.功能概述在当前信息时代,随着电脑网络的广泛应用,网络安全问题日益突出。为了有效应对各种潜在的泄密风险,域智盾软件推出了一款强大的局域网电脑自动监控软件。该软件具备微信、QQ、企业微信、钉钉、邮件等多方面监......
  • 和好友一起学习如何3分钟自动化搭建幻兽帕鲁联机服务器]
    和好友一起学习如何3分钟自动化搭建幻兽帕鲁联机服务器《幻兽帕鲁》的意外火爆,源于其巧妙地将各种元素进行融合,形成了一种独特的游戏体验。这款游戏,让人看到了“缝合”游戏的新高度,也让人感叹,原来缝合得当,也能如此吸引人。帕鲁,是游戏中的核心元素,它既是战斗的小精灵,也是生活的......