首页 > 其他分享 >RabbitMQ 发布订阅(Publish Subscribe)模式示例

RabbitMQ 发布订阅(Publish Subscribe)模式示例

时间:2024-10-19 10:20:59浏览次数:7  
标签:示例 队列 Publish Subscribe FANOUT 参数 消息 交换机 channel

总结自:BV15k4y1k7Ep

交换机

订阅模式示例图:

1556014499573

简单模式工作队列模式中,只有 3 个角色:

  • P:生产者,也就是要发送消息的程序。
  • C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,图中红色部分。

而在订阅模型中,多了一个 Exchange 角色,而且工作过程略有变化:

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给 X(交换机)。
  • C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息。
  • Exchange:交换机,图中的 X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于 Exchange 的类型。Exchange 有常见以下 3 种类型:
    • Fanout:广播,将消息交给所有绑定到交换机的队列。
    • Direct:定向,把消息交给符合指定 routing key 的队列。
    • Topic:通配符,把消息交给符合 routing pattern(路由模式)的队列。

Exchange(交换机)只负责转发消息,不具备存储消息的能力。因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

模式说明

1556010329032

发布订阅模式:

1、每个消费者监听自己的队列。

2、生产者将消息发给 broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息。

代码

1)生产者

和简单模式和工作队列模式相比,发布订阅模式会声明交换机,声明多个队列,并将队列和交换机绑定:

/*
 * 声明交换机
 * 参数 1:交换机名称
 * 参数 2:交换机类型,fanout、topic、direct、headers
 */
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);

// 声明(创建)队列
/*
 * 参数 1:队列名称
 * 参数 2:是否定义持久化队列
 * 参数 3:是否独占本次连接
 * 参数 4:是否在不使用的时候自动删除队列
 * 参数 5:队列其它参数
 */
channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);

// 队列绑定交换机
channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");

发送消息时,会发送到指定交换机:

/*
 * 参数 1:交换机名称,如果没有指定则使用默认 Default Exchange
 * 参数 2:路由 key,简单模式可以传递队列名称
 * 参数 3:消息其它属性
 * 参数 4:消息内容
 */
channel.basicPublish(FANOUT_EXCHANGE, "", null, message.getBytes());

以下为完整代码:

package com.zhangmingge.rabbitmq.ps;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhangmingge.rabbitmq.ConnectionUtil;

/**
 * 发布与订阅使用的交换机类型为:fanout
 */
public class Producer {

    // 交换机名称
    static final String FANOUT_EXCHANGE = "fanout_exchange";
    // 队列名称
    static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    // 队列名称
    static final String FANOUT_QUEUE_2 = "fanout_queue_2";

    public static void main(String[] args) throws Exception {

        // 创建连接
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        /*
         * 声明交换机
         * 参数 1:交换机名称
         * 参数 2:交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);

        // 声明(创建)队列
        /*
         * 参数 1:队列名称
         * 参数 2:是否定义持久化队列
         * 参数 3:是否独占本次连接
         * 参数 4:是否在不使用的时候自动删除队列
         * 参数 5:队列其它参数
         */
        channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
        channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);

        // 队列绑定交换机
        channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
        channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");

        for (int i = 1; i <= 10; i++) {
            // 发送信息
            String message = "你好:小兔子!发布订阅模式--" + i;
            /*
             * 参数 1:交换机名称,如果没有指定则使用默认 Default Exchange
             * 参数 2:路由 key,简单模式可以传递队列名称
             * 参数 3:消息其它属性
             * 参数 4:消息内容
             */
            channel.basicPublish(FANOUT_EXCHANGE, "", null, message.getBytes());
            System.out.println("已发送消息:" + message);
        }

        // 关闭资源
        channel.close();
        connection.close();
    }
}

2)消费者 1

和简单模式和工作队列模式相比,发布订阅模式的消费者多了声明交换机和将队列绑定到交换机的过程:

// 声明交换机
channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);

// 声明(创建)队列
/*
 * 参数 1:队列名称
 * 参数 2:是否定义持久化队列
 * 参数 3:是否独占本次连接
 * 参数 4:是否在不使用的时候自动删除队列
 * 参数 5:队列其它参数
 */
channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);

// 队列绑定交换机
channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHANGE, "");

下面为完整代码:

package com.zhangmingge.rabbitmq.ps;

import com.rabbitmq.client.*;
import com.zhangmingge.rabbitmq.ConnectionUtil;

import java.io.IOException;

public class Consumer1 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        // 声明交换机
        channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);

        // 声明(创建)队列
        /*
         * 参数 1:队列名称
         * 参数 2:是否定义持久化队列
         * 参数 3:是否独占本次连接
         * 参数 4:是否在不使用的时候自动删除队列
         * 参数 5:队列其它参数
         */
        channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);

        // 队列绑定交换机
        channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHANGE, "");

        // 创建消费者,并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /*
             * consumerTag 消息者标签,在 channel.basicConsume 时候可以指定
             * envelope 消息包的内容,可从中获取消息 id,消息 routingKey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)
             * properties 属性信息
             * body 消息
             */
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // 路由 key
                System.out.println("路由 key 为:" + envelope.getRoutingKey());
                // 交换机
                System.out.println("交换机为:" + envelope.getExchange());
                // 消息 id
                System.out.println("消息 id 为:" + envelope.getDeliveryTag());
                // 收到的消息
                System.out.println("消费者 1-接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        // 监听消息
        /*
         * 参数 1:队列名称
         * 参数 2:是否自动确认,设置为 true 为表示消息接收到自动向 mq 回复接收到了,mq 接收到回复会删除消息,设置为 false 则需要手动确认
         * 参数 3:消息接收到后回调
         */
        channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
    }
}

