首页 > 其他分享 >RabbitMQ03

RabbitMQ03

时间:2023-06-29 20:47:31浏览次数:38  
标签:队列 RabbitMQ 死信 消息 message public RabbitMQ03

1. RabbitMQ死信队列

1.1 死信队列简介

在实际开发项目是,在较为重要的业务场景中,要确保未被消费的消息不被丢弃(例如:订单业 务),那为了保证消息数据的不丢失,可以使用RabbitMQ的死信队列机制,当消息消费发生异 常时,将消息投入到死信队列中进行处理。

死信队列:RabbitMQ中并不是直接声明一个公共的死信队列,然后死信消息就会跑到死信队列 中。而是为每个需要使用死信的消息队列配置一个死信交换机,当消息成为死信后,可以被重新 发送到死信交换机,然后再发送给使用死信的消息队列。

死信交换机:英文缩写:DLX 。Dead Letter Exchange(死信交换机),死信交换机其实就是 普通的交换机,通过给队列设置参数: x-dead-letter-exchange 和x-dead-letter-routing-key, 来指向死信交换机

image

1.2 死信

RabbitMQ规定消息符合以下某种情况时,将会成为死信

  • 队列消息长度到达限制(队列消息个数限制);
  • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队 列,requeue=false;
  • 原队列存在消息过期设置,消息到达超时时间未被消费;

死信消息会被RabbitMQ特殊处理,如果配置了死信队列,则消息会被丢到死信队列中,如果没 有配置死信队列,则消息会被丢弃。

1.3 死信队列简介

1.3.1 消费者拒绝接收消息

1.创建Springboot项目

2.在项目的pom.xml文件中引入相关起步依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

3.创建RabbitMQConfiguration配置类,进行队列配置

