首页 > 其他分享 >rabbitmq自动及手动ACK

rabbitmq自动及手动ACK

时间:2023-05-29 10:34:53浏览次数:35  
标签:false 消费者 ACK 手动 确认 rabbitmq 消息 message true


  mq的ack  主要是确认消息被消费者消费完成后通知服务器将队列里面的消息清除。

而如果不配置Ack的话呢,我测试过他会自动的忽略,也就是说此时的服务是no_ack=true的模式,就是说只要我发现你是消费了这个数据,至于异常不异常的,我不管了。通知Ack机制就是这么来的,更加灵活的,我们需要Ack不自动,而是手动,这样做的好处,就是使得我们开发人员更加人性化或者灵活的来处理我们的业务罗杰代码,更加方便的处理异常的问题以及数据的返回处理等。下面是通话机制的四条原则:

Basic.Ack 发回给 RabbitMQ 以告知,可以将相应 message 从 RabbitMQ 的消息缓存中移除。
Basic.Ack 未被 consumer 发回给 RabbitMQ 前出现了异常,RabbitMQ 发现与该 consumer 对应的连接被断开,之后将该 message 以轮询方式发送给其他 consumer (假设存在多个 consumer 订阅同一个 queue)。
在 no_ack=true 的情况下,RabbitMQ 认为 message 一旦被 deliver 出去了,就已被确认了,所以会立即将缓存中的 message 删除。所以在 consumer 异常时会导致消息丢失。
来自 consumer 侧的 Basic.Ack 与 发送给 Producer 侧的 Basic.Ack 没有直接关系。
 

正题部分(配置手动Ack,实现异常消息回滚)
 

A. 在消费者端的mq配置文件上添加,配置  关键代码为 acknowledeg = "manual",意为表示该消费者的ack方式为手动(此时的queue已经和生产者的exchange通过某个routeKey绑定了)

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"> <rabbit:listener queues="queue_xxx" ref="MqConsumer"/> <rabbit:listener queues="queue_xxx" ref="MqConsumer2"/> </rabbit:listener-container>
B. 新建一个类 MqConsumer ,并实现接口  ChannelAwareMessageListener ,实现onMessage方法,不需要指定方法。

springAMQP中已经实现了一个功能,如果该监听器已经实现了下面2个接口,则直接调用onMessage方法

C. 关键点在实现了ChannelAwareMessageListener的onMessage方法后,会有2个参数。

一个是message(消息实体),一个是channel就是当前的通道,很多地方都没有说清楚怎么去手动ack,其实手动ack就是在当前channel里面调用basicAsk的方法,并传入当前消息的tagId就可以了。

    消息监听接口实现    
        1.MessageListener消费者消息监听(自动进行任务完成确认)
              基于实现MessageListener的消费者监听消息时,如果xml里没有配置acknowledge,则是默认如同xml配置acknowledge="auto"  ,是自动确认消费者完成任务(消息ack), 如果此时消费者抛出异常 ,消息会返回该队列并发送给其他消费者 ,如没有其他消费者  则会继续发到该消费者
        如果xml配置中acknowledge="manual",则无法收到消息。该消息会停留在服务器,然后会发给可以收到消息的消费者。
        

        2.ChannelAwareMessageListener消费者消息监听(手动进行任务完成确认)
             基于实现ChannelAwareMessageListener的消费者监听消息时,xml配置中acknowledge="auto"或不配置acknowledge时,调用方法进行消费者任务完成确认时会报如下异常(com.rabbitmq.client.ShutdownSignalException: channel error;)
        所以若要实现手动消费则任务完成确认,xml的监听标签中需要配置acknowledge="manual" 手动确认消费者任务完成(消息ack)
        
         消息确认  如未调用如下方法确认,则消息不再发到该消费者(如有其它的消费者,则轮询到其他的消费者,否则返回队列保留在服务器),multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
        (1) 消息确认  如未确认则消息不再发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
       (2) 消息确认并返回队列   如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有  consumer获得的消息;requeue 为true该消息重新回到队列,并发到该队列的其他消费者,为false则直接丢掉该消息
                            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
       (3) 拒绝消息  requeue 为true该消息重新回到队列,并发到该队列的其他消费者,如没有其他消费者,则会一直发到该消费者,为false则直接丢掉该消息

        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

