首页 > 其他分享 >提高MQ可靠性

提高MQ可靠性

时间:2024-05-06 16:46:04浏览次数:22  
标签:可靠性 exchange 提高 投递 交换机 失败 消息 MQ

提高可靠性通过以下四个方面:

  生产者的可靠性(发送消息时丢失)

    生产者发送消息时连接MQ失败

    生产者发送消息到达MQ后未找到exchange

    生产者发生消息到达MQ的exchange后,未找到合适的queue

    消息到达MQ后,处理消息的进程发生异常

  MQ的可靠性(MQ导致消息丢失)

    消息保存到queue后,未消费就突然宕机

  消费者的可靠性(消费者处理消息丢失)

    消息接收后未处理就发生异常

    消息接收后处理过程中抛出异常

  延迟消息

 

注意:!生产者重连与生产者确认都相当影响系统性能,一般情况下不建议开启 | rabbitMQ在3.12后无需配置,在3.6后只需将队列改为懒惰队列

 

生产者:

生产者重连:网络波动可能导致发送者连接MQ失败,我们可以通过配置实现连接失败后重连的机制

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

注意:重连是阻塞式的重试,多次等待的过程中,当前线程是阻塞的,会影响性能

 

生产者确认:开启确认机制后,MQ收到消息会返回确认结果给生产者。SpringAMQP提供了publisherConfirm和publisherReturn两种确认机制

返回结果一般有以下几种:

  消息投递给MQ,路由失败,此时通过publisherRetrun返回路由异常的原因,返回ACK,告知投递成功

  临时消息投递给MQ,入队成功,返回ACK,告知投递成功

  持久化消息投递给MQ,入队成功并持久化,返回ACK,告知投递成功

  其他情况都是NACK,告知投递失败

使用步骤:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型 1.none关闭确认机制,2.simple同步阻塞等待MQ消息,3.correlated异步回调等待MQ消息
    
   publisher-returns: true # 开启publisher return机制

publisher-return(这个比下面那个常见,一般replyText内会写明为什么失败)为rabbitMQ模板添加return callback(每个template只能添加一个,因此我们统一配置)

@Slf4j
@Configuration
public class MqConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            log.error("监听到了return callback");
            log.debug("exchange:{}", returnedMessage.getExchange());
            log.debug("routingKey:{}", returnedMessage.getRoutingKey());
            log.debug("msg:{}", returnedMessage.getMessage());
            log.debug("replaytext:{}", returnedMessage.getReplyText());
            log.debug("replaycode:{}", returnedMessage.getReplyCode());
        });
    }
}

publish-confirm(这个一般是编码失败,很少见,而且直接接收到nack。在演示过程中,exchange写错会导致这个问题。所以说这个问题就相当于客户端和rabbitMQ交互时消息压根都没发给正确的交换机

 

 @Test
    public void testCallback(){
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("AMQP的异常,不怪我!:",ex);
            }
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                if(result.isAck()){
                    log.info("收到MQ的ack");
                }else {
                    log.debug("收到MQ的nack:{}",result.getReason());
                    // TODO 重发消息
                }
            }
        });
        rabbitTemplate.convertAndSend("exchangeName","routeKey","message",correlationData);
    }

简单来说,生产者确认机制就是一句话:confirm是保证我们发送信息到exchange了,而returns则是保证我们发送到queue了。

 

MQ:(这一栏基本都是不用配置的)

MQ正常情况下储存机制和redis类似,都是存内存的,因此他会因为宕机丢失,因为内存而阻塞,同样具有持久化策略。

1.数据持久化(AMQP这些都是默认的,不用处理)

  交换机持久化(生成的时候默认就是durable)

  队列持久化(生成的时候默认就是durable)

  消息持久化(这仨用springAMQP发的时候默认都是持久化的)

2.LazyQueue:3.6.0加入,3.12后所有queue都是lazy模式,而且不能改

  特征:收到的消息直接写入磁盘,消费者消费的时候才在磁盘中加载,再投递给消费者,并且可以提前缓存部分消息在内存。

用bean的时候只需要在链式中加上一个.lazy(),用注解的时候需要加上arguments=@Argument(name="x-queue-mode",value="lazy")

 

消费者:

消费者确认机制:功能类似发送者确认机制。处理完消息后发送下面三种结果

  返回ack:处理成功了,告诉queue销毁msg

  返回nack:处理失败,让queue再发送msg

  返回reject:处理失败了,而且让queue销毁msg

SpringAMQP提供三种配置方式:none,manual(手动档,存在业务入侵,但是更灵活),auto(运用AOP进行环绕增强)

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto# 自动挡 爽!

 消费者失败重连机制:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

失败后有三种策略:

1.重试次数耗尽后,直接reject,丢弃消息

2.重试次数耗尽后,返回nack,消息重新入队

3.重试次数耗尽后,将失败消息投递给指定的交换机

这里我们选择第三种,在失败后统一将消息传给一个指定投放error消息的队列

先是注册exchange,queue,binding

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

这样会在消息发送失败之后将消息内容,错误栈统一发送到队列中。

 

业务幂等性:

幂等是一个数学概念,即f(x)=f(f(x)),在程序开发中,指的是同一个业务,执行一次或者多次对业务的状态是一样的

方案一:唯一消息ID

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果又收到相同消息,判断ID是否存在,存在则为重复消息放弃处理

SpringAMQP中的MessageConverter自带了MessageID功能,开启即可

    @Bean
    public MessageConverter messageConverter(){
        Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
        jjmc.setCreateMessageIds(true);
        return jjmc;
    }

