首页 > 其他分享 >Spring AMQP-保证消息的可靠性

Spring AMQP-保证消息的可靠性

时间:2025-01-08 16:02:07浏览次数:3  
标签:可靠性 持久 AMQP Spring springframework 发送 消息 org import

1. 消息发送者的可靠性

保证消息的可靠性可以通过发送者重连发送者确认来实现


发送者重连

发送者重连机制就是在发送信息的时候如果连接不上mq不会立即结束,而是会在一定的时间间隔之类进行重新连接,连接的次数和时间都是由我们在配置文件中指定的,具体的就是通过retry属性来

spring: 
  rabbitmq: # rabbitmq配置 
    host: localhost # rabbitmq地址
    port: 5672 # rabbitmq端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码
    template: # 消息发送相关配置
      retry: # 重试相关配置
        enabled: true # 启用重试
        max-attempts: 3 # 最大重试次数
        initial-interval: 1000 # 初始重试间隔
        multiplier: 2 # 重试间隔倍数
        max-interval: 10000 # 最大重试间隔

测试

将MQ关闭,然后随便写一个消息发送案例,就能够看见效果

package com.itheima.publisher;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;
import java.util.Map;

@SpringBootTest
public class PublisherApplicationTest {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void test() {
        String exchangeName = "fanout.hamll";
        Map map=new HashMap();
        map.put("name","hamll");
        map.put("age",18);
        map.put("sex","男");
        rabbitTemplate.convertAndSend("fanout.hamll.query2", map);
    }

}

发送者确认

在一般的情况下,消息很少会出现问题,但是还是有出现问题的可能性,比如:

1. 消息发送后无法路由键找不到相关队列

2. 绑定的交换机不存在

3. 消息发送出现异常

针对这一情况,MQ为我们提供了多种消息确认机制,比如:Publisher Return、Publisher Confirm

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

Publisher Return

着重于绑定的队列、交换机、路由是否成功,并且能够监听到相关的信息,比如交换机、路由、提示等

在使用的过程总需要一个全局的配置类

package com.itheima.publisher.config;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * RabbitMQ 配置类,用于配置 RabbitTemplate 的回调函数。
 */
@Slf4j // 使用 Lombok 注解引入日志记录器
@AllArgsConstructor // 使用 Lombok 注解生成全参构造函数
@Configuration // 标记为 Spring 配置类
public class MqConfig {
    private final RabbitTemplate rabbitTemplate; // 注入 RabbitTemplate 实例

    /**
     * 初始化方法,在 Bean 创建后立即执行。
     * 设置 RabbitTemplate 的返回消息回调函数。
     */
    @PostConstruct // 标记为初始化方法
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            /**
             * 当消息被 broker 返回时触发的回调函数。
             * @param returned 返回的消息对象
             */
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,"); // 记录错误日志,表示触发了返回回调
                log.debug("exchange: {}", returned.getExchange()); // 记录交换机名称
                log.debug("routingKey: {}", returned.getRoutingKey()); // 记录路由键
                log.debug("message: {}", returned.getMessage()); // 记录消息内容
                log.debug("replyCode: {}", returned.getReplyCode()); // 记录回复代码
                log.debug("replyText: {}", returned.getReplyText()); // 记录回复文本
            }
        });
    }
}


Publisher Confirm

适用于更加复杂复杂的业务,MQ通过方法回调来告诉发送者消息是否发送成功,提供了两个方法的回调:

1. onFailure 在发送消息出现异常的时候会被捕获、并且接收了一个异常对象来返回异常信息。

2. onSuccess 在发送的时候如果成功被MQ接收到就会触发、onSuccess通常会接收两个参数作为参数(CorrelationData.Confirm )、Confirm有一个IsAck()方法来表示是否被确认:

  • true:表示消息被成功确认(ack),即消息已经被 RabbitMQ 正确接收并处理。
  • false:表示消息未被确认(nack),可能是因为 RabbitMQ 内部错误或其他原因导致消息无法被正确处理
package com.itheima.publisher;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * 测试类,用于验证消息发布功能。
 */
@SpringBootTest
@Slf4j // 使用 Lombok 注解引入日志记录器
public class PublisherApplicationTest {

    @Autowired
    private RabbitTemplate rabbitTemplate; // 注入 RabbitTemplate 实例

    /**
     * 测试方法,验证消息发布功能。
     * @throws InterruptedException 可能抛出的中断异常
     */
    @Test
    public void test() throws InterruptedException {
        // 创建 CorrelationData 对象,用于唯一标识消息
        CorrelationData correlationData = new CorrelationData();

        // 设置回调函数,处理消息发送的结果
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            /**
             * 消息发送失败时的回调函数。
             * @param ex 异常信息
             */
            @Override
            public void onFailure(Throwable ex) {
                // 记录消息发送失败的异常信息
                log.info("消息发送失败、异常!{}", ex.getMessage());
            }

            /**
             * 消息发送成功时的回调函数。
             * @param result 确认结果
             */
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                // 检查消息是否被确认
                if (result.isAck()) {
                    // 记录消息发送成功的日志
                    log.info("消息发送成功");
                } else {
                    // 记录消息发送失败的原因
                    log.info("消息发送失败!{}", result.getReason());
                }
            }
        });

        // 发送消息到指定的交换机和路由键
        rabbitTemplate.convertAndSend("pay.direct", "pay.success", "hello rabbitmq", correlationData);
    }
}

数据持久化

