首页 > 其他分享 >RabbitMQ 通配符(Topic)模式示例

RabbitMQ 通配符(Topic)模式示例

时间:2024-10-19 12:22:57浏览次数:7  
标签:TOPIC Producer 示例 RabbitMQ Topic item 交换机 key channel

总结自:BV15k4y1k7Ep

模式说明

Topic类型与Direct相比,都是可以根据Routing key把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符

Topic类型的Routing key一般都是由一个或多个单词组成,多个单词之间以.分隔,例如: item.insert

通配符规则:

#:匹配一个或多个词。

*:匹配不多不少恰好 1 个词。

举例:

item.#:能够匹配item.insert.abc或者item.insert

item.*:只能匹配item.insert

1556031362048

1556031519931

图解:

  • 红色 Queue:绑定的是usa.# ,因此凡是以usa.开头的Routing key都会被匹配到。
  • 黄色 Queue:绑定的是#.news ,因此凡是以.news结尾的Routing key都会被匹配。

代码

1)生产者

使用 Topic 类型的 Exchange,发送消息时用到了item.insertitem.updateitem.delete3 种 routing key。。

package com.zhangmingge.rabbitmq.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhangmingge.rabbitmq.ConnectionUtil;

/**
 * 通配符 Topic 的交换机类型为:topic
 */
public class Producer {

    // 交换机名称
    static final String TOPIC_EXCHANGE = "topic_exchange";
    // 队列名称
    static final String TOPIC_QUEUE_1 = "topic_queue_1";
    // 队列名称
    static final String TOPIC_QUEUE_2 = "topic_queue_2";

    public static void main(String[] args) throws Exception {

        // 创建连接
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        /*
         * 声明交换机
         * 参数 1:交换机名称
         * 参数 2:交换机类型,fanout、topic、topic、headers
         */
        channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);

        // 发送信息
        String message = "新增了商品。Topic 模式;routing key 为 item.insert ";
        channel.basicPublish(TOPIC_EXCHANGE, "item.insert", null, message.getBytes());
        System.out.println("已发送消息:" + message);

        // 发送信息
        message = "修改了商品。Topic 模式;routing key 为 item.update";
        channel.basicPublish(TOPIC_EXCHANGE, "item.update", null, message.getBytes());
        System.out.println("已发送消息:" + message);

        // 发送信息
        message = "删除了商品。Topic 模式;routing key 为 item.delete";
        channel.basicPublish(TOPIC_EXCHANGE, "item.delete", null, message.getBytes());
        System.out.println("已发送消息:" + message);

        // 关闭资源
        channel.close();
        connection.close();
    }
}

注意其中声明交换机为 Topic 类型:

/*
 * 声明交换机
 * 参数 1:交换机名称
 * 参数 2:交换机类型,fanout、topic、topic、headers
 */
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);

并在发送消息时指定了 routing key:

// 发送信息
String message = "新增了商品。Topic 模式;routing key 为 item.insert ";
channel.basicPublish(TOPIC_EXCHANGE, "item.insert", null, message.getBytes());
System.out.println("已发送消息:" + message);

2)消费者 1

消费者 1 接收两种类型的消息:更新商品(item.update)和删除商品(item.delete)。

package com.zhangmingge.rabbitmq.topic;

import com.rabbitmq.client.*;
import com.zhangmingge.rabbitmq.ConnectionUtil;

import java.io.IOException;

public class Consumer1 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        // 声明交换机
        channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);

        // 声明(创建)队列
        /*
         * 参数 1:队列名称
         * 参数 2:是否定义持久化队列
         * 参数 3:是否独占本次连接
         * 参数 4:是否在不使用的时候自动删除队列
         * 参数 5:队列其它参数
         */
        channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);

        // 队列绑定交换机
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE, "item.update");
        channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE, "item.delete");

        // 创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            /*
             * consumerTag 消息者标签,在 channel.basicConsume 时候可以指定
             * envelope 消息包的内容,可从中获取消息 id,消息 routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // 路由 key
                System.out.println("路由 key 为:" + envelope.getRoutingKey());
                // 交换机
                System.out.println("交换机为:" + envelope.getExchange());
                // 消息 id
                System.out.println("消息 id 为:" + envelope.getDeliveryTag());
                // 收到的消息
                System.out.println("消费者 1-接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        // 监听消息
        /*
         * 参数 1:队列名称
         * 参数 2:是否自动确认,设置为 true 为表示消息接收到自动向 mq 回复接收到了,mq 接收到回复会删除消息,设置为 false 则需要手动确认
         * 参数 3:消息接收到后回调
         */
        channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);
    }
}

注意启动声明交换机为 Topic 类型:

// 声明交换机
channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);

并且将队列通过item.updateitem.deleterouting key 与交换机绑定:

// 队列绑定交换机
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE, "item.update");
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE, "item.delete");

3)消费者 2

消费者 2 接收所有类型的消息:新增商品(item.insert),更新商品(item.update)和删除商品(item.delete)。

package com.zhangmingge.rabbitmq.topic;

import com.rabbitmq.client.*;
import com.zhangmingge.rabbitmq.ConnectionUtil;

import java.io.IOException;

