首页 > 其他分享 >rabbitmq消费者--消息订阅时的权衡

rabbitmq消费者--消息订阅时的权衡

时间:2023-04-23 10:36:30浏览次数:44  
标签:消费者 -- 权衡 确认 rabbitmq delivery 消息 RabbitMQ channel


消息的获得方式

pull

拉取属于一种轮询模型,发送一次get请求,获得一个消息。如果此时RabbitMQ中没有消息,会获得一个表示空的回复。总的来说,这种方式性能比较
差,很明显,每获得一条消息,都要和RabbitMQ进行网络通信发出请求。而且对RabbitMQ来说,RabbitMQ无法进行任何优化,因为它永远不知道应用
程序何时会发出请求。

while (true) {
    GetResponse getResponse = channel.basicGet(QUEUE_NAME, true);
    if (null != getResponse) {
        System.out.println("received["
                + new String(getResponse.getBody()) + "]");
    }
    TimeUnit.SECONDS.sleep(1);
}

push

push属于一种推送模型,注册一个消费者后,RabbitMQ会在消息可用时,自动将消息进行推送给消费者。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});

消息的应答

消费者收到的每一条消息都必须进行确认。消息确认后,RabbitMQ才会从队列删除这条消息,RabbitMQ不会为未确认的消息设置超时时
间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是 RabbitMQ 允许消费者消费一条消
息的时间可以很久很久。 自动确认
消费者在声明队列时,可以指定autoAck参数,当autoAck=true时,一旦消费者接收到了消息,就视为自动确认了消息。如果消费者在处理消息的过
程中出了错,就没有什么办法重新处理这条消息,所以我们很多时候,需要在消息处理成功后,再确认消息,这就需要手动确认。

自动确认

autoAck=true,自动确认。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});

手动确认

当autoAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移除消息。否则,RabbitMQ会在队列
中消息被消费后立即删除它。

采用消息确认机制后,只要令autoAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,
因为 RabbitMQ 会一直持有消息直到消费者显式调用basicAck为止。

当autoAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,
但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消
息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。

ManualConfirmConsumerA:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    } finally {
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

    }
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

ManualConfirmConsumerB:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    } finally {
        // 不进行确认
        //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

通过运行程序,启动两个消费者 A、B,都可以收到消息,但是其中有一个消费者B不会对消息进行确认,当把这个消费者B关闭后,消费者A又会
收到本来发送给消费者B的消息。所以我们一般使用手动确认的方法是,将消息的处理放在try/catch语句块中,成功处理了,就给RabbitMQ 一个确认应
答,如果处理异常了,就在catch中进行消息的拒绝。

批量手动确认

multiple=true,批量手动确认

AtomicInteger atomicInteger = new AtomicInteger();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    int i = atomicInteger.incrementAndGet();
    try {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    } finally {
        if (i % 3 == 0) {
            // 每3个消息确认一次
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
        }
    }
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
});

Qos预取模式

在确认消息被接收之前,消费者可以预先要求接收一定数量的消息,在处理完一定数量的消息后,批量进行确认。如果消费者应用程序在确认消息
之前崩溃,则所有未确认的消息将被重新发送给其他消费者。所以这里存在着一定程度上的可靠性风险。
这种机制一方面可以实现限速(将消息暂存到RabbitMQ内存中)的作用,一方面可以保证消息确认质量(比如确认了但是处理有异常的情况)。

注意:消费确认模式必须是非自动ACK机制(这个是使用baseQos的前提条件,否则会Qos不生效),然后设置basicQos的值;另外,还可以基于
consume和 channel的粒度进行设置(global)。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    } finally {
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
    }
};
channel.basicQos(3);
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
});

basicQos方法参数详细解释:

  • prefetchSize:最多传输的内容的大小的限制,0 为不限制,但据说 prefetchSize 参数,rabbitmq 没有实现。
  • prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息
    ack。
  • global:true\false是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别。

如果同时设置 channel 和消费者,会怎么样?AMQP 规范没有解释如果使用不同的全局值多次调用 basic.qos 会发生什么。 RabbitMQ 将此解释为意味

着两个预取限制应该彼此独立地强制执行; 消费者只有在未达到未确认消息限制时才会收到新消息。

channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

