首页 > 其他分享 >RabbitMQ

RabbitMQ

时间:2022-12-01 16:57:49浏览次数:53  
标签:queue 消费 队列 RabbitMQ 发送 消息

RabbitMQ

消息队列基础知识

消息队列概念

MQ(Message Queue)消息队列,是基础数据结构中先进先出的一种数据机构。指把要传输的数据(消息)放在队列中,用队列机制来实现软件之间的通信(生产者产生消息并把消息放入队列,然后由消费者去处理)。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。

消息队列优点

优点 说明
异步处理 相比于传统的串行、并行方式,提高了系统吞吐量,媲美多线程。
应用解耦 系统间通过消息通信,不用关心其他系统的处理。就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用MQ给它异步化解耦(需要发送消息给其他系统时无需修改代码,断开消息接收即可)。
流量削锋 可以通过消息队列长度控制请求量,可以缓解短时间内的高并发请求。减少高峰时期对服务器压力,主要是还是来自于互联网的业务场景,例如,马上即将开始的春节火车票抢购,大量的用户需要同一时间去抢购,以及大家熟知的阿里双11秒杀,这个时候在同一时间点请求来的太猛烈,服务器受不住。削峰,等于是减缓服务器压力,可以以漏斗或者水管的方式来进行消费。
日志处理 解决大量日志传输。
消息通讯 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列或者聊天室、短信发送等。

消息队列缺点

缺点 说明
系统可用性降低 任何添加中间件处理业务,都需要单独去保证该中间件的运行
系统复杂度提高 需要考虑中间件带来的一系列问题,如一致性问题、保证消息不被重复消费、保证消息可靠性传输等
一致性问题 消息丢失或者接受者未收到消息,从而没有执行对应的方法

消息队列对比

效率:Kafka(卡夫卡,大数据使用)> RabbitMQ(有交换机)> ActiveMQ

安全:Kafka(卡夫卡,大数据使用)< RabbitMQ(有交换机)< ActiveMQ

image-20221017144318847

RabbitMQ可视化界面

界面各功能详解:https://juejin.cn/post/7051469607806173221#heading-21

RabbitMQ7种工作模式

image-20221017143922217

常见术语

术语 说明
Broker 简单来说就是消息队列服务器实体
Exchange 消息交换机,它指定消息按什么规则,路由到哪个队列
Queue 消息队列载体,每个消息都会被投入到一个或多个队列
Binding 绑定,它的作用就是把exchange和queue按照路由规则绑定起来
Routing Key 路由关键字,exchange根据这个关键字进行消息投递
VHost vhost可以理解为虚拟broker ,即mini-RabbitMQ server。其内部均含有独立的queue、exchange和binding等,但最最重要的是,其拥有独立的权限系统,可以做到vhost范围的用户控制。当然,从RabbitMQ的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的vhost中)
Producer 消息生产者,就是投递消息的程序
Consumer 消息消费者,就是接受消息的程序
Channel 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

简单模式

在这里插入图片描述

简单模式(simple):生产者往队列添加数据,消费者从队列拿数据,如果业务场景确实这么简单,还可以使用Redis的集合来代替,减少整个系统的复杂度,系统越简单问题越少。隐患是消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动确认(ack),但如果设置成手动确认,处理完后要及时发送确认消息给队列,否则会造成内存溢出,手动触发告诉消息队列消息已经被消费需要清除该消息。这个时候使用的其实是默认的直连交换机(DirectExchange),DirectExchange的路由策略是将消息队列绑定到一个DirectExchange上,当一条消息到达DirectExchange时会被转发到与该条消息routing key相同的Queue上,例如消息队列名为hello-queue,则routingkey为hello-queue的消息会被该消息队列接收。

工作模式

在这里插入图片描述

工作模式(work):一个队列由多个消费者共享,如果消费者处理速度落后于生产者,可以不断扩充消费,提高消息的处理能力。注意:这种模式队列的数据一旦被其中一个消费者拿走,其他消费者就不会再拿到。隐患是高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize)保证一条消息只能被一个消费者使用。

发布订阅模式

在这里插入图片描述

发布订阅模式(publish/subscribe):生产者将消息发送交换机,交换机在将消息发给N个队列,消费者连到响应队列取消息即可,此功能比较适合将某单一系统的简单业务数据消息广播给所有接口。其实发布模式也能结合工作模式一起使用,即多个消费者监听同一个队列,则消息会在消费者之间轮询消费。

