重复消费产生原因
- 生产者多次投递-投递时服务端接收后客户端网络原因确认失败,重新投递
- 消费者扩容重试-消费者扩容导致正在消费的消息没有正常应答,服务端重新推送
重复消费解决方案
- 给消息增加唯一key,消费时校验key是否已经消费过
- 消费者控制消息的幂等性(多次同样的操作结果一致)
幂等性保证方案
- 业务上判断重复消费是否会导致幂等性,如查询操作并不会影响操作结果,不需要处理
- 数据库提供重复表,每次操作前往重复表插入数据,插入成功后再消费
- 使用redis分布式锁的特性,消费前先将key存储在redis中,存储成功则进行消费
数据库重复表
去重表
创建数据库表,提供字段存储消息的key,并对该字段添加唯一索引;本文提供示例:
CREATE TABLE `mq_repeat_check` (
`mq_key` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
## 将key设置为主键,确保唯一性
PRIMARY KEY (`mq_key`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
业务代码添加幂等校验
@Component
@RocketMQMessageListener(
topic = "mqRepeatTopic",
consumerGroup = "mqRepeatConsumerGroup1",
messageModel = MessageModel.BROADCASTING
)
public class MqRepeatTopicListener1 implements RocketMQListener<MessageExt> {
private static final Logger logger = LoggerFactory.getLogger(MqRepeatTopicListener1.class);
@Autowired
private MqRepeatCheckService mqRepeatCheckService;
@Override
public void onMessage(MessageExt msgExt) {
// 执行本地事务逻辑,返回事务状态
try {
String bodyStr = new String(msgExt.getBody());
logger.info("【重复消费验证1】消息消费开始:{}", bodyStr);
/*
* 数据库重复表验证方式
*/
Integer num = mqRepeatCheckService.addMqMsg(msgExt.getKeys());
if (num < 1) {
logger.info("【重复消费验证1】消息已经消费");
return;
}
logger.info("【重复消费验证1】消息消费结束");
} catch (Exception e) {
logger.error("消息消费失败!", e);
throw e;
}
}
}
redis分布式锁
利用redis中set命令的NX特性,将消息key存储在redis中,存储成功则进行消费
示例代码:
@Component
@RocketMQMessageListener(
topic = "mqRepeatTopic",
consumerGroup = "mqRepeatConsumerGroup2",
messageModel = MessageModel.BROADCASTING
)
public class MqRepeatTopicListener2 implements RocketMQListener<MessageExt> {
private static final Logger logger = LoggerFactory.getLogger(MqRepeatTopicListener2.class);
@Autowired
private RedisUtils redisUtils;
@Override
public void onMessage(MessageExt msgExt) {
// 执行本地事务逻辑,返回事务状态
try {
String bodyStr = new String(msgExt.getBody());
logger.info("【重复消费验证2】消息消费开始:{}", bodyStr);
/*
* redis分布式锁验证方式
*/
boolean locked = redisUtils.setnx(msgExt.getKeys(),60*60*24);
if (!locked) {
logger.info("【重复消费验证1】消息已经消费");
return;
}
logger.info("【重复消费验证2】消息消费结束");
} catch (Exception e) {
logger.error("消息消费失败!", e);
throw e;
}
}
}
标签:消费,应用,重复,redis,msgExt,key,解决,logger,RocketMQ
From: https://www.cnblogs.com/zly1015/p/17996886