介绍
RabbitMQ是一个开源的消息代理软件,支持多种消息协议。它允许不同的应用程序通过消息队列进行通信,促进了系统之间的解耦和异步处理。
1. 解耦
解耦是指将系统中的不同组件分离,使它们可以独立开发和部署。RabbitMQ通过消息队列实现了解耦,生产者和消费者不需要直接知道彼此的存在。
2. 提速
RabbitMQ可以通过异步处理来提高系统的响应速度。生产者可以将消息发送到队列中,而消费者可以在后台处理这些消息,从而提高整体性能。
3. 削峰
削峰是指在高负载时通过消息队列平衡请求。RabbitMQ可以将突发的请求分散到一段时间内处理,避免系统过载。
4. 分发
RabbitMQ支持多种消息分发模式,包括点对点和发布/订阅。可以根据需求选择合适的模式来实现消息的分发。
RabbitMQ安装
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
打开浏览器,访问:http://localhost:15672,默认用户名和密码为guest
。
示例
1. 环境准备
go get github.com/streadway/amqp
2. 生产者代码
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // 队列名称
true, // 持久化
false, // 排他
false, // 自动删除
false, // 阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
for i := 0; i < 10; i++ {
body := "Task " + strconv.Itoa(i)
err = ch.Publish(
"", // 默认交换机
q.Name, // 队列名称
false, // 强制发送
false, // 立即发送
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatalf("Failed to publish a message: %s", err)
}
log.Printf("Sent %s", body)
}
}
3. 消费者代码
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // 队列名称
true, // 持久化
false, // 排他
false, // 自动删除
false, // 阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者名称
false, // 自动确认
false, // 排他
false, // 阻塞
false, // 优先
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
time.Sleep(2 * time.Second) // 模拟处理时间
log.Printf("Done processing message: %s", d.Body)
d.Ack(false) // 手动确认
}
}()
log.Println("Waiting for messages. To exit press CTRL+C")
select {}
}
运行示例
- 启动RabbitMQ服务。
- 运行消费者程序。
- 运行生产者程序。
简单模式下发布者发布消息,消费者消费消息
环境准备
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
go get github.com/streadway/amqp
示例
1. 生产者代码
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"simple_queue", // 队列名称
false, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
// 发送消息
body := "Hello, RabbitMQ!"
err = ch.Publish(
"", // 默认交换机
q.Name, // 队列名称
false, // 强制发送
false, // 立即发送
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatalf("Failed to publish a message: %s", err)
}
log.Printf("Sent %s", body)
}
2. 消费者代码
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"simple_queue", // 队列名称
false, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
// 消费消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者名称
true, // 自动确认
false, // 排他
false, // 阻塞
false, // 优先
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
// 处理消息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Println("Waiting for messages. To exit press CTRL+C")
select {}
}
运行示例
- 启动RabbitMQ服务。
- 先运行消费者程序,确保它在等待消息。
- 然后运行生产者程序,发送消息。
工作模式下发送消费消息手动确认信息
环境准备
确保你已经安装了RabbitMQ并且可以访问管理界面。如果你还没有安装,可以使用以下Docker命令:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
go get github.com/streadway/amqp
代码示例
1. 生产者代码
package main
import (
"log"
"strconv"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"work_queue", // 队列名称
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
// 发送消息
for i := 0; i < 10; i++ {
body := "Task " + strconv.Itoa(i)
err = ch.Publish(
"", // 默认交换机
q.Name, // 队列名称
false, // 强制发送
false, // 立即发送
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatalf("Failed to publish a message: %s", err)
}
log.Printf("Sent %s", body)
}
}
2. 消费者代码
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"work_queue", // 队列名称
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
// 消费消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者名称
false, // 自动确认
false, // 排他
false, // 阻塞
false, // 优先
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
// 处理消息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
// 模拟处理时间
time.Sleep(2 * time.Second)
log.Printf("Done processing message: %s", d.Body)
// 手动确认消息
d.Ack(false)
}
}()
log.Println("Waiting for messages. To exit press CTRL+C")
select {}
}
运行示例
- 启动RabbitMQ服务。
- 先运行消费者程序,确保它在等待消息。
- 然后运行生产者程序,发送消息。
Publist、Subscribe发布订阅模式下发送消费消息获取运行程序传递的参数args
环境准备
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
go get github.com/streadway/amqp
代码示例
1. 发布者代码
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明交换机
err = ch.ExchangeDeclare(
"logs", // 交换机名称
"fanout", // 交换机类型
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %s", err)
}
// 从命令行获取消息内容
body := "Hello, RabbitMQ!"
if len(os.Args) > 1 {
body = os.Args[1]
}
// 发布消息
err = ch.Publish(
"logs", // 交换机名称
"", // 路由键
false, // 强制发送
false, // 立即发送
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatalf("Failed to publish a message: %s", err)
}
log.Printf("Sent %s", body)
}
2. 消费者代码
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明交换机
err = ch.ExchangeDeclare(
"logs", // 交换机名称
"fanout", // 交换机类型
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %s", err)
}
// 声明队列
q, err := ch.QueueDeclare(
"", // 随机队列名称
false, // 是否持久化
false, // 是否排他
true, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
// 绑定队列到交换机
err = ch.QueueBind(
q.Name, // 队列名称
"", // 路由键
"logs", // 交换机名称
false,
nil)
if err != nil {
log.Fatalf("Failed to bind a queue: %s", err)
}
// 消费消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者名称
true, // 自动确认
false, // 排他
false, // 阻塞
false, // 优先
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
// 处理消息
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Println("Waiting for messages. To exit press CTRL+C")
select {}
}
运行示例
- 启动RabbitMQ服务。
- 先运行消费者程序,确保它在等待消息。
- 然后运行发布者程序,传递要发送的消息。例如:
go run publisher.go "Hello, World!"
路由模式下发送消费消息
环境准备
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
go get github.com/streadway/amqp
代码示例
1. 生产者代码
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明交换机
err = ch.ExchangeDeclare(
"direct_logs", // 交换机名称
"direct", // 交换机类型
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %s", err)
}
// 从命令行获取路由键和消息内容
if len(os.Args) < 3 {
log.Fatalf("Usage: %s <severity> <message>", os.Args[0])
}
severity := os.Args[1]
body := os.Args[2]
// 发布消息
err = ch.Publish(
"direct_logs", // 交换机名称
severity, // 路由键
false, // 强制发送
false, // 立即发送
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatalf("Failed to publish a message: %s", err)
}
log.Printf("Sent [%s] %s", severity, body)
}
2. 消费者代码
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明交换机
err = ch.ExchangeDeclare(
"direct_logs", // 交换机名称
"direct", // 交换机类型
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %s", err)
}
// 声明队列
q, err := ch.QueueDeclare(
"", // 随机队列名称
false, // 是否持久化
false, // 是否排他
true, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
// 绑定队列到交换机,指定路由键
severity := "info" // 你可以根据需要更改这个值
err = ch.QueueBind(
q.Name, // 队列名称
severity, // 路由键
"direct_logs", // 交换机名称
false,
nil)
if err != nil {
log.Fatalf("Failed to bind a queue: %s", err)
}
// 消费消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者名称
true, // 自动确认
false, // 排他
false, // 阻塞
false, // 优先
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
// 处理消息
go func() {
for d := range msgs {
log.Printf("Received [%s] %s", d.RoutingKey, d.Body)
}
}()
log.Println("Waiting for messages. To exit press CTRL+C")
select {}
}
运行示例
- 启动RabbitMQ服务。
- 先运行消费者程序,确保它在等待消息。
go run consumer.go
- 然后运行生产者程序,传递路由键和要发送的消息。
go run producer.go info "This is an info message"
go run producer.go error "This is an error message"
主题订阅模式和RPC模式
1. 主题生产者代码
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明主题交换机
err = ch.ExchangeDeclare(
"topic_logs", // 交换机名称
"topic", // 交换机类型
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %s", err)
}
// 从命令行获取路由键和消息内容
if len(os.Args) < 3 {
log.Fatalf("Usage: %s <routing_key> <message>", os.Args[0])
}
routingKey := os.Args[1]
body := os.Args[2]
// 发布消息
err = ch.Publish(
"topic_logs", // 交换机名称
routingKey, // 路由键
false, // 强制发送
false, // 立即发送
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
log.Fatalf("Failed to publish a message: %s", err)
}
log.Printf("Sent [%s] %s", routingKey, body)
}
2. 主题消费者代码
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明主题交换机
err = ch.ExchangeDeclare(
"topic_logs", // 交换机名称
"topic", // 交换机类型
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %s", err)
}
// 声明队列
q, err := ch.QueueDeclare(
"", // 随机队列名称
false, // 是否持久化
false, // 是否排他
true, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
// 绑定队列到交换机,指定路由键模式
bindingKey := "#.info" // 你可以根据需要更改这个值
err = ch.QueueBind(
q.Name, // 队列名称
bindingKey, // 路由键模式
"topic_logs", // 交换机名称
false,
nil)
if err != nil {
log.Fatalf("Failed to bind a queue: %s", err)
}
// 消费消息
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者名称
true, // 自动确认
false, // 排他
false, // 阻塞
false, // 优先
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
// 处理消息
go func() {
for d := range msgs {
log.Printf("Received [%s] %s", d.RoutingKey, d.Body)
}
}()
log.Println("Waiting for messages. To exit press CTRL+C")
select {}
}
运行主题示例
- 启动RabbitMQ服务。
- 先运行消费者程序,确保它在等待消息。例如,运行以下命令以接收所有“info”级别的消息:
go run consumer.go
- 然后运行生产者程序,传递路由键和要发送的消息。例如:
go run producer.go "quick.info" "This is an info message"
go run producer.go "lazy.error" "This is an error message"
你将看到消费者接收到与其绑定的路由键匹配的消息。
二、RPC模式
1. RPC 服务端代码
package main
import (
"log"
"strconv"
"github.com/streadway/amqp"
)
func fib(n int) int {
if n <= 0 {
return 0
}
if n == 1 {
return 1
}
return fib(n-1) + fib(n-2)
}
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明队列
q, err := ch.QueueDeclare(
"rpc_queue", // 队列名称
false, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
// 处理请求
msgs, err := ch.Consume(
q.Name, // 队列名称
"", // 消费者名称
true, // 自动确认
false, // 排他
false, // 阻塞
false, // 优先
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
log.Println("Awaiting RPC requests")
for d := range msgs {
n, err := strconv.Atoi(string(d.Body))
if err != nil {
log.Printf("Failed to convert body to int: %s", err)
continue
}
log.Printf("Calculating fib(%d)", n)
response := fib(n)
// 发布响应
ch.Publish(
"", // 交换机
d.ReplyTo, // 路由键
false, // 强制发送
false, // 立即发送
amqp.Publishing{
CorrelationId: d.CorrelationId,
Body: []byte(strconv.Itoa(response)),
})
}
}
2. RPC 客户端代码
package main
import (
"log"
"os"
"strconv"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明一个随机的响应队列
replyQueue, err := ch.QueueDeclare(
"", // 随机队列名称
false, // 是否持久化
false, // 是否排他
true, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a reply queue: %s", err)
}
// 消费响应消息
corrId := ""
msgs, err := ch.Consume(
replyQueue.Name, // 队列名称
"", // 消费者名称
true, // 自动确认
false, // 排他
false, // 阻塞
false, // 优先
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
if len(os.Args) < 2 {
log.Fatalf("Usage: %s <n>", os.Args[0])
}
n, err := strconv.Atoi(os.Args[1])
if err != nil {
log.Fatalf("Invalid argument: %s", os.Args[1])
}
// 发送请求
corrId = randomString(32)
err = ch.Publish(
"", // 交换机
"rpc_queue", // 路由键
false, // 强制发送
false, // 立即发送
amqp.Publishing{
CorrelationId: corrId,
ReplyTo: replyQueue.Name,
Body: []byte(strconv.Itoa(n)),
})
if err != nil {
log.Fatalf("Failed to publish a message: %s", err)
}
// 等待响应
for d := range msgs {
if d.CorrelationId == corrId {
log.Printf("Got response: %s", d.Body)
break
}
}
}
// randomString 生成一个随机字符串
func randomString(n int) string {
letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
b := make([]rune, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
运行RPC示例
- 启动RabbitMQ服务。
- 启动RPC服务端:
go run rpc_server.go
- 启动RPC客户端,传递要计算的斐波那契数:
go run rpc_client.go 10
可靠性、数据持久化、消费端限流、消费者确认和消息过期处理
在RabbitMQ中,确保消息的可靠性和数据的持久性是非常重要的。我们可以通过以下几个方面来实现这些目标:
- 消息持久化:确保消息在RabbitMQ重启后仍然存在。
- 消费端限流:控制消费者的消息处理速率。
- 消费者确认:确保消息被成功处理后再从队列中移除。
- 消息过期处理:设置消息的过期时间,超时后自动删除。
1. 消息持久化
生产者代码(持久化消息)
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明持久化队列
q, err := ch.QueueDeclare(
"durable_queue", // 队列名称
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
// 从命令行获取消息内容
if len(os.Args) < 2 {
log.Fatalf("Usage: %s <message>", os.Args[0])
}
body := os.Args[1]
// 发布持久化消息
err = ch.Publish(
"", // 交换机
q.Name, // 路由键
false, // 强制发送
false, // 立即发送
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
DeliveryMode: amqp.Persistent, // 设置消息持久化
})
if err != nil {
log.Fatalf("Failed to publish a message: %s", err)
}
log.Printf("Sent %s", body)
}
2. 消费端限流
可以通过设置prefetch
来控制消费者在确认之前可以处理的消息数量。
消费者代码(限流)
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明持久化队列
_, err = ch.QueueDeclare(
"durable_queue", // 队列名称
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
// 设置限流
err = ch.Qos(
1, // 每次只处理一条消息
0, // 不限制消息大小
false, // 不阻塞
)
if err != nil {
log.Fatalf("Failed to set QoS: %s", err)
}
// 消费消息
msgs, err := ch.Consume(
"durable_queue", // 队列名称
"", // 消费者名称
false, // 自动确认
false, // 排他
false, // 阻塞
false, // 优先
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
// 处理消息
go func() {
for d := range msgs {
log.Printf("Received %s", d.Body)
// 模拟处理时间
// time.Sleep(2 * time.Second)
d.Ack(false) // 手动确认
}
}()
log.Println("Waiting for messages. To exit press CTRL+C")
select {}
}
3. 消费者确认
在消费者处理完消息后,可以通过手动确认来确保消息被成功处理。
在上面的消费者代码中,d.Ack(false)
就是手动确认的实现。
4. 消息过期处理
可以通过设置队列的x-message-ttl
属性来设置消息的过期时间。
修改消费者代码以支持消息过期
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
// 连接到RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
// 创建通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明持久化队列并设置消息过期时间
_, err = ch.QueueDeclare(
"durable_queue", // 队列名称
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
amqp.Table{
"x-message-ttl": 10000, // 消息过期时间(毫秒)
},
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
// 消费消息
msgs, err := ch.Consume(
"durable_queue", // 队列名称
"", // 消费者名称
false, // 自动确认
false, // 排他
false, // 阻塞
false, // 优先
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
// 处理消息
go func() {
for d := range msgs {
log.Printf("Received %s", d.Body)
d.Ack(false) // 手动确认
}
}()
log.Println("Waiting for messages. To exit press CTRL+C")
select {}
}
运行示例
- 启动RabbitMQ服务。
- 启动生产者,发送持久化消息:
go run producer.go "Hello World!"
- 启动消费者,处理消息并设置限流和过期时间:
go run consumer.go
实现高并发秒杀、抢购、预约、订票系统
系统设计
- 商品库存管理:使用数据库或内存来管理商品库存。
- RabbitMQ作为消息队列:将用户请求放入RabbitMQ队列中,由消费者处理实际的库存扣减操作。
- 并发控制:通过消息队列来控制并发,确保在高并发情况下不会超卖。
- 消费者确认:确保每个请求被成功处理后再从队列中移除。
代码实现
1. 商品库存管理
package main
import (
"sync"
)
type Product struct {
ID string
Quantity int
}
var inventory = map[string]*Product{
"product_1": {ID: "product_1", Quantity: 10},
}
var mu sync.Mutex
func reduceStock(productID string) bool {
mu.Lock()
defer mu.Unlock()
product, exists := inventory[productID]
if !exists || product.Quantity <= 0 {
return false
}
product.Quantity--
return true
}
2. 生产者代码
package main
import (
"log"
"os"
"github.com/streadway/amqp"
)
func sendRequest(productID string) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明队列
_, err = ch.QueueDeclare(
"order_queue", // 队列名称
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
// 发布消息
err = ch.Publish(
"", // 交换机
"order_queue", // 队列名称
false, // 强制发送
false, // 立即发送
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(productID),
})
if err != nil {
log.Fatalf("Failed to publish a message: %s", err)
}
log.Printf("Sent request for %s", productID)
}
func main() {
if len(os.Args) < 2 {
log.Fatalf("Usage: %s <product_id>", os.Args[0])
}
productID := os.Args[1]
sendRequest(productID)
}
3. 消费者代码
package main
import (
"log"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
// 声明队列
_, err = ch.QueueDeclare(
"order_queue", // 队列名称
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
msgs, err := ch.Consume(
"order_queue", // 队列名称
"", // 消费者名称
false, // 自动确认
false, // 排他
false, // 阻塞
false, // 优先
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
go func() {
for d := range msgs {
productID := string(d.Body)
log.Printf("Received request for %s", productID)
if reduceStock(productID) {
log.Printf("Successfully purchased %s", productID)
d.Ack(false) // 手动确认
} else {
log.Printf("Failed to purchase %s: out of stock", productID)
d.Nack(false, false) // 拒绝消息,不重新入队
}
}
}()
log.Println("Waiting for messages. To exit press CTRL+C")
select {}
}
运行示例
- 启动RabbitMQ服务。
- 启动消费者:
go run consumer.go
- 启动多个生产者模拟用户请求:
go run producer.go product_1
使用Gin+PostgreSQL解决高并发增加数据问题、以及使用RabbitMQ结合PostgreSQL优化。
环境准备
- 安装Go:确保你已经安装了Go语言。
- 安装Gin:使用以下命令安装Gin框架:
go get -u github.com/gin-gonic/gin
- 安装PostgreSQL:确保你已经安装并运行PostgreSQL。
- 安装RabbitMQ:确保你已经安装并运行RabbitMQ。
- 安装PostgreSQL驱动:
go get -u github.com/lib/pq
- 安装RabbitMQ驱动:
go get -u github.com/streadway/amqp
数据库设置
CREATE DATABASE testdb;
\c testdb
CREATE TABLE records (
id SERIAL PRIMARY KEY,
data TEXT NOT NULL
);
代码实现
1. 数据库连接
package db
import (
"database/sql"
"log"
_ "github.com/lib/pq"
)
var DB *sql.DB
func InitDB() {
var err error
connStr := "user=yourusername dbname=testdb sslmode=disable"
DB, err = sql.Open("postgres", connStr)
if err != nil {
log.Fatal(err)
}
if err = DB.Ping(); err != nil {
log.Fatal(err)
}
}
2. RabbitMQ连接
package rabbitmq
import (
"log"
"github.com/streadway/amqp"
)
var Channel *amqp.Channel
func InitRabbitMQ() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
Channel, err = conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
_, err = Channel.QueueDeclare(
"task_queue", // 队列名称
true, // 是否持久化
false, // 是否排他
false, // 是否自动删除
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
}
3. 处理请求
package main
import (
"bytes"
"encoding/json"
"net/http"
"github.com/gin-gonic/gin"
"yourmodule/db"
"yourmodule/rabbitmq"
)
type Record struct {
Data string `json:"data"`
}
func main() {
db.InitDB()
rabbitmq.InitRabbitMQ()
r := gin.Default()
r.POST("/records", func(c *gin.Context) {
var record Record
if err := c.ShouldBindJSON(&record); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// 将记录发送到RabbitMQ
body, _ := json.Marshal(record)
err := rabbitmq.Channel.Publish(
"", // 交换机
"task_queue", // 路由键
false, // 强制发送
false, // 立即发送
amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: amqp.Persistent, // 设置消息持久化
})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish message"})
return
}
c.JSON(http.StatusAccepted, gin.H{"status": "Request accepted"})
})
r.Run(":8080")
}
4. 消费者代码
package main
import (
"encoding/json"
"log"
"github.com/streadway/amqp"
"yourmodule/db"
)
type Record struct {
Data string `json:"data"`
}
func main() {
db.InitDB()
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
msgs, err := ch.Consume(
"task_queue", // 队列名称
"", // 消费者名称
false, // 自动确认
false, // 排他
false, // 阻塞
false, // 优先
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
log.Println("Waiting for messages. To exit press CTRL+C")
for d := range msgs {
var record Record
if err := json.Unmarshal(d.Body, &record); err != nil {
log.Printf("Failed to unmarshal message: %s", err)
d.Nack(false, false) // 拒绝消息,不重新入队
continue
}
// 将数据插入到PostgreSQL
_, err := db.DB.Exec("INSERT INTO records(data) VALUES($1)", record.Data)
if err != nil {
log.Printf("Failed to insert record: %s", err)
d.Nack(false, false) // 拒绝消息,不重新入队
continue
}
log.Printf("Inserted record: %s", record.Data)
d.Ack(false) // 手动确认
}
}
运行示例
- 启动RabbitMQ服务。
- 启动PostgreSQL服务并创建数据库和表。
- 启动消费者:
go run consumer.go
- 启动Gin服务:
go run main.go
- 使用
curl
或Postman发送请求:
curl -X POST http://localhost:8080/records -H "Content-Type: application/json" -d '{"data": "test data"}'
百万、千万并发的秒杀预约系统,负载均衡、Redis集群限流和RabbitMQ消峰
系统架构
- 负载均衡:使用Nginx或HAProxy等负载均衡器,将请求分发到多个后端服务实例。
- Redis集群限流:使用Redis来实现限流,确保在高并发情况下不会超卖。
- RabbitMQ消峰:使用RabbitMQ将请求异步处理,减轻后端服务的压力。
- 数据库:使用关系型数据库(如PostgreSQL)来持久化数据。
环境准备
- 安装Go:确保你已经安装了Go语言。
- 安装Gin:使用以下命令安装Gin框架:
go get -u github.com/gin-gonic/gin
- 安装Redis:确保你已经安装并运行Redis。
- 安装RabbitMQ:确保你已经安装并运行RabbitMQ。
- 安装PostgreSQL:确保你已经安装并运行PostgreSQL。
- 安装依赖:
go get -u github.com/go-redis/redis/v8 go get -u github.com/streadway/amqp go get -u github.com/lib/pq
数据库设置
CREATE DATABASE testdb;
\c testdb
CREATE TABLE records (
id SERIAL PRIMARY KEY,
data TEXT NOT NULL
);
Redis 限流实现
1. Redis连接
package redisdb
import (
"context"
"github.com/go-redis/redis/v8"
)
var ctx = context.Background()
var Rdb *redis.Client
func InitRedis() {
Rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis地址
})
}
2. 限流函数
func RateLimit(key string, limit int) bool {
// 使用Redis的INCR命令增加请求计数
count, err := Rdb.Incr(ctx, key).Result()
if err != nil {
return false
}
// 设置过期时间
if count == 1 {
Rdb.Expire(ctx, key, 1) // 1秒的过期时间
}
return count <= int64(limit)
}
RabbitMQ 消费者
package main
import (
"context"
"encoding/json"
"log"
"github.com/go-redis/redis/v8"
"github.com/streadway/amqp"
"yourmodule/db"
"yourmodule/redisdb"
)
type Record struct {
Data string `json:"data"`
}
func main() {
db.InitDB()
redisdb.InitRedis()
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
msgs, err := ch.Consume(
"task_queue", // 队列名称
"", // 消费者名称
false, // 自动确认
false, // 排他
false, // 阻塞
false, // 优先
nil, // 额外属性
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
log.Println("Waiting for messages. To exit press CTRL+C")
for d := range msgs {
var record Record
if err := json.Unmarshal(d.Body, &record); err != nil {
log.Printf("Failed to unmarshal message: %s", err)
d.Nack(false, false) // 拒绝消息,不重新入队
continue
}
// 将数据插入到PostgreSQL
_, err := db.DB.Exec("INSERT INTO records(data) VALUES($1)", record.Data)
if err != nil {
log.Printf("Failed to insert record: %s", err)
d.Nack(false, false) // 拒绝消息,不重新入队
continue
}
log.Printf("Inserted record: %s", record.Data)
d.Ack(false) // 手动确认
}
}
Gin HTTP 服务
package main
import (
"encoding/json"
"net/http"
"github.com/gin-gonic/gin"
"yourmodule/db"
"yourmodule/rabbitmq"
"yourmodule/redisdb"
)
type Record struct {
Data string `json:"data"`
}
func main() {
db.InitDB()
redisdb.InitRedis()
rabbitmq.InitRabbitMQ()
r := gin.Default()
r.POST("/records", func(c *gin.Context) {
var record Record
if err := c.ShouldBindJSON(&record); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// 限流
if !redisdb.RateLimit("rate_limit_key", 100) { // 每秒100个请求
c.JSON(http.StatusTooManyRequests, gin.H{"error": "Too many requests"})
return
}
// 将记录发送到RabbitMQ
body, _ := json.Marshal(record)
err := rabbitmq.Channel.Publish(
"", // 交换机
"task_queue", // 队列名称
false, // 强制发送
false, // 立即发送
amqp.Publishing{
ContentType: "application/json",
Body: body,
DeliveryMode: amqp.Persistent, // 设置消息持久化
})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to publish message"})
return
}
c.JSON(http.StatusAccepted, gin.H{"status": "Request accepted"})
})
r.Run(":8080")
}
启动服务
- 启动Redis服务。
- 启动RabbitMQ服务。
- 启动PostgreSQL服务并创建数据库和表。
- 启动消费者:
go run consumer.go
- 启动Gin服务:
go run main.go
- 使用
curl
或Postman发送请求:
curl -X POST http://localhost:8080/records -H "Content-Type: application/json" -d '{"data": "test data"}'
负载均衡
http {
upstream myapp {
server localhost:8080;
server localhost:8081;
server localhost:8082;
}
server {
listen 80;
location / {
proxy_pass http://myapp;
}
}
}
标签:false,err,nil,Failed,RabbitMQ,备忘录,Fatalf,log
From: https://www.cnblogs.com/mugetsukun/p/18406515