首页 > 其他分享 >RabbitMQ核心架构

RabbitMQ核心架构

时间:2024-08-30 16:55:36浏览次数:17  
标签:交换器 架构 String 队列 核心 RabbitMQ 消息 路由

RabbitMQ架构设计 

  1. Producer:负责产生消息。

  2. Connection:RabbitMQ客户端和代理服务器之间的TCP连接。

  3. Channel:建立在连接之上的虚拟连接,RabbitMQ操作都是在信道中进行。

  4. Broker:一个Broker可以看做一个RabbitMQ服务节点或者服务实例。

  5. Exchange:生产者发送消息到交换器,交换器根据路由key投递到相应的队列。

  6. Queue:存储消息的队列 。

  7. RoutingKey:路由键,指定消息的路由规则。

  8. BindingKey:绑定键,关联交换器和队列。

  9. Consumer:消费消息。

路由机制

  1. Direct:默认方式,根据消息的路由键完全匹配队列的绑定键来分发消息。
  2. fanout:广播模式,将消息投递到所有绑定到交换器的队列。
  3. topic:使用模糊匹配的方式根据路由键将消息分发到不同的队列中,支持通配符(*和#)进行匹配。
  4. header:不依赖路由键,而是根据消息的头部信息来进行匹配和分发。

连接RabbitMQ

final ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VIRTUAL_HOST);
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

Connection 可以用来创建多个 Channel,但是 Channel 不能线程共享使用。channel 的开启有一个 isOpen 方法可以得知

com.rabbitmq.client.impl.ShutdownNotifierComponent#isOpen    
@Override
    public boolean isOpen() {
    synchronized(this.monitor) {
        return this.shutdownCause == null;
    }
}

生产者发送消息

  1. 生产者连接到 RabbitMO Broker,建立一个连接(Connection),开启一个信道(Channel)
  2. 生产者声明一个交换器,并设置相关属性,比如交换器类型、是否持久化等
  3. 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
  4. 生产者通过路由键将交换器和队列绑定起来
  5. 生产者发送消息至 RabbitMO Broker,其中包含路由键、交换器等信息
  6. 相应的交换器根据接收到的路由键查找相匹配的队列。
  7. 如果找到,则将从生产者发送过来的消息存入相应的队列中。
  8. 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
  9. 关闭信道。
  10. 关闭连接。

交换器和队列

创建临时队列

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
final String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, EXCHANGE_NAME, routingKey);

上面创建一个持久化的、绑定类型为 direct 的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列(队列名称由 RabbitMQ 自动生成)。

创建持久化队列

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(queue, EXCHANGE_NAME, routingKey);

分配一个固定的队列名称,并设置持久化、非排他的、非自动删除的队列

生产者和消费者都可以声明一个交换器或则队列,如果尝试声明一个已经存在的交换器或队列(只要声明的参数完全匹配已存在的交换器或队列),RabbitMQ 则什么都不做,直接返回成功。如果参数不匹配则会抛出异常

创建交换器

public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
                                          boolean durable,
                                          boolean autoDelete,
                                          boolean internal,
                                          Map<String, Object> arguments)

返回 Exchange.DeclareOk 标识成功声明了一个交换器

  • exchange:交换器名称

  • type:交换器类型;常见的有:fanout、direct、topic...

    com.rabbitmq.client.BuiltinExchangeType 类定义了交换器类型

  • durable:是否持久化

    持久化将交换器存盘,服务重启时不会丢失相关信息

  • autoDelete:是否自动删除;自动删除的前提是:至少有一个队列或则交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或则交换器都与此解绑

    **注意:**这里自动删除,不是当连接断开时,自动删除这个交换器。

  • internal:是否内置的;如果是内置的交换器,客户端程序无法直接发送消息到这个交换器 中,只能通过交换器路由 到交换器这种方式。

  • arguments:其他一些结构化参数

删除交换器

void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
  • exchange:交换器名称
  • isUnused:设置为 true ,则只有交换器没有被使用时,才被删除。

创建队列

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                             Map<String, Object> arguments) throws IOException;
  • queue:队列名称

  • durable:是否持久化

  • exclusive:是否排他;当一个队列被声明为排他队列,该队列 仅对首次声明它 的连接可见,并在连接断开时自动删除。这里需要注意一点:就算是持久化的,一旦连接关闭,这个排他队列也会被自动删除。

  • autoDelete:是否自动删除,与交换器定义一致;

  • arguments:设置队列的其他一些参数

    如 x-message-ttlx-expiresx-max-lengthx-max-length-bytesx-dead-letter-exchangex-dead-letter-routing-keyx-max-priority

删除队列

Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

 队列绑定交换器

Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
  • queue:队列名
  • exchange:交换器名称
  • routingKey:用来绑定队列和交换器的路由键
  • arguments:定义绑定的一些参数

