背景
事件驱动模型编程是程序设计中经常会用到的方法技巧,本质上是为了解耦事件的发布者和订阅者,实现组件之间的松耦合,提高应用程序的扩展性;另外,在一些业务场景中,顺序、阻塞式的执行任务会遇到一些比较耗时的中间步骤,但是往往我们不希望整个流程都停下来等待这些中间过程完成,这个时候我们就会考虑异步执行这些中间步骤; 综上所述,我们需要的是一个可异步执行的事件模型框架;
这里就很容易想到spring Event; spring Event的使用支持Event的发布和订阅,同时也支持@Async注解来达到异步执行的目的。但是spring event有以下缺点:
- 事件的队列是存储在内存中的,一旦机器重启或者宕机,事件本身会丢失
- listener消费事件,消费失败不会重试,缺乏重试机制
设计
因此设计的标准就是:
- 支持异步事件模型(发布、订阅)
- 事件存储在db或者别的地方,重启不会导致事件本身的丢失
- 事件的消费支持重试
实现
消息实体
Topic枚举
public enum Topic {
/**
* 测试topic
**
TEST_TOPIC1;
}
Group枚举
public enum Group {
DEFAULT, //默认分组
FAIL; //失败分组
}
TrxMsg实体:
@Data
public class TrxMsg<T> {
private Long id;
/**
* 消息的topic
*/
private Topic topic;
/**
* 消息体
*/
private T content;
/**
* 累计执行次数
*/
private int execTimes;
/**
* 执行时间(延迟执行)
*/
private Date gmtExec;
/**
* 创建时间
*/
private Date gmtCreate;
/**
* 更新时间
*/
private Date gmtUpdate;
/**
* 消费分组
*/
private Group group;
/**
* 消息的状态
*/
private MsgStatus msgStatus;
}
消息的Listener
定义监听器接口
/**
* 消息监听器
*/
public interface TrxMsgListener<Msg> {
/**
* 监听哪一类消息
* @return
*/
public Topic getTopic();
/**
* 获取消息并执行
* @param msg
* @return
*/
public ExecResult execute(Msg msg);
}
消息mysql实体
建表语句
CREATE TABLE `trx_msg` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`gmt_update` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
`topic` varchar(30) NOT NULL COMMENT '主题',
`gmt_exec` datetime NOT NULL COMMENT '运行时间',
`exec_time` int NOT NULL COMMENT '运行次数',
`content` text COMMENT '内容',
`java_class` varchar(150) DEFAULT NULL COMMENT '类名',
`group` varchar(20) NOT NULL DEFAULT 'DEFAULT' COMMENT '分组',
`status` int NOT NULL DEFAULT '0' COMMENT '状态',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='事务消息';
实体TrxMsgPO对象的定义(这里使用了mybatis-plus, 参考文档https://baomidou.com/)
@TableName("trx_msg")
@Data
public class TrxMsgPO {
@TableId(value = "id", type = AUTO)
private Long id;
@TableField(value = "gmt_create")
private Date gmtCreate;
@TableField(value = "gmt_update", update = "now()")
private Date gmtUpdate;
@TableField(value = "topic")
private String topic;
@TableField(value = "gmt_exec")
private Date gmtExec;
@TableField(value = "exec_time", update = "exec_time + 1")
private int execTime;
@TableField(value = "content")
private String content;
@TableField(value = "java_class")
private String className;
@TableField(value = "`group`")
private String group;
@TableField(value = "`status`")
private int status;
}
TrxMsg领域模型 跟 TrxMsgPO 存储模型转换
@Slf4j
@Component
public class TrxMsgConvert implements ModelConverter<TrxMsgPO, TrxMsg> {
@Override
public TrxMsg convert(TrxMsgPO source) {
try {
TrxMsg trxMsg = new TrxMsg();
trxMsg.setId(source.getId());
trxMsg.setMsgStatus(MsgStatus.get(source.getStatus()));
trxMsg.setExecTimes(source.getExecTime());
trxMsg.setGmtCreate(source.getGmtCreate());
trxMsg.setGmtExec(source.getGmtExec());
trxMsg.setGmtUpdate(source.getGmtUpdate());
trxMsg.setGroup(Group.DEFAULT);
trxMsg.setTopic(Topic.valueOf(source.getTopic()));
Class<?> cls = Class.forName(source.getClassName());
trxMsg.setContent(JSON.parseObject(source.getContent(), cls));
return trxMsg;
} catch (ClassNotFoundException e) {
log.error("className={} is not fund in system", source.getClassName());
}
return null;
}
@Override
public List<TrxMsg> convert(List<TrxMsgPO> trxMsgDOS) {
if (CollectionUtils.isEmpty(trxMsgDOS)) {
return new ArrayList<>(0);
}
return trxMsgDOS.stream().map(this::convert).collect(Collectors.toList());
}
@Override
public List<TrxMsgPO> reconvertList(List<TrxMsg> sList) {
if (CollectionUtils.isEmpty(sList)) {
return new ArrayList<>(0);
}
return sList.stream().map(this::reconvert).collect(Collectors.toList());
}
@Override
public TrxMsgPO reconvert(TrxMsg target) {
TrxMsgPO trxMsgDO = new TrxMsgPO();
trxMsgDO.setId(target.getId());
trxMsgDO.setContent(JSON.toJSONString(target.getContent()));
trxMsgDO.setClassName(target.getContent().getClass().getName());
trxMsgDO.setExecTime(target.getExecTimes());
if (target.getGmtCreate()!=null) {
trxMsgDO.setGmtCreate(target.getGmtCreate());
} else {
trxMsgDO.setGmtCreate(new Date());
}
if (target.getGmtExec()!=null) {
trxMsgDO.setGmtExec(target.getGmtExec());
} else {
trxMsgDO.setGmtExec(new Date());
}
if (target.getGmtUpdate()!=null) {
trxMsgDO.setGmtUpdate(target.getGmtUpdate());
} else {
trxMsgDO.setGmtUpdate(new Date());
}
if (target.getGroup()!=null) {
trxMsgDO.setGroup(target.getGroup().name());
} else {
trxMsgDO.setGroup(Group.DEFAULT.name());
}
trxMsgDO.setTopic(target.getTopic().name());
trxMsgDO.setStatus(target.getMsgStatus().code);
trxMsgDO.setId(target.getId());
return trxMsgDO;
}
}
消息队列
@Slf4j
@Component
public class TrxMsgService {
private volatile static Map<Topic, List<TrxMsgListener>> msgListenerMap;
@Autowired
List<TrxMsgListener> listenerList;
private int MaxExecTime = 1;
static {
msgListenerMap = new HashMap<>();
}
@Resource
TrxMsgConvert trxMsgConvert;
@Resource
TrxMsgMapper trxMsgMapper;
/**
* 消息发布的入口
**
public void publishMsg(TrxMsg trxMsg) {
if (trxMsg.getGmtExec()==null) {
trxMsg.setGmtExec(new Date());
}
if (trxMsg.getGroup()==null) {
trxMsg.setGroup(Group.DEFAULT);
}
trxMsg.setExecTimes(0);
trxMsg.setMsgStatus(MsgStatus.INIT);
trxMsgMapper.insert(trxMsgConvert.reconvert(trxMsg));
}
/**
* 立即执行,失败转异步
* @param trxMsg 事物消息
*/
public <T> void execMsgImmediately(TrxMsg<T> trxMsg) {
List<TrxMsgListener> filteredList = getListenerList(trxMsg.getTopic());
if (CollectionUtils.isEmpty(filteredList)) {
return;
}
for (TrxMsgListener listener : filteredList) {
try {
ExecResult result = listener.execute(trxMsg.getContent());
if (ExecResult.FAIL.equals(result)) {
publishMsg(trxMsg);
return;
}
} catch (Exception e) {
log.error("system error", e);
publishMsg(trxMsg);
return;
}
}
}
public void execMsg(TrxMsg trxMsg) {
List<TrxMsgListener> filteredList = getListenerList(trxMsg.getTopic());
if (CollectionUtils.isEmpty(filteredList)) {
return;
}
for (TrxMsgListener listener : filteredList) {
try {
ExecResult result = listener.execute(trxMsg.getContent());
boolean force = false;
if (ExecResult.FAIL.equals(result)) {
//执行失败,且执行次数超过最大限制,则默认划分到FAIL分组去,防止消息反复失败从而积压导致消息消费阻塞
if (!Group.FAIL.equals(trxMsg.getGroup()) && trxMsg.getExecTimes()>=(MaxExecTime-1)) {
trxMsg.setMsgStatus(MsgStatus.FAIL);
trxMsg.setGroup(Group.FAIL); //置为失败分组
trxMsg.setExecTimes(0);
force = true;
} else {
trxMsg.setMsgStatus(MsgStatus.FAIL_WAIT_RETRY);
//延迟10*execTime分钟后执行, 指数级增长
trxMsg.setGmtExec(DateUtils.addMinutes(new Date(), (trxMsg.getExecTimes()+1)*10));
}
update(trxMsg, force);
return;
}
} catch (Exception e) {
log.error("system error", e);
return;
}
}
//成功执行,直接删除就行了
trxMsgMapper.deleteById(trxMsg.getId());
}
/**
* 单个执行
*/
public void execMsgById(Long id) {
TrxMsgPO msg = trxMsgMapper.selectById(id);
TrxMsg trxMsg = trxMsgConvert.convert(msg);
//trxMsg.setExecTimes(0); //强制执行的情况下,执行次数强行置为0
execMsg(trxMsg);
}
private void update(TrxMsg trxMsg, boolean forceUpdateExecTime) {
trxMsgMapper.updateById(trxMsgConvert.reconvert(trxMsg));
if (forceUpdateExecTime) {
trxMsgMapper.updateExecTime(trxMsg.getId(), trxMsg.getExecTimes());
}
}
private List<TrxMsgListener> getListenerList(Topic topic) {
if (msgListenerMap.containsKey(topic)) {
return msgListenerMap.get(topic);
} else {
List<TrxMsgListener> msgList = listenerList.stream().filter(item->item.getTopic().equals(topic)).collect(Collectors.toList());
synchronized (TrxMsgService.class) {
msgListenerMap.put(topic, msgList);
}
return msgList;
}
}
}
消息发布
发布消息的代码片段如下
public class Test {
@Resource
TrxMsgService trxMsgService;
public void test() {
TrxMsg<String> msg = new TrxMsg<>();
msg.setTopic(Topic.TEST_TOPIC1);
msg.setContent("test msg");
//这里可以设置消息的执行时间,主要运用于延迟消息
msg.setGmtExec(new Date());
trxMsgService.publishMsg(msg);
}
}
调度任务
调度任务的作用就是定时按照分组捞取可执行状态的消息,并分发到对应的listener上去执行;
@Slf4j
@Component
public class TrxMsgJob {
@Resource
TrxMsgService trxMsgService;
@Resource
TrxMsgMapper trxMsgMapper;
@Resource
TrxMsgConvert trxMsgConvert;
/**
* 调度任务的入口
**
public void execute() {
String params = NdspHelper.getJobParam();
List<Integer> statusList = new ArrayList<>();
statusList.add(MsgStatus.INIT.code);
statusList.add(MsgStatus.FAIL_WAIT_RETRY.code);
LambdaQueryWrapper<TrxMsgPO> q = new LambdaQueryWrapper<>();
if (StringUtils.isNotBlank(params)) {
log.info("GROUP={}", params);
q.eq(TrxMsgPO::getGroup, params);
}
String now = DateFormatUtils.format(new Date(),"yyyy-MM-dd HH:mm:ss");
q.apply("UNIX_TIMESTAMP(gmt_exec) <= UNIX_TIMESTAMP({0})", now);
q.in(TrxMsgPO::getStatus, statusList);
try {
PageHelper.offsetPage(0, 10);
List<TrxMsgPO> trxMsgDOS = trxMsgMapper.selectList(q);
for (TrxMsgPO msg : trxMsgDOS) {
TrxMsg trxMsg = trxMsgConvert.convert(msg);
if (trxMsg!=null) {
trxMsgService.execMsg(trxMsg);
}
}
} catch (Exception e) {
log.error("system error", e);
}
}
}
标签:异步,return,target,框架,trxMsg,private,trxMsgDO,mysql,public
From: https://www.cnblogs.com/viogs/p/17652020.html