首页 > 其他分享 >rabbitmq发布确认高级

rabbitmq发布确认高级

时间:2023-09-04 11:33:06浏览次数:41  
标签:队列 确认 高级 rabbitmq 交换机 消息 message public

前言

在之前的发布确认中,生产者发送消息到mq中,消费者在从mq中取出消息满足持久化的.
发布确认是一个保证RabbitMQ 可靠性的一个机制
  保证生产者将信息成功的发送到 RabbitMQ的 server端了,那么broker就会回一个确认,如果没有收到或者收到拒绝信息,那么说明可能网络不好没有发送成功,server端宕机了,broker拒绝接收等情况,如果不进行后续处理,那么信息就会丢失,生产者收到失败的消息使用回调函数在进行处理。
  生产者将信道设置成 confirm 模式,所有在该信道上发布的消息都会指定一个唯一的ID,一旦消息投递到队列中,就是发送成功了,broker会立刻发送一个确认ack 给生产者,这个时候,生产者就知道消息已经发送成功了。
  如果队列和信息是持久化的,那么确认消息会在将消息写入磁盘之后再发出,broker返回的确认包含 确认消息的序列号,还可以设置 multiple,表示此序号前的所有消息都得到了处理。
  一旦发布消息,生产者等待确认的同时继续发送下一条消息,如果rabbitMq自身内部错误导致消息为发送成功,生产者就可以再回调方法中继续处理。

保证消息不丢失:

  • 设置要求队列必须持久化 channel.queueDeclare(QUEUE_NAME,isLasting,false,false,null);
  • 设置要求队列中的消息必须持久化channel.basicPublish("",QUEUE_NAME, MessageProperties.MINIMAL_PERSISTENT_BASIC,strMessage.getBytes());
  • 发布确认
    channel.confirmSelect();

但是即使这样数据还是存在丢失的可能,例如mq直接宕机了,或者交换机丢失,队列丢失,都会导致消息最后都丢失了。
解决方法:
image

在mq 中,消费者和生产者并不直接进行通信,生产者只负责把消息发送到队列,消费者只负责从队列获取消息(不管是push还是pull)。

  • 消费者从队列中获取到消息之后,这条消息就不存在队列中了,但是如果此时消费者所在的信道因为网络中断没有消费到,那这条消息就被永远的丢失了,所以,我们希望等待消费者成功消费掉这个消息之后再删除这条消息。
  • 而在发送消息的时候也是这样的,生产者发消息给交换机,也不能保证消息准确发送过去了,消息就像石沉大海一样,所以这样需要一个消息确认。

这个机制就是 消息确认机制。

案例引入

image

生产者确认

由生产者发送到 consumer 的链路为 producer -> broker -> exchange -> queue -> consumer 。
在编码时我们可以用两个选项用来控制消息投递的可靠性:

  • 消息从 producer 到 RabbitMQ broker cluster 成功,则会返回一个 confirmCallback;
  • 消息从 exchange 到 queue 投递失败,则会返回一个 returnCallback

我们可以利用这两个 callback 接口来控制消息的一致性和处理一部分的异常情况。

代码准备

配置文件
spring:
    rabbitmq
      publisher-confirm-type:

它有三个值:

  • NONE:禁用发布确认模式,是默认值
  • CORRELATED:发布消息成功到交换器后触发回调方法
  • SIMPLE:值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
配置类
/**
 * 发布确认配置类
 */
@Configuration
public class ConfirmConfig {

    // 交换机名称
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    // 队列名称
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    // routingkey
    public static final String CONFIRM_ROUTINGKEY  = "key1";

