rabbitmq实现可以使用java或者springboot的封装方法,自己创建实现,也可以使用中间件实现,相对于自建,使用rabbitmq应用场景及使用更系统安全。本文具体介绍rabbitmq中间件部署。
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 使用较多的消息队列有 ActiveMQ(安全),RabbitMQ,ZeroMQ,Kafka(大数据),MetaMQ,RocketMQ
在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景
以下本人linux部署步骤:
安装erlang(rabbitmq需要该语言运行)
配置相关依赖环境
##yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget yum install -y make gcc gcc-c++ m4 openssl openssl-devel ncurses-devel unixODBC unixODBC-devel java java-devel socat
下载安装包(本人rabbitmq要求的版本是22以上)
cd /home/erlang wget http://www.erlang.org/download/otp_src_23.3.tar.gz
解压并安装
tar -xzvf otp_src_23.3.tar.gz cd otp_src_23.3 ./configure --prefix=/usr/local/erlang make && make install
配置环境
vim /etc/profile ###为文件添加值start: ERL_PATH=/home/erlang/otp_src_23.3/bin PATH=$ERL_PATH:$PATH ###end source /etc/profile 使配置生效,在shell中使用
查看安装情况
erl -version 查看版本号 #启动erlang命令 ./bin/erl
下载rabbitmq安装包
rabbitmq官网下载地址:https://www.rabbitmq.com/ 下载压缩包(注意下载linux版的,类似:rabbitmq-server-3.9.6-1.el7.noarch.rpm
)本人下载的 centos7相应版本:在window机中下载相关包上传到linux机中
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.16/rabbitmq-server-3.8.16-1.el7.noarch.rpm
rpm包安装后没具象文件位置集,相关文件应该是融入主文件里其他位置中,使用模糊查询,查看安装位置,如: rpm -aq rabbitmq*
安装rabbitmq
##安装命令 rpm -Uvh rabbitmq-server-3.9.6-1.el7.noarch.rpm ##若报错:erlang >= 23.2 被 rabbitmq-server-3.9.6-1.el7.noarch 需要;可使用如下命令:(nodeps是忽视依赖关系,可不加) rpm -ivh --nodeps rabbitmq-server-3.8.16-1.el7.noarch --force --nodeps
启动rabbitmq
service rabbitmq-server start # 配置用户及WEBUI插件 ##启用插件: rabbitmq-plugins enable rabbitmq_management ##添加用户: rabbitmqctl add_user admin admin ##设置用户tag: rabbitmqctl set_user_tags admin administrator ##赋予用户默认vhost的全部操作权限: rabbitmqctl set_permissions -p / admin ".*" ".*" ".*" ##开机自动启动 chkconfig rabbitmq-server on
找不到。启动程序找不到erl命令,那么添加erlang安装的bin目录,我这里是编译安装的,目录如下。
vim /usr/lib/rabbitmq/bin/rabbitmq-server
在set -e 下面加两行
#erlang export PATH=$PATH:/opt/otp_src_23.3/bin
无法访问页面,查看message日志,发现命令没有发现,rabbitmq失败
查看位置
在环境文件前面也加上他的环境变量
这下重启没有报错了
添加admin用户
http://localhost:15672 登录,无法直接使用远程ip登录访问,需要进行配置,在配置之前先创建一个admin账号,并进行授权:
查看用户 :rabbitmqctl list_users
创建一个admin用户:rabbitmqctl add_user admin admin
用户授权 :rabbitmqctl set_user_tags admin administrator
# rabbitmqctl set_permissions -p / asdf ".*" ".*" ".*" #这条命令好像不用执行就可以
创建用户之后需要进行配置,若使用rpm安装,则修改配置/etc/rabbitmq/rabbitmq.config,配置内容大致如下:
[ {rabbit, [%% %% Network Connectivity %% ==================== %% %% By default, RabbitMQ will listen on all interfaces, using %% the standard (reserved) AMQP port. %% {tcp_listeners, [5672]}, {loopback_users, ["admin"]} ]} ].
最后面那个点必填的
配置之后,如果rabbitmq无法正常启动,出现错误:
init terminating in do_boot () Crash dump is being written to: erl_crash.dump...done
此时有可能是配置文件配置有问题,确保配置文件没问题之后,重启即可。
可以看到,没有那个配置文件,创建并写上面的配置,然后重启rabbitmq,可以看到rabbitmq服务状态中,显示使用这个新建的配置
再次查看用户,可以看到多了个admin用户
开启管理页面
需要开启15672 web管理页面端口,需要执行命令 ,不然页面无法访问,这个端口没有监听
rabbitmq-plugins enable rabbitmq_management
访问Rabbitmq http://xxx.xxx.xxx.xxx:15672/
默认账号密码:guest guest 只能在本地访问
使用新添用户:admin admin
访问即可,创建成功:
其他命令
1、可以查看服务状态:
[root@localhost ~]# rabbitmqctl status[root@mcw14 opt]# rabbitmqctl status Status of node rabbit@mcw14 ... Runtime OS PID: 114466 OS: Linux Uptime (seconds): 1512 Is under maintenance?: false RabbitMQ version: 3.8.16 Node name: rabbit@mcw14 Erlang configuration: Erlang/OTP 23 [erts-11.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe] Erlang processes: 371 used, 1048576 limit Scheduler run queue: 1 Cluster heartbeat timeout (net_ticktime): 60 Plugins Enabled plugin file: /etc/rabbitmq/enabled_plugins Enabled plugins: * rabbitmq_management * amqp_client * rabbitmq_web_dispatch * cowboy * cowlib * rabbitmq_management_agent Data directory Node data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14 Raft data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14/quorum/rabbit@mcw14 Config files * /etc/rabbitmq/rabbitmq.config Log file(s) * /var/log/rabbitmq/[email protected] * /var/log/rabbitmq/rabbit@mcw14_upgrade.log Alarms (none) Memory Total memory used: 0.0906 gb Calculation strategy: rss Memory high watermark setting: 0.4 of available memory, computed to: 1.4708 gb code: 0.0282 gb (28.15 %) other_proc: 0.0273 gb (27.19 %) allocated_unused: 0.0202 gb (20.11 %) other_system: 0.0124 gb (12.36 %) plugins: 0.006 gb (5.97 %) other_ets: 0.0033 gb (3.26 %) atom: 0.0015 gb (1.45 %) binary: 0.0011 gb (1.1 %) mgmt_db: 0.0002 gb (0.21 %) mnesia: 0.0001 gb (0.09 %) metrics: 0.0001 gb (0.07 %) msg_index: 0.0 gb (0.03 %) quorum_ets: 0.0 gb (0.01 %) connection_other: 0.0 gb (0.0 %) connection_channels: 0.0 gb (0.0 %) connection_readers: 0.0 gb (0.0 %) connection_writers: 0.0 gb (0.0 %) queue_procs: 0.0 gb (0.0 %) queue_slave_procs: 0.0 gb (0.0 %) quorum_queue_procs: 0.0 gb (0.0 %) reserved_unallocated: 0.0 gb (0.0 %) File Descriptors Total: 2, limit: 32671 Sockets: 0, limit: 29401 Free Disk Space Low free disk space watermark: 0.05 gb Free disk space: 16.7942 gb Totals Connection count: 0 Queue count: 0 Virtual host count: 1 Listeners Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0 Interface: [::], port: 15672, protocol: http, purpose: HTTP API [root@mcw14 opt]#命令执行结果
2、重启服务:
[root@localhost ~]# service rabbitmq-server restart
Restarting rabbitmq-server (via systemctl):
[ 确定 ]
3、关闭服务
[root@localhost ~]# service rabbitmq-server stop
4、重置服务
[root@localhost ~]# service rabbitmq-server reset
force_reset
强制RabbitMQ node还原到最初状态.
不同于reset , force_reset 命令会无条件地重设node,不论当前管理数据库的状态和集群配置是什么。它只能在数据库或集群配置已损坏的情况下才可使用。
执行reset和force_reset之前,必须停止RabbitMQ application
将RabbitMQ node还原到最初状态.包括从所在群集中删除此node,从管理数据库中删除所有配置数据,如已配置的用户和虚拟主机,以及删除所有持久化消息.
角色及权限
1.RabbitMQ的用户角色分类:
none、management、policymaker、monitoring、administrator
2.RabbitMQ各类角色描述:
none
不能访问 management plugin
management
用户可以通过AMQP做的任何事外加:
列出自己可以通过AMQP登入的virtual hosts
查看自己的virtual hosts中的queues, exchanges 和 bindings
查看和关闭自己的channels 和 connections
查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。
policymaker
management可以做的任何事外加:
查看、创建和删除自己的virtual hosts所属的policies和parameters
monitoring
management可以做的任何事外加:
列出所有virtual hosts,包括他们不能登录的virtual hosts
查看其他用户的connections和channels
查看节点级别的数据如clustering和memory使用情况
查看真正的关于所有virtual hosts的全局的统计信息
administrator
policymaker和monitoring可以做的任何事外加:
创建和删除virtual hosts
查看、创建和删除users
查看创建和删除permissions
关闭其他用户的connections
3.创建用户并设置角色
可以创建管理员用户,负责整个MQ的运维;可以创建RabbitMQ监控用户,负责整个MQ的监控;可以创建某个项目的专用用户,只能访问项目自己的virtual hosts
常用命令
服务管理
启动: service rabbitmq-server start 或 rabbitmq-service start
关闭: service rabbitmq-server stop 或 rabbitmq-service stop
重启: service rabbitmq-server restart
状态: rabbitmqctl status
[root@mcw14 ~]# rabbitmq-service stop -bash: rabbitmq-service: command not found [root@mcw14 ~]# service rabbitmq-server restart Redirecting to /bin/systemctl restart rabbitmq-server.service [root@mcw14 ~]# rabbitmqctl status Status of node rabbit@mcw14 ... Runtime OS PID: 81996 OS: Linux Uptime (seconds): 28 Is under maintenance?: false RabbitMQ version: 3.8.16 Node name: rabbit@mcw14 Erlang configuration: Erlang/OTP 23 [erts-11.2] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:1] [hipe] Erlang processes: 367 used, 1048576 limit Scheduler run queue: 1 Cluster heartbeat timeout (net_ticktime): 60 Plugins Enabled plugin file: /etc/rabbitmq/enabled_plugins Enabled plugins: * rabbitmq_management * amqp_client * rabbitmq_web_dispatch * cowboy * cowlib * rabbitmq_management_agent Data directory Node data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14 Raft data directory: /var/lib/rabbitmq/mnesia/rabbit@mcw14/quorum/rabbit@mcw14 Config files * /etc/rabbitmq/rabbitmq.config Log file(s) * /var/log/rabbitmq/[email protected] * /var/log/rabbitmq/rabbit@mcw14_upgrade.log Alarms (none) Memory Total memory used: 0.1046 gb Calculation strategy: rss Memory high watermark setting: 0.4 of available memory, computed to: 1.4708 gb other_proc: 0.0357 gb (34.15 %) code: 0.0282 gb (27.01 %) other_system: 0.0124 gb (11.84 %) allocated_unused: 0.0105 gb (10.05 %) reserved_unallocated: 0.0069 gb (6.62 %) plugins: 0.0055 gb (5.27 %) other_ets: 0.0033 gb (3.12 %) atom: 0.0015 gb (1.4 %) binary: 0.0002 gb (0.22 %) mgmt_db: 0.0002 gb (0.15 %) mnesia: 0.0001 gb (0.09 %) metrics: 0.0001 gb (0.06 %) msg_index: 0.0 gb (0.03 %) quorum_ets: 0.0 gb (0.01 %) connection_other: 0.0 gb (0.0 %) connection_channels: 0.0 gb (0.0 %) connection_readers: 0.0 gb (0.0 %) connection_writers: 0.0 gb (0.0 %) queue_procs: 0.0 gb (0.0 %) queue_slave_procs: 0.0 gb (0.0 %) quorum_queue_procs: 0.0 gb (0.0 %) File Descriptors Total: 2, limit: 32671 Sockets: 0, limit: 29401 Free Disk Space Low free disk space watermark: 0.05 gb Free disk space: 16.5811 gb Totals Connection count: 0 Queue count: 0 Virtual host count: 1 Listeners Interface: [::], port: 15672, protocol: http, purpose: HTTP API Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0 [root@mcw14 ~]#rabbitmqctl status
或者systemctl管理
用户管理
新增账号: rabbitmqctl add_user username password 删除用户: rabbitmqctl delete_user username 所有用户: rabbitmqctl list_users 修改密码: rabbitmqctl change_password username newpassword 清除密码: rabbitmqctl clear_password {userName}
admin登录
创建用户和查看用户:
rabbitmqctl add_user machangwei 123456
页面上面也可以查看到这个用户
刚刚创建的用户没有授权,不是管理用户,上面两个账号是管理用户,所以可以登录,好像是授权管理用户才能登录管理页面
根据下面的角色管理,给账号赋予administrator角色后,就i你登录到管理页面了
修改machangwei密码:
rabbitmqctl change_password machangwei 12
再看已经登录的页面,提示登录失败
刷新页面就这样了
使用新密码登录
接下来演示清除密码
清除密码之后
页面又退出登录了
admin用户登录查看,machangwei账号是没有密码了的
重新加上密码,直接修改密码就可以
清除了密码,跟你查看用户没有关系,照样是查到的
清除密码必须接账号
角色管理
用户角色分为5中类型:
none:无任何角色。新创建的用户的角色默认为 none。
management:可以访问web管理页面。
policymaker: 包含managerment所有权限,并且可以管理策略(Policy)和参数(Parameter)
monitoring: 包含management所有权限,并且可以看到所有链接、信道及节点相关的信息
administartor:包含monitoring所有权限,并且可以管理用户、虚拟机、权限、策略、参数等。(最高权限)
设置用户角色: rabbitmqctl set_user_tags zhaojigang administrator
设置多个角色: rabbitmqctl set_user_tags hncscwc monitoring policymaker
查看用户角色: rabbitmqctl list_users
设置上面创建的machangwei为管理用户
页面上查看machangwei账号拥有的角色。
继续执行命令,设置machangwei账号有多个角色
页面上查看,可以看到,它这个是设置标签,每次都是重新设置,会将之前的角色清空,换成本次设置的角色,这里不是新增,这点需要注意
我们再加上管理角色,
这下machangwei账号有登录页面的权限了,查看页面,显示是machangwei登录。
命令行查看用户,也可以看到用户有哪些角色
Vhost管理
所有虚拟主机: rabbitmqctl list_vhosts
添加虚拟主机: rabbitmqctl add_vhost vhostname
删除虚拟主机: rabbitmqctl delete_vhost vhostname
查看虚拟主机
页面上面查看下虚拟机主机
添加一个虚拟主机
可以看到刚刚创建的虚拟主机跟账号没有绑定,名字也能看到页面和命令行看到的一样
权限管理
命令格式如下:rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read}
查询所有权限:rabbitmqctl list_permissions [-p VHostPath]
查看用户权限:rabbitmqctl list_user_permissions username
清除用户权限:rabbitmqctl clear_permissions [-p VHostPath] username
rabbitmqctl set_permissions -p mcw_vhost1 machangwei /etc/rabbitmq/rabbitmq.config read [-p vhost] {user} {conf} {write} {read}
rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read} # 表示设置用户权限。 {vhost} 表示待授权用户访问的vhost名称,默认为 "/"; {user} 表示待授权反问特定vhost的用户名称; {conf}表示待授权用户的配置权限,是一个匹配资源名称的正则表达式; {write} 表示待授权用户的写权限,是一个匹配资源名称的正则表达式; {read}表示待授权用户的读权限,是一个资源名称的正则表达式。 # rabbitmqctl set_permissions -p / admin "^mip-.*" ".*" ".*" # 例如上面例子,表示授权给用户 "admin" 具有所有资源名称以 "mip-" 开头的 配置权限;所有资源的写权限和读权限。
设置相关 vhost 权限案例
rabbitmqctl add_vhost /myhost # 添加 vhost rabbitmqctl add_user me me123 # 设置用户和密码 rabbitmqctl set_permissions -p /myhost me ".*" ".*" ".*" # vhost 设置权限
下面查看所有用户权限,查看用户权限:
查看,me这个没有添加角色,列出权限的时候没有看到它,指定vhost的列出权限,就看到me了。也就是/myhost目前只有这个用户。而权限,是分为对vhost的配置权限,写权限,读权限。配置权限那里,可以用以什么开头的权限,这种来通配,就像之前在redis作为celery的bakend一样,写入的数据是以指定字符串开头,这样来区分消息,这里,不清楚是不是这样的作用。
页面显示如下
清除用户的vhost权限
没有了
其它几个页面
这里有vhost
也可以页面上添加用户
也可以页面上添加vhost
查看插件
rabbitmq-plugins list
如下,之前开启了管理页面的插件,才能通过页面访问rabbitmq,现在可以看到,前面是有标记的,也就是没有标记的应该就是没有开通,而且标记不同,大写的标记,应该是插件的服务端程序吧。
[root@mcw14 ~]# rabbitmq-plugins list Listing plugins with pattern ".*" ... Configured: E = explicitly enabled; e = implicitly enabled | Status: * = running on rabbit@mcw14 |/ [ ] rabbitmq_amqp1_0 3.8.16 [ ] rabbitmq_auth_backend_cache 3.8.16 [ ] rabbitmq_auth_backend_http 3.8.16 [ ] rabbitmq_auth_backend_ldap 3.8.16 [ ] rabbitmq_auth_backend_oauth2 3.8.16 [ ] rabbitmq_auth_mechanism_ssl 3.8.16 [ ] rabbitmq_consistent_hash_exchange 3.8.16 [ ] rabbitmq_event_exchange 3.8.16 [ ] rabbitmq_federation 3.8.16 [ ] rabbitmq_federation_management 3.8.16 [ ] rabbitmq_jms_topic_exchange 3.8.16 [E*] rabbitmq_management 3.8.16 [e*] rabbitmq_management_agent 3.8.16 [ ] rabbitmq_mqtt 3.8.16 [ ] rabbitmq_peer_discovery_aws 3.8.16 [ ] rabbitmq_peer_discovery_common 3.8.16 [ ] rabbitmq_peer_discovery_consul 3.8.16 [ ] rabbitmq_peer_discovery_etcd 3.8.16 [ ] rabbitmq_peer_discovery_k8s 3.8.16 [ ] rabbitmq_prometheus 3.8.16 [ ] rabbitmq_random_exchange 3.8.16 [ ] rabbitmq_recent_history_exchange 3.8.16 [ ] rabbitmq_sharding 3.8.16 [ ] rabbitmq_shovel 3.8.16 [ ] rabbitmq_shovel_management 3.8.16 [ ] rabbitmq_stomp 3.8.16 [ ] rabbitmq_top 3.8.16 [ ] rabbitmq_tracing 3.8.16 [ ] rabbitmq_trust_store 3.8.16 [e*] rabbitmq_web_dispatch 3.8.16 [ ] rabbitmq_web_mqtt 3.8.16 [ ] rabbitmq_web_mqtt_examples 3.8.16 [ ] rabbitmq_web_stomp 3.8.16 [ ] rabbitmq_web_stomp_examples 3.8.16 [root@mcw14 ~]#
监控管理器
这个就是前面开启和关闭rabbitmq的管理页面的命令
rabbitmq-plugins enable rabbitmq_management #启动
rabbitmq-plugins disable rabbitmq_management #关闭
应用管理
慎用,好像是开启和停止节点用的
关闭应用:rabbitmqctl stop_app
启动应用:rabbitmqctl start_app
stop之后,就不能访问页面了,根据命令返回打印信息,好像是停止的这个节点
stop之后,程序进程还是在的,再次start一下,又能正常访问页面了。我这里只有一个node,如果有多个node,停止一个,应该是可以在页面访问看到情况的
队列管理
查看所有队列:rabbitmqctl list_queues
清除所有队列:rabbitmqctl reset #需要先执行rabbitmqctl stop_app
强制清除队列:rabbitmqctl force_reset
删除单个队列:rabbitmqctl delete_queue queue_name
目前没有队列
添加之后,访问拒绝
参考:https://blog.csdn.net/hefeng_aspnet/article/details/125865990
查看权限
有正则匹配的配置
点击设置权限,把以mip-开头的配置,给重新设置了,为有所有权限
再次添加队列
成功添加队列,点击进入队列页面
这个消息下面,我们看到了之前添加的三个键值对。
再次查询消息队列,可以看到我们刚刚创建的队列,但是消息,显示的是0,不知道是为什么
这里有个push消息,我们试一试
消息被推送了
之前添加的参数,不是消息,我们上面点击发布消息,才是真的往这个队列里面添加了消息,可以看到多了条消息,persistent应该是是否持久化的意思吧,而memory是指内存里面有条消息,但是没有持久化存储呢吧,持久化存储为0,总共一条消息
此时再去Linux上查看,可以看到消息队列mcwcountage已经多了一条消息了。
再次发布一条消息
可以看到,两条消息了
再次查看,两条消息了
点击get消息,可以看到我们刚刚推送的消息
两条消息都get一下。如果填的消息数据,大于实际 有的消息数量,那么只显示已存在的消息
点击之后,消息被清空了
再次push两个消息
移除消息,没有安装插件,那么安装一下
显示开启了两个插件在这个节点
可以看到,这两个插件已经开启了
刚刚的操作没有截图,我们将多出来的队列1删除掉
看上面,可以知道此时只有一个队列。我们进入之后,点击移动,这时候会新建mcwqueue2队列,并把mcwcountage下的所有消息移动到新的队列,有点像是备份似的,不清楚,如果下面移动时,填写的是已有的队列名称,是不是只是新增到已有的队列里面,大概率是这样子吧。
点击移动消息,跳转到展示所有消息队列的页面,可以看到新增的队列,并且两条 消息已经从旧的mcwcountage队列移走,移动到新的队列mcwqueue2里面去了。
我们再测试一下删除,这里的删除好像是删除队列
可以看到,进入队列的详情页面,点击删除,的确把该队列删除掉了,如下,已经看不到mcwqueue2队列了
测试重启消息队列服务,对没有持久化消息的影响
我们push两个消息进入消息队列
我们重启之后,可以看到消息丢失。那么重启之后是否需要将消息持久化呢,怎么做持久化呢,持久化后是否上图中persistent就是2了呢
消息如何持久化呢?
下面看下有两条消息
[root@controller ~]# rabbitmqctl list_queues Listing queues ... q-agent-notifier-security_group-update_fanout_0134e1dc992d4f3291571f4f1150f033 0 q-l3-plugin_fanout_67ad4da1e291429b9a561a0b03734552 0 q-plugin 0 conductor_fanout_e2d9de48537d4ec09edb946f2fdb5008 0 q-agent-notifier-l2population-update_fanout_48739ab147b04dc895fe2fd340aa7eae 0 q-agent-notifier-network-delete_fanout_ed0e5abc181c4fe0bb4a802ea928a2e8 0 reply_20ac7e9ae5a349aea2ea680a5aec7d79 0 consoleauth.controller 0 q-agent-notifier-network-delete.controller 0 q-plugin.controller 0 compute_fanout_9f8cf7456e73451e8ce7d65460849817 0 reply_2455b8e5c5dc40da8650f56ae4e45174 0 q-agent-notifier-l2population-update.controller 0 cinder-volume_fanout_8dc268f1cf7d46d8bbfe4a521da8b202 0 cinder-volume.controller@lvm_fanout_b606cbf8a67c4600879b2a64a5320db4 0 q-agent-notifier-network-update 0 q-agent-notifier-port-update.controller 0 conductor_fanout_2b33be46a0d448868f6dc32678a21500 0 q-agent-notifier-port-update_fanout_2b5d1a5c039e4e2caddea9ed34185fe8 0 q-agent-notifier-port-update_fanout_ebc99b8c158c48529e653e9ceb2f7760 0 consoleauth_fanout_4964cf77ca7944f8a8d85fe555703a0c 0 q-server-resource-versions_fanout_753834a40abd430998a0ba30955f1122 0 cinder-volume.compute2@nfs_fanout_2868e267441e4e528f4d300f33282d19 0 conductor.controller 0 q-agent-notifier-l2population-update_fanout_9cae8c6abdc84202bc35ebbee373b422 0 reply_c9910c029ba1432c98ae4c3576ef7df6 0 conductor 0 q-agent-notifier-network-update.controller 0 scheduler_fanout_5eea12b3d0114d58ac0d10fc3799f0d1 0 q-agent-notifier-security_group-update_fanout_1452a407fcd54a5ab47285254b86a8d3 0 q-agent-notifier-security_group-update.compute1 0 reply_52a5090b02f5436c9e7aa541e732ef52 0 cinder-volume 0 q-agent-notifier-network-update_fanout_a4a9e02adfc748fd9a8835996e802bfa 0 cinder-volume_fanout_75d1c7f92a394bd088b7f65e121d6f9a 0 q-server-resource-versions 0 neutron-vo-Trunk-1.1_fanout_138759917974485faad26891827e7447 0 compute.compute2 0 neutron-vo-Trunk-1.1_fanout_676122d00eb34a59be324d0604c9111c 0 scheduler.controller 0 neutron-vo-SubPort-1.0_fanout_e49ddc715b1943389e7e0ddd6dfb5032 0 [email protected] 0 dhcp_agent.controller 0 neutron-vo-Trunk-1.1.controller 0 q-agent-notifier-port-update.compute1 0 neutron-vo-SubPort-1.0.compute2 0 dhcp_agent_fanout_54b2d45e5d0d490f8d62283a9abcb0b8 0 cinder-volume.compute2@nfs 0 q-reports-plugin.controller 0 neutron-vo-SubPort-1.0_fanout_496e5f4cee6445af8a1edcb5c312abe6 0 q-reports-plugin_fanout_d4e460c745f445c8a22c39cbc15b9f92 0 q-agent-notifier-network-update.compute1 0 cinder-scheduler 0 consoleauth 0 q-reports-plugin_fanout_89d519d055464103a1ec5029a36390a8 0 dhcp_agent 0 q-agent-notifier-security_group-update.compute2 0 q-agent-notifier-l2population-update.compute1 0 q-agent-notifier-network-delete_fanout_d518f4f111774991971c7cdc5302fb0d 0 q-reports-plugin 0 q-agent-notifier-network-delete_fanout_4368adffe18e4742a12e2c4559b07985 0 neutron-vo-SubPort-1.0_fanout_2a542add26d8456396efe5ef3cb4c23f 0 [email protected] 0 q-agent-notifier-l2population-update_fanout_5e726a5d09f942e1afb8b02d89cdf9fe 0 q-agent-notifier-network-update.compute2 0 reply_f3414e08cc93430a914663877436085d 0 cinder-volume.controller@lvm 0 scheduler 0 neutron-vo-SubPort-1.0 0 q-l3-plugin.controller 0 neutron-vo-Trunk-1.1.compute1 0 q-agent-notifier-l2population-update.compute2 0 neutron-vo-SubPort-1.0.controller 0 q-agent-notifier-port-update.compute2 0 reply_4961a61a307a428fb0b03043c7654f7d 0 q-agent-notifier-security_group-update.controller 0 q-agent-notifier-security_group-update 0 compute_fanout_b980ee35b47c47688d4fda1a1eb9fe34 0 cinder-scheduler_fanout_3b662fb1565b4a9f8c1224380cf8c8da 0 neutron-vo-Trunk-1.1 0 compute 0 q-agent-notifier-network-delete.compute1 0 cinder-scheduler.controller 0 compute.compute1 0 q-l3-plugin_fanout_31c332c953be4c9194085590c844f934 0 neutron-vo-Trunk-1.1_fanout_a8dca45b75d8436a9ba44f8832d04d58 0 q-agent-notifier-l2population-update 0 q-agent-notifier-network-update_fanout_84a4aa333c7e435482fc64b04d120051 0 q-server-resource-versions.controller 0 neutron-vo-SubPort-1.0.compute1 0 q-agent-notifier-port-update 0 q-agent-notifier-security_group-update_fanout_49d673aeec024d8fa36a49523cf335fd 0 q-agent-notifier-network-delete.compute2 0 q-agent-notifier-port-update_fanout_e3f806abc57744b290540a21aba6626f 0 q-l3-plugin 0 reply_80e9590155d64a2480314f86613ea532 0 neutron-vo-Trunk-1.1.compute2 0 q-agent-notifier-network-delete 0 q-agent-notifier-network-update_fanout_c60c9bcfa3eb40c7b5c025980bece3f5 0 q-plugin_fanout_a07db20790814d52ba25d914e01fe82f 0 [root@controller ~]#查看openstack中有的消息队列
集群管理
查看集群状态: rabbitmqctl cluster_status
摘除节点: rabbitmqctl forget_cluster_node [--offline]
组成集群命令: rabbitmqctl join_cluster <clusternode> [--ram]
修改节点存储形式: rabbitmqctl change_cluster_node_type disc | ram
修改节点名称: rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2 newnode2] [oldnode3 newnode3...]
[root@mcw14 ~]# rabbitmqctl cluster_status Cluster status of node rabbit@mcw14 ... Basics Cluster name: rabbit@mcw14 Disk Nodes rabbit@mcw14 Running Nodes rabbit@mcw14 Versions rabbit@mcw14: RabbitMQ 3.8.16 on Erlang 23 Maintenance status Node: rabbit@mcw14, status: not under maintenance Alarms (none) Network Partitions (none) Listeners Node: rabbit@mcw14, interface: [::], port: 15672, protocol: http, purpose: HTTP API Node: rabbit@mcw14, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication Node: rabbit@mcw14, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0 Feature flags Flag: drop_unroutable_metric, state: disabled Flag: empty_basic_get_metric, state: disabled Flag: implicit_default_bindings, state: enabled Flag: maintenance_mode_status, state: enabled Flag: quorum_queue, state: enabled Flag: user_limits, state: enabled Flag: virtual_host_metadata, state: enabled [root@mcw14 ~]#rabbitmqctl cluster_status
看下我们部署的openstack,它的集群状态是什么样的,也就一个节点,跟上面命令访问结果不一样,那么上面可能是没有形成集群,只是一个单节点。
加入集群失败
信息查看
rabbitmqadmin list connections #查看所有连接
rabbitmqadmin show overview #概览 Overview
rabbitmqadmin list nodes #查看所有节点 Node
rabbitmqadmin list channels #查看所有通道 Channel
rabbitmqadmin list consumers #查看所有消费者 Consumer
rabbitmqadmin list exchanges #查看所有路由 Exchange
rabbitmqadmin list bindings #查看所有路由与队列的关系绑定 Binding
这样就可以了,之前节点被关闭了。开启节点后,然后查看有哪些节点,可以正常看到
查看所有连接
概览
查看所有节点
查看所有通道
查看所有消费者
查看所有路由
查看所有路由与队列的关系绑定
python操作rabbitmq
RabbitMq生产者消费者模型
生产者(producter) 队列消息的产生者,复制生产消息,并将消息传入队列
生产者代码:
import pika import json credentials = pika.PlainCredentials('admin','admin')#mq用户名和密码,用于认证 #虚拟队列需要指定参数virtual_host,如果是默认的可以不填 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.24',port=5672,virtual_host='/',credentials=credentials)) channel = connection.channel()# 创建一个AMQP信道 #声明队列,并设置durable为True,为了避免rabbitMq-server挂掉数据丢失,将durable设为True channel.queue_declare(queue='1',durable=True) for i in range(10): # 创建10个q message = json.dumps({'OrderId':"1000%s"%i}) # exchange表示交换器,可以精确的指定消息应该发到哪个队列中,route_key设置队列的名称,body表示发送的内容 channel.basic_publish(exchange='',routing_key='1',body=message) print(message) connection.close()操作前
通过pika生命一个认证用的凭证,然后用pika创建rabbitmq的块连接,再用上面的连接创建一个AMQP信道 。创建消息队列的连接时,需要指定ip,断开,虚拟主机,凭证。
然后根据上面的信道,声明一个队列,
我们可以看到,下面信道点队列声明里的queue参数值就队列的名字。这里是遍历0到9,然后打印了下消息,这里的生成的消息,是json序列化后的数据。然后将数据作为i,信道点基础发布的body参数的值。上面信道点队列声明是创建一个队列,队列名字是’1‘,下面我们用信道点基本发布,是将我们创建的消息体发送到队列中,路由_key就是指定队列名称,指定发布消息到哪个队列,消息是作为body的参数,
最后,需要将这个消息队列的连接关闭。
我们通过页面可以看到,已经创建好了这个队列,队列名字为1,并且已经通过遍历生成的10个消息,调用十次信道点基础发布方法,将这十个产生的消息发布到消息队列中
我们可以再看下,可以看到我们创建的消息的具体内容。
消费者(consumer):队列消息的接收者,扶着接收并处理消息队列中的消息
import pika credentials = pika.PlainCredentials('admin','admin') connection = pika.BlockingConnection(pika.ConnectionParameters( host='10.0.0.24', port=5672, virtual_host='/', credentials=credentials )) channel = connection.channel() #声明消息队列,消息在这个队列中传递,如果不存在,则创建队列 channel.queue_declare(queue='1',durable=True) # 定义一个回调函数来处理消息队列中消息,这里是打印出来 def callback(ch,method,properties,body): ch.basic_ack(delivery_tag=method.delivery_tag) print(body.decode()) #告诉rabbitmq,用callback来接收消息 channel.basic_consume('1',callback) #开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理 channel.start_consuming()
获取消息,创建凭证,连接,信道,然后什么一下队列。指定我们要获取哪个队列中的消息,如果没有这个队列,就会创建这个队列,存在,那么后面使用这个信道,就会从这个队列中获取数据。信道是通过rabbitmq的连接对象来生成的,连接对象中放了连接用的凭证。所以,信道点基础消费方法,指定是哪个消息队列,那么就会从这个队列中获取消息。然后传参回调函数。而回调函数中,
我们可以看到,基础消费方法里面有消息回调,就是上面我们自定义的回调函数
这个方法定义了回调函数的写法。第一个参数是信道
第二个参数是方法,第三个参数是属性,第四个是body,这些不用管,只需要按如下格式,就可以从body,做个解码,就将信道点基础消费中指定的队列中的消息,取出来了,我们是用回调函数来接收消息,当需要获取消息的时候,就需要执行信道点开始消费的方法。这里好像是遍历队列一个一个的将消息获取出来。那么怎样实现,实时监听消息,实时消费呢
RabbitMq持久化
RabbitMq持久化
MQ默认建立的临时的queue和exchange,如果不声明持久化,一旦rabbitmq挂掉,queue,exchange将会全部丢失,所以我们一般在创建queue或者exchange的时候会声明持久化
1.queue声明持久化
# 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable = True 代表消息队列持久化存储,False 非持久化存储 result = channel.queue_declare(queue = 'python-test',durable = True)
使用True
重启消息队列服务
消息队列还在,但是消息被清空了
当我改为false的时候,因为队列1已经存在,并且是Tue声明的,所以这里就报错了
我们设置为false,然后声明一个不存在的队列2
创建好了队列,并且10个消息
重启一下消息队列服务
刚刚上面创建的队列2已经不存在,这已经不是消息被清空了,而是队列直接被清除了
也就是这个Ture,是保留队列用的,持久化队列的。
channel.queue_declare(queue='2',durable=True)
2、exchange声明持久化
# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建.durable = True 代表exchange持久化存储,False 非持久化存储 channel.exchange_declare(exchange = 'python-test', durable = True)
注意:如果已存在一个非持久化的queue或exchange,执行上述代码会报错,因为当前状态不能更该queue 或 exchange存储属性,需要删除重建,如果queue和exchange中一个声明了持久化,另一个没有声明持久化,则不允许绑定
我们在1处改了,但是在2处没有修改。结果有问题。
队列2不存在,所以没有将消息放进去
而exchange这里,没有写将消息推送到声明的python-test里面,所以里面也没有消息
这次是声明的exchange,并且将消息推送到python-test里面
还是没有看到有东西呀
我们这里发布个消息,可以看到,是需要路由的
加上路由,再次执行程序
由于队列2 不存在,好像还是不行
我在这里给它bind一个路由
感觉还是没有弄明白,先放弃了
原来是如下方式呀。
首先,在python-test2里面,
给exchange绑定队列1和2
1和2目前的消息数量
我往路由1里面push一个消息
push成功
然后再看队列1里面,可以看到多了一条刚刚push的消息
接下来用程序实现,声明exchange,然后发布方法不变,发布到exchage中,因为已经绑定了两个路由了,这里指定路由key,根据路由key,可以将消息push到对应的队列中去
我们可以看到,之前是页面点击push了一条,上面程序push了十条到exchange,现在这个队列就有11条数据。可是这个exchange和队列的绑定,是我自己在页面上绑定的,这个应该不合理。以后有时间看下,怎么用程序绑定。
我们可以看到,应该是程序中缺少使用这个绑定方法吧
3、消息持久化
虽然exchange和queue都声明了持久化,但如果消息只存在内存里,rabbitmq重启后,内存里的东西还是会丢失,所以必须声明消息也是持久化,从内存转存到到硬盘
# 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化 channel.basic_publish(exchange = '',routing_key = 'python-test',body = message, properties=pika.BasicProperties(delivery_mode = 2))
4、acknowledgement消息不丢失
消费者(consume)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息会丢失,但是也可以选择消费者处理失败时,将消息回退给rabbitmq,重新再被消费者消费,这个时候需要设置确认标识。
channel.basic_consume(callback,queue = 'python-test',
# no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉
no_ack = False)
参考链接:https://www.jianshu.com/p/cc322adf060c
https://blog.csdn.net/weixin_45144837/article/details/104335115
https://blog.csdn.net/weixin_45144837/article/details/104335115
标签:部署,rabbitmq,队列,rabbitmqctl,gb,agent,常用命令,消息 From: https://www.cnblogs.com/machangwei-8/p/17471499.html