首页 > 其他分享 >RabbitMQ(兔子队列入门/消息队列)

RabbitMQ(兔子队列入门/消息队列)

时间:2024-09-26 11:56:11浏览次数:15  
标签:交换器 false 入门 队列 RabbitMQ 消息 channel


介绍

(本笔记不涉及RabbitMQ的环境搭建,主要用于了解和上手使用RabbitMQ)

RabbitMQ是一种消息队列,什么是消息队列?

消息(Message):是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

**队列:**可以说是一个数据结构,可以存储数据,如下图,我们从右侧(队尾)插入元素(入队),从队头获取元素(出队)。

RabbitMQ(兔子队列入门/消息队列)_消息路由

**消息队列(Message Queue)**是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。

RabbitMQ(兔子队列入门/消息队列)_持久化_02

消息队列的作用

消息队列主要用于解决以下问题:

  • 应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败
  • 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间
  • 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况
  • 例如:将订单创建完成后续的操作,放入到RabbitMQ中在后台执行。

例:当我们在秒杀活动场景时,比如你的服务器一秒能处理100个订单,但秒杀活动1秒进来1000个订单,持续10秒,在后端能力无法增加的情况下,你可以用消息队列将总共10000个请求压在队列里,后台消费服务按原有能力处理,比如100多秒后处理完所有请求,在这种情况下后台服务消费可能会比正常慢一点,但是不至于让系统直接宕机丢失订单数据。

RabbitMQ介绍

RabbitMQ——Rabbit Message Queue的简写,但不能仅仅理解其为消息队列,消息代理更合适。RabbitMQ 是一个由 Erlang 语言开发的AMQP(高级消息队列协议)的开源实现,其内部结构如下:

RabbitMQ(兔子队列入门/消息队列)_消息路由_03

RabbitMQ作为一个消息代理,主要和消息打交道,负责接收并转发消息。

RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

下面我们先来看看其内部结构中出现的角色承担者的作用:

Producer 生产者

生产者创建消息,然后发布到RabbitMQ中。消息一般可以包含2个部分:消息体标签(Label)。消息体也可以称之为payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。

Consumer 消费者

消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,也就不知道消息的生产者是谁。

Broker:消息中间件的服务节点

对于RabbitMQ来说,一个RabbitMQ Broker可以简单地看作一个RabbitMQ服务节点,或者RabbitMQ服务实例。大多数情况下也可以将一个RabbitMQ Broker看作一台RabbitMQ服务器。

Queue:队列,是RabbitMQ的内部对象,用于存储消息

RabbitMQ中消息都只能存储在队列中,多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,如图

RabbitMQ(兔子队列入门/消息队列)_消息路由_04

例:

//声明一个队列
channel.QueueDeclare(queueName, false, false, false, null);
Exchange:交换器

