首页 > 其他分享 >RabbitMQ——消息的可靠性处理

RabbitMQ——消息的可靠性处理

时间:2024-09-24 23:24:00浏览次数:13  
标签:可靠性 处理 RabbitMQ 重试 队列 交换机 MQ 消息 public

1.业务分析

        在业务的开发中,我们通常将业务的非核心业务交给MQ来处理,比如支付,在支付过后,我们需要扣减余额,修改支付单状态,修改订单状态,发送短信提醒用户,给用户增加积分等等(可能真是场景并非这么简单,这里举个例子),在这套业务中,修改订单状态,发送短信提醒用户,给用户增加积分这三个业务在特定场景下并非是核心业务,所以把他放在MQ消息队列中进行处理,在非核心业务执行的时候,可能出现多个问题,导致数据不一致,比如用户买了东西,发现自己的钱已经扣了,但是页面显示的还是未支付状态,这种情况就非常严重了,造成这个现象的原因可能有多种,比如,网络丢包,发布消息者挂了,MQ挂了,消费者挂了等等,可能有硬件层面,也可能有软件层面,甚至是网络层面,对于这一系列问题,我们应该尽量的保证消息的可靠性,让数据一致性得到保证,接下来,从三个方面进行分析,以及提出解决方案,不过具体还是得看业务需求,是否需要数据的强一致性。

2.发送者的可靠性

        2-1:发送者重连

        消息的发布有三个角色,发布者,MQ,消费者,在发消息的时候需要和MQ进行连接,这个连接时一个网络连接,如果因为网络问题,消息发送失败,可能会导致数据不一致产生。

解决方案:在发布者的application.yaml中配置

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier,如果是2,第一此初始化1秒,2秒,4秒,以此类推
        max-attempts: 3 # 最大重试次数

        注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

        2-2:发送者确认机制

SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启确机制认后,当发送者发送消息给MQ后,MQ会返回确认结果给发送者,发送者再证实这个返回结果。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

92c4e225b2cc46f1bddcb507065bc3c8.png

返回ACK不需要重发,NACK需要重发

在publisher模块的application.yaml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里 publisher-confirm-type 有三种模式可选:

  • none : 关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

一般推荐使用 correlated,回调机制。

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

@Configuration
@RequiredArgsConstructor
@Slf4j
public class MqConfig {

    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.error("触发returnCallback");
                log.debug("exchange:{}",returnedMessage.getExchange());
                log.debug("message:{}",returnedMessage.getMessage());
                log.debug("routingKey:{}",returnedMessage.getRoutingKey());
                log.debug("replyCode:{}",returnedMessage.getReplyCode());
                log.debug("replyText:{}",returnedMessage.getReplyText());
            }
        });
    }
}

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "simple2.queue"),
            exchange = @Exchange(name = "simple2.direct"),
            key = "simple2"
    ))
    public void testPublisherConfirmRule() {
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                // Future发生异常时的处理逻辑,基本不会触发
                log.error("Future发生异常时的处理逻辑", ex);
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                if (result.isAck()){
                    log.debug("发送消息成功,收到ACK!");
                }
                else {
                    log.error("发送消息失败,收到NACK!");
                    //TODO: 重发消息

                }
            }
        });
        //交换机名称
        String exchange = "simple2.direct";
        //消息
        String message = "hello mq";
        //发送消息
        rabbitTemplate.convertAndSend(exchange, "simple2", message, cd);
    }

3.MQ的可靠性

        在默认情况下,RabbitMQ会将接受到的消息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

  • 一旦MQ宕机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞

方案一:数据持久化

数据持久化包括三个方面:交换机持久化,队列持久化,消息持久化

SpringAMQP默认生成的交换机和队列以及发消息都是持久化的

方案二:lazy queue(既能保证并发能力,又不用写内存)

        从RabbitMQ的3.6.0版本开始,就增加了LazyQueue的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘,不再存储到内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)在3.12版本后,所有队列都是LazyQueue模式,无法更改。

那如何把queue变成lazy queue,可以基于声明bean的形式,也可以通过注解的方式

