首页 > 其他分享 >RabbitMQ重试机制

RabbitMQ重试机制

时间:2024-04-15 18:45:20浏览次数:17  
标签:队列 RabbitMQ 重试 ACK 死信 消息 机制

RabbitMQ重试机制+死信队列

RabbitMQ重试机制

RabbitMQ的消息重试机制,就是消息消费失败后进行重试,重试机制的触发条件是消费者显式的抛出异常,这个很类似@Transactional,如果没有显式地抛出异常或者try catch起来没有手动回滚,事务是不会回滚的。

if("ACK重试机制".equals(messageBody)){
    message.getMessageProperties().getHeaders().put("x-death", count+1);
    throw new RuntimeException("手动出发异常,测试重试机制");
}

还有一种情况就是消息被拒绝后重新加入队列,比如basic.reject和basic.nack,并且requeue = true,但是这个是重新进入到了消息队列然后重新被消费,并且也不会触发我们重试机制的配置(如重试间隔、最大重试次数等等)。重试机制是默认开启的,但是如果没有重试机制相关的配置会导致消息一直无间隔的重试,直到消费成功,所以要使用重试机制一定要有相关配置。

死信队列

死信就是消息在特定场景下的一种表现形式,这些场景包括:

  1. 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  2. 消息的 TTL 过期时
  3. 消息队列达到最大长度
  4. 达到最大重试限制

消息在这些场景中时,被称为死信。

死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。死信队列也是一个普通队列,也可以被消费者消费,区别在于业务队列需要绑定在死信队列上,才能正常地把死信发送到死信队列上。

业务队列绑定死信队列

    @Bean
    public Queue directQueue() {
        /**
         * 绑定死信交换机及路由key
         */
        Map<String, Object> args = new HashMap<>();
        // x-dead-letter-exchange:这里声明当前业务队列绑定的死信交换机
        //消息被拒绝、消息过期,或者队列达到其最大长度。消息会变成死信
        args.put("x-dead-letter-exchange", DEAD_TCP_DATA_DIRECT_EXCHANGE);
        // x-dead-letter-routing-key:这里声明当前业务队列的死信路由 key
        args.put("x-dead-letter-routing-key", DEAD_TCP_DATA_DIRECT_ROUTING);
        return QueueBuilder.durable(DIRECT_QUEUE).withArguments(args).build();
    }

自动ACK + RabbitMQ重试机制

appliction.properties

# 消息重试机制: 自动ACK+MQ消息重试
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.initial-interval=5000

消费者

   @RabbitListener(queues = RabbitMqConfig.USER_ADD_QUEUE, concurrency = "3")
    public void userAddReceiver(String data, Message message, Channel channel) throws Exception {
        UserVo vo = OBJECT_MAPPER.readValue(data, UserVo.class);
        boolean success = messageHandle(vo);
        // 通过业务控制是否消费成功,消费失败则抛出异常触发重试
        if (!success) {
            log.error("消费失败");
            throw new Exception("消息消费失败");
        }
    }

一定要开启自动ACK,才会在到达最大重试上限后发送到死信队列,而且在重试过程中会独占当前线程,如果是单线程的消费者会导致其他消息阻塞,直至重试完成,所以可以使用@RabbitListener上的concurrency属性来控制并发数量。

手动ACK + 手动重试机制

appliction.properties

# 手动ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual

手动ACK配置了重试机制,在抛出异常的时候仍会触发重试,但是达到重试上限之后,会永远处于Unacked状态,不会进入到死信队列,必须要手动拒绝才可以进入死信队列,所以说这里不用配置重试机制而是采用手动重试的方式

消费者

    @RabbitHandler
    @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE2,concurrency = "3")
    public void process3(Message message, Channel channel) throws InterruptedException, IOException {
        // 重试次数
        int retryCount = 0;
        boolean success = false;
        // 消费失败并且重试次数<=重试上限次数
        while (!success && retryCount < MAX_RETRIES) {
            retryCount++;
            // 具体业务逻辑
            String messageBody = new String(message.getBody(), "UTF-8");
            success = !messageBody.equals("ACK重试机制");  //如果消息体等于ACK重试机制
            // 如果失败则重试
            if (!success) {
                String errorTip = "第" + retryCount + "次消费失败" +
                        ((retryCount < 3) ? "," + RETRY_INTERVAL + "s后重试" : ",进入死信队列");
                log.error(errorTip);
                Thread.sleep(RETRY_INTERVAL * 1000);
            }
        }
        if (success) {
            // 消费成功,确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("消费成功");
        } else {
            // 重试多次之后仍失败,进入死信队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.info("消费失败");
        }
    }

