1. 保证MQ消息的可靠性
保证MQ消息的可靠性分两个方面:保证生产消息的可靠性、保证消费消息的可靠性。
1.1 保证生产消息的可靠性
1.1.1 重试机制
首先发送消息的方法如果执行失败会进行重试,这里我们在发送消息的工具类中使用spring提供的@Retryable注解,实现发送失败重试机制,通过注解的backoff属性指定重试等待策略,通过Recover注解指定失败回调方法,失败重试后仍然失败的会走失败回调方法,在回调方法中将失败消息写入一个失效消息表由定时任务进行补偿(重新发送),如果系统无法补偿成功则由人工进行处理,单独开发人工处理失败消息的功能模块。
通过@Retryable
注解可以实现消息发送失败时的重试机制,避免因为网络抖动或临时故障导致的发送失败:
- backoff属性用于定义重试间隔。
- maxAttempts属性用于定义最大重试次数。
- Recover注解用于指定重试失败后的回调逻辑,将未成功发送的消息记录到失败表中。
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000))
public void sendMessage(String message) {
// 发送消息逻辑
}
@Recover
public void recover(Exception e, String message) {
// 发送失败的回调逻辑,如记录失败消息
}
1.1.2 生产者确认机制(Producer Confirm)
RabbitMQ提供了生产者确认机制,通过Publisher Confirm
来确保消息从生产者成功发送到Broker,并最终到达队列。这一机制通过ACK(Acknowledgment)和NACK(Negative Acknowledgment)来确认消息的处理结果。
- ACK: 表示消息已成功到达Broker。
- NACK: 表示消息没有成功到达,可以根据NACK结果进行处理。
具体的做法:
- 消息唯一标识:每个消息发送时指定一个唯一ID,用于标识该消息的状态。
- 回调机制:通过设置回调方法(ConfirmCallback),可以监控ACK/NACK的回执状态。
- 如果返回ACK,则表示消息已成功投递。
- 如果返回NACK,生产者可根据NACK结果进行重发或记录到失败消息表,交给定时任务处理。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
// 消息成功投递
System.out.println("消息成功发送到Broker,ID:" + correlationData.getId());
} else {
// 消息投递失败
System.out.println("消息发送失败,原因:" + cause);
// 记录到失败表
}
});
1.1.3 ReturnCallback回调机制
在某些情况下,消息可能会成功到达Broker,但未能路由到队列。这时会触发ReturnCallback
回调方法,通过它可以接收到未成功投递的消息,并进行相应的处理或补偿。
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("消息无法路由,交换机:" + exchange + ",路由键:" + routingKey);
// 补偿逻辑,如记录消息
});
1.1.4 定时任务
上述三种方法,将失败的消息写入到一个失败消息记录表,然后由定时任务进行补偿(重新发送),如果系统无法补偿成功则由人工进行处理,单独开发人工处理失败消息的功能模块。
1.2 保证消费消息可靠性
为了防止消息在Broker中丢失,可以将消息设置为持久化,具体需要设置交换机和队列支持持久化,发送消息设置deliveryMode=2。这样即使RabbitMQ重启,也能从磁盘中恢复未处理的消息。
1.2.1 重试机制
消费者在处理消息时,如果出现异常,可以使用重试机制来进行故障恢复。重试机制通常会和死信队列结合使用:
- 当重试次数超过一定阈值,消息将被投递到失败消息队列,由定时任务或者人工处理。
- 通过设置队列的
x-dead-letter-exchange
和x-dead-letter-routing-key
,可以指定消息在重试失败后进入失败消息队列。
1.2.2 消费确认机制(Consumer Acknowledgment)
RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理完成消息,RabbitMQ收到ACK后删除消息。
RabbitMQ提供了三种消费确认模式:
- 自动ACK(默认): RabbitMQ自动确认消息,不管消费者是否成功处理了消息,这种模式下容易丢失消息,不建议使用。
- 手动ACK: 需要消费者手动确认消息是否被处理成功,如果成功处理则发送ACK回执,RabbitMQ会删除该消息;如果处理失败,可以进行重试或将消息放入死信队列。
- 关闭ACK: 消费者不发送任何ACK,不推荐使用。
一般,我们都是使用默认的auto即可。
spring:
rabbitmq:
....
listener:
simple:
acknowledge-mode: auto #,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 10 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
1.2.3 定时任务
重试机制达到上限后,将消息投递到失败消息队列,失败消息队列由定时任务程序定时处理,如果系统无法处理成功则由人工进行处理。
1.3 无法百分百保证MQ的消息可靠性
尽管通过生产者确认机制、消费者ACK机制、持久化、消息重试和补偿等措施可以极大提高MQ消息的可靠性,但由于不可控因素(如网络断开、硬件故障、进程异常终止等),无法百分之百保证消息的可靠性。因此,企业在设计使用MQ时,需要充分考虑异常情况的存在,并通过补偿机制、消息幂等性设计、消息重试等手段进一步降低消息丢失的风险。
2. 保证消息幂等性
2.1 幂等性是什么
幂等性(Idempotency)是指一个操作可以重复执行多次,但无论执行多少次,结果都是相同的,不会有副作用。对于分布式系统中使用消息队列的场景,幂等性通常用于保证消息不会因重复消费而导致数据错误。
举个例子:
假设你在电商系统中处理“用户下单”的消息,如果这个操作不是幂等的,用户下单的消息被重复消费时,可能会导致系统生成多笔订单、重复扣款。而如果“用户下单”这个操作是幂等的,不管消息被消费几次,最终都只会有一笔订单生成,避免重复处理。
2.2 为什么会有重复消费
MQ(消息队列)中,可能会发生消费者重复消费的情况。导致重复消费的原因有很多:
- 消费者处理失败:消费者处理消息时出现异常或超时,导致MQ认为消息未被成功处理,于是重新投递。
- 网络抖动:消费者收到消息后未能及时发送ACK确认,导致消息被再次投递。
- 业务逻辑错误:系统没有考虑幂等性设计,在同一条消息重复消费时导致数据出错。
2.3 如何保证MQ消息的幂等性
2.3.1 使用数据库的唯一约束控制幂等性
数据库的唯一约束可以帮助确保一条消息只被处理一次。例如,在插入一条订单数据时,可以为订单ID设置唯一约束。这样即使同一条消息被重复消费,数据库的唯一约束也会保证只有第一次插入成功,后续的重复插入会失败。
- 适用场景: 比如订单系统,确保同一个订单号不会被重复处理。
CREATE TABLE orders (
order_id VARCHAR(255) PRIMARY KEY, -- 唯一约束,订单ID
user_id VARCHAR(255),
product_id VARCHAR(255),
quantity INT,
status VARCHAR(50)
);
try {
// 尝试插入订单
insertOrder(order);
} catch (DuplicateKeyException e) {
// 捕获唯一键冲突异常,表明该订单已经存在,无需重复处理
System.out.println("重复订单,不再处理");
}
2.3.2 使用Token机制
Token机制是另一种确保幂等性的常见手段。可以在发送消息时为每条消息指定一个唯一的标识符(Token或消息ID),然后在消费时通过Redis等缓存系统去判断该消息是否已经被消费过。
-
消息发送时生成唯一Token: 每条消息在生产时会生成一个唯一ID,比如UUID。
-
消息ID记录到Redis: 生产者在发送消息时,将该消息的ID存储到Redis,作为消息已经处理的标记。
-
消费者消费前先查询Redis: 当消费者接收到消息时,首先在Redis中查找该消息ID,如果发现ID已经存在,说明该消息已经被处理过,则跳过该次处理。
-
消费完成后,记录消息ID到Redis: 消费者成功处理完消息后,将该消息ID记录到Redis,表示该消息已成功消费。
String messageId = message.getId(); // 消息唯一ID
// 检查消息是否已处理
if (redisTemplate.hasKey(messageId)) {
// 如果存在,表明消息已处理过,跳过
return;
}
// 处理消息
processMessage(message);
// 消费完成后,将消息ID存储到Redis,设置过期时间
redisTemplate.opsForValue().set(messageId, "consumed", 10, TimeUnit.MINUTES);
优点: 这种方式适合需要临时性幂等检查的场景,过期时间可以根据业务场景进行设置。
标签:方案,处理,技术,发送,重试,失败,消息,MQ,ID From: https://blog.csdn.net/qq_46637011/article/details/142170167