1. 消息发送者的可靠性
保证消息的可靠性可以通过发送者重连和发送者确认来实现
发送者重连
发送者重连机制就是在发送信息的时候如果连接不上mq不会立即结束,而是会在一定的时间间隔之类进行重新连接,连接的次数和时间都是由我们在配置文件中指定的,具体的就是通过retry属性来
spring: rabbitmq: # rabbitmq配置 host: localhost # rabbitmq地址 port: 5672 # rabbitmq端口 virtual-host: /hmall # 虚拟主机 username: hmall # 用户名 password: 123 # 密码 template: # 消息发送相关配置 retry: # 重试相关配置 enabled: true # 启用重试 max-attempts: 3 # 最大重试次数 initial-interval: 1000 # 初始重试间隔 multiplier: 2 # 重试间隔倍数 max-interval: 10000 # 最大重试间隔
测试
将MQ关闭,然后随便写一个消息发送案例,就能够看见效果
package com.itheima.publisher; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.util.HashMap; import java.util.Map; @SpringBootTest public class PublisherApplicationTest { @Autowired RabbitTemplate rabbitTemplate; @Test public void test() { String exchangeName = "fanout.hamll"; Map map=new HashMap(); map.put("name","hamll"); map.put("age",18); map.put("sex","男"); rabbitTemplate.convertAndSend("fanout.hamll.query2", map); } }
发送者确认
在一般的情况下,消息很少会出现问题,但是还是有出现问题的可能性,比如:
1. 消息发送后无法路由键找不到相关队列
2. 绑定的交换机不存在
3. 消息发送出现异常
针对这一情况,MQ为我们提供了多种消息确认机制,比如:Publisher Return、Publisher Confirm
spring: rabbitmq: publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型 publisher-returns: true # 开启publisher return机制
Publisher Return
着重于绑定的队列、交换机、路由是否成功,并且能够监听到相关的信息,比如交换机、路由、提示等
在使用的过程总需要一个全局的配置类
package com.itheima.publisher.config; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; /** * RabbitMQ 配置类,用于配置 RabbitTemplate 的回调函数。 */ @Slf4j // 使用 Lombok 注解引入日志记录器 @AllArgsConstructor // 使用 Lombok 注解生成全参构造函数 @Configuration // 标记为 Spring 配置类 public class MqConfig { private final RabbitTemplate rabbitTemplate; // 注入 RabbitTemplate 实例 /** * 初始化方法,在 Bean 创建后立即执行。 * 设置 RabbitTemplate 的返回消息回调函数。 */ @PostConstruct // 标记为初始化方法 public void init(){ rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { /** * 当消息被 broker 返回时触发的回调函数。 * @param returned 返回的消息对象 */ @Override public void returnedMessage(ReturnedMessage returned) { log.error("触发return callback,"); // 记录错误日志,表示触发了返回回调 log.debug("exchange: {}", returned.getExchange()); // 记录交换机名称 log.debug("routingKey: {}", returned.getRoutingKey()); // 记录路由键 log.debug("message: {}", returned.getMessage()); // 记录消息内容 log.debug("replyCode: {}", returned.getReplyCode()); // 记录回复代码 log.debug("replyText: {}", returned.getReplyText()); // 记录回复文本 } }); } }
Publisher Confirm
适用于更加复杂复杂的业务,MQ通过方法回调来告诉发送者消息是否发送成功,提供了两个方法的回调:
1. onFailure 在发送消息出现异常的时候会被捕获、并且接收了一个异常对象来返回异常信息。
2. onSuccess 在发送的时候如果成功被MQ接收到就会触发、onSuccess通常会接收两个参数作为参数(
CorrelationData.Confirm
)、Confirm有一个IsAck()方法来表示是否被确认:
- true:表示消息被成功确认(ack),即消息已经被 RabbitMQ 正确接收并处理。
- false:表示消息未被确认(nack),可能是因为 RabbitMQ 内部错误或其他原因导致消息无法被正确处理
package com.itheima.publisher; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.util.concurrent.ListenableFutureCallback; /** * 测试类,用于验证消息发布功能。 */ @SpringBootTest @Slf4j // 使用 Lombok 注解引入日志记录器 public class PublisherApplicationTest { @Autowired private RabbitTemplate rabbitTemplate; // 注入 RabbitTemplate 实例 /** * 测试方法,验证消息发布功能。 * @throws InterruptedException 可能抛出的中断异常 */ @Test public void test() throws InterruptedException { // 创建 CorrelationData 对象,用于唯一标识消息 CorrelationData correlationData = new CorrelationData(); // 设置回调函数,处理消息发送的结果 correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { /** * 消息发送失败时的回调函数。 * @param ex 异常信息 */ @Override public void onFailure(Throwable ex) { // 记录消息发送失败的异常信息 log.info("消息发送失败、异常!{}", ex.getMessage()); } /** * 消息发送成功时的回调函数。 * @param result 确认结果 */ @Override public void onSuccess(CorrelationData.Confirm result) { // 检查消息是否被确认 if (result.isAck()) { // 记录消息发送成功的日志 log.info("消息发送成功"); } else { // 记录消息发送失败的原因 log.info("消息发送失败!{}", result.getReason()); } } }); // 发送消息到指定的交换机和路由键 rabbitTemplate.convertAndSend("pay.direct", "pay.success", "hello rabbitmq", correlationData); } }
标签:可靠性,持久,AMQP,Spring,springframework,发送,消息,org,import From: https://blog.csdn.net/a1241436267/article/details/144989603数据持久化
默认情况下MQ的数据都是临时数据,MQ故障重启后消息都会丢失,为了保证消息的可靠性就需要做持久化操作,MQ的持久化包括:
1. 交换机持久化
2. 队列持久化
3. 消息持久化
交换机持久化
可以在控制台创建的时候设置为
Durable
就是持久化模式,Transient
就是临时模式。
如果是代码注解开发可以在参数列表通过durable为true指定持久化或者不持久化,交换机通过注解创建一般都是默认的持久化@RabbitListener(bindings =@QueueBinding( value = @Queue(name = "fanout.hamll.query1",durable = "true"), exchange = @Exchange(name = "fanout.hamll", type = ExchangeTypes.FANOUT,durable = "true") ))
队列持久化
可以在控制台创建的时候设置为
Durable
就是持久化模式,Transient
就是临时模式。
如果是代码注解开发可以在参数列表通过durable为true指定持久化或者不持久化,队列一般都是默认不持久化,需要手动设置
@RabbitListener(bindings =@QueueBinding( value = @Queue(name = "fanout.hamll.query1",durable = "true"), exchange = @Exchange(name = "fanout.hamll", type = ExchangeTypes.FANOUT,durable = "true") ))
消息持久化
消费者发送的消息默认情况下都是临时的消息,在MQ重启的时候消息会丢失。而开启持久化之后消息会被永久保存在MQ,即使MQ服务器挂了也不会丢失。
在发送消息的时候会由java的api将我们传入的object转换成Message对象,默认是不会帮我们持久化的,MQ重启消息就没了
想要持久化也很简单,就是我们自己来创建Message对象然后开启持久化
//消息持久化 Message message = MessageBuilder.withBody("hello rabbitmq".getBytes(StandardCharsets.UTF_8)) // 消息内容 .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 消息持久化 .build();// 构建消息 rabbitTemplate.convertAndSend("pay.direct", "pay.su1ccess", message, correlationData);