首页 > 其他分享 >springBoot-RabbitMQ 高级特性(保姆级教程,一步一步带你熟悉RabbitMQ 相关高级特性)

springBoot-RabbitMQ 高级特性(保姆级教程,一步一步带你熟悉RabbitMQ 相关高级特性)

时间:2024-11-15 13:19:59浏览次数:3  
标签:队列 springframework 高级 特性 交换机 消息 org import RabbitMQ

话不多说,看项目整体架构

RabbitMQ 高级特性 保姆级教程
在这里插入图片描述
好了,下面县开始贴生产者代码:publisher

父依赖:


<parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.7.18</version>
    </parent>


    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <!--        <version>2.7.18</version>-->
        </dependency>


        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
            <scope>runtime</scope>
        </dependency>
        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--jdbc-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <!--fastjson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

publisher yml文件:


server:
  port: 8091
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/rabbitmq?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true
    username: SQL用户名
    password: SQL密码
  rabbitmq:
    publisher-confirm-type: correlated # 开启确认模式
    # 开启publisher-confirm,且选择correlated:【异步】回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-returns: true # 开启publish-return功能  开启回退模式
    template:
      mandatory: true # 定义当消息从交换机路由到队列失败时的策略。【true,则调用ReturnCallback;false:则直接丢弃消息】
    host: 你自己MQ IP
    virtual-host: vHost1
    port: 5672
    username: 你自己用户名
    password: 密码
logging:
  pattern:
    dateformat: HH:mm:ss:SSS
  level:
    com.zy: debug

在上面的MySQL连接可以不要,那是为了方便我自己后续使用,所以添加,如果删除,记得删除依赖中的MYSQL 依赖,不然maven检查会报错


CommonConfig:


package com.zy.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: zy
 * @Date: 2024-11-14-12:45
 * @Description: 定义Return回退:
 * 说明:因为在yml配置文件中定义消息路由失败时的策略为true,所以当消息从交换机路由到队列失败时,会调用ReturnCallback
 * <p>
 * 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时添加配置:
 * 修改publisher服务,添加一个【配置类】:
 * 位置:config/commic配置类
 * <p>
 * 如何保证在项目加载时添加配置?
 * 1、实现ApplicationContextAware(实现了ApplicationContextAware接口的实现类,在Spring容器的Bean工厂创建完毕后会通知该实现类)
 * 2、此时,该实现/配置类有了Spring容器的Bean工厂类;就可以获取并设置ReturnCallback(Spring容器的Bean对象)
 * 3、开始配置ReturnCallback;
 * <p>
 * ReturnCallback的回调函数:当消息成功发送到交换机,但是没有成功发送到消息队列时,回退到回调函数,应该如何处理?就是回调函数里面的内容
 */
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    //实现了ApplicationContextAware接口的实现类,在Spring容器的Bean工厂创建完毕后会通知该实现类
    //有了Bean工厂类,然后就可以获取并设置ReturnCallback(Spring容器的Bean对象)
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // (从Spring容器中)获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        /**
         * ReturnCallback回调函数:当消息成功发送到交换机,
         * 但是没有成功发送到消息队列时,回退到回调函数,应该如何处理?就是回调函数里面的内容)
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判断是否是延迟消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                //判断延迟值非空且大于0==》是一个延迟消息,忽略这个错误提示
                return;
            }

            // 记录日志
            log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}", replyCode, replyText, exchange,
                routingKey, message.toString());
            // 如果有需要的话,重发消息

        });

        //        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnsCallback() {
        //            @Override
        //            public void returnedMessage(ReturnedMessage returnedMessage) {
        //
        //            }
        //        });
    }
}


publisher app 启动类:

package com.zy;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @Author: zy
 * @Date: 2024-11-14-12:34
 * @Description:
 */
@SpringBootApplication
public class RabbitMqApp {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqApp.class, args);
    }
}




测试类:

package com.zy;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.UUID;