3)消费者 2

消费者 2 和消费者 1 只有日志内容和绑定的队列不同,其他没有区别。

package com.zhangmingge.rabbitmq.ps;

import com.rabbitmq.client.*;
import com.zhangmingge.rabbitmq.ConnectionUtil;

import java.io.IOException;

public class Consumer2 {

    public static void main(String[] args) throws Exception {
        ...
        channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
        channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE, "");

        // 创建消费者,并设置消息处理
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                ...
                System.out.println("消费者 2-接收到的消息为:" + new String(body, "utf-8"));
            }
        };
        // 监听消息
        ...
        channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);
    }
}

测试

启动所有消费者,然后使用生产者发送消息,在每个消费者对应的控制台可以查看到生产者发送的所有消息,达到广播的效果。

在执行完测试代码后,其实到 RabbitMQ 的管理后台找到Exchanges选项卡,点击fanout_exchange的交换机,可以查看到如下的绑定:

1556015006220

小结

交换机需要与队列进行绑定,绑定之后,一个消息可以被多个消费者都收到。

发布订阅模式与工作队列模式的区别

1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。

2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。

3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机。

标签:示例,队列,Publish,Subscribe,FANOUT,参数,消息,交换机,channel
From: https://www.cnblogs.com/Higurashi-kagome/p/18475559

相关文章

  • 使用AES 128位加解密,加解密模式采用CBC,填充模式采用PKCS5Padding的Java工具方法示例
    importjavax.crypto.Cipher;importjavax.crypto.spec.IvParameterSpec;importjavax.crypto.spec.SecretKeySpec;importjava.util.Base64;publicclassAESUtils{privatestaticfinalStringAES_ALGORITHM="AES/CBC/PKCS5Padding";private......
  • ThreeJS入门(123):THREE.Skeleton 知识详解,示例代码
    作者:还是大剑师兰特,曾为美国某知名大学计算机专业研究生,现为国内GIS领域高级前端工程师,CSDN知名博主,深耕openlayers、leaflet、mapbox、cesium,webgl,ThreeJS,canvas,echarts等技术开发,欢迎加微信(gis-dajianshi),一起交流。查看本专栏目录-本文是第123篇入门文章......
  • APP视频活体检测API代码示例-动作活体检测开发
    视频活体检测技术是一种用于判断摄像头前的对象是否为真实活人的方法。传统的面部识别技术虽然能够有效地识别面部特征,但在面对照片、视频甚至是高精度的3D面具时,却显得力不从心。视频活体检测技术通过分析面部的微表情、皮肤纹理以及光线反射等多维度信息,能够有效地区分真实......
  • 支付宝沙箱版(什么是支付宝沙箱、配置支付宝沙箱、配置内网穿透、在SpringBoot项目中对
    文章目录0.前言1.什么是支付宝沙箱2.配置支付宝沙箱2.1沙箱应用的应用信息(获取app-id和gateway-url)2.2沙箱账号的商家信息和买家信息2.3下载秘钥工具2.4生成秘钥(获取private-key)2.5配置秘钥(获取alipay-public-key)3.配置内网穿透3.1使用cpolar实现内网穿透3.2......
  • 识别图形验证码 (Elixir 示例)
    安装所需依赖在你的mix.exs文件中添加以下依赖:elixirdefpdepsdo[{:httpoison,"~>1.8"},{:mogrify,"~>0.7"},{:tesseract,"~>0.1"}]更多内容联系1436423940end然后运行mixdeps.get来安装这些库。下载并保存验证码图片使用HTTPoison下载验证码图片并保存......
  • 识别图形验证码 (Scala 示例)
    安装所需依赖在你的build.sbt文件中添加以下依赖:scalalibraryDependencies+="org.scalaj"%%"scalaj-http"%"2.4.2"下载并保存验证码图片使用scalaj-http下载验证码图片并保存到本地:scalaimportscalaj.http._importjava.nio.file.{Files,Paths}objectCaptch......
  • 识别图形验证码 (Julia 示例)
    安装所需依赖在JuliaREPL中使用以下命令安装所需的包:juliausingPkgPkg.add("HTTP")Pkg.add("Images")Pkg.add("Tesseract")下载并保存验证码图片使用HTTP下载验证码图片并保存到本地:juliausingHTTPusingFileIOfunctiondownload_captcha(url::String,save_pa......
  • python+eel入门示例
    安装eelpipinstalleelpyimporteelimportrandom#笑话列表jokes=["为什么电脑经常生病?因为窗户(Windows)总是开着!","为什么数学书看起来总是很悲伤?因为它里面有太多的问题(problems)","为什么海洋里没有电脑?因为它们总是遇到短路(seals)","为什么冰......
  • Vue3 + Openlayers10示例 台风轨迹和台风圈
    前言假装已经完成了vue3和Openlayers10开发环境的搭建,如果有需要,可搜索vue+Openlayers环境搭建的相关文章。本示例基于Vue3和Openlayers10的环境,实现台风轨迹和台风圈的效果。一、安装插件安装Element-plus插件,其实只在台风列表的地方用到了el-checkbox,可根据实际需......
  • 例2.3列表操作示例
    '''首先先定义一个列表,列表是写在[]里,用逗号隔开的,元素是可以改变的列表的截取语法结构是:变量[头下标:尾下标]'''L=['abc',12,3.45,'python',2.789]#输出完整列表print(L)#输出列表的第一个元素print(L[0])#将列表的第一个元素修改为‘a’L[0]='a'#将列表的第2个元素到第3个元素......