首页 > 其他分享 >RabbitMq 集成到 SpringBoot

RabbitMq 集成到 SpringBoot

时间:2024-03-15 15:33:25浏览次数:27  
标签:集成 SpringBoot 队列 RabbitMq 重试 消息 import message public

1. 最低配置使用

1. 添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.3.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.3.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
</dependency>

2. 添加配置

application.yml

server:
  port: 10001
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /demo

3. 配置类

声明队列和交换机,以及绑定关系

package com.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    public static final String queue1 = "demo_queue_1";
    public static final String queue2 = "demo_queue_2";

    public static final String exchange = "demo_exchange";

    public static final String queue_key_1 = "demo_queue_key_1";
    public static final String queue_key_2 = "demo_queue_key_2";

    //队列
    @Bean
    public Queue demoQueue1(){
        return new Queue(queue1);
    }
    //队列
    @Bean
    public Queue demoQueue2(){
        return new Queue(queue2);
    }
    //直连交换机
    @Bean
    public DirectExchange exchange(){
        return new DirectExchange(exchange, false, false);
    }
    //绑定 key,关联交换机和队列
    @Bean
    public Binding binding(Queue demoQueue1, DirectExchange exchange){
        return BindingBuilder.bind(demoQueue1).to(exchange).with(queue_key_1);
    }
    @Bean
    public Binding binding1(Queue demoQueue2, DirectExchange exchange){
        return BindingBuilder.bind(demoQueue2).to(exchange).with(queue_key_2);
    }
}

创建 Queue 和 Exchange 也可以使用 QueueBuilder 和 ExchangeBuilder,同理 BindingBuilder 也可以被替换为 new Binding();

4. 生产者

package com.demo.controller;

import com.demo.entity.DogDto;
import com.demo.entity.ResultVo;
import com.demo.mq.producter.MqProductor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/demo")
public class TestController {

    @Autowired
    private MqProductor mqProductor;

    @RequestMapping("/sendMsg")
    public ResultVo sendMsg(){
        mqProductor.sendMessage("我乃常山赵子龙");
        return new ResultVo(200, "发送消息成功!", null);
    }
}

package com.demo.mq.producter;

import com.demo.config.RabbitMqConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author admin
 */
@Component
public class MqProductor {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String data){
        rabbitTemplate.convertAndSend(RabbitMqConfig.exchange, RabbitMqConfig.queue_key_1, data);
    }
}

通过 RabbitTemplate.convertAndSend() 发送的消息实体,需要实现序列化接口

5. 消费者

package com.demo.mq.comsumer;

import com.demo.config.RabbitMqConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MqConsumer {
    @RabbitListener(queues = RabbitMqConfig.queue1)
    public void c1(Message message) {

        System.out.print( "消费者消费信息:");
        System.out.println(new String(message.getBody()));
    }
}

主要通过 @RabbitListener 来监听一个队列,message.getBody() 获取的是一个字节数组。

如此就是 rabbitmq 最低配使用。

2. 消息序列化

如果消息的类型为引用类型时,生产者发送这个消息对象必须实现 Serializable 接口

package com.demo.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class DogDto implements Serializable {
    private String name;
    private String color;

    @Override
    public String toString() {
        return "DogDto{" +
                "name='" + name + '\'' +
                ", color='" + color + '\'' +
                '}';
    }
}

否则会抛出 java.lang.IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: com.demo.entity.DogDto 异常。

当使用 rabbitTemplate.convertAndSend 发送消息时,mq 会使用 MessageConverter 进行序列化,默认实现是 SimpleMessageConverter,此时我们消费者要想获取消息实例,必须对消息进行反序列化。

三种方式:

    1. 消费者手动反序列化

由于 SimpleMessageConverter 使用的是 ObjectOutPutStream 序列化,所以我们使用 ObjectInputStream 反序列化。