public class Consumer2 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        // 声明交换机
        channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);

        // 声明(创建)队列
        /*
         * 参数 1:队列名称
         * 参数 2:是否定义持久化队列
         * 参数 3:是否独占本次连接
         * 参数 4:是否在不使用的时候自动删除队列
         * 参数 5:队列其它参数
         */
        channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);

        // 队列绑定交换机
        channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHANGE, "item.*");

        // 创建消费者;并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            /*
             * consumerTag 消息者标签,在 channel.basicConsume 时候可以指定
             * envelope 消息包的内容,可从中获取消息 id,消息 routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // 路由 key
                System.out.println("路由 key 为:" + envelope.getRoutingKey());
                // 交换机
                System.out.println("交换机为:" + envelope.getExchange());
                // 消息 id
                System.out.println("消息 id 为:" + envelope.getDeliveryTag());
                // 收到的消息
                System.out.println("消费者 2-接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        // 监听消息
        /*
         * 参数 1:队列名称
         * 参数 2:是否自动确认,设置为 true 为表示消息接收到自动向 mq 回复接收到了,mq 接收到回复会删除消息,设置为 false 则需要手动确认
         * 参数 3:消息接收到后回调
         */
        channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
    }
}

注意其队列绑定交换机时使用了通配符:

// 队列绑定交换机
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHANGE, "item.*");

测试

启动所有消费者,然后使用生产者发送消息,在消费者对应的控制台可以查看到生产者发送对应 routing key 对应队列的消息,达到按照需要接收的效果,并且这些 routing key 可以使用通配符。

在执行完测试代码后,其实到 RabbitMQ 的管理后台找到Exchanges选项卡,点击topic_exchange的交换机,可以查看到如下的绑定:

1556032433333

小结

Topic 主题模式/通配符模式可以实现发布与订阅模式和路由模式的功能,Topic 在配置 routing key 的时候可以使用通配符,要更加灵活。

标签:TOPIC,Producer,示例,RabbitMQ,Topic,item,交换机,key,channel
From: https://www.cnblogs.com/Higurashi-kagome/p/18475737

相关文章

  • RabbitMQ 路由(Routing)模式示例
    总结自:BV15k4y1k7Ep模式说明和消费订阅模式相比,路由模式特点:交换机的类型为Direct。队列与交换机绑定时,要指定一个Routingkey(路由key)。消息的发送方在向Exchange发送消息时,也必须指定消息的Routingkey。Exchange不再把消息交给每一个绑定的队列,而是根据消息的Rout......
  • RabbitMQ 发布订阅(Publish Subscribe)模式示例
    总结自:BV15k4y1k7Ep交换机订阅模式示例图:在简单模式和工作队列模式中,只有3个角色:P:生产者,也就是要发送消息的程序。C:消费者,消息的接受者,会一直等待消息到来。Queue:消息队列,图中红色部分。而在订阅模型中,多了一个Exchange角色,而且工作过程略有变化:P:生产者,也就是要......
  • 使用AES 128位加解密,加解密模式采用CBC,填充模式采用PKCS5Padding的Java工具方法示例
    importjavax.crypto.Cipher;importjavax.crypto.spec.IvParameterSpec;importjavax.crypto.spec.SecretKeySpec;importjava.util.Base64;publicclassAESUtils{privatestaticfinalStringAES_ALGORITHM="AES/CBC/PKCS5Padding";private......
  • RabbitMQ 普通模式
    RabbitMQ普通模式一、普通模式示意图二、普通模式介绍RabbitMQ普通模式也称为点对点模式,它是消息队列的一种基本实现方式。在这种模式下,生产者将消息发送到队列中,消费者从队列中接收并处理消息。每条消息只会被一个消费者接收,其他消费者无法重复消费。特点:单一消费......
  • RabbitMQ系列学习笔记(八)--发布订阅模式
    文章目录一、发布订阅模式原理二、发布订阅模式实战1、消费者代码2、生产者代码3、查看运行结果本文参考:尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件rabbitmqRabbitMQ详解Centos7环境安装Erlang、RabbitMQ详细过程(配图)一、发布订阅模式原理在开发过程中,有一......
  • RabbitMQ系列学习笔记(十)--通配符模式
    文章目录一、通配符模式原理二、通配符模式实战1、消费者代码2、生产者代码3、查看运行结果本文参考:尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件rabbitmqRabbitMQ详解Centos7环境安装Erlang、RabbitMQ详细过程(配图)一、通配符模式原理通配符模式(Topics)是在路......
  • ThreeJS入门(123):THREE.Skeleton 知识详解,示例代码
    作者:还是大剑师兰特,曾为美国某知名大学计算机专业研究生,现为国内GIS领域高级前端工程师,CSDN知名博主,深耕openlayers、leaflet、mapbox、cesium,webgl,ThreeJS,canvas,echarts等技术开发,欢迎加微信(gis-dajianshi),一起交流。查看本专栏目录-本文是第123篇入门文章......
  • Rabbitmq集群
    根据项目需要,三台机器搭建一个rabbitmq集群,10.10.10.1(虚拟IP,下同)为主节点,10.10.10.2和10.10.10.3为从节点。1、安装erlang,该软件包是rabbitMQ依赖软件包,三台机器同步安装。将安装包otp_src_18.3.tar上传到三台服务器的data目录下解压并更改文件名为erlang。tar-xvfot......
  • RabbitMQ系列学习笔记(三)--工作队列模式
    文章目录一、工作队列模式原理二、工作队列模式实战1、抽取工具类2、消费者代码3、生产者代码4、查看运行结果本文参考尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件rabbitmqRabbitMQ详解Centos7环境安装Erlang、RabbitMQ详细过程(配图)一、工作队列模式原理与......
  • 《RabbitMQ系列》之RabbitMQ的4种Exchange
    大家好,我是tc,今天为大家介绍一下RabbitMQ中的4种exchange,水平一般,能力有限,若有错误之处,欢迎指正。 对RabbitMQ稍有了解的朋友应该都知道,在RabbitMQ中,一个有4中Exchange,分别是direct、topic、fanout、headers。其实,还有一个默认的交换机,称为defaultexchange,其本质也是一个di......