/**
 * @Author: zy
 * @Date: 2024-11-14-12:50
 * @Description:
 */
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage1SimpleQueue() throws InterruptedException {
        // 1.准备消息
        String message = "hello, spring amqp!";


        // 3.发送消息(这里如果没有绑定交换机和队列关系等,可以去管控台绑定,也可以在消费者的配置类中声明)
        rabbitTemplate.convertAndSend("amq.topic", "simple.test", message);
    }

    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        // 1.准备消息
        String message = "hello  2, spring amqp!";
        // 2.准备CorrelationData(消息ID需要封装到CorrelationData)
        // 2.1.消息ID,确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 2.2.准备ConfirmCallback(Future是对将来的一种处理的封装)(Future.addCallback)
        correlationData.getFuture().addCallback(result -> {
            System.out.println("消息ID:" + correlationData.getId());
            // 判断结果
            if (result.isAck()) {
                // ACK
                System.out.println("ACK");
                log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                System.out.println("NACK");
                log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
                // 重发消息
            }
        }, ex -> {
            // 记录日志
            System.out.println("ERROR:" + ex.getMessage());
            log.error("消息发送失败!", ex);
            // 重发消息
        });
        // 3.发送消息(这里如果没有绑定交换机和队列关系等,可以去管控台绑定,也可以在消费者的配置类中声明)
        rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
    }

    /**
     * confirm:消息未成功到达交换机——返回nack
     * 故意写错交换机名称
     * @throws InterruptedException
     */
    @Test
    public void testSendMessage3SimpleQueue() throws InterruptedException {
        // 1.准备消息
        String message = "hello  2, spring amqp!";
        // 2.准备CorrelationData(消息ID需要封装到CorrelationData)
        // 2.1.消息ID,确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 2.2.准备ConfirmCallback(Future是对将来的一种处理的封装)(Future.addCallback)
        correlationData.getFuture().addCallback(result -> {
            System.out.println("消息ID:" + correlationData.getId());
            // 判断结果
            if (result.isAck()) {
                // ACK
                System.out.println("ACK");
                log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                System.out.println("NACK");
                log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
                // 重发消息
            }
        }, ex -> {
            // 记录日志
            System.out.println("ERROR:" + ex.getMessage());
            log.error("消息发送失败!", ex);
            // 重发消息
        });
        // 3.发送消息(这里如果没有绑定交换机和队列关系等,可以去管控台绑定,也可以在消费者的配置类中声明)
        rabbitTemplate.convertAndSend("aamq.topic", "simple.test", message, correlationData);
    }

    /**
     * 测试3:消息发送到了交换机但没有发送到队列——返回ack,但是return回退
     * 故意将队列名字写错(交换机不存在绑定该队列)
     * 返回ACK,及路由失败原因.
     * @throws InterruptedException
     */
    @Test
    public void testSendMessage4SimpleQueue() throws InterruptedException {
        // 1.准备消息
        String message = "hello  2, spring amqp!";
        // 2.准备CorrelationData(消息ID需要封装到CorrelationData)
        // 2.1.消息ID,确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 2.2.准备ConfirmCallback(Future是对将来的一种处理的封装)(Future.addCallback)
        correlationData.getFuture().addCallback(result -> {
            System.out.println("消息ID:" + correlationData.getId());
            // 判断结果
            if (result.isAck()) {
                // ACK
                System.out.println("ACK");
                log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                System.out.println("NACK");
                log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
                // 重发消息
            }
        }, ex -> {
            // 记录日志
            System.out.println("ERROR:" + ex.getMessage());
            log.error("消息发送失败!", ex);
            // 重发消息
        });
        // 3.发送消息(这里如果没有绑定交换机和队列关系等,可以去管控台绑定,也可以在消费者的配置类中声明)
        rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);
    }


}



在上面的各类代码中都有详细的注释以及注解,在测试类中也针对相应的问题进行了Mock

Consumer:

在这里插入图片描述

server:
  port: 8090
spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/rabbitmq?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true
    username: MYSQL用户名
    password: 密码
  rabbitmq:
    publisher-confirm-type: correlated # 开启确认模式
    # 开启publisher-confirm,且选择correlated:【异步】回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-returns: true # 开启publish-return功能  开启回退模式
    template:
      mandatory: true # 定义当消息从交换机路由到队列失败时的策略。【true,则调用ReturnCallback;false:则直接丢弃消息】
    host: MQ 机器IP
    virtual-host: vHost1# 用你自己的host 
    port: 5672
    username: 用户名
    password: 密码
    # yml配置文件中配置消息确认模式】:
    listener:
      simple:
        acknowledge-mode: auto  #由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  # 我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,
  #而不是无限制的requeue到mq队列(不返回ack,也不返回nack),
  # 而是可以自己设置重试的次数
  #(如果在重试n次后仍然失败,那么后面在继续重入队大概率也会失败,那么就直接扔掉,
  #不再重入队,此时,Spring会返回ack)
        retry: #  Spring消费者失败重试
          enabled: true # 【开关】开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒(第一次失败后1s重试)
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval(举例:倍数*第一次等待时长1s,这样子永远都是1s)
          #但是如果设置为2,下次等待时长为上次的2倍,因此等待时长依次为1、2、4、8、16....
          max-attempts: 4 # 最大重试次数
          stateless: true # (默认为true)true无状态;false有状态【如果业务中包含事务,这里改为false】
          #(备注:如果设置为false,那么Spring在重试的时候保留事务——消耗性能,所以没有事务时设置为true提升性能)
          max-interval: 10000 # 最大等待时长,大于此时长的一律按最大时长来计算

logging:
  pattern:
    dateformat: HH:mm:ss:SSS
  level:
    com.zy: debug


CommonConfig:

package com.zy.common;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: zy
 * @Date: 2024-11-14-13:23
 * @Description: 声明完队列和交换机,可以在RabbitMQ控制台看到持久化的交换机和队列都会带上D的标示:
 */
@Configuration
public class CommonConfig {
    /**
     * 三个参数:交换机名称、是否持久化、
     * 当没有queue与其绑定时是否自动删除
     * (事实上,默认情况下,由SpringAMQP声明的交换机和队列都是持久化的)
     */
    @Bean
    public DirectExchange simpleDirect() {

        /*
         * 默认为return new DirectExchange("simple.direct",true,false);
         */
        return new DirectExchange("simple.direct");
    }

    /**
     * 使用QueueBuilder构建队列,durable就是持久化的
     */
    @Bean
    public Queue simpleQueue() {
        //new Queue("");默认代码为public Queue(String name) { this(name, true, false, false);}
        return QueueBuilder.durable("simple.queue").build();
    }

    // 【声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct】  死信配置
    @Bean
    public Queue simpleQueue2() {
        return QueueBuilder.durable("simple.dl.queue") // 指定队列名称,并持久化
            .deadLetterExchange("dl.direct") // 【指定死信交换机】
            .build();
    }

    // 声明死信交换机 dl.direct
    @Bean
    public DirectExchange dlExchange() {
        return new DirectExchange("dl.direct", true, false);
    }

    // 声明存储死信的队列 dl.queue
    @Bean
    public Queue dlQueue() {
        return new Queue("dl.queue", true);
    }

    // 将死信队列 与 死信交换机绑定
    @Bean
    public Binding dlBinding() {
        return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
    }

    /**
     * 延时队列   需要安装插件,所以目前暂时无法测试
     *
     * @return
     */
    @Bean
    public DirectExchange delayedExchange() {
        return ExchangeBuilder.directExchange("delay.direct").delayed().durable(true).build();
    }

    @Bean
    public Queue delayedQueue() {
        return new Queue("delay.queue");
    }

