话不多说,看项目整体架构
RabbitMQ 高级特性 保姆级教程
好了,下面县开始贴生产者代码:publisher
父依赖:
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.7.18</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<!-- <version>2.7.18</version>-->
</dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
<scope>runtime</scope>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--jdbc-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<!--fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
publisher yml文件:
server:
port: 8091
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/rabbitmq?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true
username: SQL用户名
password: SQL密码
rabbitmq:
publisher-confirm-type: correlated # 开启确认模式
# 开启publisher-confirm,且选择correlated:【异步】回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-returns: true # 开启publish-return功能 开启回退模式
template:
mandatory: true # 定义当消息从交换机路由到队列失败时的策略。【true,则调用ReturnCallback;false:则直接丢弃消息】
host: 你自己MQ IP
virtual-host: vHost1
port: 5672
username: 你自己用户名
password: 密码
logging:
pattern:
dateformat: HH:mm:ss:SSS
level:
com.zy: debug
在上面的MySQL连接可以不要,那是为了方便我自己后续使用,所以添加,如果删除,记得删除依赖中的MYSQL 依赖,不然maven检查会报错
CommonConfig:
package com.zy.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
/**
* @Author: zy
* @Date: 2024-11-14-12:45
* @Description: 定义Return回退:
* 说明:因为在yml配置文件中定义消息路由失败时的策略为true,所以当消息从交换机路由到队列失败时,会调用ReturnCallback
* <p>
* 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时添加配置:
* 修改publisher服务,添加一个【配置类】:
* 位置:config/commic配置类
* <p>
* 如何保证在项目加载时添加配置?
* 1、实现ApplicationContextAware(实现了ApplicationContextAware接口的实现类,在Spring容器的Bean工厂创建完毕后会通知该实现类)
* 2、此时,该实现/配置类有了Spring容器的Bean工厂类;就可以获取并设置ReturnCallback(Spring容器的Bean对象)
* 3、开始配置ReturnCallback;
* <p>
* ReturnCallback的回调函数:当消息成功发送到交换机,但是没有成功发送到消息队列时,回退到回调函数,应该如何处理?就是回调函数里面的内容
*/
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
//实现了ApplicationContextAware接口的实现类,在Spring容器的Bean工厂创建完毕后会通知该实现类
//有了Bean工厂类,然后就可以获取并设置ReturnCallback(Spring容器的Bean对象)
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// (从Spring容器中)获取RabbitTemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置ReturnCallback
/**
* ReturnCallback回调函数:当消息成功发送到交换机,
* 但是没有成功发送到消息队列时,回退到回调函数,应该如何处理?就是回调函数里面的内容)
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 判断是否是延迟消息
Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
if (receivedDelay != null && receivedDelay > 0) {
//判断延迟值非空且大于0==》是一个延迟消息,忽略这个错误提示
return;
}
// 记录日志
log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}", replyCode, replyText, exchange,
routingKey, message.toString());
// 如果有需要的话,重发消息
});
// rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnsCallback() {
// @Override
// public void returnedMessage(ReturnedMessage returnedMessage) {
//
// }
// });
}
}
publisher app 启动类:
package com.zy;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @Author: zy
* @Date: 2024-11-14-12:34
* @Description:
*/
@SpringBootApplication
public class RabbitMqApp {
public static void main(String[] args) {
SpringApplication.run(RabbitMqApp.class, args);
}
}
测试类:
package com.zy;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.UUID;
/**
* @Author: zy
* @Date: 2024-11-14-12:50
* @Description:
*/
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage1SimpleQueue() throws InterruptedException {
// 1.准备消息
String message = "hello, spring amqp!";
// 3.发送消息(这里如果没有绑定交换机和队列关系等,可以去管控台绑定,也可以在消费者的配置类中声明)
rabbitTemplate.convertAndSend("amq.topic", "simple.test", message);
}
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
// 1.准备消息
String message = "hello 2, spring amqp!";
// 2.准备CorrelationData(消息ID需要封装到CorrelationData)
// 2.1.消息ID,确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 2.2.准备ConfirmCallback(Future是对将来的一种处理的封装)(Future.addCallback)
correlationData.getFuture().addCallback(result -> {
System.out.println("消息ID:" + correlationData.getId());
// 判断结果
if (result.isAck()) {
// ACK
System.out.println("ACK");
log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
} else {
// NACK
System.out.println("NACK");
log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
// 重发消息
}
}, ex -> {
// 记录日志
System.out.println("ERROR:" + ex.getMessage());
log.error("消息发送失败!", ex);
// 重发消息
});
// 3.发送消息(这里如果没有绑定交换机和队列关系等,可以去管控台绑定,也可以在消费者的配置类中声明)
rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
}
/**
* confirm:消息未成功到达交换机——返回nack
* 故意写错交换机名称
* @throws InterruptedException
*/
@Test
public void testSendMessage3SimpleQueue() throws InterruptedException {
// 1.准备消息
String message = "hello 2, spring amqp!";
// 2.准备CorrelationData(消息ID需要封装到CorrelationData)
// 2.1.消息ID,确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 2.2.准备ConfirmCallback(Future是对将来的一种处理的封装)(Future.addCallback)
correlationData.getFuture().addCallback(result -> {
System.out.println("消息ID:" + correlationData.getId());
// 判断结果
if (result.isAck()) {
// ACK
System.out.println("ACK");
log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
} else {
// NACK
System.out.println("NACK");
log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
// 重发消息
}
}, ex -> {
// 记录日志
System.out.println("ERROR:" + ex.getMessage());
log.error("消息发送失败!", ex);
// 重发消息
});
// 3.发送消息(这里如果没有绑定交换机和队列关系等,可以去管控台绑定,也可以在消费者的配置类中声明)
rabbitTemplate.convertAndSend("aamq.topic", "simple.test", message, correlationData);
}
/**
* 测试3:消息发送到了交换机但没有发送到队列——返回ack,但是return回退
* 故意将队列名字写错(交换机不存在绑定该队列)
* 返回ACK,及路由失败原因.
* @throws InterruptedException
*/
@Test
public void testSendMessage4SimpleQueue() throws InterruptedException {
// 1.准备消息
String message = "hello 2, spring amqp!";
// 2.准备CorrelationData(消息ID需要封装到CorrelationData)
// 2.1.消息ID,确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 2.2.准备ConfirmCallback(Future是对将来的一种处理的封装)(Future.addCallback)
correlationData.getFuture().addCallback(result -> {
System.out.println("消息ID:" + correlationData.getId());
// 判断结果
if (result.isAck()) {
// ACK
System.out.println("ACK");
log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
} else {
// NACK
System.out.println("NACK");
log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
// 重发消息
}
}, ex -> {
// 记录日志
System.out.println("ERROR:" + ex.getMessage());
log.error("消息发送失败!", ex);
// 重发消息
});
// 3.发送消息(这里如果没有绑定交换机和队列关系等,可以去管控台绑定,也可以在消费者的配置类中声明)
rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);
}
}
在上面的各类代码中都有详细的注释以及注解,在测试类中也针对相应的问题进行了Mock
Consumer:
server:
port: 8090
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/rabbitmq?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true
username: MYSQL用户名
password: 密码
rabbitmq:
publisher-confirm-type: correlated # 开启确认模式
# 开启publisher-confirm,且选择correlated:【异步】回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-returns: true # 开启publish-return功能 开启回退模式
template:
mandatory: true # 定义当消息从交换机路由到队列失败时的策略。【true,则调用ReturnCallback;false:则直接丢弃消息】
host: MQ 机器IP
virtual-host: vHost1# 用你自己的host
port: 5672
username: 用户名
password: 密码
# yml配置文件中配置消息确认模式】:
listener:
simple:
acknowledge-mode: auto #由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
# 我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,
#而不是无限制的requeue到mq队列(不返回ack,也不返回nack),
# 而是可以自己设置重试的次数
#(如果在重试n次后仍然失败,那么后面在继续重入队大概率也会失败,那么就直接扔掉,
#不再重入队,此时,Spring会返回ack)
retry: # Spring消费者失败重试
enabled: true # 【开关】开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒(第一次失败后1s重试)
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval(举例:倍数*第一次等待时长1s,这样子永远都是1s)
#但是如果设置为2,下次等待时长为上次的2倍,因此等待时长依次为1、2、4、8、16....
max-attempts: 4 # 最大重试次数
stateless: true # (默认为true)true无状态;false有状态【如果业务中包含事务,这里改为false】
#(备注:如果设置为false,那么Spring在重试的时候保留事务——消耗性能,所以没有事务时设置为true提升性能)
max-interval: 10000 # 最大等待时长,大于此时长的一律按最大时长来计算
logging:
pattern:
dateformat: HH:mm:ss:SSS
level:
com.zy: debug
CommonConfig:
package com.zy.common;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: zy
* @Date: 2024-11-14-13:23
* @Description: 声明完队列和交换机,可以在RabbitMQ控制台看到持久化的交换机和队列都会带上D的标示:
*/
@Configuration
public class CommonConfig {
/**
* 三个参数:交换机名称、是否持久化、
* 当没有queue与其绑定时是否自动删除
* (事实上,默认情况下,由SpringAMQP声明的交换机和队列都是持久化的)
*/
@Bean
public DirectExchange simpleDirect() {
/*
* 默认为return new DirectExchange("simple.direct",true,false);
*/
return new DirectExchange("simple.direct");
}
/**
* 使用QueueBuilder构建队列,durable就是持久化的
*/
@Bean
public Queue simpleQueue() {
//new Queue("");默认代码为public Queue(String name) { this(name, true, false, false);}
return QueueBuilder.durable("simple.queue").build();
}
// 【声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct】 死信配置
@Bean
public Queue simpleQueue2() {
return QueueBuilder.durable("simple.dl.queue") // 指定队列名称,并持久化
.deadLetterExchange("dl.direct") // 【指定死信交换机】
.build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange() {
return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue() {
return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding() {
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
/**
* 延时队列 需要安装插件,所以目前暂时无法测试
*
* @return
*/
@Bean
public DirectExchange delayedExchange() {
return ExchangeBuilder.directExchange("delay.direct").delayed().durable(true).build();
}
@Bean
public Queue delayedQueue() {
return new Queue("delay.queue");
}
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delay");
}
}
TTLMessageConfig:
package com.zy.common;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: zy
* @Date: 2024-11-14-14:30
* @Description: 声明交换机、队列,绑定关系、为队列指定TTL超时时间
* <p>
* 新建一个配置类(便于管理),配置类记得添加@Configuration
* 位置:consumer/config/TTLMessageConfig类
* 要给队列设置超时时间,需要在声明队列时配置ttl属性
*/
@Configuration
public class TTLMessageConfig {
//声明队列ttl.queue,设置超时时间
@Bean
public Queue ttlQueue() {
return QueueBuilder.durable("1-ttl.queue") // 指定队列名称,并持久化
.ttl(10000) // 设置队列的超时时间,10秒
.deadLetterExchange("1-dl.ttl.direct") // 队列指定死信交换机,即消息超时就投到这个交换机
.deadLetterRoutingKey("1-dl")//消息到死信交换机的RoutingKey
.build();
}
//正常的声明交换机ttl.direct
@Bean
public DirectExchange ttlExchange() {
return new DirectExchange("1-ttl.direct");
}
//正常的绑定交换机和队列,routingkey为ttl
@Bean
public Binding ttlBinding() {
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("1-ttl");
}
}
ErrorMessageConfig:
package com.zy.errordeal;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: zy
* @Date: 2024-11-14-13:57
* @Description: 处理失败消息的【交换机和队列】
* 位置:在consumer服务中的配置类config/ErrorMessageConfig.java
* 作用:声明交换机、队列、绑定关系
* <p>
* 绑定关系:交换机error.direct–routingkey(error)–》error.queue
* <p>
* 失败策略:
* 1、参数rabbitTemplate(Spring容器自动注入)
* 2、通过rabbitTemplate将消息发送到(处理失败消息的)交换机(routingKey为error)
* <p>
* 这里的RepublishMessageRecoverer的作用:
* 当消费者的消息失败重试次数用尽后,将失败的消息【丢弃给指定的error交换机的error队列】
*/
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange() {
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
/**
* 参数 Spring自动注入的rabbitTemplate
*
* @param rabbitTemplate
* @return
*
* Republishing failed message to
* exchange 'error.direct' with routing key error
*/
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
SpringRabbitListener:
package com.zy.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Author: zy
* @Date: 2024-11-14-13:39
* @Description:
*/
@Slf4j
@Component
public class SpringRabbitListener {
/**
* auto模式下:由spring监测listener代码是否出现异常,
* 没有异常则返回ack;抛出异常则返回nack
* <p>
* 【问题】:当消费者出现异常后,消息会不断requeue(重入队)到队列,
* 再重新发送给消费者,然后再次异常,再次requeue,
* 无限循环,导致mq的消息处理飙升,
* 带来不必要的压力:(这里测试一条数据就已经达到 3000条/s 了)
*
* @param msg
*/
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
//none模式下,消费者在这里接收到消息后,消息就从队列中被删除了
log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
System.out.println(1 / 0);//抛出异常、后面就不会执行业务代码
log.info("消费者处理消息成功!");//模拟业务代码
}
/**
* 死信监听
*
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "1-dl.ttl.queue", durable = "true"), exchange = @Exchange(name = "1-dl.ttl.direct"), key = "1-dl"))
public void listenDlQueue(String msg) {
log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}
/**
* 延时对列监听
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"), exchange = @Exchange(name = "delay.direct", delayed = "true"), key = "delay"))
public void listenDelayExchange(String msg) {
log.info("消费者接收到了delay.queue的延迟消息:" + msg);
}
}
ConsumerApp:
package com.zy;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @Author: zy
* @Date: 2024-11-14-13:22
* @Description:
*/
@SpringBootApplication
public class ConsumerApp {
/**
* 问题:在上面的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
* 但是,有些数据特别重要,我们不希望任何消息被丢弃,此时,我们应该如何实现?
*
* 在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
*
* RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,【丢弃消息】【默认】就是这种方式
* ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队(Immediate立刻重入队)(但是频率比没有配置消费失败重载机制低一些)
* RepublishMessageRecoverer(推荐):重试耗尽后,将失败消息投递到指定的交换机
* @param args
*/
public static void main(String[] args) {
SpringApplication.run(ConsumerApp.class, args);
}
}
测试类:
package com.zy;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
* @Author: zy
* @Date: 2024-11-14-13:32
* @Description:
*/
@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
public class ConsumerTest {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
* 此时,如果消费者还没有处理消息,然后消费者挂掉了,就会导致消息丢失。
*/
@Test
public void testDurableMessage() {
// 1.准备消息
Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(
MessageDeliveryMode.PERSISTENT).build();
// 2.发送消息
rabbitTemplate.convertAndSend("simple.queue", message);
}
/**
* 1)RabbitMQ投递消息给消费者
* 2)消费者获取消息后,【返回ACK给RabbitMQ】
* 3)RabbitMQ删除消息
* 4)消费者宕机,消息尚未处理
* <p>
* 三种方式
* manual:手动ack,需要在业务代码结束后,调用api发送ack。
* auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
* none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除(此时,消息投递是不可靠的,可能丢失)
*/
@Test
public void testTTLMsg() {
// 创建消息
Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)).setExpiration(
"5000").build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
System.out.println("消息ID:" + correlationData.getId());
rabbitTemplate.convertAndSend("1-ttl.direct", "1-ttl", message, correlationData);
log.debug("发送消息成功");
}
/**
* 使用DelayExchange-代码方式
* DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:
* <p>
* 接收消息
* 判断消息是否具备x-delay属性
* 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
* 返回routing not found结果给消息发送者
* x-delay时间到期后,重新投递消息到指定队列
*
* @throws InterruptedException 异常
*/
@Test
public void testSendDelayMessage() throws InterruptedException {
// 1.准备消息
Message message =
MessageBuilder.withBody("hello, delayed test ttl messsage".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(
MessageDeliveryMode.PERSISTENT).setHeader("x-delay", 5000).build();
// 2.准备CorrelationData
//消息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
System.out.println("消息ID:" + correlationData.getId());
// 3.发送消息
rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);
log.info("发送消息成功");
}
}
基于上述代码,概述:
消息队列在使用过程中,面临着很多实际问题需要思考:
消息可靠性问题:如何确保发送的消息至少被消费—次
延迟消息问题:如何实现消息的延迟投递
消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题
高可用问题:如何避免单点的MQ故障而导致的不可用问题
一、消息可靠性
背景/需求:消息从发送,到消费者接收,会经历多个过程:
其中的每一步都可能导致消息丢失,常见的丢失原因包括:
发送时丢失:
生产者发送的消息【未送达exchange】——返回nack(消息确认模式)
消息【到达exchange】——返回ack(消息确认模式)
到达queue后,MQ宕机,queue将消息丢失
——返回ACK,及路由失败原因(回退模式)
consumer接收到消息后还未消费就宕机——消息持久化
1、【生产者】消息确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种方式:
publisher-confirm,发送者确认
消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack
publisher-return,发送者回执
消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
1.1 修改application.yml配置文件,添加下面的内容:
位置:生产者/publisher服务
目的:
1、开启消息确认模式
2、开启消息回退(并设置消息路由到队列失败时,回退消息给回调接口)
CorrelationData的作用:
1、消息ID需要封装到CorrelationData
2、correlationData.getFuture().addCallback(…)是一个回调函数:决定了每个业务处理confirm成功或失败的逻辑
2、消息持久化(了解)
背景/需求:生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果消息队列突然宕机,也可能导致消息丢失。
(因为消息队列默认是内存存储)
(发送到消息队列成功+消息队列突然宕机=消息丢失)
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制(写入到磁盘中)
注:SpringAMQP默认是进行持久化(包括声明队列、交换机、发送消息)(备注:通过管控台创建的默认是非持久化的)
那么,下面学的消息持久化有什么用呢?持久化毕竟是写磁盘,会有一定的性能损耗,不是所有的数据都需要持久化,学了下面的持久化后可以手动将不需要持久化的数据取消持久化
RabbitMQ Management 控制台设置:
说明:
Durable:持久的
Transient:转瞬即逝的
交换机持久化
消息队列持久化
3、【消费者】消息确认
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。此时,如果消费者还没有处理消息,然后消费者挂掉了,就会导致消息丢失。
场景如下:
1)RabbitMQ投递消息给消费者
2)消费者获取消息后,【返回ACK给RabbitMQ】
3)RabbitMQ删除消息
4)消费者宕机,消息尚未处理
(成功发送到消费者+消费者还没处理消息就宕机了)消息就丢失了。因此消费者返回ACK的时机非常重要。
而SpringAMQP则允许配置三种确认模式:
manual:手动ack,需要在业务代码结束后,调用api发送ack。
auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除(此时,消息投递是不可靠的,可能丢失)
一般,我们都是使用默认的auto即可。
【yml配置文件中配置消息确认模式】:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto #由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
4、消费失败重试机制
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列(不返回ack,也不返回nack),而是可以自己设置重试的次数(如果在重试n次后仍然失败,那么后面在继续重入队大概率也会失败,那么就直接扔掉,不再重入队,此时,Spring会返回ack)
总结:
开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
【重试达到最大次数后,Spring会返回ack,消息会被丢弃】
1、本地重试
修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry: # Spring消费者失败重试
enabled: true # 【开关】开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒(第一次失败后1s重试)
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval(举例:倍数*第一次等待时长1s,这样子永远都是1s)
#但是如果设置为2,下次等待时长为上次的2倍,因此等待时长依次为1、2、4、8、16....
max-attempts: 3 # 最大重试次数
stateless: true # (默认为true)true无状态;false有状态【如果业务中包含事务,这里改为false】
#(备注:如果设置为false,那么Spring在重试的时候保留事务——消耗性能,所以没有事务时设置为true提升性能)
max-interval: 10000 # 最大等待时长,大于此时长的一律按最大时长来计算
重启consumer服务,重复之前的测试。可以发现:
在重试4次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了
2、失败策略
问题:在上面的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。但是,有些数据特别重要,我们不希望任何消息被丢弃,此时,我们应该如何实现?
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,【丢弃消息】【默认】就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队(Immediate立刻重入队)(但是频率比没有配置消费失败重载机制低一些)
RepublishMessageRecoverer(推荐):重试耗尽后,将失败消息投递到指定的交换机
RepublishMessageRecoverer:失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理,这样所有的消息都不会丢失。
总结:
如何确保RabbitMQ消息的可靠性?
开启生产者确认机制,确保生产者的消息能到达队列
开启持久化功能,确保消息未消费前在队列中不会丢失
开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
标签:队列,springframework,高级,特性,交换机,消息,org,import,RabbitMQ
From: https://blog.csdn.net/qq_41041630/article/details/143794099