首页 > 其他分享 >RabbitMQ(兔子队列入门/消息队列)

RabbitMQ(兔子队列入门/消息队列)

时间:2024-09-26 11:56:11浏览次数:10  
标签:交换器 false 入门 队列 RabbitMQ 消息 channel


介绍

(本笔记不涉及RabbitMQ的环境搭建,主要用于了解和上手使用RabbitMQ)

RabbitMQ是一种消息队列,什么是消息队列?

消息(Message):是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

**队列:**可以说是一个数据结构,可以存储数据,如下图,我们从右侧(队尾)插入元素(入队),从队头获取元素(出队)。

RabbitMQ(兔子队列入门/消息队列)_消息路由

**消息队列(Message Queue)**是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。

RabbitMQ(兔子队列入门/消息队列)_持久化_02

消息队列的作用

消息队列主要用于解决以下问题:

  • 应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败
  • 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间
  • 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况
  • 例如:将订单创建完成后续的操作,放入到RabbitMQ中在后台执行。

例:当我们在秒杀活动场景时,比如你的服务器一秒能处理100个订单,但秒杀活动1秒进来1000个订单,持续10秒,在后端能力无法增加的情况下,你可以用消息队列将总共10000个请求压在队列里,后台消费服务按原有能力处理,比如100多秒后处理完所有请求,在这种情况下后台服务消费可能会比正常慢一点,但是不至于让系统直接宕机丢失订单数据。

RabbitMQ介绍

RabbitMQ——Rabbit Message Queue的简写,但不能仅仅理解其为消息队列,消息代理更合适。RabbitMQ 是一个由 Erlang 语言开发的AMQP(高级消息队列协议)的开源实现,其内部结构如下:

RabbitMQ(兔子队列入门/消息队列)_消息路由_03

RabbitMQ作为一个消息代理,主要和消息打交道,负责接收并转发消息。

RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

下面我们先来看看其内部结构中出现的角色承担者的作用:

Producer 生产者

生产者创建消息,然后发布到RabbitMQ中。消息一般可以包含2个部分:消息体标签(Label)。消息体也可以称之为payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。

Consumer 消费者

消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,也就不知道消息的生产者是谁。

Broker:消息中间件的服务节点

对于RabbitMQ来说,一个RabbitMQ Broker可以简单地看作一个RabbitMQ服务节点,或者RabbitMQ服务实例。大多数情况下也可以将一个RabbitMQ Broker看作一台RabbitMQ服务器。

Queue:队列,是RabbitMQ的内部对象,用于存储消息

RabbitMQ中消息都只能存储在队列中,多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,如图

RabbitMQ(兔子队列入门/消息队列)_消息路由_04

例:

//声明一个队列
channel.QueueDeclare(queueName, false, false, false, null);
Exchange:交换器

