首页 > 其他分享 >【RabbitMQ消息中间件】7.订阅模式

【RabbitMQ消息中间件】7.订阅模式

时间:2023-03-19 11:03:08浏览次数:41  
标签:订阅 NAME 队列 绑定 RabbitMQ 交换机 消息 消息中间件 channel


上一篇我们了解了RabbitMQ的消息的确认模式,本篇我们继续讲解RabbitMQ的五大队列模式之一的“订阅模式”。



在实际开发中,通常会遇到以下需求:


一个生产者,多个消费者,同一个消息被多个消费者获取。



对于“订阅模式”,其模式图如下所示:


【RabbitMQ消息中间件】7.订阅模式_queueBind


上图中,“P”为消息的生产者,而“X”是交换机(Exchange),即消息生产者将消息发送到“交换机”而不再是“队列”,然后需要承载成产者消息的队列,与该交换机进行绑定。余下就是消费者监听相关队列来获取服务端推送的消息,满足多个消费者共同获取同一个生产者的所有消息的情况。


当消费者不需要从生产者获取消息了,将相关消息队列与交换机的绑定关系解除即可。



所以上面的“订阅模式”具有以下特点:


(1)一个生产者,多个消费者
(2)每个消费者都有自己的一个队列
(3)生产者没有将消息直接发送至队列,而是发送到了交换机
(4)每个队列都要绑定到交换机
(5)生产者发送的消息,经过交换机,到达队列,实现一个消息被多个消费者获取的目的。


在“订阅模式”中,多了一个我们前面没有提到的“交换机”。具有交换机的消息传输模式如下:


【RabbitMQ消息中间件】7.订阅模式_exchangeDeclare_02


可以看到,作为交换机,当消息从生产者传递至交换机(Exchange)时,交换机会将消息复制,按照绑定规则(Bindings),分别将消息推送至绑定在该交换机(Exchange)上的队列(Queues)中。最终队列拿到交换机传递的消息后,消费者就可以通过消息队列获取生产推送的消息了,而且保证了每个连接不同消息队列的消费者拿到的数据是相同的。


要注意,交换机的作用仅仅是用于信息交换,它本身是不具有消息存储的能力的。



下面我们编写一下符合“订阅模式”的实例代码。还在之前创建的测试工程“RabbitMQ_Test”下新建一个生产者类:


【RabbitMQ消息中间件】7.订阅模式_exchangeDeclare_03

package cn.jack.rabbitmq.ps;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

private final static String EXCHANGE_NAME="test_exchange_fanout";

public static void main(String[] args) throws Exception {
//获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
//从连接中创建通道
Channel channel = connection.createChannel();

//声明Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

//消息内容
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "",null, message.getBytes());
System.out.println("[product] Send '"+ message +"'");

//关闭通道和连接
channel.close();
connection.close();
}
}

与之前不同的是,之前的生产者是声明(创建)队列:


//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
而该生产者不再声明队列,即不再与队列直接连接,而是换为声明交换机:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

我们这里先不编写消费者,直接运行一下生产者:


【RabbitMQ消息中间件】7.订阅模式_exchangeDeclare_04


在RabbitMQ的管理工具的“Exchange”模块下,我们可以看到刚刚创建的交换机:


【RabbitMQ消息中间件】7.订阅模式_订阅模式_05



此时还没有队列绑定到交换机,那么此时消息到了哪里了呢?可以这么说,消息“丢失”了。因为当我们尝试将信息发送到一个没有绑定队列的交换机时,由于交换机无法及时将信息推送至消息队列,而交换机本身没有存储信息的能力,则会导致信息丢失。所以这里我们要注意。



下面我们创建第一个消费者类:


【RabbitMQ消息中间件】7.订阅模式_exchangeDeclare_06