@Bean
public Queue lazyQueue(){
    return QueueBuilder
            .durable("lazy.queue")
            .lazy() // 开启Lazy模式
            .build();
}
@RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy") //这两个是固定的
))
public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}", msg);
}

开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执

4.消费者的可靠性

        4-1:消费者确认机制

        消费者确认机制时为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息的处理状态。状态有三种:

  • ack:成功的处理了消息,RabbitMQ队列中删除这个消息
  • nack:消息处理失败,RabbitMQ就会再次给消费者投递消息,持续投递
  • reject:消息处理失败,RabbitMQ队列中删除这个消息(一般是消息内容有问题,所以拒绝)

0bdc481054844c36b234f5020190e542.png

        SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.当业务出现异常时,根据异常判断返回不同结果:
  1. 如果是业务异常,会自动返回nack
  2. 如果是消息处理或校验异常,自动返回reject

需要配置在消费的application.yaml中

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

4-2:失败重试机制

        SpringAMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的requeue到mq。通过在消费者的application.yaml文件中添加配置来开启重试机制:

spring:
  rabbitmq:
    listener:  # 这里注意区别,发布者有个失败重连机制和这个配置很像
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

        在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

第三种示例图

1d0061a9a9c04b3dae1fbf6915904e7c.png

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码:

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); //第二个参数是routingKey
    }
}

4-3:幂等性业务

        在程序开发中,是指同一个业务,执行一次或多次对业务状态的运行是一致的。

 

        如果网络问题出现故障,有可能出现把一个业务做了多次,比如扣减库存,总不能扣减两次吧,于是可以采取对每一个消息指定一个消息id,id值唯一,然后执行完这个消息后,把这个消息id存到数据库里,然后每次执行消息的时候,可以去数据库查一下有没有这个消息,如果有,就代表这个消息之前执行过了,于是就不对这个消息做处理。

        在做注册消息转换器为bean的时候,可以设置消息的id,然后我们接受消息的时候用Message来接就可以

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

        用Message接,然后message.getMessageProperties().getMessageId() 获取id,那这个id做业务判断就可以了。

以上是对于非幂等业务的一种方案,但明显这种方案不太好。影响mq性能。

另一种就是基于具体业务逻辑来进行判断来实现业务的幂等,比如我在一个业务执行前,先判断这个业务的一个状态,如果状态以及修改过了,我直接不做处理就行了,如果没有,我在进行修改。

5.延迟消息

        5-1:死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息,超时无人消费

  • 要投递的队列消息满了,无法投递

        如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

5ec9b7a546924919b0d7ac7fca942f88.png

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息

  2. 收集那些因队列满了而被拒绝的消息

  3. 收集因TTL(有效期)到期的消息

声明正常队列和正常交换机和的时候使用bean方式,因为正常队列要指定死信交换机,

 @Bean
 public Queue normalQueue(){
     return QueueBuilder
             .durable("normal.queue")
             .deadLetterExchange("dlx.direct")
             .build();
 }

发送者示例代码:给正常队列设置一个TTL,如果时间到了,把消息给死信交换机,模拟延迟消息

rabbitTemplate.convertAndSend("normal.direct", "hi", "hello", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("1000000"); 
                return message;
            }
        });

5-2:延迟消息插件

先去社区下载 https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

去查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

然后进入插件那么目录

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

这样插件就装好了

然后声明一个延迟交换机,基于注解方式

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于bean方式

@Bean
public DirectExchange delayExchange(){
    return ExchangeBuilder
            .directExchange("delay.direct") // 指定交换机类型和名称
            .delayed() // 设置delay的属性为true
            .durable(true) // 持久化
            .build();
}

发送延迟消息:

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

注意:

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。

因此,不建议设置延迟时间过长的延迟消息

 

标签:可靠性,处理,RabbitMQ,重试,队列,交换机,MQ,消息,public
From: https://blog.csdn.net/iwjijksw/article/details/142487628