@RabbitListener(queues = RabbitMqConfig.queue2)
public void c2(Message message) throws IOException, ClassNotFoundException {
    System.out.println("消费者消费 q2 信息:");

    DogDto o = (DogDto)new ObjectInputStream(new ByteArrayInputStream(message.getBody())).readObject();

    System.out.println(o);
}
    1. 生产者手动序列化,消费者使用相同 json 反序列化

这里使用 jackson 举例:

生产者:

public <T> void sendMessage(T o) throws JsonProcessingException {
    rabbitTemplate.convertAndSend(RabbitMqConfig.exchange, RabbitMqConfig.queue_key_2, new ObjectMapper().writeValueAsString(o));
}

消费者:

@RabbitListener(queues = RabbitMqConfig.queue2)
public void c2(Message message) throws IOException, ClassNotFoundException {
    System.out.println("消费者消费 q2 信息:");
    DogDto o = new ObjectMapper().readValue(message.getBody(), DogDto.class);

    System.out.println(o);
}
    1. 消费者自动解析

将消息对象作为方法参数,mq 会自动解析

 @RabbitListener(queues = RabbitMqConfig.queue2)
public void c2(DogDto o, Message message) throws IOException, ClassNotFoundException {
    System.out.println("消费者消费 q2 信息:");
    System.out.println(o);
}

有个版本的问题,消费对象会抛异常 :Caused by: java.lang.SecurityException: Attempt to deserialize unauthorized class java.util.HashMap; add allowed class name patterns to the message converter or, if you trust the message orginiator, set environment variable 'SPRING_AMQP_DESERIALIZATION_TRUST_ALL' or system property 'spring.amqp.deserialization.trust.all' to true,目前发现 5.0 是在使用这个版本的情况下(巨坑),貌似是权限一类的问题,需要明确设置实体类可解析。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.7.18</version>
</dependency>

这时候要么自定义一下 MessageConverter, 设置一下实体类的路径

@Bean
public MessageConverter jsonToMapMessageConverter() {
    DefaultClassMapper defaultClassMapper = new DefaultClassMapper();
    defaultClassMapper.setTrustedPackages("com.demo.entity"); // trusted packages
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    jackson2JsonMessageConverter.setClassMapper(defaultClassMapper);
    return jackson2JsonMessageConverter;
}

要么生产者发布消息时候,手动转 json,当字符串传。

3. 发布确认

1. 问题描述

消息发布确认主要是要确保消息成功发送到了交换机和队列。

主要有异步和同步两种方式,异步确认是更推荐的,生产者可以先不管上条消息是否被确认就先发布下条消息,等到确认的消息或者是失败消息返回时再通过回调函数处理,这样处理效率更高。

2. 消息发布确认

配置文件:

spring.rabbitmq.publisher-confirm-type=correlated  # 开启消息确认 设置确认模式
spring.rabbitmq.publisher-returns=true  # 开启消息退回
  • NONE 值是禁用发布确认模式,是默认值

  • CORRELATED 值是发布消息成功到交换器后会触发回调方法 用这种即可√

  • SIMPLE 值有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker;

  在springboot,利用 RabbitTemplate 可以很轻松实现,主要要设置两个回调函数,确认回调函数 ConfirmCallback 和退回回调函数 ReturnsCallback(有些低版本是ReturnCallback),两个函数是对全局生效的,所以要在系统驱动的时候加载。

到达交换机:ConfirmCallback
@FunctionalInterface
public interface ConfirmCallback {
	/**
	 * Confirmation callback.
	 * @param correlationData 消息编号之类
	 * @param ack 是否成功,true 成功,false 失败
	 * @param cause 失败的原因.
	 */
	void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
}

  为了方便消息确认,所以在生产者发送消息时就不能简单地传递string的message,还要传入correlationData,这个对象用起来很容易,只需要传入一个全局唯一的ID,或者不传它默认构造器由UUID随机生成。

到达队列:ReturnsCallback
@FunctionalInterface
public interface ReturnsCallback extends ReturnCallback {