也就是说,整个通道加起来最多允许15条未确认的消息,每个消费者则最多有10条消息。


标签:消费者,--,权衡,确认,rabbitmq,delivery,消息,RabbitMQ,channel
From: https://blog.51cto.com/u_6784072/6216459

相关文章

  • 数据结构之布隆过滤器
    布隆过滤器如果要经常判断某个元素是否存在,你会怎么做?很容易想到使用哈希表(HashSet、HashMap),将元素作为key去查找。时间复杂度为O(1),但是空间利用率不高,需要占用比较多的内存资源。如果需要编写一个网络爬虫去爬10亿个网站数据,为了避免爬到重复的网站,如何判断某个网站是否爬过?很显......
  • InnoDB之Undo log
    事务回滚的需求事务需要保证原子性,也就是事务中的操作要么全部完成,要么什么也不做。但是偏偏有时候事务执行到一半会出现一些情况,比如:1、事务执行过程中可能遇到各种错误,比如服务器本身的错误,操作系统错误,甚至是突然断电导致的错误。2、程序员可以在事务执行过程中手动输入ROLLBACK......
  • 遭遇出金延迟怎么办?牢记这招助你快速出金!
    外汇市场作为全球最大的金融市场之一,吸引了众多投资者的关注和参与。然而在外汇交易中,投资者们或多或少都遭遇过出金延迟、滑点等问题,这些都有可能导致他们无法及时获得自己的资金,给之后的投资计划和资金安排带来不便和困扰。而当投资者在外汇平台遭遇出金延迟时,可能会感到焦虑和无......
  • 十大排序算法之归并排序
    归并排序归并排序(MERGE-SORT)是利用归并的思想实现的排序方法,该算法采用经典的分治(divide-and-conquer)策略(分治法将问题分(divide)成一些小的问题然后递归求解,而治(conquer)的阶段则将分的阶段得到的各答案"修补"在一起,即分而治之)。算法步骤不断地将当前序列平均分割成2个子序列,直......
  • MySQL数据类型之日期型
    日期类型日期类型占用空间(字节数)表示范围date41000-01-01~9999-12-31datetime81000-01-0100:00:00.000000~9999-12-3123:59:59.999999timestamp41970-01-0100:00:00.000000UTC~2038-01-1903:14:07.000000UTCyear11901-2155time3-838:59:59.000000~838:59:59.000000date......
  • jvm之线程上下文加载器与SPI
    线程上下文加载器线程上下文类加载器(ThreadContextClassLoader,简称TCCL)是从JDK1.2开始引入的。类java.lang.Thread中的方法getContextClassLoader()和setContextClassLoader(ClassLoadercl)用来获取和设置线程的上下文类加载器。如果没有通过setContextClassLoader(ClassLoader......
  • netty之TCP粘包拆包问题解决
    TCP粘包拆包问题解决什么TCP粘包和拆包问题假设客户端向服务端连续发送了两个数据包,分别用ABC和DEF来表示,那么服务端收到的数据可以分为以下三种情况:第一种情况,接收端正常收到两个数据包,即没有发生拆包和粘包的现象。第二种情况,接收端只收到一个数据包,这一个数据包中包含了发送端发......
  • jvm如何打破双亲委托机制
    打破双亲委托机制重写父类ClassLoader的loadClass方法packagecom.morris.jvm.classloader;publicclassBreakDelegateClassLoaderextendsMyClassLoader{@OverrideprotectedClass<?>loadClass(Stringname,booleanresolve)throwsClassNotFoundException{......
  • Servlet3无web.xml的原理
    在最新的SpringMVC中,一个web项目中无需传统的web.xml文件,这是怎么实现的呢?其实这并不是SpringMVC的功劳,而是servlet3规范以及web容器对这个规范的支持。简单使用配置引入依赖:.......<!--指定servlet版本为3.0--><dependency><groupId>javax.serv......
  • Spring缓存注解的使用与源码分析
    SpringCache提供了一个对缓存使用的抽象,以及大量的实现方便开发者使用。SpringCache主要提供了如下注解:注解说明@Cacheable根据方法的请求参数对其结果进行缓存@CachePut根据方法的请求参数对其结果进行缓存,和@Cacheable不同的是,它每次都会触发真实方法的调用@CacheEvict根据一定......