同样的,如果要Nack或者拒绝消息(reject)的时候,也是调用channel里面的basicXXX方法就可以了(当然要制定tagId)。注意如果抛异常或Nack(并且requeue为true),消息会一直重新入队列,一不小心就会重复一大堆消息不断出现~。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // ack返回false,并重新回到队列,api里面解释得很清楚 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
D. 针对上面所描述的情况,我们在搭建一个消息队列时候,我们的思路应该是这样的,首先,我们要启动ack的手动方式,紧接着,我们处理代码逻辑,如果发生了异常信息,我们首先通知到ack,我已经表示接受到这条数据了,你可以进行删除了,不需要让他自动的重新进入队列中,然后,我们启用一个错误处理,手动将其重新插入队列中,在此之前,有几个类和Api一起来看一下。

    1. SimpleMessageListenerContainer

    这个是我们的基础监听,他的作用就是队列的总监听,可以为其配置ack模式,异常处理类等。。

    2. org.springframework.amqp.support.converter.SimpleMessageConverter

    这个类和下面的Converter类很容易搞混淆,这个类的作用是可以解析队列中的 message 转 obj

    3. org.springframework.amqp.rabbit.retry.MessageRecoverer

    这个接口,需要我们开发者自定义实现,其中的一个方法recover(Message message, Throwable cause),就可以看出来他是干嘛的,就是说在监听出错,也就是没有抓取的异常而是抛出的异常会触发该方法,我们就会在这个接口的实现中,将消息重新入队列

    4. org.springframework.util.ErrorHandler

    这个接口也是在出现异常时候,会触发他的方法

案例。。。。。。。。。。。。。。。。。。。
生产者

import java.io.IOException;
  
 import javax.annotation.Resource;
  
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.amqp.core.AmqpTemplate;
 import org.springframework.amqp.rabbit.support.CorrelationData;
 import org.springframework.stereotype.Component;
  
  
 @Component
 public class MessageProducer{
  
      private Logger logger = LoggerFactory.getLogger(MessageProducer.class);  
  
         @Resource(name="amqpTemplate")  
         private AmqpTemplate amqpTemplate;  
  
         @Resource(name="amqpTemplate2")  
         private AmqpTemplate amqpTemplate2;  
  
         public void sendMessage(Object message) throws IOException {  
             logger.info("to send message:{}", message);  
             amqpTemplate.convertAndSend("queue.Test.admin", message); 
 //            implements  ConfirmCallback
 //            Message re = (Message)amqpTemplate.convertSendAndReceive("queue.Test.admin", message);
             
 //            amqpTemplate2.convertAndSend("queue.Test.admin", message);  
         }
 消费者 1 及其配置文件.xmlimport org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.core.MessageListener;
 import org.springframework.stereotype.Component;
  
  
 @Component
 public class MessageConsumer implements MessageListener {
  
     private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);  
     @Override
     public void onMessage(Message message) {
          logger.info("admin  MessageConsumer  consumer receive message------->:{}", message);  
 //         try {
 //            Thread.sleep(3000);
 //        } catch (InterruptedException e) {
 //            // TODO Auto-generated catch block
 //            e.printStackTrace();
 //        }
          xml配置中acknowledge="auto" 时 是自动确认ack  如果此时消费者抛出异常 消息会发到该队列其他消费者  如没有其他消费者  则会一直发到该消费者
 //        throw  new NullPointerException(".....admin.....消费者异常。。。。。。。。");
        
     }
  
 }
 <?xml version="1.0" encoding="UTF-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
     xsi:schemaLocation="http://www.springframework.org/schema/beans  
      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd        http://www.springframework.org/schema/rabbit  
      http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
     <!--配置connection-factory,指定连接rabbit server参数 -->
     <rabbit:connection-factory id="connectionFactory" virtual-host="/" 
         username="homy" password="homy" host="120.25.212.10" port="5672" channel-cache-size="5"/>
  
     <!--配置connection-factory,指定连接rabbit server参数 
     <rabbit:connection-factory id="connectionFactory" virtual-host="hymn" 
         username="hy" password="hy2018627" host="120.25.212.10" port="5672" />
         -->
         
     <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
     <rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
         
     <!--定义queue -->
     <rabbit:queue name="queueTest" durable="true" auto-delete="false"
         exclusive="false" declared-by="connectAdmin" >
     </rabbit:queue>
     
  
     <!-- 定义direct exchange,绑定queueTest -->
     <rabbit:direct-exchange name="exchangeTest"
         durable="true" auto-delete="false" declared-by="connectAdmin">
         <rabbit:bindings>
             <rabbit:binding queue="queueTest" key="queue.Test.admin"></rabbit:binding>
         </rabbit:bindings>
     </rabbit:direct-exchange>
  
     <!--定义rabbit template用于数据的接收和发送 -->
     <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
         exchange="exchangeTest"/>
  
 <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象  acknowledge="manual" -->
     <rabbit:listener-container connection-factory="connectionFactory">
         <rabbit:listener queues="queueTest" ref="messageConsumer" method="onMessage"/>
     </rabbit:listener-container>
    
    
     <!--定义rabbit template用于数据的接收和发送 -->
     <rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" exchange="exchangeTopic" />
     
     <!--定义queue -->
     <rabbit:queue name="queueTest2" durable="true"  auto-delete="false"
         exclusive="false" declared-by="connectAdmin" />
  
     <!-- 定义direct exchange,绑定queueTest -->
     <rabbit:topic-exchange name="exchangeTopic"
         durable="true" auto-delete="false" declared-by="connectAdmin">
         <rabbit:bindings>
             <rabbit:binding queue="queueTest2" pattern="queue.#"></rabbit:binding>
             <rabbit:binding queue="queueTest" pattern="queue.Test.*"></rabbit:binding>
         </rabbit:bindings>
     </rabbit:topic-exchange>
  
  
     <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
         <rabbit:listener queues="queueTest2" ref="messageConsumer" method="onMessage"/>
     </rabbit:listener-container> 
 </beans>

