首页 > 其他分享 >RabbitMQ(四)RabbitMQ的确认机制

RabbitMQ(四)RabbitMQ的确认机制

时间:2023-07-22 22:22:58浏览次数:38  
标签:ack 确认 broker RabbitMQ deliveryTag 消息 机制 null public

RabbitMQ(四)RabbitMQ的确认机制


  • 保证消息不丢失、可靠抵达,可以使用事务消息,但性能会下降250倍,因此引入确认机制
    • publisherConfirm Callback确认模式
    • publisherreturn CallBack未投递到queue退回模式
    • consumer:ack机制
image-20230722175615280

1 发送端的确认机制

Confirm Callback
  • Confirm CallbackRabbitMQ发送端确认机制主要是在Broker(消息代理)收到消息的时候执行的回调

  • Spring.rabbitmq.publisher-confirms=true

    • 编码是通过在创建connectionFactory的时候设置PublisherConfirms(true)选项开启confirmcallback
    • CorrelationData:用来表示当前消息的唯一性

​ 首先配置RabbitMQ:

spring:
  datasource:
    username: root
    password: root
    url: jdbc:mysql://192.168.60.3:3306/grainmall_oms
    driver-class-name: com.mysql.cj.jdbc.Driver
  rabbitmq:
    # 开启发送端消息抵达队列的回调
    publisher-returns: true
    # 抵达队列后以异步方式优先回调
    template:
      mandatory: true
    # 开启发送端消息抵达broker的通知
    publisher-confirm-type: correlated

​ 在配置类自定义Confirm CallBack消息回调:

  • @PostConstruct标识在配置类对象创建成功的时候执行此注解标注的方法
  • setConfirmCallback的参数主要有:
    • correlationData 当前消息的唯一关联数据(消息的唯一id)
    • ack 消息是否被broker接收到
    • s 失败的原因
@Configuration
public class RabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @PostConstruct
    public void initRabbitConfirmCallBack() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 只要broker收到消息,就会触发这个回调
             * @param correlationData 当前消息的唯一关联数据(消息的唯一id)
             * @param ack 消息是否被broker接收到
             * @param s 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String s) {
                System.out.println("confirm:    " + correlationData + " " + ack + " " + s);
            }
        });
    }

}

​ 测试成功和失败的回调分别为:

confirm:    null true null

confirm:    null false channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'java-exchange1' in vhost '/', class-id=60, method-id=40)
Return Callback
  • 配置rabbitmq-returns:true

    spring:
      datasource:
        username: root
        password: root
        url: jdbc:mysql://192.168.60.3:3306/grainmall_oms
        driver-class-name: com.mysql.cj.jdbc.Driver
      rabbitmq:
        # 开启发送端消息抵达队列的回调
        publisher-returns: true
        # 抵达队列后以异步方式优先回调
        template:
          mandatory: true
    
  • Confirm模式只能保证消息到达broker,不能保证消息准确投递到目标queue中,这需要使用return模式实现

  • 即如果消息未能够投递到目标queue里将调用returnCallBack,可以记录下详细的投递记录、定期的巡检或者自动纠错都需要的数据

@Configuration
public class RabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @PostConstruct
    public void initRabbitConfirmCallBack() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 只要broker收到消息,就会触发这个回调
             * @param correlationData 当前消息的唯一关联数据(消息的唯一id)
             * @param ack 消息是否被broker接收到
             * @param s 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String s) {
                System.out.println("confirm:    " + correlationData + " " + ack + " " + s);
            }
        });


        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息没有传递给指定的队列,就会触发这个回调
             * @param message 投递失败的消息的详细信息
             * @param i 回复的状态码
             * @param s 回复的消息文本
             * @param s1 这个消息是交给哪个交换机
             * @param s2 用的交换机的那个route-key
             */
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                System.out.println("return: " + message + " " + i + " " + s + " " + s1 + " " + s2);
            }
        });
    }

}

​ 测试写错route-key的情况下的return callback

