RabbitMq
什么是MQ:
是一种存放消息的队列;还是一种跨进程的通讯机制,用于两个微服务之间的消息通讯;
消息中间件
作用于分布式系统之间的通讯
且必须是异步处理的场景
提升系统的吞吐量:单位时间处理请求的个数
底层就是一个队列,但是队列是不支持持久化的;且不支持跨进程的;
mq的作用
流量削峰:当请求超过了系统能处理的最大值时,使用消息队列来作为缓冲,将一秒内的请求分成一段时间来处理;
应用解耦:微服务之间的依赖性降低,当一个微服务出现故障时,正常功能依旧能够实现,等出现故障的微服务修复后,依旧能够完成自己的功能: (比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。)
异步处理:用于提升系统的吞吐量
MQ的分类
kafka:为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪
优点:性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。
但是需要专业的团队来进行维护,且一般的企业是不需要那么高的吞吐量的
RabbitMQ:
吞吐量也可以达到万级,足够一般公司使用
性能稳定,使用erlang语言编写的;
开源提供的管理界面很好
社区活跃度高--->更新频率高
RabbitMq的四大核心概念
生产者:产生数据发送消息的程序
交换机:接收生产者发送到的消息,并且推送到队列中:
- 注意的是交换机必须确切的知道如何处理接收到的消息
- 可以推送到多个队列
- 也可以推送到特定的队列
- 或者是将消息丢弃
- 这些都是和交换机的类型有关的
队列:队列的本质就是一个很大的消息缓冲区,队列仅受主机的内存和磁盘的约束;
多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列中接收消息
消费者:消费者就是等待从队列中接收消息的程序;
消费者和生产者是同一个程序
RabbitMq的基本架构
由生产者通过信道发送消息哦到信道--->信道(原来会发送一次信息给交换机就需要建立一次连接,这样很耗费资源,所以使用信道来减少资源的浪费Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销)
由交换机将消息推送至队列,需要注意的是交换机是不能储存消息的,根据交换机的类型不同,匹配队列的方式是不同的:逻辑上的结构是由一个一个的虚拟机组成一个Broker
Broker:是负责消息的分发的
且Broker里面包含多个虚拟机
每个虚拟机里面都可以由一个交换机和多个队列组成
队列是整个消息中间件的核心组件
可以设置队列中消息的过期时间,以及持久化
队列和交换机的绑定需要路由key的,由路由key--->发送到交换机的消息,根据路由key匹配到队列中
关于AMQP协议的补充
AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
- Server:接收客户端的连接,实现AMQP实体服务。
- Connection:连接,应用程序与Server的网络连接,TCP连接。
- Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
- Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。由Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
- Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
- Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
- Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
- RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
- Queue:消息队列,用来保存消息,供消费者消费。
消费原理
集群中有两个节点,每个节点上有一个broker,每个broker负责本机上队列的维护,并且borker之间可以互相通信。集群中有两个队列A和B,每个队列都分为master queue和mirror queue(备份)。
需要注意的是:只有主队列可以完成读写操作,若连接的不是主队列则会通过路由,还是会连接主队列;
由于这种结构,导致RabbitMq,只能是单节点进行读写操作,吞吐量就会受限;
自动应答与手动应答的区别
自动应答:吞吐量大,但是消息容易丢失
手动应答:消息不容易丢失:这也是消息百分百投递的一个重点
- A.Channel.basicAck(用于肯定确认)
- B.Channel.basicNack(用于否定确认
- C.Channel.basicReject(用于否定确认):与Channel.basicNack相比少一个参数不处理该消息了直接拒绝,可以将其丢弃
消息的过期时间
Time To Live,也就是生存时间,是一条消息在队列中的最大存活时间
RabbitMQ可以对消息和队列设置TTL。
RabbitMQ支持设置消息的过期时间,在消息发送的时候可以进行指定,每条消息的过期时间可以不同。
RabbitMQ支持设置队列的过期时间,从消息入队列开始计算,直到超过了队列的超时时间配置,那么消息会变成死信,自动清除。
如果两种方式一起使用,则过期时间以两者中较小的那个数值为准。
当然也可以不设置TTL,不设置表示消息不会过期;如果设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息将被立即丢弃。
需要注意的是,若是在一个队列中存在两条消息,A消息的过期时间为10s,B消息的过期时间为5秒,但是A消息先进入队列,那么在A消息没有被抛弃之前B消息也不会被抛弃;
Mq的持久化
RabbitMQ的持久化分为三个部分:交换器持久化、队列持久化和消息的持久化。
交换机的持久化:
① Exchange 的持久化,声明交换机时指定持久化参数(durable)为 true 即可。声明时是默认持久化的
如果交换机不设置持久化,那么在 RabbitMQ 服务器重启之后,相关的交换机元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换机中了。对于一个长期使用的交换机来说,建议将其设置为持久化的。
消息的持久化:
③ message 的持久化,使用 convertAndSend 方式发送消息,消息默认是持久化的
在保证了队列的持久化后,开启 Message 的持久化就能保证在其内部的持久化存储
队列的持久化:
② queue 的持久化,声明队列时指定持久化参数(durable)为 true 即可。声明时是默认持久化的
如果队列不设置持久化,那么在RabbitMQ服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。
队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是不能保证内部所存储的消息不会丢失。
消息发布确认(参考----消息的百分百投递)
发布确认:生产者向交换机发送一个消息,交换机需要给一个回馈(若给交换机发送一个消息,但是队列中不存在,就存在两种可能性:第一生产者发送给交换机时就消息丢失了;第二种:消息在交换机推送到队列中失败了,可能没有对应路由的交换机);即若没有消息发布确认机制,消息丢失以后可能不知在哪丢失了
回调方法:
配置一个发布确认模式开启
回退消息
在交换机无法处理消息时,将消息回退
交换机:
交换机的类型
direct(直连交换机):将队列绑定到交换机,消息的 routeKey 需要与队列绑定的 routeKey 相同。
fanout (扇形交换机):不处理 routeKey ,直接把消息转发到与其绑定的所有队列中。
topic(主题交换机):根据一定的规则,根据 routeKey 把消息转发到符合规则的队列中,其中 # 用于匹配符合一个或者多个词(范围更广), * 用于匹配一个词。
headers (头部交换机):根据消息的 headers 转发消息而不是根据 routeKey 来转发消息, 其中 header 是一个 Map,也就意味着不仅可以匹配字符串类型,也可以匹配其他类型数据。 规则可以分为所有键值对匹配或者单一键值对匹配。
绑定
交换机和队列的桥梁
死信队列
什么是死信:没办法被消费的消息
- 消息ttl:消息的存放时间超时了
- 消息被拒
- 队列满了,消息无法进入队列
死信队列
死信交换机:
死信队列:
实质上就是普通的队列
延迟队列
一般用在订单的自动取消
利用TTL和死信队列是可以完成延时队列效果的
方案一:使用队列的过期时间
@Bean public Queue testQueue(){ return QueueBuilder.durable(TEST_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey(DEAD_ROUTING_KEY) // 10s 过期 .ttl(10000) .build(); }
方案二:使用消息的过期时间
当两条消息进入同一个交换机时,且都设置了过期时间,那么若先过期的消息没有进入死信交换机,后面进入的消息就算过期了也会被阻塞住!
(补充知识点----当消息和队列都设置了过期时间,以时间先到的为准)
实现精准延时的方法
使用插件
延迟交换机并不是rabbitmq自带的功能,而是要通过安装延迟交换机插件
delayed_message_exchange
来实现其插件的安装我们之间已经讲解过,不再累叙,可以参考如下博文 springcloud:安装rabbitmq并配置延迟队列插件
通过延迟交换机实现的延迟消息,其重点主要在交换机上,队列就是普通队列,消息发送到交换机上后,会记录消息的延迟时间,到达时间后才会发送到队列中,这样消费者通过监控队列,就能在指定时间获取到消息
因此延迟交换机与普通交换机的实现,只在创建交换机时,其他的操作与普通交换机无异,因此使用起来也很方便
创建延迟交换机,通过
x-delayed-type
属性声明交换机类型,可以是direct也可以是topic,具体支持4中交换机类型
应用场景
1.订单超时未支付,自动关闭订单
通过使用延时队列来实现
由订单微服务自己发送消息---->订单微服务自己消费
2.自动签收
使用实例:
标签:持久,队列,RabbitMq,过期,交换机,消息,路由 From: https://www.cnblogs.com/xuzhidong/p/16845459.html依赖:
<!--RabbitMQ依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置文件
spring: rabbitmq: host: 192.168.18.230 port: 5672 username: admin password: 123 listener: simple: acknowledge-mode: manual//设置手动签收
配置类(用来设置交换机为延迟交换机,注意需要延时交换机的插件)
@Configuration public class CancelOrderQueueConfig { @Bean public Queue cancelOrderQueue(){ return new Queue(MqConst.CANCEL_ORDER_QUEUE,false); } //由于采用延迟插件 自定义交换机 @Bean public CustomExchange cancelOrderExchange(){ Map<String, Object> arguments=new HashMap<>(); //常规交换机类型 arguments.put("x-delayed-type","direct"); return new CustomExchange(MqConst.CANCEL_ORDER_EXCHANGE,"x-delayed-message",false,true,arguments); } //队列和交换机的绑定 @Bean public Binding bindingDelayedQueue(@Qualifier("cancelOrderQueue") Queue cancelOrderQueue, @Qualifier("cancelOrderExchange") CustomExchange cancelOrderExchange){ return BindingBuilder.bind(cancelOrderQueue).to(cancelOrderExchange).with(MqConst.CANCEL_ORDER_ROUTE_KEY).noargs(); } }
生产者
@Autowired private RabbitTemplate rabbitTemplate; //发送一个延迟消息 超时自动取消订单 rabbitTemplate.convertAndSend(MqConst.CANCEL_ORDER_EXCHANGE, MqConst.CANCEL_ORDER_ROUTE_KEY, orderInfoId, correlationData -> { correlationData.getMessageProperties().setDelay(cancelOrderDelay); return correlationData;
消费者
@Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = MqConst.CANCEL_ORDER_QUEUE) public void cancelOrder(Long orderInfoId, Channel channel, Message message)throws Exception{ //手动的确认消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }