转载自:https://juejin.cn/post/6992551868748529677
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。 RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。所以说在我们互联网的金融行业。对数据的稳定性和可靠性要求都非常高的情况下,我们都会选择RabbitMQ。当然没有kafka性能好,但是要比AvtiveMQ性能要好很多。也可以自己做一些性能的优化。 RabbitMQ可以构建异地双活架构,包括每一个节点存储方式可以采用磁盘或者内存的方式。#先查看一下我的版本号 root@guofu:~# cat /etc/issue Ubuntu 18.04.5 LTS \n \l #从前面的mq对比中已经说了,rabbitmq是erlang实现的,所以需要安装erlang 26 sudo apt-get install erlang-nox # 添加公钥 27 wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add - # 更新软件包 28 sudo apt-get update # 安装rabbitmq ,安装完毕自动启动 29 sudo apt-get install rabbitmq-server # 查看rabbitmq的运行状态 service rabbitmq-server status 也可以查看 30 systemctl status rabbitmq-server info:Active: active (running) since Mon 2021-07-26 11:15:54 CST; 13s ago #服务的启动、停止、重启 31 sudo service rabbitmq-server stop 32 sudo service rabbitmq-server start 33 sudo service rabbitmq-server # 安装可视化的web操作页面 34 sudo rabbitmq-plugins enable rabbitmq_management 35 sudo service rabbitmq-server restart 36 curl http://localhost:15672
至此,rabbitmq安装完毕,web页面也可以访问了。默认用户名和密码是guest/guest,但是,rabbitmq默认会创建guest用户,但是只能服务器本机登录,建议创建其他新用户,授权,用来做其他操作。所以我们接下来开始创建一个新的用户
# 查看所有用户 38 sudo rabbitmqctl list_users #增加用户admin 密码是passwd(根据需求自定义即可) 39 sudo rabbitmqctl add_user admin passwd # 给普通用户分配管理员角色 40 sudo rabbitmqctl set_user_tags admin administrator #赋予virtual host中所有资源的配置、写、读权限以便管理其中的资源,也是添加远程访问权限 41 sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
使用admin远程登录
配置文件解读
rabbitmq-env.conf rabbitmq的环境变量
root@guofu:~# cd /etc/rabbitmq/ root@guofu:/etc/rabbitmq# ls enabled_plugins rabbitmq-env.conf root@guofu:/etc/rabbitmq# cat rabbitmq-env.conf # Defaults to rabbit. This can be useful if you want to run more than one node # per machine - RABBITMQ_NODENAME should be unique per erlang-node-and-machine # combination. See the clustering on a single machine guide for details: # http://www.rabbitmq.com/clustering.html#single-machine #NODENAME=rabbit --节点名称,如果服务是集群的形式,每个节点的名称必须唯一 # By default RabbitMQ will bind to all interfaces, on IPv4 and IPv6 if # available. Set this if you only want to bind to one network interface or# # address family. #NODE_IP_ADDRESS=127.0.0.1 --节点的ip地址 # Defaults to 5672. #NODE_PORT=5672 --节点的端口号 # Default rabbitmq-server wait timeout.
mq服务器的架构
- 我们先来看一下rabbitmq的架构图
-
- Broker : 标识消息队列服务器实体rabbitmq-server
- v-host : Virtual Host 虚拟主机。vhost是rabbitmq分配权限的最小细粒度,比如,我有两个用户a和b,我如果想让a用户只访问a1队列,b用户访问b1队列,那么在同一个vhost下,这是做不到的。 我们可以为一个用户分配一个可以访问哪个或者哪一些vhost的权限。但是不能为用户分配一个可以访问哪一些exchange,或者queue的权限,因为rabbitmq的权限细粒度没有细化到交换器和队列,他的最小细粒度是vhost(vhost中包含许多的exchanges,queues,bingdings)。 所以如果exchangeA 和queueA 只能让用户A访问,exchangeB 和queueB 只能让用户B访问,要达到这种需求,只能为exchangeA 和queueA创建一个vhostA,为exchangeB 和queueB 创建vhostB,这样就隔离开来了。。vhost是AMQP概念的基础,必须在链接时指定。 RabbitMQ默认的vhost是 /。查看所有虚拟主机的命令是
root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost test_vhost Creating vhost "test_vhost" root@guofu:/etc/rabbitmq# rabbitmqctl list_vhosts Listing vhosts / test_vhost # 查看用户列表 root@guofu:/etc/rabbitmq# rabbitmqctl list_users Listing users admin [administrator] guest [administrator] # 分配访问权限 set_permissions [-p <vhost>] <user> <conf> <write> <read> # 需要注意的是RabbitMQ会缓存每个connection或channel的权限验证结果、因此权限发生变化后需要重连才能生效。 root@guofu:/etc/rabbitmq# set rabbitmqctl set_permissions -p test_host admin ".*" ".*" ".*"
exchange:交换器用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
从web页面可以看到,exchange可以选择的有四种,持久化方式有两种,一种是内存,一种是硬盘
- fanout / (Publish/Subscribe) / 发布订阅
-
生产者将消息给交换机,交换机根据自身的类型(fanout)将会把所有消息复制同步到所有与其绑定的队列,每个队列可以有一个消费者接收消息进行消费逻辑。需要我们自己创建交换器并进行绑定,创建多个队列进行绑定即可,若一个消费者绑定多个队列则进行轮询,因为mq有阅后即焚的特点,只能保证一个消费者阅读接受。常用于群发消息。
- 路由模式 / Routing / direct
生产者将消息发送到交换机信息携带具体的路由key,交换机的类型是direct,将接收到的信息中的routingKey,比对与之绑定的队列routingkey。消费者监听一个队列,获取消息,执行消费逻辑。一个队列可以绑定一个routingKey也可以绑定多个。在消息进行路由时会携带一个routingKey寻找对应的队列。
- Topic/ 通配符匹配
生产者发送消息,消息中带有具体的路由key,交换机的类型是topic,队列绑定交换机不在使用具体的路由key而是一个范围值,例如: .yell.,hlll.iii,jjj.#。其中* 表示一个字符串(不能携带特殊字符)#表示任意
- header exchange(头交换机)和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的header数据。举栗说明
队列A:绑定交换机参数是:format=pdf,type=report,x-match=all, 队列B: 绑定交换机参数是:format=pdf,type=log,x-match=any, 队列C:绑定交换机参数是:format=zip,type=report,x-match=all, 消息1发送交换机的头参数是:format=pdf,type=reprot则消息传送到队列A 消息2发送交换机的头参数是:format=pdf则消息传送到队列A和队列B 消息3发送交换机的头参数是:format=zip,type=log则消息没有匹配队列,此消息会被丢弃 all: 默认值。一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机 any: 一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机
- queen:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
-
Banding : 绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。### Virtual Host的使用
-
Channel : 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟链接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
-
Connection : 网络连接,比如一个TCP连接。
接下来我们根据上面的exchang的不同类型做一个演示
- 先来创建用户和vhost(这里为了演示,会尽可能多的使用到前面讲的命令,具体要根据需求 是否创建vhost),另外这些操作通过web页面也可以完成。
# 创建vhost root@guofu:/etc/rabbitmq# rabbitmqctl add_vhost guofu_vhost Creating vhost "guofu_vhost" #查看vhost列表 root@guofu:/etc/rabbitmq# rabbitmqctl list_vhosts Listing vhosts guofu_vhost / test_vhost # 创建用户和密码 root@guofu:/etc/rabbitmq# rabbitmqctl add_user guofu guofu Creating user "guofu" #查看用户列表 root@guofu:/etc/rabbitmq# rabbitmqctl list_users Listing users vhost1 [] admin [administrator] guofu [] guest [administrator] # 给用户设置角色,否则远程登录不了 root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_user_tags guofu administrator Setting tags for user "guofu" to [administrator] #给用户 vhost的权限,3个* 代表 配置 读 写的权限 root@guofu:/etc/rabbitmq# sudo rabbitmqctl set_permissions -p guofu_vhost guofu ".*" ".*" ".*" Setting permissions for user "guofu" in vhost "guofu_vhost" # 查看用户权限 root@guofu:/etc/rabbitmq# sudo rabbitmqctl list_user_permissions guofu Listing permissions for user "guofu" guofu_vhost .* .* .*
配置完毕后,我们在页面也可以看到,已经生效了
新建一个交换机并指定vhost
新建两个队列并绑定exchange
-
我们把信息配置到代码中去相关参考资料
package main import ( "github.com/streadway/amqp" ) /** * @Description: 演示rabbitmq的exchange类型-生产,fanout 为了方便演示,忽略错误捕捉 */ func main() { //交换机 var exchange="guofu_exchange" //建立连接 用户名+密码+ip+端口号+vhost conn, _ := amqp.Dial("amqp://guofu:[email protected]:5672/guofu_vhost") //建立通道 ch, _ := conn.Channel() //声明交换机类型 ch.ExchangeDeclare( exchange, "fanout", true, false, false, false, nil, ) //定义消息 msgBody:="i am a msg3" //发送消息 相关参数 exchange, key string, mandatory, immediate bool, msg Publishing err := ch.Publish( exchange, //exchange "", //routing key(queue name) false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, //Msg set as persistent ContentType: "text/plain", Body: []byte(msgBody), }) if err !=nil{ panic(err) } }
我们通过web页面看一下
-
-
可见对于fanout 发布订阅 ,其实我们在推送消息的时候,只用到了exchange和type,而不关系队列,因为只要是绑定了该exchange的队列,都会被推送消息。也就是说,fanout模式,一个消息会被推送到多个队列,那么哪种情景会用到这种模式呢?比如 用户注册后,我既要发邮件,又要发短信,那么发短信和发邮件,就可以用fanout 这种模式
-
下面我写一下消费的代码,消费队列的方法其实都一样,这里演示一次,后面的其他类型的exchange就不演示了。
package main import ( "github.com/streadway/amqp" ) /** * @Description: 演示rabbitmq的exchange类型-生产,fanout 为了方便演示,忽略错误捕捉 */ func main() { //交换机 var exchange="guofu_exchange" //建立连接 用户名+密码+ip+端口号+vhost conn, _ := amqp.Dial("amqp://guofu:[email protected]:5672/guofu_vhost") //建立通道 ch, _ := conn.Channel() //声明交换机类型 ch.ExchangeDeclare( exchange, "fanout", true, false, false, false, nil, ) //定义消息 msgBody:="i am a msg3" //发送消息 相关参数 exchange, key string, mandatory, immediate bool, msg Publishing err := ch.Publish( exchange, //exchange "", //routing key(queue name) false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, //Msg set as persistent ContentType: "text/plain", Body: []byte(msgBody), }) if err !=nil{ panic(err) } }
上面的代码相信大家都看的明白,但是要注意的是,里面有一个点 【试探性创建】 这是什么意思?这是说,如果有这个exchange/queue,就用,没有的话就创建,刚才我并没有创建guofu_queue3,但是我监听这个队列也得到消息了
-
那么我们用消费代码创建一下新的exchange和queue
package main import ( "fmt" "github.com/streadway/amqp" ) func main() { //交换机 var exchange = "guofu_exchange_test" var queue = "guofu_queue_test" var key = "" //建立连接 用户名+密码+ip+端口号+vhost conn, _ := amqp.Dial("amqp://guofu:[email protected]:5672/guofu_vhost") //建立通道 ch, _ := conn.Channel() //试探性声明交换机类型 ch.ExchangeDeclare( exchange, "fanout", true, false, false, false, nil, ) //试探性创建队列 //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table _, err := ch.QueueDeclare( queue, true, false, false, false, nil, ) if err != nil { panic(err) } //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空 ch.QueueBind(queue, key, exchange, false, nil) // 消费队列 Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) msg, err := ch.Consume( queue, "", false, false, false, false, nil, ) for d:=range msg{ fmt.Println(string(d.Body)) d.Ack(false) } }
交换机和队列被创建
另外一点是在php和java中,还有一种生产者消息确认机制,消息推送成功后支持函数回调,但是golang里面我没有找到这个方法
-
好了,我们回归exchange的第二种类型direct 路由模式,这次我们直接使用消费端的代码直接建立队列并监听
package main import ( "fmt" "github.com/streadway/amqp" ) func main() { //交换机 var exchange = "direct_guofu_exchange" var queue = "direct_guofu_queue" var key = "direct_key" //建立连接 用户名+密码+ip+端口号+vhost conn, _ := amqp.Dial("amqp://guofu:[email protected]:5672/guofu_vhost") //建立通道 ch, _ := conn.Channel() //试探性声明交换机类型 ch.ExchangeDeclare( exchange, "direct", true, false, false, false, nil, ) //试探性创建队列 //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table _, err := ch.QueueDeclare( queue, true, false, false, false, nil, ) if err != nil { panic(err) } //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空 ch.QueueBind(queue, key, exchange, false, nil) // 消费队列 Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) msg, err := ch.Consume( queue, "", false, false, false, false, nil, ) for d:=range msg{ fmt.Println(string(d.Body)) d.Ack(false) } }
-
使用同样方法 创建队列direct_guofu_queue
-
推送消息到该队列,需要注意的是,如果你两个queue使用了同一个key,那么exchange会根据key 推送给两个队列,如果不是业务需要,尽量避免重复key ,减少脏数据的生成
package main import ( "github.com/streadway/amqp" ) /** * @Description: 演示rabbitmq的exchange类型-生产,fanout 为了方便演示,忽略错误捕捉 */ func main() { var exchange = "direct_guofu_exchange" var key = "direct_key" //建立连接 用户名+密码+ip+端口号+vhost conn, _ := amqp.Dial("amqp://guofu:[email protected]:5672/guofu_vhost") //建立通道 ch, _ := conn.Channel() //声明交换机类型 ch.ExchangeDeclare( exchange, "direct", true, false, false, false, nil, ) //定义消息 msgBody:="i am a direct" //发送消息 相关参数 exchange, key string, mandatory, immediate bool, msg Publishing err := ch.Publish( exchange, //exchange key, //routing key(queue name) false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, //Msg set as persistent ContentType: "text/plain", Body: []byte(msgBody), }) if err !=nil{ panic(err) } }
- topic模式 topic 类似于mysql的模糊查询,只要是能模糊匹配到的,都会推送消息。,推送的路由可以是一个包含了多个属性,以.分割的字符串,最大程长度是200左右。推送之后,其他会匹配其他队列的路由,如果匹配到了,则推送进去。现在我们假设有以下场景 一共有两个队列,第一个队列animal 如果是动物,进入这个队列,第二个队列是 plant,第三个队列是表示颜色yellow,如果是黄色的都进入这个队列,现在我们要推送这个几个到队列里面去 1.橘猫 既要去animal 也要去 yellow 2.菊花 既要去plant 也要去yellow
如代码所示,我创建了三个队列,绑定的key 分别是 #.animal.#,#.plant.#,yellow.#,
var exchange = "topic_guofu_exchange" var queue = "topic727_yellow" var key = "yellow.#" var exchange = "topic_guofu_exchange" var queue = "topic727_animal" var key = "#.animal.#" var exchange = "topic_guofu_exchange" var queue = "topic727_plant" var key = "#.plant.#"
那么当我推送消息的时候,如果我topic绑定的路由键 是 yellow.animal.plant ,那么推送的时候 三个消息队列都会被匹配。我们来看一下
- 把生产的代码贴出来
-
package main import ( "github.com/streadway/amqp" ) /** * @Description: 演示rabbitmq的exchange类型-生产,fanout 为了方便演示,忽略错误捕捉 */ func main() { var exchange = "topic_guofu_exchange" var key = "yellow.animal.plant " var queue = "topic727" //建立连接 用户名+密码+ip+端口号+vhost conn, _ := amqp.Dial("amqp://guofu:[email protected]:5672/guofu_vhost") //建立通道 ch, _ := conn.Channel() //声明交换机类型 ch.ExchangeDeclare( exchange, "topic", true, false, false, false, nil, ) //试探性创建队列 //声明queue 和相关属性 相关参数 name string, durable, autoDelete, exclusive, noWait bool, args Table _, err := ch.QueueDeclare( queue, true, false, false, false, nil, ) if err != nil { panic(err) } //绑定队列 (name, key, exchange string, noWait bool, args Table) 发布订阅模式的key为空 ch.QueueBind(queue, key, exchange, false, nil) //定义消息 msgBody := key //发送消息 相关参数 exchange, key string, mandatory, immediate bool, msg Publishing err = ch.Publish( exchange, //exchange key, //routing key(queue name) false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, //Msg set as persistent ContentType: "text/plain", Body: []byte(msgBody), }) if err != nil { panic(err) } }
推送完毕,发现四个队列都有了数据(第一个队列是topic 推送时候绑定的,后面三个是路由匹配的)
那么此时,如果我推送的key是yellow.animal,那么路由会匹配到 yellow.# 和 #.animal.#,我们来看一下
-
topic的功能是比较强大的,利用好topic ,可以实现 direct和fanout的功能,路由密钥中可以包含任意多个单词,最多255个字节。
-
header exchange(头交换机)和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的header数据。在此也不做赘述了。有兴趣的同学可以去官网看看。