	/**
	 * Returned message callback.
	 * @param message the returned message.
	 * @param replyCode the reply code.
	 * @param replyText the reply text.
	 * @param exchange the exchange.
	 * @param routingKey the routing key.
	 * @deprecated in favor of {@link #returnedMessage(ReturnedMessage)} which is
	 * easier to use with lambdas.
	 */
	@Override
	@Deprecated
	default void returnedMessage(Message message, int replyCode, String replyText, String exchange,
			String routingKey) {

		throw new UnsupportedOperationException(
				"This should never be called, please open a GitHub issue with a stack trace");
	};

	/**
	 * Returned message callback.
	 * @param returned the returned message and metadata.
	 */
	@Override
	void returnedMessage(ReturnedMessage returned);

	/**
	 * Internal use only; transitional during deprecation.
	 * @return the legacy delegate.
	 * @deprecated - will be removed with {@link ReturnCallback}.
	 */
	@Deprecated
	@Nullable
	default ReturnCallback delegate() {
		return null;
	}

}

  退回回调函数 ReturnsCallback 是消息转发到队列失败回调的函数,成功则不会执行。失败的原因有很多,比如路由错误,服务崩溃,队列失效。开启回退除了配置文件外,还需要设置mandatory参数,配置文件,配置类中都可以设置

rabbitmq:
	template:
  		mandatory: true
rabbitTemplate.setMandatory(true);

消息发布确认示例-配置文件

server:
  port: 10001
spring:
  rabbitmq:
    #基本配置
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /demo
    #监听
    listener:
      simple:
        retry:
          enabled: true #开启重试,默认不开启
          max-attempts: 3 #最大重试次数
          initial-interval: 1000ms #重试间隔时间(单位毫秒)
          max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
          # 乘子。间隔时间*乘子=下一次的间隔时间,不能超过 max-interval
          # 以本处为例:第一次间隔 5 秒,第二次间隔 10 秒,以此类推
          multiplier: 2
    #发送
    publisher-confirm-type: correlated
    publisher-returns: true

消息发布确认示例-类:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rt = new RabbitTemplate(connectionFactory);
    rt.setMandatory(true);

    //确认回调函数ConfirmCallback是消息转发到交换机就会回调的函数,转发是否成功都会被调用
    rt.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String s) {
            if(b){
                log.info("交换机接收消息编号:{},消息:{} 成功!", s, correlationData);
            }else{
                log.info("交换机接收消息失败:{} 失败!原因:{}", correlationData, s);
            }
        }
    });
    //退回回调函数ReturnsCallback是消息转发到队列失败回调的函数,成功则不会执行
    rt.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage rs) {
            log.info("消息id:{}", rs.getMessage().getMessageProperties().getHeader("spring_returned_message_correlation").toString());
            log.info("交换机:{}", rs.getExchange());
            log.info("应答码:{}", rs.getReplyCode());
            log.info("应答文本:{}", rs.getReplyText());
            log.info("绑定键:{}", rs.getRoutingKey());
            log.info("returnsCallback 返回消息:{}", rs.getMessage());
        }
    });
    return rt;
}

4. 重试机制

1. 问题描述

消费者默认是自动提交,如果消费时出现了RuntimException,会导致消息直接重新入队,再次投递(进入队首),进入死循环,继而导致后面的消息被阻塞。

消息阻塞带来的后果是:后边的消息无法被消费;RabbitMQ服务端继续接收消息,占内存和磁盘越来越多。

重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟 mq 没有任何关系。

不管消息被消费了之后是手动确认还是自动确认,代码中不能使用try/catch捕获异常,否则重试机制失效。

2. 自动确认分四种情况

(第一就是正常消费,其他三种为异常情况)

  • 消息成功被消费,没有抛出异常,则自动确认,回复ack。不涉及requeue,毕竟已经成功了。requeue是对被拒绝的消息生效。
  • 当抛出ImmediateAcknowledgeAmqpException异常的时候,则视为成功消费,确认该消息。
  • 当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue = false(该异常会在重试超过限制后抛出)
  • 抛出其他的异常,消息会被拒绝,且requeue = true