RabbitMQ中生产者将消息发送到Exchange(交换器,通常也可以用大写的“X”来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。

RabbitMQ有4种交换器:

  • Direct exchange(直连)
  • Fanout exchange(扇形)
  • Topic exchange(主题)
  • Headers exchange(头部)

其中前3者比较常用后面会通过代码分别演示这几个交换器的用法

例:

//创建一个非持久化的、非自动删除的、绑定类型为direct的交换器
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
Binding:绑定

RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个Key,告诉RabbitMQ如何正确地将消息路由到队列

RabbitMQ(兔子队列入门/消息队列)_持久化_05

//将队列与交换机绑定channel.QueueBind(queueName, exchangeName, routeKey);

Connection:连接

表示到消息代理的真实 TCP 连接

Channel:信道

多路复用连接中的一条独立的双向数据流通道,可读可写。Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作都是在Channel这个接口中完成的,包括定义Queue、Exchange、绑定Queue与Exchange、发布消息等等。在我们进行这些操作之前我们都需要与RabbitMQ建立一个Connection(Connection 表示到消息代理的真实 TCP 连接),但是如果每次访问RabbitMQ都需要建立Connection的话,在消息量大的时候建立TCP Connection的开销无疑也是巨大的,效率也比较低。这时候Channel起了作用。Channel是Connection中的虚拟连接(AMPQ连接),Channel可以复用Connection的TCP连接,当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。但是当信道的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,就需要开辟多个Connection,将这些信道均摊到这些Connection中。

感兴趣的同学可以移步官网:Channels

var cfg=new ConnectionFactiory();//创建一个连接工厂
var conn=cf.newConnection();//创建一个连接
var ch=conn.CreateModel();//创建一个信道
Virtual Host:虚拟主机

当我们配置好本地环境后使用账号密码guest、guest登录时就是在访问默认的VHost("/")了。RabbitMQ中的资源权限是针对每个Virtual Host的,用户没有全局权限,只有一个或多个的Virtual Host权限。每一个Virtual Host就相当于一个小型的RabbitMQ服务器,拥有自己的交换机、队列、绑定等、拥有自己的一套权限机制。VHost之间是相互独立的,它既能将同一个RabbitMQ的众多客户区分开来,又可以避免队列和交换器的命名冲突。

感兴趣的同学可以移步官网:[Virtual Hosts](https://www.rabbitmq.com/vhosts.html#:~:text=RabbitMQ is multi-tenant system%3A connections%2C exchanges%2C queues%2C bindings%2C,server blocks in Nginx%2C the idea is similar.)

代码实操(主要用于使用不同的交换器)

直连交换器(Direct)
using RabbitMQ.Client;
using System.Text;

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

using (var connection = factory.CreateConnection())
// 创建通道
using (var channel = connection.CreateModel())
{
    //使用QueueDeclare方法声明一个队列
    channel.QueueDeclare("hello1", false, false, false, null);
	//如果没有声明交换器,则会绑定一个默认的交换器,类型为Direct
    Console.WriteLine("RabbitMQ 连接成功!请输入消息,输入 exit 退出!");
    string input;
    // 创建一个非持久化的、非自动删除的、绑定类型为 direct 的交换器
    channel.ExchangeDeclare("direct_Exchange", ExchangeType.Direct, false, false, null);
    // 将队列与交换机绑定,路由键设置为Orange
    channel.QueueBind("hello1", "direct_Exchange", "orange");
     channel.QueueBind("hello2", "direct_Exchange", "genng");
    do
    {
        input = Console.ReadLine();
        var sendBytes = Encoding.UTF8.GetBytes(input);
        // 发布消息
        //参数解析:exchange:交换器名称。如果为空字符串(""),则消息会被发布到默认交换器,默认交换器会将消息路由到与消息的路由键(routing key)同名的队列中。
        //routingKey:路由键,用于决定消息被路由到哪个队列。不同的交换器类型对路由键的处理方式不同。直连必须匹配
        //mandatory:布尔类型。如果设置为true,当无法将消息路由到任何队列时,会将消息返回给生产者,并触发一个BasicReturn事件。如果设置为false,无法路由的消息会被丢弃。
        //basicProperties:一个IBasicProperties对象,用于设置消息的属性,如消息的优先级、过期时间、内容类型等。如果不需要设置特殊属性,可以设置为null。
        //body:消息内容,一个字节数组。这是要发送的实际消息数据
        //指向的队列为hello1
        channel.BasicPublish("direct_Exchange", "orange", null, sendBytes);
         //指向的队列为hello2
        channel.BasicPublish("direct_Exchange", "genng", null, sendBytes);
    } while (input.Trim().ToLower()!= "exit");
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

// 创建连接
using (var connection = factory.CreateConnection())
// 创建通道
using (var channel = connection.CreateModel())
{
    // 创建事件基本消费者
    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    // 接收到消息事件处理
    consumer.Received += (ch, ea) =>
    {
        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
        Console.WriteLine($"收到消息:{message}");
        // 确认该消息已被消费
        channel.BasicAck(ea.DeliveryTag, false);
    };
    // 启动消费者,设置为手动应答消息,消费队列的消息
    //BasicConsume('队列名称',是否自动应答消息)
    channel.BasicConsume("hello1", false, consumer);
    Console.WriteLine("消费者 1 已启动");
    Console.ReadKey();
}

路由键为:orange

RabbitMQ(兔子队列入门/消息队列)_持久化_06

路由键为:genng

RabbitMQ(兔子队列入门/消息队列)_消息队列_07

扇形交换器(Fanout)
using RabbitMQ.Client;
using System.Text;

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

using (var connection = factory.CreateConnection())
// 创建通道
using (var channel = connection.CreateModel())
{
   //使用QueueDeclare方法声明一个队列
    channel.QueueDeclare("hello1", false, false, false, null);
	//如果没有声明交换器,则会绑定一个默认的交换器,类型为Direct
    Console.WriteLine("RabbitMQ 连接成功!请输入消息,输入 exit 退出!");
    string input;
    // 创建一个非持久化的、非自动删除的、绑定类型为 Fanout 的交换器
    channel.ExchangeDeclare("direct_Exchange", ExchangeType.Fanout, false, false, null);
    //扇形交换器不需要路由键
    channel.QueueBind("hello1", "direct_Exchange", "");
     channel.QueueBind("hello2", "direct_Exchange", "");
    do
    {
        input = Console.ReadLine();
        var sendBytes = Encoding.UTF8.GetBytes(input);
        // 发布消息
         //指向的所以绑定了direct_Exchange的队列,Fanout类型无需路由键
        channel.BasicPublish("direct_Exchange", "", null, sendBytes);
    } while (input.Trim().ToLower()!= "exit");
}

RabbitMQ(兔子队列入门/消息队列)_消息队列_08

主题交换器(Topic)
using RabbitMQ.Client;
using System.Text;

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
    UserName = "guest",
    Password = "guest",
    HostName = "localhost"
};

using (var connection = factory.CreateConnection())
// 创建通道
using (var channel = connection.CreateModel())
{
   //使用QueueDeclare方法声明一个队列
    channel.QueueDeclare("hello1", false, false, false, null);
	//如果没有声明交换器,则会绑定一个默认的交换器,类型为Direct
    Console.WriteLine("RabbitMQ 连接成功!请输入消息,输入 exit 退出!");
    string input;
    // 创建一个非持久化的、非自动删除的、绑定类型为 Topic 的交换器
    channel.ExchangeDeclare("direct_Exchange", ExchangeType.Topic, false, false, null);
    //扇形交换器不需要路由键
    channel.QueueBind("hello1", "direct_Exchange", "routerkey.*");
     //channel.QueueBind("hello2", "direct_Exchange", "routerkey.#");
    do
    {
        input = Console.ReadLine();
        var sendBytes = Encoding.UTF8.GetBytes(input);
        // 发布消息
         //指向的键值routerkey.*和routerkey.#符合的队列,Fanout类型无需路由键
        channel.BasicPublish("direct_Exchange", "routerkey.one", null, sendBytes);
    } while (input.Trim().ToLower()!= "exit");
}

RabbitMQ(兔子队列入门/消息队列)_消息路由_09

方法解析

QueueDeclare
  • .queue:队列名称
  • durable:持久化(用于重启队列不丢失)
  • exclusive:排他性(只有创建这个队列的连接可以使用)
  • autoDelete:自动删除(当最后一个消费者取消订阅或关闭连接时,队列会被自动删除。)
  • arguments:一些额外的参数
ExchangeDeclare
  • exchange:交换器的名称,一个字符串。用于唯一标识这个交换器。
  • type:交换器的类型,是一个枚举值。常见的交换器类型有Direct(直连)、Fanout(扇出)、Topic(主题)等。不同类型的交换器有不同的路由规则。
  • durable:
  • 如果设置为true,表示交换器是持久化的。持久化的交换器会在服务器重启后仍然存在。
  • 如果设置为false,交换器是非持久化的,在服务器重启后可能会丢失。
  • autoDelete:
  • 如果设置为true,表示交换器是自动删除的。当没有队列或交换器与该交换器绑定时,交换器会被自动删除。
  • 如果设置为false,交换器不会自动删除,需要手动管理其生命周期。
  • arguments:一个字典类型的参数,可以传递一些额外的交换器参数,例如交换器的特定属性等。如果不需要传递额外参数,可以设置为null。
BasicPublish
  • exchange:交换器名称。如果为空字符串(""),则消息会被发布到默认交换器,默认交换器会将消息路由到与消息的路由键(routing key)同名的队列中。
  • routingKey:路由键,用于决定消息被路由到哪个队列。不同的交换器类型对路由键的处理方式不同。直连必须匹配
  • mandatory:布尔类型。如果设置为true,当无法将消息路由到任何队列时,会将消息返回给生产者,并触发一个BasicReturn事件。如果设置为false,无法路由的消息会被丢弃。
  • basicProperties:一个IBasicProperties对象,用于设置消息的属性,如消息的优先级、过期时间、内容类型等。如果不需要设置特殊属性,可以设置为null。
  • body:消息内容,一个字节数组。这是要发送的实际消息数据

标签:交换器,false,入门,队列,RabbitMQ,消息,channel
From: https://blog.51cto.com/u_16927457/12118193

相关文章

  • Leetcode 622. 设计循环队列
    1.题目基本信息1.1.题目描述设计你的循环队列实现。循环队列是一种线性数据结构,其操作表现基于FIFO(先进先出)原则并且队尾被连接在队首之后以形成一个循环。它也被称为“环形缓冲器”。循环队列的一个好处是我们可以利用这个队列之前用过的空间。在一个普通队列里,一旦一个队列......
  • Docker入门
    Dockerfile文件详解Docker的常用命令Centos7基于容器安装运行Docker私有仓库及添加认证Centos7本地安装运行Dockerregistry私有仓库及添加认证Centos下安装Docker集群管理工具ShipyardDocker镜像仓库Harbor1.7.0搭建及配置docke通信之Linux网络命名空间docke网络之bridge......
  • AI Agent学习攻略:从入门到精通,看我这篇就够了!附资料
    可以先学习AIAgent相关理论,再结合应用和实践去理解。下面我从AIAgent的基本概念、原理、组成、应用、实现方法等方面来详细介绍~一、理论篇AIagent是什么?AIagent人工智能代理是指能够感知环境、做出决策并采取行动以实现特定目标的智能系统。更先进的系统还可以随着......
  • 黑马PM-基础入门-问题思考维度
    抓住核心用户为什么要抓住核心用户了解付费点,更好优化产品怎么描述核心用户供需关系人口特征(年龄,地域)熟悉程度(新,老,专家用户)业务场景产品定位+运营数据如何抓取核心用户场景化分析场景5要素用户停留在某个特定的空间的时间里,对应的情境下产生的故事情节就是场......
  • 黑马PM-基础入门-思维导图入门
    ......