首页 > 其他分享 >深入学习RabbitMQ五种模式(三)

深入学习RabbitMQ五种模式(三)

时间:2023-04-29 09:56:46浏览次数:34  
标签:String 队列 RabbitMQ 交换机 深入 key 五种 com channel

1.路由模式(精确匹配)

路由模式(Routing)的特点:

  • 该模式的交换机为direct,意思为定向发送,精准匹配。
  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息。

生产者将消息发送到direct交换器,同时生产者在发送消息的时候会指定一个路由key,而在绑定队列和交换器的时候又会指定一个路由key,那么消息只会发送到相应routing key相同的队列,然后由监听该队列的消费者进行消费消息。模型如下图所示:

  • 创建生产者
package com.olive;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 生产者(路由模式)
 */
public class RoutorProducer {

    // 交换机名称
    private static final String EXCHANGE_NAME = "routing_exchange";

    public static void main(String[] args) throws Exception {
        // 1、创建连接
        Connection connection = ConnectionUtils.getConnection();
        // 2、创建通道(频道)
        Channel channel = connection.createChannel();
        // 3、发送消息,连续发3条
        for (int i = 0; i < 3; i++) {
            String routingKey = "";
            //发送消息的时候根据相关逻辑指定相应的routing key。
            switch (i) {
                case 0:  //假设i=0,为error消息
                    routingKey = "error";
                    break;
                case 1: //假设i=1,为info消息
                    routingKey = "info";
                    break;
                case 2: //假设i=2,为warning消息
                    routingKey = "warning";
                    break;
            }
            // 要发送的消息
            String message = "Hello World Message!!!~~~" + routingKey;
            // 消息发送 channel.basicPublish(交换机名称,路由key,消息其它属性,消息内容)
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("utf-8"));
            System.out.println("生产者发送的消息:" + message);
        }
        //释放资源
        channel.close();
        connection.close();
    }
}
  • 创建消费者

消费者1

package com.olive;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 消费者1(路由模式)
 */
public class RoutorCunsumer1 {

    // 队列名称
    private static final String QUEUE_NAME1 = "routing_queue1";
    // 交换机名称
    private static final String EXCHANGE_NAME = "routing_exchange";

    public static void main(String[] args) throws Exception {
        // 1、获取连接对象
        Connection connection = ConnectionUtils.getConnection();
        // 2、创建通道(频道)
        Channel channel = connection.createChannel();
        // 3、声明交换机(有则不创建,无则创建) channel.exchangeDeclare(交换机名字,交换机类型,是否持久化)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        // 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数)
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);

        // 5、根据指定的routingKey绑定队列和交换机 channel.queueBind(队列名, 交换机名, 路由key)
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "error");

        // 6、监听队列,接收消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //获取路由的key
                String routingKey = envelope.getRoutingKey();
                //获取交换机信息
                String exchange = envelope.getExchange();
                //获取消息信息
                String message = new String(body, "utf-8");
                System.out.println("路由Key:" + routingKey + ", 交换机名称:" + exchange + ", 消费者获取消息: " + message);
            }
        };
        channel.basicConsume(QUEUE_NAME1, true, defaultConsumer);
        //注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态
    }
}

消费者2

package com.olive;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 消费者2(路由模式)
 */
public class RoutorCunsumer2 {

    // 队列名称
    private static final String QUEUE_NAME2 = "routing_queue2";
    // 交换机名称
    private static final String EXCHANGE_NAME = "routing_exchange";

    public static void main(String[] args) throws Exception {
        // 1、获取连接对象
        Connection connection = ConnectionUtils.getConnection();
        // 2、创建通道(频道)
        Channel channel = connection.createChannel();
        // 3、声明交换机(有则不创建,无则创建) channel.exchangeDeclare(交换机名字,交换机类型,是否持久化)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        // 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数)
        channel.queueDeclare(QUEUE_NAME2, true, false, false, null);

        // 5、根据指定的routingKey绑定队列和交换机 channel.queueBind(队列名, 交换机名, 路由key)
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "warning");

        // 6、监听队列,接收消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //获取路由的key
                String routingKey = envelope.getRoutingKey();
                //获取交换机信息
                String exchange = envelope.getExchange();
                //获取消息信息
                String message = new String(body, "utf-8");
                System.out.println("路由Key:" + routingKey + ", 交换机名称:" + exchange + ", 消费者获取消息: " + message);
            }
        };
        channel.basicConsume(QUEUE_NAME2, true, defaultConsumer);
    }
}
  • 验证

分别先启动所有消费者,再启动生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;达到按照需要接收的效果。

消费者1绑定的交换机和队列的路由Key为error,所以只要生产者发送消息时带有error的routingKey它都能够获取到消息。

消费者2绑定的交换机和队列的路由Key为error、info、warning,所以只要生产者发送消息时带有这3种的routingKey它都能够获取到消息。

