首页 > 数据库 >基于mysql的异步事件框架的设计&实现

基于mysql的异步事件框架的设计&实现

时间:2023-08-23 16:37:10浏览次数:36  
标签:异步 return target 框架 trxMsg private trxMsgDO mysql public

背景

       事件驱动模型编程是程序设计中经常会用到的方法技巧,本质上是为了解耦事件的发布者和订阅者,实现组件之间的松耦合,提高应用程序的扩展性;另外,在一些业务场景中,顺序、阻塞式的执行任务会遇到一些比较耗时的中间步骤,但是往往我们不希望整个流程都停下来等待这些中间过程完成,这个时候我们就会考虑异步执行这些中间步骤; 综上所述,我们需要的是一个可异步执行的事件模型框架;

这里就很容易想到spring Event; spring Event的使用支持Event的发布和订阅,同时也支持@Async注解来达到异步执行的目的。但是spring event有以下缺点:

  1. 事件的队列是存储在内存中的,一旦机器重启或者宕机,事件本身会丢失
  2. listener消费事件,消费失败不会重试,缺乏重试机制

设计

因此设计的标准就是:

  1. 支持异步事件模型(发布、订阅)
  2. 事件存储在db或者别的地方,重启不会导致事件本身的丢失
  3. 事件的消费支持重试

image.png

实现

消息实体

Topic枚举

public enum Topic {
       
    /**
     * 测试topic
     **
    TEST_TOPIC1;
}

Group枚举

public enum Group {

    DEFAULT, //默认分组
    FAIL; //失败分组
}

TrxMsg实体:

@Data
public class TrxMsg<T> {

    private Long id;

    /**
     * 消息的topic
     */
    private Topic topic;

    /**
     * 消息体
     */
    private T content;

    /**
     * 累计执行次数
     */
    private int execTimes;

    /**
     * 执行时间(延迟执行)
     */
    private Date gmtExec;

    /**
     * 创建时间
     */
    private Date gmtCreate;

    /**
     * 更新时间
     */
    private Date gmtUpdate;

    /**
     * 消费分组
     */
    private Group group;

    /**
     * 消息的状态
     */
    private MsgStatus msgStatus;
}

消息的Listener

定义监听器接口

/**
 * 消息监听器
 */
public interface TrxMsgListener<Msg> {

    /**
     * 监听哪一类消息
     * @return
     */
    public Topic getTopic();

    /**
     * 获取消息并执行
     * @param msg
     * @return
     */
    public ExecResult execute(Msg msg);
}

消息mysql实体

建表语句

CREATE TABLE `trx_msg` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `gmt_update` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
  `topic` varchar(30) NOT NULL COMMENT '主题',
  `gmt_exec` datetime NOT NULL COMMENT '运行时间',
  `exec_time` int NOT NULL COMMENT '运行次数',
  `content` text COMMENT '内容',
  `java_class` varchar(150) DEFAULT NULL COMMENT '类名',
  `group` varchar(20) NOT NULL DEFAULT 'DEFAULT' COMMENT '分组',
  `status` int NOT NULL DEFAULT '0' COMMENT '状态',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='事务消息';