3. 消息重试的两种情况

    1. 消息是自动确认时,如果抛出了异常导致多次重试都失败,消息被自动确认,消息就丢失了
    1. 消息是手动确认时,如果抛出了异常导致多次重试都失败,消息没被确认,也无法nack,就一直是unacked状态,导致消息积压

4. 重试示例

application.yml 添加配置:

server:
  port: 10001
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /demo

    listener:
      simple:
        retry:
          enabled: false #开启重试,默认不开启
          max-attempts: 3 #最大重试次数
          initial-interval: 5000ms #重试间隔时间(单位毫秒)
          max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
          # 乘子。间隔时间*乘子=下一次的间隔时间,不能超过max-interval
          # 以本处为例:第一次间隔 5 秒,第二次间隔 10 秒,以此类推
          multiplier: 2

消费者模拟异常:

@RabbitListener(queues = RabbitMqConfig.queue1)
public void c1(Message message) {

    System.out.print("消费者消费 q1 信息,时间:" + LocalDateTime.now());
    System.out.println(new String(message.getBody()));

    //模拟异常
    throw new RuntimeException();
}

可以看到测试时候重试了三次,第三次依然报错忍无可忍,不再重试,如果没有额外配置,则丢弃消息。

异常会附带 重试次数已用尽

5. 重试完对消息的处理

消息在重试完之后,会调用MessageRecoverer接口的recover方法。MessageRecoverer接口有如下三个实现类(看它们名字即可知道含义):

  • RejectAndDontRequeueRecoverer:拒绝而且不把消息重新放入队列(默认)
  • RepublishMessageRecoverer:重新发布消息
  • ImmediateRequeueMessageRecoverer:立即把消息重新放入队列

重试失败的消息发布到其他的队列中

package com.demo.config;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqRecoverConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //重试消息处理
    @Bean
    public MessageRecoverer messageRecoverer(){
        System.out.println("重试也失败的消息!");
        return new RepublishMessageRecoverer(rabbitTemplate, RabbitMqConfig.exchange, RabbitMqConfig.queue_key_3);
    }
}

发布失败的消息到其他队列中去。

可以看到发布到第三个队列中了

5. 消费确认

1. 问题描述

RabbitMQ 是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向 RabbitMQ 发送回执,表明自己已经处理消息,RabbitMq 将根据情况处理已消费消息。

2. 三种确认模式

设置消费者确认模式

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。消息可能丢失,用于不重要的消息。

  • auto:模式类似事务机制,默认的状态,自动ack,由 spring 监测 listener 代码是否出现异常,没有异常则返回ack,队列中消息删除;抛出异常则返回 nack,消息回滚到 mq。貌似跟重试机制冲突,如果开启重试机制,本地重试过后依然抛出异常,则会按照重试机制的处理,同时返回 ack,mq 确认并丢弃消息,而不是重新回滚消息到 mq。这应该是重试机制的正确用法。

  • manual:手动ack,需要在业务代码结束后,调用 api 发送 ack,接收不到回执的消息一直为 Unacked 状态存在于 RabbitMq 队列中,当对应的消费者断开连接,该消息会重新回到队列中变成 Ready 状态。

3. 手动确认

手动确认主要通过几个 API 来实现:

Channel.basicAck

/**
 * long deliveryTag:消息的传递标识
 * boolean multiple:是否批量确认,为true,则确认后,其他消息deliveryTag小于当前消息的deliveryTag的消息全部变为确认;(慎重)
 * */
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

Channel.basicReject

/**
 * long deliveryTag:消息的传递标识。
 * boolean requeue:设置为true 会把消费失败的消息从新添加到队列的尾端,设置为false不会重新回到队列,
 *                  设置为 false 表示不再重新入队,如果配置了死信队列则进入死信队列。
 * */
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

Channel.basicNack