标签:队列,RabbitMQ,重试,ACK,死信,消息,机制
From: https://www.cnblogs.com/aeolian/p/18136388

相关文章

  • 消息中间件RabbitMQ_RabbitMQ集群搭建8
    一、集群搭建概述摘要:实际生产应用中都会采用消息队列的集群方案,如果选择RabbitMQ那么有必要了解下它的集群方案原理一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例部署就可以了,但是出于MQ中间件本身的可靠性、并发性......
  • 消息中间件RabbitMQ_RabbitMQ应用问题7
    一、RabbitMQ应用问题1、消息可靠性保障消息补偿机制2、消息幂等性保障乐观锁解决方案 二、消息可靠性保障需求: 100%确保消息发送成功 消息补偿:三、消息幂等性保障幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其......
  • 消息中间件RabbitMQ_RabbitMQ高级特性6
    一、RabbitMQ高级特性消息可靠性投递ConsumerACK消费端限流TTL死信队列延迟队列日志与监控消息可靠性分析与追踪管理二、消息的可靠投递1、模式在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式......
  • docker安装rabbitmq
    //查找镜像dockersearchrabbitmq//默认拉取官方最新版本dockerpullrabbitmq//创建容器,也可直接执行该命令,没有镜像会去先拉取镜像dockerrun-d--namemyrabbitmq-p5672:5672-p15672:15672rabbitmq//进入镜像dockerexec-it容器id/bin/bash//安装UI插件rabb......
  • java基础_03_包机制
    1、包的本质,就是文件夹 2、建包方法: packagecom.baidu.hhb;//这就是包,必须加在整个类的最上边,不能删,删除后下面的类就找不到包importxiaodi_java_base.*;//*通配符,可以导入该目录下所有的类。importxiaodi_java_base.khhhk;//导入类importjava.util.Date;......
  • Rust 的 Pin 机制
    背景我相信大多数人在学习Rust异步编程时都会被Futuretrait中的Pin指针感到困惑:pubtraitFuture{typeOutput;fnpoll(self:Pin<&mutSelf>,cx:&mutContext<'_>)->Poll<Self::Output>;}特别是搜索了一圈文档之后,更会对这个Pin一头雾水,彷佛自己也......
  • 原来Rust的panic也能被捕捉?浅谈Rust的panic机制
    这一系列文章的创作目的主要是帮助我自己深入学习Rust,同时也为已经具备一定Rust编程经验,但还没有深入研究过语言和标准库的朋友提供参考。对于正在入门Rust的同学,我更建议你们看《Rust圣经》或者《TheBook》,而不是这种晦涩难懂的文章。你用过panic!宏吗?在Rust里,panic!宏可以用......
  • Zookeeper的选举机制
    为什么要进行Leader选举?Leader主要作用是保证分布式数据一致性,即每个节点的存储的数据同步。遇到以下两种情况需要进行Leader选举服务器初始化启动服务器运行期间无法和Leader保持连接,Leader节点崩溃,逻辑时钟崩溃。服务器初始化时Leader选举Zookeeper由于其自身的性质,一般......
  • RabbitMQ 入门:我的思维导图总结
    刚看完尚硅谷的RabbitMQ入门教程,感觉自己对知识点的理解还有些混乱,看完相关面试题后整理了一份简单的思维导图,仅供参考,如有错误,欢迎指正!视频教程:尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件rabbitmq_哔哩哔哩_bilibili1.概览2.具体细节2.1基本概念常见面试题:什......
  • 4. 会话机制 Cookie和Session
    4.1会话机制一次会话指的是:就好比打电话,A给B打电话,接通之后,会话开始,直到挂断电话,该次会话就结束了,而浏览器访问服务器,就跟打电话一样,浏览器A给服务器发送请求,访问web程序,该次会话就已经接通,其中不管浏览器发送多少请求(就相当于接通电话后说话一样),都视为一次会话,直到浏览器关闭......