交换器与交换器绑定

Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

 发送消息

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
    throws IOException;
  • exchange:交换器名称,如果为空,则会发送到 RabbitMQ 默认的交换器中
  • routingKey:路由键
  • mandatory:mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ会调用 Basic.Return 命令将消息返回给生产者 。当 mandatory
    数设置为 false 时,出现上述情形,则消息直接被丢弃那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用channel addReturnListener 来添加 ReturnListener 监昕器实现。
  • props:消息的基本属性集

消费消息

RabbitMQ 消费模式分两种:

  • Push:推模式;采用 Basic.Consume 进行消费

  • Pull:拉模式;则使用 Basic.Get 进行消费

消息分发

当RabbitMQ 队列拥有多个消费者时 ,队列收到的消息将以轮询 (round-robin )的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。这种方式非常适合扩展,而且它是专门为并发程序设计的。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。

默认情况下,如果有 个消费者,那么 RabbitMQ会将第 条消息分发给第 m%n (取余的方式)个消费者, RabbitMQ 不管消费者是否消费并己经确认 (Basic.Ack) 了消息。

如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者由于某些原因很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。

这里就要用到 channel.basicQos(int prefetchCount) 这个方法,channel.basicQos 方法允许限制信道上的消费者所能保持的最大未确认消息的数量。例如在订阅消费队列之前,消费端程序调用了 channel.basicQos(5) ,之后订阅了某个队列进行消费。 RabbitMQ会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后 RabbitMQ 将相应的计数减1,之后消费者可以继续接收消息,直到再次到达计数上限。

Basic.Qos 的使用对于拉模式的消费方式无效.

标签:交换器,架构,String,队列,核心,RabbitMQ,消息,路由
From: https://blog.csdn.net/weixin_35973945/article/details/141690809

相关文章

  • Clobotics 计算机视觉场景存储实践:多云架构、 POSIX 全兼容、低运维的统一存储HB
    Clobotics是一家将计算机视觉和机器学习技术应用于风电以及零售行业的企业。在风电行业,Clobotics利用无人机对风力发电机叶片进行检查,显著降低了对人工作业的依赖。在零售领域,公司通过分析捕获的包装商品图像来提供基于实时数据的洞察,以增加销售额并减少运营成本。存储方面,Clob......
  • OpenHarmony开发:应用分层架构设计
    HarmonyOS应用的分层架构设计以一套代码工程为基础,旨在为华为的手机、2in1等1+8全场景设备提供支持,实现了“一次开发,多端部署”的开发理念。HarmonyOS应用的分层架构主要包括三个层次:产品定制层、基础特性层和公共能力层,为开发者构建了一个清晰、高效、可扩展的设计架构。本......
  • 【案例分享】如何利用京东云建设高可用业务架构
    本文以2022年一个实际项目为基础,来演示在京东云上构建高可用业务的整个过程。公有云及私有云客户可通过使用京东云的弹性IAAS、PAAS服务,创建高可用、高弹性、高可扩展、高安全的云上业务环境,提升业务SLA,提升运维自动化水平,降低资源成本及运维成本。有业务迁移上云需求,希望构建云上......
  • RabbitMQ 集群与高可用性
    目录单节点与集群部署1.1.单节点部署1.2.集群部署 镜像队列1.定义与工作原理2.配置镜像队列3.应用场景4.优缺点5.Java示例分布式部署1.分布式部署的主要目标2.典型架构设计3.RabbitMQ分布式部署的关键技术4.部署策略和实践5.分布式部署的挑战和解......
  • 英飞凌HSM内核开发-软硬件架构
    veHsm硬件和软件架构概述1.软件硬件架构veHsm是一个嵌入式硬件安全模块,它通过硬件提供的安全区域来增强安全性,这个区域包括:专用核心:负责执行安全操作。安全内存:用于存储敏感数据,如密钥和资产,这部分内存仅对专用核心可访问。密码硬件加速器:硬件组件,用于加速密码算法的执......
  • Clobotics 计算机视觉场景存储实践:多云架构、 POSIX 全兼容、低运维的统一存储
    Clobotics是一家将计算机视觉和机器学习技术应用于风电以及零售行业的企业。在风电行业,Clobotics利用无人机对风力发电机叶片进行检查,显著降低了对人工作业的依赖。在零售领域,公司通过分析捕获的包装商品图像来提供基于实时数据的洞察,以增加销售额并减少运营成本。存储方面,Clob......
  • FFmpeg开发笔记(四十八)从0开始搭建直播系统的开源软件架构
    音视频技术的一个主要用途是直播,包括电视直播、电脑直播、手机直播等等,甚至在线课堂、在线问诊、安防监控等应用都属于直播系统的范畴。由于直播系统不仅涉及到音视频数据的编解码,还涉及到音视频数据的实时传输,因此直播领域采用的网络技术标准比较高,实现起来也比一般的WEB系统复杂......