介绍
(本笔记不涉及RabbitMQ的环境搭建,主要用于了解和上手使用RabbitMQ)
RabbitMQ是一种消息队列,什么是消息队列?
消息(Message):是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
**队列:**可以说是一个数据结构,可以存储数据,如下图,我们从右侧(队尾)插入元素(入队),从队头获取元素(出队)。
**消息队列(Message Queue)**是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。
消息队列的作用
消息队列主要用于解决以下问题:
- 应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败
- 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间
- 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况
- 例如:将订单创建完成后续的操作,放入到RabbitMQ中在后台执行。
例:当我们在秒杀活动场景时,比如你的服务器一秒能处理100个订单,但秒杀活动1秒进来1000个订单,持续10秒,在后端能力无法增加的情况下,你可以用消息队列将总共10000个请求压在队列里,后台消费服务按原有能力处理,比如100多秒后处理完所有请求,在这种情况下后台服务消费可能会比正常慢一点,但是不至于让系统直接宕机丢失订单数据。
RabbitMQ介绍
RabbitMQ——Rabbit Message Queue的简写,但不能仅仅理解其为消息队列,消息代理更合适。RabbitMQ 是一个由 Erlang 语言开发的AMQP(高级消息队列协议)的开源实现,其内部结构如下:
RabbitMQ作为一个消息代理,主要和消息打交道,负责接收并转发消息。
RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。
下面我们先来看看其内部结构中出现的角色承担者的作用:
Producer 生产者
生产者创建消息,然后发布到RabbitMQ中。消息一般可以包含2个部分:消息体和标签(Label)。消息体也可以称之为payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。
Consumer 消费者
消费者连接到RabbitMQ服务器,并订阅到队列上。当消费者消费一条消息时,只是消费消息的消息体(payload)。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,也就不知道消息的生产者是谁。
Broker:消息中间件的服务节点
对于RabbitMQ来说,一个RabbitMQ Broker可以简单地看作一个RabbitMQ服务节点,或者RabbitMQ服务实例。大多数情况下也可以将一个RabbitMQ Broker看作一台RabbitMQ服务器。
Queue:队列,是RabbitMQ的内部对象,用于存储消息
RabbitMQ中消息都只能存储在队列中,多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,如图
例:
//声明一个队列
channel.QueueDeclare(queueName, false, false, false, null);
Exchange:交换器
RabbitMQ中生产者将消息发送到Exchange(交换器,通常也可以用大写的“X”来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。
RabbitMQ有4种交换器:
- Direct exchange(直连)
- Fanout exchange(扇形)
- Topic exchange(主题)
- Headers exchange(头部)
其中前3者比较常用后面会通过代码分别演示这几个交换器的用法
例:
//创建一个非持久化的、非自动删除的、绑定类型为direct的交换器
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);
Binding:绑定
RabbitMQ中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个Key,告诉RabbitMQ如何正确地将消息路由到队列
//将队列与交换机绑定channel.QueueBind(queueName, exchangeName, routeKey);
Connection:连接
表示到消息代理的真实 TCP 连接
Channel:信道
多路复用连接中的一条独立的双向数据流通道,可读可写。Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作都是在Channel这个接口中完成的,包括定义Queue、Exchange、绑定Queue与Exchange、发布消息等等。在我们进行这些操作之前我们都需要与RabbitMQ建立一个Connection(Connection 表示到消息代理的真实 TCP 连接),但是如果每次访问RabbitMQ都需要建立Connection的话,在消息量大的时候建立TCP Connection的开销无疑也是巨大的,效率也比较低。这时候Channel起了作用。Channel是Connection中的虚拟连接(AMPQ连接),Channel可以复用Connection的TCP连接,当每个信道的流量不是很大时,复用单一的Connection可以在产生性能瓶颈的情况下有效地节省TCP连接资源。但是当信道的流量很大时,这时候多个信道复用一个Connection就会产生性能瓶颈,就需要开辟多个Connection,将这些信道均摊到这些Connection中。
感兴趣的同学可以移步官网:Channels
var cfg=new ConnectionFactiory();//创建一个连接工厂
var conn=cf.newConnection();//创建一个连接
var ch=conn.CreateModel();//创建一个信道
Virtual Host:虚拟主机
当我们配置好本地环境后使用账号密码guest、guest登录时就是在访问默认的VHost("/")了。RabbitMQ中的资源权限是针对每个Virtual Host的,用户没有全局权限,只有一个或多个的Virtual Host权限。每一个Virtual Host就相当于一个小型的RabbitMQ服务器,拥有自己的交换机、队列、绑定等、拥有自己的一套权限机制。VHost之间是相互独立的,它既能将同一个RabbitMQ的众多客户区分开来,又可以避免队列和交换器的命名冲突。
感兴趣的同学可以移步官网:[Virtual Hosts](https://www.rabbitmq.com/vhosts.html#:~:text=RabbitMQ is multi-tenant system%3A connections%2C exchanges%2C queues%2C bindings%2C,server blocks in Nginx%2C the idea is similar.)
代码实操(主要用于使用不同的交换器)
直连交换器(Direct)
using RabbitMQ.Client;
using System.Text;
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",
Password = "guest",
HostName = "localhost"
};
using (var connection = factory.CreateConnection())
// 创建通道
using (var channel = connection.CreateModel())
{
//使用QueueDeclare方法声明一个队列
channel.QueueDeclare("hello1", false, false, false, null);
//如果没有声明交换器,则会绑定一个默认的交换器,类型为Direct
Console.WriteLine("RabbitMQ 连接成功!请输入消息,输入 exit 退出!");
string input;
// 创建一个非持久化的、非自动删除的、绑定类型为 direct 的交换器
channel.ExchangeDeclare("direct_Exchange", ExchangeType.Direct, false, false, null);
// 将队列与交换机绑定,路由键设置为Orange
channel.QueueBind("hello1", "direct_Exchange", "orange");
channel.QueueBind("hello2", "direct_Exchange", "genng");
do
{
input = Console.ReadLine();
var sendBytes = Encoding.UTF8.GetBytes(input);
// 发布消息
//参数解析:exchange:交换器名称。如果为空字符串(""),则消息会被发布到默认交换器,默认交换器会将消息路由到与消息的路由键(routing key)同名的队列中。
//routingKey:路由键,用于决定消息被路由到哪个队列。不同的交换器类型对路由键的处理方式不同。直连必须匹配
//mandatory:布尔类型。如果设置为true,当无法将消息路由到任何队列时,会将消息返回给生产者,并触发一个BasicReturn事件。如果设置为false,无法路由的消息会被丢弃。
//basicProperties:一个IBasicProperties对象,用于设置消息的属性,如消息的优先级、过期时间、内容类型等。如果不需要设置特殊属性,可以设置为null。
//body:消息内容,一个字节数组。这是要发送的实际消息数据
//指向的队列为hello1
channel.BasicPublish("direct_Exchange", "orange", null, sendBytes);
//指向的队列为hello2
channel.BasicPublish("direct_Exchange", "genng", null, sendBytes);
} while (input.Trim().ToLower()!= "exit");
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",
Password = "guest",
HostName = "localhost"
};
// 创建连接
using (var connection = factory.CreateConnection())
// 创建通道
using (var channel = connection.CreateModel())
{
// 创建事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
// 接收到消息事件处理
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"收到消息:{message}");
// 确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, false);
};
// 启动消费者,设置为手动应答消息,消费队列的消息
//BasicConsume('队列名称',是否自动应答消息)
channel.BasicConsume("hello1", false, consumer);
Console.WriteLine("消费者 1 已启动");
Console.ReadKey();
}
路由键为:orange
路由键为:genng
扇形交换器(Fanout)
using RabbitMQ.Client;
using System.Text;
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",
Password = "guest",
HostName = "localhost"
};
using (var connection = factory.CreateConnection())
// 创建通道
using (var channel = connection.CreateModel())
{
//使用QueueDeclare方法声明一个队列
channel.QueueDeclare("hello1", false, false, false, null);
//如果没有声明交换器,则会绑定一个默认的交换器,类型为Direct
Console.WriteLine("RabbitMQ 连接成功!请输入消息,输入 exit 退出!");
string input;
// 创建一个非持久化的、非自动删除的、绑定类型为 Fanout 的交换器
channel.ExchangeDeclare("direct_Exchange", ExchangeType.Fanout, false, false, null);
//扇形交换器不需要路由键
channel.QueueBind("hello1", "direct_Exchange", "");
channel.QueueBind("hello2", "direct_Exchange", "");
do
{
input = Console.ReadLine();
var sendBytes = Encoding.UTF8.GetBytes(input);
// 发布消息
//指向的所以绑定了direct_Exchange的队列,Fanout类型无需路由键
channel.BasicPublish("direct_Exchange", "", null, sendBytes);
} while (input.Trim().ToLower()!= "exit");
}
主题交换器(Topic)
using RabbitMQ.Client;
using System.Text;
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "guest",
Password = "guest",
HostName = "localhost"
};
using (var connection = factory.CreateConnection())
// 创建通道
using (var channel = connection.CreateModel())
{
//使用QueueDeclare方法声明一个队列
channel.QueueDeclare("hello1", false, false, false, null);
//如果没有声明交换器,则会绑定一个默认的交换器,类型为Direct
Console.WriteLine("RabbitMQ 连接成功!请输入消息,输入 exit 退出!");
string input;
// 创建一个非持久化的、非自动删除的、绑定类型为 Topic 的交换器
channel.ExchangeDeclare("direct_Exchange", ExchangeType.Topic, false, false, null);
//扇形交换器不需要路由键
channel.QueueBind("hello1", "direct_Exchange", "routerkey.*");
//channel.QueueBind("hello2", "direct_Exchange", "routerkey.#");
do
{
input = Console.ReadLine();
var sendBytes = Encoding.UTF8.GetBytes(input);
// 发布消息
//指向的键值routerkey.*和routerkey.#符合的队列,Fanout类型无需路由键
channel.BasicPublish("direct_Exchange", "routerkey.one", null, sendBytes);
} while (input.Trim().ToLower()!= "exit");
}
方法解析
QueueDeclare
- .queue:队列名称
- durable:持久化(用于重启队列不丢失)
- exclusive:排他性(只有创建这个队列的连接可以使用)
- autoDelete:自动删除(当最后一个消费者取消订阅或关闭连接时,队列会被自动删除。)
- arguments:一些额外的参数
ExchangeDeclare
- exchange:交换器的名称,一个字符串。用于唯一标识这个交换器。
- type:交换器的类型,是一个枚举值。常见的交换器类型有Direct(直连)、Fanout(扇出)、Topic(主题)等。不同类型的交换器有不同的路由规则。
- durable:
- 如果设置为true,表示交换器是持久化的。持久化的交换器会在服务器重启后仍然存在。
- 如果设置为false,交换器是非持久化的,在服务器重启后可能会丢失。
- autoDelete:
- 如果设置为true,表示交换器是自动删除的。当没有队列或交换器与该交换器绑定时,交换器会被自动删除。
- 如果设置为false,交换器不会自动删除,需要手动管理其生命周期。
- arguments:一个字典类型的参数,可以传递一些额外的交换器参数,例如交换器的特定属性等。如果不需要传递额外参数,可以设置为null。
BasicPublish
- exchange:交换器名称。如果为空字符串(""),则消息会被发布到默认交换器,默认交换器会将消息路由到与消息的路由键(routing key)同名的队列中。
- routingKey:路由键,用于决定消息被路由到哪个队列。不同的交换器类型对路由键的处理方式不同。直连必须匹配
- mandatory:布尔类型。如果设置为true,当无法将消息路由到任何队列时,会将消息返回给生产者,并触发一个BasicReturn事件。如果设置为false,无法路由的消息会被丢弃。
- basicProperties:一个IBasicProperties对象,用于设置消息的属性,如消息的优先级、过期时间、内容类型等。如果不需要设置特殊属性,可以设置为null。
- body:消息内容,一个字节数组。这是要发送的实际消息数据