/**
 * long deliveryTag:消息的传递标识
 * boolean multiple:是否批量拒绝,如果为true,则拒绝所有consumer获得的小于deliveryTag的消息。(慎重)
 * boolean requeue:是否重新入列,设置为true 会把消费失败的消息从新添加到队列的尾端,设置为false不会重新回到队列,有死信队列就进入死信队列
 * */
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

一般配合 try-catch 使用

@RabbitListener(queues = RabbitMqConfig.queue4)
public void c3(Message message, Channel channel) throws IOException {
    try{
        System.out.println(new String(message.getBody()));
        System.out.println(1/0);
        /**
         * long deliveryTag:消息的传递标识
         * boolean multiple:是否批量确认,为true,则确认后,其他消息deliveryTag小于当前消息的deliveryTag的消息全部变为确认;(慎重)
         * */
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }catch (Exception e){
        /**
         * long deliveryTag:消息的传递标识。
         * boolean requeue:设置为true 会把消费失败的消息从新添加到队列的尾端,设置为false不会重新回到队列,
         *                  设置为 false 表示不再重新入队,如果配置了死信队列则进入死信队列。
         * */
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }

无论消费者是否成功,都要给 mq 一个回执,否则这个消息会一直以 unacked 的状态存在于队列中,除非这个消费者断开连接,消息会重新回到 ready 状态。

对于 Consumer 来说,未 ack 的消息,在客户端处理(个人理解)

此时第一个个线程存放消息的队列有个因异常或其他情况 unacked 的消息,这个消息无法被消费完,就一直占着这个坑,然后这个线程就只能操作其他四个坑了,当这个线程的坑被占完了,就不能处理消息了。

模拟一个因异常无法回执的消息

//提前队列里放了以 0-9 结尾的十条消息
//prefetch: 3
//concurrency: 2
@RabbitListener(queues = RabbitMqConfig.queue5)
public void c5(Message message, Channel channel) throws InterruptedException, IOException {
    Thread.sleep(1000);

    String s = new String(message.getBody());
    log.info("当前线程:{},当前时间:{} 接收到消息:{}" ,Thread.currentThread().getName(), LocalDateTime.now(), s);
    if(s.endsWith("1") || s.endsWith("2") || s.endsWith("3")){
        System.out.println(1/0);
    }else{
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

4. 并发参数

  • prefetch:RabbitMq 每次向消费者预推的消息数量,这个值越高,消息传递的越快,但是非顺序处理的风险更高。如果 ack 模式为 none,则忽略这个参数。

      prefetch默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch的值应当设置为1。对于低容量消息和多个消费者的情况(也包括单listener容器的concurrency配置)希望在多个使用者之间实现更均匀的消息分布,建议在手动ack下并设置prefetch=1。

  • concurrency:消费者开启线程数,concurrency = 1,即每个 Listener 容器将开启一个线程去处理消息。也可以在 @RabbitListener 为每个监听设置这个参数。

      另外 @RabbitListener 有个 exclusive 参数,表示此容器的单个 Consumer 是否对队列访问独占权限。如果为 true,那么 concurrency 只能是 1;实际一个消费者容器每次从 Mq 预取消息数 = concurrentcy * perfetch;

配置示例:

spring:
  rabbitmq:
    #监听
    listener:
      simple:
        prefetch: 3
        concurrency: 2
        max-concurrency: 2

此时 mq 中消息情况:实际从 10 哥消息中预取了 perfetch * concurrency = 6 条消息在处理

6. 持久化

持久化就是当 RabbitMq 重启,消息不会丢失,也就是存在磁盘上。

1. exchange 持久化

AbstractExchange 类构造方法
public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
    private final String name;
    private final boolean durable;
    private final boolean autoDelete;
    private final Map<String, Object> arguments;
    private volatile boolean delayed;
    private boolean internal;

    public AbstractExchange(String name) {
        this(name, true, false);
    }

    public AbstractExchange(String name, boolean durable, boolean autoDelete) {
        this(name, durable, autoDelete, (Map)null);
    }

    public AbstractExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        this.name = name;
        this.durable = durable;
        this.autoDelete = autoDelete;
        if (arguments != null) {
            this.arguments = arguments;
        } else {
            this.arguments = new HashMap();
        }

    }

在申明exchange的时候,有个参数:durable。当该参数为true,则对该exchange做持久化,重启rabbitmq服务器,该exchange不会消失。无参构造 durable的 默认值为 true

2. queue 持久化

Queue 类构造方法
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete,
			@Nullable Map<String, Object> arguments) {

		super(arguments);
		Assert.notNull(name, "'name' cannot be null");
		this.name = name;
		this.actualName = StringUtils.hasText(name) ? name
				: (Base64UrlNamingStrategy.DEFAULT.generateName() + "_awaiting_declaration");
		this.durable = durable;
		this.exclusive = exclusive;
		this.autoDelete = autoDelete;
	}

申明队列时也有个参数:durable。当该参数为true,则对该queue做持久化,重启rabbitmq服务器,该queue不会消失。durable的默认值为true

3. message 持久化

默认 RabbitTemplate 类下的 convertAndSend 就是持久化的消息。参考:https://blog.csdn.net/weixin_43831204/article/details/113589733

7. 死信队列

1. 问题说明

消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信 队列也可以有交换机和路由key等。消费者在消费生产者生产的消息时发生了某些特殊情况(下文会说),导致消息无法被正常消费,存放这些未被消费的消息的队列即为死信队列。

2. 产生死信队列的原因

    1. 消息投递到MQ中存放,消息 TTL 已经过期的消息。
    1. 队列达到最大的长度 (队列容器已经满了)。
    1. 消息被消费者拒绝,并且 requeue=false 的消息。
    1. 重试机制生效,并且没有定义 MessageRecoverer,如果消息经过重试失败,也会被丢弃到死信队列中。

总的来说,被丢弃的消息,如果配置了死信队列,就会丢到死信队列。

3. 配置死信队列

//定义死信队列
@Bean
public Queue deadQueue(){
    return new Queue(dead_queue);
}
//定义死信交换机
@Bean
public DirectExchange deadExchange(){
    return new DirectExchange(dead_exchange);
}
//绑定
@Bean
public Binding deadBinding(Queue deadQueue, DirectExchange deadExchange){
    return BindingBuilder.bind(deadQueue).to(deadExchange).with(dead_queue_key_1);
}
//正常队列,绑定死信交换机
@Bean
public Queue demoQueue6(){
    return QueueBuilder.durable(queue6) 
                .deadLetterExchange(dead_exchange)  //死信交换机名称
                .deadLetterRoutingKey(dead_queue_key_1) //死信队列绑定名
                .maxLength(1)
                .ttl(10_000)
                .build();
}

队列参数:

对应使用 Map 设置时候的参数:

参数名

说明

x-dead-letter-exchange

死信交换器

x-dead-letter-routing-key

死信消息的可选路由键

x-expires

队列在指定毫秒数后被删除

x-message-ttl

毫秒为单位的消息过期时间,队列级别

x-ha-policy

创建HA队列,此参数已失效

x-ha-nodes

HA队列的分布节点,此参数已失效

x-max-length

队列的消息条数限制。限制加入queue中消息的条数。先进先出原则,超过后,后面的消息会顶替前面的消息。

x-max-length-bytes

消息容量限制,该参数和x-max-length目的一样限制队列的容量,但是这个是靠队列大小(bytes)来达到限制。

x-max-priority

最大优先值为255的队列优先排序功能

x-overflow

设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。

x-single-active-consumer

表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)