@Configuration
public class RabbitMQConfiguration {
    //普通交换机
    @Bean
    public FanoutExchange fanoutExchange(){
    	return new FanoutExchange("Fanout_Exchange");
    }
    //死信交换机
    @Bean
    public DirectExchange deadExchange(){
    	return new DirectExchange("Dead_Exchange");
    }
    //普通消息队列
    @Bean
    public Queue fanoutQueue(){
        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","Dead_Exchange"); //当前队列绑定的死信交换机
    	map.put("x-dead-letter-routing-key","Dead_Routing_Key");//当前队列的死信路由key
    	return QueueBuilder.durable("Fanout_Queue").withArguments(map).build();
    }
    //死信消息队列
    @Bean
    public Queue deadQueue(){
    	return QueueBuilder.durable("Dead_Queue").build();
    }
    //普通交换机和普通队列绑定
    @Bean
    @Autowired
    public Binding fanoutBinding(@Qualifier("fanoutExchange")
        FanoutExchange exchange, @Qualifier("fanoutQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange);
    }
    //死信交换机和死信队列绑定,并指定对应的路由
    @Bean
    @Autowired
    public Binding deadBinding(@Qualifier("deadExchange") DirectExchange exchange,@Qualifier("deadQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("Dead_Routing_Key");
    }
}

4.在application.yml文件中配置

spring:
    rabbitmq:
        host: 192.168.200.129
        port: 5672
        username: song
        password: 123456
        virtual-host: /song
        listener:
            simple:
            	acknowledge-mode: manual

5.创建普通队列消费者和死信队列消费者

@Component
public class RabbitMQListener {
    //监听接收rabbitmq的消息队列中的消息
    @RabbitListener(queues = {"Fanout_Queue"})
    public void listenerMessage(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("消费者接收到消息:"+msg);
        if ("MLGB".equals(msg)){
        //拒收消息
    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,fa
lse);
        }else{
        //接收消息
      channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    }
}
@Component
public class DeadListener {
    //监听接收rabbitmq的消息队列中的消息
    @RabbitListener(queues = {"Dead_Queue"})
    public void listenerMessage(Message message, Channel channel) throws IOException {
    String msg = new String(message.getBody());
    System.out.println("接收到死信消息:"+msg);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

6.创建生产者发送消息

@RestController
public class RabbitMQController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @RequestMapping("/send")
    public String sendMessage(String message){
        rabbitTemplate.convertSendAndReceive("Fanout_Exchange","",message);
        return "success";
    }
}

7.运行项目,浏览器地址:http://localhost:9001/send?message=hello时

image

image

1.3.2 过期时间

过期时间+死信队列也可以实现延迟队列操作。

在RabbitMQConfiguration配置类中设置过期时间

//普通消息队列
@Bean
public Queue fanoutQueue(){
    Map<String,Object> map = new HashMap<>();
    map.put("x-dead-letter-exchange","Dead_Exchange"); //当前队列绑定的死信交换机
    map.put("x-dead-letter-routing-key","Dead_Routing_Key");//当前队列的死信路由key
    map.put("x-message-ttl",10000);//队列消息过期时间,时间ms
    return QueueBuilder.durable("Fanout_Queue").withArguments(map).build();
}

将普通队列消费者类上的@Component注解注释掉,表示没有消费者。

运行项目,浏览器地址:http://localhost:9001/send?message=hello

image

1.3.3 队列消息长度达到限制

在RabbitMQConfiguration配置类中设置队列能存放消息的个数

@Bean
public Queue fanoutQueue(){
    Map<String,Object> map = new HashMap<>();
    map.put("x-dead-letter-exchange","Dead_Exchange"); //当前队列绑定的死信交换机
    map.put("x-dead-letter-routing-key","Dead_Routing_Key");//当前队列的死信路由key
    map.put("x-max-length",2);//队列长度
    return QueueBuilder.durable("Fanout_Queue").withArguments(map).build();
}

将普通队列消费者类上的@Component注解注释掉,表示没有消费者。

修改生产者,使用循环发送超过消息队列长度个的消息。

@RestController
public class RabbitMQController {
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @RequestMapping("/send")
      public String sendMessage(String message){
      for (int i=0;i<3;i++){
        rabbitTemplate.convertSendAndReceive("Fanout_Exchange","",message);
      }
    return "success";
  }
}

运行项目,浏览器地址:http://localhost:9001/send?message=hello

image

2. RabbitMQ延迟队列

2.1 延迟队列简介

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

TTL:消息时间到了,删除消息。

延迟队列:消息时间到了,才能被消费者消费。

image

RabbitMQ中没有延迟队列,但是可以用ttl+死信队列方式和延迟插件两种方式来实现

2.2 ttl+死信队列

image

ttl+死信队列代码在讲死信队列时已经实现,这个不再阐述。

2.3 延迟插件

人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使 用TTL和DLX。rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延 迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送。

1.将资料中的rabbitmq_delayed_message_exchange-3.9.0.ez上传到Linux 的/usr/lib/rabbitmq/lib/rabbitmq_server-3.9.0/plugins目录中

2.执行指令

开启延迟插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
关闭延迟插件:rabbitmq-plugins disable rabbitmq_delayed_message_exchange

3.创建Springboot项目

4.起步依赖和application.yml和之前配置一样

5.创建RabbitMQDelayConfiguration,构建延迟队列

使用延迟插件时,只需要声明一个 x-delayed-message 类型的交换器。

@Configuration
public class RabbitMQDelayConfiguration {
    //定义自定义交换机,设置为延迟交换机
    @Bean
    public CustomExchange delayExchange(){
    	Map<String, Object> args = new HashMap<>();
        /**
        * 设置自定义交换器路由消息的类型,direct类似direct交换器路由消息的模
        式,也可以传递topic、fanout,或者其它插件提供的自定义的交换器类型
        */
        args.put("x-delayed-type", "fanout");
        return new CustomExchange("Delay_Fanout_Exchange","x-delayedmessage",true,false,args);
    }
    @Bean
    public Queue delayQueue(){
    	return QueueBuilder.durable("Delay_Queue").build();
    }
    @Bean
    public Binding delayBinding(@Qualifier("delayQueue")Queue queue,@Qualifier("delayExchange") CustomExchange exchange){
    return BindingBuilder.bind(queue).to(exchange).with("Delay_Fanout_Routing_Key").noargs();
    }
}

6.创建生产者发送延迟消息

@RestController
public class RabbitMQController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/send")
    public String sendMessage(String message){
    	MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            MessageProperties messageProperties = message.getMessageProperties();
            messageProperties.setDelay(10000);
            return message;
        }
    };
        rabbitTemplate.convertSendAndReceive("Delay_Fanout_Exchange", "",message,messagePostProcessor);
        return "success";
    }
}

7.创建消费者消费消息