return: (Body:'{"id":null,"memberId":null,"orderSn":"94c9d88c-9ffb-41fd-8071-4f85fe16fc0e","couponId":null,"createTime":null,"memberUsername":null,"totalAmount":null,"payAmount":null,"freightAmount":null,"promotionAmount":null,"integrationAmount":null,"couponAmount":null,"discountAmount":null,"payType":null,"sourceType":null,"status":null,"deliveryCompany":null,"deliverySn":null,"autoConfirmDay":null,"integration":null,"growth":null,"billType":null,"billHeader":null,"billContent":null,"billReceiverPhone":null,"billReceiverEmail":null,"receiverName":null,"receiverPhone":null,"receiverPostCode":null,"receiverProvince":null,"receiverCity":null,"receiverRegion":null,"receiverDetailAddress":null,"note":null,"confirmStatus":null,"deleteStatus":null,"useIntegration":null,"paymentTime":null,"deliveryTime":null,"receiveTime":null,"commentTime":null,"modifyTime":null}' MessageProperties [headers={__TypeId__=com.hikaru.grainmall.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 312 NO_ROUTE java-exchange java-binding1
为消息添加唯一Id
  • rabbitTemplate的最后一个参数可以为:
    • CorrelationData:标识其唯一性id
    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam(value = "num", defaultValue = "10") Integer num) {
        for(int i = 0; i < num; i++) {
            if(i % 2 == 0) {
                OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
                orderReturnReasonEntity.setId(1L);
                orderReturnReasonEntity.setName("Utada Hikaru" + i);

                rabbitTemplate.convertAndSend("java-exchange",
                        "java-binding",
                        orderReturnReasonEntity,
                        new CorrelationData(UUID.randomUUID().toString()));
            } else {
                OrderEntity orderEntity = new OrderEntity();
                orderEntity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("java-exchange",
                        "java-binding1",
                        orderEntity,
                        new CorrelationData(UUID.randomUUID().toString()));
            }

        }
        return "ok";
    }
本地事务表保证事务的可靠性抵达(怎么判断哪些消息没有抵达)
  • 在向MQ发送消息的时候,还可以将消息保存到数据库中,使用其唯一id为主键进行记录

  • 如果服务端收到了ConfirmCallBack,则表明消息被收到

  • 则只需要遍历数据库表即可知道哪些没有收到了

2 接受端的ack确认机制

​ 消费者获取到消息,成功处理后可以回复ack给broker,方式有:

  • basic.ack用于肯定确认,broker将移除此消息
  • basic.nack用于否定确认,可以指定broker是否丢弃该消息(即重新入队),可以批量
  • basic.reject用于否定确认,可以指定broker是否丢弃该消息,但不可以批量

​ 默认情况下位自动ack,即消息被消费者收到之后,就会从broker中移除,会造成消息的丢失,而上述的手动ack在队列没有消费者的情况下,仍然会存储消息直到出现消费者进行消费

2.1 自动消息签收

一种是自动接收的(auto)也是默认的处理方式:即只要消息接收到,客户端消息处理的方法执行完毕就会自动确认,服务端就会移除这个消息。但是这种模式存在的问题是会导致消息的丢失,比如我们向消息队列发送五个消息,在消息处理的地方加一个端点:

image-20230722205759495

然后放行一个消息:

image-20230722205113282

最后终止客户端,模拟宕机,然后发现消息丢失

image-20230722205512456
2.2 手动消息签收

另一种是手动处理(manual),即消息队列接收到消息后,只要没有明确告诉MQ消息被签收,也就是没有ack,消息就一直是unacked的状态,当接收方宕机重启后,就会重新开始处理这些消息,因此不会造成消息的丢失

image-20230722213443688

​ 此时中断服务器,接收端宕机,消息均变为Ready状态,下次宕机重启后会继续执行:

image-20230722213743577

那么接收端如何手动签收消息?

​ 接收方通过向MQ发送ack表示手动签收了消息,这个ack包括:

  • deliveryTag:存在于接收方收到的消息的消息头中,表示消息的唯一标识
  • multiple:是否开启批量签收

​ 代码为:

    @RabbitHandler
    public void receiveMessage1(Message message,
                               OrderReturnReasonEntity returnReasonEntity,
                               Channel channel) throws InterruptedException {

        System.out.println("收到消息:" + returnReasonEntity.getName());
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("签收了" + deliveryTag);
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
2.3 手动消息拒签
  • channel.basicNack(deliveryTag, multiple, requeue)表示拒签消息,其中参数:

    • deliveryTag:存在于接收方收到的消息的消息头中,表示消息的唯一标识
    • multiple:是否开启批量签收
    • requeue:签收后是否重新入队

    requeue为true的话,等同于一直既不签收也不拒签的情况

    @RabbitHandler
    public void receiveMessage1(Message message,
                               OrderReturnReasonEntity returnReasonEntity,
                               Channel channel) throws InterruptedException {

        System.out.println("收到消息:" + returnReasonEntity.getName());
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            if(deliveryTag % 2 == 0) {
                System.out.println("签收了" + deliveryTag);
                channel.basicAck(deliveryTag, false);
            } else {
                System.out.println("没有签收" + deliveryTag);
                channel.basicNack(deliveryTag, false, false);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

标签:ack,确认,broker,RabbitMQ,deliveryTag,消息,机制,null,public
From: https://www.cnblogs.com/tod4/p/17574414.html

相关文章

  • RabbitMQ(三)整合SpringBoot
    RabbitMQ(三)整合SpringBoot1整合RabbitMQ1导入依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>2查看容器的自动配置......
  • Dxitco德西科跟单社区首创CHATGPT人工+AI双重大数据分析机制
    交易信号对于跟单社区来说非常重要,信号多了,就必然面临选择困难。信号太少,可能会有老化的问题。维持一个不多不少、有新陈代谢的信号源,是做好跟单社区最核心的问题。Dxitco德西科跟单社区首创CHATGPT人工+AI双重分析把控,通过个性化服务、大数据分析和专家交易信号来提高客......
  • JUC并发编程(3)—锁中断机制
    目录1.什么是中断2.源码解读(中断的相关API)3.如何使用中断标识停止线程学习视频:https://www.bilibili.com/video/BV1ar4y1x7271.什么是中断一个线程不应该由其他线程来强制中断或停止,而是应该由线程自己自行停止,所以,Thread.stop、Thread.suspend、Thread.resume都已经被......
  • Java反射机制
    1、前置知识1.1、java虚拟机的方法区1.1、java虚拟机的方法区java虚拟机有一个运行时数据区,这个数据区又被分为方法区,堆区和栈区,我们这里需要了解的主要是方法区。方法区主要用来存放已经被虚拟机加载的类信息、静态变量、方法等信息。当虚拟机需要装载某个类的时候,需要类......
  • .NET 中使用RabbitMQ初体验
    在.NETCore中使用RabbitMQ前言逛园子的时候看到一篇.NET学习RabbitMq的文章(视频地址和文章地址放在文章底部了),写的不错,我也来实现一下。我是把RabbitMQ放在服务器的,然后先说一下如何部署它。注意:在使用到RabbitMQ的项目中需要安装Nuget包dotnetaddpackageRabbitMQ.Clien......
  • 最高法-实际控制人确认债权的,若相对人有理由相信其系公司职务行为,则可以参照表见代理
    (2021)最高法民申4920号  清涧县华阳鸿基置地有限责任公司、湖南鹏华装饰设计工程有限责任公司等建设工程施工合同纠纷其他民事民事裁定书本院认为:本院认为,本案系当事人申请再审案件,应当围绕华阳公司主张的再审事由能否成立进行审查。根据华阳公司的再审申请理由,本案主要审查了......
  • RabbitMQ 的CLI管理工具 rabbitmqadmin
     RabbitMQ的CLI管理工具rabbitmqadminApr20,2016Erlang的仓库下载配置,Erlang的升级,RabbitMQ的升级,服务启动,插件启用,RabbitMQCLI管理工具rabbitmqadmin的获取,RabbitMQ的架构、概念、消息投递过程,user、vhost、connection、exchange、binding、permission、channels、p......
  • 【项目实战】Kafka 重平衡 Consumer Group Rebalance 机制
    ......
  • MySQL锁机制与解决死锁问题
    引言:在数据库中,锁是一种重要的机制,用于控制并发访问数据,保证数据的一致性和完整性。MySQL作为一种常用的关系型数据库,也提供了丰富的锁机制来处理多个用户同时访问数据库时可能出现的并发问题。本篇博客将介绍MySQL的锁机制,包括如何添加锁以及解决可能出现的死锁问题。1.介绍MySQ......
  • Java-Day-33 ( 引出反射 + 反射机制 + 反射的优缺点 )
    Java-Day-33引出反射(reflection)引出传统new方法调用其方法:Dogdog=newDog();dog.hello();但若要根据以下配置文件指定信息,创建Dog对象并调用方法hello:classfullpath=com.zyz.Dogmethod=hello使用Properties类,可以读写配置文件Propertiesprope......