首页 > 其他分享 >RabbitMQ交换机

RabbitMQ交换机

时间:2023-09-02 15:46:24浏览次数:35  
标签:false 队列 绑定 RabbitMQ 交换机 消息 channel

概念

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产 者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
image

在交换机中衍生出两个概念:

  • routingkey(路由键):生产者发送消息给交换机时,需要指定一个routingkey
  • 绑定键:通过绑定键将交换机和队列绑定起来,这样mq在发送消息时能正确的发送到指定队列
  • 两者中的关系:生产者将消息发送到哪个交换机是由routingkey觉得的,在由交换机通过指定bindingkey指定队列。

交换机的类型

image

在RabbitMQ中,交换机(Exchange)和队列(Queue)的声明仅需要在生产者或消费者任意一方进行即可。

通常的做法是:

  • 生产者负责声明Exchange和绑定队列关系
  • 消费者负责声明Queue

扇出交换机(Fanout Exchange)

Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的 所有队列中
image

生产者:

/**
 * 扇出交换机,生产者
 * 生产者负责声明 Exchange和绑定队列关系
 */
public class FanoutProducer {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        // 声明交换机参数:1-交换机名称  , 2-交换机类型
        channel.exchangeDeclare(FanoutContans.EXCHANGENAME, BuiltinExchangeType.FANOUT);
        // 绑定键,绑定队列与交换机的关系,参数:1-队列名称  2-交换机名称  3-路由键
        channel.queueBind(FanoutContans.QUEUENAME,FanoutContans.EXCHANGENAME,"");
        channel.queueBind(FanoutContans.QUEUENAME1,FanoutContans.EXCHANGENAME,"");

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()){
            String message = scanner.next();
            // 发送消息参数:1-交换机名称  2-路由键  3-其他参数  4-消息
            channel.basicPublish(FanoutContans.EXCHANGENAME,"123",null,message.getBytes());
            System.out.println("生产者发出消息是:"+message);
        }
    }
}

消费者:

/**
 * 消费者,测试扇出模式
 * 消费者负责声明Queue
 */
public class FanoutConsumer {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        /**
         * 生成一个临时的队列 队列的名称是随机的
         * 当消费者断开和该队列的连接时 队列自动删除
         */

        // 声明队列
        channel.queueDeclare(FanoutContans.QUEUENAME,false,false,false,null);
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        // 读取消息
        System.out.println("等待接收消息。。。。。把接收到的消息打印在控制台上。。。。。");

        // 回调函数 成功后的消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("接收到的消息是:"+new String(message.getBody()));
        };

        channel.basicConsume(FanoutContans.QUEUENAME,true,deliverCallback,null,null);
    }
}

经过测试可以实现一条消息发送个多个消费者
绑定关系如下:
image

然后我想着,如果说routingkey 不同是不是就不会发送消息,于是我修改了生产者的代码:

// 绑定键,绑定队列与交换机的关系,参数:1-队列名称  2-交换机名称  3-路由键
channel.queueBind(FanoutContans.QUEUENAME,FanoutContans.EXCHANGENAME,"aaaa");
channel.queueBind(FanoutContans.QUEUENAME1,FanoutContans.EXCHANGENAME,"bbbb");
// 发送消息
channel.basicPublish(FanoutContans.EXCHANGENAME,"aaaa",null,message.getBytes());

测试发现,还是一样的发送了,就像广播一样,不管你的routingkey是否一致,都会发送都同一个交换机上,由交换机统一发送给消费者。
原因如下:
对于fanout类型交换机,routing key不起任何作用,不会影响消息的传递。任何绑定的队列都会收到消息。
这确实是一个fanout交换机的设计特点,文档中也有明确说明。我们在使用时需要注意这一点。

对于上诉代码还是由缺点的,在mq中必须要先声明在使用,例如我生产者端,只声明了交换机和绑定关系,并没有声明队列,那么此时运行就会报错,只有消费者端(声明了队列)先运行了,在运行生产者才可以运行,解决办法:

// 在生产者中同样声明队列就好了
/**
 * 扇出交换机,生产者
 * 生产者负责声明 Exchange和绑定队列关系
 */
