首页 > 其他分享 >RabbitMQ消息队列

RabbitMQ消息队列

时间:2024-08-14 17:25:56浏览次数:18  
标签:队列 rabbitmq 消息 RabbitMQ root localhost 3.3

消息队列概念

  • 什么是消息队列
    • 消息(Message)是指在应用间传送的数据
    • 消息队列(Message Queue)是一种应用间的通信方式解决方法,确保消息的可靠传递、
  • 主流消息队列
    • 目前主流的几大消息1队列有:RabitMQ、ActiveMQ、RocketMQ、Kafka、ZeroMQ等,也有一些小众的比如Beanstalk、当然我们之前学过的Redis也可以实现消息队列的功能

消息队列中角色名词

(1)Broker:

消息服务器,作为server提供消息核心服务

(2)Producer:

消息生产者,业务的发起方,负责生产消息传输给broker

(3)Consumer:

消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理

(4)Topic:

主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的广播

(5)Queue:

队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收

(6)Message:

消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输

消息队列中两种工作模式

  • Point-to-Point
    • 一方发送消息,另外一方接收
  • Pub/Sub
    • 即发布/订阅模式,消费者可以订阅一个或多个主题并使用该主题中的所有消息

消息队列的缺点

  • 系统可用性降低
  • 系统复杂性提高
  • 数据一致性无法保证

RabbitMQ基础概念

  • RabbitMQ相关术语

(1)生产者:产生消息的进程或服务

(2)消费者:接收消息的进程或服务

(3)队列:RabbitMQ是消息队列中间件,而真正储存消息数据的就是队列,队列可以有很多。

(4)交换器:类似于网络设备交换机,它可以根据不同的关键字,将消息发送到不同的队列。   

(5)虚拟主机:虚拟主机提供了资源的逻辑分组和分隔,每一个虚拟主机本质上指mini版的RabbitMQ服务器

RabbitMQ架构

安装RabbitMQ(单机)

安装erlang

在101主机上关闭防火墙