    @Bean
    public Binding delayedBinding() {
        return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delay");
    }
}


TTLMessageConfig:


package com.zy.common;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: zy
 * @Date: 2024-11-14-14:30
 * @Description: 声明交换机、队列,绑定关系、为队列指定TTL超时时间
 * <p>
 * 新建一个配置类(便于管理),配置类记得添加@Configuration
 * 位置:consumer/config/TTLMessageConfig类
 * 要给队列设置超时时间,需要在声明队列时配置ttl属性
 */
@Configuration
public class TTLMessageConfig {

    //声明队列ttl.queue,设置超时时间
    @Bean
    public Queue ttlQueue() {
        return QueueBuilder.durable("1-ttl.queue") // 指定队列名称,并持久化
            .ttl(10000) // 设置队列的超时时间,10秒
            .deadLetterExchange("1-dl.ttl.direct") // 队列指定死信交换机,即消息超时就投到这个交换机
            .deadLetterRoutingKey("1-dl")//消息到死信交换机的RoutingKey
            .build();
    }

    //正常的声明交换机ttl.direct
    @Bean
    public DirectExchange ttlExchange() {
        return new DirectExchange("1-ttl.direct");
    }

    //正常的绑定交换机和队列,routingkey为ttl
    @Bean
    public Binding ttlBinding() {
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("1-ttl");
    }
}


ErrorMessageConfig:


package com.zy.errordeal;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: zy
 * @Date: 2024-11-14-13:57
 * @Description: 处理失败消息的【交换机和队列】
 * 位置:在consumer服务中的配置类config/ErrorMessageConfig.java
 * 作用:声明交换机、队列、绑定关系
 * <p>
 * 绑定关系:交换机error.direct–routingkey(error)–》error.queue
 * <p>
 * 失败策略:
 * 1、参数rabbitTemplate(Spring容器自动注入)
 * 2、通过rabbitTemplate将消息发送到(处理失败消息的)交换机(routingKey为error)
 * <p>
 * 这里的RepublishMessageRecoverer的作用:
 * 当消费者的消息失败重试次数用尽后,将失败的消息【丢弃给指定的error交换机的error队列】
 */
@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange() {
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue", true);
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    /**
     * 参数 Spring自动注入的rabbitTemplate
     *
     * @param rabbitTemplate
     * @return
     *
     * Republishing failed message to
     * exchange 'error.direct' with routing key error
     */
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}


SpringRabbitListener:


package com.zy.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author: zy
 * @Date: 2024-11-14-13:39
 * @Description:
 */
@Slf4j
@Component
public class SpringRabbitListener {

    /**
     * auto模式下:由spring监测listener代码是否出现异常,
     * 没有异常则返回ack;抛出异常则返回nack
     * <p>
     * 【问题】:当消费者出现异常后,消息会不断requeue(重入队)到队列,
     * 再重新发送给消费者,然后再次异常,再次requeue,
     * 无限循环,导致mq的消息处理飙升,
     * 带来不必要的压力:(这里测试一条数据就已经达到 3000条/s 了)
     *
     * @param msg
     */
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        //none模式下,消费者在这里接收到消息后,消息就从队列中被删除了
        log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
        System.out.println(1 / 0);//抛出异常、后面就不会执行业务代码

        log.info("消费者处理消息成功!");//模拟业务代码
    }

    /**
     * 死信监听
     *
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "1-dl.ttl.queue", durable = "true"), exchange = @Exchange(name = "1-dl.ttl.direct"), key = "1-dl"))
    public void listenDlQueue(String msg) {
        log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
    }

    /**
     * 延时对列监听
     * @param msg
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay"))
    public void listenDelayExchange(String msg) {
        log.info("消费者接收到了delay.queue的延迟消息:" + msg);
    }

}


ConsumerApp:


package com.zy;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @Author: zy
 * @Date: 2024-11-14-13:22
 * @Description:
 */