    //声明交换机
    @Bean
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    //声明队列
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    //绑定关系
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                        @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTINGKEY);
    }
}
交换机确认回调接口
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 向rabbitTemplate 注入回调失败的类
     * 后置处理器:其他注解都执行结束才执行。
     */
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
    /**
     * 交换机确认回调方法
     *  发消息 交换机接收到了  回调
     * @param correlationData :保存回调消息的 ID 及相关信息,交换机收到消息 ack=true代表成功;ack=false 代表失败
     * @param ack :true 代表交换机收到了
     * @param cause : 原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            log.info("交换机已经收到 ID 为:{} 的消息",correlationData.getId());
        }else{
            log.info("交换机未收到 ID为 {} 的消息,原因是 {}",correlationData.getId(),cause);
        }
    }

    /**
     * 当消息传送到队列过程中不可抵达的时候 将消息返回给生产者
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息 {} ,被交换机 {} 退回原因 {}",message,exchange,replyText);
    }
}
交换机确认--生产消费测试

使用交换机回调,就配置 publisher-confirm-type: 为CORRELATED

 @GetMapping("/sendMsg/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        rabbitTemplate.convertAndSend(ConfirmConfig.confirm_exchange_name+'1', ConfirmConfig.confirm_routing_key,message,new CorrelationData("1"));
        log.info("发送消息内容:{}",message);
    }

image

当生产者获取不到消息的时候进入回调函数执行 false 的代码。
image

实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。

  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。需要生产者携带过来
  • ack:消息投递到broker 的状态,true表示成功。
  • cause:表示投递失败的原因。

队列确认--回退接口

交换机接收到消息后可以判断当前的路径发送没有问题,但是不能保证消息能够发送到路由队列的。而发送者是不知道这个消息有没有送达队列的,因此,我们需要在队列中进行消息确认。这就是回退消息。

实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数

  • message(消息体)、
  • replyCode(响应code)、
  • replyText(响应内容)、
  • exchange(交换机)、
  • routingKey(队列)。
添加配置
spring:
  rabbitmq:
    publisher-confirm-type: correlated #开启交换机确认回调
    publisher-returns: true # 开启队列确认回退消息
实现回退接口
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 向rabbitTemplate 注入回调失败的类
     * 后置处理器:其他注解都执行结束才执行。
     */
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
/**
     * 当消息传送到队列过程中不可抵达的时候 将消息返回给生产者
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息 {} ,被交换机 {} 退回原因 {}",message,exchange,replyText);
    }
}
执行

例如生产者发送一条错误的消息给消费者,由于routingkey的错误,导致不能发送到队列中,模拟队列出现故障,此时消息会回退。
image
交换机确认收到消息了,但是队列退回消息,因为生产者发送到了交换机,交换机转交给队列时无法交接,那么此时回退消息。

阶段小结:

生产者的消息确认机制有两种:

  • 一个是从生产者发送到交换机的确认回调;
  • 一个是从交换机发送到队列的确认回调;

生产者发送到交换机,需要配置一个参数 publisher-confirm-type ,它默认是 none,没有开启,可以把它改为 correlated ,即消息成功发送后会触发一个回调;然后我们根据 ack 的一个状态进行判断,如果为 true ,则代表发送成功。
还有一个交换机到队列的回调,将 publisher-returns 改为 true 即可,触发 returnedMessage 。

“消息从 producer 到 RabbitMQ broker cluster 成功,则会返回一个 confirmCallback;”comfirm模式下,这个ComfirmCallback中的confirm()实现方法,应该是在生产者发送消息之后,不论是否成功发送到exchange都会回调这个方法,可以通过ack这个参数true或者false来判断消息是否投递到交换机。
而接收者nack这种模式下,消息重新入队,要控制好重试的次数,否则这个消息一直消费失败,又一直重新入

消费者确认

首先介绍消息消费的前提,rabbitmq 消费消息有两种模式,一个是推送 push ,一个是自己拉取pull。

  • 推模式:消息中间件主动将消息推送给消费者
  • 拉模式:消费者主动从消息中间件拉取消息。

但实际使用中,拉取消息是会降低系统吞吐量的,以及消费者很难实时获取消息,因此,一般使用的是push 模式。
在 mq 推消息给消费者不是等消费者消费完一个再推一个,而是根据prefetch_count 参数来决定可以推多个消息到消费者的缓存里面。
在消费者确认中,为了保证数据不会丢失,RabbitMQ 支持消息确定ACK。ACK 机制是消费者从 RabbitMQ 收到消息并处理完成后,返回给RabbitMQ,RabbitMQ 收到反馈后才将此消息从队列中删除。

在消费者确认当中又分为:

  • 自动确认ack
  • 手动确认ack

自动确认

自动确认是指消费者在消费消息的时候,当消费者收到消息后,消息就会被 RabbitMQ 从队列中删除掉。这种模式认为 “发送即成功”。这是不安全的,因为消费者可能在业务中并没有成功消费完就中断了。

手动确认 autoAck:false

手动确认又分为肯定确认和否定确认。

肯定确认 BasicAck

// false 表示只确认 b.DelivertTag 这条消息,true 表示确认 小于等于 b.DelivertTag 的所有消息(批量确认)
channel.basicAck(b.getEnvelope().getDeliveryTag(),false);

否定确认: BasicNack、BasicReject

否定确认的场景不多,但有时候某个消费者因为某种原因无法立即处理某条消息时,就需要否定确认了.

否定确认时,需要指定是丢弃掉这条消息,还是让这条消息重新排队,过一会再来,又或者是让这条消息重新排队,并尽快让另一个消费者接收并处理它.

丢弃:requeue: false:channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: false);
重新排队( requeue: true): channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false, requeue: true);

一般来说,如果出现异常,就使用channel.BasicNack 把消费失败的消息重新放入到队列中去。

springboot 版本确认

Springboot 的确认模式有三种,配置如下:

spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • NONE : 不确认 :
    • 1、默认所有消息消费成功,会不断的向消费者推送消息
    • 2、因为 rabbitmq 认为所有消息都被消费成功。所以队列中存在丢失消息风险。
  • AUTO:自动确认
    • 1、根据消息处理逻辑是否抛出异常自动发送 ack(正常)和nack(异常)给服务端,如果消费者本身逻辑没有处理好这条数据就存在丢失消息的风险。
    • 2、使用自动确认模式时,需要考虑的另一件事情就是消费者过载。
  • MANUAL:手动确认
    • 1、手动确认在业务失败后进行一些操作,消费者调用 ack、nack、reject 几种方法进行确认,如果消息未被 ACK 则发送到下一个消费者或重回队列。
    • 2、ack 用于肯定确认;nack 用于 否定确认 ;reject 用于否定确认(一次只能拒绝单条消息)
@Component
@Slf4j
public class MsgConfirmController {
    @RabbitListener(queues = ConfirmConfig.confirm_queue_name)
    public void consumerConfirm(Message message, Channel channel) throws IOException {
        if(message.getBody().equals("2")){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 
            //basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
            log.info("接收的消息为:{}",message);
        }else{
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
            log.info("未消费数据");
        }
    }
}

image
发现它可以将消息返回给队列,然后又消费这个数据,不断消费,造成了死循环,消息无限投递。
image
这时候可以改成 false,然后配置下死信队列,将该消息发送到死信队列中。

总结

本文主要对消息的确认进行了 springboot 微服务版本的测试,通过两个服务之间的互相调用来验证 rabbitmq 的消息确认可行性。在后面的文章中,我们将对rabbitmq 更多细节进行深入研究。

标签:队列,确认,高级,rabbitmq,交换机,消息,message,public
From: https://www.cnblogs.com/zgf123/p/17676492.html

相关文章

  • Flink高级特性(2)
    watermark水位线处理乱序数据流从数据产生到DataSource,再到具体的算子,中间是有一个过程和时间,有可能会导致数据乱序问题,通过watermark+EventTime来处理。作用:由于网络延迟等原因,一条数据会迟到计算,比如使用eventtime来划分窗口,我们知道窗口中的数据是计算一段时间的数据,如果一......
  • Android并发编程高级面试题汇总(含详细解析 十八)
    Android并发编程高级面试题汇总最全最细面试题讲解持续更新中......
  • java面向对象高级(根据青空的霞光总结)
    #面向对象高级(青空)基本类型包装类前置:虽然java是面向对象的语言,但是基本类型不是面向对象的,如果想要让基本类型也能像面向对象的形式进行表达,就可以是用包装类包装类实际上就是将我们的基本数据类型,封装成一个类(运用了封装的思想)类型:byte->Byteboolean->Booleans......
  • rabbitmq延迟队列
    概念所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费使用场景1、订单在十分钟之内未支付则自动取消2、预定会议后,需要在预定时间点前十分钟通知各个与会人员参加会议。3、淘宝七天自动确认收货,自动评价功......
  • 函数高级
    函数默认参数、占位参数,函数重载1#include<iostream>2usingnamespacestd;34//1、函数默认参数5//如果传入数据,使用传入的数据,没有则用默认的6//函数声明与实现只能由一个有默认参数7intfunc(inta,intb=20,intc=30)8{9returna+b+c;10}1......
  • rabbitmq死信队列
    死信的概念死信队列(DeadLetterQueue)是指当消息无法被消费者正常消费时,将这些无法消费的消息发送到专门的死信队列中,以便进行进一步的处理。这种处理方式通常被称为“死信处理”。应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生......
  • docker 安装rabbitmq
    dockerpullrabbitmqdockerrun-d--hostnamemyrabbitmq--namerabbitmq-p15672:15672-p5672:5672rabbitmqdockerexec-itrabbitmq/bin/bashrabbitmq-pluginsenablerabbitmq_management可以通过访问http://localhost-ip:15672,访问web界面,这里的用户名和密......
  • Xshell永久安装完全指南:畅享所有高级功能
    前言Xshell是一款功能强大的SSH远程终端客户端。Xshell支持远程协议Telnet、Rlogin、SSH/SSHPKCS,主要用于在Windows系统上远程操控服务器进行工作以及统一管理多台服务器集群,它通过多种不同的连接协议和密码,保障着用户的连接服务器安全。一、安装xshell安装包在文末附带,并提供了免......
  • RabbitMQ交换机
    概念RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。......
  • openGauss学习笔记-57 openGauss 高级特性-并行查询
    openGauss学习笔记-57openGauss高级特性-并行查询openGauss的SMP并行技术是一种利用计算机多核CPU架构来实现多线程并行计算,以充分利用CPU资源来提高查询性能的技术。在复杂查询场景中,单个查询的执行较长,系统并发度低,通过SMP并行执行技术实现算子级的并行,能够有效减少查询执行时......