1.MQ
1.1 概念
MQ(Message Queue,消息队列)是一种用于在分布式系统中实现消息传递和异步通信的技术。它充当了发送方和接收方之间的中间人,用于在应用程序或服务之间传递消息。MQ 允许系统中的不同组件彼此独立运行,而无需直接通信或相互依赖,从而提高系统的可扩展性、可靠性和灵活性。
概念剖析:
- 消息: 消息是应用程序之间传递的数据单元,可以是文本、JSON、XML 或其他格式。
- 队列: 队列是一个用于存储消息的容器,消息按照先进先出(FIFO)的顺序存储。
- 生产者: 生产者是发送消息到消息队列的应用程序或服务。它生成并推送消息到队列中。
- 消费者: 消费者是从消息队列中接收和处理消息的应用程序或服务。它会从队列中取出消息并执行相应的操作。
- 中间件: 消息队列通常由消息中间件实现,如 RabbitMQ、Kafka、ActiveMQ、Redis等。中间件负责管理消息的存储、传递和路由。
1.2 MQ的工作原理
MQ的工作方式通常包含以下步骤:
- 消息生产: 生产者将消息发送到消息队列。
- 消息存储: 消息队列中间件接收并存储消息,直到有消费者来取走它们。
- 消息消费: 消费者从队列中取出消息并处理。
- 消息确认: 消费者处理完消息后,通常会向队列发送确认,通知中间件消息已成功处理,可以将其删除或标记为已处理。
1.3 MQ应用场景
1.3.1 异步处理
场景说明:
用户注册后发送邮件和短信,假设每个步骤耗时100ms
-
做法一:串行 300ms
将注册信息写入数据库后,发送邮箱,在发送短信,以上三个任务全部完成后才返回客户端。
-
做法二:并行 200ms
将注册信息写入数据库后,发送邮箱的同时发送短信,以上三个任务全部完成后才返回客户端。
-
做法三:消息队列 100ms+写入消息队列时间
在将注册信息写入数据库后,立即返回客户端,显示注册成功。邮件和短信的发送可以异步处理,因为它们对网站的正常使用没有直接影响,客户端无需等待它们完成。
1.3.2 应用解耦
场景说明:
用户下单后,订单系统需要通知库存系统
-
做法一:接口直接调用
订单系统直接调用库存系统进行扣减,订单系统与库存系统强耦合,倘若库存系统出现故障或者性能瓶颈,会直接影响订单系统的使用。
-
做法二:消息队列
用户下单后,将消息写入到消息队列,返回用户下单成功,库存订阅消息处理。
1.3.3 流量削峰
场景说明:
秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
1.4 常见的MQ
- ActiveMQ:Apache 软件基金会开发的 ActiveMQ 是一款广泛使用的消息中间件,支持多种协议。虽然其吞吐量有限,单机处理能力在万级左右,足以满足中小型项目的需求,但在大型互联网项目中可能会显得力不从心。
- Kafka: Kafka 主要用于实时数据流处理和大数据场景,擅长处理高吞吐量的消息。最初设计的目标是日志收集和传输。虽然不支持事务且对消息的重复、丢失或错误没有严格要求,但其在高吞吐量的应用场景中表现尤为出色。
- RocketMQ: RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。MQ的所有优点它基本都满足。但是它最大的缺点:商业版收费。因此它有许多功能是不对外提供的。
- RabbitMQ: 使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。在高可用上,它使用的是镜像集群模式,可以保证高可用。在消息可靠性上,它是可以保证数据不丢失的,这也是它的一大优点。同时它也支持一些消息中间件的高级功能,如:消息重试、死信队列等。
1.5 为什么选择RabbitMQ
- ActiveMQ,性能不是很友好
- Kafka,主要强调性能,可靠性差
- RocketMQ,MQ的所有优点它基本都满足。但是它最大的缺点:商业版收费。因此它有许多功能是不对外提供的。
2.RabbitMQ 介绍
2.1 简介
RabbitMQ
是由erlang
语言开发,基于AMQP
(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
特点:
- 基于
AMQP
协议来实现。主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP
协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。 - 可靠性:采用一系列机制来确保消息的可靠性,如持久化、传输确认和发布确认等方法。能够保证百分之百的不丢失。
- 高可用性:队列可以在集群的多台机器上进行镜像设置,即使其中的某些节点出现故障,队列仍然可用。
- 可扩展性: RabbitMQ支持构建集群,多个节点可以组成一个集群。
- 灵活的路由: 消息在进入队列之前会通过交换器进行路由,使得消息能够按照特定的规则进行分发。
2.2 架构模型
模型常用概念详解:
-
Producer:
生产者是消息的发送者。它将消息发送到 RabbitMQ 中,消息被发送到一个 Exchange(交换器)中。生产者并不知道消息将被发送到哪个队列(Queue),只负责将消息发送到指定的交换器。
-
Exchange:
交换器是用来接收生产者发送的消息并根据一定的规则(Binding)将消息路由到一个或多个队列中。交换器有不同的类型,每种类型的路由行为不同:
- Direct Exchange:根据消息的 Routing Key(路由键)精确地将消息路由到指定的队列中。
- Topic Exchange:根据 Routing Key 的模式匹配,将消息路由到符合条件的队列中,支持模糊匹配。
- Fanout Exchange:将消息广播到所有绑定到该交换器的队列中,忽略 Routing Key。
- Headers Exchange:根据消息的 Headers 属性进行路由。
-
Queue:
队列是 RabbitMQ 中存储消息的容器。消息进入队列后,会被消费者(Consumer)从中取出并处理。一个队列可以绑定到多个交换器上,且一个队列可以有多个消费者,但每条消息只能被一个消费者消费。
-
Consumer:
消费者是从队列中接收消息并处理的应用程序或服务。消费者可以主动拉取消息,也可以通过订阅的方式被动接收消息。
-
Message:
消息是生产者发送给 RabbitMQ 的数据载体,通常包括两个部分:
- Payload:消息的实际数据。
- Attributes:消息的元数据,比如 Routing Key、Headers、优先级等。
-
Ack:
消费者处理完消息后,可以发送确认(ACK)给 RabbitMQ,以表示消息已成功处理。未确认的消息会被重新投递。
-
Routing Key:
路由键是生产者发送消息时指定的,用于匹配交换器和队列的绑定规则。不同类型的交换器对路由键的处理方式不同。
3.Centos7 使用Docker 安装RabbitMQ
3.1 拉取镜像
$ docker pull docker.io/rabbitmq
3.2 运行容器
**运行脚本:**restart_service.sh
sedir=`pwd`
datadir=$basedir/data
image=docker.io/rabbitmq
name=rabbitmq
mkdir -p $datadir
docker stop $name
docker rm $name
docker run -d --name=$name --restart always -p 5672:5672 -p 15672:15672 -p 15692:15692 -v $datadir:/var/lib/rabbitmq $image
运行:
$ sh restart_service.sh
3.3 配置RabbitMQ
先进入容器内部:
$ docker exec -it rabbitmq bash
3.3.1 启用网页版后台管理插件
$ rabbitmq-plugins enable rabbitmq_management
此时可以通过浏览器访问管理页面:http://ip地址:15672/
如果是腾讯云之类的话记得开放端口5672和15672
可以通过默认账号 guest/guest 登录
3.4 web管理界面详解
3.4.1 概览
3.4.2 Admin添加用户
Tags选项可以选择不同的角色来创建用户
-
超级管理员(administrator)
可登录管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
-
监控者(monitoring)
可登录管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
-
策略制定者(policymaker)
可登录管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
-
普通管理者management
仅可登录管理控制台,无法看到节点信息,也无法对策略进行管理
-
其他
无法登录管理控制台,通常就是普通的生产者和消费者。
新建用户xz
目前xz这个用户是没有虚拟机可以用的。接下来新建一个。
3.4.3 新建虚拟机
RabbitMQ添加了虚拟主机(Virtual Hosts)的概念,目的就是为了让各个用户可以互不干扰的工作。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,相互独立。
3.4.4 虚拟机绑定用户
3.4.5 切换到新建用户xz
出现警告
解决方案开启 management_agent 插件
进入容器
$ docker exec -it rabbitmq /bin/bash
切换对应目录
$ cd /etc/rabbitmq/conf.d/
修改management_agent.disable_metrics_collector = false
$ echo management_agent.disable_metrics_collector = false > 20-management_agent.disable_metrics_collector.conf
重启容器
$ docker restart rabbitmq
刷新可以看到更多页面数据了
4.RabbitMQ 四种工作模型实战
4.1 Simple模式
单发单收,消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除
生产者端
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:[email protected]:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明一个队列
q, err := ch.QueueDeclare(
"test_queue", // 队列名称
false, // 是否持久化
false, // 是否自动删除
false, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")
// 要发送的消息
body := "Hello RabbitMQ!"
err = ch.Publish(
"", // 交换机名称,空表示默认的交换机
q.Name, // 路由键,这里使用队列名称
false, // 是否等待响应
false, // 是否强制消息路由到队列
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
消费者端
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:[email protected]:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明一个队列,确保消费者也使用相同的队列
q, err := ch.QueueDeclare(
"test_queue", // 队列名称
false, // 是否持久化
false, // 是否自动删除
false, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")
// 注册一个消费者来接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标识符
true, // 自动确认
false, // 是否为排他性消费者
false, // 如果服务器没有消息会阻塞消费者
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to register a consumer")
// 使用 goroutine 来处理消息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
// 阻塞主 goroutine,直到程序终止
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
select {}
}
运行:
可以先启动消费者端,在启动生产者端,每运行一次生产者端,消费者端就会消费一次
4.2 工作队列 Work Queue
让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
生产者端
package main
import (
"github.com/streadway/amqp"
"log"
"os"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://user:pwd@IP地址:5672/虚拟机")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明一个持久化队列
q, err := ch.QueueDeclare(
"work_queue", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")
// 获取要发送的消息内容
body := bodyFrom(os.Args)
// 发布消息到队列
err = ch.Publish(
"", // 默认交换机
q.Name, // 路由键为队列名
false, // 是否等待响应
false, // 是否强制消息路由到队列
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 消息持久化
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
// 从命令行参数获取消息内容
func bodyFrom(args []string) string {
var s string
if len(args) < 2 || os.Args[1] == "" {
s = "Hello RabbitMQ!"
} else {
s = os.Args[1]
}
return s
}
消费者端
package main
import (
"github.com/streadway/amqp"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://user:pwd@IP地址:5672/虚拟机")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明一个持久化队列
q, err := ch.QueueDeclare(
"work_queue", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")
// 设置每次只分发一个任务给消费者
err = ch.Qos(
1, // 每次预取一个任务
0, // 不限制消息总数
false, // 应用在当前通道
)
failOnError(err, "Failed to set QoS")
// 注册消费者,接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"work queue consumer", // 消费者标识符
false, // 手动确认消息
false, // 是否为排他性消费者
false, // 如果服务器没有消息会阻塞消费者
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to register a consumer")
// 使用 goroutine 处理消息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
doWork(d.Body)
log.Printf("Done")
d.Ack(false) // 手动确认消息处理完毕
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
select {}
}
// 模拟任务处理,根据消息中的点号数量来延迟处理时间
func doWork(body []byte) {
for _, char := range body {
if char == '.' {
time.Sleep(1 * time.Second)
}
}
}
运行,可以同时启动多个消费者端,不断运行生产者端
4.3 发布订阅Pub/Sub模式
在这种模型中,多了一个 Exchange 角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X (交换机)。
C:消费者,消息的接收者,会一直等待消息到来。
Queue:消息队列,接收消息、缓存消息。
Exchange:交换机(X) ,一方面,接收生产者发送的消息。另一方面,如何处理消息,递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列。
- Direct:全值匹配,把消息交给符合指定
routing key
的队列。 - Topic:通配符,与Direct类型类似,但Direct类型要求
routing key
完全相等,而Topic类型是对routing key
进行模糊匹配,比Direct灵活。 - Headers:根据Message的一些头部信息来分发过滤Message,用的比较少。
4.3.1Fanout
生产者端
package main
import (
"github.com/streadway/amqp"
"log"
"os"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
exchangeName := "fanout exchange"
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:[email protected]:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明一个交换机,类型为fanout
err = ch.ExchangeDeclare(
exchangeName, // 交换机名称
"fanout", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否为内置交换机
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare an exchange")
// 获取要发送的消息内容
body := bodyFrom(os.Args)
// 发布消息到交换机
err = ch.Publish(
exchangeName, // 交换机名称
"", // 路由键为空,fanout模式忽略
false, // 是否等待响应
false, // 是否强制消息路由到队列
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 消息持久化
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
// 从命令行参数获取消息内容
func bodyFrom(args []string) string {
var s string
if len(args) < 2 || os.Args[1] == "" {
s = "info: Hello RabbitMQ!"
} else {
s = os.Args[1]
}
return s
}
消费者端
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
exchangeName := "fanout exchange"
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:[email protected]:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明交换机,类型为fanout,确保消费者能找到交换机
err = ch.ExchangeDeclare(
exchangeName, // 交换机名称
"fanout", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否为内置交换机
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare an exchange")
// 声明一个随机名称的临时队列,用于接收交换机的消息
q, err := ch.QueueDeclare(
"", // 队列名称留空,RabbitMQ 会生成一个随机名称
false, // 是否持久化
false, // 是否自动删除
true, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")
// 绑定队列到交换机
err = ch.QueueBind(
q.Name, // 队列名称
"", // 路由键为空,fanout模式忽略
exchangeName, // 交换机名称
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to bind a queue")
// 注册消费者,接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标识符
true, // 自动确认消息
false, // 是否为排他性消费者
false, // 如果服务器没有消息会阻塞消费者
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to register a consumer")
// 使用 goroutine 处理消息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
select {}
}
4.3.2 Direct
生产者端
package main
import (
"github.com/streadway/amqp"
"log"
"os"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
exchangeName := "direct exchange"
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:[email protected]:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明一个交换机,类型为direct
err = ch.ExchangeDeclare(
exchangeName, // 交换机名称
"direct", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否为内置交换机
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare an exchange")
// 获取要发送的消息内容和路由键
body := bodyFrom(os.Args)
severity := severityFrom(os.Args)
// 发布消息到交换机
err = ch.Publish(
exchangeName, // 交换机名称
severity, // 路由键,用于消息路由
false, // 是否等待响应
false, // 是否强制消息路由到队列
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 消息持久化
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s: %s", severity, body)
}
// 从命令行参数获取消息内容
func bodyFrom(args []string) string {
if len(args) < 3 || os.Args[2] == "" {
return "Hello RabbitMQ!"
}
return os.Args[2]
}
// 从命令行参数获取消息的路由键
func severityFrom(args []string) string {
if len(args) < 2 || os.Args[1] == "" {
return "info"
}
return os.Args[1]
}
消费者端
package main
import (
"github.com/streadway/amqp"
"log"
"os"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
exchangeName := "direct exchange"
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:[email protected]:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明交换机,类型为direct,确保消费者能找到交换机
err = ch.ExchangeDeclare(
exchangeName, // 交换机名称
"direct", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否为内置交换机
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare an exchange")
// 声明一个随机名称的临时队列,用于接收交换机的消息
q, err := ch.QueueDeclare(
"", // 队列名称留空,RabbitMQ 会生成一个随机名称
false, // 是否持久化
false, // 是否自动删除
true, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")
// 从命令行参数获取要绑定的路由键(日志级别)
severity := severityFrom(os.Args)
// 绑定队列到交换机,使用指定的路由键
err = ch.QueueBind(
q.Name, // 队列名称
severity, // 路由键(日志级别)
exchangeName, // 交换机名称
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to bind a queue")
// 注册消费者,接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标识符
true, // 自动确认消息
false, // 是否为排他性消费者
false, // 如果服务器没有消息会阻塞消费者
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to register a consumer")
// 使用 goroutine 处理消息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
select {}
}
// 从命令行参数获取路由键
func severityFrom(args []string) string {
if len(args) < 2 || os.Args[1] == "" {
return "info"
}
return os.Args[1]
}
4.3.4 Topic
生产者端
package main
import (
"github.com/streadway/amqp"
"log"
"os"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
exchangeName := "topic exchange"
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:[email protected]:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明一个交换机,类型为topic
err = ch.ExchangeDeclare(
exchangeName, // 交换机名称
"topic", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否为内置交换机
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare an exchange")
// 获取要发送的消息内容和路由键
body := bodyFrom(os.Args)
routingKey := routingKeyFrom(os.Args)
// 发布消息到交换机
err = ch.Publish(
exchangeName, // 交换机名称
routingKey, // 路由键,用于消息路由
false, // 是否等待响应
false, // 是否强制消息路由到队列
amqp.Publishing{
DeliveryMode: amqp.Persistent, // 消息持久化
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s: %s", routingKey, body)
}
// 从命令行参数获取消息内容
func bodyFrom(args []string) string {
if len(args) < 3 || os.Args[2] == "" {
return "Hello RabbitMQ!"
}
return os.Args[2]
}
// 从命令行参数获取消息的路由键
func routingKeyFrom(args []string) string {
if len(args) < 2 || os.Args[1] == "" {
return "anonymous.info"
}
return os.Args[1]
}
消费者端
package main
import (
"github.com/streadway/amqp"
"log"
"os"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
exchangeName := "topic exchange"
// 连接到 RabbitMQ 服务器
conn, err := amqp.Dial("amqp://xz:[email protected]:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 创建一个通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 声明交换机,类型为topic,确保消费者能找到交换机
err = ch.ExchangeDeclare(
exchangeName, // 交换机名称
"topic", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否为内置交换机
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare an exchange")
// 声明一个随机名称的临时队列,用于接收交换机的消息
q, err := ch.QueueDeclare(
"", // 队列名称留空,RabbitMQ 会生成一个随机名称
false, // 是否持久化
false, // 是否自动删除
true, // 是否为排他性队列
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to declare a queue")
// 从命令行参数获取要绑定的路由键(日志类别)
bindingKey := bindingKeyFrom(os.Args)
// 绑定队列到交换机,使用指定的路由键
err = ch.QueueBind(
q.Name, // 队列名称
bindingKey, // 路由键(日志类别)
exchangeName, // 交换机名称
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to bind a queue")
// 注册消费者,接收消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者标识符
true, // 自动确认消息
false, // 是否为排他性消费者
false, // 如果服务器没有消息会阻塞消费者
false, // 是否等待服务器返回响应
nil, // 额外参数
)
failOnError(err, "Failed to register a consumer")
// 使用 goroutine 处理消息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
select {}
}
// 从命令行参数获取路由键
func bindingKeyFrom(args []string) string {
if len(args) < 2 || os.Args[1] == "" {
return "#"
}
return os.Args[1]
}
运行
启动多个消费者端,观察控制台的打印
go run consumer.go "kern.*" # 只接收与内核相关的日志
go run consumer.go "*.critical" # 只接收严重级别的日志
go run consumer.go "#" # 接收所有日志
发送消息
go run producer.go "kern.critical" "Kernel panic - critical error"
go run producer.go "app.info" "App started successfully"
go run producer.go "app.error" "App failed to start"
5.保证消息可靠性机制详解
需要保证消息不能丢失,消息从生产者生产出来一直到消费者消费成功,这条链路是这样的:
将从以下几个方面来保证消息的可靠性。
5.1 消息持久化
- 队列持久化:在声明队列时,将
durable
参数设置为true
,使得队列在 RabbitMQ 重启后仍然存在。 - 消息持久化:在发送消息时,将
delivery_mode
设置为2
,这样消息会被写入磁盘,即使 RabbitMQ 崩溃或重启,消息也不会丢失。
5.2 确认机制
消息的可靠投递分为了两大内容:发送端的确认和消费端的确认。
5.2.1 发送端
发送端的消息可靠性投递:confirm 确认模式和return 退回模式。
**confirm 确认模式:**消息从 producer 到达 exchange 则会给 producer 发送一个应答,我们需要开启confirm模式,才能接收到这条应答。开启方式是将Channel.Confirm(noWait bool)
参数设置为false
,表示同意发送者将当前channel信道设置为confirm模式。
**return 退回模式:**消息从 exchange–>queue 投递失败,会将消息退回给producer。
5.2.2 消费端
消息从Queue发送到消费端之后,消费端会发送一个确认消息:Consumer Ack
,有两种确认方式:自动确认和手动确认。
**自动确认:**当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。
**手动确认:**在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用ch.Ack(false)
,手动确认,如果出现异常,则调用d.Reject(true)
让其自动重新发送消息。
5.3 事务机制
RabbitMQ 支持 AMQP 事务机制,允许生产者将消息发送操作包裹在一个事务中。如果事务提交成功,则消息被投递;如果事务回滚,则消息不会被投递。但是这种方式比较消耗性能,实际场景中使用比较少。
5.4 消息重发机制
消息重发:如果生产者未收到确认,可以重发消息。
死信队列(Dead-Letter Exchange, DLX):消息在多次重发失败后,可以被投递到一个死信交换器(DLX),由专门的消费者进行处理。
5.5 优先级队列
当有多个消息等待被投递时,优先级队列可以确保高优先级的消息先被处理,从而提高重要消息的可靠性。
5.6 集群模式/镜像队列
通过 RabbitMQ 的镜像队列(集群模式),消息可以在多个节点间复制,确保在节点故障时,消息仍然可用。
6.RabbitMQ 高级特性
6.1 延时队列&死信队列
通过设置消息的过期时间到TTL这种方式来讲解
当消息没有配置消费者,消息就一直停留在队列中,停留时间超过存活时间后,消息就会被自动移动到死信交换机
6.1.1 rabbitmq_delayed_message_exchange 插件安装
插件下载地址:rabbitmq-delayed-message-exchange
将下载好的插件上传到linux系统中,拷贝到容器内部
$ docker cp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq:/plugins
执行
$ rabbitmq-plugins enable rabbitmq_delayed_message_exchange
刷新界面 在add exchange就能看到x-delayed-message
这个选项
6.1.2 插件使用
生产者的实现很简单,只需要在消息的header
中加入"x-delay"
字段并用其值表示消息的 TTL,最后将其发送到延迟队列即可。
生产者
package main
import (
"github.com/streadway/amqp"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://xz:[email protected]:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
var (
exchange = "x-delayed-message"
queue = "delay_queue"
routingKey = "log_delay"
body string
)
// 申请交换机
err = ch.ExchangeDeclare(exchange, exchange, true, false, false, false, amqp.Table{
"x-delayed-type": "direct",
})
failOnError(err, "交换机申请失败!")
err = ch.QueueBind(queue, routingKey, exchange, false, nil)
failOnError(err, "绑定交换机失败!")
body = "==========10000=================" + time.Now().Local().Format("2006-01-02 15:04:05")
// 将消息发送到延时队列上
err = ch.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Headers: map[string]interface{}{
"x-delay": "10000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
},
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
body = "==========20000=================" + time.Now().Local().Format("2006-01-02 15:04:05")
// 将消息发送到延时队列上
err = ch.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Headers: map[string]interface{}{
"x-delay": "20000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
},
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
body = "==========5000=================" + time.Now().Local().Format("2006-01-02 15:04:05")
// 将消息发送到延时队列上
err = ch.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Headers: map[string]interface{}{
"x-delay": "5000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
},
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
消费者
package main
import (
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 建立链接
conn, err := amqp.Dial("amqp://xz:[email protected]:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
var (
exchange = "x-delayed-message"
queue = "delay_queue"
routingKey = "log_delay"
)
// 申请交换机
err = ch.ExchangeDeclare(
exchange,
exchange,
true,
false,
false,
false,
amqp.Table{
"x-delayed-type": "direct",
})
failOnError(err, "交换机申请失败!")
// 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列
q, err := ch.QueueDeclare(
queue,
true,
true,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name,
routingKey,
exchange,
false,
nil)
failOnError(err, "Failed to bind a queue")
// 这里监听的是 test_logs
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("接收数据 [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
运行
生产者
2024/08/22 09:06:39 [x] Sent ==========10000=================2024-08-22 09:06:39
2024/08/22 09:06:39 [x] Sent ==========20000=================2024-08-22 09:06:39
2024/08/22 09:06:39 [x] Sent ==========5000=================2024-08-22 09:06:39
消费者
2024/08/22 09:06:44 接收数据 [x] ==========5000=================2024-08-22 09:06:39
2024/08/22 09:06:49 接收数据 [x] ==========10000=================2024-08-22 09:06:39
2024/08/22 09:06:59 接收数据 [x] ==========20000=================2024-08-22 09:06:39
6.2 优先级队列
在实现 RabbitMQ 优先级队列时,你需要为队列设置
x-max-priority
参数,指定一个优先级范围,通常建议在 0 到 10 之间,这个值表示队列中消息的最高优先级。当生产者发送消息时,需要设置
priority
属性,建议不要超过你设置的最大优先级值。如果超过了这个范围,设置的优先级将不再生效。在指定范围内,数字越大,优先级越高。优先级队列的处理场景主要适用于生产者的消息产生速度快于消费者的处理速度。当队列中有消息堆积时,优先级策略才会发挥作用,通过优先调度高优先级的消息,提高处理效率。如果消费者消费速度更快或等于生产速度,则优先级队列的作用不明显。
生产者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
"strconv"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://xz:[email protected]:5672/xz")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
var args amqp.Table
args = amqp.Table{"x-max-priority": int32(10)}
q, err := ch.QueueDeclare(
"priqueue",
true,
false,
false,
false,
args,
)
failOnError(err, "Failed to declare q queue")
// 生产者一次性创建6个消息,其中奇数优先级为2,偶数优先级为1,并阻塞到RabbitMQ上面,先不启动消费者 或则一下子就消费了 体现不出来优先级
for i := 0; i < 6; i++ {
body := "hello rabbitmq"
body += strconv.Itoa(i)
pri := i%2 + 1
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Priority: uint8(pri),
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
return
}
消费者
package main
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://xz:[email protected]:5672/xz")
failOnError(err, "Failed to connect to server")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to connect to channel")
defer ch.Close()
var args amqp.Table
args = amqp.Table{"x-max-priority": int32(10)}
q, err := ch.QueueDeclare(
"priqueue",
true,
false,
false,
false,
args,
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1,
0,
false,
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("接收数据 [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
运行 先运行生产者阻塞住 在运行消费者
优先级队列会优先处理优先级为 2 的消息,之后再处理优先级为 1 的消息。对于具有相同优先级的消息,队列则按照先进先出(FIFO)的顺序进行消费。这种机制确保了高优先级的消息能被优先处理,而同优先级的消息则保持原有的发送顺序。
2024/08/22 10:05:44 接收数据 [x] hello rabbitmq1
2024/08/22 10:05:44 接收数据 [x] hello rabbitmq3
2024/08/22 10:05:44 接收数据 [x] hello rabbitmq5
2024/08/22 10:05:44 接收数据 [x] hello rabbitmq0
2024/08/22 10:05:44 接收数据 [x] hello rabbitmq2
2024/08/22 10:05:44 接收数据 [x] hello rabbitmq4
6.3 监控和告警
使用 Prometheus 方式
6.3.1 RabbitMq 启动 prometheus监控插件
rabbitmq-plugins enable rabbitmq_prometheus
启动之后可以看到开放了15692端口
验证
http://yourIP:15692/metrics
小坑
我这边遇到访问不通的原因有两个:
1.用的腾讯云 必须在防火墙开发15692端口
2.在用docker启动的时候必须做端口映射 -p 15692:15692
6.3.2 prometheus 安装
6.3.2.1 下载安装
上传到linux系统中,解压
tar xvfz prometheus-2.54.0.linux-amd64.tar.gz
# 改名 名字太长了(可选)
mv prometheus-2.54.0.linux-amd64 prometheus
6.3.2.2 修改配置
# 进入目录
cd prometheus/
# 修改配置
vim prometheus.yml
修改配置
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
# 监控的表示名
- job_name: "rabbitmq"
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
static_configs:
- targets: ["localhost:15692"] # rabbitmq部署路径
6.3.2.3 启动
./prometheus
也可以设置成一个服务 可以直接后台启动
# 创建数据目录
$ mkdir -p /data/prometheus/prometheus/data
# 创建用户并授权
$ useradd prometheus
$ chown -R prometheus:prometheus /usr/local/prometheus /data/prometheus
# 添加服务
$ vim /usr/lib/systemd/system/prometheus.service
服务文件内容
[Unit]
Description= Prometheus
After=network.target
[Service]
Type=simple
User=prometheus
ExecStart=/usr/local/prometheus/prometheus/prometheus --config.file=/usr/local/prometheus/prometheus/prometheus.yml --storage.tsdb.path=/data/prometheus/prometheus/data
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure
[Install]
WantedBy=multi-user.target
启动
# 启动
$ systemctl start prometheus.service
# 查看状态
$ systemctl status prometheus.service
验证
访问http://ip:9090/targets?search=
是否有rabbitmq
6.3.3 Grafana 安装
安装
$ yum install -y https://dl.grafana.com/enterprise/release/grafana-enterprise-11.1.4-1.x86_64.rpm
启动服务
$ systemctl start grafana-server
$ systemctl status grafana-server
访问
http://ip:3000
账号密码admin/admin 设置新密码
添加 data source
修改 prometheus 地址即可 然后保存
下载grafana 模版
地址 选择一个自己喜欢的样式下载就行
导入模版
Dashboards-New-import
将刚才下载的json上传
展示
6.3.4 配置邮箱告警通知
邮箱开启stmp 获取授权码
修改grafana配置
vim /etc/grafana/grafana.ini
修改配置,保存重启
[smtp]
enabled = true
host = smtp.163.com:465
# 你的邮箱地址
user [email protected]
# 获得的授权码
password =xxxxx
# 你的邮箱地址
from_address = [email protected]
from_name = Grafana
新增联络点
点击测试回发送一份测试邮箱到你填写的邮件
新增告警配置
看具体需要监测cpu、内存还是什么指标添加告警规则即可。
这份《RabbitMQ从入门到实战》学习笔记到这里就告一段落了。作为一名Go后端工程师,我在项目中经常使用RabbitMQ,这些笔记是我在自学过程中整理出来的。由于我并非一名运维工程师,因此最后章节关于告警的内容可能讲得不够详尽,如果你对此感兴趣,可以查阅相关的深度文章。另外,如果笔记中有任何错误或不足之处,欢迎指正,非常感谢大家的理解与包容。
标签:实战,ch,false,err,队列,RabbitMQ,golang,failOnError,消息 From: https://blog.csdn.net/luozong2689/article/details/141439507