默认情况下MQ的数据都是临时数据,MQ故障重启后消息都会丢失,为了保证消息的可靠性就需要做持久化操作,MQ的持久化包括:

1. 交换机持久化

2. 队列持久化

3. 消息持久化


交换机持久化

可以在控制台创建的时候设置为Durable就是持久化模式,Transient就是临时模式。


如果是代码注解开发可以在参数列表通过durable为true指定持久化或者不持久化,交换机通过注解创建一般都是默认的持久化

 @RabbitListener(bindings =@QueueBinding(
            value = @Queue(name = "fanout.hamll.query1",durable = "true"),
            exchange = @Exchange(name = "fanout.hamll", type = ExchangeTypes.FANOUT,durable = "true")
    ))

队列持久化

可以在控制台创建的时候设置为Durable就是持久化模式,Transient就是临时模式。


如果是代码注解开发可以在参数列表通过durable为true指定持久化或者不持久化,队列一般都是默认不持久化,需要手动设置
 

@RabbitListener(bindings =@QueueBinding(
            value = @Queue(name = "fanout.hamll.query1",durable = "true"),
            exchange = @Exchange(name = "fanout.hamll", type = ExchangeTypes.FANOUT,durable = "true")
    ))

消息持久化

消费者发送的消息默认情况下都是临时的消息,在MQ重启的时候消息会丢失。而开启持久化之后消息会被永久保存在MQ,即使MQ服务器挂了也不会丢失。

在发送消息的时候会由java的api将我们传入的object转换成Message对象,默认是不会帮我们持久化的,MQ重启消息就没了

想要持久化也很简单,就是我们自己来创建Message对象然后开启持久化

 //消息持久化
        Message message = MessageBuilder.withBody("hello rabbitmq".getBytes(StandardCharsets.UTF_8)) // 消息内容
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 消息持久化
                .build();// 构建消息
        rabbitTemplate.convertAndSend("pay.direct", "pay.su1ccess", message, correlationData);

标签:可靠性,持久,AMQP,Spring,springframework,发送,消息,org,import
From: https://blog.csdn.net/a1241436267/article/details/144989603

相关文章

  • springboot水环境检测系统-计算机设计毕业源码43284
    目录1绪论1.1选题背景1.2选题的目的意义1.3论文结构与章节安排2系统分析2.1.1技术可行性分析2.1.2 经济可行性分析2.1.3法律可行性分析2.2系统流程分析2.2.1添加信息流程2.2.2修改信息流程2.2.3删除信息流程2.3 系统功能分析2.3.1功能......
  • springboot城乡居民医疗信息管理系统-计算机设计毕业源码70573
    目 录摘要Abstract绪论1.1 选题背景1.2研究内容1.3本文的组织结构2相关技术介绍2.1MySQL数据库2.2Java编程语言2.3SpringBoot框架介绍3 系统需求分析与设计3.1可行性分析3.1.1技术可行性分析3.1.2经济可行性分析3.1.3法律可行性分析......
  • 【Spring Boot开发】Spring Boot基于事件实现接口请求的性能监控
    前言在Spring框架中,监控接口请求的性能可以通过ServletRequestHandledEvent事件实现。这种方法简单有效,能够帮助开发者实时跟踪和分析请求的性能。它在请求处理完成后发布,包含了请求的详细信息,如客户端地址、请求URL、请求方法和处理时间。使用这个事件可以轻松地监控和记录每个......
  • 基于Spring Boot的动物之家平台
    目录项目介绍系统操作流程 系统架构设计演示视频系统功能实现代码实现 推荐项目项目开发总结为什么选择我 源码获取博主介绍:✌全网粉丝30W+,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者......
  • 基于Spring Boot的二手在线交易平台
    目录项目介绍系统操作流程 系统架构设计演示视频系统功能实现代码实现 推荐项目项目开发总结为什么选择我 源码获取博主介绍:✌全网粉丝30W+,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者......
  • 基于Spring Boot的飞天外卖配送系统
    目录项目介绍系统操作流程 系统架构设计演示视频系统功能实现代码实现 推荐项目项目开发总结为什么选择我 源码获取博主介绍:✌全网粉丝30W+,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者......
  • SpringMVC详解(全网最全)
    起源1.三层架构:一个Servlet只能处理一个请求,耦合度高,复用性差,整页刷新用户体验差2.MVC模式:部分解耦但后端仍负责View层,高并发有限3.前后端分离:异步调用,复用性强,支持复杂交互,用户体验性强概念SpringMVC是Spring框架中的一个模块,用于构建Web的MVC架构,提供了......
  • SpringBoot汽车服务系统p79hp(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表用户,汽车类型,汽车信息,汽车品牌,汽车颜色开题报告内容一、研究背景与意义随着汽车保有量的持续增长,消费者对汽车服务的需求日益多样化与个性化。然而,传统汽车......
  • SpringBoot企业员工自助管理系统01ncs(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表员工,考勤信息,工作审批,费用报销,办公资源开题报告内容一、课题背景与意义随着信息技术的飞速发展,企业对于内部管理的信息化需求日益增强。传统的人工管理方式......
  • RabbitMQ高级篇之MQ可靠性 Lazy Queue
    文章目录数据持久化的背景和挑战引入惰性队列(LazyQueue)惰性队列的特点惰性队列的潜在问题RabbitMQ中的惰性队列实现如何创建惰性队列(LazyQueue)惰性队列的性能测试惰性队列的优势惰性队列的适用场景小结关键点总结数据持久化的背景和挑战持久化确保了即使Rabbit......