RabbitMQ如何保证消息可靠性?
1.开启生产者确认机制,确保生产者的消息能到达队列。
2.开启持久化功能,确保消息未消费前在队列中不会丢失。
3.开启消费者确认机制为auto,由Spring确认消息处理成功后完成ack。
4.开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到指定的交换机,交由人工处理。
实现生产者确认机制
原理:RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种情况:
1.publisher-confirm,发送者确认。发送者确认又分为两种情况。一种是消息成功到达交换机,返回ack;另一种是消息没有到达交换机,返回nack。
2.publisher-return,发送者回执。消息成功到达交换机,但是在投递到队列时失败。此时返回ack及失败原因。
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一的id,以区分不同的消息,避免ack冲突。
实现步骤
1.修改配置。在publisher服务中的application.yml文件中添加下列内容
点击查看代码
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
点击查看代码
1.publisher-confirm-type:开启publisher-confirm,这里支持两种类型:
* simple:同步等待confirm结果,直到超时。
* correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback。
2.publisher-returns:开启publisher-return功能,同样是基于callback机制,不过是定义了ReturnCallback。
3.template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息。
点击查看代码
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
//获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//设置ReturnCallback
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchane,routingKey)->{
//投递失败,记录日志
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode,replyText,exchane,routingKey,message.toString());
//如果业务需要,可以重新发送消息
/*if (routingKey==null || routingKey.startsWith("simple")){
rabbitTemplate.convertAndSend(交换机,"队列",消息);
}*/
});
}
}
点击查看代码
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void ConfirmQueue(){
//1.消息体
String msg="Hello,RabbitMQ!";
//2.设置一个全局唯一ID,封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//3.添加callback
correlationData.getFuture().addCallback(
result ->{
//3.1:ack,消息发送成功
if (result.isAck()){
log.debug("消息发送成功,ID:{}"+correlationData.getId());
}else { //3.2:nack,消息发送失败
log.error("消息发送失败未送达交换机,ID:{},原因:{}",correlationData.getId(),result.getReason());
}
},
ex -> log.error("消息发送异常,ID:{},原因:{}",correlationData.getId(),ex.getMessage())
);
//4.发送消息
rabbitTemplate.convertAndSend("wzh.direct","miku",msg,correlationData);
}
}
这里打印出了消息的id和发送失败的原因。
查看投递到队列失败的效果。将“rabbitTemplate.convertAndSend("wzh.direct","miku",msg,correlationData);”中的routingKey修改成错误的routingKey。
这里打印出了相关信息。
实现消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
实现消息持久化从三个方面入手
1.交换机持久化
2.队列持久化
3.消息持久化
默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。
实现消费者确认机制
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
设想一个场景:RabbitMQ将消息投递给消费者,消费者返回ack给RabbitMQ,RabbitMQ删除消息,但是此时消费者宕机,消费还没有处理。如此一来,消息便丢失了。因此需要把握消费者返回ack的时机。
SpringAMQP允许三种确认模式
点击查看代码
1.manual:手动ack,需要在业务代码结束后,调用api发送ack。(manual:自己根据业务情况,判断什么时候该返回ack)
2.auto:自动ack,由Spring检测listener代码是否出现异常,没有异常则返回ack,抛出异常则返回nack。(auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack)
3.none:关闭ack,MQ假定消费者获取消息后会处理成功,在投递完成后立即删除消息。(该模式下消息投递不可靠,可能丢失。)
实现消费者失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。这就需要使用重试机制进行介入。
实现步骤
1.修改consumer服务的application.yml文件,添加内容
点击查看代码
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
2.选择失败策略,一般是将发送失败的消息发送到指定的交换机。
三种失败策略
点击查看代码
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
点击查看代码
@Bean
public DirectExchange errorExchange(){
return new DirectExchange("wzh.error");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
@Bean
public Binding errorBinding(){
return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
}
点击查看代码
@Bean
public MessageRecoverer republicMeassageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"wzh.error","error");
}
在消费者一端弄一个错误,之后向该消费者发送一条消息。
等待几秒后会在控制台上打印出一条信息。
到RabbitMQ客户端查看,此时error.queue已经有了一条消息。
点击去查看消息,会发现异常原因和消息体都会出现。