首页 > 其他分享 >RabbitMQ的消息可靠性

RabbitMQ的消息可靠性

时间:2023-05-25 23:12:58浏览次数:49  
标签:publisher 可靠性 ack RabbitMQ 交换机 失败 消息

RabbitMQ如何保证消息可靠性?
1.开启生产者确认机制,确保生产者的消息能到达队列。
2.开启持久化功能,确保消息未消费前在队列中不会丢失。
3.开启消费者确认机制为auto,由Spring确认消息处理成功后完成ack。
4.开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到指定的交换机,交由人工处理。

实现生产者确认机制
原理:RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种情况:
1.publisher-confirm,发送者确认。发送者确认又分为两种情况。一种是消息成功到达交换机,返回ack;另一种是消息没有到达交换机,返回nack。
2.publisher-return,发送者回执。消息成功到达交换机,但是在投递到队列时失败。此时返回ack及失败原因。

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一的id,以区分不同的消息,避免ack冲突。
实现步骤
1.修改配置。在publisher服务中的application.yml文件中添加下列内容

点击查看代码
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
说明:
点击查看代码
1.publisher-confirm-type:开启publisher-confirm,这里支持两种类型:
* simple:同步等待confirm结果,直到超时。
* correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback。
2.publisher-returns:开启publisher-return功能,同样是基于callback机制,不过是定义了ReturnCallback。
3.template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息。
2.定义Return回调 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置。
点击查看代码
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        //获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //设置ReturnCallback
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchane,routingKey)->{
            //投递失败,记录日志
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                    replyCode,replyText,exchane,routingKey,message.toString());
            //如果业务需要,可以重新发送消息
            /*if (routingKey==null || routingKey.startsWith("simple")){
                rabbitTemplate.convertAndSend(交换机,"队列",消息);
            }*/
        });
    }
}

3.定义ConfirmCallback ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。
点击查看代码
@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void ConfirmQueue(){
        //1.消息体
        String msg="Hello,RabbitMQ!";
        //2.设置一个全局唯一ID,封装到CorrelationData中
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //3.添加callback
        correlationData.getFuture().addCallback(
                result ->{
                    //3.1:ack,消息发送成功
                    if (result.isAck()){
                        log.debug("消息发送成功,ID:{}"+correlationData.getId());
                    }else { //3.2:nack,消息发送失败
                        log.error("消息发送失败未送达交换机,ID:{},原因:{}",correlationData.getId(),result.getReason());
                    }
                },
                ex -> log.error("消息发送异常,ID:{},原因:{}",correlationData.getId(),ex.getMessage())
        );
        //4.发送消息
        rabbitTemplate.convertAndSend("wzh.direct","miku",msg,correlationData);
    }
}
4.测试并查看效果 先查看投递到交换机失败的效果。将“rabbitTemplate.convertAndSend("wzh.direct","miku",msg,correlationData);”中的交换机修改成错误的交换机。


这里打印出了消息的id和发送失败的原因。
查看投递到队列失败的效果。将“rabbitTemplate.convertAndSend("wzh.direct","miku",msg,correlationData);”中的routingKey修改成错误的routingKey。

这里打印出了相关信息。


实现消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
实现消息持久化从三个方面入手
1.交换机持久化
2.队列持久化
3.消息持久化
默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。


实现消费者确认机制
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
设想一个场景:RabbitMQ将消息投递给消费者,消费者返回ack给RabbitMQ,RabbitMQ删除消息,但是此时消费者宕机,消费还没有处理。如此一来,消息便丢失了。因此需要把握消费者返回ack的时机。
SpringAMQP允许三种确认模式

点击查看代码
1.manual:手动ack,需要在业务代码结束后,调用api发送ack。(manual:自己根据业务情况,判断什么时候该返回ack)
2.auto:自动ack,由Spring检测listener代码是否出现异常,没有异常则返回ack,抛出异常则返回nack。(auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack)
3.none:关闭ack,MQ假定消费者获取消息后会处理成功,在投递完成后立即删除消息。(该模式下消息投递不可靠,可能丢失。)
一般情况下使用默认的auto模式即可。

实现消费者失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。这就需要使用重试机制进行介入。
实现步骤
1.修改consumer服务的application.yml文件,添加内容

点击查看代码
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

2.选择失败策略,一般是将发送失败的消息发送到指定的交换机。
三种失败策略

