目录
2.重复调用接口时,会提示错误所以改进,自己搞一个template,
发送方确认
当消息的生产者发送消息以后,怎么知道是否到达服务器呢?
发送方确认:生产者到Broker的解决方案,
confirm:确认模式
在发送消息的时候,不管消息是否到达exchange,这个监听都会被执行,如果exchange的ack为true,如果为false,ACK为false;
return:退回模式
消息到达exchange之后,会根据路由规则匹配,把消息放入Queue中,Exchange到Queue的过程,如果一条消息无法被任何队列消费,可以选择把消息退回给发送者,消息退回给发送者时候,可以设置一个返回回调方法,对消息进行处理
不是互斥,可以单独使用,也可以结合使用
confirm:确认模式
package com.example.constant; public class Constants { public static final String CONFIRM_QUEUE="confirm.queue"; public static final String CONFIRM_EXCHANGE="confirm.exchange"; }
//发送方确认 @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build(); } @Bean("confirmExchange") public DirectExchange confirmExchange(){ return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build(); } @Bean("confirmBinding") public Binding confirmBinding(@Qualifier("confirmQueue")Queue queue,@Qualifier("confirmExchange")Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs(); }
@RequestMapping("/confirm") public String confirm(){ //设置回调方法 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("执行了confirm方法"); if(ack){ System.out.printf("接收到消息,消息的ID:%s \n",correlationData==null? null:correlationData.getId()); }else{ System.out.printf("未接收到消息,消息ID:%s,cause:%s \n",correlationData==null?null:correlationData.getId(),cause); } } }); CorrelationData correlationData=new CorrelationData("1"); rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE+"1","confirm,","confirm test...",correlationData); return "消息发送成功"; }
存在两个问题:
这种方式设置confirmcallback影响所有使用RabbitTemplate的方法
package com.example.config; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitTemplateConfig { @Bean public RabbitTemplate confirmrabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); //设置回调方法 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("执行了confirm方法"); if(ack){ System.out.printf("接收到消息,消息的ID:%s \n",correlationData==null? null:correlationData.getId()); }else{ System.out.printf("未接收到消息,消息ID:%s,cause:%s \n",correlationData==null?null:correlationData.getId(),cause); } } }); return rabbitTemplate; } }
但是又会一个问题,
当我们使用以上改动时候,发现执行上面的方法时候,根本没执行confirm方法
但是确报执行力confirm方法。(我们用的初始想法是用的是两个分别的rabbitMQtemplate,)你引入的template都是你自己声明的了
2.重复调用接口时,会提示错误所以改进,自己搞一个template,
假如发送到了queue,但是routingKey不正确,但是走的逻辑还是消费到了,
return退回模式:
消息到达Exchange之后,会根据路由规则匹配,把消息放入Queue中,Exchange到Queue中,Exchange到Queue的过程,如果一条消息无法被任何队列消息(即使没有队列与消息的路由匹配键匹配或者队列不存在等),可以选择把消息退回给发送者,消息退回给发送者,我们可以设置一个回调方法,对消息进行处理
回调函数有一个参数 ReturnedMessage
Message message:返回的消息对象,包含了消息体和消息属性
int replyCode:由Broker提供回复码,表示消息无法路由的原因,通常是一个数字代码,每个数字代表不同的含义。
String replyTest:一个文本串,提供了无法路由消息的额外信息或者错误描述
String exchange:消息被发送到交换机名称
String routingKey:消息的路由键,即发送消息时指定的键
package com.example.config; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitTemplateConfig { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(); return rabbitTemplate; } @Bean public RabbitTemplate confirmrabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); //设置回调方法 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("执行了confirm方法"); if(ack){ System.out.printf("接收到消息,消息的ID:%s \n",correlationData==null? null:correlationData.getId()); }else{ System.out.printf("未接收到消息,消息ID:%s,cause:%s \n",correlationData==null?null:correlationData.getId(),cause); } } }); //消息被退回的时候,回调方法,不设置就不会执行这个回调方法 rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){ @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("消息退回:"+returnedMessage); } }); return rabbitTemplate; } }
RabbitMQ的可靠性/保证消息不丢失..
1.对应的生产者发送消息到RabbitMQ失败,解决方法,confirm确认模式
消息在交换机中无法路由到制定队列:return模式
消息队列自身数据丢失
可能原因:消息到达RabbitMQ之后,RabbitMQ宕机导致数据消失,
解决方法:持久性,开启持久化,消息写入之后,会持久化到磁盘,如果RabbitMQ挂了,恢复之后就会自动读取之前的数据(极端情况,没持久化的时候宕机,集群,提高可靠性)
4.消费者异常,导致消息丢失:
可能原因:消息到达消费者,还没来得及消费,消费者宕机,消费者逻辑有问题
解决方法:[消息确认]RabbitMQ提供了消费者应答机制,来使RabbitMQ能够感知到消费者是否消费成功之后,才会删除消息,从而避免消息丢失,也可以重试机制
重试机制
RabbitMQ的SDK提供方式:
自动确认:消息到达消费者,消息就去删除了
手动确认:消息处理成功后,需要进行ACK.
这个里面的AUTO时消费者处理消息时,抛出异常,会自动进行重试.会一直进行重试
这样就会重试五次啦。但是他的deliverTag不会变化.
手动确认 和那个auto就差不多了,但是也是因为我们选择的是重新入队deliver Tag是不断变化的,因为是不断重新入队,手动的话,我们配置重试次数啥的是无效的,我们的重试机制是在自动确认下才有效
标签:RabbitTemplate,rabbitTemplate,confirm,确认,模式,RabbitMQ,correlationData,public,消息 From: https://blog.csdn.net/weixin_72953218/article/details/141966923