首页 > 其他分享 >RabbitMQ如何保证消息的可靠性

RabbitMQ如何保证消息的可靠性

时间:2022-10-13 00:22:52浏览次数:61  
标签:可靠性 return 队列 RabbitMQ 重试 交换机 保证 消息 失败

如何确保RabbitMQ消息的可靠性

  • 对于生产者,开启生产者确认机制,确保生产者的消息能到达队队列

  • 对于mq,开启持久化功能,确保消息未消费前在队列中不会丢失

  • 对于消费者,开启消费者确认机制为auto,有spring确认消息处理成功后完成ack

  • 开启消费者失败重试机制,并设置MessageRevocerer,多次重试失败后会将消息投递到异常交换机,进行人工处理等

MQ一些常见问题

  1. 消息可靠性问题:如何确保发送的消息至少被消费一次

  2. 消息延迟问题:如何实现消息的延迟投递

  3. 消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题

  4. 高可用性问题:避免单点MQ故障而导致的不可用

1、消息可靠性

消息丢失的可能性

  • 发送时丢失:

    • 生产者发送消息未送达exchange

    • 消息到达exchange后未到达queue

  • MQ宕机,queue将消息丢失

  • consumer接受消息后未消费就宕机

生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求

  • publisher-confirm,发送者确认

    • 消息返回成功投递到交换机,返回ack

    • 消息未成功投递到交换机,返回nack

  • publisher-return,发送者回执

    • 消息投递到交换机了,但是没有路由到队列。返回ack,及路由失败原因

确认机制发送消息时,需要给每个消息设置一个全局唯一ID,以区分不同消息,避免ACK冲突

生产者确认消息案例

1、在publisher服务的application.yml中添加配置
spring: 
  rabbitmq:
    publisher-confirm-type: correlated #异步回调确认类型
    publisher-returns: true #开启publisher-returns的功能
    template:
    mandatory: true #定义路由失败的策略
  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:

    • simple:同步等待confirm结果,直到超时,可能会导致进程阻塞,不建议使用

    • correlated:异步回调,定义ConfirmCllBack。MQ返回结果时会回调这个ConfirmCallBack,有点像ajax,推荐使用

  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback

  • template.mandatory:定义消息路由失败的策略。true,则调用ReturnCallback;false:则直接丢弃消息

2、编写回调函数ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置

  • ReturnCallback代表你消息到了交换机路由消息失败了,回调此函数

  • 实现ApplicationContextAware接口 —— Spring中bean工厂的通知,当Springbean工厂准备好了以后就来通知你,并且重写setApplicationContext方法,传入一个ApplicationContext工厂对象,通过这个工厂获取bean,获取rabbitTemplate,设置ReturnCallback

  • 五个参数:有了这些参数可以记录日志,有了交换机和rotingKey也可以重新发送消息,实现消息重试

    • message你发的是什么消息

    • replyCode失败状态码

    • replyText失败描述原因

    • exchange交换机是什么

    • routingKey路由key是什么

