首页 > 其他分享 >RabbitMQ消息队列:概念、单节点和集群示例

RabbitMQ消息队列:概念、单节点和集群示例

时间:2024-08-19 17:55:47浏览次数:10  
标签:示例 队列 RabbitMQ 消息 root 节点 localhost

目录

消息队列

概念

主流的消息队列

消息队列名词

(1)Broker

(2)Topic

(3)Producer

(4)Consumer

(5)Queue

(6)Message

消息队列中两种工作模式

Point-to-Point(PTP、点到点)

Pub/Sub

消息队列的缺点

系统可用性降低

系统复杂性提高

数据一致性无法保证

RabbitMQ

相关术语

(1)生产者

(2)消费者

(3)队列

(4)交换器

(5)虚拟主机

单节点示例

示例环境

部署RabbitMQ

测试消息

RabbitMQ常用命令

(1)虚拟机管理

(2)用户管理

(3)tags角色

(4)插件管理

(5)其他

RabbitMQ集群示例

RabbitMQ集群概念

内存节点

磁盘节点

关系

初步设置

同步Cookie

分配节点


消息队列

消息队列:Message Queue(MQ)

概念

  • 消息(Message)是指在应用间传送的数据
  • 消息队列(Message Queue)是一种应用间的通信方式,确保消息的可靠传递
  • 临时存放数据的空间(缓存)
  • 消息队列是中间件的一种

主流的消息队列

  • 目前主流的几大消息队列有:RabbitMQ、ActiveMQ、RocketMQ、Kafka、ZeroMQ

消息队列名词

(1)Broker

  • 消息服务器,作为Server提供消息核心服务

(2)Topic

  • 主题,同类消息的集合,不同生产者向Topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的传输

(3)Producer

  • 消息的生产者,业务的发起方,负责生产消息传输给broker

(4)Consumer

  • 消息的消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理

(5)Queue

  • 队列,Point-to-Point模式下,特定生产者向特定的queue发送消息,消费者订阅特定的queue完成指定消息的接收,先进先出

(6)Message

  • 消息,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输

消息队列中两种工作模式

Point-to-Point(PTP、点到点)

  • 一方发送消息,另外一方接收

比如消息发布之后,消息在队列中会等待消费者提取,消费者提取完后,整个队列会向前进一位,并且释放掉被提取出的消息(一对一)

Pub/Sub

  • 发布/订阅模式,消费者可以订阅一个或多个主题并使用该主题中的所有消息

比如消息发布之后,某个消费者提取消息之后,被提取的消息不会释放,这些消息需要提供给多个消费者接收(一对多)


消息队列的缺点

系统可用性降低

  • 增加了消息队列,结构就越复杂,故障率就会提高

系统复杂性提高

  • 添加了消息队列的技术,不管是开发人员还是运维人员,在构建业务的时候都要针对消息队列做出处理

数据一致性无法保证

  • 因为增加了和接收方之间的一个中间环节,数据的延迟会增大

RabbitMQ

相关术语

(1)生产者

产生消息的进程或服务

(2)消费者

接受消息的进程或服务

(3)队列

RabbitMQ是消息队列中间件,而真正储存消息数据的就是队列,不同的程序可以使用同一个队列,而队列也可以有多个

(4)交换器

把消息进行分类,它可以根据不同的关键字,将消息发送到不同的队列

(5)虚拟主机

虚拟主机可以把一个服务器上的RabbitMQ拆分成多个单元,每一个单元都可以当成一个单独的、轻量的RabbitMQ服务器


单节点示例

单节点就是只有一台主机部署RabbitMQ,但为了充分体现消息队列作为中间件角色,就需要把该主机部署到两个程序的中间

因此我们需要3台CentOS 7虚拟机,两台作为两边的程序,其中一台部署RabbitMQ作为中间件

示例环境

操作系统

IP 地址

角色

CentOS 7

192.168.10.101

中间件

部署RabbitMQ消息队列

CentOS 7

192.168.10.102

消息的生产者

CentOS 7

192.168.10.103

消息的消费者

部署RabbitMQ

打开101主机,并连接上XShell,我们先来部署RabbitMQ