从RabbitMQ管理后台也能看到对应的交换机,以及队列绑定情况

  • 总结
  1. Routing模式需要将交换机设置为Direct类型。
  2. Routing模式要求队列在绑定交换机时要指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列。

2.Topic模式(模糊匹配)

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。但是Topic类型的Exchange可以让队列在绑定Routing key的时候使用通配符进行匹配,也就是模糊匹配,这样与之前的模式比起来,它更加的灵活!

Topic主题模式的Routingkey一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: log.insert ,它的通配符规则如下:

*:匹配不多不少恰好1个词

#:匹配0或多个单词

简单举例:

log.*:只能匹配log.error,log.info等
log.#:能够匹配log.insert,log.insert.abc,log.news.update.abc等

图解:

  1. 红色Queue:绑定的是usa.# ,因此凡是以usa.开头的routing key都会被匹配到

  2. 黄色Queue:绑定的是#.news ,因此凡是以.news结尾的 routing key都会被匹配

  • 创建生产者
package com.olive;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 生产者(Topic主题模式)
 */
public class TopicProducer {

    // 交换机名称
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception {
        // 1、创建连接
        Connection connection = ConnectionUtils.getConnection();
        // 2、创建通道(频道)
        Channel channel = connection.createChannel();
        // 3、发送消息
        for (int i = 0; i < 4; i++) {
            String routingKey = "";
            //发送消息的时候根据相关逻辑指定相应的routing key。
            switch (i) {
                case 0:  //假设i=0,为select消息
                    routingKey = "log.select";
                    break;
                case 1: //假设i=1,为info消息
                    routingKey = "log.delete";
                    break;
                case 2: //假设i=2,为log.news.add消息
                    routingKey = "log.news.add";
                    break;
                case 3: //假设i=3,为log.news.update消息
                    routingKey = "log.news.update";
                    break;
            }
            // 要发送的消息
            String message = "Hello Message!!!~~~" + routingKey;
            // 消息发送 channel.basicPublish(交换机名称,路由key,消息其它属性,消息内容)
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("utf-8"));
            System.out.println("生产者发送的消息:" + message);
        }
        // 关闭资源
        channel.close();
        connection.close();
    }
}
  • 创建消费者

消费者1

接收所有与log.*相匹配的路由key队列中的消息

package com.olive;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 消费者(Topic模式)
 */
public class TopicConsumer1 {

    // 队列名称
    private static final String QUEUE_NAME1 = "topic_queue1";
    // 交换机名称
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception {
        // 1、获取连接对象
        Connection connection = ConnectionUtils.getConnection();
        // 2、创建通道(频道)
        Channel channel = connection.createChannel();
        // 3、声明交换机(有则不创建,无则创建) channel.exchangeDeclare(交换机名字,交换机类型,是否持久化)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        // 4、声明队列Queue channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数)
        channel.queueDeclare(QUEUE_NAME1, true, false, false, null);

        // 5、根据指定的routingKey绑定队列和交换机,设置路由key channel.queueBind(队列名, 交换机名, 路由key)
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "log.*");

        // 6、监听队列,接收消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //获取路由的key
                String routingKey = envelope.getRoutingKey();
                //获取交换机信息
                String exchange = envelope.getExchange();
                //获取消息信息
                String message = new String(body, "utf-8");
                System.out.println("路由Key:" + routingKey + ", 交换机名称:" + exchange + ", 消费者获取消息: " + message);
            }
        };
        channel.basicConsume(QUEUE_NAME1, true, defaultConsumer);

        //注意,消费者这里不建议关闭资源,让程序一直处于读取消息的状态
    }
}

消费者2