/**
     * km 重写setAppicationContext方法,传入一个ApplicationContext工厂对象,通过这个工厂获取bean,获取rabbitTemplate,设置ReturnCallback
     *
     * @param applicationContext 工厂对象
     * @throws BeansException 抛出bean异常
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // kp 获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // kp 配置ReturnCallback
        // ks 五个参数:消息路由失败了,message你发的是什么消息,replyCode失败状态码。replyText失败描述原因,exchange交换机是什么,routingKey路由key是什么,有了这些参数可以记录日志,也可以重新发送消息,实现消息重试
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判断是否是延迟消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一个延迟消息,忽略这个错误提示
                return;
            }
            // 记录日志
            log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的话,重发消息
        });
    }
3、发送消息ConfirmCallback

每个RabbitMQ,可以有多个ConfirmCallback,每次发消息都可以发不同的ConfirmCallback,弄不同的业务处理方案,在每次发消息那一刻配置ConfirmCallBack

  • ConfirmCallback是指消息根本没到交换机

  • 准备一条消息体

  • 生成消息的唯一ID,获取Future并且使用lambda表达式添加callback

    • result是正常的callback回调,受到的回执又分为两种

      • ack成功的回执,消息成功投递到交换机

      • nack消息失败的回执,消息没成功投递到交换机

    • ex是异常的callback回调,就是执行过程中直接没收到回执,抛异常了才会进入失败的水貂

  • 发送消息convertAndSend("交换机的名称", "路由key的名称", "消息体",correlatiohnData)

public void testSendMessage2SimpleQueue() throws InterruptedException {
        // 1.准备消息
        String message = "hello, spring amqp!";
        // ks 2.准备CorrelationData
        // kp 2.1.准备消息的唯一ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // kp 2.2.准备ConfirmCallback
        correlationData.getFuture().addCallback(
            result -> { // kp result正常回调
            // 判断结果
            if (result.isAck()) {
                // kp 发送ACK回执,代表消息成功投递到交换机
                log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
            } else {
                // kp 发送NACK回执,代表消息投递到交换机失败,取得消息id
                log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
                // 重发消息
            }
        }, ex -> { //kp ex异常回调
            // 记录日志
            log.error("消息发送失败!", ex);
            // todo 重发消息
        });
        // 3.发送消息
        rabbitTemplate.convertAndSend("amq.topic", "a.simple.test", message, correlationData);
    }

总结SpringAMQP中处理消息确认的几种情况

  • publisher-confirm:

    • 消息成功发送到exchange,返回ack

    • 消息发送失败,没有到达交换机,返回nack

    • 消息发送过程中出现异常,没有受到回执 ex

  • 消息成功发送到exchange,但是没有路由到queue

    • 调用ReturnCallBack

2、消息持久化

宕机消息能否保存到磁盘上

durable参数,代表可持久化,在配置中心可视化配置交换机、队列的时候设置duriability设置为

使用代码

  • MQ默认是内存消息,开启持久化功能可以确保缓存在MQ中的消息不丢失

  • 消费者在启动的时候,可以初始化交换机和队列,所以在消费者代码中进行配置交换机和队列持久化策略

  • 消息持久化在生产者发送消息的时候

1、交换机持久化

AMQP默认情况下新建的交换机都是持久化的

public class CommonConfig {
    /**
     * km 定义交换机持久化策略
     *
     * @return
     */
    @Bean
    public DirectExchange simpleDirect(){
        //ks 三个参数:交换机名称、是否持久化、当没queue与其绑定的时候是否自动删除
        return new DirectExchange("simple.direct", true, false);
    }
}

2、队列持久化

AMQP默认情况下新建的队列都是持久化的

public class CommonConfig {
    /**
     * km 定义交换机持久化策略
     *
     * @return
     */
    @Bean
    public Queue simpleQueue(){
        //ks 使用QueueBuilder构建队列,durable就是持久化的
        return QueueBuilder.durable("simple.queue").build();
    }
}

3、消息持久化 生产者发送消息的时候设置

SpringAMQP中的消息持久化是默认持久的,可以通过MessageProperties中的DeliveryMode来指定d:

public void testDurableMessage() {
        // ks 1.准备消息
        Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8)) //kp 消息体,字符集
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //kp 持久化
                .build();
        // 2.发送消息
        rabbitTemplate.convertAndSend("simple.queue", message);
    }

3、消费者确认消息

  • 消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息

    • 消费者拿到消息后处理完,发送ack回执处理完了,MQ收到回执,就把消息从队列中删除

    • 如果消费者在处理消息中抛出了异常,这时候就会返回nack回执,MQ受到回执就会重新投递一次消息

    • 处理消息的时候服务宕机,什么都没发,MQ就认为消费者废了,等待启动以后再发送给消费者一次,确保消息必须被消费一次

    • 如果消费者消费失败了,会返回unack回执,队列就会再次重试投递给消费者重试,一直到消费者消费了,这样就会导致一直发消息重复,一直重试不停,会导致mq崩溃

  • 而SpringAMQP则允许配置三种确认模式

    • manual:手动ack,需要在业务代码结束后,调用api发送ack

    • auto:自动ack,由spring监测listener代码是否出现异常,没异常则返回ack;抛出异常则返回nack,一般都是用这个

    • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立刻被删除

通过配置设定确认模式

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto #none关闭ack,manual手动ack,auto

4、失败重试机制

  • 当消费者出现异常后,消息会不断requeue重新入队到队列,在重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力、

  • 我们可以利用spring的retry机制,在消费者出现异常时利用本地重试,而不是返回unack进行无限制的requeue到mq队列

    • 当然spring的retry也不是无限重试,可能到一定次数就要停止,重试MQ或者交给人工干预等其他方法

通过配置设定retry

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        retry: 
            enabled: true #开启消费者失败重试
            initial-interval: 1000 #初试的失败等待时长为1秒
            multiplier: 1 #下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval(上次等待时长)
            max-attempts: 3 #最大重试次数
            stateless: true #默认true无状态,无状态的服务效率高;false有状态。如果业务中包含事务,重试的时候保留事务不会导致事务失效,建议改成false,

