消息队列概念
- 什么是消息队列
- 消息(Message)是指在应用间传送的数据
- 消息队列(Message Queue)是一种应用间的通信方式解决方法,确保消息的可靠传递、
- 主流消息队列
- 目前主流的几大消息1队列有:RabitMQ、ActiveMQ、RocketMQ、Kafka、ZeroMQ等,也有一些小众的比如Beanstalk、当然我们之前学过的Redis也可以实现消息队列的功能
消息队列中角色名词
(1)Broker:
消息服务器,作为server提供消息核心服务
(2)Producer:
消息生产者,业务的发起方,负责生产消息传输给broker
(3)Consumer:
消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理
(4)Topic:
主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的广播
(5)Queue:
队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收
(6)Message:
消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输
消息队列中两种工作模式
- Point-to-Point
- 一方发送消息,另外一方接收
- Pub/Sub
- 即发布/订阅模式,消费者可以订阅一个或多个主题并使用该主题中的所有消息
消息队列的缺点
- 系统可用性降低
- 系统复杂性提高
- 数据一致性无法保证
RabbitMQ基础概念
- RabbitMQ相关术语
(1)生产者:产生消息的进程或服务
(2)消费者:接收消息的进程或服务
(3)队列:RabbitMQ是消息队列中间件,而真正储存消息数据的就是队列,队列可以有很多。
(4)交换器:类似于网络设备交换机,它可以根据不同的关键字,将消息发送到不同的队列。
(5)虚拟主机:虚拟主机提供了资源的逻辑分组和分隔,每一个虚拟主机本质上指mini版的RabbitMQ服务器
RabbitMQ架构
安装RabbitMQ(单机)
安装erlang
在101主机上关闭防火墙
[root@localhost ~]# systemctl stop firewalld
[root@localhost ~]# setenforce 0
安装阿里仓库
rm -rf /etc/yum.repos.d/*
curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
curl -o /etc/yum.repos.d/epel.repo https://mirrors.aliyun.com/repo/epel-7.repo
yum clean all
安装erlang
[root@localhost ~]# yum -y install erlang
安装RabbitMQ
[root@localhost ~]# yum -y install rabbitmq-server
启动RabbitMQ
[root@localhost ~]# systemctl start rabbitmq-server #启动 RabbitMQ 服务的命令。
[root@localhost ~]# ps aux | grep rabbit #启动进程
[root@localhost ~]# rabbitmq-plugins list
#查看所有插件,如果前面的没有勾选中,就说明没有启动
[ ] amqp_client 3.3.5
[ ] cowboy 0.5.0-rmq3.3.5-git4b93c2d
[ ] eldap 3.3.5-gite309de4
[ ] mochiweb 2.7.0-rmq3.3.5-git680dba8
[ ] rabbitmq_amqp1_0 3.3.5
[ ] rabbitmq_auth_backend_ldap 3.3.5
[ ] rabbitmq_auth_mechanism_ssl 3.3.5
[ ] rabbitmq_consistent_hash_exchange 3.3.5
[ ] rabbitmq_federation 3.3.5
[ ] rabbitmq_federation_management 3.3.5
[ ] rabbitmq_management 3.3.5
[ ] rabbitmq_management_agent 3.3.5
[ ] rabbitmq_management_visualiser 3.3.5
[ ] rabbitmq_mqtt 3.3.5
[ ] rabbitmq_shovel 3.3.5
[ ] rabbitmq_shovel_management 3.3.5
[ ] rabbitmq_stomp 3.3.5
[ ] rabbitmq_test 3.3.5
[ ] rabbitmq_tracing 3.3.5
[ ] rabbitmq_web_dispatch 3.3.5
[ ] rabbitmq_web_stomp 3.3.5
[ ] rabbitmq_web_stomp_examples 3.3.5
[ ] sockjs 0.3.4-rmq3.3.5-git3132eb9
[ ] webmachine 1.10.3-rmq3.3.5-gite9359c7
[root@localhost ~]# rabbitmq-plugins enable rabbitmq_management
#激活插件
[root@localhost ~]# rabbitmq-plugins list
[e] amqp_client 3.3.5
[ ] cowboy 0.5.0-rmq3.3.5-git4b93c2d
[ ] eldap 3.3.5-gite309de4
[e] mochiweb 2.7.0-rmq3.3.5-git680dba8
[ ] rabbitmq_amqp1_0 3.3.5
[ ] rabbitmq_auth_backend_ldap 3.3.5
[ ] rabbitmq_auth_mechanism_ssl 3.3.5
[ ] rabbitmq_consistent_hash_exchange 3.3.5
[ ] rabbitmq_federation 3.3.5
[ ] rabbitmq_federation_management 3.3.5
[E] rabbitmq_management 3.3.5
[e] rabbitmq_management_agent 3.3.5
[ ] rabbitmq_management_visualiser 3.3.5
[ ] rabbitmq_mqtt 3.3.5
[ ] rabbitmq_shovel 3.3.5
[ ] rabbitmq_shovel_management 3.3.5
[ ] rabbitmq_stomp 3.3.5
[ ] rabbitmq_test 3.3.5
[ ] rabbitmq_tracing 3.3.5
[e] rabbitmq_web_dispatch 3.3.5
[ ] rabbitmq_web_stomp 3.3.5
[ ] rabbitmq_web_stomp_examples 3.3.5
[ ] sockjs 0.3.4-rmq3.3.5-git3132eb9
[e] webmachine 1.10.3-rmq3.3.5-gite9359c7
使用浏览器去进行访问,如果想要生效要去重启一下rabbitmq,在去浏览器去访问,账号密码为guest
[root@localhost ~]# systemctl restart rabbitmq-server
[root@localhost ~]# curl 127.0.0.1:15672
Nginx代理
将所有到达 Nginx 服务器的 HTTP 请求都转发到 http://127.0.0.1:15672。
[root@localhost ~]# yum -y install nginx
[root@localhost ~]# cd /etc/nginx/
[root@localhost nginx]# cd conf.d/
[root@localhost conf.d]# vim rabbitmq.conf
server {
listen 80;
location / {
proxy_pass http://127.0.0.1:15672; #反向代理的语句
}
}
[root@localhost conf.d]# nginx -t #测试语法错误
[root@localhost conf.d]# nginx
#启动nginx
同步102、103主机
安装python3的安装包,通过 Python 的包管理工具 pip 安装名为 pika 的库。
[root@localhost ~]# yum -y install python3
[root@localhost ~]# pip3 install pika
取消同步会话,分别在俩个主机上编写配置文件脚本,
102主机
[root@localhost ~]# python3 send_message.sh
消息发送完毕
[root@localhost ~]# vim send_message.sh
import pika
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.101'))
channel = connection.channel()
# 声明队列;如果队列不存在会被创建
channel.queue_declare(queue='test_queue', durable=True)
# 发送消息到队列中
channel.basic_publish(
exchange='',
routing_key='test_queue',
body='Hello, RabbitMQ!',
properties=pika.BasicProperties(
delivery_mode=2, # 使消息持久化
)
)
print("消息发送完毕")
# 关闭连接
connection.close()
~
- 连接到 RabbitMQ 服务器pika.BlockingConnection(pika.ConnectionParameters('192.168.10.101')) 连接到 IP 地址为 192.168.10.101 的 RabbitMQ 服务器。
- 创建频道connection.channel() 创建一个通信通道,用于与 RabbitMQ 服务器交换消息。
- 声明队列channel.queue_declare(queue='test_queue', durable=True) 确保队列 test_queue 存在,并设置其为持久化,这意味着队列在 RabbitMQ 重启后仍会保留。
- 发布消息channel.basic_publish(...) 将消息 Hello, RabbitMQ! 发送到队列 test_queue。delivery_mode=2 表示消息也将被持久化,以防 RabbitMQ 崩溃。
连接到 RabbitMQ 消息队列服务器,声明一个名为 test_queue 的队列,并消费该队列中的消息,将接收到的消息打印到控制台。
103主机
[root@localhost ~]# python3 receive_message.sh
[root@localhost ~]# vim receive_message.sh
import pika
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.101'))
channel = connection.channel()
# 声明队列,确保RabbitMQ中有一个名为'test_queue'的队列
channel.queue_declare(queue='test_queue', durable=True)
# 定义回调函数来处理消息
def callback(ch, method, properties, body):
print(f"Received {body.decode()}")
# 消费队列中的消息,回调函数为callback
channel.basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=True)
print("Waiting for messages. To exit press CTRL+C")
# 开始监听消息
channel.start_consuming()
用102发送103去接收
虚拟机管理
来到101主机上创建虚拟机
[root@localhost ~]# rabbitmqctl list_vhosts
#列出当前的虚拟机
[root@localhost ~]# rabbitmqctl add_vhost zhangsan
#创建一个叫hzangsan的虚拟机
用户管理
[root@localhost ~]# rabbitmqctl add_user zhangsan pwd123 #创建一个zhangsan的用户密码为pwd123
[root@localhost ~]# rabbitmqctl list_users #列出所有用户
Listing users ...
guest [administrator]
zhangsan []
...done.
设置角色
设置 RabbitMQ 中用户的权限
[root@localhost ~]# rabbitmqctl set_permissions -p zhangsan zhangsan '.*' '.*' '.*'
[root@localhost ~]# rabbitmqctl list_permissions -p zhangsan #列出 RabbitMQ 中在指定虚拟主机 zhangsan 上的所有用户权限
Listing permissions in vhost "zhangsan" ...
zhangsan .* .* .*
...done.
分配角色
将 zhangsan 用户的角色设置为 administrator。这意味着 zhangsan 将拥有 RabbitMQ 服务器上的完全管理权限。
[root@localhost ~]# rabbitmqctl set_user_tags zhangsan administrator
Setting tags for user "zhangsan" to [administrator] ...
...done.
zhangsan
pwd123
RabbitMQ集群
主机名 | Ip | 节点类型 |
Centos01 | 192.168.10.101 | 磁盘节点 |
Centos02 | 192.168.10.102 | 内存节点 |
Centos03 | 192.168.10.103 | 内存节点 |
给三台主机设置主机名
101
[root@localhost ~]# hostnamectl set-hostname mq01
[root@localhost ~]# bash
102
[root@localhost ~]# hostnamectl set-hostname mq02
[root@localhost ~]# bash
103
[root@localhost ~]# hostnamectl set-hostname mq01
[root@localhost ~]# bash
同步三台主机编辑hosts文件
[root@mq01 ~]# vim /etc/hosts
192.168.10.101 mq01
192.168.10.102 mq02
192.168.10.103 mq03
关闭三台主机的防火墙,设置永久关闭
[root@mq01 ~]# systemctl stop firewalld
[root@mq01 ~]# setenforce 0
[root@mq01 ~]# systemctl disable firewalld #设置永久关闭
部署仓库
rm -rf /etc/yum.repos.d/*
curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
curl -o /etc/yum.repos.d/epel.repo https://mirrors.aliyun.com/repo/epel-7.repo
yum clean all
安装rabbitmq
[root@mq01 ~]# yum -y install erlang
[root@mq01 ~]# yum -y install rabbitmq-server
启动服务
设置为开机自启
[root@mq01 ~]# systemctl start rabbitmq-server
[root@mq01 ~]# systemctl enable rabbitmq-server
[root@mq01 ~]# ps aux | grep rabbit #查看进程
安装management插件
[root@mq01 ~]# rabbitmq-plugins enable rabbitmq_management
[root@mq01 ~]# rabbitmq-plugins list
要想让插件生效要重启一下服务
[root@mq01 ~]# systemctl restart rabbitmq-server
编辑cookie文件
取消同步
拷贝cookie文件到另外俩个主机三因为不同的主机文件不一样,要保持一致
[root@mq01 ~]# cd /var/lib/rabbitmq/
[root@mq01 rabbitmq]# scp .erlang.cookie mq02:/var/lib/rabbitmq
[root@mq01 rabbitmq]# scp .erlang.cookie mq03:/var/lib/rabbitmq
[root@mq03 rabbitmq]# cat .erlang.cookie #显示文件内容
cookie复制完成以后把三个节点重启一下
同步三台主机
[root@mq01 rabbitmq]# reboot
取消同步对102.103做配置
来到102主机(103主机也一样安装相同的方法)
停止 RabbitMQ 应用程序,但不会停止 RabbitMQ 服务本身,加入集群,显示...done表示连接成功,在启动RabbitMQ 应用程序
[root@mq02 ~]# rabbitmqctl stop_app
[root@mq02 ~]# rabbitmqctl join_cluster --ram rabbit@mq01 #连接成功
...done.
[root@mq02 ~]# rabbitmqctl start_app
查看集群状态
[root@mq02 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@mq02 ...
[{nodes,[{disc,[rabbit@mq01]},{ram,[rabbit@mq02]}]},
{running_nodes,[rabbit@mq01,rabbit@mq02]},
{cluster_name,<<"rabbit@mq02">>},
{partitions,[]}]
...done.
标签:队列,rabbitmq,消息,RabbitMQ,root,localhost,3.3
From: https://blog.csdn.net/m0_70627741/article/details/141195863