package cn.jack.rabbitmq.ps;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv1 {

private final static String QUEUE_NAME = "test_queue_ps_1";//队列名称

private final static String EXCHANGE_NAME="test_exchange_fanout";//交换机名称

public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [consumer1] Received '" + message + "'");
//休眠10ms
Thread.sleep(10);
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

这里我们为消费者声明了一个队列:


// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

然后将该队列绑定到之前生产者绑定的交换机上:


//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

这样就可以获取生产者通过交换机推送的信息了。


运行一下消费者,再运行一下生产者,可以发现消费者成功获取了信息:


【RabbitMQ消息中间件】7.订阅模式_RabbitMQ_07


再创建一个Recv2的消费者类,与Recv1代码相同,除了定义的队列名称不同即可:


private final static String QUEUE_NAME = "test_queue_ps_2";//队列名称

获取消息后的控制台打印记得更换名称为“consumer2”:


System.out.println(" [consumer2] Received '" + message + "'");

此时我们同时启动生产者,以及两个消费者,可以想看到两个消费者同时获取了生产者发送的全部信息:


【RabbitMQ消息中间件】7.订阅模式_RabbitMQ_08



最后,查看交换机与队列的绑定关系,可以点击“Exchange”模块下相关交换机的名称:


【RabbitMQ消息中间件】7.订阅模式_queueBind_09


然后点击下面的“Bindings”就可以看到交换机绑定的队列:


【RabbitMQ消息中间件】7.订阅模式_Exchange_10


可以看到目前绑定了名为“test_queue_ps_1”和“test_queue_ps_2”的队列。在下面的编辑区域还可以进行手动添加绑定队列。


标签:订阅,NAME,队列,绑定,RabbitMQ,交换机,消息,消息中间件,channel
From: https://blog.51cto.com/u_16012040/6131005

相关文章

  • 【RabbitMQ消息中间件】6.消息的确认模式
    上一篇讲解了如何使用Java实现一个work队列模式,并实现能做多劳的效果。本篇我们来了解一下有关RabbitMQ的“消息的确认模式”。当消费者从队列中获取......
  • 【RabbitMQ消息中间件】5.work模式
    上一篇讲解了如何使用Java连接RabbitMQ服务,并实现一个简单队列模式。本篇讲解RabbitMQ的另一个队列模式----work模式。work的队列模式图如下所示:可以看......
  • 【RabbitMQ消息中间件】3.管理界面中的功能
    上一篇我们讲解了RabbitMQ的安装和网页管理工具的启动,并且简单的创建了用户和权限。本篇讲解一下RabbitMQ管理工具中其它的一些管理功能。首先确保Rabb......
  • 【RabbitMQ消息中间件】1.RabbitMQ简介
    一、什么是MQ?MQ为MessageQueue,即是“消息队列”,它是应用程序和应用程序之间的同新方法。遵循MessageQueue规则开发出来的,具有消息队列特点的产品,都可以称之为“消息中间......
  • rabbitmq概述
    一、rabbitmq的目的rabbitmq主要有三个目的:1.流量削峰  优点是:使用消息队列做缓冲 2.应用解耦  订单系统只要发送命令等待mq回复就行,不用等待三个系统的命......
  • rabbitmq 基本常用操作
    目录常用操作当前窗口启动rabbitmq后台启动rabbitmq停止rabbitmq查看所有队列查看所有虚拟主机在ErlangVM运行的情况下启动RabbitMQ应用查看节点状态查看所有可用的插......
  • rabbitmq最佳实践
    在使用消息机制时,我们通常需要考虑以下几个问题:消息不能丢失保证消息一定能投递到目的地保证业务处理和消息发送/消费的一致性本文以RabbitMQ为例,讨论如何解决以上......
  • 微信小程序消息推送(订阅消息)接入 前端+后端
    1.第一步是需要在微信公众平台配置,具体参考官方文档订阅消息配置2.然后需要在前端代码中调用微信消息订阅api,调起客户端小程序订阅消息界面,具体api参考订阅消息api点......
  • 观察者模式和发布订阅模式
    首先我们必须清楚这两种模式都是设计模式,而不是某种语言的专属;观察者模式(Observer)概念理解观察者模式是一种一对多的依赖关系的行为设计模式,让多个观察者对象监听一......
  • rabbitmq使用
     Usagerabbitmqctl[--node<node>][--timeout<timeout>][--longnames][--quiet]<command>[<commandoptions>]Availablecommands:Help:help......