RabbitMQ中生产者将消息发送到Exchange(交换器,通常也可以用大写的“X”来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。

RabbitMQ有4种交换器:

  • Direct exchange(直连)
  • Fanout exchange(扇形)
  • Topic exchange(主题)
  • Headers exchange(头部)

其中前3者比较常用后面会通过代码分别演示这几个交换器的用法

例:

//创建一个非持久化的、非自动删除的、绑定类型为direct的交换器
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
Binding:绑定

RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个Key,告诉RabbitMQ如何正确地将消息路由到队列

RabbitMQ(兔子队列入门/消息队列)_持久化_05

//将队列与交换机绑定channel.QueueBind(queueName, exchangeName, routeKey);

Connection:连接

表示到消息代理的真实 TCP 连接

Channel:信道

多路复用连接中的一条独立的双向数据流通道,可读可写。Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作都是在Channel这个接口中完成的,包括定义Queue、Exchange、绑定Queue与Exchange、发布消息等等。在我们进行这些操作之前我们都需要与RabbitMQ建立一个Connection(Connection 表示到消息代理的真实 TCP 连接),但是如果每次访问RabbitMQ都需要建立Connection的话,在消息量大的时候建立TCP Connection的开销无疑也是巨大的,效率也比较低。这时候Channel起了作用。Channel是Connection中的虚拟连接(AMPQ连接),Channel可以复用Connection的TCP连接,当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。但是当信道的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,就需要开辟多个Connection,将这些信道均摊到这些Connection中。

感兴趣的同学可以移步官网:Channels

var cfg=new ConnectionFactiory();//创建一个连接工厂
var conn=cf.newConnection();//创建一个连接
var ch=conn.CreateModel();//创建一个信道
Virtual Host:虚拟主机

当我们配置好本地环境后使用账号密码guest、guest登录时就是在访问默认的VHost("/")了。RabbitMQ中的资源权限是针对每个Virtual Host的,用户没有全局权限,只有一个或多个的Virtual Host权限。每一个Virtual Host就相当于一个小型的RabbitMQ服务器,拥有自己的交换机、队列、绑定等、拥有自己的一套权限机制。VHost之间是相互独立的,它既能将同一个RabbitMQ的众多客户区分开来,又可以避免队列和交换器的命名冲突。

感兴趣的同学可以移步官网:[Virtual Hosts](https://www.rabbitmq.com/vhosts.html#:~:text=RabbitMQ is multi-tenant system%3A connections%2C exchanges%2C queues%2C bindings%2C,server blocks in Nginx%2C the idea is similar.)

代码实操(主要用于使用不同的交换器)

直连交换器(Direct)
using RabbitMQ.Client;
using System.Text;

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

using (var connection = factory.CreateConnection())
// 创建通道
using (var channel = connection.CreateModel())
{
    //使用QueueDeclare方法声明一个队列
    channel.QueueDeclare("hello1", false, false, false, null);
	//如果没有声明交换器,则会绑定一个默认的交换器,类型为Direct
    Console.WriteLine("RabbitMQ 连接成功!请输入消息,输入 exit 退出!");
    string input;
    // 创建一个非持久化的、非自动删除的、绑定类型为 direct 的交换器
    channel.ExchangeDeclare("direct_Exchange", ExchangeType.Direct, false, false, null);
    // 将队列与交换机绑定,路由键设置为Orange
    channel.QueueBind("hello1", "direct_Exchange", "orange");
     channel.QueueBind("hello2", "direct_Exchange", "genng");
    do
    {
        input = Console.ReadLine();
        var sendBytes = Encoding.UTF8.GetBytes(input);
        // 发布消息
        //参数解析:exchange:交换器名称。如果为空字符串(""),则消息会被发布到默认交换器,默认交换器会将消息路由到与消息的路由键(routing key)同名的队列中。
        //routingKey:路由键,用于决定消息被路由到哪个队列。不同的交换器类型对路由键的处理方式不同。直连必须匹配
        //mandatory:布尔类型。如果设置为true,当无法将消息路由到任何队列时,会将消息返回给生产者,并触发一个BasicReturn事件。如果设置为false,无法路由的消息会被丢弃。
        //basicProperties:一个IBasicProperties对象,用于设置消息的属性,如消息的优先级、过期时间、内容类型等。如果不需要设置特殊属性,可以设置为null。
        //body:消息内容,一个字节数组。这是要发送的实际消息数据
        //指向的队列为hello1
        channel.BasicPublish("direct_Exchange", "orange", null, sendBytes);
         //指向的队列为hello2
        channel.BasicPublish("direct_Exchange", "genng", null, sendBytes);
    } while (input.Trim().ToLower()!= "exit");
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

// 创建连接
using (var connection = factory.CreateConnection())
// 创建通道
using (var channel = connection.CreateModel())
{
    // 创建事件基本消费者
    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    // 接收到消息事件处理
    consumer.Received += (ch, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
        Console.WriteLine($"收到消息:{message}");
        // 确认该消息已被消费
        channel.BasicAck(ea.DeliveryTag, false);
    };
    // 启动消费者,设置为手动应答消息,消费队列的消息
    //BasicConsume('队列名称',是否自动应答消息)
    channel.BasicConsume("hello1", false, consumer);
    Console.WriteLine("消费者 1 已启动");
    Console.ReadKey();
}

路由键为:orange

RabbitMQ(兔子队列入门/消息队列)_持久化_06

路由键为:genng

RabbitMQ(兔子队列入门/消息队列)_消息队列_07

扇形交换器(Fanout)
using RabbitMQ.Client;
using System.Text;

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

using (var connection = factory.CreateConnection())
// 创建通道
using (var channel = connection.CreateModel())
{
   //使用QueueDeclare方法声明一个队列
    channel.QueueDeclare("hello1", false, false, false, null);
	//如果没有声明交换器,则会绑定一个默认的交换器,类型为Direct
    Console.WriteLine("RabbitMQ 连接成功!请输入消息,输入 exit 退出!");
    string input;
    // 创建一个非持久化的、非自动删除的、绑定类型为 Fanout 的交换器
    channel.ExchangeDeclare("direct_Exchange", ExchangeType.Fanout, false, false, null);
    //扇形交换器不需要路由键
    channel.QueueBind("hello1", "direct_Exchange", "");
     channel.QueueBind("hello2", "direct_Exchange", "");
    do
    {
        input = Console.ReadLine();
        var sendBytes = Encoding.UTF8.GetBytes(input);
        // 发布消息
         //指向的所以绑定了direct_Exchange的队列,Fanout类型无需路由键
        channel.BasicPublish("direct_Exchange", "", null, sendBytes);
    } while (input.Trim().ToLower()!= "exit");
}

RabbitMQ(兔子队列入门/消息队列)_消息队列_08

主题交换器(Topic)
using RabbitMQ.Client;
using System.Text;

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

using (var connection = factory.CreateConnection())
// 创建通道
using (var channel = connection.CreateModel())
{
   //使用QueueDeclare方法声明一个队列
    channel.QueueDeclare("hello1", false, false, false, null);
	//如果没有声明交换器,则会绑定一个默认的交换器,类型为Direct
    Console.WriteLine("RabbitMQ 连接成功!请输入消息,输入 exit 退出!");
    string input;
    // 创建一个非持久化的、非自动删除的、绑定类型为 Topic 的交换器
    channel.ExchangeDeclare("direct_Exchange", ExchangeType.Topic, false, false, null);
    //扇形交换器不需要路由键
    channel.QueueBind("hello1", "direct_Exchange", "routerkey.*");
     //channel.QueueBind("hello2", "direct_Exchange", "routerkey.#");
    do
    {
        input = Console.ReadLine();
        var sendBytes = Encoding.UTF8.GetBytes(input);
        // 发布消息
         //指向的键值routerkey.*和routerkey.#符合的队列,Fanout类型无需路由键
        channel.BasicPublish("direct_Exchange", "routerkey.one", null, sendBytes);
    } while (input.Trim().ToLower()!= "exit");
}

RabbitMQ(兔子队列入门/消息队列)_消息路由_09

方法解析

QueueDeclare
  • .queue:队列名称
  • durable:持久化(用于重启队列不丢失)
  • exclusive:排他性(只有创建这个队列的连接可以使用)
  • autoDelete:自动删除(当最后一个消费者取消订阅或关闭连接时,队列会被自动删除。)
  • arguments:一些额外的参数
ExchangeDeclare
  • exchange:交换器的名称,一个字符串。用于唯一标识这个交换器。
  • type:交换器的类型,是一个枚举值。常见的交换器类型有Direct(直连)、Fanout(扇出)、Topic(主题)等。不同类型的交换器有不同的路由规则。
  • durable:
  • 如果设置为true,表示交换器是持久化的。持久化的交换器会在服务器重启后仍然存在。
  • 如果设置为false,交换器是非持久化的,在服务器重启后可能会丢失。
  • autoDelete:
  • 如果设置为true,表示交换器是自动删除的。当没有队列或交换器与该交换器绑定时,交换器会被自动删除。
  • 如果设置为false,交换器不会自动删除,需要手动管理其生命周期。
  • arguments:一个字典类型的参数,可以传递一些额外的交换器参数,例如交换器的特定属性等。如果不需要传递额外参数,可以设置为null。
BasicPublish
  • exchange:交换器名称。如果为空字符串(""),则消息会被发布到默认交换器,默认交换器会将消息路由到与消息的路由键(routing key)同名的队列中。
  • routingKey:路由键,用于决定消息被路由到哪个队列。不同的交换器类型对路由键的处理方式不同。直连必须匹配
  • mandatory:布尔类型。如果设置为true,当无法将消息路由到任何队列时,会将消息返回给生产者,并触发一个BasicReturn事件。如果设置为false,无法路由的消息会被丢弃。
  • basicProperties:一个IBasicProperties对象,用于设置消息的属性,如消息的优先级、过期时间、内容类型等。如果不需要设置特殊属性,可以设置为null。
  • body:消息内容,一个字节数组。这是要发送的实际消息数据

标签:交换器,false,入门,队列,RabbitMQ,消息,channel
From: https://blog.51cto.com/u_16927457/12118193

相关文章

  • Leetcode 622. 设计循环队列
    1.题目基本信息1.1.题目描述设计你的循环队列实现。循环队列是一种线性数据结构,其操作表现基于FIFO(先进先出)原则并且队尾被连接在队首之后以形成一个循环。它也被称为“环形缓冲器”。循环队列的一个好处是我们可以利用这个队列之前用过的空间。在一个普通队列里,一旦一个队列......
  • Docker入门
    Dockerfile文件详解Docker的常用命令Centos7基于容器安装运行Docker私有仓库及添加认证Centos7本地安装运行Dockerregistry私有仓库及添加认证Centos下安装Docker集群管理工具ShipyardDocker镜像仓库Harbor1.7.0搭建及配置docke通信之Linux网络命名空间docke网络之bridge......
  • Python从入门到放弃
    python基础知识PYTHON命名规范python字符串常用操作方法python字符串格式化输出python列表的常用操作方法python字典的常用操作方法Pycharm优化Python转义序列python中set和frozenset方法和区别python函数基础以及函数参数简解python的文件操作方法python常用内置函数Python常用模......
  • AI Agent学习攻略:从入门到精通,看我这篇就够了!附资料
    可以先学习AIAgent相关理论,再结合应用和实践去理解。下面我从AIAgent的基本概念、原理、组成、应用、实现方法等方面来详细介绍~一、理论篇AIagent是什么?AIagent人工智能代理是指能够感知环境、做出决策并采取行动以实现特定目标的智能系统。更先进的系统还可以随着......
  • 【Shiro】1.快速入门
    Shiro官网地址:Shiro官网:https://shiro.apache.org/Shiro简介概述ApacheShiro是Java的一个安全框架。Shiro可以帮助我们完成:认证、授权、加密、会话管理、与Web集成、缓存等。使用Shiro易于理解的API,您可以快速轻松地保护任何应用程序—从最小的移动应用程序到最大的web......
  • 黑马PM-基础入门-认识产品经理
    合格的产品经理什么是产品什么是产品经理想清楚产品怎么做的人产品经理的分类产品经理的岗位职责产品经理的能力素质产品经理未来趋势C端-轻量化应用开发,智能终端,视频内容创作,电商消费升级AR购物,虚拟社交,自动驾驶B端-基础侧云数智物赋能,云服务,业务侧低代......
  • 黑马PM-基础入门-问题思考维度
    抓住核心用户为什么要抓住核心用户了解付费点,更好优化产品怎么描述核心用户供需关系人口特征(年龄,地域)熟悉程度(新,老,专家用户)业务场景产品定位+运营数据如何抓取核心用户场景化分析场景5要素用户停留在某个特定的空间的时间里,对应的情境下产生的故事情节就是场......
  • 黑马PM-基础入门-产品解决方案
    需求分析多多体验产品功能优先级划分功能价值用户体验......
  • 黑马PM-基础入门-项目立项
    项目方案可行性分析产品规划立项评审......
  • 黑马PM-基础入门-思维导图入门
    ......