总结:
-
为什么使用消息队列?
异步、解耦、削峰。 -
消息队列有什么缺点?
可用性降低、系统复杂度提高、一致性问题。 -
如何保证消息队列的可用性?
镜像集群模式(RabbitMQ),主从复制(Kafka)。 -
如何避免消息重复消费?
幂等性。 -
如何避免消息丢失,保证消息的可靠传输?
生产者的confirm
模式,消息队列服务器的持久化功能,消费者的手动ack
。 -
如何保证消息的顺序处理?
一个queue
对应一个consumer
。
1 为什么使用消息队列?
异步、解耦、削峰。
当别人问你为什么要使用消息队列时,其实就是问你消息队列有什么功能。
日常项目中,使用消息队列主要是为了使用它的以下核心功能:
- 异步:优化不必实时处理的耗时操作,提高系统处理性能。
- 解耦
- 削峰
当日常项目中遇到了这三个需求,都可以使用消息队列。我们使用消息队列,也主要是为了解决以上三个业务问题。
所以:消息队列 <==> 异步/解耦/削峰。
1.1 异步
消息队列异步处理:耗时操作&不要求实时处理
当某个业务场景需要经过多个步骤处理时,如果其中某个步骤的处理过程十分耗时,并且不要求实时完成,我们通常会考虑异步处理。
举个简单例子,某个业务需要经过3个步骤,step1()
和step2()
都耗时300ms,step3()
耗时3ms,并且step3()
依赖step1()
的返回值。此时,我们可以考虑将step1()
和step2()
进行异步处理。:
public void service() {
Object res1 = step1(); // 300ms
step2(); // 300ms
step3(res1); // 3ms
}
由于step1()
会影响其他步骤的执行顺序,它和step3()
有先后顺序,因此不能简单使用消息队列异步处理。
而step2()
与其他步骤没有强烈顺序关系,如果该步骤不要求实时执行,则可以使用消息队列进行异步处理:
public void service() {
Object res1 = step1(); // 300ms
sendMessageToStep2(); // 3ms
step3(res1); // 3ms
}
此时,该业务处理耗时由最初的603ms
优化到了306ms
,大大提高了系统的性能。
1.2 解耦
消息队列充当中间层,解决多个系统间的强耦合问题。
假设存在多个系统,其中系统A在业务处理过程中需要通知其他系统,可以使用强耦合的方式:
public void notifyOtherSystem() {
notifySystemB();
notifySystemC();
notifySystemD();
}
对于其他系统来说,它们需要各自定义接收通知的方法。对于系统A来说,它需要针对每一个接收通知的系统定义一个通知方法。在这个过程中,还会涉及到系统间的对接。有过相关经历的人都会知道,这个过程十分耗时耗力。
如果系统B不需要接收通知了,或者新增系统E需要接收通知。那么对于这些系统的维护人员来说,都是十分崩溃的过程。
对于这种业务场景,我们可以使用消息队列的发布订阅功能进行优化。
系统A可以定义统一的通知消息,发送到消息队列中:
public void sendToMq() {
// 发送统一格式的消息
}
其他系统如果需要接收通知,它可以主动监听。如果不需要接收通知,也可以主动断开监听。例如,系统E需要监听消息,它可以在自己系统中新增监听方法:
public void listener(Object msg) {
// 系统E针对通知的特殊处理
}
此时,消息队列作为中间层,将系统A和其他系统进行了解耦。每个其他系统都可以在单个系统的最小维度进行监听或取消监听消息,大大降低了繁琐的重复开发步骤。
1.3 削峰
大量请求分时延后处理。
消息队列削峰功能其实是对其异步功能的一种应用,它也需要满足异步处理的基础要求:不需要实时处理。
考虑在系统访问高峰期,同一时间内触发大量请求,如果系统处理线程资源耗尽,则会导致系统奔溃。
此时,如果使用消息队列将异步请求延后处理,则可以将该高访问时间段的业务处理,平摊到后续的请求低峰期,大大提高了系统的稳定性。
2 消息队列有什么缺点?
可用性降低、系统复杂度提高、一致性问题
消息队列主要有以下缺点,需要我们采取不同的策略进行处理:
- 可用性降低。
- 系统复杂度提高。
- 一致性问题。
2.1 可用性降低
由于引入的消息队列作为第三方系统,我们需要考虑到消息队列宕机的可能性。如果消息队列宕机了,就会造成整体系统的可用性降低。
实际上,每种消息队列产品都会提供高可用的功能。
RabbitMQ可以使用镜像集群模式来保证高可用性,通过集群方式对多个消息队列服务器进行部署,实例之间会对消息进行同步,因此每个实例都会保存完整消息。如果其中某些实例宕机了,还有其他实例保存消息数据。
Kafka的架构本身就会使用多个服务器进行部署,消息会分布到不同实例(数据分区)之中,每个实例(数据分区)存放一部分数据。Kafka会使用主从复制的模式保证高可用性,即每个数据分区可以有一个leader
和多个follwer
,leader
负责跟生产者和消费者打交道,并且会将操作结果同步到follwer
中。如果某个leader
宕机了,那么该数据分区可以通过某种策略选取一个follwer
作为新的leader
。因为leader
和follwer
之间保持着数据同步,因此提高了消息队列的可用性。
2.2 系统复杂度提高
由于引入了消息队列,系统中会增加对消息处理的业务,所以需要考虑到消息处理可能引发的问题:
- 消息重复消费。
- 消息丢失。
- 消息的顺序性。
2.2.1 如何避免消息重复消费?
在解决消息重复消费问题前,我们要先考虑:为什么会有消息重复消费?
主要有两种情况造成重复消费:
- 消息重复发送
- 消息消费后未确认
出现消息重复发送的情况,一方面可能是网络问题,例如RabbitMQ接收到消息后,由于网络问题未能及时发送确认,生产者重复发送消息。另一方面可能是业务问题,例如使用定时任务轮询,某个任务处理未完成,新一轮的轮询造成消息重复发送。
如果消息消费后没有发送确认消息,那么该消息会被重新入队,发送给下一个消费者。如果始终无法处理,则会造成消息堆积。
为了避免消息重复消费,需要根据业务的实际情况进行考虑。但是最根本的是要保证消息处理的幂等性。
所谓幂等性,简单来说就是保证多次处理的结果都是一致的。
例如,我们需要接收消息来发送短信,那么需要判断该短信是否已经发送过,保证重复消息只发送一次。
2.2.2 如何避免消息丢失(可靠传输)?
消息队列在使用过程中,消息会经过以下流程:生产者 → 消息队列服务器 → 消费者。
消息在传递过程中可能会在以下步骤中丢失:
- 生产者发送消息丢失。
- 消息队列服务器丢失消息。
- 消费者接收消息丢失。
生产者发送消息时,由于网络问题,可能没能将消息送达消息队列服务器,造成消息丢失。RabbitMQ主要通过confirm
机制避免生产者发送消息过程的丢失。生产者发送消息后,消息队列服务器需要响应confirm
,如果生产着没有接收到ack
,则会进行重复发送。
消息在发送给消费者之前,会先在消息队列服务器中存储。一般来说,消息会存储在内存中,但是如果消息队列服务器宕机了,就会造成消息丢失。RabbitMQ通过持久化功能来避免消息丢失:在创建queue
时将其设置为持久化,会将队列信息持久化到磁盘;发送消息时设置消息为持久化,会将消息持久化到磁盘。如果消息队列宕机了,重启后会从磁盘中恢复数据。持久化可以和confirm
机制相结合,只有消息被持久化到磁盘后,才发送confirm
响应,避免在持久化前宕机造成消息丢失的特殊情况。
消费者接收消息后,如果在处理过程中发生了异常,没能成功处理,就会造成业务上的消息丢失。RabbitMQ通过ack
机制避免消费者的消息丢失。消息队列服务器发送消息给消费者,消费者需要响应ack
,如果消息队列服务器没有接收到ack
,会将该消息重新入队,重新发送给消费者。需要注意的时,消费者通常默认会自动ack
,我们需要关闭该功能,在处理过程中进行手动ack
。
2.2.3 如何保证消息的顺序性?
一个
queue
对应一个consumer
。
所谓消息的顺序性,指的是消息按顺序由发送者发送到消息队列服务器后,业务逻辑中会按照原有顺序进行处理。
由于消息在消息队列服务器中是按照顺序存储的,为了保证消息的顺序处理,我们就需要保证消费者按顺序获取消息,这就需要保证一个queue
对应一个consumer
。
2.3 一致性问题
消息队列的一致性问题主要包括:
- 消息发送的一致性问题
- 消息处理的一致性问题
2.3.1 消息发送的一致性问题
消息发送一致性是指产生消息的业务和发送消息的步骤要保持一致。
如果业务操作成功了,那么消息就一定要发送;如果业务操作失败了,那么消息就一定不能发送。
《大型网站系统与Java中间件实践》中提供了一种解决方案:
- 业务处理应用首先将消息发给消息中间件,标记消息的状态为待处理。
- 消息中间件收到消息后,把消息存储在消息存储中,并不投递该消息。
- 消息中间件返回消息处理的结果(仅是入库的结果),结果是成功或者失败。
- 业务方收到消息中间件返回的结果并进行处理:
- 如果收到的结果是失败,那么就放弃业务处理,结束。
- 如果收到的结果是成功,则进行业务自身的操作。
- 业务操作完成,把业务操作的结果发送给消息中间件。
- 消息中间件收到业务操作结果,根据结果进行处理:
- 如果业务失败,则删除消息存储中的消息,结束。
- 如果业务失败,则更新消息存储中的消息状态为可发送,并且进行调度,进行消息的投递。
可以封装一个方法来统一处理:
public Result sendMessage(Message msg, Callback callback) {
// 发送消息给消息中间件
// 获取返回结果
// 如果失败,返回失败
// 进行业务操作
// 获取业务操作结果
// 发送业务操作结果给消息中间件
// 返回处理结果
}
在这个过程中,除了最后一步发送业务操作结果给消息中间件
发生异常时会造成业务处理成功但消息未发送的不一致问题,前面几个步骤都没有问题。
为了解决这个问题,需要消息中间件主动请求业务系统,获取待处理消息的业务操作结果,可能是成功/失败/处理中。
2.3.2 消息处理的一致性问题
如果系统A业务处理成功后发送消息,多个系统监听该消息进行业务处理,其中某些系统操作成功,但是某些系统操作失败,这就会造成消息处理的不一致。
这就需要开发人员对业务处理流程进行详细的设计了。
3 如何解决消息堆积问题?
消息堆积一般都是consumer
的问题,所以根本上要查找消费者业务处理的bug
。
但是为了快速处理堆积的消息,需要进行特殊处理:
- 修复
consumer
的bug
。 - 重启所有
consumer
。 - 临时部署多个
consumer
,快速处理堆积的消息。 - 处理完成后,停止临时部署的
consumer
,恢复成原先的架构。