RabbitMQ
1.RabbitMQ简介
1.1 什么是消息中间件
- 什么是MQ消息中间件?
- 全称MessageQueue,主要是⽤于程序和程序直接通信,异步+解耦
1.2 MQ使用场景
- 核心应用:
- 异步
- 解耦
- 削峰:秒杀、⽇志处理
- 分布式事务、最终⼀致性
- RPC调⽤上下游对接,数据源变动->通知下属
1.3 为什么使用MQ?MQ的优点?缺点?
- 异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。
- 解耦 - 系统间通过消息通信,不用关心其他系统的处理,系统之间的关系解耦了。
- 流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请 求。
- 日志处理 - 解决大量日志传输。
- 消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通 讯。比如实现点对点消息队列,或者聊天室等。
-
缺点:
-
系统可用性降低
本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的 系统不是呵呵了。因此,系统可用性会降低;
-
系统复杂度提高
加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息 不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂 性增大。 一致性 A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是, 要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了, 咋整?你这数据就 不一致了。
-
1.4 公司生产环境用的是什么消息中间件
比如用的是 RabbitMQ,然后可以初步给一些你对不同MQ中间件技术的选型分析。
举个例子:比如说ActiveMQ是老牌的消息中间件,国内很多公司过去运用的还 是非常广泛的,功能很强大。
但是问题在于没法确认ActiveMQ可以支撑互联网公司的高并发、高负载以及高 吞吐的复杂场景,在国内互联网公司落地较少。而且使用较多的是一些传统企 业,用ActiveMQ做异步调用和系统解耦。
然后你可以说说RabbitMQ,他的好处在于可以支撑高并发、高吞吐、性能很 高,同时有非常完善便捷的后台管理界面可以使用。另外,他还支持集群化、高可用部署架构、消息高可靠支持,功能较为完善。而且经过调研,国内各大互联网公司落地大规模RabbitMQ集群支撑自身业务的 case较多,国内各种中小型互联网公司使用RabbitMQ的实践也比较多。除此之外,RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的 bug以及进行各种优化,因此综合考虑过后,公司采取了RabbitMQ。
但是RabbitMQ也有一点缺陷,就是他自身是基于erlang语言开发的,所以导致 较为难以分析里面的源码,也较难进行深层次的源码定制和改造,毕竟需要较为 扎实的erlang语
言功底才可以。
然后可以聊聊RocketMQ,是阿里开源的,经过阿里的生产环境的超高并发、 高吞吐的考验,性能卓越,同时还支持分布式事务等特殊场景。
而且RocketMQ是基于Java语言开发的,适合深入阅读源码,有需要可以站在 源码层面解决线上生产问题,包括源码的二次开发和改造。
另外就是Kafka。Kafka提供的消息中间件的功能明显较少一些,相对上述几款 MQ中间件要少很多。
但是Kafka的优势在于专为超高吞吐量的实时日志采集、实时数据同步、实时数 据计算等场景来设计。
因此Kafka在大数据领域中配合实时计算技术(比如Spark Streaming、 Storm、Flink)使用的较多。但是在传统的MQ中间件使用场景中较少采用。
1.5 RabbitMQ核心概念
-
Broker
- RabbitMQ的服务端程序,可以认为⼀个mq节点就是⼀个broker
-
Producer⽣产者
- 创建消息Message,然后发布到RabbitMQ中
-
Consumer消费者:
- 消费队列⾥⾯的消息
-
Message 消息
- ⽣产消费的内容,有消息头和消息体,也包括多个属性配置,⽐如routingKey路由键
-
Queue 队列
- 是RabbitMQ 的内部对象,⽤于存储消息,消息都只能存储在队列中
-
Channel 信道
- 消息通道,在客户端的每个连接里,可建立多个channel,每个 channel代表一个会话任务
-
Connection连接
-
RabbitMQ的socket链接,它封装了socket协议相关部分逻辑,⼀个连接上可以有多个channel进⾏通
信
-
-
Exchange 交换器
- 消息交换机,它指定消息按什么规则,路由到哪个队列。队列和交换机是多对多的关系。
-
RoutingKey 路由键
- ⽣产者将消息发给交换器的时候,⼀般会指定⼀个RoutingKey,⽤来指定这个消息的路由规则
- 最⼤⻓度255 字节
- ⽣产者将消息发给交换器的时候,⼀般会指定⼀个RoutingKey,⽤来指定这个消息的路由规则
-
Binding 绑定
- 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
⽣产者将消息发送给交换器时,需要⼀个RoutingKey,当BindingKey和 RoutingKey相匹配时,消息会被路由到对
应的队列中
- Virtual host 虚拟主机
- /dev:开发环境
- /test:测试环境
- /pro:生产环境
1.6 JMS消息服务和和常⻅核⼼概念
-
什么是JMS: Java消息服务(Java Message Service),Java平台中关于⾯向消息中间件的接⼝
-
特性
- 面向Java平台的标准消息传递API
- 在Java或JVM语言比如Scala、Groovy中具有互用性
- 无需担心底层协议
- 有queues和topics两种消息传递模型
- 支持事务、能够定义消息格式(消息头、属性和内容)
-
常⻅概念
-
JMS提供者:连接⾯向消息中间件的,JMS接⼝的⼀个
-
实现,RocketMQ,ActiveMQ,Kafka等等
-
JMS⽣产者(Message Producer):⽣产消息的服务
-
JMS消费者(Message Consumer):消费消息的服务
-
JMS消息:数据对象
-
JMS队列:存储待消费消息的区域
-
JMS主题:⼀种⽀持发送消息给多个订阅者的机制
-
JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)
-
-
什么是AMQP
- 解决消息传递交互问题的⼀种协议,兼容JMS
-
特性
- 独立于平台的底层消息传递协议
- 消费者驱动消息传递
- 跨语言和平台的互用性、属于底层协议
- 有5种交换类型direct,fanout,topic,headers,system
- 面向缓存的、可实现高性能、支持经典的消息队列,循环,存储和转发
- 支持长周期消息传递、支持事务(跨消息队列)
-
AMQP和JMS的主要区别
2.Docker快速安装RabbitMQ
-
Docker安装RabbitMQ
地址:https://hub.docker.com/_/rabbitmq/
#拉取镜像 docker pull rabbitmq:management docker run -d --hostname rabbit_host1 --name xd_rabbit -e RABBIT_MQ_DEFAULT=moufan -e RABBITMQ_DEFAULT_PASSWORD=moufan -p 15672:15672 -p 5672:5672 rabbitmq:management #介绍 -d 以守护进程⽅式在后台运⾏ -p 15672:15672 management 界⾯管理访问端⼝ -p 5672:5672 amqp 访问端⼝ --name:指定容器名 --hostname:设定容器的主机名,它会被写到容器内的/etc/hostname 和 /etc/hosts,作为容器主机IP的别名,并且将显示在容器的bash中 -e 参数 RABBITMQ_DEFAULT_USER ⽤户名 RABBITMQ_DEFAULT_PASS 密码
-
主要端口介绍
4369 erlang 发现⼝ 5672 client 端通信⼝ 15672 管理界⾯ ui 端⼝ 25672 server 间内部通信⼝
3.RabbitMQ工作队列实战模型
-
与SpringBoot整合
-
添加依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.16.0</version> </dependency>
-
3.1 RabbitMQ简单队列
-
文档
-
消息生产者
-
步骤:
-
1.创建连接工厂ConnectionFactory
-
2.在ConnectionFactory对象中设置相关连接配置(mq主机地址、用户名、密码、虚拟主机、client端口号)
-
3.通过连接工厂创建与RabbitMQ进行socket连接的对象Connection
-
4.通过连接对象Connection创建信道(Channel)
-
5.通过信道(Channel)使用
channel.queueDeclare()
方法创建队列(队列已存在会被覆盖)- 第一个参数:队列名称
- 第二个参数:是否开启持久化配置:mq重启后还在
- 第三个参数:是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
- 第四个参数:自动删除: 当没有消费者的时候,自动删除掉,一般是false
- 第五个参数:其他,一般为null
-
6.通过信道(Channel)发布消息到队列中
channel.basicPublish()
-
第一个参数:交换机名称:不写则是默认的交换机
-
第二个参数:路由健名称(保证路由健需要和队列名称⼀样才可以被路由)
-
第三个参数: 配置信息
-
第四个参数:消息的字节数组
-
-
/** * @Author:fan * @Description:消息生产者 * @Date:2022-11-19 10:47 */ public class Send { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //创建工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置mq安装的主机地址 connectionFactory.setHost("47.99.247.95"); //设置账号 connectionFactory.setUsername("moufan"); //设置密码 connectionFactory.setPassword("moufan"); //设置虚拟主机 connectionFactory.setVirtualHost("/dev"); //设置端口号 connectionFactory.setPort(5672); Connection connection = connectionFactory.newConnection(); //创建信道 Channel channel = connection.createChannel(); /** * 队列名称 * 持久化配置:mq重启后还在 * 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占 * 自动删除: 当没有消费者的时候,自动删除掉,一般是false * 其他参数 * * 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World"; /** * 参数说明: * 交换机名称:不写则是默认的交换机,那路 * 由健需要和队列名称⼀样才可以被路由, * 路由健名称 * 配置信息 * 发送的消息数据:字节数组 * */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); } }
-
-
消息消费者
-
- 创建消费者对象Comsumer,重写
handleDelivery()
- 第一个参数 String consumerTag:一般是固定的,可以作为会话的名称
- 第二个参数 Envelope envelope:可以获取交换机、路由健等信息
- 第三个参数 AMQP.BasicProperties properties:获取配置信息
- 第四个参数 byte[] body :消息内容
- 创建消费者对象Comsumer,重写
-
- 通过Channel对象的
basicConsume(String QUEUE_NAME, Boolean true,Comsumer consumer)
方法,消费消息- 第一个参数:队列名称
- 第二个参数:autoAck:是否开启自动应答
- 第三个参数:消费者对象的回调接口
- 通过Channel对象的
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //1.创建工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置mq安装的主机地址 connectionFactory.setHost("47.99.247.95"); //设置账号 connectionFactory.setUsername("moufan"); //设置密码 connectionFactory.setPassword("moufan"); //设置虚拟主机 connectionFactory.setVirtualHost("/dev"); //设置端口号 connectionFactory.setPort(5672); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.创建信道 Channel channel = connection.createChannel(); /** * 队列名称 * 持久化配置:mq重启后还在 * 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占 * 自动删除: 当没有消费者的时候,自动删除掉,一般是false * 其他参数 * * 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性 */ //4.队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //5.创建消费者,重写handleDelivery Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //一般是固定的,可以作为会话的名称 System.out.println("consumerTag=" + consumerTag); //可以获取交换机、路由健等信息 System.out.println("envelope=" + envelope); System.out.println("properties=" + properties); System.out.println("body=" + new String(body, "utf-8")); } }; //6.消费消息 channel.basicConsume(QUEUE_NAME, true, consumer); } }
-
3.2 RabbitMQ工作队列
- 工作队列
- 消息生产者能力大于消费者能力,消费者多加节点
- 默认消费策略:轮询
3.2.1 轮询策略
-
生产者
private static final String QUEUE_NAME="work_mq_rr"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("47.99.247.95"); connectionFactory.setPort(5672); connectionFactory.setUsername("moufan"); connectionFactory.setPassword("moufan"); connectionFactory.setVirtualHost("/dev"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.创建信道 Channel channel = connection.createChannel(); //4.声明队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //5.发送内容到消息队列 for(int i=1;i<=10;i++){ String message = "扬帆起航"+i; channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); } }
-
消费者1
public class Recv1 { private static final String QUEUE_NAME="work_mq_rr"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("47.99.247.95"); connectionFactory.setPort(5672); connectionFactory.setUsername("moufan"); connectionFactory.setPassword("moufan"); connectionFactory.setVirtualHost("/dev"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.创建信道 Channel channel = connection.createChannel(); //4.创建队列声明 channel.queueDeclare(QUEUE_NAME,false,false,false,null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //模拟消费者消费慢 try { TimeUnit.SECONDS.sleep(6); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("body="+new String(body,"utf-8")); //手工确认消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME,false,consumer); } }
-
消费者2
public class Recv2 { private static final String QUEUE_NAME="work_mq_rr"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("47.99.247.95"); connectionFactory.setPort(5672); connectionFactory.setUsername("moufan"); connectionFactory.setPassword("moufan"); connectionFactory.setVirtualHost("/dev"); //2.创建连接 Connection connection = connectionFactory.newConnection(); //3.创建信道 Channel channel = connection.createChannel(); //4.创建队列声明 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //开启手工确认 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //模拟消费者消费慢 try { TimeUnit.SECONDS.sleep(6); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("body="+new String(body,"utf-8")); //手工确认消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME,false,consumer); } }
-
轮训策略验证
- 先启动两个消费者,再启动⽣产者
- 缺点:存在部分节点消费过快,部分节点消费慢,导致不能合理处理消息
3.2.2公平策略
- 公平策略验证
- 修改消费者策略
- 解决消费者能⼒消费不⾜的问题,降低消费时间问题
//限制消费者每次消费1个,消费完成再消息下一个
channel.basicQos(1);
4.Exchange交换机
- 什么是交换机
- 生产者将消息发送到Exchange,交换机将消息路由到⼀个或者多个队列中
- 如果没有队列和exchange绑定,或者没有符合的路由规则,则消息会被丢失
- 交换机类型
Direct exchange
定向(一对一)- 将⼀个队列绑定到交换机上,要求该消息与⼀个特定的路由键完全匹配
- 处理路由键
Fanout exchange
(常用)- 只需要简单的将队列绑定到交换机上,⼀个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
- Fanout交换机转发消息是最快的,⽤于发布订阅,⼴播形式,中⽂是扇形
- 不处理路由键
Topic exchange
通配符(最常用)- 主题交换机是⼀种发布/订阅的模式,结合了直连交换机与扇形交换机的特点
- 将路由键和某模式进⾏匹配。此时队列需要绑定要⼀个模式上
- 符号“#”匹配⼀个或多个词,符号“*”匹配不多不少⼀个词
- 例⼦:因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*” 只会匹配到“abc.def”。
Headers exchange
5.发布订阅模型
-
什么是RabbitMQ的发布订阅模型
发布-订阅模型中,消息⽣产者不再是直接⾯对queue(队列名称),⽽是直⾯exchange,都需要经过 exchange来进⾏消息的发送, 所有发往同⼀个fanout交换机的消息都会被所有监听这个交换机的消费者接收到
-
⽂档:https://www.rabbitmq.com/tutorials/tutorial-three-java.htm
-
发布订阅模型应⽤场景
- 微信公众号
- 新浪微博关注
-
rabbitmq发布订阅模型
- 通过把消息发送给交换机,交互机转发给对应绑定的队列
- 交换机绑定的队列是排它独占队列,⾃动删除
-
生产者
public class Send { private final static String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.211.55.13"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); //JDK7语法,自动关闭,创建连接 try (Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel()) { //绑定交换机,fanout扇形,即广播 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT); String msg = "小滴课堂 rabbitmq 发布大课训练营综合项目"; channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("广播消息发送成功"); } } }
-
消费者
ublic class Recv1 { private final static String EXCHANGE_NAME = "exchange_fanout"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.211.55.13"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机,fanout扇形,即广播 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.FANOUT); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, fanout交换机不用routingkey channel.queueBind(queueName,EXCHANGE_NAME,""); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body="+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName,false,consumer); }
6.路由模式
-
什么是rabbitmq的路由模式
-
⽂档:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
-
交换机类型是Direct
队列和交换机绑定,需要指定⼀个路由key( 也叫Bingding Key) 消息⽣产者发送消息给交换机,需要指定routingKey 交换机根据消息的路由key,转发给对应的队列
-
-
例⼦:⽇志采集系统 ELK
- ⼀个队列收集error信息-》告警
- ⼀个队列收集全部信息-》⽇常使⽤
-
消费者
public class Send { private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.211.55.13"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); //JDK7语法,自动关闭,创建连接 try (Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel()) { //绑定交换机,直连交换机 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); String error = "我是订单服务的error日志"; String info = "我是订单服务的info日志"; String debug = "我是订单服务的debug日志"; channel.basicPublish(EXCHANGE_NAME,"errorRoutingKey",null,error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"infoRoutingKey",null,info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"debugRoutingKey",null,debug.getBytes(StandardCharsets.UTF_8)); System.out.println("direct消息发送成功"); } } }
-
消费者一,获取
errorRoutingKey
,infoRoutingKey
,debugRoutingKey
消息public class Recv1 { private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.211.55.13"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机,fanout扇形,即广播 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, direct交换机需要指定routingkey channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey"); channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey"); channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body="+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName,false,consumer); } }
-
消费者二、获取
errorRoutingKey
public class Recv2 { private final static String EXCHANGE_NAME = "exchange_direct"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.211.55.13"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机,fanout扇形,即广播 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, direct交换机需要指定routingkey channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body="+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName,false,consumer); } }
7. topic主题通配符
- 什么是rabbitmq的主题模式
- ⽂档 https://www.rabbitmq.com/tutorials/tutorial-five-java.html
- 交换机是 topic, 可以实现发布订阅模式fanout和路由模式Direct 的功能,更加灵活,⽀持模式匹配,通配符等
- 交换机同过通配符进⾏转发到对应的队列,*** 代表⼀个词,#代表1个或多个词**,⼀般⽤#作为通配符居多
- 交换机和队列绑定时⽤的binding使⽤通配符的路由健
- ⽣产者发送消息时需要使⽤具体的路由健
-
例⼦:⽇志采集系统
- ⼀个队列收集订单系统的全部⽇志信息,order.log.#
- ⼀个队列收集全部系统的全部⽇志信息, #.log
-
生产者
public class Send { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.211.55.13"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); //JDK7语法,自动关闭,创建连接 try (Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel()) { //绑定交换机,topic交换机 channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); String error = "我是订单服务的error日志"; String info = "我是订单服务的info日志"; String debug = "我是商品服务的debug日志"; channel.basicPublish(EXCHANGE_NAME,"order.log.error",null,error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"order.log.info",null,info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME,"product.log.debug",null,debug.getBytes(StandardCharsets.UTF_8)); System.out.println("TOPIC消息发送成功"); } } }
-
消费者一
public class Recv1 { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.211.55.13"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机, channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, 需要指定routingkey channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body="+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName,false,consumer); } }
-
消费者二
public class Recv2 { private final static String EXCHANGE_NAME = "exchange_topic"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.211.55.13"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //绑定交换机, channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); //获取队列 String queueName = channel.queueDeclare().getQueue(); //绑定交换机和队列, 需要指定routingkey channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*"); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body="+new String(body,"utf-8")); //手工确认消息消费,不是多条确认 channel.basicAck(envelope.getDeliveryTag(),false); } }; //消费,关闭消息消息自动确认,重要 channel.basicConsume(queueName,false,consumer); } }
-
RabbitMQ的多个⼯作模式总结
-
官网
-
简单模式
- ⼀个⽣产、⼀个消费,不⽤指定交换机,使⽤默认交换机
-
⼯作队列模式
- ⼀个⽣产,多个消费,可以有轮训和公平策略,不⽤指定交换机,使⽤默认交换机
-
发布订阅模式
- fanout类型交换机,通过交换机和队列绑定,不⽤指定绑定的路由健,⽣产者发送消息到交换机,fanout交换机直接进⾏转发,消息不⽤指定routingkey路由健
-
路由模式
- direct类型交换机,过交换机和队列绑定,指定绑定的路由健,⽣产者发送消息到交换机,交换机根据消息的路由key进⾏转发到对应的队列,消息要指定routingkey路由健
-
8.SpringAMQP介绍+SpringBoot2.X项⽬创建
-
什么是Spring-AMQP
- 官⽹:https://spring.io/projects/spring-amqp
- Spring 框架的AMQP消息解决⽅案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等
-
创建方式一
-
官网创建
-
-
创建方式二
-
添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
修改配置文件
server: port: 8888 spring: rabbitmq: host: 47.99.247.95 port: 5672 virtual-host: /dev username: moufan password: moufan
-
添加RabbitMQConfig⽂件,
/** * @Author:fan * @Description:自动生成交换机和队列,并把交换机与队列进行绑定 * @Date:2022-11-19 22:14 */ @Configuration public class RabbitMQConfig { public static final String EXCHANGE_NAME = "order_exchange"; public static final String QUEUE_NAME = "order_queue"; /** * 交换机 * @return */ @Bean public Exchange orderExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } /** * 队列 * @return */ @Bean public Queue orderQueue(){ return QueueBuilder.durable(QUEUE_NAME).build(); } /** * 交换机和队列绑定关系 */ @Bean public Binding orderBinding(Queue queue,Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs(); } }
-
-
消费者---对队列中消息进行监听并处理
@Component //监听到 order_queue队列中有消息,便进行处理 @RabbitListener(queues = "order_queue") public class OrderMQListener { /** * * 对消息进行处理 */ @RabbitHandler public void messageHandler(String body, Message message, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("message="+message.toString()); System.out.println("body="+body); channel.basicAck(msgTag,true); } }
-
生产者,使用
RabbitTemplate
@SpringBootTest class DemoApplicationTests { @Autowired private RabbitTemplate template; @Test void send() { template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单来啦1"); } }
9.消息可靠性投递加消费
9.1 简介
-
什么是消息可靠性投递
-
保证消息百分百发送到消息队列中去
保证mq节点成功接受消息 消息发送端需要接受到mq服务端接受到消息的确认应答 完善的消息补偿机制,发送失败的消息可以再感知并⼆次处理
-
-
RabbitMQ消息投递路径
-
⽣产者-->交换机->队列->消费者
- 消息发送确认机制
- confirmCallback
- returnCallback
⽣产者到交换机 通过confirmCallback 交换机到队列 通过returnCallback
- 消息发送确认机制
-
建议
开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是⾮常重要的消息真⼼不建议⽤消息确认机制
-
9.2 confirmCallback
-
⽣产者到交换机
-
通过confirmCallback,⽣产者投递消息后,如果Broker收到消息后,会给⽣产者⼀个ACK。⽣产者通过ACK,可以确认这条消息是否正常发送到Broker,这种⽅式是消息可靠性投递的核⼼
-
开启confirmCallback
server: port: 8080 #消息队列 spring: rabbitmq: host: 10.211.55.13 port: 5672 virtual-host: /dev password: password username: admin #开启消息二次确认,生产者到broker的交换机 publisher-confirm-type: correlated
-
生产者到交换机可靠性投递测试
@Autowired private RabbitTemplate template; /** * 生产者到交换机可靠性投递测试 */ @Test void testConfirmCallback(){ template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * @param correlationData 配置 * @param ack 交换机是否收到消息,true是成功,false是失败 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback======>"); System.out.println("correlationData======>correlationData="+correlationData); System.out.println("ack======>ack="+ack); System.out.println("cause======>cause="+cause); if(ack){ System.out.println("发送成功"); //更新数据库 消息的状态为成功 TODO }else { System.out.println("发送失败,记录到日志或者数据库"); //更新数据库 消息的状态为失败 TODO } } }); //数据库新增一个消息记录,状态是发送 TODO //发送消息 //template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new","新订单"); //模拟异常 template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+" xdclass", "order.new","新订单"); }
-
9.3 returnCallback
-
交换机到队列
- 通过returnCallback,消息从交换器发送到对应队列失败时触发
- 两种模式
- 交换机到队列不成功,则丢弃消息(默认)
- 交换机到队列不成功,返回给消息⽣产者,触发
-
第一步 开启returnCallback
#新版 spring.rabbitmq.publisher-returns=true
-
第⼆步 修改交换机投递到队列失败的策略
#为true,则交换机处理消息到路由失败,则会返回给⽣产者 spring.rabbitmq.template.mandatory=true
-
配置文件
server: port: 8080 #消息队列 spring: rabbitmq: host: 10.211.55.13 port: 5672 virtual-host: /dev password: password username: admin #开启消息二次确认,生产者到broker的交换机 publisher-confirm-type: correlated #开启消息二次确认,交换机到队列的可靠性投递 publisher-returns: true #为true,则交换机处理消息到路由失败,则会返回给生产者 template: mandatory: true
-
交换机到队列,可靠性投递测试
/** * 交换机到队列可靠性投递测试 */ @Test void testReturnCallback(){ template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { int code = returned.getReplyCode(); System.out.println("code="+code); System.out.println("returned="+returned.toString()); } }); //template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new","新订单ReturnsCallback"); //模拟异常 template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xdclass.order.new","新订单ReturnsCallback"); }
9.4 消息确认机制ACK
-
消费者从broker中监听消息,需要确保消息被合理处理
-
ACK
- 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除
- 消费者在处理消息出现了⽹络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放⼊队列中
- 只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除
- 消息的ACK确认机制默认是打开的,消息如未被进⾏ACK的消息确认机制,这条消息被锁定Unacked
-
确认方式
- ⾃动确认(默认)
- ⼿动确认 manual
spring: rabbitmq: #开启⼿动确认消息,如果消息重新⼊队,进⾏重试 listener: simple: acknowledge-mode: manual
-
实战
@Component @RabbitListener(queues = "order_queue") public class OrderMQListener { @RabbitHandler public void messageHandler(String body, Message message, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("message="+message.toString()); System.out.println("body="+body); //复杂业务逻辑 //告诉broker,消息已经被确认 channel.basicAck(msgTag,false); //告诉broker,消息拒绝确认 //channel.basicNack(msgTag,false,true); //channel.basicReject(msgTag,true); } }
-
deliveryTag介绍
- 表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
-
basicNack和basicReject介绍
- basicReject⼀次只能拒绝接收⼀个消息,可以设置是否requeue。
- basicNack⽅法可以⽀持⼀次0个或多个消息的拒收,可以设置是否requeue。
-
当消息发送失败,会一直拒绝,怎样处理
- ⼈⼯审核异常消息
- 设置重试阈值,超过后确认消费成功,记录消息,⼈⼯处理
- ⼈⼯审核异常消息
10.死信队列+延迟队列
- 什么是TTL
- time to live 消息存活时间,如果消息在存活时间内未被消费,则会被清除
- RabbitMQ⽀持两种ttl设置
- 单独消息进⾏配置ttl
- x-message-ttl
- 如果队列头部消息未过期,队列中级消息已经过期,已经还在队列⾥⾯
- 整个队列进⾏配置ttl(居多)
- expiration
- 单独消息进⾏配置ttl
- 两者都配置的话,时间短的先触发
- 什么是rabbitmq的死信队列
- 没有被及时消费的消息存放的队列
- 什么是rabbitmq的死信交换机
- Dead Letter Exchange(死信交换机,缩写:DLX)当消息成为死信后,会被重新发送到另⼀个交换机
当消息生产者推送消息到交换机中,交换机根据路由key推送到队列中;当队列中的消息过期成为死信,消息会重新发送到死信交换机中,死信交换机根据路由key把消息发送到死信队列中,让里一个消费者消费死信队列中的消息
-
消息有哪⼏种情况成为死信
- 消费者拒收消息(basic.reject/ basic.nack),并且没有重新⼊队 requeue=false
- 消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)
- 队列的消息⻓度达到极限
- 结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
-
模拟实战
- 新建死信交换机(和普通没区别)
- 新建死信交换机(和普通没区别)
-
新建死信队列 (和普通没区别)
-
死信交换机和队列绑定
-
新建普通队列,设置过期时间、指定死信交换机
11 延迟队列应用场景
- 什么是延迟队列
- ⼀种带有延迟功能的消息队列,Producer 将消息发送到消息队列 服务端,但并不期望这条消息⽴⻢投递,⽽是推迟到在当前时间点之后的某⼀个时间投递到Consumer 进⾏消费,该消息即定时消息
- 使⽤场景
- 通过消息触发⼀些定时任务,⽐如在某⼀固定时间点向⽤户发送提醒消息
- ⽤户登录之后5分钟给⽤户做分类推送、⽤户多少天未登录给⽤户做召回推送;
- 消息⽣产和消费有时间窗⼝要求:⽐如在天猫电商交易中超时未⽀付关闭订单的场景,在订单创建时会发送⼀条 延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成⽀付。 如⽀付未完成,则关闭订单。如已完成⽀付则忽略
12.常用API
12.1 ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory()
:创建连接工厂setHost(String ip)
:设置MQ的主机地址setUserName(String userName)
:设置MQ账号setPassword(String pwd)
:设置MQ密码setVirtualHost(String dev)
:设置虚拟主机setPort(String port)
:设置client 通信端口号Connection connection = connectionFactory.newConnection()
:与RabbitMQ的socket连接对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置mq安装的主机地址
connectionFactory.setHost("47.99.247.95");
//设置账号
connectionFactory.setUsername("moufan");
//设置密码
connectionFactory.setPassword("moufan");
//设置虚拟主机
connectionFactory.setVirtualHost("/dev");
//设置client端口号
connectionFactory.setPort(5672);
//创建与RabbitMQ的socket连接的对象Connection
Connection connection = connectionFactory.newConnection();
12.2 Connection
-
RabbitMQ的socket链接,它封装了socket协议相关部分逻辑,⼀个连接上可以有多个channel进⾏通
信
-
Channel createChannel()
:创建信道//1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //2.创建与RabbitMQ的socket连接的对象Connection Connection connection = connectionFactory.newConnection(); //3.创建信道 Channel channel = connection.createChannel();
12.3 Channel
- 信道:消息通道,在客户端的每个连接里,可建立多个channel,每个 channel代表一个会话任务
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
:创建队列- String queue:队列名称
- boolean durable:是否开启持久化配置:mq重启后还在
- boolean exclusive:是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
- boolean autoDelete:自动删除: 当没有消费者的时候,自动删除掉,一般是false
- Map<String, Object> arguments:设置队列的一些其他参数,一般为null
void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
:发布消息到队列- String exchange:交换机名称:不写则是默认的交换机
- String routingKey:路由健名称(保证路由健需要和队列名称⼀样才可以被路由)
- AMQP.BasicProperties props: 配置信息
- byte[] body:消息的字节数组
String basicConsume(String queue, boolean autoAck, Consumer callback)
:消费队列中消息- 第一个参数:队列名称
- 第二个参数:autoAck:是否开启自动应答
- 第三个参数:消费者对象的回调接口
void basicAck(long deliveryTag, boolean multiple)
:手动应当,- long deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加,用来区分消费者
- boolean multiple:是否批量true:将一次性ack所有小于deliveryTag的消息。
当开启手动应答时,需要关闭自动应答
-
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
:可以⽀持⼀次0个或多个消息的拒收,可以设置是否requeue- long deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加,用来区分消费者
- boolean multiple:是否批量true:将一次性ack所有小于deliveryTag的消息。
- boolean requeue:消息拒收后是否重回队列
-
void basicReject(long deliveryTag, boolean requeue)
:⼀次只能拒绝接收⼀个消息- long deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加,用来区分消费者
- boolean requeue:消息拒收后是否重回队列
-
void basicQos(int prefetchCount)
:限制消费者每次消费的个,消费完成再消息下一个 -
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type)
:信道绑定交换机-
第一个参数String exchange:交换机名字
-
第二个参数BuiltinExchangeType type:交换机类型
目录
-