1. RabbitMQ死信队列
1.1 死信队列简介
在实际开发项目是,在较为重要的业务场景中,要确保未被消费的消息不被丢弃(例如:订单业 务),那为了保证消息数据的不丢失,可以使用RabbitMQ的死信队列机制,当消息消费发生异 常时,将消息投入到死信队列中进行处理。
死信队列:RabbitMQ中并不是直接声明一个公共的死信队列,然后死信消息就会跑到死信队列 中。而是为每个需要使用死信的消息队列配置一个死信交换机,当消息成为死信后,可以被重新 发送到死信交换机,然后再发送给使用死信的消息队列。
死信交换机:英文缩写:DLX 。Dead Letter Exchange(死信交换机),死信交换机其实就是 普通的交换机,通过给队列设置参数: x-dead-letter-exchange 和x-dead-letter-routing-key, 来指向死信交换机
1.2 死信
RabbitMQ规定消息符合以下某种情况时,将会成为死信
- 队列消息长度到达限制(队列消息个数限制);
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队 列,requeue=false;
- 原队列存在消息过期设置,消息到达超时时间未被消费;
死信消息会被RabbitMQ特殊处理,如果配置了死信队列,则消息会被丢到死信队列中,如果没 有配置死信队列,则消息会被丢弃。
1.3 死信队列简介
1.3.1 消费者拒绝接收消息
1.创建Springboot项目
2.在项目的pom.xml文件中引入相关起步依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
3.创建RabbitMQConfiguration配置类,进行队列配置
@Configuration
public class RabbitMQConfiguration {
//普通交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("Fanout_Exchange");
}
//死信交换机
@Bean
public DirectExchange deadExchange(){
return new DirectExchange("Dead_Exchange");
}
//普通消息队列
@Bean
public Queue fanoutQueue(){
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","Dead_Exchange"); //当前队列绑定的死信交换机
map.put("x-dead-letter-routing-key","Dead_Routing_Key");//当前队列的死信路由key
return QueueBuilder.durable("Fanout_Queue").withArguments(map).build();
}
//死信消息队列
@Bean
public Queue deadQueue(){
return QueueBuilder.durable("Dead_Queue").build();
}
//普通交换机和普通队列绑定
@Bean
@Autowired
public Binding fanoutBinding(@Qualifier("fanoutExchange")
FanoutExchange exchange, @Qualifier("fanoutQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange);
}
//死信交换机和死信队列绑定,并指定对应的路由
@Bean
@Autowired
public Binding deadBinding(@Qualifier("deadExchange") DirectExchange exchange,@Qualifier("deadQueue") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("Dead_Routing_Key");
}
}
4.在application.yml文件中配置
spring:
rabbitmq:
host: 192.168.200.129
port: 5672
username: song
password: 123456
virtual-host: /song
listener:
simple:
acknowledge-mode: manual
5.创建普通队列消费者和死信队列消费者
@Component
public class RabbitMQListener {
//监听接收rabbitmq的消息队列中的消息
@RabbitListener(queues = {"Fanout_Queue"})
public void listenerMessage(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("消费者接收到消息:"+msg);
if ("MLGB".equals(msg)){
//拒收消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,fa
lse);
}else{
//接收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
@Component
public class DeadListener {
//监听接收rabbitmq的消息队列中的消息
@RabbitListener(queues = {"Dead_Queue"})
public void listenerMessage(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("接收到死信消息:"+msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
6.创建生产者发送消息
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public String sendMessage(String message){
rabbitTemplate.convertSendAndReceive("Fanout_Exchange","",message);
return "success";
}
}
7.运行项目,浏览器地址:http://localhost:9001/send?message=hello时
1.3.2 过期时间
过期时间+死信队列也可以实现延迟队列操作。
在RabbitMQConfiguration配置类中设置过期时间
//普通消息队列
@Bean
public Queue fanoutQueue(){
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","Dead_Exchange"); //当前队列绑定的死信交换机
map.put("x-dead-letter-routing-key","Dead_Routing_Key");//当前队列的死信路由key
map.put("x-message-ttl",10000);//队列消息过期时间,时间ms
return QueueBuilder.durable("Fanout_Queue").withArguments(map).build();
}
将普通队列消费者类上的@Component注解注释掉,表示没有消费者。
运行项目,浏览器地址:http://localhost:9001/send?message=hello
1.3.3 队列消息长度达到限制
在RabbitMQConfiguration配置类中设置队列能存放消息的个数
@Bean
public Queue fanoutQueue(){
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","Dead_Exchange"); //当前队列绑定的死信交换机
map.put("x-dead-letter-routing-key","Dead_Routing_Key");//当前队列的死信路由key
map.put("x-max-length",2);//队列长度
return QueueBuilder.durable("Fanout_Queue").withArguments(map).build();
}
将普通队列消费者类上的@Component注解注释掉,表示没有消费者。
修改生产者,使用循环发送超过消息队列长度个的消息。
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public String sendMessage(String message){
for (int i=0;i<3;i++){
rabbitTemplate.convertSendAndReceive("Fanout_Exchange","",message);
}
return "success";
}
}
运行项目,浏览器地址:http://localhost:9001/send?message=hello
2. RabbitMQ延迟队列
2.1 延迟队列简介
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
TTL:消息时间到了,删除消息。
延迟队列:消息时间到了,才能被消费者消费。
RabbitMQ中没有延迟队列,但是可以用ttl+死信队列方式和延迟插件两种方式来实现
2.2 ttl+死信队列
ttl+死信队列代码在讲死信队列时已经实现,这个不再阐述。
2.3 延迟插件
人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使 用TTL和DLX。rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延 迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送。
1.将资料中的rabbitmq_delayed_message_exchange-3.9.0.ez上传到Linux 的/usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins目录中
2.执行指令
开启延迟插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
关闭延迟插件:rabbitmq-plugins disable rabbitmq_delayed_message_exchange
3.创建Springboot项目
4.起步依赖和application.yml和之前配置一样
5.创建RabbitMQDelayConfiguration,构建延迟队列
使用延迟插件时,只需要声明一个 x-delayed-message 类型的交换器。
@Configuration
public class RabbitMQDelayConfiguration {
//定义自定义交换机,设置为延迟交换机
@Bean
public CustomExchange delayExchange(){
Map<String, Object> args = new HashMap<>();
/**
* 设置自定义交换器路由消息的类型,direct类似direct交换器路由消息的模
式,也可以传递topic、fanout,或者其它插件提供的自定义的交换器类型
*/
args.put("x-delayed-type", "fanout");
return new CustomExchange("Delay_Fanout_Exchange","x-delayedmessage",true,false,args);
}
@Bean
public Queue delayQueue(){
return QueueBuilder.durable("Delay_Queue").build();
}
@Bean
public Binding delayBinding(@Qualifier("delayQueue")Queue queue,@Qualifier("delayExchange") CustomExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("Delay_Fanout_Routing_Key").noargs();
}
}
6.创建生产者发送延迟消息
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public String sendMessage(String message){
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setDelay(10000);
return message;
}
};
rabbitTemplate.convertSendAndReceive("Delay_Fanout_Exchange", "",message,messagePostProcessor);
return "success";
}
}
7.创建消费者消费消息
@Component
public class RabbitMQListener {
//监听接收rabbitmq的消息队列中的消息
@RabbitListener(queues = {"Delay_Queue"})
public void listenerMessage(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("消费者接收到消息:"+msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
3. RabbitMQ消息可靠性投递
RabbitMQ消息可靠性投递主要从三方面考虑
- 生产阶段:消息在 Producer 发送端创建出来,经过网络传输发送到 Broker 存储端。
- 存储阶段:消息在 Broker 端存储。
- 消费阶段:Consumer 消费端从 Broker存储端拉取消息,经过网络传输发送到 Consumer 消费端上,并通过重试来最大限度的保证消息的消费。
3.1 生产阶段
RabbitMQ生产者保证消息不丢失,主要是保证生产者可以尽量100%的将消息发送给 RabbitMQ。
生产端投递的消息丢失的原因有很多,比如消息在网络传输的过程中发生网络故障消息丢失,或 者消息投递到RabbitMQ时RabbitMQ挂了,那消息也可能丢失。
生产者保证消息不丢失有两种方式:事务和Confirm。
3.1.1 RabbitMQ事务
RabbitMQ中与事务有关的主要有三个方法:
- txSelect()
- txCommit()
- txRollback()
txSelect主要用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务。
当我们使用txSelect提交开始事务之后,我们就可以发布消息给Broke代理服务器,如果 txCommit提交成功了,则消息一定到达了Broke了,如果在txCommit执行之前Broker出现异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback方法进行回滚事务了。
示例
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public String sendMessage(String message){
rabbitTemplate.setChannelTransacted(true); //开启事务操作
rabbitTemplate.execute(channel -> {
try {
channel.txSelect();//开启事务
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
int i = 5/0;
channel.txCommit();//没有问题提交事务
}catch (Exception e){
e.printStackTrace();
channel.txRollback();//有问题回滚事务
}
return null;
});
return "success";
}
}
消费者没有任何变化。
通过测试会发现,发送消息时只要Broker出现异常崩溃或者由于其他原因抛出异常,就会捕获异常通过txRollback方法进行回滚事务了,则消息不会发送,消费者就获取不到消息。
3.1.2. 发送方消息确认机制(Confirm模式)
使用事务机制的话会“吸干”RabbitMQ的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?从AMQP协议层面来看并没有更好的办 法但是RabbitMQ提供了一个改进方案,即发送方确认机制(publisher confirm)。事务机制和发送方确认机制(publisher confirm)机制两者是互斥的,不能共存。
3.1.2.1. 同步方式
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public String sendMessage(String message){
rabbitTemplate.execute(channel -> {
channel.confirmSelect(); //开始confirm操作
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
if (channel.waitForConfirms()){
System.out.println("发送成功");
}else{
//进行消息重发
System.out.println("消息发送失败,进行消息重发");
}
return null;
});
return "success";
}
}
发送一条消息后,会等待服务器端Confirm,如果服务端返回false或者超时时间内未返回,生产 者就进行消息重传。
3.1.2.2. 异步方式
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/send")
public String sendMessage(String message){
rabbitTemplate.execute(channel -> {
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
//消息正确到达broker,就会发送一条ack消息
@Override
public void handleAck(long l, boolean b) throws IOException{
System.out.println("发送消息成功");
}
//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息
@Override
public void handleNack(long l, boolean b) throws IOException {
System.out.println("发送消息失败,重新发送消息");
}
});
channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
return null;
});
return "success";
}
}
维持异步调用要求不能断掉连接。
3.2 存储阶段
message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给 消费端。
所有需要给exchange、queue和message都进行持久化:
3.2.1. exchange持久化
第三个参数true表示这个exchange持久化。
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
或
@Bean
public FanoutExchange fanoutExchange(){
//参数1:交换机名称;参数2:是否持久化 参数3:是否删除
return new FanoutExchange("Fanout_Exchange",true,false);
}
3.2.2. queue持久化
第二个参数true表示这个queue持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
或
@Bean
public Queue deadQueue(){
return QueueBuilder.durable("Dead_Queue").build();
}
3.2.3. message持久化
MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化。
channel.basicPublish(
EXCHANGE_NAME,
ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes(StandardCharsets.UTF_8));
这样,如果RabbitMQ收到消息后挂了,重启后会自行恢复消息。
3.3 消费阶段
消费者导致数据丢失的场景如下:
- 在RabbitMQ将消息发出后,消费端还没接收到消息之前,发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失。
- 在RabbitMQ将消息发出后,消费端还没接收到消息之前,消费端挂了,此时消息会丢失。
- 消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。
解决以上三种场景的方式是一样的,只要将消费者的自动确认消息接收,改为手动确认消息接收,因为消费者改为手动确认消息接收,那RabbitMQ如果一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下 一个消费者,或者等待原来的那个消费者重新连接。
那如果消费者接收消息成功,但是处理失败,也可以使用channel.basicNack设置消息拒绝接 收,将消息重新放置会消息队列中,重新发送。
示例
application.yml中配置
spring:
rabbitmq:
host: 192.168.200.129
port: 5672
username: song
password: 123456
virtual-host: /song
listener:
simple:
acknowledge-mode: manual #手动确认
消费者
@Component
public class RabbitMQListener {
//监听接收rabbitmq的消息队列中的消息
@RabbitListener(queues = {"Fanout_Queue"})
public void listenerMessage(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
System.out.println("消费者接收到消息:"+msg);
if ("MLGB".equals(msg)){
//拒收消息
//第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}else{
//接收消息
//参数1:收到消息的标签
//参数2:true:确认<=DeliveryTag接收,false:只确认标签消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
3.4 消息入库
如果RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂了,这样消息还是丢 失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到 确认消息,这样生产端就不知道RabbitMQ到底有没有收到消息,这样也不太好进行处理。所以 为了避免RabbitMQ持久化失败而导致数据丢失,我们自己也要做一些消息补偿机制,以应对一 些极端情况。
标签:队列,RabbitMQ,死信,消息,message,public,RabbitMQ03 From: https://www.cnblogs.com/jiabaolatiao/p/17515144.html