首页 > 其他分享 >RabbitMQ 集群搭建和仲裁队列的使用

RabbitMQ 集群搭建和仲裁队列的使用

时间:2023-10-06 09:22:05浏览次数:62  
标签:队列 rabbitmq mq1 cluster 集群 仲裁 RabbitMQ 节点

RabbitMQ 是基于 Erlang 语言开发的,该语言天然支持集群分布式模式,因此部署 RabbitMQ 集群非常简单。

RabbitMQ 的集群部署有两种模式:

  • 普通集群:又称为标准集群,是一种分布式集群,将队列分散到集群的各个节点,提高整个集群的并发能力。

  • 镜像集群:在普通集群的基础上,添加了主从备份功能,又称为主从复制集群,提高集群的数据可用性。

镜像集群虽然支持主从复制,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在 RabbitMQ 的 3.8 版本以后,推出了新的功能:使用仲裁队列来代替镜像集群,底层采用 Raft 协议确保主从的数据一致性。

本篇博客基于 RabbitMQ 3.12 版本搭建普通集群,以及介绍如何使用仲裁队列,镜像集群就不介绍了。


一、普通集群部署

普通集群,又称为标准集群,具备下列特征:

  • 在集群的各个节点间共享部分数据,包括:交换机、队列元信息。但不包含队列中的消息。

  • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回

  • 队列所在节点如果宕机,队列中的消息就会丢失,因此普通集群只是提高了并发能力,并未实现高可用

要想真正实现高可用,只需要创建仲裁队列即可,操作非常容易,下面会进行介绍。我们先把普通集群搭建出来。

我在自己的 CentOS7 虚拟机(IP 地址是 192.168.136.129)上采用 docker-compose 进行搭建

既然是集群,至少需要 3 个节点,具体部署细节规划如下:

主机名 hostname 节点名称 AMQP通信端口 WEB 控制台端口
mq1 rabbit@mq1 5671 15671
mq2 rabbit@mq2 5672 15672
mq3 rabbit@mq3 5673 15673

需要注意的是:RabbitMQ 节点的名称默认是 rabbit@主机名。由于在集群部署中,每个 RabbitMQ 节点的配置文件中,需要配置其它节点的名称,因此主机名 hostname 命名很重要。

先创建一个节点所需要的相关文件夹,将相关文件准备好后,直接复制出两外 2 个节点的文件夹即可。

mkdir -p /root/rabbitmq_cluster/mq1/data

在 mq1 文件夹内创建配置文件 rabbitmq.conf,该文件用于每个 RabbitMQ 节点的配置文件:

vim /root/rabbitmq_cluster/mq1/rabbitmq.conf

在 rabbitmq.conf 文件中填写以下内容并保存:

loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3

集群模式中的每个 RabbitMQ 节点必须使用相同的 cookie 才能被允许相互通信。cookie 只是一串最多 255 个字符的字母数字字符。因此我们在 mq1 文件夹中再创建一个 cookie 文件,并设置权限,防止其它用户读写:

vim /root/rabbitmq_cluster/mq1/.erlang.cookie
# 内容可以随便填写,这里填写为如下字符串
TCMKLFQVGZWWJYUVOLWI
# 保存后,设置该文件的权限,防止其它用户读写
chmod 600 /root/rabbitmq_cluster/mq1/.erlang.cookie

经过以上操作步骤,RabbitMQ 的一个节点的部署文件都准备好了,然后复制出另外 2 个节点的部署文件:

cp -r /root/rabbitmq_cluster/mq1 /root/rabbitmq_cluster/mq2
cp -r /root/rabbitmq_cluster/mq1 /root/rabbitmq_cluster/mq3

然后在 rabbitmq_cluster 目录下,创建出 docker-compose.yml 文件,填写以下内容并保存:

