1.RabbitMQ中间件
1.1. 什么是中间件
什么是中间件
我国企业从20世纪80年代开始就逐渐进行信息化建设,由于方法和体系的不成熟,以及企业业务的市场需求的不断变化,一个企业可能同时运行着多个不同的业务系统,这些系统可能基于不同的操作系统、不同的数据库、异构的网络环境。现在的问题是,如何把这些信息系统结合成一个有机地协同工作的整体,真正实现企业跨平台、分布式应用。中间件便是解决之道,它用自己的复杂换取了企业应用的简单。
中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件+平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和使用软件区分开来
为什么需要使用消息中间件
具体地说,中间件屏蔽了底层操作系统的复杂性,使程序开发人员面对一个简单而统一的开发环境,减少程序设计的复杂性,将注意力集中在自己的业务上,不必再为程序在不同系统软件上的移植而重复工作,从而大大减少了技术上的负担,中间件带给应用系统的,不只是开发的简便、开发周期的缩短,也减少了系统的维护、运行和管理的工作量,还减少了计算机总体费用的投入。
中间件特点
为解决分布异构问题,人们提出了中间件(middleware)的概念。中间件时位于平台(硬件和操作系统)和应用之间的通用服务,如下图所示,这些服务具有标准的程序接口和协议。针对不同的操作系统和硬件平台,它们可以有符合接口的协议规范的多种实现。
也很难给中间件一个严格的定义,但中间件应具有如下的一些特点:
(1)满足大量应用的需要
(2)运行于多种硬件和 OS平台
(3)支持分布计算,提供跨网络、硬件和 OS平台的透明性的应用或服务的交互
(4)支持标准的协议
(5)支持标准的接口
由于标准接口对于可移植性和标准协议对于互操作性的重要性,中间件已成为许多标准化工作的主要部分。对于应用软件开发,中间件远比操作系统和网络服务更为重要,中间件提供的程序接口定义了一个相对稳定的高层应用环境,不管底层的计算机硬件和系统软件怎样更新换代,只要将中间件升级更新,并保持中间件对外的接口定义不变,应用软件几乎不需任何修改,从而保护了企业在应用软件开发和维护中的重大投资。
简单说:中间件有个很大的特点,是脱离于具体设计目标,而具备提供普遍独立功能需求的模块。这使得中间件一定是可替换的。如果一个系统设计中,中间件时不可替代的,不是架构、框架设计有问题,那么就是这个中间件,在别处可能是个中间件,在这个系统内是引擎。
在项目中什么时候使用中间件技术
在项目的架构和重构中,使用任何技术和架构的改变我们都需要谨慎斟酌和思考,因为任何技术的融入和变化都可能人员,技术,和成本的增加,中间件的技术一般现在一些互联网公司或者项目中使用比较多,如果你仅仅还只是一个初创公司建议还是使用单体架构,最多加个缓存中间件即可,不要盲目追求新或者所谓的高性能,而追求的背后一定是业务的驱动和项目的驱动,因为一旦追求就意味着你的学习成本,公司的人员结构以及服务器成本,维护和运维的成本都会增加,所以需要谨慎选择和考虑。
但是作为一个开放人员,一定要有学习中间件技术的能力和思维,否则很容易当项目发展到一个阶段在去掌握估计或者在面试中提及,就会给自己带来不小的困扰,在当今这个时代这些技术也并不是什么新鲜的东西,如果去掌握和挖掘最关键的还是自己花时间和经历去探讨和研究。
1.2. 中间件技术及架构的概述
学习中间件的方式和技巧
- 理解中间件在项目架构中的作用,以及各中间件的底层实现
- 可以使用一些类比的生活概念去理解中间件
- 使用一些流程图或者脑图的方式去梳理各个中间件在架构中的作用
- 尝试用 java技术去实现中间件的原理
- 静下来去思考中间件在项目中设计的和使用的原因
- 如果找到对应的代替总结方案
- 尝试编写博文总结类同中间件技术的对比和使用场景
- 学会查看中间件的源码以及开源项目和博文
什么是消息中间件
在实际的项目中,大部分的企业项目开发中,在早起都采用的是单体的架构模式
单体架构
在企业开发当中,大部分的初期架构都采用的是单体架构的模式进行架构,而这种架构的典型的特点:就是把所有的业务和模块,源代码,静态资源文件等都放在一个工程中,如果其中的一个模块升级或者迭代发生一个很小的变动都会重新编译和重新部署项目。这种这狗存在的问题是:
- 耦合度太高
- 不易维护
- 服务器的成本高
- 以及升级架构的复杂度也会增大
这样就有后续的分布式架构系统。如下
分布式架构
何谓分布式系统:
通俗一点:就是一个请求由服务器端的多个服务(服务或者系统)协同处理完成
和单体架构不同的是,单体架构是一个请求发起 jvm调度线程(确切的是 tomcat线程池)分配线程 Thread来处理请求直到释放,而分布式系统是:一个请求时由多个系统共同来协同完成,jvm和环境都可能是独立。如果生活中的比喻的话,单体架构就像建设一个小房子很快就能够搞定,如果你要建设一个鸟巢或者大型的建筑,你就必须是各个环节的协同和分布,这样目的也是项目发展到后期的时候要去部署和思考的问题。我们也不难看出来:分布式架构系统存在的特点和问题如下:
存在问题:
- 学习成本高,技术栈过多
- 运维成本和服务器成本增高
- 人员的成本也会增高
- 项目的负载度也会上升
- 面临的错误和容错性也会成倍增加
- 占用的服务器端口和通讯的选择的成本高
- 安全性的考虑和因素逼迫可能选择 RMI/MQ相关的服务器端通讯
好处:
- 服务系统的独立,占用的服务器资源减少和占用的硬件成本减少,确切的说是:可以合理的分配服务资
- 源,不造成服务器资源的浪费
- 系统的独立维护和部署,耦合度降低,可插拔性
- 系统的架构和技术栈的选择可以变的灵活(而不是单纯地选择 java)
- 弹性的部署,不会造成平台因部署造成的瘫痪和停服的状态
1.3. 基于消息中间件的分布式系统的架构
从上图中可以看出来,消息中间件的是
- 利用可靠的消息传递机制进行系统和系统直接的通讯
- 通过提供消息传递和消息的派对机制,它可以在分布式系统环境下扩展进程间的通讯
消息中间件应用的场景
- 跨系统数据传递
- 高并发的流量削峰
- 数据的并发和异步处理
- 大数据分析与传递
- 分布式事务比如你有一个数据要进行迁移或者请求并发过多的时候,
比如你有10 W的并发请求下订单,我们可以在这些订单入库之前,我们可以把订单请求堆积到消息队列中,让它稳健可靠的入库和执行
常见的消息中间件
ActiveMQ、RabbitMQ、Kafka、RocketMQ等
消息中间件的本质及设计
它是一种接受数据、接受请求、存储数据、发送数据等功能的技术服务
MQ消息队列:负责数据的传接受,存储和传递,所以性能要高于普通服务和技术
谁来生产消息,存储消息和消费消息呢?
消息中间件的核心组成部分
-
消息的协议
-
消息的持久化机制
-
消息的分发策略
-
消息的高可用,高可靠
-
消息的容错机制
小结
其实不论选择单体架构还是分布式架构都是项目开发的一个阶段,在什么阶段选择合适的架构方式,而不能盲目追求,最后造成的后果和问题都需要自己买单。但作为一个开发人员学习和探讨新的技术使我们每个程序开发者都应该去保持和思考的问题。当我们没办法去改变社会和世界的时候,我们为了生活和生存那就必须要迎合企业和市场的需求,发挥你的价值和所学的才能,创造价值和实现自我
1.4. 消息队列协议
什么是协议
所谓协议是指:
- 计算机底层操作系统和应用程序通讯时共同遵守的一组约定,只有遵循共同的约定和规范,系统和底层操作系统之间才能相互交流
- 和一般的网络应用程序的不同它主要负责数据的接受和传递,所以性能比较的高
- 协议对数据格式和计算机之间交换数据都必须严格遵守规范
网络协议的三要素
- 语法:语法是用户数据与控制信息的结构与格式,以及数据出现的顺序
- 语义:语义是解释控制信息每个部分的意义,它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应
- 时序:时序是对事件发生顺序的详细说明
比如我 MQ发送一个信息,是以什么数据格式发送到队列中,然后每个部分的含义是什么,发送完毕以后的执行的动作,以及消费者消费消息的动作,消费完毕的相应结构和反馈是什么,然后按照对应的执行顺序进行处理。如果你还是不理解:大家每天都在接触的 http请求协议:
- 语法:http规定了请求报文和响应报文的格式
- 语义:客户端主动发起请求称之为请求(这是一种定义,同时你发起的是 post/get请求)
- 时序:一个请求对应一个响应(一定先有请求在有响应,这个是时序)
而消息中间件采用的并不是 http协议,而常见的消息中间件协议有有:OpenWire、AMQP、MQTT、Kafka,OpenMessage协议
面试题:为什么消息中间件不直接使用 http协议
- 因为 http请求报文头和响应报文头是比较复杂的,包含了Cookie,数据的加密解密,窗台吗,响应码等附加的功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就行,一定要追求的是高性能。尽量简洁,快速
- 大部分情况下 http大部分都是短链接,在实际的交互过程中,一个请求到响应都很有可能会中断,中断以后就不会执行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取信息的过程,出现问题和故障要对数据或消息执行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行
AMQP协议
AMQP:(全称:Advanced Message Queuing Protocol)是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现由 RabbitMQ等
特性:
- 分布式事务支持
- 消息的持久化支持
- 高性能和高可靠的消息处理优势
MQTT协议
MQTT协议(Message Queueing Telemetry Transport)消息队列是 IBM开放的及时通讯协议,物联网系统架构中的重要组成部分
特点:
- 轻量
- 结构简单
- 传输快,不支持事务
- 没有持久化设计
应用场景:
-
适用于计算能力有限
-
低带宽
-
网络不稳定的场景
OpenMessage协议
是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式信息中间件、流处理等领域的应用开发标准
特点:
-
结构简单
-
解析速度快
-
支持事务和持久化设计
Kafka协议
Kafka协议是基于 TCP/IP的二进制协议。消息内部是 通过长度来分割,由一些基本数据类型组成
特点:
-
结构简单
-
解析速度快
-
无事务支持
-
有持久化设计
小结
协议:实在 tcp/ip协议基础之上构建的一种约定俗称的规范和机制、它的主要目的可以让客户端(应用程序 java,go)进行沟通和通讯。并且这种写一下规范必须具有持久性,高可用,高可靠的性能
1.5. 消息队列持久化
持久化
简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存
常见的持久化方式
ActiveMQ | RabbitMQ | Kafka | RocketMQ | |
---|---|---|---|---|
文件存储 | 支持 | 支持 | 支持 | 支持 |
数据库 | 支持 | / | / | / |
1.6. 消息的分发策略
消息的分发策略
MQ消息 队列有如下几个角色
- 生产者
- 存储消息
- 消费者
那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的 git就有推拉机制,我们发送的 http请求就是一种典型的拉取数据库数据返回的过程。而消息队列 MQ是一种推送的过程,而这些推机制会使用到很多的业务场景也有很多对应推机制策略
场景分析一
比如我在 APP上下了一个订单,我们的系统和服务很多,我们如何得知这个消息被哪个系统或者哪些服务器或者系统进行消费,那这个时候就需要一个分发的策略。这就需要消费策略。或者称之为消费的方法论
场景分析二
在发送消息的过程中可能会出现异常,或者网络的抖动,故障等等因为造成消息的无法消费,比如用户在下订单,消费 MQ接受,订单系统出现故障,导致用户支付失败,那么这个时候就需要消息中间件就必须支持消息重试机制策略。也就是支持:出现问题和故障的情况下,消息不丢失还可以进行重发
消息分发策略的机制和对比
ActiveMQ | RabbitMQ | Kafka | RocketMQ | |
---|---|---|---|---|
发布订阅 | 支持 | 支持 | 支持 | 支持 |
轮询分发 | 支持 | 支持 | 支持 | / |
公平分发 | / | 支持 | 支持 | / |
重发 | 支持 | 支持 | / | 支持 |
消息拉取 | / | 支持 | 支持 | 支持 |
1.7. 消息队列高可用和高可靠
什么是高可用机制
所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力
当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署,来达到高可用的目的
集群模式1 - Master-slave主从共享数据的部署方式
集群模式2 - Master-slave主从同步部署方式
解释:这种模式写入消息同样在 Master主节点上,但是主节点会同步数据到 slave节点形成副本,和 zookeeper或者 redis主从机制很雷同。这样可以达到负载均衡的效果,如果消费者有多个这样就可以去不同的节点进行消费,以为消息的拷贝和同步会占用很大的带宽和网络资源。在后去的 rabbitmq中会有使用
集群模式3 - 多主集群同步部署模式
解释:和上面的区别不是特别的大,但是它的写入可以往任意节点去写入
集群模式4 - 多主集群转发部署模式
解释:如果你插入的数据是 broker-1中国,元数据信息会存储数据的相关描述和记录存放的位置(队列)。它会对描述信息也就是元数据信息进行同步,如果消费者在 broker-2中进行消费,发现自己节点没有对应的信息,可以从对应的元数据信息中去查询,然后返回对应的消息信息,场景:比如买火车票或者黄牛买演唱会门票,比如第一个黄牛有顾客说要买的演唱会门票,但是没有但是他回去联系其他的黄牛询问,如果有就返回
集群模式5 Master-slave与 Broker-cluster组合的方案
解释:实现多主多从的热备机制来完成消息的高可用以及数据的热备机制,在生产规模达到一定的阶段的时候,这种使用的频率比较高
什么是高可靠机制
所谓高可靠是指:系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠
在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的
如何保证中间件消息的可靠性呢,可以从两个方面考虑:
- 消息的传输:通过协议来保证系统间数据解析的正确性
- 消息的存储区可靠:通过持久化来保证消息的可靠性
2.入门及安装
2.1.RabbitMQ入门及安装
2.1.1.概述
简单概述:
RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写,支持多种客户端(语言),用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征
安装有两种方式 :
第一种去官网下载压缩包自己上传解压下载
第二种直接使用wget联网下载 这里使用的是第二种
2.1.2.下载RabbitMQ
- 下载地址:Otp 21.3 - Erlang/OTP
- 环境准备:CentOS8.x + /Erlang
RabbitMQ是采用 Erlang语言开发的,所以系统环境必须提供 Erlang环境,第一步就是安装 Erlang
RabbitMQ Erlang 版本对照
RabbitMQversion | Minimum required Erlang/OTP | Maximum supported Erlang/OTP | Notes |
---|---|---|---|
3.9.13 | |||
3.9.12 | |||
3.9.11 | |||
3.9.10 | |||
3.9.9 | |||
3.9.8 | |||
3.9.7 | 23.2 | 24.x | Erlang/OTP 24 support announcement Erlang 24 was released on May 12, 2021 Some community plugins and tools may be incompatible with Erlang 24 |
3.9.6 | |||
3.9.5 | |||
3.9.4 | |||
3.9.3 | |||
3.9.2 | |||
3.9.1 | |||
3.9.0 | |||
3.8.27 | |||
3.8.26 | |||
3.8.25 | |||
3.8.24 | |||
3.8.23 | |||
3.8.22 | |||
3.8.21 | 23.2 | 24.x | Erlang/OTP 24 support announcement Erlang 24 was released on May 12, 2021 Some community plugins and tools may be incompatible with Erlang 24 |
3.8.20 | |||
3.8.19 | |||
3.8.18 | |||
3.8.17 | |||
3.8.16 | |||
3.8.15 | |||
3.8.14 | |||
3.8.13 | |||
3.8.12 | 22.3 | 23.x | Erlang/OTP 23 compatibility notes Erlang 23.x is recommended Erlang 22.x dropped support for HiPE |
3.8.11 | |||
3.8.10 | |||
3.8.9 | |||
3.8.8 | |||
3.8.7 | |||
3.8.6 | 21.3 | 23.x | Erlang/OTP 23 compatibility notes Erlang 22.x or 23.x is recommended Erlang 22.x dropped support for HiPE |
3.8.5 | |||
3.8.4 | |||
3.8.3 | |||
3.8.2 | 21.3 | 22.x | Erlang 22.x is recommended. Erlang 22.x dropped support for HiPE |
3.8.1 | |||
3.8.0 |
2.1.3.安装Erlang
查看系统版本号
[root@iZ2ze6fjqn2bx4j4fw1x8tZ src]# lsb_release -a
LSB Version: :core-4.1-amd64:core-4.1-noarch
Distributor ID: CentOS
Description: CentOS Linux release 7.8.2003 (Core)
Release: 7.8.2003
Codename: Core
下载Erlang
wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.3/erlang-21.2.3-1.el7.centos.x86_64.rpm
安装Erlang
sudo rpm -Uvh /home/download/erlang-21.2.3-1.el7.centos.x86_64.rpm
2.1.4.安装Socat
安装下载
# 安装Socat
sudo yum install -y socat
2.1.5.安装RabbitMQ
下载RabbitMQ
# 下载RabbitMQ
wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm
安装RabbitMQ
sudo rpm -Uvh /home/download/rabbitmq-server-3.7.9-1.el7.noarch.rpm
启动服务
# 启动服务
systemctl start rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server.service
# 开机自启动
systemctl enable rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
2.2.RabbitMQWeb管理界面及授权操作
2.2.1.RabbitMQ管理界面
默认情况下 是没有安装web端的客户端插件 需要安装才可以生效
rabbitmq-plugins enable rabbitmq_management
说明:rabbitmq有一个默认账号和密码是:guest
默认情况只能在 localhost本计下访问,所以需要添加一个远程登录的用户
安装完毕以后,重启服务即可
systemctl restart rabbitmq-server
一定要记住,在对应服务器(阿里云,腾讯云等)的安全组中开放15672
端口
在浏览器访问
# 10.关闭防火墙服务
systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.
systemctl stop firewalld
# 11.访问web管理界面
http://101.200.132.168:15672/
2.2.2.授权账号和密码
新增用户
rabbitmqctl add_user admin admin
设置用户分配操作权限
rabbitmqctl set_user_tags admin administrator
用户级别:
- administrator:可以登录控制台、查看所有信息、可以对 rabbitmq进行管理
- monitoring:监控者 登录控制台,查看所有信息
- policymaker:策略制定者 登录控制台,指定策略
- managment 普通管理员 登录控制台
为用户添加资源权限
rabbitmqctl set_permissions -p / admin ".*"".*"".*"
网页登录成功
2.2.3.小结:
# 添加角色
# 1.rabbitmqctl add_user 账号 密码
# 赋予权限
# 2.rabbitmqctl set_user_tags 账号 权限
# 修改密码
# 3.rabbitmqctl change_password username NewPassword
# 删除用户
# 4.rabbitmqctl delete_user Username
# 查看用户
# 5.rabbitmqctl list_user
# 为用户设置administrator角色
# 6.rabbitmqctl set_paermissions -p / 用户名 "*",".*",".*"
# 为用户设置administrator角色
# 7.rabbitmqctl set_paermissions -p root "*",".*",".*"
2.3.RabbitMQ之Docker安装
01 Dokcer安装RabbitMQ
虚拟化容器技术 - Docker的安装
#1.卸载旧版本
yum remove docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-engine
#2.需要的安装包
yum install -y yum-utils
#3.设置镜像的仓库
yum-config-manager \
--add-repo \
https://download.docker.com/linux/centos/docker-ce.repo
#上述方法默认是从国外的,不推荐
#推荐使用国内的
yum-config-manager \
--add-repo \
https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
#更新yum软件包索引
yum makecache fast
#4.安装docker相关的 docker-ce 社区版 docker-ee 企业版
yum install docker-ce docker-ce-cli containerd.io # 这里我们使用社区版即可
#5.启动docker
systemctl start docker
#6. 使用docker version查看是否按照成功
docker version
docker的相关命令
# 1.启动docker
systemctl start docker
# 2.停止docker
systemctl stop docker
# 3.重启docker
systemctl restart docker
# 4.查看docker状态
systemctl status docker
# 5.开机启动docker
systemctl enable docker
# 6.关闭开机启动dokcer
systemctl unenable docker
# 7.查看docker概要信息
docker info
# 8.查看docker帮助文档
docker --help
配置阿里云的镜像加速
#1.创建一个目录
sudo mkdir -p /etc/docker
#2.编写配置文件
sudo tee /etc/docker/daemon.json <<-'EOF'
{
"registry-mirrors": ["https://oltiqh2r.mirror.aliyuncs.com"]
}
EOF
#3.重启服务
sudo systemctl daemon-reload
sudo systemctl restart docker
安装rabbitmq
参考网站:
可以直接走图中代码,不用走下面两项!
获取rabbit镜像
docker pull rabbitmq:management
创建并运行容器
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:26572 -p 61613:61613 -p 1883:1883 rabbitmq:management
# --hostname 指定容器主机名称
# --name 指定容器名称
# -p 将mq端口号映射到本地
# -e 设置用户信息和密码
# 或者运行时设置用户和密码
启动
测试访问
4. RabbitMQ的角色分类
1.None
- 不能访问management plugin
2.Management
- 查看自己相关节点信息
- 列出自己可以通过AMQP登入的虚拟机
- 查看自己的虚拟机节点virtual hosts的queues exchanges和bindings信息
- 查看和关闭自己的channels和connections
- 查看有关自己的虚拟机节点virtual hosts的统计信息 包括其他用户在这个节点virtual hosts中活动的信息
3.Policymaker
- 包含management所有的权限
- 查看和创建和删除自己的virtual hosts所属的policies和parameters信息
4.Monitoring
- 包含management所有的权限
- 罗列出所有的virtual hosts 包括不能登陆的virtual hosts
- 查看其他用户的connections和channels信息
- 查看节点级别的数据如clustering和memory使用情况
- 查看所有的virtual hosts的全局统计信息
5.Administrator
- 最高权限
- 可以创建和删除virtual hosts
- 可以查看 创建和删除users
- 查看创建permisssions
- 关闭所有用户的connections
6.具体操作界面
3.入门案例
3.1.RabbitMQ入门案例-Simple 简单模式
https://www.bilibili.com/video/BV1dX4y1V73G?p=44 实现步骤
- jdk1.8
- 构建一个 maven工程
- 导入 rabbitmq的 maven依赖
- 启动 rabbitmq-server服务
- 定义生产者
- 定义消费者
- 观察消息的在 rabbitmq-server服务中的进程
3.1.1.构建一个maven工程
3.1.2.导入依赖
java原生依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
3.1.3.第一种模型
在上图的模型中,有以下概念:
- 生产者,也就是要发送消息的程序
- 消费者:消息的接受者 会一直等待消息到来
- 消息队列:图中红色部分 类似一个邮箱,可以缓存消息 生产者向其中投递消息 消费者从其中取出消息
生产者
package com.xiao.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Classname 生产者
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class SimpleProducer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
// 2.1.设置IP地址
factory.setHost("101.200.132.168");
// 2.2.设置端口号
factory.setPort(5672);
// 2.3.设置用户名
factory.setUsername("admin");
// 2.4.设置密码
factory.setPassword("admin");
// 2.5.设置虚拟节点
factory.setVirtualHost("/");
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.创建队列
/**
* @params1:队列的名字
* @params2:是否持久化 durable=false 所谓持久化:消息是否存盘
* @params3:是否具有排他性 1.独占队列:只能有一个消费者来监听队列 2.当Connection关闭时是否删除队列
* @params4:是否自动删除 是否自动删除队列 当没有Consumer自动删除掉
* @params5:是否有额外的参数 携带附属参数
* 如果没有名字为hello_world的队列则会创建该队列
* 如果有则不会创建
*/
channel.queueDeclare("hello_world",true,false,false,null);
// 6.发送内容
String body = "SimpleProducer";
// 7.发送信息
/**
* 功能描述: 发送消息给队列queue
* @params1:交换机
* @params2:队列、路由key
* @params3:消息是否持久化
* @params4:消息内容
* @面试题:可以存在没有交换机的队列吗?不可能,虽然没有交换机但是一定会存在一个默认的交换机
*/
channel.basicPublish("","hello_world",null,body.getBytes());
// 8.关闭通道
channel.close();
// 9.关闭连接
connection.close();
}
}
消费者
package com.xiao.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Classname 消费者
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class SimpleConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
// 2.1.设置IP地址
factory.setHost("101.200.132.168");
// 2.2.设置端口号
factory.setPort(5672);
// 2.3.设置用户名
factory.setUsername("admin");
// 2.4.设置密码
factory.setPassword("admin");
// 2.5.设置虚拟节点
factory.setVirtualHost("/");
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.创建队列
/**
* @params1:队列的名字
* @params2:是否持久化 durable=false 所谓持久化:消息是否存盘
* @params3:是否具有排他性 1.独占队列:只能有一个消费者来监听队列 2.当Connection关闭时是否删除队列
* @params4:是否自动删除 是否自动删除队列 当没有Consumer自动删除掉
* @params5:是否有额外的参数 携带附属参数
* 如果没有名字为hello_world的队列则会创建该队列
* 如果有则不会创建
*/
channel.queueDeclare("hello_world",true,false,false,null);
// 6.接收信息
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
/**
* 7.功能描述:回调方法 当收到消息后会自动执行该方法 处理收到的信息
* @param consumerTag 唯一的标识
* @param envelope 获取对应信息 交换机、routing信息等等
* @param properties 配置信息
* @param body 真实数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
// 由于生产者发送的消息是字节数组 所以我们要把字节数组转换成String
System.err.println("body:" + new String(body));
}
};
/**
* @Param 队列名称
* @Param 自动确认
* @Param 回调函数
*/
channel.basicConsume("hello_world",false,consumer);
// 8.关闭通道
channel.close();
// 9.关闭连接
connection.close();
}
}
3.2.什么是AMQP
3.2.1.什么是AMQP
AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消息的中间件设计
3.2.2.AMQP生产者流转过程
3.2.3.AMQP消费者流转过程
3.3.RabbitMQ的核心组成部分
3.3.1.RabbitMQ的核心组成部分
核心概念
Server:又称Broker 接受客户端的连接 实现AMQP实体服务 安装rabbitmq-server
Connection:连接 应用程序与Broker的网络连接TCP/IP三次握手和四次挥手
Channel:网络信道 几乎所有的操作都在Channel中进行 Channel是进行消息读写的通道 客户端可以建立对各Channel 每个Channel代表一个会话任务
Message:消息 服务与应用程序之间传送的数据 由Properties和body组成 Properties可是对消息进行修饰 比如消息优先级 延迟等高级特性 Body则就是消息体的内容
Virtual Host:虚拟地址 用于进行逻辑隔离 最上层的消息路由 一个虚拟主机理由可以有若干个Exchange和Queue 同一个虚拟主机里不能有相同名字的Exchange
Exchange:交换机 接受消息 根据路由键发送消息到绑定的队列 不具备消息存储能力
Bindings:Exchange和Queue之间的虚拟连接 binding中可以保护多个 routing key
Routing key:是一个路由规则 虚拟机可以用它来确定如何由一个特定消息
Queue:队列 也成为Message Queue消息队列 保存消息并将它们转发给消费者
3.3.2.RabbitMQ整体架构是什么样子的?
3.3.3.RabbitMQ的运行流程
简单模式Simple
- 类型:Simple
- 特点:一个生产者对应一个消费者
工作模式Work
- 类型:Work
- 特点:分发机制
发布订阅模式
- 类型:fanout
- 特点:Fanout—发布订阅模式,是一种广播机制,它是没有路由key的模式
路由模式
- 类型:direct
- 特点:有routing-key的匹配模式
主题Topic模式
- 类型:topic
- 特点:模糊的routing-key的匹配模式
参数模式
- 类型:headers
- 特点:参数匹配机制
小结:
RabbitMQ发送消息一定要有一个交换机 如果没有指定会使用默认的交换机
3.3.4.RabbitMQ支持的消息模型
- 简单模式 Simple
- 工作模式 Work
- 发布订阅模式
- 路由模式
- 主题 Topic模式
- 参数模式
3.4.RabbitMQ入门案例 - fanout 模式
3.4.1.RabbitMQ的模式之发布订阅模式
图解
发布订阅模式的具体实现
- web操作查看视频
- 类型:fanout
- 特点:Fanout - 发布与订阅模式,是一种广播机制,它是没有路由 key的模式
生产者
package com.xiao.rabbitmq.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Classname 生产者
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class FanoutProducer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
// 2.1.设置IP地址
factory.setHost("101.200.132.168");
// 2.2.设置端口号
factory.setPort(5672);
// 2.3.设置用户名
factory.setUsername("admin");
// 2.4.设置密码
factory.setPassword("admin");
// 2.5.设置虚拟节点
factory.setVirtualHost("/");
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.这里队列跟交换机的绑定信息在图形化就已经绑定了 所以这里没有关于交换机和队列绑定关系
// 6.发送的内容
String body = "AllProducer";
// 7.创建交换机
String exchangeName = "fanout-exchange";
// 8.交换机类型
String exchangeType = "fanout";
// 9.路由Key
String routeKey = "";
/**
* 10.功能描述: 发送消息给队列queue
* @params1:交换机
* @params2:队列、路由key
* @params3:消息是否持久化
* @params4:消息内容
* @面试题:可以存在没有交换机的队列吗?不可能,虽然没有交换机但是一定会存在一个默认的交换机
*/
channel.basicPublish(exchangeName, routeKey, null, body.getBytes());
System.out.println("发送信息成功~");
// 11.关闭通道
channel.close();
// 12.关闭连接
connection.close();
}
}
消费者
package com.xiao.rabbitmq.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Classname 消费者
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class FanoutConsumer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("101.200.132.168");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
// 获取队列名称
final String queueName = Thread.currentThread().getName();
// 创建连接
Connection connection = null;
// 创建通道
Channel channel = null;
try {
// 3.从连接工厂获取连接
connection = factory.newConnection();
// 4.从连接获取通道Channel
channel = connection.createChannel();
// 5.创建接收信息回调函数
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
/**
* 功能描述:回调方法 当收到消息后会自动执行该方法 处理收到的信息
* @param consumerTag 唯一的标识
* @param envelope 获取对应信息 交换机、routing信息等等
* @param properties 配置信息
* @param body 真实数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
// 由于生产者发送的消息是字节数组 所以我们要把字节数组转换成String
System.err.println(queueName+"开始接收信息:" + new String(body));
}
};
// 6.接收信息
/**
* @Param 队列名称
* @Param 自动确认
* @Param 回调函数
*/
channel.basicConsume(queueName, true, consumer);
} catch (Exception e) {
e.printStackTrace();
System.err.println("接收消息出现异常!");
} finally {
// 7.关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
// 8.关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
};
public static void main(String[] args) {
// 启动三个线程去接收信息
new Thread(runnable,"queue1").start();
new Thread(runnable,"queue2").start();
new Thread(runnable,"queue3").start();
}
}
此处没有通过代码去绑定交换机和队列,而是通过可视化界面去绑定的!
3.5.RabbitMQ入门案例 - Direct 模式
生产者
package com.xiao.rabbitmq.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Classname 生产者
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class DirectProducer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
// 2.1.设置IP地址
factory.setHost("101.200.132.168");
// 2.2.设置端口号
factory.setPort(5672);
// 2.3.设置用户名
factory.setUsername("admin");
// 2.4.设置密码
factory.setPassword("admin");
// 2.5.设置虚拟节点
factory.setVirtualHost("/");
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.这里队列跟交换机的绑定信息在图形化就已经绑定了 所以这里没有关于交换机和队列绑定关系
// 6.发送的内容
String body = "DirectProducer";
// 7.创建交换机
String exchangeName = "direct-exchange";
// 8.路由Key
String routeKey = "email";
// 9.交换机类型
String exchangeType = "direct";
/**
* 10.功能描述: 发送消息给队列queue
* @params1:交换机
* @params2:队列、路由key
* @params3:消息是否持久化
* @params4:消息内容
* @面试题:可以存在没有交换机的队列吗?不可能,虽然没有交换机但是一定会存在一个默认的交换机
*/
channel.basicPublish(exchangeName,routeKey, null, body.getBytes());
// 11.关闭通道
channel.close();
// 12.关闭连接
connection.close();
}
}
消费者
package com.xiao.rabbitmq.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Classname 消费者
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class DirectConsumer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("101.200.132.168");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
// 获取队列名称
final String queueName = Thread.currentThread().getName();
// 创建连接
Connection connection = null;
// 创建通道
Channel channel = null;
try {
// 3.从连接工厂获取连接
connection = factory.newConnection();
// 4.从连接获取通道Channel
channel = connection.createChannel();
// 5.创建接收信息回调函数
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
/**
* 功能描述:回调方法 当收到消息后会自动执行该方法 处理收到的信息
* @param consumerTag 唯一的标识
* @param envelope 获取对应信息 交换机、routing信息等等
* @param properties 配置信息
* @param body 真实数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
// 由于生产者发送的消息是字节数组 所以我们要把字节数组转换成String
System.err.println("body:" + new String(body));
}
};
// 6.接收信息
/**
* @Param 队列名称
* @Param 自动确认
* @Param 回调函数
*/
channel.basicConsume(queueName, true, consumer);
} catch (Exception e) {
e.printStackTrace();
System.err.println("接收消息出现异常!");
} finally {
// 7.关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
// 8.关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
};
public static void main(String[] args) {
// 启动三个线程去接收信息
new Thread(runnable,"queue1").start();
new Thread(runnable,"queue2").start();
new Thread(runnable,"queue3").start();
}
}
此处没有通过代码去绑定交换机和队列,而是通过可视化界面去绑定的!
3.6.RabbitMQ入门案例 - Topic 模式
生产者
package com.xiao.rabbitmq.topics;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Classname 生产者
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class TopicProducer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
// 2.1.设置IP地址
factory.setHost("101.200.132.168");
// 2.2.设置端口号
factory.setPort(5672);
// 2.3.设置用户名
factory.setUsername("admin");
// 2.4.设置密码
factory.setPassword("admin");
// 2.5.设置虚拟节点
factory.setVirtualHost("/");
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.这里队列跟交换机的绑定信息在图形化就已经绑定了 所以这里没有关于交换机和队列绑定关系
// 6.发送的内容
String body = "TopicProducer";
// 7.创建交换机
String exchangeName = "topic-exchange";
// 8.交换机类型
String exchangeType = "topic";
/**
* 9.路由Key
* #代表一级或多级至少有一个最多随便
* *代表只能有一级至少有一个最多有一个
*/
String routeKey = "com.order.test.xxxx";
/**
* 10.功能描述: 发送消息给队列queue
* @params1:交换机
* @params2:队列、路由key
* @params3:消息是否持久化
* @params4:消息内容
* @面试题:可以存在没有交换机的队列吗?不可能,虽然没有交换机但是一定会存在一个默认的交换机
*/
channel.basicPublish(exchangeName, routeKey, null, body.getBytes());
System.out.println("发送信息成功~");
// 11.关闭通道
channel.close();
// 12.关闭连接
connection.close();
}
}
消费者
package com.xiao.rabbitmq.topics;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Classname 消费者
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class TopicConsumer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("101.200.132.168");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
// 获取队列名称
final String queueName = Thread.currentThread().getName();
// 创建连接
Connection connection = null;
// 创建通道
Channel channel = null;
try {
// 3.从连接工厂获取连接
connection = factory.newConnection();
// 4.从连接获取通道Channel
channel = connection.createChannel();
// 5.创建接收信息回调函数
Consumer consumer = new DefaultConsumer(channel) {
/**
* 功能描述:回调方法 当收到消息后会自动执行该方法 处理收到的信息
* @param consumerTag 唯一的标识
* @param envelope 获取对应信息 交换机、routing信息等等
* @param properties 配置信息
* @param body 真实数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
// 由于生产者发送的消息是字节数组 所以我们要把字节数组转换成String
System.err.println(queueName+"开始接收信息:" + new String(body));
}
};
// 6.接收信息
/**
* @Param 队列名称
* @Param 自动确认
* @Param 回调函数
*/
channel.basicConsume(queueName, false, consumer);
} catch (Exception e) {
e.printStackTrace();
System.err.println("接收消息出现异常!");
} finally {
// 7.关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
// 8.关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
};
public static void main(String[] args) {
// 启动三个线程去接收信息
new Thread(runnable,"queue1").start();
new Thread(runnable,"queue2").start();
new Thread(runnable,"queue3").start();
}
}
3.6.1.程序中进行绑定不通过web图形化绑定
生产者
package com.xiao.rabbitmq.all;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Classname 生产者
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class AllProducer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
// 2.1.设置IP地址
factory.setHost("101.200.132.168");
// 2.2.设置端口号
factory.setPort(5672);
// 2.3.设置用户名
factory.setUsername("admin");
// 2.4.设置密码
factory.setPassword("admin");
// 2.5.设置虚拟节点
factory.setVirtualHost("/");
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.这里队列跟交换机的绑定信息在图形化就已经绑定了 所以这里没有关于交换机和队列绑定关系
// 6.发送的内容
String body = "DiyAllProducer";
// 7.创建交换机
String exchangeName = "direct_message_exchange";
// 8.交换机类型 direct/fanout/topic/headers
String exchangeType = "direct";
/**
* 9.声名交换机
* @param 交换机名字
* @param 交换子类型
* @param 是否持久化:持久化代表重启broker交换机是否删除 为true不会删除 为false代表删除
*/
channel.exchangeDeclare(exchangeName,exchangeType,true);
// 10.创建队列
String queueName_1 = "queue4";
String queueName_2 = "queue5";
String queueName_3 = "queue6";
/**
* 11.声名队列
* @param 是否持久化:持久化代表重启broker队列是否删除 为true不会删除 为false代表删除
* @param 是否排他性
* @param 是否自动删除
* @param 是否有参数
*/
channel.queueDeclare(queueName_1,true,false,false,null);
channel.queueDeclare(queueName_2,true,false,false,null);
channel.queueDeclare(queueName_3,true,false,false,null);
// 12.路由Key
String routeKey_1 = "order";
String routeKey_2 = "course";
/**
* 13.绑定交换机和队列关系
* @param 绑定的队列名字
* @param 绑定的交换机名字
* @param 绑定的路由Key
*/
channel.queueBind(queueName_1,exchangeName,routeKey_1);
channel.queueBind(queueName_2,exchangeName,routeKey_1);
channel.queueBind(queueName_3,exchangeName,routeKey_2);
/**
* 14.功能描述: 发送消息给队列queue
* @params1:交换机
* @params2:队列、路由key
* @params3:消息是否持久化
* @params4:消息内容
* @面试题:可以存在没有交换机的队列吗?不可能,虽然没有交换机但是一定会存在一个默认的交换机
*/
channel.basicPublish(exchangeName, routeKey_1, null, body.getBytes());
System.out.println("发送信息成功~");
// 15.关闭通道
channel.close();
// 16.关闭连接
connection.close();
}
}
消费者
package com.xiao.rabbitmq.all;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Classname 消费者
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class AllConsumer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("101.200.132.168");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
// 获取队列名称
final String queueName = Thread.currentThread().getName();
// 创建连接
Connection connection = null;
// 创建通道
Channel channel = null;
try {
// 3.从连接工厂获取连接
connection = factory.newConnection();
// 4.从连接获取通道Channel
channel = connection.createChannel();
// 5.创建接收信息回调函数
Consumer consumer = new DefaultConsumer(channel) {
/**
* 功能描述:回调方法 当收到消息后会自动执行该方法 处理收到的信息
* @param consumerTag 唯一的标识
* @param envelope 获取对应信息 交换机、routing信息等等
* @param properties 配置信息
* @param body 真实数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("Exchange:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
// 由于生产者发送的消息是字节数组 所以我们要把字节数组转换成String
System.err.println(queueName+"开始接收信息:" + new String(body));
}
};
// 6.接收信息
/**
* @Param 队列名称
* @Param 自动确认
* @Param 回调函数
*/
channel.basicConsume(queueName, false, consumer);
} catch (Exception e) {
e.printStackTrace();
System.err.println("接收消息出现异常!");
} finally {
// 7.关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
// 8.关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
};
public static void main(String[] args) {
// 启动三个线程去接收信息
new Thread(runnable,"queue4").start();
new Thread(runnable,"queue5").start();
new Thread(runnable,"queue6").start();
}
}
3.7.RabbitMQ入门案例 - Work模式
3.7.1.Work模式轮询模式(Round-Robin)
图解
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
- 轮询模式的分发:一个消费者一条 按均分配
- 公平分发:根据消费者的消费能力进行公平分发 处理快的处理的多 处理慢的处理的少 按劳分配
- 类型:无
- 特点:该模式接收信息是当有多个消费者介入时 消息的分配模式时一个消费者分配一条 直至消息消费完成
生产者
package com.xiao.rabbitmq.work.polling;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @Classname 生产者
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
// 2.1.设置IP地址
factory.setHost("101.200.132.168");
// 2.2.设置端口号
factory.setPort(5672);
// 2.3.设置用户名
factory.setUsername("admin");
// 2.4.设置密码
factory.setPassword("admin");
// 2.5.设置虚拟节点
factory.setVirtualHost("/");
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.这里队列跟交换机的绑定信息在图形化就已经绑定了 所以这里没有关于交换机和队列绑定关系
// 6.路由Key
String routeKey = "queue7";
// 7.发送的内容
for (int i = 0; i <=20 ; i++) {
String body = "xiao"+i;
/**
* 8.功能描述: 发送消息给队列queue
* @params1:交换机
* @params2:队列、路由key
* @params3:消息是否持久化
* @params4:消息内容
* @面试题:可以存在没有交换机的队列吗?不可能,虽然没有交换机但是一定会存在一个默认的交换机
*/
channel.basicPublish("", routeKey, null, body.getBytes());
// 这里让他睡眠3秒 我们可以观看一下
TimeUnit.SECONDS.sleep(3);//秒
}
System.out.println("发送信息成功~");
// 11.关闭通道
channel.close();
// 12.关闭连接
connection.close();
}
}
消费者
package com.xiao.rabbitmq.work.polling;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @Classname 消费者1
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class ConsumerWork_1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
// 2.1.设置IP地址
factory.setHost("101.200.132.168");
// 2.2.设置端口号
factory.setPort(5672);
// 2.3.设置用户名
factory.setUsername("admin");
// 2.4.设置密码
factory.setPassword("admin");
// 2.5.设置虚拟节点
factory.setVirtualHost("/");
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.接收信息
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
/**
* 6.功能描述:回调方法 当收到消息后会自动执行该方法 处理收到的信息
* @param consumerTag 唯一的标识
* @param envelope 获取对应信息 交换机、routing信息等等
* @param properties 配置信息
* @param body 真实数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 由于生产者发送的消息是字节数组 所以我们要把字节数组转换成String
System.err.println("queue7"+"body:" + new String(body));
}
};
/**
* @Param 队列名称
* @Param 自动确认
* @Param 回调函数
*/
//channel.basicQos(1);
channel.basicConsume("queue7",true,consumer);
System.out.println("开始接收信息~");
// 这里不要关闭连接和通道让他持续接收消息
}
}
// 创建两个一样的
轮询分发:不管你服务器性能怎么样 都是一个消费者分配一条这样轮询 不管消费者消费多久都会等你消费完 再去分配其他消费者
轮询分发:必须应答 如果不应答其他消费者就会一直等待 导致死循环 默认就是轮询分发
3.7.2.Work模式公平分发模式
生产者
package com.xiao.rabbitmq.work.fair;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @Classname 生产者
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
// 2.1.设置IP地址
factory.setHost("101.200.132.168");
// 2.2.设置端口号
factory.setPort(5672);
// 2.3.设置用户名
factory.setUsername("admin");
// 2.4.设置密码
factory.setPassword("admin");
// 2.5.设置虚拟节点
factory.setVirtualHost("/");
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.这里队列跟交换机的绑定信息在图形化就已经绑定了 所以这里没有关于交换机和队列绑定关系
// 6.路由Key
String routeKey = "queue7";
// 7.发送的内容
for (int i = 0; i <=20 ; i++) {
String body = "xiao"+i;
/**
* 8.功能描述: 发送消息给队列queue
* @params1:交换机
* @params2:队列、路由key
* @params3:消息是否持久化
* @params4:消息内容
* @面试题:可以存在没有交换机的队列吗?不可能,虽然没有交换机但是一定会存在一个默认的交换机
*/
channel.basicPublish("", routeKey, null, body.getBytes());
// 这里让他睡眠3秒 我们可以观看一下
TimeUnit.SECONDS.sleep(3);//秒
}
System.out.println("发送信息成功~");
// 11.关闭通道
channel.close();
// 12.关闭连接
connection.close();
}
}
消费者
package com.xiao.rabbitmq.work.fair;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Classname 消费者1
* @Description TODO
* @Date 2022/9/13 19:29
* @Created by ylp
*/
public class ConsumerWork_1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
// 2.1.设置IP地址
factory.setHost("101.200.132.168");
// 2.2.设置端口号
factory.setPort(5672);
// 2.3.设置用户名
factory.setUsername("admin");
// 2.4.设置密码
factory.setPassword("admin");
// 2.5.设置虚拟节点
factory.setVirtualHost("/");
// 3.创建连接
Connection connection = factory.newConnection();
// 4.创建channel
Channel channel = connection.createChannel();
// 5.接收信息
channel.basicQos(1); // 从队列里一次拿多少条信息进行消费 根据服务器性能、磁盘状况 定义一次消费多少条信息
Consumer consumer = new DefaultConsumer(channel) {
/**
* 6.功能描述:回调方法 当收到消息后会自动执行该方法 处理收到的信息
* @param consumerTag 唯一的标识
* @param envelope 获取对应信息 交换机、routing信息等等
* @param properties 配置信息
* @param body 真实数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 由于生产者发送的消息是字节数组 所以我们要把字节数组转换成String
System.err.println("queue7"+"body:" + new String(body));
// 公平分发必须手动应答
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
/**
* @Param 队列名称
* @Param 自动确认
* @Param 回调函数
*/
//channel.basicQos(1);
channel.basicConsume("queue7",false,consumer);
System.out.println("开始接收信息~");
// 这里不要关闭连接和通道让他持续接收消息
}
}
// 创建两个一样的!
公平分发:根据你服务器的性能进行分发 谁快谁就消费的多 谁慢谁就消费的少
公平分发:必须改为手动应答 必须设置Qos Qos根据你的服务器性能、磁盘的状况来设置 建议不要设太大 否则承受不住会拒绝消费
3.8.RabbitMQ使用场景
3.8.1.解耦、削峰、异步
同步异步的问题(串行)
串行方式:将订单信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
public void makeOrder(){
//1.保存订单
orderService.saveOrder();
//2.发送短信服务
messageService.sendSMS("order"); //1-2 s
//3.发送email服务
emailService.sendEmail("order"); //1-2 s
//4.发送app服务
appService.sendApp("order");
}
并行方式 异步线程池
并行方式:将订单信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
public void makeOrder(){
//1.保存订单
orderService.saveOrder();
// 相关发送
relationMessage();
}
public void relationMessage(){
//异步
theadpool.submit(new Callable<Object>{
public Object call(){
//2.发送短信服务
emailService.sendEmail("order");
}
})
//异步
theadpool.submit(new Callable<Object>{
public Object call(){
//3.发送email服务
emailService.sendEmail("order");
}
})
//异步
theadpool.submit(new Callable<Object>{
public Object call(){
//4.发送app服务
appService.sendApp("order");
}
})
//异步
theadpool.submit(new Callable<Object>{
public Object call(){
//4.发送短信服务
emailService.sendEmail("order");
}
})
}
存在问题
- 耦合度高
- 需要自己写线程池自己维护成本太高
- 出现了消息可能会丢失,需要你自己做消息补偿
- 如何保证消息的可靠性你自己写
- 如果服务器承载不了,你需要自己去写高可用
异步消息队列的方式
好处:
- 完全解耦,用 MQ建立桥接
- 有独立的线程池和运行模型
- 出现了消息可能会丢失,MQ有持久化功能
- 如何保证消息的可靠性,死信队列和消息转移等
- 如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍
public void makeOrder(){
//1.保存订单
orderService.saveOrder();
rabbitTemplate.convertSend("ex","2","消息内容");
}
3.8.2.高内聚,低耦合
好处:
- 完全解耦,用 MQ建立桥接
- 有独立的线程池和运行模型
- 出现了消息可能会丢失,MQ有持久化功能
- 如何保证消息的可靠性,死信队列和消息转移等
- 如果服务器承载不了,你需要自己去写高可用,HA镜像模型高可用
按照以上约定,用户的响应时间相当于是订单信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20QPS。比串行提高了3倍,比并行提高了两倍
面试高频问题:为什么使用RbbitMQ
在2020年我加入到一家公司刚开始进入公司的时候公司的架构比较单一 采用的单体结构 而他的单体结构就是把所有项目堆积在一个里面 但是随着我们公司的发展和推进 在第二年的时候我们公司的项目负责人他接下来就把我们的项目进行分辨 变成了一个分布式架构 就把系统进行了拆分 拆分的过程中 我们就会考虑到一个问题 比如我们在拆的过程中我们做的什么系统的某个模块 和另外一个模块 它们之间会进行一个互相沟通和协同那么这个时候我们公司就采用了消息队列 而我们公司在选择消息队列的时候就一直在思考用什么消息队列 后来我们决定了用RabbitMQ 我自己使用RabbitMQ的感受其实说来说去的话 其实最核心的点就是异步的点因为它是一个多线程嘛 是一个分发的机制 是一个多线程机制 而且可以让我们网站性能啊会提高 因为他的异步就可以让我们处理数据的能力非常高效稳健 还有一个点就是我们在开发的时候可以把这些服务会分裂 分裂以后呢就可以解耦
4.Springboot案例
4.1.Fanout 模式
生产者
application.yml
# 服务端口
server:
port: 8080
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 101.200.132.168
port: 5672
OrderService.java
package com.xiao.rabbitmq.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* @Classname 订单
* @Description TODO
* @Date 2022/9/17 0:42
* @Created by ylp
*/
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 功能描述:模拟用户下单
*
* @param userId 用户ID
* @param productId 商品ID
* @param num 订单数量
*/
public void makeOrder(String userId, String productId, int num) {
// 1.根据商品id查询库存是否充足
// 2.保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderId);
/**
* 3.通过消息队列完成消息分发
* @param 交换机
* @param 路由Key 没有指定交换机就是队列名
* @param 消息内同
*/
// 创建交换机
String exchangeName = "fanout_order_exchange";
// 创建路由Key
String routeKey = "";
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderId);
}
}
FanoutRabbitMQConfiguration.java
package com.xiao.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Classname RabbitMQ配置类
* @Description TODO
* @Date 2022/9/17 0:50
* @Created by ylp
*/
@Configuration
public class FanoutRabbitMQConfiguration {
// 1.声名注册fanout模式的交换机
@Bean
public FanoutExchange fanoutExchange() {
/**
* @param 交换机名字
* @param 是否持久化
* @param 是否自动删除
*/
return new FanoutExchange("fanout_order_exchange", true, false);
}
// 2.声名注册队列sms_fanout_queue、email_fanout_queue、message_fanout_queue
@Bean
public Queue smsFanoutQueue() {
/**
* sms短信
* @param 队列名字
* @param 是否持久化
*/
return new Queue("sms_fanout_queue", true);
}
@Bean
public Queue emailFanoutQueue() {
/**
* email邮件
* @param 队列名字
* @param 是否持久化
*/
return new Queue("email_fanout_queue", true);
}
@Bean
public Queue messageFanoutQueue() {
/**
* message短信
* @param 队列名字
* @param 是否持久化
*/
return new Queue("message_fanout_queue", true);
}
// 3.完成绑定关系(把我们的队列和我们的交换机完成绑定关系)
@Bean
public Binding smsFanoutBinding() {
/**
* sms短信 绑定
* @param 绑定的队列
* @param 绑定的交换机
*/
return BindingBuilder.bind(smsFanoutQueue()).to(fanoutExchange());
}
@Bean
public Binding emailFanoutBinding() {
/**
* email邮件 绑定
* @param 绑定的队列
* @param 绑定的交换机
*/
return BindingBuilder.bind(emailFanoutQueue()).to(fanoutExchange());
}
@Bean
public Binding messageFanoutBinding() {
/**
* message短信 绑定
* @param 绑定的队列
* @param 绑定的交换机
*/
return BindingBuilder.bind(messageFanoutQueue()).to(fanoutExchange());
}
}
SpringbootOrderRabbitmqProducerApplicationTests.java
package com.xiao.rabbitmq;
import com.xiao.rabbitmq.service.fanout.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {
@Autowired
private OrderService orderService;
@Test
void contextLoadsFanout() {
orderService.makeOrderFanout("1","1",12);
}
}
消费者
application.yaml
# 服务端口
server:
port: 8081
# 配置rabbitmq服务
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 101.200.132.168
port: 5672
FanoutSMSConsumer.java
package com.xiao.rabbitmq.service.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Classname SMS短信消费者
* @Description TODO
* @Date 2022/9/17 1:20
* @Created by ylp
*/
@Component
@RabbitListener(queues = {"sms_fanout_queue"})
public class FanoutSMSConsumer {
@RabbitHandler
public void receiveMessage(String sms) {
System.out.println("sms fanout---接收到了订单信息是:---->" + sms);
}
}
FanoutMessageConsumer.java
package com.xiao.rabbitmq.service.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Classname Message短信消费者
* @Description TODO
* @Date 2022/9/17 1:20
* @Created by ylp
*/
@Component
@RabbitListener(queues = {"message_fanout_queue"})
public class FanoutMessageConsumer {
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("message fanout---接收到了订单信息是:---->" + message);
}
}
FanoutEmailConsumer.java
package com.xiao.rabbitmq.service.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Classname Email邮箱消费者
* @Description TODO
* @Date 2022/9/17 1:20
* @Created by ylp
*/
@Component
@RabbitListener(queues = {"email_fanout_queue"})
public class FanoutEmailConsumer {
@RabbitHandler
public void receiveMessage(String email) {
System.out.println("email fanout---接收到了订单信息是:---->" + email);
}
}
图解
4.2.Direct 模式
生产者
DirectRabbitMQConfiguration.java
package com.xiao.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Classname RabbitMQ配置类
* @Description TODO
* @Date 2022/9/17 0:50
* @Created by ylp
*/
@Configuration
public class DirectRabbitMQConfiguration {
// 1.声名注册direct模式的交换机
@Bean
public DirectExchange directExchange() {
/**
* @param 交换机名字
* @param 是否持久化
* @param 是否自动删除
*/
return new DirectExchange("direct_order_exchange", true, false);
}
// 2.声名注册队列sms_direct_queue、email_direct_queue、message_direct_queue
@Bean
public Queue smsDirectQueue() {
/**
* sms短信
* @param 队列名字
* @param 是否持久化
*/
return new Queue("sms_direct_queue", true);
}
@Bean
public Queue emailDirectQueue() {
/**
* email邮件
* @param 队列名字
* @param 是否持久化
*/
return new Queue("email_direct_queue", true);
}
@Bean
public Queue messageDirectQueue() {
/**
* message短信
* @param 队列名字
* @param 是否持久化
*/
return new Queue("message_direct_queue", true);
}
// 3.完成绑定关系(把我们的队列和我们的交换机完成绑定关系)
@Bean
public Binding smsDirectBinding() {
/**
* sms短信 绑定
* @param 绑定的队列
* @param 绑定的交换机
*/
return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding emailDirectBinding() {
/**
* email邮件 绑定
* @param 绑定的队列
* @param 绑定的交换机
*/
return BindingBuilder.bind(emailDirectQueue()).to(directExchange()).with("email");
}
@Bean
public Binding messageDirectBinding() {
/**
* message短信 绑定
* @param 绑定的队列
* @param 绑定的交换机
*/
return BindingBuilder.bind(messageDirectQueue()).to(directExchange()).with("message");
}
}
OrderService.java
package com.xiao.rabbitmq.service.fanout;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* @Classname 订单
* @Description TODO
* @Date 2022/9/17 0:42
* @Created by ylp
*/
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 功能描述:模拟用户下单
*
* @param userId 用户ID
* @param productId 商品ID
* @param num 订单数量
*/
public void makeOrderDirect(String userId, String productId, int num) {
// 1.根据商品id查询库存是否充足
// 2.保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderId);
/**
* 3.通过消息队列完成消息分发
* @param 交换机
* @param 路由Key 没有指定交换机就是队列名
* @param 消息内同
*/
// 创建交换机
String exchangeName = "direct_order_exchange";
// 创建路由Key
String routeSMSKey = "sms";
String routeEmailKey = "email";
String routeMessageKey = "message";
rabbitTemplate.convertAndSend(exchangeName, routeSMSKey, orderId);
rabbitTemplate.convertAndSend(exchangeName, routeEmailKey, orderId);
rabbitTemplate.convertAndSend(exchangeName, routeMessageKey, orderId);
}
}
SpringbootOrderRabbitmqProducerApplicationTests.java
package com.xiao.rabbitmq;
import com.xiao.rabbitmq.service.fanout.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {
@Test
void contextLoadsDirect() {
orderService.makeOrderDirect("1","1",12);
}
}
消费者
DirectRabbitMQConfiguration.java
package com.xiao.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Classname RabbitMQ配置类
* @Description TODO
* @Date 2022/9/17 0:50
* @Created by ylp
*/
@Configuration
public class DirectRabbitMQConfiguration {
// 1.声名注册Direct模式的交换机
@Bean
public DirectExchange directExchange() {
/**
* @param 交换机名字
* @param 是否持久化
* @param 是否自动删除
*/
return new DirectExchange("direct_order_exchange", true, false);
}
// 2.声名注册队列sms_direct_queue、email_direct_queue、message_direct_queue
@Bean
public Queue smsQueue() {
/**
* sms短信
* @param 队列名字
* @param 是否持久化
*/
return new Queue("sms_direct_queue", true);
}
@Bean
public Queue emailQueue() {
/**
* email邮件
* @param 队列名字
* @param 是否持久化
*/
return new Queue("email_direct_queue", true);
}
@Bean
public Queue messageQueue() {
/**
* message短信
* @param 队列名字
* @param 是否持久化
*/
return new Queue("message_direct_queue", true);
}
// 3.完成绑定关系(把我们的队列和我们的交换机完成绑定关系)
@Bean
public Binding smsBinding() {
/**
* sms短信 绑定
* @param 绑定的队列
* @param 绑定的交换机
*/
return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding emailBinding() {
/**
* email邮件 绑定
* @param 绑定的队列
* @param 绑定的交换机
*/
return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
}
@Bean
public Binding messageBinding() {
/**
* message短信 绑定
* @param 绑定的队列
* @param 绑定的交换机
*/
return BindingBuilder.bind(messageQueue()).to(directExchange()).with("message");
}
}
DirectEmailConsumer.java
package com.xiao.rabbitmq.service.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Classname Email邮箱消费者
* @Description TODO
* @Date 2022/9/17 1:20
* @Created by ylp
*/
@Component
@RabbitListener(queues = {"email_direct_queue"})
public class DirectEmailConsumer {
@RabbitHandler
public void receiveMessage(String email) {
System.out.println("email direct---接收到了订单信息是:---->" + email);
}
}
DirectMessageConsumer.java
package com.xiao.rabbitmq.service.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Classname Message短信消费者
* @Description TODO
* @Date 2022/9/17 1:20
* @Created by ylp
*/
@Component
@RabbitListener(queues = {"message_direct_queue"})
public class DirectMessageConsumer {
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("message direct---接收到了订单信息是:---->" + message);
}
}
DirectSMSConsumer.java
package com.xiao.rabbitmq.service.direct;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Classname SMS短信消费者
* @Description TODO
* @Date 2022/9/17 1:20
* @Created by ylp
*/
@Component
@RabbitListener(queues = {"sms_direct_queue"})
public class DirectSMSConsumer {
@RabbitHandler
public void receiveMessage(String sms) {
System.out.println("sms direct---接收到了订单信息是:---->" + sms);
}
}
4.3.Topic 模式
生产者
OrderService.java
package com.xiao.rabbitmq.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* @Classname 订单
* @Description TODO
* @Date 2022/9/17 0:42
* @Created by ylp
*/
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 功能描述:模拟用户下单
*
* @param userId 用户ID
* @param productId 商品ID
* @param num 订单数量
*/
public void makeOrderTopic(String userId, String productId, int num) {
// 1.根据商品id查询库存是否充足
// 2.保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderId);
/**
* 3.通过消息队列完成消息分发
* @param 交换机
* @param 路由Key 没有指定交换机就是队列名
* @param 消息内同
*/
// 创建交换机
String exchangeName = "topic_order_exchange";
// 创建路由Key 发送SMS Message
String routeSMSKey = "com.message";
/**
* 路由key
* @param SMS:com.#
* @param Email:*.email.#
* @param Message:#.message.#
*/
rabbitTemplate.convertAndSend(exchangeName, routeSMSKey, orderId);
}
}
SpringbootOrderRabbitmqProducerApplicationTests.java
package com.xiao.rabbitmq;
import com.xiao.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {
@Test
void contextLoadsTopic() {
orderService.makeOrderTopic("1","1",12);
}
}
消费者(采用注解)
TopicEmailConsumer.java
package com.xiao.rabbitmq.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* @Classname Email邮箱消费者
* @Description TODO
* @Date 2022/9/17 1:20
* @Created by ylp
*/
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "email_topic_queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "*.email.#"
))
public class TopicEmailConsumer {
@RabbitHandler
public void receiveMessage(String email) {
System.out.println("email topic---接收到了订单信息是:---->" + email);
}
}
TopicMessageConsumer.java
package com.xiao.rabbitmq.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* @Classname Message短信消费者
* @Description TODO
* @Date 2022/9/17 1:20
* @Created by ylp
*/
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "message_topic_queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "#.message.#"
))
public class TopicMessageConsumer {
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("message topic---接收到了订单信息是:---->" + message);
}
}
TopicSMSConsumer.java
package com.xiao.rabbitmq.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* @Classname SMS短信消费者
* @Description TODO
* @Date 2022/9/17 1:20
* @Created by ylp
*/
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "sms_topic_queue",durable = "true",autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC),
key = "com.#"
))
public class TopicSMSConsumer {
@RabbitHandler
public void receiveMessage(String sms) {
System.out.println("sms topic---接收到了订单信息是:---->" + sms);
}
}
5.RabbitMQ高级
5.1.过期时间TTL
https://www.bilibili.com/video/BV1dX4y1V73G?p=44
概述
过期时间 TTl表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置 TTL,目前有两种方法可以设置
- 第一种方法是通过队列属性设置 队列中所有消息都有相同的过期时间
- 第二种方法是对消息进行单独设置 每条消息 TTL可以不同
如果上述两种方法同时使用,则消息的过期时间以两者 TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL值,就称为 dead message被投递到死信队列,消费者将无法再收到该消息
设置队列TTL
TtlRabbitMQConfiguration.java
package com.xiao.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* @Classname RabbitMQ配置类
* @Description TODO
* @Date 2022/9/17 0:50
* @Created by ylp
*/
@Configuration
public class TTLRabbitMQConfiguration {
// 1.声名注册ttl_direct_exchange模式的交换机
@Bean
public DirectExchange ttlExchange() {
/**
* @param 交换机名字
* @param 是否持久化
* @param 是否自动删除
*/
return new DirectExchange("ttl_direct_exchange", true, false);
}
/**
* 队列的过期时间
* @return
*/
@Bean
public Queue ttlQueue(){
// 设置过期时间
HashMap<String, Object> args = new HashMap<>();
args.put("x-message-ttl",5000); // 默认单位ms 这里一定是int类型 否则报错
return new Queue("ttl_direct_queue",true,false,false,args);
}
/**
* 交换机和队列绑定关系
* @return
*/
@Bean
public Binding ttlDirectBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
}
OrderService.java
public class OrderService{
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 功能描述:模拟用户下单
*
* @param userId 用户ID
* @param productId 商品ID
* @param num 订单数量
*/
public void makeOrderTtlDirect(String userId, String productId, int num) {
// 1.根据商品id查询库存是否充足
// 2.保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderId);
/**
* 3.通过消息队列完成消息分发
* @param 交换机
* @param 路由Key 没有指定交换机就是队列名
* @param 消息内同
*/
// 创建交换机
String exchangeName = "ttl_direct_exchange";
// 创建路由Key
String routeKey = "ttl";
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderId);
}
}
设置消息TTL
TTLRabbitMQConfiguration.java
package com.xiao.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* @Classname RabbitMQ配置类
* @Description TODO
* @Date 2022/9/17 0:50
* @Created by ylp
*/
@Configuration
public class TTLRabbitMQConfiguration {
/**
* 声名注册ttl_direct_exchange模式的交换机
* @return
*/
@Bean
public DirectExchange ttlExchange() {
/**
* @param 交换机名字
* @param 是否持久化
* @param 是否自动删除
*/
return new DirectExchange("ttl_direct_exchange", true, false);
}
/**
* 声名注册队列
* @return
*/
@Bean
public Queue ttlMessageQueue(){
return new Queue("ttl_message_queue",true,false,false);
}
/**
* 交换机和队列绑定关系
* @return
*/
@Bean
public Binding ttlMessageBinding(){
return BindingBuilder.bind(ttlMessageQueue()).to(ttlExchange()).with("ttlMessage");
}
}
OrderService.java
package com.xiao.rabbitmq.service;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* @Classname 订单
* @Description TODO
* @Date 2022/9/17 0:42
* @Created by ylp
*/
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 功能描述:模拟用户下单
*
* @param userId 用户ID
* @param productId 商品ID
* @param num 订单数量
*/
public void makeOrderTtlMessage(String userId, String productId, int num) {
// 1.根据商品id查询库存是否充足
// 2.保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderId);
/**
* 3.通过消息队列完成消息分发
* @param 交换机
* @param 路由Key 没有指定交换机就是队列名
* @param 消息内同
*/
// 创建交换机
String exchangeName = "ttl_direct_exchange";
// 创建路由Key
String routeKey = "ttlMessage";
/**
* 给消息设定过期时间
*/
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 这里是字符串类型 不是int类型
message.getMessageProperties().setExpiration("5000");
// 设置编码
message.getMessageProperties().setContentEncoding("UTF-8");
return message; // 这里一定要返回message 如果返回null会报错
}
};
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderId,messagePostProcessor);
}
}
5.2.死信队列
概述
DLX,全称 Dead-Letter-Exchange
,可以称之为死信交换机,也有人称之为死信邮箱。当消息再一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX的队列就称之为死信队列。消息变成死信,可能是由于以下原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性,当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的 DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange
指定交换机即可
死信队列
DeadRabbitMqConfiguration.java
package com.xiao.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* @Classname RabbitMQ配置类
* @Description TODO
* @Date 2022/9/17 0:50
* @Created by ylp
*/
@Configuration
public class DeadRabbitMQConfiguration {
/**
* 声名注册dead_direct_exchange模式的交换机
* @return
*/
@Bean
public DirectExchange deadDirectExchange() {
/**
* @param 交换机名字
* @param 是否持久化
* @param 是否自动删除
*/
return new DirectExchange("dead_direct_exchange", true, false);
}
/**
* 声名注册队列
* @return
*/
@Bean
public Queue deadQueue(){
return new Queue("dead_direct_queue",true);
}
/**
* 交换机和队列绑定关系
* @return
*/
@Bean
public Binding deadBinding(){
return BindingBuilder.bind(deadQueue()).to(deadDirectExchange()).with("dead");
}
}
TTLRabbitMQConfiguration.java
package com.xiao.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
* @Classname RabbitMQ配置类
* @Description TODO
* @Date 2022/9/17 0:50
* @Created by ylp
*/
@Configuration
public class TTLRabbitMQConfiguration {
/**
* 声名注册ttl_direct_exchange模式的交换机
* 测试消息过期时间
* @return
*/
@Bean
public DirectExchange ttlExchange() {
/**
* @param 交换机名字
* @param 是否持久化
* @param 是否自动删除
*/
return new DirectExchange("ttl_direct_exchange", true, false);
}
/**
* 声名注册ttl_direct_exchange模式的交换机
* 测试消息队列最大接收长度
* @return
*/
@Bean
public DirectExchange ttlMaxExchange() {
/**
* @param 交换机名字
* @param 是否持久化
* @param 是否自动删除
*/
return new DirectExchange("ttl_direct_max_exchange", true, false);
}
/**
* 队列的过期时间-死信队列
* @return
*/
@Bean
public Queue ttlQueue(){
// 设置过期时间
HashMap<String, Object> args = new HashMap<>();
args.put("x-message-ttl",5000); // 默认单位ms 这里一定是int类型 否则报错
// 死信队列 配置
args.put("x-dead-letter-exchange","dead_direct_exchange"); // 交换机名字
// 有路由key一定要设置上没有的话就不用设置
args.put("x-dead-letter-routing-key","dead");
return new Queue("ttl_direct_queue",true,false,false,args);
}
/**
* 队列最大接收长度-死信队列
* @return
*/
@Bean
public Queue ttlMaxQueue(){
HashMap<String, Object> args = new HashMap<>();
// 死信队列 配置
args.put("x-max-length",5);
args.put("x-dead-letter-exchange","dead_direct_exchange"); // 交换机名字
// 有路由key一定要设置上没有的话就不用设置
args.put("x-dead-letter-routing-key","dead");
return new Queue("ttl_direct_max_queue",true,false,false,args);
}
/**
* 交换机和队列绑定关系
* @return
*/
@Bean
public Binding ttlDirectBinding(){
/**
* 死信队列-队列消息过期-绑定
*/
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
/**
* 交换机和队列绑定关系
* @return
*/
@Bean
public Binding ttlDirectMaxBinding(){
/**
* 死信队列-队列接收最大长度-绑定
*/
return BindingBuilder.bind(ttlMaxQueue()).to(ttlMaxExchange()).with("ttlMax");
}
}
OrderService.java
package com.xiao.rabbitmq.service;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* @Classname 订单
* @Description TODO
* @Date 2022/9/17 0:42
* @Created by ylp
*/
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 功能描述:模拟用户下单
* 死信队列测试
* @param userId 用户ID
* @param productId 商品ID
* @param num 订单数量
*/
public void makeOrderTtlDirect(String userId, String productId, int num) {
// 1.根据商品id查询库存是否充足
// 2.保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生成成功:" + orderId);
/**
* 3.通过消息队列完成消息分发
* @param 交换机
* @param 路由Key 没有指定交换机就是队列名
* @param 消息内同
*/
//--------------------------测试队列过期-------------------------------
// 创建交换机
String exchangeName = "ttl_direct_exchange";
// 创建路由Key
String routeKey = "ttl";
rabbitTemplate.convertAndSend(exchangeName, routeKey, orderId);
//---------------------测试队列接收信息最大长度---------------------------
/**
* 创建交换机
* 创建路由Key
* String exchangeName = "ttl_direct_max_exchange";
* String routeKey = "ttlMax";
* rabbitTemplate.convertAndSend(exchangeName, routeKey, orderId);
*/
}
}
SpringbootOrderRabbitmqProducerApplicationTests.java
package com.xiao.rabbitmq;
import com.xiao.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {
@Test
void contextLoadsTtlDirect() {
// 测试队列最大接收长度
for (int i = 0; i < 11; i++) {
orderService.makeOrderTtlDirect("1","1",12);
}
// orderService.makeOrderTtlDirect("1","1",12);
}
}
总结:
不管是消息过期还是队列最大接收长度 只要你指定了死信队列到最后都会转移到死信队列里 就不一 一演示了
死信队列其实就是一个接盘机制
5.3.内存磁盘的监控
5..1.RabbitMQ内存警告
5.3.2.RabbitMQ的内存控制
参考帮助文档:http://www.rabbbitmq.com/configure.html
当出现警告的时候,可以通过配置去修改和调整
命令的方式
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
fraction/value 为内存阈值。默认情况是:0.4/2GB,代表的含义是:当 RabbitMQ的内存超过40%时,就会产生警告并且会阻塞所有生产者的连接。通过此命令修改阈值在 Broker重启以后将会失效,通过修改配置文件设置的阈值则不会随着重启而消失,但修改了配置文件一样要重启 Broker才会生效
分析
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
配置文件方式 rabbitmq.conf
# 默认
# vm_memory_high_watermark.relative = 0.4
# 使用relative相对值进行设置fraction 建议取值在0.4~0.7之间 不建议超过0.7
vm_memory_high_watermark.relative = 0.6
# 使用absolute绝对值的方式 KB、MB、GB对应的命令如下
vm_memory_high_watermark.absolute = 2GB
5.3.3.RabbitMQ的内存换页
在某个Broker节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消思都会写入磁盘中,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉
默认认情况下 内存到达的國值是50%时就会换页处理
也就是说 在默认情况下该内存的阈值是0.4的情况下 当内存超过0.4*0.5=0.2时 会进行换页动作
比如有1000MB内存 当内存的使用率达到500MB已经达到了极限 但是因为配置的换页内存0.5 这个时候会在达到极限400MB之前 会把内存中的200MB进行转移到磁盘中 从而达到稳健的运行
可以通过设置vm_memory_high_watermark_paging_ratio
来进行调整
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.7 # (设置小于1的值)
为什么设置小于1 因为你如果设置为1的阈值 内存都已经达到了极限了 你再去换页意义不是很大!
5..4.RabbitMQ的磁盘预警
当磁盘的剩余空间低于确定的阈值时 RabbitMQ同样会阻塞生产者 这样可以避免非持久化的消息持续换页而耗尽磁盘空间导致服务器崩溃
默认情况下:磁盘预警为50MB的时候会进行预警 表示当前磁盘空间到50MB的时候会阻塞生产者并且停止内存消息换页到磁盘过程
这个阈值可以减小 但是不能完全的消除 因磁盘耗尽而导致服务器崩溃的可能性 比如在两次磁盘空间的检查空隙内 第一次检查是60MB 第二次检查是1MB就会出现警告
说白了就是比如磁盘是100GB 你总的给人家留个5个GB 因为还要运行其他的 如果消息大小超过了 95个GB就会出现警告
通过命令方式修改如下:
# rabbitmqctl set_disk_free_limit <disk_limit>
# rabbitmqctl set_disk_free_limit memory_limit <fraction>
# disk_limit:固定单位 KB MB GB
# fraction:是相对的阈值 建议范围在:1.0~2.0之间 (相对内存)
5.4.集群
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的。
在实际使用过程中多采取多机多实例部署方式,为了便于同学们练习搭建,有时候你不得不在一台机器上去搭建一个 rabbitmg集群,本章主要针对单机多实例这种方式来进行开展。
5.4.1.集群搭建
配置的前提是你的 rabbitmq可以运行起来,比如ps aix|grep rebbitmq
你能看到相关进程,又比如运行rabbitmqct status
你可以看到类似如下信息而不报错:
执行下面命令进行查看:
ps aux|grep rabbitmq
执行下面命令查看状态:
systemctl status rabbitmq-server
注意:确保RabbitMQ可以运行的 确保完成之后 把单机版的RabbitMQ服务停止 后台看不到RabbitMQ的进程为止
5.4.2.单机多实例搭建
场景:假设有两个rabbitmq节点 分别为rabbit-1和rabbit-2 rabbit-1作为主节点 rabbit-2作为从节点
启动命令:RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server -detached
结束命令:rabbitmqctl -n rabbit-1 stop
启动第一个节点
RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit-1 rabbitmq-server start &
# 结果如下
[root@RabbitMQ rabbitmq]#
RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit-1.log
###### ## /var/log/rabbitmq/rabbit-1-sasl.log
##########
Starting broker...
至此节点rabbit-1启动完成
启动第二个节点
RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit-2 rabbitmq-server start &
# 结果如下
[root@RabbitMQ rabbitmq]#
RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
## ## Licensed under the MPL. See http://www.rabbitmq.com/
## ##
########## Logs: /var/log/rabbitmq/rabbit-2.log
###### ## /var/log/rabbitmq/rabbit-2-sasl.log
##########
Starting broker...
至此节点rabbit-2启动完成
验证启动
ps aux|grep rabbitmq
# 结果如下
[root@RabbitMQ rabbitmq]# ps aux|grep rabbitmq
root 6463 0.0 0.0 113116 1460 pts/0 S 19:01 0:00 /bin/sh /usr/sbin/rabbitmq-server start
root 6473 0.0 0.0 191692 2344 pts/0 S 19:01 0:00 su rabbitmq -s /bin/sh -c /usr/lib/rabbitmq/bin/rabbitmq-server 'start'
rabbitmq 6474 0.0 0.0 113124 1576 ? Ss 19:01 0:00 /bin/sh -e /usr/lib/rabbitmq/bin/rabbitmq-server start
rabbitmq 6573 2.1 1.8 1252860 72920 ? Sl 19:01 0:04 /usr/lib64/erlang/erts-7.3/bin/beam.smp -W w -A 64 -P 1048576 -t 5000000 -stbt db -K true -B i -- -root /usr/lib64/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin -noshell -noinput -s rabbit boot -sname rabbit-1 -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -rabbit tcp_listeners [{"auto",5672}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/var/log/rabbitmq/rabbit-1.log"} -rabbit sasl_error_logger {file,"/var/log/rabbitmq/rabbit-1-sasl.log"} -rabbit enabled_plugins_file "/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/plugins" -rabbit plugins_expand_dir "/var/lib/rabbitmq/mnesia/rabbit-1-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/var/lib/rabbitmq/mnesia/rabbit-1" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 start
rabbitmq 6752 0.0 0.0 11536 460 ? Ss 19:01 0:00 inet_gethost 4
rabbitmq 6761 0.0 0.0 13632 720 ? S 19:01 0:00 inet_gethost 4
rabbitmq 8725 0.0 0.0 11572 332 ? S 18:08 0:00 /usr/lib64/erlang/erts-7.3/bin/epmd -daemon
root 11195 0.0 0.0 112664 980 pts/0 S+ 19:05 0:00 grep --color=auto rabbitmq
rabbit-1操作作为主节点
# 停止应用
rabbitmqctl -n rabbit-1 stop_app
# 目的是清除节点上的历史数据(如果不清除 无法将节点加入到集群)
rabbitmqctl -n rabbit-1 reset
# 启动应用
rabbitmqctl -n rabbit-1 start_app
rabbit-2操作作为从节点
# 停止应用
rabbitmqctl -n rabbit-2 stop_app
# 目的是清除节点上的历史数据(如果不清除 无法将节点加入到集群)
rabbitmqctl -n rabbit-2 reset
# 将rabbit-2节点加入到rabbit-1(主节点)集群当中[Server-node服务器的主机名]
rabbitmqctl -n rabbit-2 join_cluster rabbit-1@'rabbitmq'
# 启动应用
rabbitmqctl -n rabbit-2 start_app
验证集群状态
sudo rabbitmqctl cluster_status -n rabbit-1
# 结果如下
[root@rabbitmq ~]# sudo rabbitmqctl cluster_status -n rabbit-1
Cluster status of node rabbit-1@rabbitmq ...
[{nodes,[{disc,['rabbit-1@rabbitmq','rabbit-2@rabbitmq']}]},
{running_nodes,['rabbit-2@rabbitmq','rabbit-1@rabbitmq']},
{cluster_name,<<"rabbit-1@rabbitmq">>},
{partitions,[]},
{alarms,[{'rabbit-2@rabbitmq',[]},{'rabbit-1@rabbitmq',[]}]}]
Web监控
rabbitmq-plugins enable rabbitmq_management
注意在访问的时候:web界面的管理需要给15672 node-1和15673 node-2 设置用户和密码 如下:
rabbitmqctl -n rabbit-1 add_user admin admin
rabbitmqctl -n rabbit-1 set_user_tags admin administrator
rabbitmqctl -n rabbit-1 set_permissions -p / admin ".*" ".*" ".*"
rabbitmqctl -n rabbit-2 add_user admin admin
rabbitmqctl -n rabbit-2 set_user_tags admin administrator
rabbitmqctl -n rabbit-2 set_permissions -p / admin ".*" ".*" ".*"
小结:
Tips:
如果采用多机部署 需要读取其中一个节点的cookie 并且复制到其它节点(节点之间通过cookie确定相互是否通信)
cookie 存放在 /var/lib/rabbitmq/.erlang.cookie
例如:主机名分别为 rabbit-1 rabbit-2
逐个启动各节点
配置各节点的hosts文件(vim/etc/hosts)
IP1:rabbit-1
IP2:rabbit-2
如果不配置hosts文件的话 就需要指定主节点的IP地址
其它步骤跟单机模式部署雷同
5.5.分布式事务
5.5.1.简述
-
分布式事务指事务的操作位于不同的节点上,需要保证事务的ACID特性。
-
例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务
5..2.分布式事务方式
在分布式系统中,要实现分布式事务,无外乎哪几种解决方案。
两阶段提交(2PC)需要数据库严商
两阶段提交(Two-phase Commit,2PC),通过引协调者(coordinator)来协调参与者的行为,并最终决定这些参与者是否真正要执行事务。
准备阶段
协调者询问参与事务是否执行成功,参与者发回事务执行结果
提交阶段
如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。
需要注意的是,在准备阶段,参与者执行了事务,但是还未提交。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚。
存在的问题
- 同步阻塞所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它操作。
- 单点问题协调者在2PC中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者会—直等待状态,无法完成其它操作。
- 数据不一致在阶段二,如果协调者只发送了部分Commit 消息,此时网络发生异常,那么只有部分参与者接收到Commit消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。
- 太过保守任意一个节点失败就会导致整个事务失败,没有完善的容错机制。
补偿事务(TCC)严选,阿里、蚂蚁金服
TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撒销)操作。它分为三个阶段:
- Try阶段主要是对业务系统做检测及资源预留
- Confirm阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行Confirm阶段时,默认---Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
- Cancel阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。
举个例子,假入Bob要向Smith转账,思路大概是:我们有一个本地方法,里面依次调用
- 首先在Try阶段,要先调用远程接口把Smith 和 Bob 的钱给冻结起来。
- 在 Confirm阶段,执行远程调用的转账的操作,转账成功进行解冻。
- 如果第2步执行成功,那么转账成功,如果第二步执行失败,则调用远程冻结接口对应的解冻方法(Cancel)。
优点:跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些
缺点:缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。
本地消息(异步确保)比如:支付宝、微信支付主动查询支付状态,对账单的形式
本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终—致性。
- 在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中。
- 之后将本地消息表中的消息转发到Kafka等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发。
- 在分布式事务操作的另一方从消息队列中读取一个消息,并执行消息中的操作。
- 优点:一种非常经典的实现,避免了分布式事务,实现了最终—致性。
- 缺点:消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。
MQ事务消息,异步场景,通用性较强,拓展性较高。
有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如Kafka不支持。
以阿里的RabbitMQ中间件为例,其思路大致为:
- 第一阶段Prepared消息,会拿到消息的地址。第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。
- 也就是说在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了,RabbitMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RabbitMQ会根据发送端设置的第略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
优点:实现了最终一致性,不需要依赖本地数据库事务。
缺点:实现难度大,主流MQ不支持,RocketMQ事务消息部分代码也未开源。
总结
通过本文我们总结并对比了几种分布式分解方案的优缺点,分布式事务本身是一个技术难题,是没有一种完美的方案应对所有场景的,具体还是要根据业务场景去抉择吧。阿里RocketMQ去实现的分布式事务,现在也有除了很多分布式事务的协调器,比如LCN等,大家可以多去尝试。
5.5.3.具体实现
分布式事务的完整架构图
系统与系统之间的分布式事务问题
系统间调用过程中事务回滚问题
package com.xiao.rabbitmq.service;2.
import com.xiao.rabbitmq.dao.orderDataBaseService;
import com.xiao.rabbitmq.pojo.Order;
import org.springframework.beans.factory .annotation.Autowired;
import org.springframework.http.client.SimpleclientHttpRequestFactory;
import org.springframework.stereotype. Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;
public class OrderService {
@Autowired
private OrderDataBaseService orderDataBaseservice;
//创建订单
@Transactional(rollbackFor = Exception.class)//订单创建整个方法添加事务
public void createOrder(Order orderInfo) throws Exception {
// 1:订单信息--插入丁订单系统,订单数据库事务orderDataBaseService.saveOrder(orderInfo);
//2∶通通Http接口发途订单信息到运单系统
String result = dispatchHttpApi(orderInfo.getorderId());
if( !"success".equals(result)) {
throw new Exception("订单创建失败,原因是运单接口调用失败!");
}
}
/**
* 模拟http请求接口发途,运单系统,将订单号传过去 springcloud
*/
private String dispatchHttpApi(String orderId){
SimpleclientHttpRehyuestFactory factory - new SimpleClientHttpRequestFactory();
//链接超时>3秒
factory .setConnectTimeout ( 300e) ;
//处理超时>2秒
factory .setReadTimeout ( 2000) ;
//发送http请求
String url = "http: / /localhost:9000/dispatch/order?orderId="+orderId;
RestTemplate restTemplate = new RestTemplate(factory);//异常
String result = restTemplate.getForobject(url,string.class);
return result;
}
}
基于MQ的分布式事务整体设计思路
基于MQ的分布式事务消息的可靠生产问题-定时重发
如果这个时候MQ服务器出现了异常和故障,那么消息是无法获取到回执信息。怎么解决呢?
基于MQ的分布式事务消息的可靠消费
基于MQ的分布式事务消息的消息重发
解决消息重试的集中方案
- 控制重发的次数
- try+catch+手动ack
- try+catch+手动ack +死信队列处理
如果死信队列报错就进行人工处理
5.5.4.总结
基于MQ的分布式事务解决方案优点:
- 通用性强
- 拓展方便
- 耦合度低,方案也比较成熟
基于MQ的分布式事务解决方案缺点:
- 基于消息中间件,只适合异步场景
- 消息会延迟处理,需要业务上能够容忍
建议
- 尽量去避免分布式事务
- 尽量将非核心业务做成异步