点击查看代码
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
2.1在consumer服务中定义处理失败消息的交换机和队列
点击查看代码
@Bean
    public DirectExchange errorExchange(){
        return new DirectExchange("wzh.error");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }
    @Bean
    public Binding errorBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
    }
2.2定义一个RepublishMessageRecoverer,关联队列和交换机
点击查看代码
@Bean
    public MessageRecoverer republicMeassageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate,"wzh.error","error");
    }
3.测试并查看效果


在消费者一端弄一个错误,之后向该消费者发送一条消息。

等待几秒后会在控制台上打印出一条信息。
到RabbitMQ客户端查看,此时error.queue已经有了一条消息。


点击去查看消息,会发现异常原因和消息体都会出现。

标签:publisher,可靠性,ack,RabbitMQ,交换机,失败,消息
From: https://www.cnblogs.com/wzh-Official/p/17433247.html

相关文章

  • python 发送微信消息
    python自动化,可以模拟键盘输入,因此,可以控制微信,发送消息,代码如下:1importsys2importpyautogui3importpyperclip4importtime5importconfigparser67"""8安装依赖:9pipinstallpyautoguipyperclippyinstaller1011打包成exe:12pyins......
  • rabbitmq中的queueDeclare方法
    queueDeclareQueue.DeclareOkqueueDeclare()throwsIOException;/***Declareaqueue*@seecom.rabbitmq.client.AMQP.Queue.Declare*@seecom.rabbitmq.client.AMQP.Queue.DeclareOk*@paramqueuethenameofthequeue*@param......
  • RabbitMQ消费消息方法basicConsume
    RabbitMQ-消费消息 Address[]addresses=newAddress[]{newAddress(IP_ADDRESS,PORT)};/***1.建立连接工厂*/ConnectionFactoryconnectionFactory=newConnectionFactory();connectionFactory.setUsername(USER_NAME);......
  • 【消息队列】概览
    消息队列作用:解耦、异步、削峰(大促)引入的问题:如何保证消息的高可用如何保证消息消费的幂等性如何处理消息丢失问题如何保证消息的顺序性如何解决消息积压如何保持数据一致需关注的技术实现:1、存储模型:如何快速的读写 ......
  • RabbitMQ之消息确认机制
    RabbitMQ之消息确认机制标签(空格分隔):php,rabbitmq在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话......
  • Delphi 12 最新消息
    1.新版本将集成Skia4Delphi。这绝对是有史以来最重要的更新,官方通过深度集成Skia4Delphi,将实现更高质量的界面,更有效率的界面,从此,毛刺现象将成为历史!2.C++将集成Visual Assist。这结于C++用户来说,绝对的福音!3.RTL,IDE,Compiler,Platform等质量优化与改进新版将在这几个方面,继续......
  • docker安装rabbitMQ
    输入命令dockerpullrabbitmq:3.7.7-management  设置账号和密码dockerrun-d--namerabbitmq3.7.7-p5672:5672-p15672:15672-v`pwd`/data:/var/lib/rabbitmq--hostnamemyRabbit-eRABBITMQ_DEFAULT_VHOST=my_vhost-eRABBITMQ_DEFAULT_USER=admin-eRABBITM......
  • 单片机消息队列的实现原理和机制2
    出处消息队列在RTOS中基本都有消息队列这个组件,也是使用最常见的组件之一。1.消息队列的基本概念消息队列是一种常用于任务间通信的数据结构,队列可以在任务与任务间、中断和任务间传递信息,实现了任务接收来自其他任务或中断的不固定长度的消息。通过消息队列服务,任务或中断服务......
  • 单片机消息队列的实现原理和机制1
    出处单片机开发过程中通常会用到“消息队列”,一般实现的方法有多种。本文给大家分享一下队列实现的原理和机制。环形队列环形队列是在实际编程极为有用的数据结构,它是一个首尾相连的FIFO的数据结构,采用数组的线性空间,数据组织简单,能很快知道队列是否满为空,能以很快速度的来存......
  • RabbitMQ系列-概念及安装
    1.消息队列消息队列是指利用队列这种数据结构进行消息发送、缓存、接收,使得进程间能相互通信,是点对点的通信而消息代理是对消息队列的扩展,支持对消息的路由,是发布-订阅模式的通信,消息的发送者并不清楚消息的接收者,消息可以被多个消费者接收。使用消息队列的作用如下异步:对于......