消费者 2 及其配置文件.xml(另外一个项目)

import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
 import org.springframework.stereotype.Component;
  
 import com.rabbitmq.client.Channel;
  
 @Component
 public class MessageConsumer implements ChannelAwareMessageListener {
  
     private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);  
  
     @Override
     public void onMessage(Message message, Channel channel) throws Exception {
         // TODO Auto-generated method stub
 //    消息监听接口实现    
 //        1.MessageListener消费者消息监听(自动进行任务完成确认)
 //              基于实现MessageListener的消费者监听消息时,如果xml里没有配置acknowledge,则是默认如同xml配置acknowledge="auto"  ,是自动确认消费者完成任务(消息ack), 如果此时消费者抛出异常 ,消息会返回该队列并发送给其他消费者 ,如没有其他消费者  则会继续发到该消费者
 //        如果xml配置中acknowledge="manual",则无法收到消息。该消息会停留在服务器,然后会发给可以收到消息的消费者。
 //        
 //        2.ChannelAwareMessageListener消费者消息监听(手动进行任务完成确认)
 //             基于实现ChannelAwareMessageListener的消费者监听消息时,xml配置中acknowledge="auto"或不配置acknowledge时,调用方法进行消费者任务完成确认时会报如下异常(com.rabbitmq.client.ShutdownSignalException: channel error;)
 //        所以若要实现手动消费则任务完成确认,xml的监听标签中需要配置acknowledge="manual" 手动确认消费者任务完成(消息ack) 
 //        
 //         消息确认  如未调用如下方法确认,则消息不再发到该消费者(如有其它的消费者,则轮询到其他的消费者),multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
 //         消息确认  如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
 //        (1)channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 //        消息确认并返回队列   如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息;requeue 为true该消息重新回到队列,并发到该队列的其他消费者,为false则直接丢掉该消息
 //        (2)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
 //        拒绝消息  requeue 为true该消息重新回到队列,并发到该队列的其他消费者,如没有其他消费者,则会一直发到该消费者,为false则直接丢掉该消息
 //        (3)channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
         
         
         
         
         
         //..........手动消息确认。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
 //         try {
 //                Thread.sleep(3000);
 //            } catch (InterruptedException e) {
 //                // TODO Auto-generated catch block
 //                e.printStackTrace();
 //            }
              xml配置中acknowledge="auto" 时 是自动确认ack  如果此时消费者抛出异常 消息会发到该队列其他消费者  如没有其他消费者  则会一直发到该消费者 
 //         if(true){
 //             throw  new NullPointerException(".....admin.....消费者异常。。。。。。。。");
 //         }
         
         logger.error("收到");
          //消息确认  如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息
         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
         logger.info("business-admin  MessageConsumer receive message  出现异常   并将该消息重新入队列------->:{}", message);
         logger.info("messageid:"+message.getMessageProperties().getDeliveryTag()+" ...messageBody:"+message.getBody());
         //消息确认并返回队列   如未确认则消息不在发到该消费者,multiple 为 false只确认当前一个消息收到,true确认所有consumer获得的消息;requeue 为true该消息重新回到队列,并发到该队列的其他消费者,为false则直接丢掉该消息
 //        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  
         //拒绝消息  requeue 为true该消息重新回到队列,并发到该队列的其他消费者,如没有其他消费者,则会一直发到该消费者,为false则直接丢掉该消息
 //        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
         
         
         //........................手动通知消息生产者。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
         
         
         
         
     }
  
 }<?xml version="1.0" encoding="UTF-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
     xsi:schemaLocation="http://www.springframework.org/schema/beans  
      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd        http://www.springframework.org/schema/rabbit  
      http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">
     <!--配置connection-factory,指定连接rabbit server参数 -->
     <rabbit:connection-factory id="connectionFactory" virtual-host="/" 
         username="homy" password="homy" host="120.25.212.10" port="5672" channel-cache-size="5"/>
     <!--配置connection-factory,指定连接rabbit server参数 
     <rabbit:connection-factory id="connectionFactory2" virtual-host="hymn" 
         username="hy" password="hy2018627" host="120.25.212.10" port="5672" publisher-confirms="true"/>
         -->
         
     <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
     <rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
  
     <!--定义queue -->
     <rabbit:queue name="queueTest" durable="true" auto-delete="false"
         exclusive="false" declared-by="connectAdmin" />
     <rabbit:queue name="queueTest3" durable="true" auto-delete="false"
         exclusive="false" declared-by="connectAdmin" />
  
     <!-- 定义direct exchange,绑定queueTest -->
     <rabbit:direct-exchange name="exchangeTest"
         durable="true" auto-delete="false" declared-by="connectAdmin">
         <rabbit:bindings>
             <rabbit:binding queue="queueTest" key="queue.Test.admin"></rabbit:binding>
         </rabbit:bindings>
     </rabbit:direct-exchange>
  
     <!--定义rabbit template用于数据的接收和发送 -->
     <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
         exchange="exchangeTest" />
  
  
     <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 acknowledge="manual"-->
     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
         <rabbit:listener queues="queueTest" ref="messageConsumer" method="onMessage" />
     </rabbit:listener-container>
  
 <!-- .............................................................................. -->
  
  
      <!--定义rabbit template用于数据的接收和发送 -->
     <rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" exchange="exchangeTopic" />
     
     <!--定义queue -->
     <rabbit:queue name="queueTest2" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" />
  
     <!-- 定义direct exchange,绑定queueTest -->
     <rabbit:topic-exchange name="exchangeTopic" durable="true" auto-delete="false" declared-by="connectAdmin">
         <rabbit:bindings>
             <rabbit:binding queue="queueTest2" pattern="queue.#"></rabbit:binding>
             <rabbit:binding queue="queueTest" pattern="queue.Test.*"></rabbit:binding>
         </rabbit:bindings>
     </rabbit:topic-exchange>
  
  
     <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
         <rabbit:listener queues="queueTest2" ref="topicConsumer" method="onMessage"/>
     </rabbit:listener-container>
 </beans>