@SpringBootApplication
public class ConsumerApp {

    /**
     * 问题:在上面的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
     * 但是,有些数据特别重要,我们不希望任何消息被丢弃,此时,我们应该如何实现?
     *
     * 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
     *
     * RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,【丢弃消息】【默认】就是这种方式
     * ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队(Immediate立刻重入队)(但是频率比没有配置消费失败重载机制低一些)
     * RepublishMessageRecoverer(推荐):重试耗尽后,将失败消息投递到指定的交换机
     * @param args
     */
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApp.class, args);
    }
}

测试类:

package com.zy;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * @Author: zy
 * @Date: 2024-11-14-13:32
 * @Description:
 */
@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
public class ConsumerTest {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
     * 此时,如果消费者还没有处理消息,然后消费者挂掉了,就会导致消息丢失。
     */
    @Test
    public void testDurableMessage() {
        // 1.准备消息
        Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(
            MessageDeliveryMode.PERSISTENT).build();
        // 2.发送消息
        rabbitTemplate.convertAndSend("simple.queue", message);
    }

    /**
     * 1)RabbitMQ投递消息给消费者
     * 2)消费者获取消息后,【返回ACK给RabbitMQ】
     * 3)RabbitMQ删除消息
     * 4)消费者宕机,消息尚未处理
     * <p>
     * 三种方式
     * manual:手动ack,需要在业务代码结束后,调用api发送ack。
     * auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
     * none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除(此时,消息投递是不可靠的,可能丢失)
     */

    @Test
    public void testTTLMsg() {
        // 创建消息
        Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)).setExpiration(
            "5000").build();
        // 消息ID,需要封装到CorrelationData中
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 发送消息
        System.out.println("消息ID:" + correlationData.getId());
        rabbitTemplate.convertAndSend("1-ttl.direct", "1-ttl", message, correlationData);
        log.debug("发送消息成功");
    }

    /**
     * 使用DelayExchange-代码方式
     * DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:
     * <p>
     * 接收消息
     * 判断消息是否具备x-delay属性
     * 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
     * 返回routing not found结果给消息发送者
     * x-delay时间到期后,重新投递消息到指定队列
     *
     * @throws InterruptedException 异常
     */
    @Test
    public void testSendDelayMessage() throws InterruptedException {
        // 1.准备消息
        Message message =
            MessageBuilder.withBody("hello, delayed test ttl messsage".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(
                MessageDeliveryMode.PERSISTENT).setHeader("x-delay", 5000).build();
        // 2.准备CorrelationData
        //消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        System.out.println("消息ID:" + correlationData.getId());
        // 3.发送消息
        rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);

        log.info("发送消息成功");
    }

}

基于上述代码,概述:

消息队列在使用过程中,面临着很多实际问题需要思考:

消息可靠性问题:如何确保发送的消息至少被消费—次
延迟消息问题:如何实现消息的延迟投递
消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题
高可用问题:如何避免单点的MQ故障而导致的不可用问题



一、消息可靠性
背景/需求:消息从发送,到消费者接收,会经历多个过程:
其中的每一步都可能导致消息丢失,常见的丢失原因包括:

发送时丢失:

生产者发送的消息【未送达exchange】——返回nack(消息确认模式)
消息【到达exchange】——返回ack(消息确认模式)
到达queue后,MQ宕机,queue将消息丢失
——返回ACK,及路由失败原因(回退模式)

consumer接收到消息后还未消费就宕机——消息持久化

1、【生产者】消息确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

publisher-confirm,发送者确认
消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack
publisher-return,发送者回执
消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突

1.1 修改application.yml配置文件,添加下面的内容:
位置:生产者/publisher服务
目的:
1、开启消息确认模式
2、开启消息回退(并设置消息路由到队列失败时,回退消息给回调接口)



CorrelationData的作用:
1、消息ID需要封装到CorrelationData
2、correlationData.getFuture().addCallback(…)是一个回调函数:决定了每个业务处理confirm成功或失败的逻辑