x-queue-mode

将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息

x-queue-master-locator

在集群模式下设置镜像队列的主节点信息

8. 延时队列

标签:集成,SpringBoot,队列,RabbitMq,重试,消息,import,message,public
From: https://www.cnblogs.com/cnff/p/18070920

相关文章

  • 【人才梯队建设】某系统集成企业技术人才梯队建设项目纪实
    ——量化工作饱和度,建设虚拟金字塔形技术梯队与很多企业一样,为了适应公司规模的扩大和业务增长的需要,该公司扩建了技术团队,但是技术人员之间的分工仍比较混乱,虽然人数多了,但是业绩仍然上不去,企业徒增了人工成本。另一方面,新进的技术人员有很多是刚毕业的大学生,能力、经验都有......
  • java毕业设计-基于springboot开发的会员制医疗预约服务管理信息系统-毕业论文+答辩PPT
    文章目录前言一、毕设成果演示(源代码在文末)二、毕设摘要展示1、开发说明2、需求分析3、系统功能结构三、系统实现展示1、系统功能模块2、管理员功能模块3、医生功能模块3、会员功能模块四、毕设内容和源代码获取总结java毕业设计-基于springboot开发的会员制医疗预......
  • Gitlab+Jenkins+Docker+Harbor+K8s集群搭建CICD平台(持续集成部署Hexo博客Demo)
    目录涉及内容:一、CICD服务器环境搭建1、docker环境安装(1)、拉取镜像,启动并设置开机自启(2)、配置docker加速器2、安装并配置GitLab(1)、创建共享卷目录(2)、创建gitlab容器(3)、关闭容器修改配置文件(4)、修改完配置文件之后。直接启动容器(5)、相关的git命令(针对已存在的文件夹)3、安装配......
  • Java毕业设计-基于springboot开发的4S店车辆管理系统-毕业论文+答辩PPT(附源代码+演示
    文章目录前言一、毕设成果演示(源代码在文末)二、毕设摘要展示1.开发说明2.需求分析3、系统功能结构三、系统实现展示1、系统登录2、管理员功能模块3、销售员功能模块4、维修员功能模块四、毕设内容和源代码获取总结Java毕业设计-基于springboot开发的4S店车辆管理系......
  • springBoot 配置 国产达梦数据库
    1.pom<!--达梦数据库驱动--> <dependency> <groupId>com.dm</groupId> <artifactId>DmJdbcDriver18</artifactId> <version>1.8</version> </dependency>maven中央仓库里面没有,需要手动安装到maven本地仓库mvni......
  • Java基于 Springboot+Vue 的招生管理系统,前后端分离
    博主介绍:✌程序员徐师兄、8年大厂程序员经历。全网粉丝15w+、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌......
  • springboot自动配置
    首先,你在pom文件里引入的很多第三方jar里都有一个文件 META-INF/spring.factories,这个文件里的内容和关系到能否自动配置,那有的jar为啥没有,是不需要SpringBoot来自动配置吗,这个我们后面再说。    先来看一下redissonstarter的/META-INF/spring.factories是怎么写的,......
  • 在Docker上传我们自己的镜像(以springboot项目为例)
    首先确定好在我们的centOS服务器上已经安装并配置好docker配置自己的springboot镜像并运行获取springboot的jar包mavenclean--》mavenuepackage --》复制target目录下生成的jar包在服务器选择一个文件夹上传jar包,我这里选用的文件夹叫做/opt/dockertest在jar包的同一......
  • springboot3+vue3(十一)springboot多环境开发
    在开发中我们往往会遇到,本地环境、测试环境、生产环境分别一套配置。如数据库连接,端口号等配置各不相同的问题。 1、多文件配置    2、多文件分组配置如果配置文件有很多的配置信息几百行的情况,为了方便维护我们可以根据功能的情况进行分组拆分。如:服务器相关配......
  • RabbitMQ入门
    RabbitMq入门目录RabbitMq入门1.MQ1.1同步调用1.2异步调用1.3MQ技术选型2.RabbitMQ2.1安装2.2收发消息(交换机、队列)2.3数据隔离2.3.1用户管理2.3.2VirtualHost3.SpringAMQP3.1创建项目3.2快速入门3.2.1消息发送3.2.2消息接收3.2.3总结3.3WorkQueues模型3.3.1消费者消息推送限制3......