路由模式

在这里插入图片描述

路由模式(routing):两个消费者,可以更改生产者的routingKey观察消费者获取数据的变化。从观察结果可以看到,生产者的routingKey和消费者指定的routingKey完全一致,消费者才能拿到消息。其实也能实现发布模式,即多个队列设置相同的routingKey,则可以拿到相同的消息。根据业务功能定义路由字符串(orange、black、green等),从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。

主题模式

在这里插入图片描述

主题模式,路由模式的一种(topic):主题模式从使用上看,就是支持ANT,用*代表一个词,#代表多个词,否则就是精确匹配。如:inform.#号可以匹配inform.email.message;而inform.*号则不能,但是它能匹配inform.email。

RPC模式

image-20221017171243724

首先Client发送一条消息,和普通的消息相比,这条消息多了两个关键内容:一个是correlation_id,这个表示这条消息的唯一id,还有一个内容是reply_to,这个表示消息回复队列的名字。Server从消息发送队列获取消息并处理相应的业务逻辑,处理完成后,将处理结果发送到reply_to指定的回调队列中。Client从回调队列中读取消息,就可以知道消息的执行情况是什么样子了,这种情况其实非常适合处理异步调用。

发布确认模式

1)单个确认模式:每发送一条消息,确认一次,发布同数量消息,其耗时最长。

2)批量确认模式:每发送一部分消息,批量同步确认一次,若有消息无法发出,该模式无法确认是哪个消息无法发送。

3)异步批量确认模式:批量异步确认,该模式性能最好,在有错误情况下很好处理。

RabbitMQ问题合集

保证消息的可靠性

消息不可靠的情况可能是消息丢失,劫持等原因,丢失又分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息。生产者消息没到交换机或交换机没有把消息路由到队列都相当于是生产者弄丢消息。RabbitMQ宕机导致队列、队列中的消息丢失,相当于RabbitMQ列表弄丢消息。消费者消费出现异常,业务没执行,相当于消费者弄丢消息。

生产者丢失消息

1)开启事务机制:transaction机制就是说发送消息前,开启事务,然后发送消息,如果发送过程中出现什么异常,事务就会回滚,如果发送成功则提交事务。然而,这种方式有个缺点:吞吐量下降。

2)发送方确认机制:发布确认模式用的居多,一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;RabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果RabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。这个又称为消息重发机制,RabbitMQ提供了确认和回退机制,有一个异步监听机制,每次发送消息,如果成功或未成功发送到交换机都可以触发一个监听,从交换机路由到队列失败也会有一个监听,只需要开启这两个监听机制即可。实际上一些企业并不会在这两个监听里面去做重发,因为成本太高了,首先RabbitMQ本身丢失的可能性就非常低,其次如果这里需要落库再用定时任务扫描重发还要开发一堆代码,分布式定时任务等。再其次定时任务扫描肯定会增加消息延迟,不是很有必要。真实业务场景是记录一下日志就行了,方便问题回溯,顺便发个邮件给相关人员,如果真的极其罕见的是生产者弄丢消息,那么开发往数据库补数据就行了。

消息队列丢失消息

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,RabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。持久化流程:首先将queue的持久化标识durable设置为true,则代表是一个持久的队列,发送消息的时候将deliveryMode=2,这样设置以后,即使RabbitMQ挂了,重启后也能恢复数据。一般来说在实际业务中持久化是必须开的。

消费者丢失消息

RabbitMQ给我们提供了消费者应答(ack)机制,默认情况下这个机制是自动应答,只要消息推送到消费者就会自动ack ,然后RabbitMQ删除队列中的消息。启用手动应答之后我们在消费端调用API手动ack确认之后,RabbitMQ才会从队列删除这条消息。即执行完业务代码之后手动ack,注意开启之后必须手动确认才能消费这条消息,不然RabbitMQ就会将刚刚的消息重新放回队列中,等待下一次被消费。

重试方案

1)使用Spring自带重试机制,配置后启动项目会自动重试,这个重试是重新执行消费者方法,而不是让RabbitMQ重新推送消息,注意一定要手动throw一个异常,因为SpringBoot触发重试是根据方法中发生未捕捉的异常来决定的。

2)业务重试主要是针对消息没有到达交换器的情况。流程大致如下:首先创建一张表,用来记录发送到中间件上的消息,像下面这样:

image-20221018162612998