2、消息持久化(了解)
背景/需求:生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果消息队列突然宕机,也可能导致消息丢失。
(因为消息队列默认是内存存储)
(发送到消息队列成功+消息队列突然宕机=消息丢失)
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制(写入到磁盘中)

注:SpringAMQP默认是进行持久化(包括声明队列、交换机、发送消息)(备注:通过管控台创建的默认是非持久化的)

那么,下面学的消息持久化有什么用呢?持久化毕竟是写磁盘,会有一定的性能损耗,不是所有的数据都需要持久化,学了下面的持久化后可以手动将不需要持久化的数据取消持久化

RabbitMQ Management 控制台设置:
说明:
Durable:持久的
Transient:转瞬即逝的

交换机持久化

消息队列持久化



3、【消费者】消息确认
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。此时,如果消费者还没有处理消息,然后消费者挂掉了,就会导致消息丢失。

场景如下:

1)RabbitMQ投递消息给消费者
2)消费者获取消息后,【返回ACK给RabbitMQ】
3)RabbitMQ删除消息
4)消费者宕机,消息尚未处理
(成功发送到消费者+消费者还没处理消息就宕机了)消息就丢失了。因此消费者返回ACK的时机非常重要。

而SpringAMQP则允许配置三种确认模式:

manual:手动ack,需要在业务代码结束后,调用api发送ack。
auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除(此时,消息投递是不可靠的,可能丢失)
一般,我们都是使用默认的auto即可。

【yml配置文件中配置消息确认模式】:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto #由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack



4、消费失败重试机制
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列(不返回ack,也不返回nack),而是可以自己设置重试的次数(如果在重试n次后仍然失败,那么后面在继续重入队大概率也会失败,那么就直接扔掉,不再重入队,此时,Spring会返回ack)
总结:

开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
【重试达到最大次数后,Spring会返回ack,消息会被丢弃】
1、本地重试
修改consumer服务的application.yml文件,添加内容:

spring:
rabbitmq:
listener:
simple:
retry: # Spring消费者失败重试
enabled: true # 【开关】开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒(第一次失败后1s重试)
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval(举例:倍数*第一次等待时长1s,这样子永远都是1s)
#但是如果设置为2,下次等待时长为上次的2倍,因此等待时长依次为1、2、4、8、16....
max-attempts: 3 # 最大重试次数
stateless: true # (默认为true)true无状态;false有状态【如果业务中包含事务,这里改为false】
#(备注:如果设置为false,那么Spring在重试的时候保留事务——消耗性能,所以没有事务时设置为true提升性能)
max-interval: 10000 # 最大等待时长,大于此时长的一律按最大时长来计算



重启consumer服务,重复之前的测试。可以发现:

在重试4次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了

2、失败策略
问题:在上面的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。但是,有些数据特别重要,我们不希望任何消息被丢弃,此时,我们应该如何实现?

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,【丢弃消息】【默认】就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队(Immediate立刻重入队)(但是频率比没有配置消费失败重载机制低一些)
RepublishMessageRecoverer(推荐):重试耗尽后,将失败消息投递到指定的交换机
RepublishMessageRecoverer:失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理,这样所有的消息都不会丢失。




总结:
如何确保RabbitMQ消息的可靠性?

开启生产者确认机制,确保生产者的消息能到达队列
开启持久化功能,确保消息未消费前在队列中不会丢失
开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理




标签:队列,springframework,高级,特性,交换机,消息,org,import,RabbitMQ
From: https://blog.csdn.net/qq_41041630/article/details/143794099