[root@localhost ~]# systemctl stop firewalld
[root@localhost ~]# setenforce 0
安装阿里仓库
rm -rf /etc/yum.repos.d/*
curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
curl -o /etc/yum.repos.d/epel.repo https://mirrors.aliyun.com/repo/epel-7.repo
yum clean all
安装erlang
[root@localhost ~]# yum -y install erlang
安装RabbitMQ
[root@localhost ~]# yum -y install rabbitmq-server


启动RabbitMQ
[root@localhost ~]# systemctl start rabbitmq-server              #启动 RabbitMQ 服务的命令。
[root@localhost ~]# ps aux | grep rabbit                  #启动进程
[root@localhost ~]# rabbitmq-plugins list
                       #查看所有插件,如果前面的没有勾选中,就说明没有启动
[ ] amqp_client                       3.3.5
[ ] cowboy                            0.5.0-rmq3.3.5-git4b93c2d
[ ] eldap                             3.3.5-gite309de4
[ ] mochiweb                          2.7.0-rmq3.3.5-git680dba8
[ ] rabbitmq_amqp1_0                  3.3.5
[ ] rabbitmq_auth_backend_ldap        3.3.5
[ ] rabbitmq_auth_mechanism_ssl       3.3.5
[ ] rabbitmq_consistent_hash_exchange 3.3.5
[ ] rabbitmq_federation               3.3.5
[ ] rabbitmq_federation_management    3.3.5
[ ] rabbitmq_management               3.3.5
[ ] rabbitmq_management_agent         3.3.5
[ ] rabbitmq_management_visualiser    3.3.5
[ ] rabbitmq_mqtt                     3.3.5
[ ] rabbitmq_shovel                   3.3.5
[ ] rabbitmq_shovel_management        3.3.5
[ ] rabbitmq_stomp                    3.3.5
[ ] rabbitmq_test                     3.3.5
[ ] rabbitmq_tracing                  3.3.5
[ ] rabbitmq_web_dispatch             3.3.5
[ ] rabbitmq_web_stomp                3.3.5
[ ] rabbitmq_web_stomp_examples       3.3.5
[ ] sockjs                            0.3.4-rmq3.3.5-git3132eb9
[ ] webmachine                        1.10.3-rmq3.3.5-gite9359c7
[root@localhost ~]# rabbitmq-plugins enable rabbitmq_management
        #激活插件
[root@localhost ~]# rabbitmq-plugins list
[e] amqp_client                       3.3.5
[ ] cowboy                            0.5.0-rmq3.3.5-git4b93c2d
[ ] eldap                             3.3.5-gite309de4
[e] mochiweb                          2.7.0-rmq3.3.5-git680dba8
[ ] rabbitmq_amqp1_0                  3.3.5
[ ] rabbitmq_auth_backend_ldap        3.3.5
[ ] rabbitmq_auth_mechanism_ssl       3.3.5
[ ] rabbitmq_consistent_hash_exchange 3.3.5
[ ] rabbitmq_federation               3.3.5
[ ] rabbitmq_federation_management    3.3.5
[E] rabbitmq_management               3.3.5
[e] rabbitmq_management_agent         3.3.5
[ ] rabbitmq_management_visualiser    3.3.5
[ ] rabbitmq_mqtt                     3.3.5
[ ] rabbitmq_shovel                   3.3.5
[ ] rabbitmq_shovel_management        3.3.5
[ ] rabbitmq_stomp                    3.3.5
[ ] rabbitmq_test                     3.3.5
[ ] rabbitmq_tracing                  3.3.5
[e] rabbitmq_web_dispatch             3.3.5
[ ] rabbitmq_web_stomp                3.3.5
[ ] rabbitmq_web_stomp_examples       3.3.5
[ ] sockjs                            0.3.4-rmq3.3.5-git3132eb9
[e] webmachine                        1.10.3-rmq3.3.5-gite9359c7
使用浏览器去进行访问,如果想要生效要去重启一下rabbitmq,在去浏览器去访问,账号密码为guest
[root@localhost ~]# systemctl restart rabbitmq-server
[root@localhost ~]# curl 127.0.0.1:15672

Nginx代理

将所有到达 Nginx 服务器的 HTTP 请求都转发到 http://127.0.0.1:15672

[root@localhost ~]# yum -y install nginx
[root@localhost ~]# cd /etc/nginx/
[root@localhost nginx]# cd conf.d/
[root@localhost conf.d]# vim rabbitmq.conf
server {
        listen 80;
        location / {
           proxy_pass http://127.0.0.1:15672;           #反向代理的语句
}

}
[root@localhost conf.d]# nginx -t                 #测试语法错误
[root@localhost conf.d]# nginx
                #启动nginx

同步102、103主机

安装python3的安装包,通过 Python 的包管理工具 pip 安装名为 pika 的库。
[root@localhost ~]# yum -y install python3
[root@localhost ~]# pip3 install pika

取消同步会话,分别在俩个主机上编写配置文件脚本,

102主机
[root@localhost ~]# python3 send_message.sh 
消息发送完毕
[root@localhost ~]# vim send_message.sh
import pika

# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.101'))
channel = connection.channel()

# 声明队列;如果队列不存在会被创建
channel.queue_declare(queue='test_queue', durable=True)

# 发送消息到队列中
channel.basic_publish(
    exchange='',
    routing_key='test_queue',
    body='Hello, RabbitMQ!',
    properties=pika.BasicProperties(
        delivery_mode=2,  # 使消息持久化
    )
)

print("消息发送完毕")

# 关闭连接
connection.close()
~                   
  1. 连接到 RabbitMQ 服务器pika.BlockingConnection(pika.ConnectionParameters('192.168.10.101')) 连接到 IP 地址为 192.168.10.101 的 RabbitMQ 服务器。
  2. 创建频道connection.channel() 创建一个通信通道,用于与 RabbitMQ 服务器交换消息。
  3. 声明队列channel.queue_declare(queue='test_queue', durable=True) 确保队列 test_queue 存在,并设置其为持久化,这意味着队列在 RabbitMQ 重启后仍会保留。
  4. 发布消息channel.basic_publish(...) 将消息 Hello, RabbitMQ! 发送到队列 test_queue。delivery_mode=2 表示消息也将被持久化,以防 RabbitMQ 崩溃。
连接到 RabbitMQ 消息队列服务器,声明一个名为 test_queue 的队列,并消费该队列中的消息,将接收到的消息打印到控制台。

103主机
[root@localhost ~]# python3 receive_message.sh 
[root@localhost ~]# vim receive_message.sh
import pika

# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.101'))
channel = connection.channel()

# 声明队列,确保RabbitMQ中有一个名为'test_queue'的队列
channel.queue_declare(queue='test_queue', durable=True)

# 定义回调函数来处理消息
def callback(ch, method, properties, body):
    print(f"Received {body.decode()}")

# 消费队列中的消息,回调函数为callback
channel.basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=True)

print("Waiting for messages. To exit press CTRL+C")
# 开始监听消息
channel.start_consuming()

用102发送103去接收

虚拟机管理

来到101主机上创建虚拟机

[root@localhost ~]# rabbitmqctl list_vhosts
          #列出当前的虚拟机
[root@localhost ~]# rabbitmqctl add_vhost zhangsan
     #创建一个叫hzangsan的虚拟机
用户管理
[root@localhost ~]# rabbitmqctl add_user zhangsan pwd123       #创建一个zhangsan的用户密码为pwd123
[root@localhost ~]# rabbitmqctl list_users             #列出所有用户
Listing users ...
guest    [administrator]
zhangsan    []
...done.
设置角色
设置 RabbitMQ 中用户的权限
[root@localhost ~]# rabbitmqctl set_permissions -p zhangsan zhangsan '.*' '.*' '.*'
[root@localhost ~]# rabbitmqctl list_permissions -p zhangsan        #列出 RabbitMQ 中在指定虚拟主机 zhangsan 上的所有用户权限
Listing permissions in vhost "zhangsan" ...
zhangsan    .*    .*    .*
...done.
分配角色

将 zhangsan 用户的角色设置为 administrator。这意味着 zhangsan 将拥有 RabbitMQ 服务器上的完全管理权限。

[root@localhost ~]# rabbitmqctl set_user_tags zhangsan administrator
Setting tags for user "zhangsan" to [administrator] ...
...done.

 zhangsan
 pwd123

RabbitMQ集群

主机名

Ip

节点类型

Centos01

192.168.10.101

磁盘节点

Centos02

192.168.10.102

内存节点

Centos03

192.168.10.103

内存节点

给三台主机设置主机名
101
[root@localhost ~]# hostnamectl set-hostname mq01
[root@localhost ~]# bash
102
[root@localhost ~]# hostnamectl set-hostname mq02
[root@localhost ~]# bash
103
[root@localhost ~]# hostnamectl set-hostname mq01
[root@localhost ~]# bash


同步三台主机编辑hosts文件
[root@mq01 ~]# vim /etc/hosts
192.168.10.101 mq01
192.168.10.102 mq02
192.168.10.103 mq03
关闭三台主机的防火墙,设置永久关闭
[root@mq01 ~]# systemctl stop firewalld
[root@mq01 ~]# setenforce 0
[root@mq01 ~]# systemctl disable firewalld             #设置永久关闭
部署仓库
rm -rf /etc/yum.repos.d/*
curl -o /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-7.repo
curl -o /etc/yum.repos.d/epel.repo https://mirrors.aliyun.com/repo/epel-7.repo
yum clean all
安装rabbitmq
[root@mq01 ~]# yum -y install erlang
[root@mq01 ~]# yum -y install rabbitmq-server
启动服务

设置为开机自启

[root@mq01 ~]# systemctl start rabbitmq-server
[root@mq01 ~]# systemctl enable rabbitmq-server
[root@mq01 ~]# ps aux | grep rabbit             #查看进程
安装management插件
[root@mq01 ~]# rabbitmq-plugins enable rabbitmq_management
[root@mq01 ~]# rabbitmq-plugins list

要想让插件生效要重启一下服务

[root@mq01 ~]# systemctl restart rabbitmq-server
编辑cookie文件
取消同步

拷贝cookie文件到另外俩个主机三因为不同的主机文件不一样,要保持一致

[root@mq01 ~]# cd /var/lib/rabbitmq/
[root@mq01 rabbitmq]# scp .erlang.cookie mq02:/var/lib/rabbitmq
[root@mq01 rabbitmq]# scp .erlang.cookie mq03:/var/lib/rabbitmq
[root@mq03 rabbitmq]# cat .erlang.cookie           #显示文件内容

cookie复制完成以后把三个节点重启一下

同步三台主机

[root@mq01 rabbitmq]# reboot

取消同步对102.103做配置

来到102主机(103主机也一样安装相同的方法)

停止 RabbitMQ 应用程序,但不会停止 RabbitMQ 服务本身,加入集群,显示...done表示连接成功,在启动RabbitMQ 应用程序

[root@mq02 ~]# rabbitmqctl stop_app
[root@mq02 ~]# rabbitmqctl join_cluster --ram rabbit@mq01                #连接成功
...done.
[root@mq02 ~]# rabbitmqctl start_app

查看集群状态


[root@mq02 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@mq02 ...
[{nodes,[{disc,[rabbit@mq01]},{ram,[rabbit@mq02]}]},
 {running_nodes,[rabbit@mq01,rabbit@mq02]},
 {cluster_name,<<"rabbit@mq02">>},
 {partitions,[]}]
...done.

标签:队列,rabbitmq,消息,RabbitMQ,root,localhost,3.3
From: https://blog.csdn.net/m0_70627741/article/details/141195863

相关文章

  • RabbitMQ消息队列
    一:RabbitMQ介绍RabbitMQ是一款在全球范围内使用非常广泛的开源消息队列中间件。它轻量级、易部署、并支持多种协议。它基于Erlang开发,天生拥有高并发的能力。1:RabbitMQ相关术语(1)生产者产生消息的进程或服务(2)消费者接收消息的进程或服务(3)队列RabbitMQ是消息队列中间件,而......
  • 抖音私信消息接入客服系统
    抖音是当今社交媒体中最受欢迎的应用之一,已经吸引了数百万用户。近年来,许多公司开始在抖音上开展营销活动,以吸引更多的用户。与此同时,他们也在收到越来越多的抖音私信。在应对这种情况时,许多公司都面临一个问题:如何更有效地处理这些私信。这就是为什么抖音私信消息接入客服系统变......
  • 代码随想录day29 || 134 加油站,135 分糖果,860 柠檬水找零,406 根据身高重建队列
    加油站funccanCompleteCircuit(gas[]int,cost[]int)int{ //思路,首先统计一个差值数组,表示行驶到下一个加油站可以补充的油量,然后加总差值数组, //如果小于0,表示从起始位置到目前为止剩余油量小于0,不足以跑完全程,同时将起始位置放到遍历的下一个位置 iflen(gas)==0......
  • .NET 8 中利用 MediatR 实现高效消息传递
    前言MediatR是.NET下的一个实现消息传递的库,轻量级、简洁高效,用于实现进程内的消息传递机制。它基于中介者设计模式,支持请求/响应、命令、查询、通知和事件等多种消息传递模式。通过泛型支持,MediatR可以智能地调度不同类型的消息,非常适合用于领域事件处理。在本文中,将通过一......
  • 高级工程师面试大全- 消息中间件篇
    1.rabbitMQ1.1使用RabbitMQ有什么好处?1、解耦,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!2、异步,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度3、削峰,并发量大的时候,所有的请求直接怼到数据库,造成数据库连......
  • centos7 安装docker 并运行es、rabbitmq 服务 记录
    部署docker当执行 yuminstall-ydocker-cedocker-ce-clicontainerd.iodocker-buildx-plugindocker-compose-plugin出现  “[Errno14]curl#7-“Failedtoconnectto2a03:2880:f10e:83:face:b00c:0:25de:网络不可达”修改其下载源:yum-config-manager--add-repo......
  • 参加阿里云云消息队列 RabbitMQ 版动手操作,赠送博客园T恤
    这是8月份园子和阿里云的第3期推广合作,招募100人参加云消息队列RabbitMQ版动手操作,有效完成动手操作的前100人赠送1件原价79元的博客园T恤,如果不需要T恤,也可以选原价不高于79元的其他周边。活动官网:https://developer.aliyun.com/special/yunduanwendao/rabbitmq01参与步骤:1......
  • P5535 【XR-3】小道消息
    先介绍伯特兰·切比雪夫定理:伯特兰—切比雪夫定理说明:若整数n>3,则至少存在一个质数p,符合n<p<2n−2。另一个稍弱说法是:对于所有大于1的整数n,至少存在一个质数p,符合n<p<2n。知道这个之后这道题就很简单了,我们先简单想想一个质数在一天可以通知除去它的倍数的所有数。那我们来分讨一......
  • 陪玩系统app如何配置推送设置,手把手教你uniapp 如何打通消息推送
    进入uniapp开发者中心,开发者中心包名:每个app都有自己的一个身份证,叫做包名,很多地方会用到。包名去HBuilderX获取,点击发行app云打包可获取app包名说明:推送采用uniapp的官方推送,实际uniapp也是和个推合作,走的是个推的sdk。我们采用uniapp的推送1.0版本,不使用2.0,因为2.0必......
  • 58同城微聊消息自动回复 – 浏览器插件
    功能介绍浏览器插件源码开放,可以随意二次开发,无时间限制,无账号限制,无电脑限制实现原理,纯浏览器插件实现,监控浏览器界面元素变动,获取直播间或者直播中控后台的评论文本,匹配回复关键词或调用AI接口,再利用js模拟输入和点击等操作支持以下中控台或直播间地址,获取评论与回复评论......