erlang 安装
编译依赖:yum install make gcc gcc-c++ build-essential openssl openssl-devel unixODBC unixODBC-devel kernel-devel m4 ncurses-devel
解压:tar -zxvf
创建存放环境目录:mkdir /opt/rabbitMq/erlang
进入 erlang 解压目录执行命令:./configure --prefix=/opt/rabbitmq/erlang --without-javac
安装:make && make install
配置环境变量:vim /etc/profile
#set erlang environment
export PATH=$PATH:/路径和上面黄色部分对应/erlang/bin
刷新配置文件:source profile
测试安装是否成功:erl
RabbitMQ 安装
安装:rpm -ivh --nodeps rabbitmq-server-xxx.noarch.rpm
启动:rabbitmq-server start &
// 如果没有权限,使用 find / -name .erlang.cookie,找到后授权
find / -name .erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookie
停止:rabbitmqctl stop
插件添加:rabbitmq-plugins enable {插件名}
插件卸载:rabbitmq-plugins disable {插件名}
查看插件:rabbitmq-plugins list
rabbitmq_management 为 web 管理端插件
本地的话可以使用 username:guest password:guest 进行登录,但是远程就不行了,需要新建用户(必须是 rabbitmq 启动状态下才能新建)
# 第一步:添加 admin 用户并设置密码
rabbitmqctl add_user admin 1234
# 第二步:添加 admin 用户为 administrator 角色
rabbitmqctl set_user_tags admin administrator
# 第三步:设置 admin 用户的权限,指定允许访问的 vhost 以及 write/read
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# 第四步:查看 vhost (/) 允许哪些用户访问
rabbitmqctl list_permissions -p /
# 第五步:查看用户列表
rabbitmqctl list_users
# 第六步:重启 RabbitMQ,使用设置好的账户和密码登录。
go -> RabbitMQ
package rabbitmq
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
// 连接信息标准格式: amqp://用户名:密码@ RabbitMQ 服务器名称:RabbitMQ 所在服务器的端口/virtual hosts
const MQURL = "amqp://admin:admin@[email protected]:5672/mq1"
type RabbitMQ struct {
conn *amqp.Connection
channel *amqp.Channel
QueueName string // 队列名称
Exchange string // 交换机
Key string // key
MQURL string // 连接信息
}
// NewRabbitMQ - 创建 RabbitMQ 结构体实例
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
rabbitmq := &RabbitMQ{
QueueName: queueName,
Exchange: exchange,
Key: key,
MQURL: MQURL,
}
var err error
// 创建 RabbitMQ 连接
rabbitmq.conn, err = amqp.Dial(rabbitmq.MQURL)
rabbitmq.failOnErr(err, "创建连接错误")
rabbitmq.channel, err = rabbitmq.conn.Channel()
rabbitmq.failOnErr(err, "获取 channel 失败")
return rabbitmq
}
// Destory - 断开 channel 和 connection
func (r *RabbitMQ) Destory() {
r.channel.Close()
r.conn.Close()
}
// failOnErr - 错误处理函数
func (r *RabbitMQ) failOnErr(err error, message string) {
if err != nil {
log.Fatalf("%s:%s", message, err)
panic(fmt.Sprintf("%s:%s", message, err))
}
}
Simple
最简单常用的工作模式。
![[Simple.png]]
一个生产者对应一个消费者
获取实例
// NewRabbitMQSimple - 创建 Simple 模式下的 RabbitMQ 实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
return NewRabbitMQ(queueName, "", "")
}
生产者
// PublishSimple - Smiple 模式下的生产者
func (r *RabbitMQ) PublishSimple(message string) {
// 1. 申请队列,找不到就创建,存在就跳过创建
// 保证队列存在,消息能放到队列中
_, err := r.channel.QueueDeclare(
r.QueueName,
false, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
fmt.Println(err)
return
}
// 2. 发送消息到队列中
r.channel.Publish(
r.Exchange, // 交换机
r.QueueName, // 队列名称
false, // 如果为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
false, // 如果为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
}
消费者
// ConsumeSimple - Smiple 模式下的消费者
func (r *RabbitMQ) ConsumeSimple() {
// 1. 申请队列,找不到就创建,存在就跳过创建
// 保证队列存在,消息能放到队列中
_, err := r.channel.QueueDeclare(
r.QueueName,
false, // 是否持久化
false, // 是否自动删除
false, // 是否具有排他性
false, // 是否阻塞
nil, // 额外属性
)
if err != nil {
fmt.Println(err)
return
}
// 2. 接收消息
msgs, err := r.channel.Consume(
r.QueueName, // 队列名称
"", // 用来区分多个消费者
true, // 是否自动应答
false, // 是否具有排他性
false, // 如果为 true,表示不能将同一个 connections 中发送的消息传递给这个 connections 中的消费者
false, // 队列消费是否阻塞
nil, // 额外属性
)
if err != nil {
fmt.Println(err)
return
}
forever := make(chan bool)
// 启用协程处理消息
go func() {
for d := range msgs {
// 实现我们要处理的逻辑判断
fmt.Printf("Received a message: %s\n", d.Body)
}
}()
log.Printf("[*] Waiting for messages, To exit press CTRL + C")
<-forever
}
示例
mainSimplePublish
package main
import (
"fmt"
"rabbitmq-base/rabbitmq"
)
func main() {
r := rabbitmq.NewRabbitMQSimple("simple")
r.PublishSimple("aaaa")
fmt.Println("发送成功")
}
mainSimpleRecieve
package main
import "rabbitmq-base/rabbitmq"
func main() {
r := rabbitmq.NewRabbitMQSimple("simple")
r.ConsumeSimple()
}
Work
工作模式
一个消息只能被一个消费者获取,也就是消息只能被消费一次。
一个生产者对应多个消费者。
![[Work.png]]
与 Simple 模式最大的区别就是多个消费者
Publish/Subscribe
订阅模式
消息被路由投递给多个队列,一个消息被多个消费者获取。
![[Publish&Subscribe.png]]
获取实例
// NewRabbitMQPubSub - 创建订阅模式下的 RabbitMQ 实例
func NewRabbitMQPubSub(exchange string) *RabbitMQ {
return NewRabbitMQ("", exchange, "")
}
生产者
// PublishPub - 订阅模式下的生产者
func (r *RabbitMQ) PublishPub(message string) {
// 1. 尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名称
"fanout", // 交换机类型,fanout 是广播类型
true, // 是否持久化
false, // 自动删除
false, // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
false, // 是否阻塞
nil, // 额外属性
)
r.failOnErr(err, "Failed to declare an exchange")
// 2. 发送消息
err = r.channel.Publish(
r.Exchange,
"",
false, // 如果为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
false, // 如果为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
}
消费者
// RecieveSub - 订阅模式下的消费者
func (r *RabbitMQ) RecieveSub() {
// 1. 试探性创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名称
"fanout", // 交换机类型,fanout 是广播类型
true, // 是否持久化
false, // 自动删除
false, // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
false, // 是否阻塞
nil, // 额外属性
)
r.failOnErr(err, "Failed to declare an exchange")
// 2. 试探性创建队列,这里注意队列名称不要写,使用随机名称
q, err := r.channel.QueueDeclare(
"", // 随机生产队列名称
false, // 是否持久化
false, // 自动删除
true, // 是否排他
false, // 是否阻塞
nil, // 额外属性
)
r.failOnErr(err, "Failed to declare a queue")
// 绑定队列到 exchange 中
err = r.channel.QueueBind(
q.Name,
"", // 在 Pub/Sub 模式下,这里的 key 要为空
r.Exchange,
false, // 是否阻塞
nil, // 额外属性
)
// 3. 消费消息
message, err := r.channel.Consume(
q.Name, // 队列名称
"", // 用来区分多个消费者
true, // 是否自动应答
false, // 是否具有排他性
false, // 如果为 true,表示不能将同一个 connections 中发送的消息传递给这个 connections 中的消费者
false, // 队列消费是否阻塞
nil, // 额外属性
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message: %s\n", d.Body)
}
}()
fmt.Println("退出请按 CTRL + C")
<-forever
}
示例
mainPub
package main
import (
"fmt"
"rabbitmq-base/rabbitmq"
"strconv"
"time"
)
func main() {
r := rabbitmq.NewRabbitMQPubSub("exchange1")
for i := 0; i < 100; i++ {
r.PublishPub("订阅模式生产第" + strconv.Itoa(i) + "条数据")
fmt.Println("第" + strconv.Itoa(i) + "条消息")
time.Sleep(1 * time.Second)
}
}
mainSub1
package main
import "rabbitmq-base/rabbitmq"
func main() {
r := rabbitmq.NewRabbitMQPubSub("exchange1")
r.RecieveSub()
}
mainSub2
package main
import "rabbitmq-base/rabbitmq"
func main() {
r := rabbitmq.NewRabbitMQPubSub("exchange1")
r.RecieveSub()
}
Routing
路由模式
一个消息被多个消费者获取。并且消息的目标队列可被生产者指定。
![[Routing.png]]
获取实例
// NewRabbitMQRouting - 创建路由模式下的 RabbitMQ 实例
func NewRabbitMQRouting(exchange string, routingKey string) *RabbitMQ {
return NewRabbitMQ("", exchange, routingKey)
}
生产者
// PublishRouting - 路由模式下的生产者
func (r *RabbitMQ) PublishRouting(message string) {
// 1. 尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名称
"direct", // 交换机类型,direct
true, // 是否持久化
false, // 自动删除
false, // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
false, // 是否阻塞
nil, // 额外属性
)
r.failOnErr(err, "Failed to declare an exchange")
// 2. 发送消息
err = r.channel.Publish(
r.Exchange,
r.Key, // 设置 routingKey
false, // 如果为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
false, // 如果为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
}
消费者
// RecieveRouting - 路由模式下的消费者
func (r *RabbitMQ) RecieveRouting() {
// 1. 试探性创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名称
"direct", // 交换机类型,direct
true, // 是否持久化
false, // 自动删除
false, // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
false, // 是否阻塞
nil, // 额外属性
)
r.failOnErr(err, "Failed to declare an exchange")
// 2. 试探性创建队列,这里注意队列名称不要写,使用随机名称
q, err := r.channel.QueueDeclare(
"", // 随机生产队列名称
false, // 是否持久化
false, // 自动删除
true, // 是否排他
false, // 是否阻塞
nil, // 额外属性
)
r.failOnErr(err, "Failed to declare a queue")
// 绑定队列到 exchange 中
err = r.channel.QueueBind(
q.Name,
r.Key, // 需要绑定 routingKey
r.Exchange,
false, // 是否阻塞
nil, // 额外属性
)
// 3. 消费消息
message, err := r.channel.Consume(
q.Name, // 队列名称
"", // 用来区分多个消费者
true, // 是否自动应答
false, // 是否具有排他性
false, // 如果为 true,表示不能将同一个 connections 中发送的消息传递给这个 connections 中的消费者
false, // 队列消费是否阻塞
nil, // 额外属性
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message: %s\n", d.Body)
}
}()
fmt.Println("退出请按 CTRL + C")
<-forever
}
示例
mainRoutingPublish
package main
import (
"rabbitmq-base/rabbitmq"
"strconv"
"time"
)
func main() {
r1 := rabbitmq.NewRabbitMQRouting("exchange2", "error")
r2 := rabbitmq.NewRabbitMQRouting("exchange2", "info")
for i := 0; i < 5; i++ {
r1.PublishRouting("Hello exchange1 error" + strconv.Itoa(i))
r2.PublishRouting("Hello exchange1 info" + strconv.Itoa(i))
time.Sleep(1 * time.Second)
}
}
mainRoutingRecieve1
package main
import "rabbitmq-base/rabbitmq"
func main() {
r := rabbitmq.NewRabbitMQRouting("exchange2", "error")
r.RecieveRouting()
}
mainRoutingRecieve2
package main
import "rabbitmq-base/rabbitmq"
func main() {
r := rabbitmq.NewRabbitMQRouting("exchange2", "info")
r.RecieveRouting()
}
Topic
话题模式
一个消息被多个消费者获取。消息的目标 queue 可用 BindingKey 以通配符,(#:一个或多个词,*:一个词)的方式指定。
![[Topic.png]]
获取实例
// NewRabbitMQTopic - 创建话题模式下的 RabbitMQ 实例
func NewRabbitMQTopic(exchange string, routingKey string) *RabbitMQ {
return NewRabbitMQ("", exchange, routingKey)
}
生产者
// PublishTopic - 话题模式下的生产者
func (r *RabbitMQ) PublishTopic(message string) {
// 1. 尝试创建交换机,不存在创建
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名称
"topic", // 交换机类型,topic
true, // 是否持久化
false, // 自动删除
false, // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
false, // 是否阻塞
nil, // 额外属性
)
r.failOnErr(err, "Failed to declare an exchange")
// 2. 发送消息
err = r.channel.Publish(
r.Exchange,
r.Key, // 设置 routingKey
false, // 如果为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
false, // 如果为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
}
消费者
// RecieveTopic - 话题模式下的消费者
func (r *RabbitMQ) RecieveTopic() {
// 1. 试探性创建交换机
err := r.channel.ExchangeDeclare(
r.Exchange, // 交换机名称
"topic", // 交换机类型,direct
true, // 是否持久化
false, // 自动删除
false, // 如果为 true,表示这个 exchange 不可以被 client 用来推送消息,仅用来进行 exchange 和 exchange 之间的绑定
false, // 是否阻塞
nil, // 额外属性
)
r.failOnErr(err, "Failed to declare an exchange")
// 2. 试探性创建队列,这里注意队列名称不要写,使用随机名称
q, err := r.channel.QueueDeclare(
"", // 随机生产队列名称
false, // 是否持久化
false, // 自动删除
true, // 是否排他
false, // 是否阻塞
nil, // 额外属性
)
r.failOnErr(err, "Failed to declare a queue")
// 绑定队列到 exchange 中
err = r.channel.QueueBind(
q.Name,
r.Key, // 需要绑定 routingKey
r.Exchange,
false, // 是否阻塞
nil, // 额外属性
)
// 3. 消费消息
message, err := r.channel.Consume(
q.Name, // 队列名称
"", // 用来区分多个消费者
true, // 是否自动应答
false, // 是否具有排他性
false, // 如果为 true,表示不能将同一个 connections 中发送的消息传递给这个 connections 中的消费者
false, // 队列消费是否阻塞
nil, // 额外属性
)
forever := make(chan bool)
go func() {
for d := range message {
log.Printf("Received a message: %s\n", d.Body)
}
}()
fmt.Println("退出请按 CTRL + C")
<-forever
}
示例
mainTopicPublish
package main
import (
"rabbitmq-base/rabbitmq"
"strconv"
"time"
)
func main() {
r1 := rabbitmq.NewRabbitMQTopic("exchange3", "xylx.topic1")
r2 := rabbitmq.NewRabbitMQTopic("exchange3", "xylx.topic2")
for i := 0; i <= 100; i++ {
r1.PublishTopic("Hello exchange3 xylx.topic1 -> " + strconv.Itoa(i))
r2.PublishTopic("Hello exchange3 xylx.topic2 -> " + strconv.Itoa(i))
time.Sleep(1 * time.Second)
}
}
mainTopicRecieve1
. 用来分隔词
package main
import "rabbitmq-base/rabbitmq"
func main() {
r := rabbitmq.NewRabbitMQTopic("exchange3", "#.topic1")
r.RecieveTopic()
}
mainTopicRecieve2
package main
import "rabbitmq-base/rabbitmq"
func main() {
r := rabbitmq.NewRabbitMQTopic("exchange3", "#.topic2")
r.RecieveTopic()
}
标签:false,err,exchange,队列,rabbitmq,RabbitMQ,go,true
From: https://www.cnblogs.com/xylx1/p/17445730.html