首页 > 其他分享 >16-RabbitMQ高级特性-消费端的消息ACK与重回队列

16-RabbitMQ高级特性-消费端的消息ACK与重回队列

时间:2022-10-04 23:35:41浏览次数:48  
标签:false 16 ACK RabbitMQ 队列 import 重回 channel

消费端的消息ACK与重回队列

消费端的手工ACK和NACK

  • ACK分为自动和手动
  • 消费端进行消费的时候, 如果由于业务异常我们可以进行日志的记录, 然后进行补偿
  • 如果由于服务器宕机等严重问题, 那我们就需要手工进行ACK保障消费端消费成功

消费端的重回队列

  • 消费端重回队列是为了对没有处理成功的消息, 把消息重新会递给Broker
  • 一般我们在实际应用中, 都会关闭重回队列, 也就是设置为FALSE
  • 为什么不使用重回队列的功能呢, 因为消息重回队列会加入到队列的尾部, 也会造成一条甚至大量消息一直重复投递在队列中死循环
  • 说道这里, 其实我是真实碰到过的, 当时正是双11, 我们的失败策略就是用的重回队列, 导致有大量的消息一直因为业务的异常, 重回队列, 导致了4000万的订单MQ消息, 一直压力下不去, 差点被领导骂死~, 后面还做了重大事故回顾会议, 哎

消息重回队列代码实现

消费者

package com.dance.redis.mq.rabbit.rqueue;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
 
public class Receiver {
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String queueName = "test001";
        RabbitMQHelper.queueDeclare(channel,queueName,true);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                try {
                    System.out.println("receive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
                    /**
                     * 手工ACK
                     * 消费失败, 但是消息不重回队列
                     *  channel.basicNack(envelope.getDeliveryTag(), false, false);
                     * 消费失败, 将消息重新丢回消息队列尾部
                     *  channel.basicNack(envelope.getDeliveryTag(), false, true);
                     * 消费成功
                     *  channel.basicAck(envelope.getDeliveryTag(), false);
                     */
                    if((Integer)properties.getHeaders().get("flag") == 0) {
                        //throw new RuntimeException("异常");
                        // 设置为false表示关闭重回队列
                        channel.basicNack(envelope.getDeliveryTag(), false, false);
                        // 设置为true表示开启重回队列 将这条消息重回放入队列
//                        channel.basicNack(envelope.getDeliveryTag(), false, true);
                    } else {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //    参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, false, consumer);
        // 等待回调函数执行完毕之后,关闭资源。
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        RabbitMQHelper.closeConnection();
    }
}

生产者

package com.dance.redis.mq.rabbit.rqueue;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

public class Sender {

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        //4 声明
        String queueName = "test001";
        RabbitMQHelper.queueDeclare(channel, queueName, true);
        for (int i = 0; i < 5; i++) {
            String msg = "Hello World RabbitMQ " + i;
            Map<String, Object> headers = new HashMap<>();
            headers.put("flag", i);
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers).build();
            channel.basicPublish("", queueName, props, msg.getBytes());
        }
    }

}

测试

开启重回队列测试

启动消费者

启动生产者

查看消费者

可以看到flag=0的消息, 再一直被重回队列, 当然, 我们可以通过程序去控制这个是不是要重回队列

关闭重回队列测试

启动消费者

启动生产者

查看消费者

可以看到哪怕, 我们手工NACK之后, 消息也没有重回队列

标签:false,16,ACK,RabbitMQ,队列,import,重回,channel
From: https://www.cnblogs.com/flower-dance/p/16754820.html

相关文章

  • 17-RabbitMQ高级特性-TTL队列/消息
    TTL队列/消息TTL:TimeToLive,生存时间RabbitMQ支持消息的过期时间,在消息发送时可以指定RabbitMQ支持队列的过期时间,从消息进入队列开始计算,只要超过了队列......
  • 18-RabbitMQ高级特性-死信队列
    死信队列死信队列:DLX,Dead-Letter-Exchange利用DLX,当消息在一个队列中变成死信(deadmessage)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLXDL......
  • 07-RabbitMQ核心API-Direct Exchange
    DirectExchange简介所有发送到directexchange的消息被转发到Routekey中指定的Queue注意:Direct模式可以使用RabbitMQ自带的Exchange(defaultexchange),所以不需......
  • 08-RabbitMQ核心API-Topic Exchange
    TopicExchange简介所有发送到TopicExchange的消息被转发到所有关心RouteKey中指定Topic的Queue上Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个T......
  • 09-RabbitMQ核心API-Fanout Exchange
    FanoutExchange简介不处理路由键,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上Fanout交换机转发消息是最快的......
  • 10-RabbitMQ核心API-其他[Binding, Queue, Message, Virtual host]
    Binding绑定关系Exchange和Exchange,Queue之间的连接关系Binding中可以包含RouteKey或者参数Queue消息队列,实际存储消息数据Durability:是否持久化,Durable......
  • 11-RabbitMQ高级特性-消息如何保证100%的投递成功
    消息如何保证100%的投递成功什么是生产端的可靠性投递保障消息的成功发出保障MQ节点的成功接收发送端收到MQ节点(Broker)确认应答完善的消息进行补偿机制BAT/TMD......
  • 12-RabbitMQ高级特性-在海量订单产生的业务高峰期, 如何避免消息的重复消费问题
    幂等性概念详解幂等性是什么可以借鉴数据库的乐观锁机制比如执行一条更新库存的SQLupdatet_repssetcount=count-1,version=version+1whereversion......
  • 13-RabbitMQ高级特性-Confirm确认消息
    Confirm确认消息理解Confirm消息确认机制消息的确认,是指投递消息后,如果Broker收到消息,则会给我们生产者一个应答生产者进行接收应答用来确定这条消息是否正常的......
  • 04-基于CentOS7安装RabbitMQ3.10.7
    RabbitMQ安装与入门安装与启动我实在是找不到这么老的版本了,直接用最新版本的,按照道理来说,新版本是兼容老版本的官网地址https://www.rabbitmq.com/Erlang安......