每次发送消息的时候,就往数据库中添加一条记录。字段简要说明:

status:表示消息的状态,有三个取值,0,1,2 分别表示消息发送中、消息发送成功以及消息发送失败。
tryTime:表示消息的第一次重试时间(消息发出去之后,在tryTime这个时间点还未显示发送成功,此时就可以开始重试了)。
count:表示消息重试次数。

在消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态status为0,tryTime 为1分钟之后。在confirm回调方法中,如果收到消息发送成功的回调,就将该条消息的status设置为1(在消息发送时为消息设置msgId,在消息发送成功回调时,通过msgId来唯一锁定该条消息)。另外开启一个定时任务,定时任务每隔10s就去数据库中查询一次消息,专门去捞那些status为0并且已经过了tryTime时间记录,把这些消息拎出来后,首先判断其重试次数是否已超过3次,如果超过3次,则修改该条消息的status为2,表示这条消息发送失败,并且不再重试。对于重试次数没有超过3次的记录,则重新去发送消息,并且为其count的值+1。

当然这种思路有两个弊端:

1、去数据库走一遍,可能拖慢MQ的Qos(服务质量),不过有的时候我们并不需要MQ有很高的Qos,所以这个应用时要看具体情况。
2、按照上面的思路,可能会出现同一条消息重复发送的情况,不过这都不是事,我们在消息消费时,解决好幂等性问题就行了。

可靠性总结

消息可靠性本身就是无法保证的,所谓的各种可靠性机制只是为了以后消息丢失提供可查询的日志而已,不过通过这些机制耗费一些巨大成本的确是能够缩小消息丢失的可能性。重试机制一般也无法实际解决问题,一般会采用下面方案进行处理更加友好:

1)当消费失败后将此消息存到Redis,记录消费次数,如果消费了三次还是失败,就丢弃掉消息,记录日志落库保存。
2)直接填false ,不重回队列,记录日志、发送邮件等待开发手动处理。
3)不启用手动ack(异常可重试),使用SpringBoot提供的消息重试。

保证消息的顺序性

RabbitMQ可能存在的顺序错乱的问题

image-20221019110030012

其实队列本身是有顺序的,但是生产环境服务实例一般都是集群,当消费者是多个实例时,队列中的消息会分发到所有实例进行消费(同一个消息只能发给一个消费者实例),这样就不能保证消息顺序的消费,因为你不能确保哪台机器执行消费端业务代码的速度快。所以对于需要保证顺序消费的业务,我们可以拆分多个消息队列,每个消息队列对应同一个消费者,只部署一个消费者实例,然后设置RabbitMQ每次只推送一个消息,再开启手动ack即可。这样RabbitMQ每次只会从队列推送一个消息过来,处理完成之后我们ack回应,再消费下一个,就能确保消息顺序性。方式二可以设置一个queue对应一个consumer,然后这个cunsumer内部用内存队列做排队,然后分发给底层不同的work来处理。

image-20221019110203871

防止消息被重复消费

正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。我们解决消息重复消费有两种角度,第一种就是不让消费端执行两次,第二种是让它重复消费了,但是不会对我的业务数据造成影响就行了。

确保消费端只执行一次(接口幂等性)

一般来说消息重复消费都是在短暂的一瞬间消费多次,我们可以使用Redis将消费过的消息唯一标识存储起来,然后在消费端业务执行之前判断Redis中是否已经存在这个标识。举个例子,订单使用优惠券后,要通知优惠券系统,增加使用流水。这里可以用订单号+优惠券id做唯一标识。业务开始先判断Redis是否已经存在这个标识,如果已经存在代表处理过了。不存在就放进Redis设置过期时间,执行业务。

Boolean flag =  stringRedisTemplate.opsForValue().setIfAbsent("orderNo+couponId");
    //先检查这条消息是不是已经消费过了
    if (!Boolean.TRUE.equals(flag)) {
        return;
    }
    //执行业务...
    //消费过的标识存储到Redis,10秒过期
    stringRedisTemplate.opsForValue().set("orderNo+couponId","1", Duration.ofSeconds(10L));

允许消费端执行多次,保证数据不受影响

数据库唯一键约束:如果消费端业务是新增操作,我们可以利用数据库的唯一键约束,比如优惠券流水表的优惠券编号,如果重复消费将会插入两条相同的优惠券编号记录,数据库会给我们报错,可以保证数据库数据不会插入两条。

