首页 > 其他分享 >rabbitmq的推(push)拉(pull)模式介绍及代码实现

rabbitmq的推(push)拉(pull)模式介绍及代码实现

时间:2023-12-02 10:55:06浏览次数:33  
标签:pull 消费者 factory rabbitmq 消息 push RabbitMQ 模式 channel

在rabbitmq中有两种消息处理的模式,一种是推模式/订阅模式/投递模式(也叫push模式),消费者调用channel.basicConsume方法订阅队列后,由RabbitMQ主动将消息推送给订阅队列的消费者;另一种是拉模式/检索模式(也叫pull模式),需要消费者调用channel.basicGet方法,主动从指定队列中拉取消息。

推模式:消息中间件主动将消息推送给消费者
拉模式:消费者主动从消息中间件拉取消息
推模式(push)
       推模式接收消息是最有效的一种消息处理方式。channel.basicConsume(queneName,consumer)方法将信道(channel)设置成投递模式,直到取消队列的订阅为止;在投递模式期间,当消息到达RabbitMQ时,RabbitMQ会自动地、不断地投递消息给匹配的消费者,而不需要消费端手动来拉取,当然投递消息的个数还是会受到channel.basicQos的限制。
推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息。优点是消费者总是有一堆在内存中待处理的消息,所以当真正去消费消息时效率很高。缺点就是缓冲区可能会溢出。
由于推模式是信息到达RabbitMQ后,就会立即被投递给匹配的消费者,所以实时性非常好,消费者能及时得到最新的消息。
拉模式(pull)
        如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方法来进行消费消息。
拉模式在消费者需要时才去消息中间件拉取消息,这段网络开销会明显增加消息延迟,降低系统吞吐量。
由于拉模式需要消费者手动去RabbitMQ中拉取消息,所以实时性较差;消费者难以获取实时消息,具体什么时候能拿到新消息完全取决于消费者什么时候去拉取消息。
结论
推模式更关注消息的实时性
推模式直接从内存缓冲区中获取消息,能有效的提高消息的处理效率以及吞吐量
拉模式更关注消费者的消费能力,只有消费者主动去拉取消息才会去获取消息
默认使用推消息,由于某些限制,消费者在某个条件成立时才能消费消息的场景需要从批量获取消息的场景
Qos相关内容
为什么要设置Qos:
在RabbitMQ中,队列向消费者发送消息,如果没有设置Qos的值,那么队列中有多少消息就发送多少消息给消费者,完全不管消费者是否能够消费完,这样可能就会形成大量未ack的消息在缓存区堆积,因为这些消息未收到消费者发送的ack,所以只能暂时存储在缓存区中,等待ack,然后删除对应消息。

这样的话,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量,一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。

消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,

所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。

Qos的取值问题:
在传输效率和消费者消费速度之间做一个平衡。这个值是需要不断尝试的,

因为太低,信道传输消息效率太低,

如果太高,消费者来不及确认消息导致消息积累问题,内存消耗不断增大。

不公平分发:
Qos是一个典型的不公平分发的场景:

能者多劳

对于两个消费者,设置同样preCount的值,消费能力快的,给其推送的消息是最多的

推模式的代码实现

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class TopicConsumerTest {
    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String QUEUE_NAME = "queue_topic1";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ的链接参数
        factory.setUsername("echo");
        factory.setPassword("123456");
        factory.setPort(PORT);
        factory.setHost(IP_ADDRESS);

        // 和RabbitMQ建立一个链接
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        //声明交换机 Fanout模式
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
        //进行绑定,指定消费那个队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", null);
        Consumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("recv message: " + new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, consumer);
        //等待回调函数执行完毕之后 关闭资源
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
    }
}

拉模式的代码实现

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TopicConsumerTest {

    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String QUEUE_NAME = "queue_topic1";
    private static final String IP_ADDRESS = "192.168.230.131";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ的链接参数
        factory.setUsername("echo");
        factory.setPassword("123456");
        factory.setPort(PORT);
        factory.setHost(IP_ADDRESS);

        // 和RabbitMQ建立一个链接
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        //声明交换机 Fanout模式
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, false, null);
        //进行绑定,指定消费那个队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", null);
        GetResponse getResponse = channel.basicGet(QUEUE_NAME, false);
        System.out.println(new String(getResponse.getBody()));
        //等待回调函数执行完毕之后 关闭资源
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
    }
}

 

标签:pull,消费者,factory,rabbitmq,消息,push,RabbitMQ,模式,channel
From: https://www.cnblogs.com/franson-2016/p/17871355.html

相关文章

  • RabbitMQ work模型
    默认情况下,MQ队列如果绑定了多个消费者,那么队列在投递消息时就是轮询,一人投递一个(并且一条消息只能投递给监听该队列的某一个消费者)在一个MQ队列上绑定多个消费者的目的是加快队列中消息的处理效率,防止队列中消息的堆积问题。 注:要在消费者的application.yml文件中加上这个......
  • RabbitMQ 接收队列的消息
     代码示例:注:要把这个类加上Component注解packagecom.itheima.amqp_listener;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassMQListener{@RabbitListener(queues="simpl......
  • RabbitMQ 发送消息到队列(交换机不参与的那种)
    1.导包<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>2.在application.yml文件里编写配置信息spring:rabbitmq:host:192.168.88.130port:5672......
  • 【Spring】SpringBoot+RabbitMQ(direct/fanout/topic)の構築方法
     ■POM.xmlの中で、下記の内容を追加<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency>......
  • 使用RabbitMQ时使用MemoryPack序列化和反序列化对象
    [MemoryPackable]publicpartialclassUserEto{publicStringName{get;set;}} 发送端publicclassEventBus:IEventBus{publicvoidPublish(stringexchangeName,objecteventData){usingvarconnection=RabbitMQ......
  • RabbitMQ消息队列
    一.什么是消息队列1.简介在介绍消息队列之前,应该先了解什么是AMQP(AdvancedMessageQueuingProtocol,高级消息队列协议,点击查看)消息(Message)是指在应用间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象;而消息队列(MessageQueue)是一种应用间的......
  • 【MQ】RabbitMQの概念紹介及び実行方法
    参考URL:<https://www.cnblogs.com/yy-cola/p/11089800.html><https://blog.csdn.net/qq_41097820/article/details/88793329>■中心概念【Message】消息消息是不具名的,它由消息头和消息体组成,消息体式不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由......
  • Qt创建一个自定义QPushButton
    一、概述使用Qt创建一个通用的QPushButton。应用一些样式把按钮做的好看一些。步骤:1.新建一个Button类然后继承QPushButton2.设置Button的通用样式(ps:使用.qss文件的形式应用样式)3.个性化设置不同的样式4.做一个圆角按钮、带图标按......
  • linux中使用docker安装rabbitmq
    首先确保linux中docker环境正常运行。1、搜索docker镜像dockersearchrabbitmq2、拉取镜像dockerpullrabitmq3、启动rabbitmqdockerrun-d--namerabbitmq1--restartalways-p15672:15672-p5672:5672rabbitmq4、启动web工具 1)进入容器:dockerexec-itrabbi......
  • RabbitMQ -- 集群(二)
    镜像队列队列创建只在创建的机器上出现,不会出现在其他节点上,需要创建镜像队列,将队列备份到其他节点,可以备份到其他节点或者全部节点创建:在任意一个节点上添加一个policy即可在Web管理界面设置:admin->policies->Add/updateapolicy->Name:mirror-twoPattern:^mirror......