RabbitMQ 是基于 Erlang 语言开发的,该语言天然支持集群分布式模式,因此部署 RabbitMQ 集群非常简单。
RabbitMQ 的集群部署有两种模式:
-
普通集群:又称为标准集群,是一种分布式集群,将队列分散到集群的各个节点,提高整个集群的并发能力。
-
镜像集群:在普通集群的基础上,添加了主从备份功能,又称为主从复制集群,提高集群的数据可用性。
镜像集群虽然支持主从复制,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在 RabbitMQ 的 3.8 版本以后,推出了新的功能:使用仲裁队列来代替镜像集群,底层采用 Raft 协议确保主从的数据一致性。
本篇博客基于 RabbitMQ 3.12 版本搭建普通集群,以及介绍如何使用仲裁队列,镜像集群就不介绍了。
一、普通集群部署
普通集群,又称为标准集群,具备下列特征:
-
在集群的各个节点间共享部分数据,包括:交换机、队列元信息。但不包含队列中的消息。
-
当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
-
队列所在节点如果宕机,队列中的消息就会丢失,因此普通集群只是提高了并发能力,并未实现高可用
要想真正实现高可用,只需要创建仲裁队列即可,操作非常容易,下面会进行介绍。我们先把普通集群搭建出来。
我在自己的 CentOS7 虚拟机(IP 地址是 192.168.136.129)上采用 docker-compose 进行搭建
既然是集群,至少需要 3 个节点,具体部署细节规划如下:
主机名 hostname | 节点名称 | AMQP通信端口 | WEB 控制台端口 |
---|---|---|---|
mq1 | rabbit@mq1 | 5671 | 15671 |
mq2 | rabbit@mq2 | 5672 | 15672 |
mq3 | rabbit@mq3 | 5673 | 15673 |
需要注意的是:RabbitMQ 节点的名称默认是 rabbit@主机名。由于在集群部署中,每个 RabbitMQ 节点的配置文件中,需要配置其它节点的名称,因此主机名 hostname 命名很重要。
先创建一个节点所需要的相关文件夹,将相关文件准备好后,直接复制出两外 2 个节点的文件夹即可。
mkdir -p /root/rabbitmq_cluster/mq1/data
在 mq1 文件夹内创建配置文件 rabbitmq.conf,该文件用于每个 RabbitMQ 节点的配置文件:
vim /root/rabbitmq_cluster/mq1/rabbitmq.conf
在 rabbitmq.conf 文件中填写以下内容并保存:
loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3
集群模式中的每个 RabbitMQ 节点必须使用相同的 cookie 才能被允许相互通信。cookie 只是一串最多 255 个字符的字母数字字符。因此我们在 mq1 文件夹中再创建一个 cookie 文件,并设置权限,防止其它用户读写:
vim /root/rabbitmq_cluster/mq1/.erlang.cookie
# 内容可以随便填写,这里填写为如下字符串
TCMKLFQVGZWWJYUVOLWI
# 保存后,设置该文件的权限,防止其它用户读写
chmod 600 /root/rabbitmq_cluster/mq1/.erlang.cookie
经过以上操作步骤,RabbitMQ 的一个节点的部署文件都准备好了,然后复制出另外 2 个节点的部署文件:
cp -r /root/rabbitmq_cluster/mq1 /root/rabbitmq_cluster/mq2
cp -r /root/rabbitmq_cluster/mq1 /root/rabbitmq_cluster/mq3
然后在 rabbitmq_cluster 目录下,创建出 docker-compose.yml 文件,填写以下内容并保存:
version: '3'
services:
# 服务名称
rabbitmq1:
# 镜像名称
image: rabbitmq:3.12-management
# 容器名称
container_name: mq1
# 主机名称
hostname: mq1
# 容器随着docker启动而自动启动
restart: always
# 宿主机映射到容器内的相应端口
ports:
- 5671:5672
- 15671:15672
# 配置自动创建的账号信息
environment:
RABBITMQ_DEFAULT_USER: jobs
RABBITMQ_DEFAULT_PASS: 123456
RABBITMQ_DEFAULT_VHOST: '/'
volumes:
# 数据目录映射
- /root/rabbitmq_cluster/mq1/data:/var/lib/rabbitmq
# 配置文件映射
- /root/rabbitmq_cluster/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
# cookie文件映射
- /root/rabbitmq_cluster/mq1/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
# 配置使用的桥接网络
networks:
- mqNetwork
rabbitmq2:
image: rabbitmq:3.12-management
container_name: mq2
hostname: mq2
restart: always
ports:
- 5672:5672
- 15672:15672
environment:
RABBITMQ_DEFAULT_USER: jobs
RABBITMQ_DEFAULT_PASS: 123456
RABBITMQ_DEFAULT_VHOST: '/'
volumes:
- /root/rabbitmq_cluster/mq2/data:/var/lib/rabbitmq
- /root/rabbitmq_cluster/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- /root/rabbitmq_cluster/mq2/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
networks:
- mqNetwork
rabbitmq3:
image: rabbitmq:3.12-management
container_name: mq3
hostname: mq3
restart: always
ports:
- 5673:5672
- 15673:15672
environment:
RABBITMQ_DEFAULT_USER: jobs
RABBITMQ_DEFAULT_PASS: 123456
RABBITMQ_DEFAULT_VHOST: '/'
volumes:
- /root/rabbitmq_cluster/mq3/data:/var/lib/rabbitmq
- /root/rabbitmq_cluster/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
- /root/rabbitmq_cluster/mq3/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
networks:
- mqNetwork
# 创建一个桥接网络,把各个 rabbitmq 实例连接在一起
networks:
mqNetwork:
driver: bridge
最后的目录结构如下图所示:
然后进入 docker-compose.yml 所在的目录,运行 docker-compose up -d
即可启动集群
随便访问其中一个节点的 web 控制台,比如访问 http://192.168.136.129:15671
并登录进入即可看到集群包含的 3 个节点:
最后我们为集群中每个节点安装 RabbitMQ 的延迟插件,具体安装步骤参考上篇博客。
二、普通队列和仲裁队列
集群搭建好之后,在创建队列的界面中,Type 选择 Classic 表示创建普通队列,选择 Quorum 表示创建仲裁队列。
普通队列和仲裁队列的区别在于:
普通队列只会存放在集群中的一个节点上,虽然通过其它节点访问普通队列,但是其它节点只是把请求转发到队列所在的节点进行操作。一旦队列所在节点如果宕机,队列中的消息就会丢失,因此普通集群只是提高了并发能力,并未实现高可用。
仲裁队列是 3.8 版本以后才有的新功能,用来替代镜像队列,属于主从模式,支持基于 Raft 协议强一致的主从数据同步。虽然请求仍然都是由主节点进行操作,然后同步到从节点中。但是对于任何节点来说,既可能是某个仲裁队列的主节点,也可能是其它仲裁队列的从节点。因此也具有分散节点压力,提高并发访问的特点。另外如果主节点挂了,其中的某个从节点就会变成主节点,并在其它节点上尽可能创建出新的主节点,保障主从数量一致。
一个仲裁队列的默认数量是 5,即一个主节点,4个副本节点,如果集群中节点数量少于 5 ,比如我们搭建了 3 个节点的集群,那么创建的仲裁队列就是 1 主 2 副本。当然如果集群中的节点数大于 5 个的话,那么就只会在 5 个节点中创建出 1 主 4 副本。
由此可见:仲裁队列使用非常简单,集群中使用仲裁队列,可以极大的保障 RabbitMQ 集群对接的高可用。
三、代码连接操作集群
首先 application.yml 配置文件,以发送者为例,连接 RabbitMQ 的集群信息配置如下:
spring:
rabbitmq:
# 连接 RabbitMQ 集群,填写上每个节点的地址,使用英文逗号分隔
addresses: 192.168.136.129:5671,192.168.136.129:5672,192.168.136.129:5673
username: jobs
password: 123456
virtual-host: /
区别在于不使用 host 和 port 这两项配置了,使用 addresses 代替,填写上所有节点的 ip 和端口即可。
下面列出消费者接收程序,有关普通队列和仲裁队列的注解声明创建方式:
package com.jobs.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class SpringAmqpListener {
//接收普通队列的消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "normal.queue"),
exchange = @Exchange(name = "test.exchange", type = ExchangeTypes.DIRECT),
key = "normal"
))
public void listenerNormarlQueue(String msg) {
log.info("接收到 normal.queue 消息:" + msg);
}
//接收仲裁队列的消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "quorum.queue",
//给队列设置参数 x-queue-type 值为 quorum 就可将队列变为仲裁队列
arguments = @Argument(name = "x-queue-type", value = "quorum")),
exchange = @Exchange(name = "test.exchange", type = ExchangeTypes.DIRECT),
key = "quorum"
))
public void listenerQuorumQueue(String msg) {
log.info("接收到 quorum.queue 消息:" + msg);
}
}
发送程序跟之前一样,没啥变化,如下所示:
package com.jobs;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@Slf4j
@SpringBootTest
public class PublishMsgTest {
@Autowired
private RabbitTemplate rabbitTemplate;
//发送消息到普通队列
@Test
void publishNormalTest() {
String message = "normal message test";
String exchange = "test.exchange";
String rootingkey = "normal";
//发送消息
rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}
//发送到消息到仲裁队列
@Test
void publishQuorumTest() {
String message = "quorum message test";
String exchange = "test.exchange";
String rootingkey = "quorum";
//发送消息
rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}
}
下图是普通队列和仲裁队列的区别:
可以发现仲裁队列后面有个 +2 的提示,表示有 2 个副本,点击 quorum.queue 查看详情:
可以发现:主节点在 rabbit@mq1,目前 3 个节点都在线,其它两个节点是副本节点。
如果我们把 rabbit@mq1 节点的 docker 容器停掉后,副本中的某个节点就会变成主节点,如果再把 rabbit@mq1 节点的 docker 容器启动后,它就变成了从节点,如下图所示,这里只截图最终的效果,大家可以自行进行验证过程。
到此为止,RabbitMQ 集群的搭建以及仲裁队列的使用已经介绍完毕,非常简单。
本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/spring_rmq_cluster.zip