标签:false,消费者,ACK,手动,确认,rabbitmq,消息,message,true
From: https://blog.51cto.com/chengzheng183/6368211

相关文章

  • 静态路由、Track与NQA联动配置举例
    1.6.4 静态路由、Track与NQA联动配置举例1.组网需求SwitchA、SwitchB、SwitchC和SwitchD连接了20.1.1.0/24和30.1.1.0/24两个网段,在交换机上配置静态路由以实现两个网段的互通,并配置路由备份以提高网络的可靠性。SwitchA作为20.1.1.0/24网段内主机的缺省网关,在SwitchA......
  • rabbitMQ windows环境重装后报错RabbitMQ service is already present - only updatin
    错误如下:C:\Users\Administrator>rabbitmq-serviceinstallRabbitMQserviceisalreadypresent-onlyupdatingserviceparametersC:\ProgramFiles\erl\erts\bin\erlsrv:Warning,couldnotsetcorrectinteractivemode.Error:句柄无效。---此行有时显示中文乱码C:\Progr......
  • 交换机策略路由、Track与NQA联动配置总结
    一、  组网需求:SwitchA、SwitchB、SwitchC两两互联,在SwitchA上配置策略路由,使不同的业务流量报文转发到不同的网段。SwitchA作为10.1.1.0/24网段内主机的缺省网关,配置20.1.1.0/24网段的静态路由指向SwitchB,并配置静态路由使SwitchA、SwitchB、SwitchC所有直连网段能......
  • HUAWEI&VRRP+NQA+TRACK
    VRRP虚拟网关协议,具体的协议原理,可以到cisco部份去看,这里主讲配置原理及命令(后面还有一个专门针对于MST设计的VRRP)    此图可以很好的理解 VRRP工作原理IP地址已经写在图中,配置即可上边的三面router 配置ospf协议,此处不模拟外网NAT部份,因为HUAWEI的NAT部份,还没整理......
  • webpack的工作流程(附带部分源码分析)
    @目录webpack的工作流程webpack的准备阶段本阶段流程和钩子modules和chunks的生成阶段module解析解析流程chunks生成文件生成阶段模板hash更新模板渲染chunk生成文件总结webpack的工作流程说明工作流程之前,先抛出两个结论:webpack的核心功能,是抽离成很多个内部插件来实现......
  • C语言进阶--#pragma pack
    为什么需要内存对齐?--CPU对内存的读取不是连续的,而是分成块读取的,块的大小只能是1、2、4、16、。。。字节--当读取操作的数据未对齐,则需要两次总线周期来访问内存,因此性能会大打折扣--某些硬件平台只能从规定的相对地址处读取特定类型的数据,否则产生硬件异常#pragmapack用于......
  • webpack-loader-使用babel-loader转换处理高级的js语法
    webpack只能打包处理一部分高级的JavaScript语法。对于那些webpack无法处理的高级js语法,需要借助于babel-loader进行打包处理。例如webpack无法处理下面的JavaScript代码://定义装饰器函数functioninfo(target){target.info='Personinfo.'}//定义一个普通的类@info......
  • P9356 「SiR-1」Bracket 题解
    P9356「SiR-1」Bracket题解首先我们来先考虑一下如何计算一个给定的\(f(s[1,n])\)。一般括号序列的题目都是比较套路的将\(\texttt{(}\)赋值为\(1\),将\(\texttt{)}\)赋值为\(-1\),然后求一下前缀和记为\(sum_i\),那么一个括号序列是合法的当且仅当\(\foralli\in[1,n],......
  • Linux 安装 RabbitMQ
    一、概要1.环境(1)RockyLinux9.1(2)RabbitMQ3.11.162.安装方式针对RHEL系统,RabbitMQ官方介绍了两种安装方式:(1)通过Yumrepositories安装,需要配置Yumrepositories文件并设置RabbitMQ镜像地址。这是官方强烈推荐的安装方式,也是本文选择的安装方式;(2)下载RPM包......
  • Java:SpringBoot整合Canal+RabbitMQ组合实现MySQL数据监听
    canal[kə’næl],译意为水道/管道/沟渠,主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费目录一、MySQL设置二、启动Canal服务端三、通过Canal客户端消费数据四、通过RabbitMQ消费数据1、启动RabbitMQ2、修改canal配置3、消费RabbitMQ中的数据文档资料github:https......