4.1、消费者失败消息处理策略

在开启重试模式之后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,包含三种不同的实现

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRevoverer:重试耗尽,返回nack,消息requeue重新入队,有点像没开重试机制,但是消息requeue频率更低了

  • RepublishMeaageRecoverer:重试耗尽,消费者把失败的消息重新投递到指定的交换机,交换机再路由到指定队列里,可以理解为指定交换机和指定队列为失败消息垃圾桶,可以专门指定一个垃圾桶消费者去监听失败消息垃圾桶队列,然后这个消费者可以通知管理员有消息本地重试失败了,然后就可以进行人工处理

Coding

1、在ErrorMessageConfig中,定义接受失败消息的交换机、队列及其绑定关系
@Configuration
public class ErrorMessageConfig {

    /**
     * km 定义异常处理交换机
     *
     * @return
     */
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    /**
     * km 定义异常处理队列
     *
     * @return
     */
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    /**
     * km 绑定异常队列和交换机,路由key为error
     *
     * @return
     */
    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    /**
     * km 定义异常消息处理器
     *
     * @param rabbitTemplate 传入模板
     * @return 返回一个
     */
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        //kp 指定具体的异常处理交换机名称和绑定异常队列的路由key
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
2、然后,在ErrorMessageConfig定义RepublishMessageRecoverer
/**
     * km 定义异常消息处理器
     *
     * @param rabbitTemplate 传入模板
     * @return 返回一个
     */
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        //kp 指定具体的异常处理交换机名称和绑定异常队列的路由key
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }

 

标签:可靠性,return,队列,RabbitMQ,重试,交换机,保证,消息,失败
From: https://www.cnblogs.com/phonk/p/16786624.html

相关文章

  • RabbitMQ 生产者和消费代码实例
       启动服务   1.进入rabbitmq的安装sbin目录下cmd进入命令窗口          2.cmd输入命令rabbitmq-server.bat,如图启动成功    ......
  • 如何保证缓存与数据库的双写一致性?
    一般来说,如果允许缓存可以稍微的跟数据库偶尔有不一致的情况,也就是说如果你的系统不是严格要求“缓存+数据库”必须保持一致性的话,最好不要做这个方案,即:读请求和写请求......
  • RabbitMQ 镜像队列
    一、基本原理1.工作原理搭建RabbitMQ集群以后,尽管交换器和绑定关系能够在单点故障问题上幸免于难,但是队列及其存储的消息却不行,这是因为队列进程及其内容仅仅维持......
  • RabbitMQ 存储机制
    一、消息存储机制不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。持久化的消息在到达队列时就被写入到磁盘,非持久化的消息一般只保存在内存中,在内存吃紧的......
  • 它让你1小时精通RabbitMQ消息队列(新增死信处理)
    支持.NET/.NETFramework/.NETCoreRabbitMQ作为一款主流的消息队列工具早已广受欢迎。相比于其它的MQ工具,RabbitMQ支持的语言更多、功能更完善。本文提供一种市面上最/......
  • 02 RabbitMQ 3.8 Feature Focus - Quorum Queues
    标题:RabbitMQ3.8FeatureFocus-QuorumQueues原文:https://www.cloudamqp.com/blog/rabbitmq-quorum-queues.html时间:2019-03-28RabbitMQ3.8将于今年推出,它将带来四......
  • [翻译] Quorum Queues - Making RabbitMQ More Competitive In Reliable Messaging
    目录AnIntroductiontoRaftBriefOverviewofRaftWriteSafetyReadSafetyRabbitMQQuorumQueuesNewDrawbacksQuestionsandOpenTopicsCouldtheidempotencyofwr......
  • Windows 上下载安装 RabbitMQ 的方法步骤
    RabbitMQ是一套开源(MPL)的消息队列服务软件,是由LShift提供的一个AdvancedMessageQueuingProtocol(AMQP)的开源实现,由以高性能、健壮以及可伸缩性出名的Erlang写成......
  • Windows10 RabbitMQ安装和启动详细步骤
    配置Erlang语言下载Erlang原因:RabbitMQ安装包依赖于Erlang语言包的支持,所以要先安装Erlang语言包,再安装RabbitMQ安装包。下载地址:http://www.erlang.org/downloads注......
  • Linux环境RabbitMQ安装教程new
    在安装RabbitMQ中需要注意:1、RabbitMQ依赖于Erlang,需要先安装Erlang2、Erlang和RabbitMQ版本有对应关系https://www.rabbitmq.com/which-erlang.html一、安装Erlang1.先安......