缺点:业务侵入,而且会影响原本的性能

方案二:业务判断实现幂等性

实例

public void paySuccess(Long orderId){
        //1.查询订单状态为未支付
        Order byId = service.getById(orderId);
        //2.判断是否需要进行修改
        if(byId==null||byId.getStatus() != 1){
            return;
        }
        //3.修改订单
        service.markOrderPaySuccess(orderId);
    }

 

最终兜底方案:延迟消息

1.死信交换机

什么是死信(dead letter)交换机:满足下面条件之一的

  消费者使用basic.reject或者basic.nack声明消费失败,并且消息的requeue参数为false;(处理失败而拒绝的消息)

  消息是一个过期消息,超时无人消费(过期的消息)

  投递的消息队列满了,最早的消息可能成为死信消息(队列满了而被拒绝的消息)

如何实现:队列通过dead-letter-exchange属性指定一个交换机,并且他不会有消费者。那么该队列中的死信就会投递到这个交换机中,这个交换机就是死信交换机。我们可以通过设置消息的ttl来实现消息的延迟发送

 

2.延迟消息插件(常用,并且较简便)

将普通交换机改造为支持延迟消息功能的交换机,消息投递到队列后可以暂存一段时间,之后再投递到队列

首先是在rabbitMQ中挂载插件并启动 rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

在发送时在properties中加入delay,在接受消息的时候,exchange的delay属性设置为true

标签:可靠性,exchange,提高,投递,交换机,失败,消息,MQ
From: https://www.cnblogs.com/kun1790051360/p/18171792

相关文章

  • Mac 安装 RabbitMQ
    一般来说,安装分为两种方式:通过brew命令安装。在这里,推荐使用brew来安装,非常强大的Mac端包管理工具。下载RabbitMQ源文件,解压源文件之后进行安装。Docker启动一、brew命令安装Mac安装RabbitMQ1、安装erlangbrewinstallerlang2、安装rabbitmqbrewinstall......
  • 快速入门一篇搞定RocketMq-实现微服务实战落地
    1、RocketMq介绍RocketMQ起源于阿里巴巴,最初是为了解决邮件系统的高可靠性和高性能而设计的。在2016年开源分布式消息中间件,并逐渐成为Apache顶级项目。现在是Apache的一个顶级项目,在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转,性能稳定、高效。官网地址:https://......
  • RabbitMQ的基本使用
    在数据采集的过程中,可能需要一些进程间的通信,如一个进程负责构造爬取请求,另一个负责执行这些请求;某个数据爬取进程执行完毕,通知另一个负责数据处理的进程开始爬取数据;某个进程新建了一个爬取任务,通知另一个负责数据爬取的进程开始爬取数据。为了降低进程耦合度,需一个消息......
  • 线段树--RMQ
    这是带上lazy标记的线段树板子inta[N];intls(intp){returnp<<1;}intrs(intp){returnp<<1|1;}classSegmentTree{public: inttree[N<<2|1],tag[N<<2|1]; inlinevoidpush_up(intp){ tree[p]=tree[ls(p)]+tree[rs(p)]; } in......
  • MQTT上报阿里云
    1.代码实现1.主函数代码段#include<stdio.h>#include<string.h>#include<errno.h>#include<sys/types.h>#include<dirent.h>#include<fcntl.h>#include<unistd.h>#include<stdlib.h>#include<time.h>#includ......
  • MQTT服务器连接不上的问题
    问题描述环境:阿里云服务器Ubuntu22.04.3LTS,安装mosquitto后,在虚拟机端订阅消息出现报错(以前用阿里云Ubuntu20.04LTS的服务器装上就能用),以下服务器ip是我乱填的mosquitto_sub-t/iotstuff-h129.25.125.124-p1883Error:Connectionrefused解决办法云服务器ECS->安......
  • 注册表碎片整理是一种优化操作系统注册表的方法,旨在减少注册表文件的碎片化,从而提高系
    注册表碎片整理是一种优化操作系统注册表的方法,旨在减少注册表文件的碎片化,从而提高系统性能和响应速度。它通过重新整理和优化注册表文件的存储结构,以及压缩空闲空间等方式,来改善系统的整体表现。注册表是Windows操作系统中的核心组件之一,它存储了系统和安装的应用程序的配......
  • P1525 [NOIP2010 提高组] 关押罪犯
    原题链接题解这题我采用了带权并查集的做法,0代表两囚犯处于监狱,1代表两囚犯不同监狱。根据题意,我们想让冲突值尽可能的小,那么我们要先把仇恨值大的两罪犯放在不同监狱;即按仇恨值从大到小的去判断每条仇恨信息。(贪心思想)code #include<bits/stdc++.h>usingnamespacestd;......
  • 提高安全性,优雅实现拷贝与交换:C++中的Copy-and-Swap惯用法
     概述:拷贝并交换(Copy-and-Swap)是C++编程中的惯用法,用于实现赋值操作符和确保异常安全的拷贝构造函数。其核心思想是通过拷贝构造函数创建临时副本,再通过交换确保操作的异常安全性。这种方法在C++11之前和之后都适用,但在C++11中,移动语义和右值引用的引入使得实现可以更加高效。......
  • rabbitMQ
    同步调用的优点:时效性强,等待到结果后才会返回  缺点:拓展性差,性能下降,级联失败问题异步调用的优点:接触耦合,增强拓展性;无需等待,性能好;故障隔离;缓存消息,流量削峰填谷缺点:时效性差,不确定是否成功,业务安全依赖于broker的可靠性 rabbitMQ整体架构:virtua......