public class FanoutProducer {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        // 声明交换机参数:1-交换机名称  , 2-交换机类型
        channel.exchangeDeclare(FanoutContans.EXCHANGENAME, BuiltinExchangeType.FANOUT);
        // 绑定键,绑定队列与交换机的关系,参数:1-队列名称  2-交换机名称  3-路由键
        channel.queueBind(FanoutContans.QUEUENAME,FanoutContans.EXCHANGENAME,"");
        channel.queueBind(FanoutContans.QUEUENAME1,FanoutContans.EXCHANGENAME,"");
        // 声明队列
        channel.queueDeclare(FanoutContans.QUEUENAME,false,false,false,null);
        channel.queueDeclare(FanoutContans.QUEUENAME1,false,false,false,null);

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()){
            String message = scanner.next();
            // 发送消息参数:1-交换机名称  2-路由键  3-其他参数  4-消息
            channel.basicPublish(FanoutContans.EXCHANGENAME,"123",null,message.getBytes());
            System.out.println("生产者发出消息是:"+message);
        }
    }
}

直连交换机(Direct Exchange)

直连交换机工作原理:将消息推送到与routingkey相同的队列上
image
在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.

在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

那么根据这个原理,我们也可以得出,当指定的routingkey相同时,就变成了扇出交换机了。

生产者代码:

/**
 * 直接交换机---路由模式
 * 根据不同的routingkey 指定发送到不同的队列
 */
public class DirectProducer {
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
        // 声明交换机-> 指明直连交换机
        channel.exchangeDeclare(DirectContians.DIRECT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 声明队列
        channel.queueDeclare(DirectContians.DIRECT_QUEUE_NAME,false,false,false,null);
        channel.queueDeclare(DirectContians.DIRECT_QUEUE_NAME1,false,false,false,null);
        // 绑定队列关系 -->队列名称,交换机名称,关联的routingkey
        channel.queueBind(DirectContians.DIRECT_QUEUE_NAME,DirectContians.DIRECT_EXCHANGE_NAME,DirectContians.ROUTING_KEY_H1);
        channel.queueBind(DirectContians.DIRECT_QUEUE_NAME1,DirectContians.DIRECT_EXCHANGE_NAME,DirectContians.ROUTING_KEY_H2);
        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()){
            String message = scanner.next();
            // 发送消息参数:1-交换机名称  2-路由键  3-其他参数  4-消息
            // 发送消息到 routingkeyH1中,那么另外一个队列由于绑定的是h2,所以h2 接收不到
            channel.basicPublish(DirectContians.DIRECT_EXCHANGE_NAME,DirectContians.ROUTING_KEY_H1,null,message.getBytes());
            System.out.println("生产者发出消息是:"+message);
        }
    }
}

消费者代码:

/**
 * 消费者代码,
 * 生产者通过direct模式指定routingkey发送给消费者1
 */
public class DirectConsumer1 {
    public static void main(String[] args) throws Exception{

        Channel channel = RabbitMQUtil.getChannel();
        /**
         * 生成一个临时的队列 队列的名称是随机的
         * 当消费者断开和该队列的连接时 队列自动删除
         */
        // 声明队列
        channel.queueDeclare(DirectContians.DIRECT_QUEUE_NAME,false,false,false,null);
        System.out.println("等待接收消息。。。。。把接收到的消息打印在控制台上。。。。。");
        // 回调函数 成功后的消息
        DeliverCallback deliverCallback = (consumerTag, message)->{
            System.out.println("接收到的消息是:"+new String(message.getBody()));
        };
        // 从队列当中读取消息
        channel.basicConsume(DirectContians.DIRECT_QUEUE_NAME,true,deliverCallback,null,null);
    }
}

消费者2代码:与1基本一致,唯一不同点是从另外一个队列中获取消息

channel.basicConsume(DirectContians.DIRECT_QUEUE_NAME1,true,deliverCallback,null,null);

测试结果:
在指定了routingkey 为 h1 的前提下,无论发送多少条消息,都由队列1 接收到

绑定关系如下:
image

主题交换机(Topic Exchange)

场景引入

比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候 就只能使用 topic 类型

Topic 的要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单 词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,其中有两个替换符是大家需要注意的

*(星号)可以代替一个单词

#(井号)可以替代零个或多个单词

image

下图绑定关系如下:

Q1-->绑定的是

  • 中间带 orange 带 3 个单词的字符串(.orange.)

