4、消息幂等性
在系统中当使用消息队列时,无论做哪种技术选型,有很多问题是无论如何也不能忽视的,如:消息必达、消息幂等等。本章节以典型的RabbitMQ为例,讲解如何保证消息幂等的可实施解决方案,其他MQ选型均可参考。
4.1 消息重试演示
消息队列的消息幂等性,主要是由MQ重试机制引起的。因为消息生产者将消息发送到MQ-Server后,MQ-Server会将消息推送到具体的消息消费者。假设由于网络抖动或出现异常时,MQ-Server根据重试机制就会将消息重新向消息消费者推送,造成消息消费者多次收到相同消息,造成数据不一致。
在RabbitMQ中,消息重试机制是默认开启的,但只会在consumer出现异常时,才会重复推送。在使用中,异常的出现有可能是由于消费方又去调用第三方接口,由于网络抖动而造成异常,但是这个异常有可能是暂时的。所以当消费者出现异常,可以让其重试几次,如果重试几次后,仍然有异常,则需要进行数据补偿。
数据补偿方案:当重试多次后仍然出现异常,则让此条消息进入死信队列,最终进入到数据库中,接着设置定时job查询这些数据,进行手动补偿。
本节中以consumer消费异常为演示主体,因此需要修改RabbitMQ配置文件。
- 修改consumer一方的配置文件
# 消费者监听相关配置
listener:
simple:
retry:
# 开启消费者(程序出现异常)重试机制,默认开启并一直重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔时间(毫秒)
initial-interval: 3000
- 设置消费异常
当consumer消息监听类中添加异常,最终接受消息时,可以发现,消息在接收五次后,最终出现异常。
4.2 消息幂等解决
要保证消息幂等性的话,其实最终要解决的就是保证多次操作,造成的影响是相同的。那么其解决方案的思路与服务间幂等的思路其实基本都是一致的。
- 消息防重表,解决思路与服务间幂等的防重表一致。
- redis。利用redis防重。
这两种方案是最常见的解决方案。其实现思路其实都是一致的。
4.2.1 修改OrderController
/**
* 此处为了方便演示,不做基础添加数据库操作
*/
@PostMapping("/addOrder")
public String addOrder(){
String uniqueKey = String.valueOf(idWorker.nextId());
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(uniqueKey);
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding("utf8");
Message message = new Message("1271700536000909313".getBytes(),messageProperties);
rabbitTemplate.convertAndSend(RabbitMQConfig.REDUCE_STOCK_QUEUE,message);
return "success";
}
4.2.2 修改stockApplication
@Bean
public JedisPool jedisPool(){
JedisPoolConfig poolConfig = new JedisPoolConfig();
return new JedisPool(poolConfig, "192.168.150.4",6379, 2000, "");
}
4.2.3 新增消息监听类
/**
* @author xinlei
* @date 2024/12/16
*/
@Component
public class ReduceStockListener {
@Autowired
private StockService stockService;
@Autowired
private JedisPool jedisPool;
@Autowired
private StockFlowService stockFlowService;
@RabbitListener(queues = RabbitMQConfig.REDUCE_STOCK_QUEUE)
@Transactional
public void receiveMessage(Message message) {
// 获取消息id
String messageId = message.getMessageProperties().getMessageId();
Jedis jedis = jedisPool.getResource();
System.out.println(messageId);
try {
// redis锁去重校验
if (!"OK".equals(jedis.set(messageId, messageId, "NX", "PX", 300000))) {
System.out.println("重复请求");
return;
}
// mysql状态校验
if (!(stockFlowService.findByFlag(messageId).size() == 0)) {
System.out.println("数据已处 理");
return;
}
String goodsId = null;
try {
// 获取消息体中goodsId
goodsId = new String(message.getBody(), "utf-8");
stockService.reduceStock(goodsId, messageId)
;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
int nextInt = new Random().nextInt(100);
System.out.println("随机数:" + nextInt);
if (nextInt % 2 == 0) {
int i = 1 / 0;
}
} catch (RuntimeException e) {
// 解锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
jedis.eval(script, Collections.singletonList(messageId), Collections.singletonList(messageId));
System.out.println("出现异常了");
System.out.println(messageId + ":释放锁");
throw e;
}
}
}
4.3 消息缓冲区
对于RabbitMQ的使用,默认情况下,每条消息都会进行分别的ack通知,消费完一条后,再来消费下一条。但是这样就会造成大量消息的阻塞情况。所以为了提升消费者对于消息的消费速度,可以增加consumer数据或者对消息进行批量消费。MQ接收到producer发送的消息后,不会直接推送给consumer。而是积攒到一定数量后,再进行消息的发送。这种方式的实现,可以理解为是一种缓冲区的实现,提升了消息的消费速度,但是会在一定程度上舍弃结果返回的实时性。
对于批量消费来说,也是需要考虑幂等的。对于幂等性的解决方案,沿用刚才的思路即可解决。
标签:String,业务,技术,重试,消息,messageId,println,out From: https://blog.csdn.net/qq_54698124/article/details/144506860