一、生产者重连
由于网络波动可能造成客户端连接MQ失败的情况,通过配置可以开启连接失败后的重连机制:
spring:
rabbitmq:
addresses: xxx.xx.xx.xx
port: 5672
username: xxxxx
password: xxxx
virtual-host: /xxxx
connection-timeout: 1s #设置MQ的连接超时时间
template:
retry:
max-attempts: 3 #最大重试次数
enabled: true #开启重试机制
multiplier: 1 #失败后下次等待时长倍数,下次等待时长=initial-interval * multiplier
initial-interval: 1000ms # 失败后的初始等待时间
注意上述重试配置是写在template配置里面。 配置上述重试之后,把MQ服务停掉
停掉mq之后,向mq发送一个消息
注意:
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpingAMQP提供的重试机制是 阻塞式 的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。
如果对于业务性能有要求,建议 禁用 重试机制。如果一定要使用,请合理分配等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
二、生产者确认
生产者确认机制:生产者发送消息后,需要等待RabbitMQ服务器的确认消息,以确保消息已经被成功地发送到RabbitMQ服务器。如果RabbitMQ服务器没有收到消息或者消息发送失败,生产者会收到一个确认消息,从而可以进行重发或者其他处理。
RabbitMQ有Publisher Confirm和 Publisher Return两种确认机制。开启确认机制之后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有如下几种情况:
- 消息投递到了MQ服务器,但是路由失败(路由键写错或者交换机没有绑定队列),此时通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
-
如果队列是非持久化的, 消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
-
如果队列是持久化的,消息投递到MQ服务器,并且入队完成且保存到磁盘之后,返回ACK,告知投递成功
-
其他情况都会返回NACK,告知投递失败。
生产者发送消息给MQ,等待MQ回执有两种方式:同步方式和异步方式
spring:
rabbitmq:
addresses: xxx.xx.xx.xx
port: 5672
username: xx
password: xx
virtual-host: /xx
connection-timeout: 1s #设置MQ的连接超时时间
template:
retry:
max-attempts: 3 #最大重试次数
enabled: true # 开启重试机制
multiplier: 1 #失败后下次等待时长倍数,下次等待时长=initial-interval * multiplier
initial-interval: 1000ms # 失败后的初始等待时间
publisher-returns: true # 开启publisher return机制
publisher-confirm-type: correlated #
说明:
publish-confirms: 开启publish-confirm功能,确认回调,springboot 2.2.0之后不用了,被publish-confirm-type替换了
publish-confirm-type:开启publisher-confirm,这里支持三种类型:
none: 不开启生产者确认
simple:同步等待confirm结果,直到超时
correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
https://blog.csdn.net/qq_29385297/article/details/127545272
编写return配置类
每一个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置。
@Configuration
@Slf4j
public class RabbitMqReturnConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
*
* @param message 发送的消息
* @param replyCode 应答码
* @param replyText 原因
* @param exchange 交换机
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息发送失败: 应答码={}, 原因={}, 交换机={},路由键={},消息={}",
replyCode,replyText,exchange,routingKey,message.getBody().toString());
}
});
}
}
配置ConfirmCallback
ACK代表Acknowledgement,即确认消息。消息的Confirm需要配置在每次发送消息时
@RequestMapping("/sendMessageConfirm")
public String sendMessageConfirm(String message){
// 直接向队列里面发送消息
CorrelationData correlationData=new CorrelationData();
correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
/**
* 此处的失败,并不代表nack,此处的失败是spring内部处理的时候失败了
* @param throwable
*/
@Override
public void onFailure(Throwable throwable) {
// future发送异常时的处理逻辑,一般不会发生直接打印就行
log.error("on failure:{}",throwable);
}
/**
* 代表MQ的回调成功,但并不代表消息发送的结果
* @param confirm
*/
@Override
public void onSuccess(CorrelationData.Confirm confirm) {
if(confirm.isAck()){
log.info("消息发送成功,收到ACK");
}else {
log.info("消息发送失败,收到NACK,原因:{}",confirm.getReason());
}
}
});
Map<String,Object> map=new HashMap<>();
map.put("name","张三");
map.put("address","浙江杭州");
rabbitTemplate.convertAndSend(BootMqConstant.TOPICS_EXCHANGE_NAME,"object.hahha",map,correlationData);
return "ok";
}
correlationData.getFuture()
中的Future其实就是多线程中的Future。
上述代码发送了两条消息到topics_object_queue
队列中,发送成功之后,生产者被回调了两次
ack
returnCallback
故意写路由键,实验ReturnCallback
故意写路由键,实验ReturnCallback
nack
运行结果
生产者确认机制有额外的网络和系统资源的开销,一般不建议使用。如果一定要使用,不用开启Publisher-Return机制,因为一般是由于路由失败做成,需要开发人员去更改路由键。
在生产中,实际上只需要处理nack的情况,如果遇到nack的情况,可以做有限次的重试,如果重试之后还失败则需要记录失败的异常消息,可以记录到数据库中。
标签:发送,可靠性,confirm,生产者,RabbitMQ,重试,xx,MQ,消息 From: https://www.cnblogs.com/cplinux/p/17977666