相关文章

  • 开源音频处理项目推荐【持续更新】
    Audacity介绍:Audacity是一款功能强大的开源音频编辑软件,适用于多种操作系统,包括Windows、macOS和Linux。它支持多轨音频编辑、录制,并且提供了丰富的音频处理功能,如剪切、复制、粘贴、混音、降噪等。Audacity的最新版本是3.6版本,于2024年7月16日发布,带来了主通道、新效果、......
  • RabbitMQ通讯方式第二讲:Work Queues
    了解WorkQueues  1.1官网中的图片:通过官网里的图片,我们可以看到WordQueues与HelloWorld的区别,这里的消费者增加,但是时多个消费者消费单个队列,在这里我们依然要注意,这里面使用的是默认的交换机,并不是直接连接的队列。  1.2直观的图片:更好的理解每次的连接都是......
  • 实例讲解电动汽车故障分级处理策略及Simulink建模方法
    电动汽车的故障有很多种,每种故障发生时产生危害性是不同的,因此对于不同故障应采取不同的处理方式。目前一般有两种故障处理方式,一种是针对每一种故障对其故障危害性进行判断,然后针对不同故障设定不同的故障处理机制;另一种是针对危害性相似的故障进行分类分级划分,同一级别故障采......
  • python--数据处理分析模块pandas
    一、什么是pandas?Pandas是基于Numpy的一套数据分析工具,该工具是为了解决数据分析任务而创建的。Pandas纳入了大量标准的数据模型,提供了高效地操作大型数据集所需的工具。Pandas提供了大量能使我们快速便捷地处理数据的函数和方法。它是使Python成为强大而高效的数据......
  • 并发处理的利器:深入探讨锁分离设计+6大分离场景(高并发篇)
    锁分离设计的本质在于将对共享资源的访问操作根据其类型或性质区分开来,并为每种操作提供独立的锁。这种设计背景通常源于对高并发系统的需求,其中多个线程或进程需要频繁地对共享资源进行读写或其他操作。在传统的锁机制中,所有操作都可能使用同一把锁,这在高并发环境下会导致严重的......
  • 浅谈如何处理大语言模型训练数据之三开源数据集介绍
    随着最近这些年来基于统计机器学习的自然语言处理的算法的发展,以及信息检索研究的需求,特别是近年来深度学习和预训练语言模型的研究以及国内国外许多大模型的开源,研究人员们构建了多种大规模开源数据集,涵盖了网页、图片、论文、百科等多个领域。在构建大语言模型时,数据的质量和多......
  • RabbitMQ在.net core中的应用
    RabbitMQ是一个可靠且成熟的消息传递和流代理,它很容易部署在云环境、内部部署和本地机器上。它目前被全世界数百万人使用。1.基本概念生产者(Producer)生产者是一个发送消息的程序。发送消息的程序可以是任何语言编写的,只要它能够连接到RabbitMQ服务器,并且能够发送消息到RabbitM......
  • Python字典进阶:setdefault技巧让你的代码更优雅,用setdefault优化你的Python数据处理流
    推荐阅读:数据科学的秘密武器:defaultdict——Python字典的自动化填充神器,让数据结构更灵活一、什么是setdefaultPython中的setdefault方法是字典(dict)类型的一个非常实用的方法,它允许开发者在尝试访问字典中不存在的键时,自动为该键设置一个默认值,并返回这个默认值。 二、s......
  • 处理第k小问题
    题目正数通常比较好处理,那我们先想个办法把所有的负数转为正数,我们可以求一下所有负数的和\(sum\),这一定是最小数,那我们考虑如何将其变小一点,无非是去掉一个加上的负数或是加上一个正数,诶,那这样,去掉负数不就等于加上一个正数吗,这样我们就可以将所有的负数转化为正数,选出来的数......
  • Java 音视频处理详解
    Java作为一种通用的编程语言,具备强大的跨平台能力和丰富的第三方库支持,使其在音视频处理领域也能大展拳脚。本文将详细介绍Java在音视频处理中的常用技术和方法,包括音视频捕获、处理、存储和播放。通过对实际代码示例的讲解,帮助读者深入理解并掌握Java音视频处理的核心内容。......