首页 > 其他分享 >RabbitMQ学习八 消费者可靠性

RabbitMQ学习八 消费者可靠性

时间:2024-01-21 17:44:55浏览次数:25  
标签:可靠性 消费者 RabbitMQ 重试 失败 消息 message

一、消费者确认机制

消费者的可靠性是靠消费者确认机制来保证。RabbitMQ提供了消费者确认机制(consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己处理状态。回执有三种可选值:

  • ack: 成功处理消息,RabbitMQ从队列中删除该消息

  • nack: 消息处理失败,RabbitMQ需要再次投递消息

  • reject: 消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息。

 

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

  • none: 不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

    消费者还没有处理完消息,RabbitMQ就直接把消息删除
  • manual: 手动模式。需要自己在业务代码中调用api,发送ack或者reject,存在业务入侵,但更灵活。

  • auto: 自动模式。 SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,但业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果。

    • 如果是业务异常,会自动返回nack

    • 如果是消息处理或校验异常,自动返回reject

直接抛异常

server:
  port: 8081

spring:
  rabbitmq:
    addresses: xxx.xx.x.x
    port: 5672
    username: xx
    password: xxx
    virtual-host: /xx
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
 /**
     * 监听某个队列的消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = BootMqConstant.SIMPLE_QUEUE)
    public void consumer(String message ){
        System.out.println("消费者消息"+message);
        throw new RuntimeException("error");
    }

 

当采取auto模式时,

1、如果消费者监听者没有返回,消息会处于未确认状态

2、如果消费者监听器正确返回之后,会删除消息,

3、如果消费者抛出异常,RabbitMQ会再次把消息投递给消费者(可能造成一直死循环)。

当消费者一直处理消息失败时,需要设置重试次数

 

reject

 /**
     * 监听某个队列的消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = BootMqConstant.SIMPLE_QUEUE)
    public void consumer(String message ){
        System.out.println("消费者消息"+message);
        throw new MessageConversionException("消息异常");
    }

当抛出MessageConversionException ,消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息。

 

 

二、消费者确认重试机制

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再次重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。

我们可以利用spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列:

重试配置

消费者重试机制,在消费者端进行配置

spring:
  rabbitmq:
    addresses: xxx.xx.xx.xx
    port: 5672
    username: xxxx
    password: xxx
    virtual-host: /xxx
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
        retry:
          enabled: true #开启消费者失败重试
          initial-interval: 1000ms # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长=multiplier*last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态; false有状态。如果业务中包含事务,这里改为false

注意:在消费者端重试次数配置的是listener,在生产者端重试次数配置的是template

消费者代码:

/**
     * 监听某个队列的消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = BootMqConstant.SIMPLE_QUEUE)
    public void consumer(String message ){
        System.out.println("消费者消息:"+message);
        throw new RuntimeException("消息异常");
    }

 

实验效果

RabbitMQ尝试3次投递消息给消费者,重试3次之后不再重试,但是此时RabbtiMQ把消息删除。

 

失败消息处理策略

在开启重启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三个实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。这种是默认方式

  • ImmediateRequeueMessageRecoverer: 重试耗尽后,返回nack,消息重新入队。

  • RepublishMessageRecoverer: 重试耗尽后,将失败消息投递到指定的交换机。

 

消息失败建议采用RepublishMessageRecoverer,这样可以把消息路由到一个专门处理失败情况的队列,然后把失败情况记录下来,由人工干预处理。

定义一个专门用来接收错误消息的队列和交换机,并配置 RepublishMessageRecoverer

配置类如下:


/**
 * 当前配置类需要在开启 rabbitMQ消费者失败重试的情况才需要起作用。
 */
@Configuration
@ConditionalOnProperty(prefix ="spring.rabbitmq" ,name ="listener.simple.retry.enabled" ,havingValue = "true")
public class ErrorConfig {

    @Bean
    public DirectExchange errorExchange(){
        return new DirectExchange("error.exchange");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
    }

    @Bean
    public MessageRecoverer messageConverter(RabbitTemplate rabbitTemplate){
        return  new RepublishMessageRecoverer(rabbitTemplate,"error.queue","error");
    }
}

ErrorConfig只有在消费者 开启 消息投递失败的情况下才会起作用,所以加上一个ConditionalOnProperty注解。

ErrorConfig只有在消费者 开启 消息投递失败的情况下才会起作用,所以加上一个ConditionalOnProperty注解。

 

标签:可靠性,消费者,RabbitMQ,重试,失败,消息,message
From: https://www.cnblogs.com/cplinux/p/17978066

相关文章

  • RabbitMQ学习六 生产者可靠性
    一、生产者重连由于网络波动可能造成客户端连接MQ失败的情况,通过配置可以开启连接失败后的重连机制:spring:rabbitmq:addresses:xxx.xx.xx.xxport:5672username:xxxxxpassword:xxxxvirtual-host:/xxxxconnection-timeout:1s#设置MQ的连......
  • RabbitMQ学习五 springboot连接RabbitMQ
    一、入门引入依赖在springboot中引入spring-amqp-starter<!--amqp的起步依赖--><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>编写配置文件spring:rabbitmq......
  • RabbitMQ安装-Windows
      Windows安装RabbitMQ配置:Eralng:opt-20.2RabbitMQ-server-3.7.4(习惯安装到无中文且无空格目录下) 1.安装erlang并配置环境变量安装:otp_win64_20.2.exeotp_win64_20.2.exe配置环境变量变量名:ERLANG_HOME变量值:(安......
  • 进程间通信(生产者消费者模型)
    【一】进程间通信介绍什么是进程间通信进程间通信(Inter-processCommunication,IPC)是指在不同进程之间传输数据或信号的机制。由于每个进程拥有自己独立的内存空间,所以不同进程之间无法直接访问对方的变量或数据结构。因此,操作系统提供了多种IPC机制来允许进程之间共享信息和协......
  • RabbitMq基础版
    微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同......
  • RabbitMQ学习四 java客户端连接RabbitMQ
    RabbitMQ的工作模式,可以参考官网:https://www.rabbitmq.com/getstarted.html一、简单方式以下两种都是生产者直接发消息给队列,不通过交换机。且发送给队列的消息只能被消费一次。比如一个队列被C1和C2消费,在队列中的消息只会被一个消费者消费。生产者代码逻辑图代码如下:p......
  • 当“低价高质”成行业共识,零食品牌还能靠什么拿捏消费者?
    文|螳螂观察作者|图霖年关将至,一年一度的“年货内卷赛”已一触即发。尤其是,2024年是疫情过后的首个春节,热闹必不可少,大众走亲访友的年礼更必不可少。而在这个赛场里,具备购买力但又尤其厌倦千篇一律传统年货形式的年轻人,是决定品牌能否抢占年货节制高点的关键。今年的情况尚待定......
  • docker环境下安装RabbitMQ
    环境系统为debian12将安装docker将安装rabbitmq3.8.5安装步骤1Docker安装1.1Debian下安装非常简单:aptinstalldocker.io1.2查看docker的版本dockerversion1.3修改源vi/etc/docker/daemon.json{"registry-mirrors":["https://registry.docker-cn.com"]......
  • RabbitMQ
    RabbitMQ是一个开源的消息代理软件,它使用消息队列来处理系统间的通信。主要原理是基于发布-订阅模式。生产者(Producer)发送消息到队列,消费者(Consumer)则从队列中接收消息。RabbitMQ支持多种消息模型,包括点对点、发布/订阅和路由等。它还支持消息确认、持久化和高可用性,确保消息......
  • FastAPi Celery RabbitMQ 与 Redis 的使用,并使用 Flower 监控 Celery 状态
    FastAPiCeleryRabbitMQ与Redis的使用,并使用Flower监控Celery状态本文介绍了Windows下FastAPiCelery使用RabbitMQ与Redis做代理的使用方法,本文参考了国外大佬的文章,并做了修改与补充,原文见这里,SumanDas,他文章中的完整代码,见这里,GitHubRabbitMQ与Redis的......