数据库乐观锁思想:如果消费端业务是更新操作,可以给业务表加一个version字段,每次更新把version作为条件,更新之后version+1。由于MySQL的innoDB是行锁,当其中一个请求成功更新之后,另一个请求才能进来,由于版本号version已经变成2,必定更新的SQL语句影响行数为0,不会影响数据库数据。

防止消息堆积/积压

所谓消息积压一般是由于消费端消费的速度远小于生产者发消息的速度,导致大量消息在RabbitMQ的队列中无法消费。可以对生产者发消息接口进行适当限流(不太推荐,影响用户体验)、多部署几台消费者实例(推荐)或适当增加prefetch的数量,让消费端一次多接受一些消息(推荐,可以和第二种方案一起用)。其实主要的方式就是临时紧急扩容,将积压的消息消费掉,如果是已过期的消息,则后期查询缺失的部分,等空闲的时候再放入MQ进行数据补充。

RabbitMQ集群高可用

单机模式

单机模式:就是Demo级别的,一般生产都不用单机模式。

普通集群模式

普通集群模式:意思就是在多台机器上启动多个RabbitMQ实例,每个机器启动一个。创建的queue,只会放在一个RabbitMQ实例上,但是每个实例都同步queue的元数据(元数据可以认为是queue的一些配置信息,通过元数据,可以找到queue所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个queue的读写操作。

镜像集群模式

镜像集群模式:这种模式,才是所谓的RabbitMQ的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,就是说,每个RabbitMQ节点都有这个queue的一个完整镜像,包含queue的全部数据的意思。然后每次你写消息到queue的时候,都会自动把消息同步到多个实例的queue上。RabbitMQ有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。这样的好处在于,你任何一个机器宕机了,其它机器节点还包含了这个queue的完整数据,别的consumer都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销太大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重。RabbitMQ一个queue的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个queue的完整数据。

标签:queue,消费,队列,RabbitMQ,发送,消息
From: https://www.cnblogs.com/xdzy/p/16941912.html

相关文章

  • lavinmq & rabbitmq压测对比
    环境准备docker-compose文件version:'3'services:lavinmq:image:cloudamqp/lavinmq:1.0.0-beta.8volumes:-./mq:/var/li......
  • RabbitMQ 3.7.18安装延时插件rabbitmq-delayed-message-exchange后启动报错
    环境:安装请参考:https://blog.csdn.net/qq_37487520/article/details/126079651https://www.cnblogs.com/myifb/articles/16638251.html启用插件时,不用写插件版本号:r......
  • debian 安装 RabbitMQ
    首先安装Erlang环境apt-getinstallerlang检查安装是否成功erl搜索apt下是否有rabbitmq-serverapt-cachesearchrabbitmq-server 安装apt-getinstall......
  • springboot消息之使用RabbitTemplate给rabbitmq发送和接收消息&序列化机制
    1-引入spring-boot-starter-amqp2-application.yml配置3-测试RabbitMQ  1--AmqpAdmin:管理组件  2--RabbitTemplate:消息发送处理组件======================......
  • springboot消息之RabbitMQ简介
    RabbitMQ是一个有erlang开发的AMQP(AavancedMessageQueueProtocol)的开源实现核心概念:Publisher :消息的生产者,也是一个向交换器发布消息的客户端应用程序Mess......
  • rabbitmq的consumer_timeout修改
    问题:项目中使用了rabbitmq来做异步任务,最近突然发现一个耗时比较长的任务一直在重复执行。排查:排查日志后发现了超时,channel关闭。如果超过了consumer_timeout时间默认......
  • rabbitmq
    title:rabbitmqdate:2022-11-2919:02:37tags:使用pom<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp......
  • Rabbitmq #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue ‘
    #method<channel.close>(reply-code=404,reply-text=NOT_FOUND-noqueue‘myQueueDirect‘invhost‘/‘是因为消息发布者queueDeclare方法中传递的exclusive参数传......
  • RabbitMQ帮助类
    一、RabbitMQHelper///<summary>///RabbitMQHelper的摘要说明///</summary>publicclassRabbitMQHelper{//主机privatere......
  • RabbitMQ基础
    1.初识MQ1.1.同步和异步通讯微服务间通讯有同步和异步两种方式:同步通讯:就像打电话,需要实时响应。异步通讯:就像发邮件,不需要马上回复。是你却不能跟多个人同时通话。......