version: '3'
services:
  # 服务名称
  rabbitmq1:
    # 镜像名称
    image: rabbitmq:3.12-management
    # 容器名称
    container_name: mq1
    # 主机名称
    hostname: mq1
    # 容器随着docker启动而自动启动
    restart: always
    # 宿主机映射到容器内的相应端口
    ports:
      - 5671:5672
      - 15671:15672
    # 配置自动创建的账号信息
    environment:
      RABBITMQ_DEFAULT_USER: jobs
      RABBITMQ_DEFAULT_PASS: 123456
      RABBITMQ_DEFAULT_VHOST: '/'
    volumes:
      # 数据目录映射
      - /root/rabbitmq_cluster/mq1/data:/var/lib/rabbitmq
      # 配置文件映射
      - /root/rabbitmq_cluster/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      # cookie文件映射
      - /root/rabbitmq_cluster/mq1/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
    # 配置使用的桥接网络
    networks:
      - mqNetwork

  rabbitmq2:
    image: rabbitmq:3.12-management
    container_name: mq2
    hostname: mq2
    restart: always
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      RABBITMQ_DEFAULT_USER: jobs
      RABBITMQ_DEFAULT_PASS: 123456
      RABBITMQ_DEFAULT_VHOST: '/'
    volumes:
      - /root/rabbitmq_cluster/mq2/data:/var/lib/rabbitmq
      - /root/rabbitmq_cluster/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - /root/rabbitmq_cluster/mq2/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
    networks:
      - mqNetwork

  rabbitmq3: 
    image: rabbitmq:3.12-management
    container_name: mq3
    hostname: mq3
    restart: always
    ports:
      - 5673:5672
      - 15673:15672
    environment:
      RABBITMQ_DEFAULT_USER: jobs
      RABBITMQ_DEFAULT_PASS: 123456
      RABBITMQ_DEFAULT_VHOST: '/'
    volumes:
      - /root/rabbitmq_cluster/mq3/data:/var/lib/rabbitmq
      - /root/rabbitmq_cluster/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
      - /root/rabbitmq_cluster/mq3/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
    networks:
      - mqNetwork

# 创建一个桥接网络,把各个 rabbitmq 实例连接在一起
networks:
  mqNetwork:
    driver: bridge

最后的目录结构如下图所示:

image

然后进入 docker-compose.yml 所在的目录,运行 docker-compose up -d 即可启动集群

随便访问其中一个节点的 web 控制台,比如访问 http://192.168.136.129:15671并登录进入即可看到集群包含的 3 个节点:

image

最后我们为集群中每个节点安装 RabbitMQ 的延迟插件,具体安装步骤参考上篇博客。


二、普通队列和仲裁队列

集群搭建好之后,在创建队列的界面中,Type 选择 Classic 表示创建普通队列,选择 Quorum 表示创建仲裁队列。

image

普通队列和仲裁队列的区别在于:

普通队列只会存放在集群中的一个节点上,虽然通过其它节点访问普通队列,但是其它节点只是把请求转发到队列所在的节点进行操作。一旦队列所在节点如果宕机,队列中的消息就会丢失,因此普通集群只是提高了并发能力,并未实现高可用。

仲裁队列是 3.8 版本以后才有的新功能,用来替代镜像队列,属于主从模式,支持基于 Raft 协议强一致的主从数据同步。虽然请求仍然都是由主节点进行操作,然后同步到从节点中。但是对于任何节点来说,既可能是某个仲裁队列的主节点,也可能是其它仲裁队列的从节点。因此也具有分散节点压力,提高并发访问的特点。另外如果主节点挂了,其中的某个从节点就会变成主节点,并在其它节点上尽可能创建出新的主节点,保障主从数量一致。

一个仲裁队列的默认数量是 5,即一个主节点,4个副本节点,如果集群中节点数量少于 5 ,比如我们搭建了 3 个节点的集群,那么创建的仲裁队列就是 1 主 2 副本。当然如果集群中的节点数大于 5 个的话,那么就只会在 5 个节点中创建出 1 主 4 副本。

由此可见:仲裁队列使用非常简单,集群中使用仲裁队列,可以极大的保障 RabbitMQ 集群对接的高可用。


三、代码连接操作集群

首先 application.yml 配置文件,以发送者为例,连接 RabbitMQ 的集群信息配置如下:

spring:
  rabbitmq:
    # 连接 RabbitMQ 集群,填写上每个节点的地址,使用英文逗号分隔
    addresses: 192.168.136.129:5671,192.168.136.129:5672,192.168.136.129:5673
    username: jobs
    password: 123456
    virtual-host: /

区别在于不使用 host 和 port 这两项配置了,使用 addresses 代替,填写上所有节点的 ip 和端口即可。

下面列出消费者接收程序,有关普通队列和仲裁队列的注解声明创建方式:

package com.jobs.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SpringAmqpListener {

    //接收普通队列的消息
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "normal.queue"),
            exchange = @Exchange(name = "test.exchange", type = ExchangeTypes.DIRECT),
            key = "normal"
    ))
    public void listenerNormarlQueue(String msg) {
        log.info("接收到 normal.queue 消息:" + msg);
    }

    //接收仲裁队列的消息
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "quorum.queue",
                    //给队列设置参数 x-queue-type 值为 quorum 就可将队列变为仲裁队列
                    arguments = @Argument(name = "x-queue-type", value = "quorum")),
            exchange = @Exchange(name = "test.exchange", type = ExchangeTypes.DIRECT),
            key = "quorum"
    ))
    public void listenerQuorumQueue(String msg) {
        log.info("接收到 quorum.queue 消息:" + msg);
    }
}

发送程序跟之前一样,没啥变化,如下所示:

package com.jobs;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest
public class PublishMsgTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //发送消息到普通队列
    @Test
    void publishNormalTest() {
        String message = "normal message test";
        String exchange = "test.exchange";
        String rootingkey = "normal";
        //发送消息
        rabbitTemplate.convertAndSend(exchange, rootingkey, message);
    }

    //发送到消息到仲裁队列
    @Test
    void publishQuorumTest() {
        String message = "quorum message test";
        String exchange = "test.exchange";
        String rootingkey = "quorum";
        //发送消息
        rabbitTemplate.convertAndSend(exchange, rootingkey, message);
    }
}

下图是普通队列和仲裁队列的区别:

image

可以发现仲裁队列后面有个 +2 的提示,表示有 2 个副本,点击 quorum.queue 查看详情:

image

可以发现:主节点在 rabbit@mq1,目前 3 个节点都在线,其它两个节点是副本节点。

如果我们把 rabbit@mq1 节点的 docker 容器停掉后,副本中的某个节点就会变成主节点,如果再把 rabbit@mq1 节点的 docker 容器启动后,它就变成了从节点,如下图所示,这里只截图最终的效果,大家可以自行进行验证过程。

image


到此为止,RabbitMQ 集群的搭建以及仲裁队列的使用已经介绍完毕,非常简单。

本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/spring_rmq_cluster.zip

标签:队列,rabbitmq,mq1,cluster,集群,仲裁,RabbitMQ,节点
From: https://www.cnblogs.com/studyjobs/p/17744247.html

相关文章

  • 04_猫狗队列
    猫狗队列【题目】宠物、狗和猫的类如下:publicclassPet{ privateStringtype;publicPet(Stringtype){this.type=type;}publicStringgetPetType(){returnthis.type;}}publicclassDogextendsPet{publ......
  • RabbitMQ 死信交换机、延迟队列、惰性队列
    如果一个队列设置了死信交换机,该队列的消息就有了极大的可靠性保障,当出现以下情况时,消息就会投递到死信交换机中:队列中的消息在被消费者处理后,抛出异常,返回了nack或者reject如果队列设置了ttl或者消息本身设置了ttl,消息因为超时而未消费队列容量已经满了,后续发来的消息......
  • RabbitMQ 消息发送和消费的可靠性保障
    在一些比较重要的场景中,我们必须要保障RabbitMQ消息的可靠性,也就是发送给rabbitmq的消息必须最终成功,消费者接收消息进行处理也必须最终成功。即使是中间失败了,也必须要有其它保障措施,哪怕最后进行人工进行干预处理。消息出现丢失的场景主要有:发送消息时丢失:比如消息发送到......
  • 【UVA 12100】Printer Queue 题解(队列+优先队列)
    计算机科学学生会中唯一的打印机正经历着极其繁重的工作量。有时,打印机队列中有100个作业,您可能需要等待数小时才能获得一页输出。由于某些作业比其他作业更重要,黑客将军为打印作业队列发明并实现了一个简单的优先级系统。现在,为每个作业分配1到9之间的优先级(9是最高优先级,1是最低......
  • [数据结构和算法] 堆/优先队列的实现
    预备知识:完全二叉树可以用数组表示:从下标0开始存储数据:左子节点=2*父节点+1,右子节点=2*父节点+2;从下标1开始存储数据:左子结点=2*父节点,右子节点=2*父节点+1;堆:大根堆:父节点的值大于等于左右子节点的值;小根堆:父节点的值小于等于左右子节点的值;......
  • 【数据结构】2.栈和队列
    1.栈1.1栈的抽象父类#pragmaoncetemplate<classT>classStack{public://析构函数virtual~Stack(){}//栈是否为空virtualboolempty()const=0;//栈的大小virtualintsize()const=0;//栈顶元素virtualT&top()=0......
  • 【数据结构】栈、队列和数组
    栈、队列和数组栈队列数组数组的顺序表示和实现顺序表中查找和修改数组元素矩阵的压缩存储特殊矩阵稀疏矩阵栈初始化#defineMaxSize50//栈中元素的最大个数typedefcharElemType;//数据结构typedefstruct{inttop;//栈顶指针ElemTypedata[MaxSize];//存放栈中......
  • Java语言通过三种方法来实现队列
    队列关于作者作者介绍......
  • 多重背包单调队列优化
    引用自:动态规划-背包问题(01背包、完全背包、多重背包)#include<cstdio>#include<algorithm>#include<cstring>usingnamespacestd;constintmaxn=100005;intn,m,cnt;intv[102],num[102],dp[maxn];structQueue{intpos,val;}q[maxn];intmain(){......
  • 使用链表模拟队列和栈
    使用链表模拟队列案例1//创建节点类publicclassNode{intn;Nodenext;}//编写方法publicclassQueue{Nodehead=newNode();Nodelast=newNode();privateintlen=0;publicintsize(){returnthis.len;}......