首页 > 其他分享 >RabbitMQ学习六 生产者可靠性

RabbitMQ学习六 生产者可靠性

时间:2024-01-21 11:44:54浏览次数:30  
标签:发送 可靠性 confirm 生产者 RabbitMQ 重试 xx MQ 消息

一、生产者重连

由于网络波动可能造成客户端连接MQ失败的情况,通过配置可以开启连接失败后的重连机制:

spring:
  rabbitmq:
    addresses: xxx.xx.xx.xx
    port: 5672
    username: xxxxx
    password: xxxx
    virtual-host: /xxxx
    connection-timeout: 1s #设置MQ的连接超时时间
    template:
      retry:
        max-attempts: 3 #最大重试次数
        enabled: true #开启重试机制
        multiplier: 1 #失败后下次等待时长倍数,下次等待时长=initial-interval * multiplier
        initial-interval: 1000ms # 失败后的初始等待时间

注意上述重试配置是写在template配置里面。 配置上述重试之后,把MQ服务停掉

停掉mq之后,向mq发送一个消息

注意:

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpingAMQP提供的重试机制是 阻塞式 的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。

如果对于业务性能有要求,建议 禁用 重试机制。如果一定要使用,请合理分配等待时长重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

 

二、生产者确认

生产者确认机制:生产者发送消息后,需要等待RabbitMQ服务器的确认消息,以确保消息已经被成功地发送到RabbitMQ服务器。如果RabbitMQ服务器没有收到消息或者消息发送失败,生产者会收到一个确认消息,从而可以进行重发或者其他处理。

RabbitMQ有Publisher ConfirmPublisher Return两种确认机制。开启确认机制之后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有如下几种情况:

  • 消息投递到了MQ服务器,但是路由失败(路由键写错或者交换机没有绑定队列),此时通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功

  • 如果队列是非持久化的, 消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

  • 如果队列是持久化的,消息投递到MQ服务器,并且入队完成且保存到磁盘之后,返回ACK,告知投递成功

  • 其他情况都会返回NACK,告知投递失败。

 

生产者发送消息给MQ,等待MQ回执有两种方式:同步方式和异步方式

spring:
  rabbitmq:
    addresses: xxx.xx.xx.xx
    port: 5672
    username: xx
    password: xx
    virtual-host: /xx
    connection-timeout: 1s #设置MQ的连接超时时间
    template:
      retry:
        max-attempts: 3 #最大重试次数
        enabled: true # 开启重试机制
        multiplier: 1 #失败后下次等待时长倍数,下次等待时长=initial-interval * multiplier
        initial-interval: 1000ms # 失败后的初始等待时间
    publisher-returns: true # 开启publisher return机制
    publisher-confirm-type: correlated #

说明:
publish-confirms: 开启publish-confirm功能,确认回调,springboot 2.2.0之后不用了,被publish-confirm-type替换了
publish-confirm-type:开启publisher-confirm,这里支持三种类型:
    none: 不开启生产者确认
    simple:同步等待confirm结果,直到超时
    correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback 

https://blog.csdn.net/qq_29385297/article/details/127545272

 

编写return配置类

每一个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置。

@Configuration
@Slf4j
public class RabbitMqReturnConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             *
             * @param message 发送的消息
             * @param replyCode 应答码
             * @param replyText 原因
             * @param exchange 交换机
             * @param routingKey 路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息发送失败: 应答码={}, 原因={}, 交换机={},路由键={},消息={}",
                        replyCode,replyText,exchange,routingKey,message.getBody().toString());
            }
        });

    }
}

配置ConfirmCallback

ACK代表Acknowledgement,即确认消息。消息的Confirm需要配置在每次发送消息时

@RequestMapping("/sendMessageConfirm")
    public String sendMessageConfirm(String message){
        // 直接向队列里面发送消息
        CorrelationData correlationData=new CorrelationData();
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            /**
             * 此处的失败,并不代表nack,此处的失败是spring内部处理的时候失败了
             * @param throwable
             */
            @Override
            public void onFailure(Throwable throwable) {
                // future发送异常时的处理逻辑,一般不会发生直接打印就行
                log.error("on failure:{}",throwable);
            }

            /**
             * 代表MQ的回调成功,但并不代表消息发送的结果
             * @param confirm
             */
            @Override
            public void onSuccess(CorrelationData.Confirm confirm) {
               if(confirm.isAck()){
                    log.info("消息发送成功,收到ACK");
               }else {
                   log.info("消息发送失败,收到NACK,原因:{}",confirm.getReason());
               }


            }
        });
        Map<String,Object> map=new HashMap<>();
        map.put("name","张三");
        map.put("address","浙江杭州");
        rabbitTemplate.convertAndSend(BootMqConstant.TOPICS_EXCHANGE_NAME,"object.hahha",map,correlationData);
        return "ok";
    }