接收所有与`log.``#相匹配的路由key队列中的消息

package com.olive;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 消费者(Topic模式)
 */
public class TopicConsumer2 {

    // 队列名称
    private static final String QUEUE_NAME2 = "topic_queue2";
    // 交换机名称
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception {
        // 1、获取连接对象
        Connection connection = ConnectionUtils.getConnection();
        // 2、创建通道(频道)
        Channel channel = connection.createChannel();
        // 3、声明交换机(有则不创建,无则创建) channel.exchangeDeclare(交换机名字,交换机类型,是否持久化)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true);
        // 4、声明队列Queue。channel.queueDeclare(队列名称,是否持久化,是否独占本连接,是否自动删除,附加参数)
        channel.queueDeclare(QUEUE_NAME2, true, false, false, null);

        // 5、根据指定的routingKey绑定队列和交换机 channel.queueBind(队列名, 交换机名, 路由key)
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "log.#");

        // 6、监听队列,接收消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //获取路由的key
                String routingKey = envelope.getRoutingKey();
                //获取交换机信息
                String exchange = envelope.getExchange();
                //获取消息信息
                String message = new String(body, "utf-8");
                System.out.println("路由Key:" + routingKey + ", 交换机名称:" + exchange + ", 消费者获取消息: " + message);
            }
        };
        channel.basicConsume(QUEUE_NAME2, true, defaultConsumer);
    }
}

分别先启动所有消费者,再使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;达到按照需要接收的效果。

消费者1的路由key匹配规则为log.*,所有该路由规则的绑定的队列应该只有2条信息。

消费者2的路由key匹配规则为log.#,它能够匹配以log.开头的所有路由key,所有该路由规则的绑定的队列应该只有4条信息。

从RabbitMQ管理后台也能看到对应的交换机,以及队列绑定情况

  • 总结

Topic主题模式需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列。

Topic主题模式可以实现Publish/Subscribe发布与订阅模式 和Routing路由模式的功能;只是Topic在配置routing key 的时候可以使用通配符,所以更加灵活。

来源
cnblogs.com/tanghaorong/p/14992330.html#_label0

标签:String,队列,RabbitMQ,交换机,深入,key,五种,com,channel
From: https://www.cnblogs.com/happyhuangjinjin/p/17363600.html

相关文章

  • 深入学习RabbitMQ五种模式(一)
    1.安装erlang下载otp_win64_25.3.exehttps://www.erlang.org/downloadserlang安装完成,需要配置erlang环境变量ERLANG_HOME=E:\software\ErlangOTPPATH=%PATH%;%ERLANG_HOME%\bin;2.安装RabbitMQ下载rabbitmq-server-3.11.13.exehttps://www.rabbitmq.com/download.htm......
  • SpringCloud Stream集成RabbitMQ
    1.概述SpringCloudStream框架抽象出了三个最基础的概念来对各种消息中间件提供统一调用:DestinationBinders:负责集成外部消息系统的组件。DestinationBinding:由Binders创建的,负责沟通外部消息系统、消息发送者和消息消费者的桥梁。Message:消息发送者与消息消费......
  • rabbitMQ--类型
    1.五种消息模型1.1基本消息模型 1.2work消息模型 1.3订阅模型1.3.1Fanout,也称为广播。流程说明流程图: 在广播模式下,消息发送流程是这样的:1)可以有多个消费者2)每个消费者有自己的queue(队列)3)每个队列都要绑定到Exchange(交换机)4)生产者发送的消息,只能......
  • 深入探讨源码--ArrayList
    持续推送技术干货目录深入探讨源码之ArrayListArrayList类图ArrayList的数据结构ArrayList的关键属性ArrayList构造方法ArrayList常用方法add方法ArrayList中的fast-fail机制add(i,o)方法set(i,o)方法get(i)方法remove(index)方法remove(Object)方法clear方法indexOf(o)方法深......
  • RabbitMQ _ How to Close a Channel
    https://low-orbit.net/rabbitmq-how-to-close-a-channel RabbitMQHowtoCloseaChannelIfyouhavefoundyourwaytothispageyouareprobablywonderinghowtocloseachannelinRabbitMQ.Channelsshouldbeclosedwhentheyarenolongerinuse.There......
  • 深入jar包:从jar包中读取资源文件
    我们常常在代码中读取一些资源文件(比如图片,音乐,文本等等)。在单独运行的时候这些简单的处理当然不会有问题。但是,如果我们把代码打成一个jar包以后,即使将资源文件一并打包,这些东西也找不出来了。看看下面的代码:Java代码 //源代码1:packageedu.hxraid;import......
  • Linux安装RabbitMQ
    前言:还是和以前一样,linux安装软件的目录都是data目录 1.这次稍微不一样,不过还是进入data目录,创建RabbitMq目录并进入该目录cd/datamkdirrabbitMqcdrabbitMq 2.上传"erlang-21.1-1.el7.x86_64.rpm"文件和 "rabbitmq-server-3.7.7-1.el7.noarch.rpm"文件到当前......
  • 字节前端--深入JS
    首先先介绍JS的基本概念:比如是单线程,动态,弱类型等等。除了这些东西之外还有:下面的一些基础概念:JavaScript是一种脚本语言,通常在网页上运行。JavaScript不需要编译,因为它是一种解释性语言。在网页上添加JavaScript的方式有多种,包括内联脚本、嵌入式脚本和外部脚本。......
  • 关于聚合根,领域事件的那点事---深入浅出理解DDD
    作者:京东物流赵勇萍前言最近有空会跟同事讨论DDD架构的实践落地的情况,但真实情况是,实际中对于领域驱动设计中的实体,值对象,聚合根,领域事件这些战术类的实践落地,每个人理解依然因人而异,大概率是因为这些概念还是有一些抽象,同时有有别于传统的MVC架构开发。在此,通过小demo的方式......
  • 用友iuap 让企业数智化能力深入、让业务价值浅出
     本文转自科技正能量作者郑凯IDC近期的一份调研中提出:数字化转型在2023年开始进入到一个新的阶段:从数字化转型时代,进入到数字化业务时代。在分水岭的两端,一个变化是:数字化转型时代是业务的数字化,而数字化业务时代则是数字的业务化。这个潜移默化的改变背后,其实有一重很深的含义。......