@Component
public class RabbitMQListener {
    //监听接收rabbitmq的消息队列中的消息
    @RabbitListener(queues = {"Delay_Queue"})
    public void listenerMessage(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("消费者接收到消息:"+msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

3. RabbitMQ消息可靠性投递

RabbitMQ消息可靠性投递主要从三方面考虑

  • 生产阶段:消息在 Producer 发送端创建出来,经过网络传输发送到 Broker 存储端。
  • 存储阶段:消息在 Broker 端存储。
  • 消费阶段:Consumer 消费端从 Broker存储端拉取消息,经过网络传输发送到 Consumer 消费端上,并通过重试来最大限度的保证消息的消费。

3.1 生产阶段

RabbitMQ生产者保证消息不丢失,主要是保证生产者可以尽量100%的将消息发送给 RabbitMQ。

生产端投递的消息丢失的原因有很多,比如消息在网络传输的过程中发生网络故障消息丢失,或 者消息投递到RabbitMQ时RabbitMQ挂了,那消息也可能丢失。

生产者保证消息不丢失有两种方式:事务和Confirm。

3.1.1 RabbitMQ事务

RabbitMQ中与事务有关的主要有三个方法:

  • txSelect()
  • txCommit()
  • txRollback()

txSelect主要用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务。

当我们使用txSelect提交开始事务之后,我们就可以发布消息给Broke代理服务器,如果 txCommit提交成功了,则消息一定到达了Broke了,如果在txCommit执行之前Broker出现异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback方法进行回滚事务了。

示例

@RestController
public class RabbitMQController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @RequestMapping("/send")
    public String sendMessage(String message){
        rabbitTemplate.setChannelTransacted(true); //开启事务操作
        rabbitTemplate.execute(channel -> {
            try {
            	channel.txSelect();//开启事务
            channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
                int i = 5/0;
                channel.txCommit();//没有问题提交事务
            }catch (Exception e){
                e.printStackTrace();
                channel.txRollback();//有问题回滚事务
            }
            return null;
        });
        return "success";
    }
}

消费者没有任何变化。

通过测试会发现,发送消息时只要Broker出现异常崩溃或者由于其他原因抛出异常,就会捕获异常通过txRollback方法进行回滚事务了,则消息不会发送,消费者就获取不到消息。

3.1.2. 发送方消息确认机制(Confirm模式)

使用事务机制的话会“吸干”RabbitMQ的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?从AMQP协议层面来看并没有更好的办 法但是RabbitMQ提供了一个改进方案,即发送方确认机制(publisher confirm)事务机制和发送方确认机制(publisher confirm)机制两者是互斥的,不能共存。

3.1.2.1. 同步方式

@RestController
public class RabbitMQController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/send")
        public String sendMessage(String message){
        	rabbitTemplate.execute(channel -> {
        		channel.confirmSelect(); //开始confirm操作
        channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
        if (channel.waitForConfirms()){
        	System.out.println("发送成功");
        }else{
        //进行消息重发
        	System.out.println("消息发送失败,进行消息重发");
        }
        	return null;
        });
        return "success";
    }
}

发送一条消息后,会等待服务器端Confirm,如果服务端返回false或者超时时间内未返回,生产 者就进行消息重传。

3.1.2.2. 异步方式

@RestController
public class RabbitMQController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/send")
    public String sendMessage(String message){
    	rabbitTemplate.execute(channel -> {
    		channel.confirmSelect();
    		channel.addConfirmListener(new ConfirmListener() {
                //消息正确到达broker,就会发送一条ack消息
                @Override
                public void handleAck(long l, boolean b) throws IOException{
                	System.out.println("发送消息成功");
                }
        		//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息
                @Override
                public void handleNack(long l, boolean b) throws IOException {
                	System.out.println("发送消息失败,重新发送消息");
                }
             });
    channel.basicPublish("Fanout_Exchange","",null,message.getBytes());
        	return null;
        });
        return "success";
    }
}

维持异步调用要求不能断掉连接。

3.2 存储阶段

message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给 消费端。

所有需要给exchange、queue和message都进行持久化:

3.2.1. exchange持久化

第三个参数true表示这个exchange持久化。

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
或
@Bean
public FanoutExchange fanoutExchange(){
    //参数1:交换机名称;参数2:是否持久化 参数3:是否删除
    return new FanoutExchange("Fanout_Exchange",true,false);
}

3.2.2. queue持久化

第二个参数true表示这个queue持久化

channel.queueDeclare(QUEUE_NAME, true, false, false, null);
或
@Bean
public Queue deadQueue(){
	return QueueBuilder.durable("Dead_Queue").build();
}

3.2.3. message持久化

MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化。

channel.basicPublish(
        EXCHANGE_NAME,
        ROUTING_KEY,
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes(StandardCharsets.UTF_8));

这样,如果RabbitMQ收到消息后挂了,重启后会自行恢复消息。

3.3 消费阶段

消费者导致数据丢失的场景如下:

  • 在RabbitMQ将消息发出后,消费端还没接收到消息之前,发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失。
  • 在RabbitMQ将消息发出后,消费端还没接收到消息之前,消费端挂了,此时消息会丢失。
  • 消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。

解决以上三种场景的方式是一样的,只要将消费者的自动确认消息接收,改为手动确认消息接收,因为消费者改为手动确认消息接收,那RabbitMQ如果一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下 一个消费者,或者等待原来的那个消费者重新连接。

那如果消费者接收消息成功,但是处理失败,也可以使用channel.basicNack设置消息拒绝接 收,将消息重新放置会消息队列中,重新发送。

示例

application.yml中配置

spring:
    rabbitmq:
        host: 192.168.200.129
        port: 5672
        username: song
        password: 123456
        virtual-host: /song
        listener:
            simple:
            	acknowledge-mode: manual #手动确认

消费者

@Component
public class RabbitMQListener {
    //监听接收rabbitmq的消息队列中的消息
    @RabbitListener(queues = {"Fanout_Queue"})
    public void listenerMessage(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        System.out.println("消费者接收到消息:"+msg);
        if ("MLGB".equals(msg)){
            //拒收消息
            //第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
        }else{
            //接收消息
            //参数1:收到消息的标签
            //参数2:true:确认<=DeliveryTag接收,false:只确认标签消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    }
}

3.4 消息入库

如果RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂了,这样消息还是丢 失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到 确认消息,这样生产端就不知道RabbitMQ到底有没有收到消息,这样也不太好进行处理。所以 为了避免RabbitMQ持久化失败而导致数据丢失,我们自己也要做一些消息补偿机制,以应对一 些极端情况。

image

标签:队列,RabbitMQ,死信,消息,message,public,RabbitMQ03
From: https://www.cnblogs.com/jiabaolatiao/p/17515144.html

相关文章