相关文章

  • 表单表格与选择器高级
    1.表单当您想要通过网页来收集一些用户的信息(例如用户名、电话、邮箱地址等)时,就需要用到HTML表单。表单可以接收用户输入的信息,然后将其发送到后端应用程序,后端应用程序将根据定义好的业务逻辑对表单传递来的数据进行处理。表单属于HTML文档的一部分,其中包含了如输入框......
  • Mybatis-plus之新特性,你都用过哪些?
    1.lambda方式查询在使用Mybatis-plus进行查询时,我们正常的操作是创建一个QueryWrapper,然后根据字段去做查询操作(如下图)那么就有一个问题,每个数据库的字段都需要写出来,遇到驼峰字段还需要转换为下划线形式,非常影响开发效率。而官方也考虑到这个问题,后续的版本已经提供了lambda的......
  • Qt/C++地图高级绘图/指定唯一标识添加删除修改/动态显示和隐藏/支持天地图高德地图百
    一、前言说明已经有了最基础的接口用来添加覆盖物,而且还有通过进入覆盖物模式动态添加覆盖物的功能,为什么还要来个高级绘图?因为又有新的需求,给钱就搞,一点底线都没有。无论哪个地图厂家,提供的接口都是没有唯一标识参数的,也就类似于学号,这就是需要自己主动定一个属性用来存储唯一标......
  • 【大数据学习 | HBASE高级】hbase-phoenix 与二次索引应用
    1. hbase-phoenix的应用1.1概述:上面我们学会了hbase的操作和原理,以及外部集成的mr的计算方式,但是我们在使用hbase的时候,有的时候我们要直接操作hbase做部分数据的查询和插入,这种原生的方式操作在工作过程中还是比较常见的,以上这些方式需要使用外部的框架进行协助处理,其实hb......
  • Spring Boot编程训练系统:核心特性与实现策略
    3系统分析3.1可行性分析通过对本编程训练系统实行的目的初步调查和分析,提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。3.1.1技术可行性本编程训练系统采用SSM框架,JAVA作为开发语言,是基于WEB平台的B/S架构系统......
  • 【HAProxy09】企业级反向代理HAProxy高级功能之压缩功能与后端服务器健康性监测
    HAProxy高级功能介绍HAProxy高级配置及实用案例压缩功能对响应给客户端的报文进行压缩,以节省网络带宽,但是会占用部分CPU性能建议在后端服务器开启压缩功能,而非在HAProxy上开启压缩注意:默认Ubuntu的包安装nginx开启压缩功能配置选项compressionalgo<algorithm>.......
  • C++11新特性lambda
    文章目录前言lambda进阶用法总结前言本文介绍C++11新特性中的lamdba在C++11中支持了lambda表达式,如果你想生成一个简短、方便调用、函数内部清晰的轻量级函数,lambda是一个不错的选择。它允许我们在函数中生成函数,也可以向对待函数一样对待lambda。lambda首先看......
  • RabbitMQ文档
    说明本文基于Centos7系统测试RabbitMQ版本为3.2.*本文所有操作均使用root用户官方软件下载地址erlang:https://github.com/rabbitmq/erlang-rpm/releasesrabbitmq:https://www.rabbitmq.com/install-rpm.html#downloads安装1.将本站点上的erlang和rabbitmq安装包传......
  • 高级语言调用C接口(二)回调函数(1)
    前言先说一下上一篇文章给出了各高级语言类型和C类型的对应关系,只包含基本类型,不包含结构体等复杂结构,高级语言只有常见的JAVA(Android通用)、C#、Python、Arkts(鸿蒙系)。其它语言如delphi、PB之类的古老语言目前使用的人非常稀少,默认不写了;还有js调用需要编译位wasm,但限制非......
  • 【新品速递】一文详解 ALINX NVMe IP 特性
    -ALINXNVMe IP- 在当下数据驱动的时代,企业对高性能存储解决方案的需求不断增加。NVMeAXIIP凭借其支持大数据量、高速传输、低延迟等存储性能优势,成为众多开发者和企业的理想选择。NVMe 专为SSD而生,通过直接利用PCIe通道,避免SATA协议和外置控制器(PCH)的额外延迟,使......