correlationData.getFuture()中的Future其实就是多线程中的Future。

上述代码发送了两条消息到topics_object_queue队列中,发送成功之后,生产者被回调了两次

ack

returnCallback

故意写路由键,实验ReturnCallback

 

故意写路由键,实验ReturnCallback

nack

运行结果

生产者确认机制有额外的网络和系统资源的开销,一般不建议使用。如果一定要使用,不用开启Publisher-Return机制,因为一般是由于路由失败做成,需要开发人员去更改路由键。

在生产中,实际上只需要处理nack的情况,如果遇到nack的情况,可以做有限次的重试,如果重试之后还失败则需要记录失败的异常消息,可以记录到数据库中。

 

标签:发送,可靠性,confirm,生产者,RabbitMQ,重试,xx,MQ,消息
From: https://www.cnblogs.com/cplinux/p/17977666

相关文章

  • RabbitMQ学习五 springboot连接RabbitMQ
    一、入门引入依赖在springboot中引入spring-amqp-starter<!--amqp的起步依赖--><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>编写配置文件spring:rabbitmq......
  • RabbitMQ安装-Windows
      Windows安装RabbitMQ配置:Eralng:opt-20.2RabbitMQ-server-3.7.4(习惯安装到无中文且无空格目录下) 1.安装erlang并配置环境变量安装:otp_win64_20.2.exeotp_win64_20.2.exe配置环境变量变量名:ERLANG_HOME变量值:(安......
  • 进程间通信(生产者消费者模型)
    【一】进程间通信介绍什么是进程间通信进程间通信(Inter-processCommunication,IPC)是指在不同进程之间传输数据或信号的机制。由于每个进程拥有自己独立的内存空间,所以不同进程之间无法直接访问对方的变量或数据结构。因此,操作系统提供了多种IPC机制来允许进程之间共享信息和协......
  • RabbitMq基础版
    微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们成这种调用方式为同步调用,也可以叫同......
  • RabbitMQ学习四 java客户端连接RabbitMQ
    RabbitMQ的工作模式,可以参考官网:https://www.rabbitmq.com/getstarted.html一、简单方式以下两种都是生产者直接发消息给队列,不通过交换机。且发送给队列的消息只能被消费一次。比如一个队列被C1和C2消费,在队列中的消息只会被一个消费者消费。生产者代码逻辑图代码如下:p......
  • docker环境下安装RabbitMQ
    环境系统为debian12将安装docker将安装rabbitmq3.8.5安装步骤1Docker安装1.1Debian下安装非常简单:aptinstalldocker.io1.2查看docker的版本dockerversion1.3修改源vi/etc/docker/daemon.json{"registry-mirrors":["https://registry.docker-cn.com"]......
  • RabbitMQ
    RabbitMQ是一个开源的消息代理软件,它使用消息队列来处理系统间的通信。主要原理是基于发布-订阅模式。生产者(Producer)发送消息到队列,消费者(Consumer)则从队列中接收消息。RabbitMQ支持多种消息模型,包括点对点、发布/订阅和路由等。它还支持消息确认、持久化和高可用性,确保消息......
  • FastAPi Celery RabbitMQ 与 Redis 的使用,并使用 Flower 监控 Celery 状态
    FastAPiCeleryRabbitMQ与Redis的使用,并使用Flower监控Celery状态本文介绍了Windows下FastAPiCelery使用RabbitMQ与Redis做代理的使用方法,本文参考了国外大佬的文章,并做了修改与补充,原文见这里,SumanDas,他文章中的完整代码,见这里,GitHubRabbitMQ与Redis的......
  • Windows RabbitMQ 安装-截止当前最新版本(rabbitmq-server-3.12.12)图文教程
    WindowsRabbitMQ安装(图文教程)WindowsRabbitMQ安装,截止当前最新版本(rabbitmq-server-3.12.12)图文教程,本文只是最简单的安装方法,旨在能快速使用,若需要更多的配置,则需要你自行查阅官方文档,或互联网搜索答案咯,哈哈哈哈本文安装步骤共分4步:第1步:下载RabbitMQ与依赖Erl......
  • RabbitMQ部署安装
    1、RabbitMQ部署1.1.概述RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),由以高性能、健壮以及可伸缩性出名的 Erlang 写成。因此使用RabbitMQ必须安装Erlang环境。说明:1、演示部署服......