首页 > 其他分享 >【RabbitMQ消息中间件】8.路由模式

【RabbitMQ消息中间件】8.路由模式

时间:2023-03-19 11:03:19浏览次数:35  
标签:NAME 队列 rabbitmq RabbitMQ 交换机 路由 消息中间件 channel


上一篇讲解了RabbitMQ的“订阅模式”,本篇来讲解RabbitMQ的队列的“路由模式”。



其实严格来讲,RabbitMQ只有三种模式:“简单模式”、“work模式”以及“交换机模式”。


对于交换机模式来说,又分三种:“订阅模式”、“路由模式”、“通配符模式”,而他们之间的不同就是交换机类型的不同。



目前在实际开发中我们可能会遇到这种问题,生产者发布到消息队列的信息,消费者不一定全部需要,但是使用订阅模式的话,所有消费者都能够拿到,不满足只拿到自己所需信息的需求。



对于以上的需求,“路由模式”就可以进行实现。


路由模式的图示如下:


【RabbitMQ消息中间件】8.路由模式_服务器


还是一个生产者、一个交换机,多个绑定交换机的队列,和多个连接不同队列的消费者。与之前的订阅模式不同的是,这里指定了交换机的类型是“direct”;而消费者将队列绑定到交换机时,指定了一个路由Key(error/info/warning等类型)。


如果生产者发送的消息中的路由key是“error”,那么消费者c1和c2都可以接收到,而如果生产者发送的消息中的路由kay是“info”或者“warning”,则只有消费者c2能够收到。



由此可见。“路由模式”的作用就是可以让消费者有选择性的接收消息。



“路由模式”的实现代码和之前的“订阅模式”的生产者和消费者代码差不多,对于生产者:


【RabbitMQ消息中间件】8.路由模式_RabbitMQ_02

package cn.jack.rabbitmq.routing;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

private final static String EXCHANGE_NAME="test_exchange_direct";

public static void main(String[] args) throws Exception {
//获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
//从连接中创建通道
Channel channel = connection.createChannel();

//声明Exchange,指定交换机的类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

//消息内容1 info类型
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "info",null, message.getBytes());
System.out.println("[product] Send '"+ message +"'");

//消息内容2 error类型
message = "Has Error!";
channel.basicPublish(EXCHANGE_NAME, "error",null, message.getBytes());
System.out.println("[product] Send '"+ message +"'");

//消息内容3 warning类型
message = "Has Warning!";
channel.basicPublish(EXCHANGE_NAME, "warning",null, message.getBytes());
System.out.println("[product] Send '"+ message +"'");

//关闭通道和连接
channel.close();
connection.close();
}
}

在上面的代码中,首先生产者绑定交换机的时候,指定类型为“direct”,然后发布消息时指定不同消息的路由key。



然后消费者1的代码:


【RabbitMQ消息中间件】8.路由模式_路由模式_03


package cn.jack.rabbitmq.routing;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv1 {

private final static String QUEUE_NAME = "test_queue_direct_1";//队列名称

private final static String EXCHANGE_NAME="test_exchange_direct";//交换机名称

public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//绑定队列到交换机,并指定了两个路由key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [consumer1] Received '" + message + "'");
//休眠10ms
Thread.sleep(10);
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

对于消费者1,在进行绑定队列到交换机时,设置了两个路由key,也就是它只需要接受路由key为“info”和“error”的消息。



然后消费者2的代码:


【RabbitMQ消息中间件】8.路由模式_RabbitMQ_04


package cn.jack.rabbitmq.routing;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

private final static String QUEUE_NAME = "test_queue_direct_2";//队列名称

private final static String EXCHANGE_NAME="test_exchange_direct";//交换机名称

public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//绑定队列到交换机,并指定了一个路由key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [consumer2] Received '" + message + "'");
//休眠10ms
Thread.sleep(10);
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}

对于消费者2,在进行绑定队列到交换机时,设置了一个路由key,也就是它只需要接受路由key为“warning”的消息。



下面首先运行两个消费者,然后运行生产者,他们的发送与接收情况如下:


【RabbitMQ消息中间件】8.路由模式_direct_05


可以看到,生产者一共发送了三种路由key的消息,而消费者按照自己绑定队列时的设置接收到需要的路由key的信息。

标签:NAME,队列,rabbitmq,RabbitMQ,交换机,路由,消息中间件,channel
From: https://blog.51cto.com/u_16012040/6131004

相关文章

  • 【RabbitMQ消息中间件】7.订阅模式
    上一篇我们了解了RabbitMQ的消息的确认模式,本篇我们继续讲解RabbitMQ的五大队列模式之一的“订阅模式”。在实际开发中,通常会遇到以下需求:一个生产者,多......
  • 【RabbitMQ消息中间件】6.消息的确认模式
    上一篇讲解了如何使用Java实现一个work队列模式,并实现能做多劳的效果。本篇我们来了解一下有关RabbitMQ的“消息的确认模式”。当消费者从队列中获取......
  • 【RabbitMQ消息中间件】5.work模式
    上一篇讲解了如何使用Java连接RabbitMQ服务,并实现一个简单队列模式。本篇讲解RabbitMQ的另一个队列模式----work模式。work的队列模式图如下所示:可以看......
  • 【RabbitMQ消息中间件】3.管理界面中的功能
    上一篇我们讲解了RabbitMQ的安装和网页管理工具的启动,并且简单的创建了用户和权限。本篇讲解一下RabbitMQ管理工具中其它的一些管理功能。首先确保Rabb......
  • 【RabbitMQ消息中间件】1.RabbitMQ简介
    一、什么是MQ?MQ为MessageQueue,即是“消息队列”,它是应用程序和应用程序之间的同新方法。遵循MessageQueue规则开发出来的,具有消息队列特点的产品,都可以称之为“消息中间......
  • 万字血书Vue—路由
    多个路由通过路由器进行管理。前端路由的概念和原理(编程中的)路由(router)就是一组key-value对应关系,分为:后端路由和前端路由后端路由指的是:请求方式、请求地址和functi......
  • rabbitmq概述
    一、rabbitmq的目的rabbitmq主要有三个目的:1.流量削峰  优点是:使用消息队列做缓冲 2.应用解耦  订单系统只要发送命令等待mq回复就行,不用等待三个系统的命......
  • 解决编程式路由跳转,多次执行抛出NavigationDuplicated异常
      解决:在vue-router的index.js中添加如下代码//先保存router原型对象的pushletoriginPush=VueRouter.prototype.pushletoriginReplace=VueRouter.prototype......
  • webpack性能优化(1):分隔/分包/异步加载+组件与路由懒加载
    webpackensure相信大家都听过。有人称它为异步加载,也有人说做代码切割,那这个家伙到底是用来干嘛的?其实说白了,它就是把js模块给独立导出一个.js文件的,然后使用这个模块的时......
  • 动态路由协议
    1动态路由的概述​虽然静态路由在有些时候很有用,但是是必须是手动操作每条路由条目,对于大型网络来说经常改变的情况,配置静态路由工作量非常繁重,因此使用动态路由是必要的。......