首页 > 其他分享 >聊聊消息队列(MQ)那些事

聊聊消息队列(MQ)那些事

时间:2022-11-13 18:12:05浏览次数:55  
标签:队列 中间件 系统 MQ 消息 聊聊 id 丢失

image

每年的双十一期间,各大电商平台流量暴增,同时,电商平台系统的负载压力也会很大。譬如订单支付的场景,每个订单支付成功后,服务器可能要完成扣减积分、扣减优惠券、扣减商品库存、发短信等一系列操作。单个用户请求,服务器处理起来并没有什么问题,但是,瞬时并发的多个请求到了服务器,数据库压力上来了,导致请求响应慢,甚至宕机。

为了解决这个问题,我们可能会想到,让数据库处理完一个请求后再处理下一个请求不就好了么。就这样,消息队列出来了。消息队列,又称为MQ(Message Queue),它实现了让多个请求以消息的形式排好队,让消息处理程序一个一个的处理,有效防止了高并发给服务器带来的压力。

消息队列应用场景

MQ典型的应用场景有异步、削峰、解耦三种。

异步
譬如说一个系统A,它有一个操作处理完自己的逻辑以后需要调用其他系统的接口,如下图:
image
这时候,代码是这样:

public class SystemA {

	@Resource
	SystemBapi systemBapi;

	@Resource
	SystemCapi systemCapi;

	@Resource
	SystemDapi systemDapi;

	public void doSomething() {

		//产生一个id
		long id = doSomethingAction();

		//调用其他系统接口  
		systemBapi.doSomething(id);
		systemCapi.doSomething(id);
		systemDapi.doSomething(id);
	}
}

上面的代码,系统A产生id的逻辑需要50ms,调用系统B的接口需要300ms,调用系统C的接口需要300ms,调用系统D的接口需要300ms。一个这样的操作就需要50+300+300+300=950ms。如果后面还要对接其他系统,这个操作的时间会更长。

如果调用其他系统接口实时性要求不高,那么,为了提高用户体验和吞吐量,调用其他系统接口的操作就可以交给MQ实现异步操作。如下图:
image
系统A执行完了以后,将id给到消息队列中,然后就直接返回了。

削峰
譬如有3台服务器组成集群,每台服务器的处理能力是1000个QPS/S,合起来就是3000个QPS/S。遇上了流量高峰,达到了5000个QPS/S,请求数量已经超过所有服务器总的处理能力,这时候就可以考虑利用MQ来控制并发数,以免服务器崩溃。

具体做法是所有请求先进入到MQ,然后每个服务器根据自己的能够处理的请求数去消费消息,也就是无论每秒多个QPS,系统只处理能力范围内的请求数,剩下的请求等有资源再去处理,这就是“削峰填谷”,如下图:
image

解耦
解耦就是降低了消息生产者与消费者的耦合度。耦合度高,程序维护起来就会很麻烦。譬如,系统A产生了一个id后,需要把id交给系统B、系统C、系统D去处理。如果由系统A直接去调用其他系统接口,系统A的程序代码需要写上调用系统B、系统C、系统D接口的代码。如果某一天系统C说不需要处理系统A的id了,让系统A不要调用系统C的接口,那系统A要改代码。又某一天系统E说我要处理系统A的id,让系统A调用系统E的接口,系统A又得改代码。系统A的程序员这样子搞烦不烦?

系统A程序员有一天开窍了,把程序里所有调用外部系统的代码都屏蔽,弄了个MQ中间件,让系统A产生id以后就给到MQ。然后发个公告告诉所有其他系统的程序员,你们谁想要我这边产生的id你们自己去MQ拿,别来烦我。image
这样一来,系统A跟其他系统就解耦了,代码也不用改来改去。

消息队列要注意的问题

问题一:可用性
MQ作为整个整个分布式架构的重要部件,如果MQ服务不可用,那整个系统都挂了。因此,MQ必须要支持集群。当下主流的MQ中间件都能够不同程度的支持集群,实现了MQ服务的高可用。