实体TrxMsgPO对象的定义(这里使用了mybatis-plus, 参考文档https://baomidou.com/)

@TableName("trx_msg")
@Data
public class TrxMsgPO {

    @TableId(value = "id", type = AUTO)
    private Long id;

    @TableField(value = "gmt_create")
    private Date gmtCreate;

    @TableField(value = "gmt_update", update = "now()")
    private Date gmtUpdate;

    @TableField(value = "topic")
    private String topic;

    @TableField(value = "gmt_exec")
    private Date gmtExec;

    @TableField(value = "exec_time", update = "exec_time + 1")
    private int execTime;

    @TableField(value = "content")
    private String content;

    @TableField(value = "java_class")
    private String className;

    @TableField(value = "`group`")
    private String group;

    @TableField(value = "`status`")
    private int status;
}

TrxMsg领域模型 跟 TrxMsgPO 存储模型转换

@Slf4j
@Component
public class TrxMsgConvert implements ModelConverter<TrxMsgPO, TrxMsg> {

    @Override
    public TrxMsg convert(TrxMsgPO source) {

        try {
            TrxMsg trxMsg = new TrxMsg();

            trxMsg.setId(source.getId());
            trxMsg.setMsgStatus(MsgStatus.get(source.getStatus()));
            trxMsg.setExecTimes(source.getExecTime());
            trxMsg.setGmtCreate(source.getGmtCreate());
            trxMsg.setGmtExec(source.getGmtExec());
            trxMsg.setGmtUpdate(source.getGmtUpdate());
            trxMsg.setGroup(Group.DEFAULT);
            trxMsg.setTopic(Topic.valueOf(source.getTopic()));
            Class<?> cls = Class.forName(source.getClassName());
            trxMsg.setContent(JSON.parseObject(source.getContent(), cls));

            return trxMsg;
        } catch (ClassNotFoundException e) {
            log.error("className={} is not fund in system", source.getClassName());
        }

        return null;
    }

    @Override
    public List<TrxMsg> convert(List<TrxMsgPO> trxMsgDOS) {
        if (CollectionUtils.isEmpty(trxMsgDOS)) {
            return new ArrayList<>(0);
        }

        return trxMsgDOS.stream().map(this::convert).collect(Collectors.toList());
    }

    @Override
    public List<TrxMsgPO> reconvertList(List<TrxMsg> sList) {
        if (CollectionUtils.isEmpty(sList)) {
            return new ArrayList<>(0);
        }

        return sList.stream().map(this::reconvert).collect(Collectors.toList());
    }

    @Override
    public TrxMsgPO reconvert(TrxMsg target) {
        TrxMsgPO trxMsgDO = new TrxMsgPO();

        trxMsgDO.setId(target.getId());
        trxMsgDO.setContent(JSON.toJSONString(target.getContent()));
        trxMsgDO.setClassName(target.getContent().getClass().getName());
        trxMsgDO.setExecTime(target.getExecTimes());

        if (target.getGmtCreate()!=null) {
            trxMsgDO.setGmtCreate(target.getGmtCreate());
        } else {
            trxMsgDO.setGmtCreate(new Date());
        }

        if (target.getGmtExec()!=null) {
            trxMsgDO.setGmtExec(target.getGmtExec());
        } else {
            trxMsgDO.setGmtExec(new Date());
        }

        if (target.getGmtUpdate()!=null) {
            trxMsgDO.setGmtUpdate(target.getGmtUpdate());
        } else {
            trxMsgDO.setGmtUpdate(new Date());
        }

        if (target.getGroup()!=null) {
            trxMsgDO.setGroup(target.getGroup().name());
        } else {
            trxMsgDO.setGroup(Group.DEFAULT.name());
        }

        trxMsgDO.setTopic(target.getTopic().name());
        trxMsgDO.setStatus(target.getMsgStatus().code);

        trxMsgDO.setId(target.getId());

        return trxMsgDO;
    }
}

消息队列

@Slf4j
@Component
public class TrxMsgService {

    private volatile static Map<Topic, List<TrxMsgListener>> msgListenerMap;

    @Autowired
    List<TrxMsgListener> listenerList;

    private int MaxExecTime = 1;

    static {
        msgListenerMap = new HashMap<>();
    }

    @Resource
    TrxMsgConvert trxMsgConvert;

    @Resource
    TrxMsgMapper trxMsgMapper;

    /**
     * 消息发布的入口
     **
    public void publishMsg(TrxMsg trxMsg) {
        if (trxMsg.getGmtExec()==null) {
            trxMsg.setGmtExec(new Date());
        }

        if (trxMsg.getGroup()==null) {
            trxMsg.setGroup(Group.DEFAULT);
        }

        trxMsg.setExecTimes(0);
        trxMsg.setMsgStatus(MsgStatus.INIT);


        trxMsgMapper.insert(trxMsgConvert.reconvert(trxMsg));
    }

    /**
     * 立即执行,失败转异步
     * @param trxMsg 事物消息
     */
    public <T> void execMsgImmediately(TrxMsg<T> trxMsg) {
        List<TrxMsgListener> filteredList = getListenerList(trxMsg.getTopic());
        if (CollectionUtils.isEmpty(filteredList)) {
            return;
        }

        for (TrxMsgListener listener : filteredList) {
            try {
                ExecResult result = listener.execute(trxMsg.getContent());

                if (ExecResult.FAIL.equals(result)) {
                    publishMsg(trxMsg);
                    return;
                }
            } catch (Exception e) {
                log.error("system error", e);
                publishMsg(trxMsg);
                return;
            }
        }
    }

    public void execMsg(TrxMsg trxMsg) {
        List<TrxMsgListener> filteredList = getListenerList(trxMsg.getTopic());

        if (CollectionUtils.isEmpty(filteredList)) {
            return;
        }

        for (TrxMsgListener listener : filteredList) {
            try {
                ExecResult result = listener.execute(trxMsg.getContent());

                boolean force = false;

                if (ExecResult.FAIL.equals(result)) {
                    //执行失败,且执行次数超过最大限制,则默认划分到FAIL分组去,防止消息反复失败从而积压导致消息消费阻塞
                    if (!Group.FAIL.equals(trxMsg.getGroup()) && trxMsg.getExecTimes()>=(MaxExecTime-1)) {
                        trxMsg.setMsgStatus(MsgStatus.FAIL);
                        trxMsg.setGroup(Group.FAIL); //置为失败分组
                        trxMsg.setExecTimes(0);
                        force = true;
                    } else {
                        trxMsg.setMsgStatus(MsgStatus.FAIL_WAIT_RETRY);
                        //延迟10*execTime分钟后执行, 指数级增长
                        trxMsg.setGmtExec(DateUtils.addMinutes(new Date(), (trxMsg.getExecTimes()+1)*10));
                    }

                    update(trxMsg, force);
                    return;
                }
            } catch (Exception e) {
                log.error("system error", e);
                return;
            }
        }

        //成功执行,直接删除就行了
        trxMsgMapper.deleteById(trxMsg.getId());
    }

    /**
     * 单个执行
     */
    public void execMsgById(Long id) {
        TrxMsgPO msg = trxMsgMapper.selectById(id);
        TrxMsg trxMsg = trxMsgConvert.convert(msg);
        //trxMsg.setExecTimes(0); //强制执行的情况下,执行次数强行置为0
        execMsg(trxMsg);
    }

    private void update(TrxMsg trxMsg, boolean forceUpdateExecTime) {
        trxMsgMapper.updateById(trxMsgConvert.reconvert(trxMsg));

        if (forceUpdateExecTime) {
            trxMsgMapper.updateExecTime(trxMsg.getId(), trxMsg.getExecTimes());
        }
    }

    private List<TrxMsgListener> getListenerList(Topic topic) {
        if (msgListenerMap.containsKey(topic)) {
            return msgListenerMap.get(topic);
        } else {
            List<TrxMsgListener> msgList = listenerList.stream().filter(item->item.getTopic().equals(topic)).collect(Collectors.toList());
          synchronized (TrxMsgService.class)  {
              msgListenerMap.put(topic, msgList);
          }
          return msgList;
        }
    }
}

消息发布

发布消息的代码片段如下

public class Test {

    @Resource
    TrxMsgService trxMsgService;

    public void test() {
        TrxMsg<String> msg = new TrxMsg<>();
        msg.setTopic(Topic.TEST_TOPIC1);
        msg.setContent("test msg");
        //这里可以设置消息的执行时间,主要运用于延迟消息
        msg.setGmtExec(new Date());

        trxMsgService.publishMsg(msg);
    }

}

调度任务

调度任务的作用就是定时按照分组捞取可执行状态的消息,并分发到对应的listener上去执行;

@Slf4j
@Component
public class TrxMsgJob {

    @Resource
    TrxMsgService trxMsgService;

    @Resource
    TrxMsgMapper trxMsgMapper;

    @Resource
    TrxMsgConvert trxMsgConvert;

    /**
     * 调度任务的入口
     **
    public void execute() {
        String params = NdspHelper.getJobParam();
        List<Integer> statusList = new ArrayList<>();
        statusList.add(MsgStatus.INIT.code);
        statusList.add(MsgStatus.FAIL_WAIT_RETRY.code);

        LambdaQueryWrapper<TrxMsgPO> q = new LambdaQueryWrapper<>();
        if (StringUtils.isNotBlank(params)) {
            log.info("GROUP={}", params);
            q.eq(TrxMsgPO::getGroup, params);
        }

        String now = DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss");
        q.apply("UNIX_TIMESTAMP(gmt_exec) <= UNIX_TIMESTAMP({0})", now);
        q.in(TrxMsgPO::getStatus, statusList);

        try {
            PageHelper.offsetPage(0, 10);
            List<TrxMsgPO> trxMsgDOS = trxMsgMapper.selectList(q);

            for (TrxMsgPO msg : trxMsgDOS) {
                TrxMsg trxMsg = trxMsgConvert.convert(msg);
                if (trxMsg!=null) {
                    trxMsgService.execMsg(trxMsg);
                }
            }
        } catch (Exception e) {
            log.error("system error", e);
        }
    }
}

标签:异步,return,target,框架,trxMsg,private,trxMsgDO,mysql,public
From: https://www.cnblogs.com/viogs/p/17652020.html

相关文章

  • 直播网站源码,mysql,mariadb 密码忘记,修改密码
    直播网站源码,mysql,mariadb密码忘记,修改密码两种修改方法: 1、直接在shell命令行使用mysqladm命令修改。 #mysqladmin-uroot-poldpasswordpasswordnewpassword ​这种方法的弊端在于会明文显示密码。2、登陆数据库修改密码。 #mysql-uroot-p  2.1更新......
  • mac卸载mysql教程(按照步骤可完全卸载)
    1、关闭mysql系统偏好设置-点击mysql图标-点击stop...或者查看mysql是否启动:ps-ef|grepmysql输入:kill-9 然后回车,关闭mysql2、卸载:在Mac终端使用下面的命令删除所有mysql文件即可sudorm/usr/local/mysqlsudorm-rf/usr/local/mysql*rm-rf/Library/Startu......
  • mysql超出最大连接数解决方法
    遇到mysql超出最大连接数,相信不少人第一反应就是查看mysql进程,看有没有慢查询,当然这个做法是完全正确的!但是很多时候真正的问题不在这里。今天有遇到同样的问题,一味查看mysql进程和慢查询日志,无果。后来老大提点了一下,查看一下nginx日志,发现有一两个访问执行时候比较长,然后使用top......
  • docker部署MySQL、tomcat
    一、安装MySQL1、dockerpullmysql:latest拉取镜像dockersearchmysql命令将返回与关键字“mysql”相关的Docker镜像列表。在结果中,你将看到镜像名称、描述、星级评分、官方/认证标志以及其他相关信息。结果包括了许多不同的MySQL镜像,包括官方支持的和由社区维护的镜像。你......
  • 【MySQL数据库总结】
    【一】MySQL初识存储数据的演变过程SQL语句的由来库/表/记录/表头/表单MySQL安装初识SQL语句【二】MySQL基础存储引擎创建表语法基本数据类型字段类型严格模式约束条件【三】MySQL之约束条件引言约束条件外键修改表语法复制表语法【四】MySQL之约束条件......
  • js 异步改成同步Promise
    functionPromise(executor){letself=this;if(typeofexecutor!=='function')thrownewTypeError('Promiseresolver'+executor+'isnotafunction');if(!(selfinstanceofPromise))thrownewTypeError(�......
  • 【Django框架基础总结】
    【一】Django框架之初识【二】Django框架之静态文件配置说明【三】Django框架之数据操作(ORM)初识【四】Django框架之ORM创建表关系【五】Django框架之请求生命周期流程图【六】Django框架之路由层【七】Django框架之视图层【八】Django框架之模板层【九】Django框架之模......
  • MySQL对小数进行四舍五入等操作
    数学函数是MySQL中常用的一类函数。其主要用于处理数字,包括整型和浮点数等等。MySQL常用的四舍五入函数:函数说明FLOOR(X)返回不大于X的最大整数。CEIL(X)、CEILING(X) 返回不小于X的最小整数。ROUND(X) 返回离X最近的整数,截断时要进行四舍五入。ROUND(X,D) 保留X小数点后D位......
  • Gorm实现数据库增删查改——mysql篇
    本文的目标本文力图实现用简短的篇幅,结合一些具体的应用场景来讲述gorm基本的增删查改功能。Gorm介绍在现代软件开发中,数据库操作是构建应用程序不可或缺的一部分。GORM是Go语言的一个ORM(ObjectRelationalMapping)库。它可以让我们通过Go来操作数据库中的数据。其中ORM(Obje......
  • mysql
    Mysql基础以最常见的学生表查询为基础数据库与测试的关系检查界面可见输入数据存储检查界面不可见数据存储检查是否符合数据库事务的一致性聚合函数max(字段):求该字段的最大值min(字段):求该字段的最小值avg(字段):求该字段的平均值sum(字段):对该字段的值求和count(*......