目录
消息队列
消息队列:Message Queue(MQ)
概念
- 消息(Message)是指在应用间传送的数据
- 消息队列(Message Queue)是一种应用间的通信方式,确保消息的可靠传递
- 临时存放数据的空间(缓存)
- 消息队列是中间件的一种
主流的消息队列
- 目前主流的几大消息队列有:RabbitMQ、ActiveMQ、RocketMQ、Kafka、ZeroMQ
消息队列名词
(1)Broker
- 消息服务器,作为Server提供消息核心服务
(2)Topic
- 主题,同类消息的集合,不同生产者向Topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的传输
(3)Producer
- 消息的生产者,业务的发起方,负责生产消息传输给broker
(4)Consumer
- 消息的消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理
(5)Queue
- 队列,Point-to-Point模式下,特定生产者向特定的queue发送消息,消费者订阅特定的queue完成指定消息的接收,先进先出
(6)Message
- 消息,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输
消息队列中两种工作模式
Point-to-Point(PTP、点到点)
- 一方发送消息,另外一方接收
比如消息发布之后,消息在队列中会等待消费者提取,消费者提取完后,整个队列会向前进一位,并且释放掉被提取出的消息(一对一)
Pub/Sub
- 发布/订阅模式,消费者可以订阅一个或多个主题并使用该主题中的所有消息
比如消息发布之后,某个消费者提取消息之后,被提取的消息不会释放,这些消息需要提供给多个消费者接收(一对多)
消息队列的缺点
系统可用性降低
- 增加了消息队列,结构就越复杂,故障率就会提高
系统复杂性提高
- 添加了消息队列的技术,不管是开发人员还是运维人员,在构建业务的时候都要针对消息队列做出处理
数据一致性无法保证
- 因为增加了和接收方之间的一个中间环节,数据的延迟会增大
RabbitMQ
相关术语
(1)生产者
产生消息的进程或服务
(2)消费者
接受消息的进程或服务
(3)队列
RabbitMQ是消息队列中间件,而真正储存消息数据的就是队列,不同的程序可以使用同一个队列,而队列也可以有多个
(4)交换器
把消息进行分类,它可以根据不同的关键字,将消息发送到不同的队列
(5)虚拟主机
虚拟主机可以把一个服务器上的RabbitMQ拆分成多个单元,每一个单元都可以当成一个单独的、轻量的RabbitMQ服务器
单节点示例
单节点就是只有一台主机部署RabbitMQ,但为了充分体现消息队列作为中间件角色,就需要把该主机部署到两个程序的中间
因此我们需要3台CentOS 7虚拟机,两台作为两边的程序,其中一台部署RabbitMQ作为中间件
示例环境
操作系统 | IP 地址 | 角色 |
CentOS 7 | 192.168.10.101 | 中间件 部署RabbitMQ消息队列 |
CentOS 7 | 192.168.10.102 | 消息的生产者 |
CentOS 7 | 192.168.10.103 | 消息的消费者 |
部署RabbitMQ
打开101主机,并连接上XShell,我们先来部署RabbitMQ
安装RabbitMQ需要仓库具有epel扩展源和安装erlang语言,自带光盘上是没有的,需要借助阿里的仓库源来安装
为了方便实验,关闭防火墙和内核安全机制
[root@localhost ~]# systemctl stop firewalld
[root@localhost ~]# setenforce 0
然后把下方命令全部复制带终端直接回车即可替换仓库源为阿里云的
rm -rf /etc/yum.repos.d/*
curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
curl -o /etc/yum.repos.d/epel.repo https://mirrors.aliyun.com/repo/epel-7.repo
yum clean all
然后安装所需依赖,因为刚刚清除过元数据,在第一次安装时就重建元数据,需要等待一段时间
安装完后启动rabbitmq-server服务,然后可以使用ps命令查看是否有rabbitmq的进程在运行
[root@localhost ~]# yum install -y erlang rabbitmq-server
[root@localhost ~]# systemctl start rabbitmq-server
[root@localhost ~]# ps aux | grep rabbitmq
除此之外,RabbitMQ还提供了一个管理插件,是一个Web页面,可以通过这个页面进行一些简单的配置,这里我们可以先查看一下这个插件有没有启用
使用rabbitmq-plugins命令可以查看RabbitMQ所有的插件,如果列出信息的前方的中括号是空的,说明该插件没有启用,这里也可以看到我们要开启的管理插件也默认没有启用
[root@localhost ~]# rabbitmq-plugins list
[ ] rabbitmq_management 3.3.5
还是使用rabbitmq-plugins命令的enable选项指定插件名来开启指定插件,开启以后,想要指定该插件是否成功开启,再使用list选项查询所有插件
第二次查询时我们可以发现,有许多插件被开启了,同时也包含了rabbitmq_management
- 小写字母e表示:作为依赖一起被启动
- 大写字母E表示:被明确地启用了
[root@localhost ~]# rabbitmq-plugins enable rabbitmq_management
[root@localhost ~]# rabbitmq-plugins list
[e] amqp_client 3.3.5
[e] mochiweb 2.7.0-rmq3.3.5-git680dba8
[E] rabbitmq_management 3.3.5
[e] rabbitmq_management_agent 3.3.5
[e] rabbitmq_web_dispatch 3.3.5
[e] webmachine 1.10.3-rmq3.3.5-gite9359c7
在修改了RabbitMQ的插件状态后,需要重启服务使插件生效,然后可以使用本地IP+该管理插件的端口号来访问该页面
[root@localhost ~]# systemctl restart rabbitmq-server
[root@localhost ~]# curl 127.0.0.1:15672
或者此时在宿主机的浏览器访问RabbitMQ服务器的IP+该插件的端口,就可以进入该页面进行管理,
当然了,不使用该插件提供的页面而在终端使用命令的方式也同样可以配置RabbitMQ
账号和密码都是guest,是RabbitMQ内置默认的超级管理员账号
测试消息
这里我们编写一个Python来测试消息队列
打开102和103主机,并连接上XShell,然后开启会话同步
在102和103的任意一台主机上操作
CentOS 7系统本身是自带Python的,但是是2.7的版本,而Python已经不再维护2.7版本更新,而且2.7和3.0以上版本的语句有细微不同
所以我们这里先安装Python3的版本,然后还需要安装pika软件包,是Python里的一个函数,利用该函数连接到消息队列的主机
[root@localhost ~]# yum -y install python3
[root@localhost ~]# pip3 install pika
在102上操作
这里把102主机作为消息发布者
编写生产消息的脚本
[root@localhost ~]# vim send_message.sh
import pika
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.101'))
channel = connection.channel()
# 声明队列;如果队列不存在会被创建
channel.queue_declare(queue='test_queue', durable=True)
# 发送消息到队列中
channel.basic_publish(
exchange='',
routing_key='test_queue',
body='Hello, RabbitMQ!',
properties=pika.BasicProperties(
delivery_mode=2, # 使消息持久化
)
)
print("消息发送完毕")
# 关闭连接
connection.close()
使用python3命令运行该python脚本,发送消息
[root@localhost ~]# python3 send_message.sh
消息发送完毕
此时在管理插件的页面上查看,也可以看到消息成功被发布了
在103上操作
把103主机作为消息消费者
编写接收消息的脚本
[root@localhost ~]# vim receice_message.sh
import pika
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.101'))
channel = connection.channel()
# 声明队列,确保RabbitMQ中有一个名为'test_queue'的队列
channel.queue_declare(queue='test_queue', durable=True)
# 定义回调函数来处理消息
def callback(ch, method, properties, body):
print(f"Received {body.decode()}")
# 消费队列中的消息,回调函数为callback
channel.basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=True)
print("Waiting for messages. To exit press CTRL+C")
# 开始监听消息
channel.start_consuming()
执行该python脚本,可以看到消息已经接收到了
[root@localhost ~]# python3 receive_message.sh Waiting for messages. To exit press CTRL+C Received Hello, RabbitMQ!
消息被提取之后就会被释放,在管理页面上消息的数量就变成0了
RabbitMQ常用命令
(1)虚拟机管理
列出所有的虚拟主机
[root@localhost ~]# rabbitmqctl list_vhosts
创建名为 test 的虚拟主机
[root@localhost ~]# rabbitmqctl add_vhost test
删除名字叫 test 的虚拟主机
[root@localhost ~]# rabbitmqctl delete_vhost test
(2)用户管理
创建tom用户,密码为123456
[root@localhost ~]# rabbitmqctl add_user tom 123456
列出所有用户
[root@localhost ~]# rabbitmqctl list_users
更改 tom 的密码为 new_passwd
[root@localhost ~]# rabbitmqctl change_password tom new_passwd
删除名为 tom 的用户
[root@localhost ~]# rabbitmqctl delete_user user1
(3)tags角色
角色 | 说明 |
超级管理员(administrator):guest | 可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。 |
监控者(monitoring) | 可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) |
策略制定者(policymaker) | 可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息。 |
普通管理者(management) | 仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。 |
其他 | 无法登陆管理控制台,通常就是普通的生产者和消费者。 |
赋予tom用户management角色
[root@localhost ~]# rabbitmqctl add_user tom user1_passwd
[root@localhost ~]# rabbitmqctl set_user_tags tom managemnet
针对test虚拟主机给tom用户设置所有的配置、读写queue和exchange的权限。默认是没有任何权限的
[root@localhost ~]# rabbitmqctl add_vhost test
[root@localhost ~]# rabbitmqctl set_permissions -p test tom '.*' '.*' '.*'
列出tom的权限
[root@localhost ~]# rabbitmqctl list_user_permissions user1
列出test下的所有用户权限
[root@localhost ~]# rabbitmqctl list_permissions -p fll
如果需要清除tom在test上的权限
[root@localhost ~]# rabbitmqctl clear_permissions -p test tom
(4)插件管理
rabbitmq-plugins list # 获取RabbitMQ插件列表
rabbitmq-plugins enable <插件名字> # 安装RabbitMQ插件
rabbitmq-plugins disable <插件名字> # 卸载某个插件
(5)其他
列出所有的交换器
rabbitmqctl list_exchanges
列出所有的绑定,即把exchange和queue按照路由规则绑定起来
rabbitmqctl list_bindings
分别查看当前系统种存在的Exchange和Exchange上绑定的Queue信息。
rabbitmqctl list_queues
查看运行信息
rabbitmqctl status
RabbitMQ集群示例
RabbitMQ是基于Erlang编写的,Erlang本身就支持分布式(通过同步Erlang集群各节点的cookie来实现),因此不需要像Kafka那样通过ZooKeeper来实现分布式集群。
RabbitMQ集群概念
在RabbitMQ集群中的每个节点,有内存节点和磁盘节点
内存节点
内存节点会将所有的元数据信息仅存储到内存中
磁盘节点
磁盘节点则不仅会将所有元数据存储到内存上, 还会将其持久化到磁盘。
关系
内存节点用于提高性能,而磁盘节点用于数据持久性。
在 RabbitMQ 集群中,通常会混合使用两者,以平衡性能和可靠性。内存节点可以帮助减少磁盘 I/O 负担,但重要的消息和队列仍然需要在磁盘节点上持久化。
系统 | ip | 角色 |
Centos | 192.168.10.101 | 磁盘节点 |
Centos | 192.168.10.102 | 内存节点 |
Centos | 192.168.10.103 | 内存节点 |
恢复快照,打开3台主机并连接上XShell
初步设置
分别在3个主机操作
分别修改3个主机的主机名
101主机
[root@localhost ~]# hostname mq01
[root@localhost ~]# bash
102主机
[root@localhost ~]# hostname mq02
[root@localhost ~]# bash
103主机
[root@localhost ~]# hostname mq03
[root@localhost ~]# bash
开启会话同步
在101(磁盘节点)主机操作
为了方便实验,关闭防火墙并设为永久关闭,再关闭内核安全机制
实现主机直接通过主机名通信,在hosts文件末尾追加以下内容
[root@mq01 ~]# systemctl stop firewalld
[root@mq01 ~]# systemctl disable firewalld
[root@mq01 ~]# setenforce 0
[root@mq01 ~]# vim /etc/hosts
192.168.10.101 mq01
192.168.10.102 mq02
192.168.10.103 mq03
然后把下方命令全部复制带终端直接回车即可替换仓库源为阿里云的
rm -rf /etc/yum.repos.d/*
curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
curl -o /etc/yum.repos.d/epel.repo https://mirrors.aliyun.com/repo/epel-7.repo
yum clean all
安装所需依赖,启动服务并设为开机自启,然后使用ps命令查看是否有相关进程
[root@mq01 ~]# yum install -y erlang rabbitmq-server
[root@mq01 ~]# systemctl start rabbitmq-server
[root@mq01 ~]# systemctl enable rabbitmq-server
[root@mq01 ~]# ps aux | grep rabbit
启用插件,重启服务
[root@mq01 ~]# rabbitmq-plugins enable rabbitmq_management
[root@mq01 ~]# systemctl restart rabbitmq-server
同步Cookie
关闭会话同步
在101(磁盘节点)主机操作
确保集群节点能够相互认证,如果.erlang.cookie文件不一致,节点间将无法建立信任关系,从而无法正常加入集群。
[root@mq01 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@mq02:/var/lib/rabbitmq/
[root@mq01 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@mq03:/var/lib/rabbitmq/
开启会话同步
RabbitMQ 节点需要重启系统以加载新的认证配置。重启确保每个节点重新加载了更新后的认证信息,并建立正确的集群连接。
[root@mq01 ~]# reboot
重启三台主机后关闭消息同步,然后检查三台主机的RabbitMQ服务,正常情况下是都在运行的
[root@mq01 ~]# ps aux | grep rabbit
关闭会话同步
分配节点
在102(内存节点①)主机操作
先停止RabbitMQ应用,以便进行集群配置而不影响正在运行的服务
再让当前节点(rabbit@mq02)加入现有的 RabbitMQ 集群,并设置为内存节点。内存节点不持久化消息,只在内存中存储数据。
然后启动 RabbitMQ 应用,使节点重新开始处理请求。
[root@mq02 ~]# rabbitmqctl stop_app
[root@mq02 ~]# rabbitmqctl join_cluster --ram rabbit@mq01
[root@mq02 ~]# rabbitmqctl start_app
在103(内存节点②)主机操作
执行上面的相同操作
[root@mq03 ~]# rabbitmqctl stop_app
[root@mq03 ~]# rabbitmqctl join_cluster --ram rabbit@mq01
[root@mq03 ~]# rabbitmqctl start_app
最后可以在任意节点查看集群状态,完成了集群的部署
[root@mq02 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@mq02 ...
[{nodes,[{disc,[rabbit@mq01]},{ram,[rabbit@mq03,rabbit@mq02]}]},
{running_nodes,[rabbit@mq03,rabbit@mq01,rabbit@mq02]},
{cluster_name,<<"rabbit@mq02">>},
{partitions,[]}]
...done.
- disc:表示磁盘节点
- ram:表示内存节点