安装RabbitMQ需要仓库具有epel扩展源和安装erlang语言,自带光盘上是没有的,需要借助阿里的仓库源来安装

为了方便实验,关闭防火墙和内核安全机制

[root@localhost ~]# systemctl stop firewalld
[root@localhost ~]# setenforce 0

然后把下方命令全部复制带终端直接回车即可替换仓库源为阿里云的

rm -rf /etc/yum.repos.d/*
curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
curl -o /etc/yum.repos.d/epel.repo https://mirrors.aliyun.com/repo/epel-7.repo
yum clean all

然后安装所需依赖,因为刚刚清除过元数据,在第一次安装时就重建元数据,需要等待一段时间

安装完后启动rabbitmq-server服务,然后可以使用ps命令查看是否有rabbitmq的进程在运行

[root@localhost ~]# yum install -y erlang rabbitmq-server
[root@localhost ~]# systemctl start rabbitmq-server
[root@localhost ~]# ps aux | grep rabbitmq

除此之外,RabbitMQ还提供了一个管理插件,是一个Web页面,可以通过这个页面进行一些简单的配置,这里我们可以先查看一下这个插件有没有启用

使用rabbitmq-plugins命令可以查看RabbitMQ所有的插件,如果列出信息的前方的中括号是空的,说明该插件没有启用,这里也可以看到我们要开启的管理插件也默认没有启用

[root@localhost ~]# rabbitmq-plugins list
[ ] rabbitmq_management               3.3.5

还是使用rabbitmq-plugins命令的enable选项指定插件名来开启指定插件,开启以后,想要指定该插件是否成功开启,再使用list选项查询所有插件

第二次查询时我们可以发现,有许多插件被开启了,同时也包含了rabbitmq_management

  • 小写字母e表示:作为依赖一起被启动
  • 大写字母E表示:被明确地启用了
[root@localhost ~]# rabbitmq-plugins enable rabbitmq_management
[root@localhost ~]# rabbitmq-plugins list
[e] amqp_client                       3.3.5
[e] mochiweb                          2.7.0-rmq3.3.5-git680dba8

[E] rabbitmq_management               3.3.5
[e] rabbitmq_management_agent         3.3.5

[e] rabbitmq_web_dispatch             3.3.5

[e] webmachine                        1.10.3-rmq3.3.5-gite9359c7

在修改了RabbitMQ的插件状态后,需要重启服务使插件生效,然后可以使用本地IP+该管理插件的端口号来访问该页面

[root@localhost ~]# systemctl restart rabbitmq-server
[root@localhost ~]# curl 127.0.0.1:15672

或者此时在宿主机的浏览器访问RabbitMQ服务器的IP+该插件的端口,就可以进入该页面进行管理,

当然了,不使用该插件提供的页面而在终端使用命令的方式也同样可以配置RabbitMQ

账号和密码都是guest,是RabbitMQ内置默认的超级管理员账号


测试消息

这里我们编写一个Python来测试消息队列

打开102和103主机,并连接上XShell,然后开启会话同步

在102和103的任意一台主机上操作

CentOS 7系统本身是自带Python的,但是是2.7的版本,而Python已经不再维护2.7版本更新,而且2.7和3.0以上版本的语句有细微不同

所以我们这里先安装Python3的版本,然后还需要安装pika软件包,是Python里的一个函数,利用该函数连接到消息队列的主机

[root@localhost ~]# yum -y install python3
[root@localhost ~]# pip3 install pika

在102上操作

这里把102主机作为消息发布者

编写生产消息的脚本

[root@localhost ~]# vim send_message.sh
import pika
 
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.101'))
channel = connection.channel()
 
# 声明队列;如果队列不存在会被创建
channel.queue_declare(queue='test_queue', durable=True)
 
# 发送消息到队列中
channel.basic_publish(
    exchange='',
    routing_key='test_queue',
    body='Hello, RabbitMQ!',
    properties=pika.BasicProperties(
        delivery_mode=2,  # 使消息持久化
    )
)
 
print("消息发送完毕")
 
# 关闭连接
connection.close()

使用python3命令运行该python脚本,发送消息

[root@localhost ~]# python3 send_message.sh 
消息发送完毕

此时在管理插件的页面上查看,也可以看到消息成功被发布了


在103上操作

把103主机作为消息消费者

编写接收消息的脚本

[root@localhost ~]# vim receice_message.sh
import pika
 
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.101'))
channel = connection.channel()
 
# 声明队列,确保RabbitMQ中有一个名为'test_queue'的队列
channel.queue_declare(queue='test_queue', durable=True)
 
# 定义回调函数来处理消息
def callback(ch, method, properties, body):
    print(f"Received {body.decode()}")
 
# 消费队列中的消息,回调函数为callback
channel.basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=True)
 
print("Waiting for messages. To exit press CTRL+C")
# 开始监听消息
channel.start_consuming()

执行该python脚本,可以看到消息已经接收到了

[root@localhost ~]# python3 receive_message.sh Waiting for messages. To exit press CTRL+C Received Hello, RabbitMQ!

消息被提取之后就会被释放,在管理页面上消息的数量就变成0了


RabbitMQ常用命令

(1)虚拟机管理

列出所有的虚拟主机

[root@localhost ~]# rabbitmqctl list_vhosts

创建名为 test 的虚拟主机

[root@localhost ~]# rabbitmqctl add_vhost test

删除名字叫 test 的虚拟主机

[root@localhost ~]# rabbitmqctl delete_vhost test

(2)用户管理

创建tom用户,密码为123456

[root@localhost ~]# rabbitmqctl add_user tom 123456

列出所有用户

[root@localhost ~]# rabbitmqctl list_users

更改 tom 的密码为 new_passwd

[root@localhost ~]# rabbitmqctl change_password tom new_passwd

删除名为 tom 的用户

[root@localhost ~]# rabbitmqctl delete_user user1

(3)tags角色

角色

说明

超级管理员(administrator):guest

可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。

监控者(monitoring)

可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

策略制定者(policymaker)

可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息。

普通管理者(management)

仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。

其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

赋予tom用户management角色

[root@localhost ~]# rabbitmqctl add_user tom user1_passwd
[root@localhost ~]# rabbitmqctl set_user_tags tom managemnet  

针对test虚拟主机给tom用户设置所有的配置、读写queue和exchange的权限。默认是没有任何权限的

[root@localhost ~]# rabbitmqctl add_vhost test
[root@localhost ~]# rabbitmqctl set_permissions -p test tom '.*' '.*' '.*'

列出tom的权限

[root@localhost ~]# rabbitmqctl list_user_permissions user1

列出test下的所有用户权限

[root@localhost ~]# rabbitmqctl list_permissions -p fll

如果需要清除tom在test上的权限

[root@localhost ~]# rabbitmqctl clear_permissions -p test tom

(4)插件管理

rabbitmq-plugins list                     # 获取RabbitMQ插件列表
rabbitmq-plugins enable <插件名字>         # 安装RabbitMQ插件
rabbitmq-plugins disable <插件名字>         # 卸载某个插件

(5)其他

列出所有的交换器

rabbitmqctl list_exchanges

列出所有的绑定,即把exchange和queue按照路由规则绑定起来

rabbitmqctl list_bindings

分别查看当前系统种存在的Exchange和Exchange上绑定的Queue信息。

rabbitmqctl list_queues

查看运行信息

rabbitmqctl status

RabbitMQ集群示例

RabbitMQ是基于Erlang编写的,Erlang本身就支持分布式(通过同步Erlang集群各节点的cookie来实现),因此不需要像Kafka那样通过ZooKeeper来实现分布式集群。

RabbitMQ集群概念

在RabbitMQ集群中的每个节点,有内存节点和磁盘节点

内存节点

内存节点会将所有的元数据信息仅存储到内存中

磁盘节点

磁盘节点则不仅会将所有元数据存储到内存上, 还会将其持久化到磁盘。

关系

内存节点用于提高性能,而磁盘节点用于数据持久性。

在 RabbitMQ 集群中,通常会混合使用两者,以平衡性能和可靠性。内存节点可以帮助减少磁盘 I/O 负担,但重要的消息和队列仍然需要在磁盘节点上持久化。


系统

ip

角色

Centos

192.168.10.101

磁盘节点

Centos

192.168.10.102

内存节点

Centos

192.168.10.103

内存节点

恢复快照,打开3台主机并连接上XShell

初步设置

分别在3个主机操作

分别修改3个主机的主机名

101主机
[root@localhost ~]# hostname mq01
[root@localhost ~]# bash
102主机
[root@localhost ~]# hostname mq02
[root@localhost ~]# bash
103主机
[root@localhost ~]# hostname mq03
[root@localhost ~]# bash

开启会话同步

在101(磁盘节点)主机操作

为了方便实验,关闭防火墙并设为永久关闭,再关闭内核安全机制

实现主机直接通过主机名通信,在hosts文件末尾追加以下内容

[root@mq01 ~]# systemctl stop firewalld
[root@mq01 ~]# systemctl disable firewalld
[root@mq01 ~]# setenforce 0
[root@mq01 ~]# vim /etc/hosts
192.168.10.101 mq01
192.168.10.102 mq02
192.168.10.103 mq03

然后把下方命令全部复制带终端直接回车即可替换仓库源为阿里云的

rm -rf /etc/yum.repos.d/*
curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
curl -o /etc/yum.repos.d/epel.repo https://mirrors.aliyun.com/repo/epel-7.repo
yum clean all

安装所需依赖,启动服务并设为开机自启,然后使用ps命令查看是否有相关进程

[root@mq01 ~]# yum install -y erlang rabbitmq-server
[root@mq01 ~]# systemctl start rabbitmq-server
[root@mq01 ~]# systemctl enable rabbitmq-server
[root@mq01 ~]# ps aux | grep rabbit

启用插件,重启服务

[root@mq01 ~]# rabbitmq-plugins enable rabbitmq_management
[root@mq01 ~]# systemctl restart rabbitmq-server

同步Cookie

关闭会话同步

在101(磁盘节点)主机操作

确保集群节点能够相互认证,如果.erlang.cookie文件不一致,节点间将无法建立信任关系,从而无法正常加入集群。

[root@mq01 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@mq02:/var/lib/rabbitmq/
[root@mq01 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@mq03:/var/lib/rabbitmq/

开启会话同步

RabbitMQ 节点需要重启系统以加载新的认证配置。重启确保每个节点重新加载了更新后的认证信息,并建立正确的集群连接。

[root@mq01 ~]# reboot

重启三台主机后关闭消息同步,然后检查三台主机的RabbitMQ服务,正常情况下是都在运行的

[root@mq01 ~]# ps aux | grep rabbit

关闭会话同步

分配节点

在102(内存节点①)主机操作

先停止RabbitMQ应用,以便进行集群配置而不影响正在运行的服务

再让当前节点(rabbit@mq02)加入现有的 RabbitMQ 集群,并设置为内存节点。内存节点不持久化消息,只在内存中存储数据。

然后启动 RabbitMQ 应用,使节点重新开始处理请求。

[root@mq02 ~]# rabbitmqctl stop_app
[root@mq02 ~]# rabbitmqctl join_cluster --ram rabbit@mq01
[root@mq02 ~]# rabbitmqctl start_app

在103(内存节点②)主机操作

执行上面的相同操作

[root@mq03 ~]# rabbitmqctl stop_app
[root@mq03 ~]# rabbitmqctl join_cluster --ram rabbit@mq01
[root@mq03 ~]# rabbitmqctl start_app

最后可以在任意节点查看集群状态,完成了集群的部署


[root@mq02 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@mq02 ...
[{nodes,[{disc,[rabbit@mq01]},{ram,[rabbit@mq03,rabbit@mq02]}]},
 {running_nodes,[rabbit@mq03,rabbit@mq01,rabbit@mq02]},
 {cluster_name,<<"rabbit@mq02">>},
 {partitions,[]}]
...done.
  • disc:表示磁盘节点
  • ram:表示内存节点

标签:示例,队列,RabbitMQ,消息,root,节点,localhost
From: https://blog.csdn.net/m0_65551023/article/details/141286308

相关文章

  • 【数据结构】详细介绍栈和队列,解析栈和队列每一处细节
    目录一.栈1. 栈的概念2. 栈的实现2.1栈的结构2.2初始化栈2.3入栈2.4出栈2.5获取栈顶元素2.6获取栈中有效个数2.7判断栈是否为空2.8销毁栈 二.队列1.队列的概念2.队列的实现 2.1队列的结构2.2队列初始化 2.3销毁队列 2.4入队列(队尾) ......
  • axios取消请求CancelToken的原理解析及用法示例
    文章目录一、axios的实例与请求流程二、CancelToken的作用三、CancelToken的实现原理四、取消请求的流程五、CancelToken用法六、利用拦截器取消请求1、axios请求拦截器2、axios响应拦截器3、利用路由导航守卫取消请求一、axios的实例与请求流程下图是axios实例......
  • Linux学习记录(十一)———进程间的通信(消息队列)
    文章目录4.消息队列4.1特点4.2.相关函数ftok函数消息队列进程间的通信消息队列全双工通信4.消息队列消息队列,是消息的链表,存放在内核中,一个消息队列由一个标识符(队列ID)来标识。查看消息队列指令ipcs-q4.1特点消息队列是面向记录的,其中的消息具有特......
  • 【RabbitMQ】超详细Windows系统下RabbitMQ的安装配置
    RabbitMQ是用Erlang语言编写的,因此在安装RabbitMQ之前,需要先安装Erlang环境。一、安装Erlang环境1、准备工作确定Erlang版本:根据具体需求以及RabbitMQ或其他Erlang应用程序的兼容性要求确定要安装的Erlang版本。下载Erlang安装包:在Erlang的官方下载页面,选择适合你的Win......
  • Java中ArrayList集合—基础详解(知识点+代码示例)
    ArrayList(集合)ArrayList(集合)ArrayList(集合)10.1ArrayList成员方法10.2集合练习10.2.1添加字符串10.2.2添加数字10.2.3添加学生对象并遍历10.2.4集合概述:集合可以直接存储引用数据类型,不能直接存储基本数据类型,如果要存储基本数据类型,需要将基本数据类型变成对......
  • java队列
    1.队列定义:在Java中,队列(Queue)是一种常用的数据结构,属于java.util包。Queue接口继承自Collection接口,定义了一些基本操作,如入队、出队、查看队列头部等。Java提供了多种实现Queue接口的类,这些类可以满足不同的使用需求。2.Java队列的常见实现LinkedList:实现了Que......
  • Sonarqube 自定义规则,部署SonarSource / sonar-java源码中示例规则:docs/java-custom-r
    自定义规则,可以参考sonar-java/docs/CUSTOM_RULES_101.mdat8.0.0.36314·SonarSource/sonar-java·GitHub1、下载一份sonarqube源码,配置好本地的环境,JDK17和mavendocs/java-custom-rules-example示例项目中会有写好的规则;我们可以先尝试将这些写好的规则添加到Sonarqube......
  • 基于SpringBoot的停车场管理系统+毕业设计示例参考
    郑重声明:项目经过本地测试,确保可以运行。项目仅供学习和毕业设计参考~1.项目介绍技术栈+工具:SpringBoot+BootStrap+MySQL5.7+IDEA2022+Maven系统功能:系统管理、角色管理、车辆管理、车位管理、停车记录、充值、停车出场等系统角色:管理员、普通用户(可自行设定角......
  • 基于SSM的酒店客房管理系统+毕业设计示例参考
    郑重声明:项目经过本地测试,确保可以运行。项目仅供学习和毕业设计参考~1.项目介绍技术栈+工具:SSM+MySQL+IDEA2022+Maven+JSP系统功能:登录注册、客房预订、公告查看、留言功能、客房管理、入驻管理、退房管理等系统角色:管理员、酒店前台、普通用户2.项目部署创......
  • 使用SSMS操作AdventureWorks 示例数据库
    简介AdventureWorks示例数据库,官方文档:https://learn.microsoft.com/zh-cn/sql/samples/adventureworks-install-configure?view=sql-server-ver16&tabs=ssms 下载备份文件OLTP数据适用于大多数典型的联机事务处理工作负载。数据仓库(DW)数据适用于数据仓库工作负载。轻......