Q2-->绑定的是

  • 最后一个单词是 rabbit 的 3 个单词(..rabbit)
  • 第一个单词是 lazy 的多个单词(lazy.#)

当队列绑定关系是下列这种情况时需要引起注意

  • 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

生产者代码:

/**
 * 主题交换机,routing_key 不能随便写,以点号分隔开
 * *号代替一个代词
 * #号代替多个单词或者无单词
 */
public class TopicsProducer {
    public static void main(String[] args) throws Exception {
        //1.创建连接
        Channel channel = RabbitMQUtil.getChannel();
        //2 声明交换机为topic模式
        channel.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
        //3.声明三个队列
        channel.queueDeclare("Q1",false,false,false,null);
        channel.queueDeclare("Q2",false,false,false,null);
        channel.queueDeclare("Q3",false,false,false,null);
        //4.绑定
        channel.queueBind("Q1","topic_logs","*.orange.*");
        channel.queueBind("Q2","topic_logs","*.*.rabbit");
        channel.queueBind("Q3","topic_logs","lazy.#");
        //测试数据
        Map<String, String> bindingKeyMap = new HashMap<>();
        // key 为 routing-key 值为消息
        bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");

        //发送消息
        Set<Map.Entry<String, String>> entries = bindingKeyMap.entrySet();
        Iterator<Map.Entry<String, String>> iterator = entries.iterator();
        while (iterator.hasNext()){
            Map.Entry<String, String> next = iterator.next();
            String key = next.getKey();
            String value = next.getValue();
            channel.basicPublish("topic_logs",key,null,value.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息:"+value);
        }

    }
}

消费者代码:

public class TopicConsumer {
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtil.getChannel();
        // 接收消息的回调
        DeliverCallback ackCallback = ( consumerTag,  message)->{
            System.out.println("消费者1接收到的消息为:"+new String(message.getBody()));
        };
        // 取消消息的回调
        CancelCallback nackCallback =(consumerTag)->{
            System.out.println("消息被取消了");
        };

        channel.basicConsume("Q1",true,ackCallback,nackCallback);
    }
}

绑定关系:
image

标签:false,队列,绑定,RabbitMQ,交换机,消息,channel
From: https://www.cnblogs.com/zgf123/p/17673744.html

相关文章

  • 24 路由器,交换机,IP,DNS,子网掩码,网关
    昨天有同志遇到了电脑连接问题,他是在一个大型局域网中,他们的网络交换机没打开自动分配IP的功能,所以IP地址都是手动配置,期间遇到了子网掩码,IP地址,网关,DNS服务器等概念,逐一记录,希望能让所有人看懂。一、交换机主要功能为端口拓展,让你有更多网络端口,扩大局域网接入点。。工作在TCPI......
  • rabbitmq消息持久化
    概念消息是可以持久化保存的,持久的目的是为了处理任务丢失情况的,采用持久化可以保证消息存储,且消息不被丢失。默认情况下RabbitMQ退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。代码......
  • 华为交换机接口类型汇总
    接入链路和干道链路如何处理标签!   华为设备默认所有接口的缺省标签为1   只要接口配置有缺省标签,就会对标签做处理交换机链路分为干道链路和接入链路.    接入链路:一条链路一端为二层接口,另外一端为非二层接口.    干道链路:一条链路的两端均为二层接口. ......
  • RabbitMQ Stream类型队列
    RabbitMQ提供了三种类型的队列:ClassicQuorumStream官方文档对于流队列的描述是:高性能、可持久化、可复制、非破坏性消费、只追加写入的日志使用场景:一个队列将同一条消息分发给不同消费者可重复消费消息更高的性能存储大量消息而不影响性能更高的吞......
  • RabbitMQ快速入门--简单队列模型
             ......
  • RabbitMQ快速入门--介绍和安装
                     ......
  • 06 二层交换机转发行为
    接入层负责终端的接入,平常在家使用的WiFi是利用胖AP实现的接入层工作,AP是无线的接入层,在有线的环境负责终端接入的是二层交换机,将终端的RJ45口(网口)通过网线和交换机相连就完成终端接入了,无线终端使用手机连接对应的WiFi就实现接入了,本章介绍的有线的接入层设备,以太网二层交换机......
  • RabbitMQ 管理页面该如何查看有哪些连接
    用浏览器访问  http://192.168.1.100:15672   默认用户名:admin   密码: admin 登陆后显示 在Connections页中查看所有连接    ......
  • 10在centos7安装RabbitMQ Server
    一.erlang环境安装erlang语言环境和RabbitMQ版本的对应关系如下:https://www.rabbitmq.com/which-erlang.html  本次安装RabbitMQ 3.11.20 和erlang25.3.2.5进入erlang官网下载https://www.erlang.org/patches/otp-25.3.2.5 安装编译环境yuminstallmakeg......
  • openstack nova基础知识——RabbitMQ
    nova中各个组件之间的交互是通过“消息队列”来实现的,其中一种实现方法就是使用RabbitMQ,对RabbitMQ的使用,官方文档上有一个非常好的GetStarted,由浅及深,结合例子,很容易理解。现在对RabbitMQ的理解,就是利用它可以非常灵活的定制自己想要实现的消息收发机制。其中,有这样几个角色:produ......