问题二:消息丢失
消息丢失有可能发生在生产者丢失消息、MQ本身丢失消息、消费者丢失消息3个方面。

  • 生产者丢失消息
    生产者丢失消息一般是在发送消息的时候出现异常(譬如网络异常),导致MQ无法接收到消息。这个问题可以采用本地消息表+回调通知+定时任务的方式解决。

    就以系统A发送消息,系统B消费消息为例,具体解决方案如下:
    1、系统A执行本地事务业务逻辑,并且往本地消息表插入一条数据(代表准备要发送的消息),消息状态为“未发送”。本地事务成功,提交保存本地数据,失败则回滚。
    2、本地事务成功后,发送消息给MQ。
    3、MQ接收到消息后,回调通知系统A,系统A把本地消息表对应的消息记录状态变为“已发送”。
    4、定时任务轮询本地消息表,超过一定时间状态为“未发送”的消息重新发送给MQ。
    5、定时任务处理超过一定次数一直发送不成功的消息告警,人工介入。

  • MQ丢失消息
    消息成功发送到MQ,是先放到内存里的,如果还没来得及给消费者消费消息,MQ服务就挂了,就会丢失消息。这个问题一方面可以做集群,但集群的数据同步也需要一定时间,如果在同步数据之前就MQ服务挂了,消息也会丢失。还有一个方法就是MQ接收到消息的同时,把消息数据持久化到磁盘,这样,MQ服务恢复的时候就可以从磁盘获取数据重新给消费者消费。可能有人会问,那消息还没来得及持久化到磁盘MQ服务就挂了咋办?如果是这样,就可以用到前面说到的本地消息表,把本地消息表里的数据重新发一遍。

  • 消费者丢失消息
    消费者从MQ拉取消息,还没来得及处理消息,消费者服务器挂了。此时,可能造成消费者丢失消息。这种情况,可以让消费者处理完消息时给MQ一个确认消息来解决。如果MQ没有收到确认消息,就会有重试的机制,最终确保消息给到消费者消费。当然了,如果重试超过一定次数,就应该告警,人工介入。

问题三:重复消费
因为在网络延迟的情况下,消息重复发送的问题不可避免的发生。譬如,生产者发送消息的时候使用了重试机制,发送消息后由于网络原因没有收到MQ的确认信息,然后又去重新发送了一次消息。但其实MQ已经接到了消息,并返回了响应,只是因为网络原因导致生产者没有收到MQ的确认信息。这种情况下,生产者的消息重试机制就会继续就这个消息重新发送,从而导致同一条消息多次发送,这样消费者也会重复消费这条消息。当然,这只是列举了一种情况,实际上还有其他情况会导致消息被重复消费。

解决重复消费的关键就是在消费者端引入幂等性机制。什么是幂等性机制呢?我们可以把它理解成,假如一个接口被重复调用,依然可以保证数据的准确性。举个例子,比如每条消息都会有一条唯一的id,消费者处理完这个消息会存储这个id,如果处理消息之前能找到这个id,就说明这条消息已经处理过了,就不做处理并且返回给MQ一个确认信息。

消息队列中间件

为什么要用消息队列中间件?自己写不行吗?我们之所以要用中间件,是因为这些中间件已经解决了很多消息队列常见的问题(高可用、消息丢失、重复消费......),而且各种中间件都有各自的特性,已经做得非常成熟了,你确定你写的有这些中间件好用吗?

目前在市面上比较主流的MQ中间件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ 等这几种。网上找来这几个中间件的对比,如下表:

特性 ActiveMQ RabbitMQ Kafka RocketMQ
所属社区/公司 Apache Mozilla Public License Apache Apache/Ali
单机呑吐量 万级(最差) 万级 十万级 十万级(最高)
时效性 毫秒级 微秒级 毫秒级 毫秒级
可用性 高(主从) 高(主从) 非常高(分布式) 非常高(分布式)
功能特性 MQ领域功能极其完备 基础erlang开发,所以并发能力很强,性能极其好,延时很低 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用 MQ功能比较完备,扩展性佳
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 Kafka
事务 支持 不支持 支持 支持
broker端消息过滤 支持 不支持 不支持 可以支持Tag标签过滤和SQL表达式过滤
消息查询 支持 根据消息id查询 不支持 支持Message id或Key查询
消息回溯 支持 不支持 理论上可以支持时间或offset回溯,但是得修改代码。 支持按时间来回溯消息,精度毫秒,例如从一天之前的某时某分某秒开始重新消费消息。
路由逻辑 基于交换机,可配置复杂路由逻辑 根据topic 根据topic,可以配置过滤消费
持久化 内存、文件、数据库 队列基于内存,只能少量堆积 磁盘,大量堆积 磁盘,大量堆积
顺序消息 支持 不支持 支持 支持
社区活跃度
适用场景 主要场景就是解耦和异步调用,较少在大规模吞吐的场景中使用 数据量没有那么大,小公司 一般配合大数据类的系统来进行实时数据计算、日志采集等场景。 目前在阿里被广泛应用在订单、交易、充值、流计算、消息推送、日志流式处理、binglog分发消息等场景。

根据上表,我个人认为对性能要求比较高的,推荐选择RocketMQ,毕竟经历了多年阿里双十一极端并发的场景。如果是大数据领域的,可以选择Kafka。

标签:队列,中间件,系统,MQ,消息,聊聊,id,丢失
From: https://www.cnblogs.com/ayic/p/16886475.html

相关文章

  • rocketmq环境搭建-windows
    最近使用到了rockmetmq,官网上有快速开始的指导,简单记录下自己搭建的过程;1、先到官网下载mq:https://rocketmq.apache.org/download/,我这里下载的是:rocketmq-all-4.9.2-bin-......
  • (作者推荐)【RocketMQ入门到精通】— RocketMQ中级特性能力 | ​长轮询Pull和Push模式
    名言警句任何先进的技术均与魔法无异追本溯源【​​经历了6个月的失踪,我将带着干货终究归来!【RocketMQ入门到精通】​​】RocketMQ消费机制回顾   在众多MQ的体系中,一......
  • 编写C程序,实现链队列的下列功能: 1、设计一个虚拟界面,让用户选择操作(根据提示输入数据
    编写C程序,实现链队列的下列功能: 1、设计一个虚拟界面,让用户选择操作(根据提示输入数据)2、采用模块化编程思想,编写main函数和若干子函数(实现功能)3、队列的基本功能有:......
  • RabbitMq的学习
    RabbitMQ快速入门​MQ的应用场景MQ(消息队列)常见的应用场景解析-云+社区-腾讯云(tencent.com)RPC进程间通信(IPC,Inter-ProcessCommunication),指至少两个进程或......
  • 数据结构——链式队列
    定义特点:先进先出(FIFO)队尾:入队操作队头:出队操作.h文件typedefintdatatype;typedefstructnode{datatypedata;structnode*next;}listnode,*linklist;typed......
  • LeetCode 406.根据身高重建队列
    首先根据身高从小到大排序如果身高相等那么根据第二个值降序排序classSolution{public:vector<vector<int>>reconstructQueue(vector<vector<int>>&people){......
  • 【工具】RabbitMQ的安装
    安装安装erlang环境官网提示:https://www.erlang-solutions.com/resources/download.html安装erlang:yum-yinstallesl-erlang_23.0.2-1_centos_7_amd64.rpm检测erla......
  • MQTT Qos
    MQTTQos等级MQTT设计了一套保证消息稳定传输的机制,包括消息应答、存储和重传。在这套机制下,提供了三种不同层次QoS(QualityofService):Qos0:最多发送一次发送端发......
  • Yii2-Queue实现轻量级消息队列
    概述Yii2-Queue是Yii2官方制作的一个消息队列,提供多个缺点:Syncronous,File,DB,Redis,RabbitMQ,AMQPInterop,Beanstalk,Gearman等,使用Yii2开发的时候使用该扩展......
  • RocketMQ 在物流行业的应用与运维
    本文作者:丁威-中通快递资深架构师,《RocketMQ技术内幕》作者,ApacheRocketMQ社区首席布道师,公众号「中间件兴趣圈」维护者。01物流行业的业务特点物流行业有三大业务特点:业......