基础知识
特点:
(1)服务解耦:服务之间通过消息中介(如 RabbitMQ、Kafka)进行通信,减少直接依赖。
(2)异步处理:提高当前请求速度,代价是系统调用的降速,可能带来业务不一致。
(3)削峰(流量控制):消息产生速度 > 消息消费速度(先把请求存起来)
(4)增强系统可靠性:
I. MQ 给 consumer ACK
II.MQ broker 持久化
III. 消费者给MQ ACK
应用场景:
异步处理 --> 比如一个电商系统,需要进行库存处理,后期需求加上短信通知服务,这一整套流程下来时间较长,所以将所有短信服务扔到消息队列中直接返回响应(不用及时处理的服务)。 减少请求的等待时间,还能让服务一步并发处理,提升系统总体的性能。
服务解耦 ---> 拓展订单业务(不影响自身业务)
流量控制 ---> 对象:后端业务较重,处理时间较长,不需要及时处理的服务。 使用消息队列进行缓冲,网关请求放到消息队列中,超时的请求可以直接返回错误。
-
消息队列两种模型:队列模型 和 发布/订阅模型
队列模型 ---> 每条消息只能被一个消费者消费
<!--队列模型也可以通过消息全量存储至多个队列来解决一条消息被多个消费者消费问题,但是会有数据的冗余。-->
(1)订阅模型 ---> 解决一条消息能被多个消费者消费的问题
模型将消息发送至一个Topic(主题)中,所有订阅Topic的订阅者都能消费这个消息。
(2)发布/订阅模型
概念:生产者(发送消息) ---> Producer , 消费者(消费消息) ---> Consumer,消息队列服务端 ---> Broker。
总结:
抽象理解 --->
(1)发布/订阅模型等于我们都加入了一个群聊中,我发一条消息,加入了这个群聊的人都能收到这条消息。
(2)队列模型就是一对一聊天,我发给你的消息,只能在你的聊天窗口弹出,是不可能弹出到别人的聊天窗口中的。
如果我一对一聊天对每个人都发同样的消息,也实现了一条消息被多个人消费。通过多队列全量存储相同的消息,即数据的冗余,可以实现一条消息被多个消费者消费。RabbitMQ 就是采用队列模型,通过 Exchange 模块来将消息发送至多个队列,解决一条消息需要被多个消费者消费问题。
注:RabbitMQ 采用队列模型, RocketMQ 和 Kafka 采用发布/订阅模型。
面试题
1. 如何保证消息不丢失?
消息发送一共有三个阶段,分别是生产消息、存储消息和消费消息。
1.生产消息: 生产者发送消息至 Broker ,处理 Broker 的响应,不论是同步还是异步,回调都需要做好 try-catch,如果 Broker 返回写入失败等错误消息,需要重试发送。当多次发送失败需要作报警,日志记录等。这样就能保证在生产消息阶段消息不会丢失。
2. 存储消息:
(1)消息刷盘(将消息持久化到缓存中)之后再给生产者相应,如果消息写入缓存就返回相应。如果机器突然断电,消息就会丢失,而生产者以为已经发送成功,采用幂等机制解决(无论该操作被执行多少次,其结果都是相同的,不会产生副作用 ---> 如何处理重复消息)。如果Broker是集群部署,有多副本机制,即消息不仅要写入当前Broker,还需要写入副本中,那配置成至少写入两台机子后再给生产者响应。
3.消费消息: 消费者业务逻辑处理完成之后,再给Broker相应。
总结:
(1)Producer 需要处理好Broker的相应,出错情况下利用重试、报警手段。
(2)Broker需要控制相应时间,单机情况是消息刷盘后返回相应,集群多副本情况下,至少发送两个副本及以上的情况下再返回响应。
(3)Consumer需要执行完真正的业务逻辑之后再返回相应给Broker。 具体业务细节安排,有时候消息可靠性增强,性能就下降了,比如日志的传输丢失那么一两条关系不大,因此没必要等消息刷盘后再响应。
2. 如何处理重复消息?
业务场景:
(1)Producer已经发送给Broker消息,由于网络波动,Producer没有收到Broker的响应,然后消费者又重新发送一次,就导致消息重复了。
(2)Consumer拿到消息消费了,业务逻辑已经走完,事物提交,此时需更新Consumer offset,然而由于这个消费者服务挂了,另一个消费者顶上,此时Consumer offset还没更新,于是又拿到刚才的那条消息,业务又被执行一遍,于是消息又重复了。
解决:
正常的业务而言,消息重复是不可避免的,因此选择幂等机制来处理重复消息所带来的影响,幂等可以理解为,同样的参数多次调用同一个接口和调用一次产生的结果是一致的。
update t1 set money = 150 where id = 1 and money = 100;
与上面的sql一样,做一个前置判断,也就是money = 100情况直接修改,在实际项目中,我通过做了个项目version即版本号控制,对比消息中的版本号和数据库的版本号,进行判断,有时候通过数据库的约束,比如唯一键 或者 记录关键的key,比如处理订单,记录订单ID,如果有重复消息,先判断一下这个ID是否被处理过了,如果没处理再进行下一步。
3. 如何保证消息的有序性?
全局有序 ---> 消费者是单线程消费只有一个队列,一般情况下不使用,即使是同步Mysql Binlog也只需要保证单表的消息有序即可。
部分有序 ---> 将Topic 内部划分为我们所需要的队列数,把消息通过特定的策略发送到固定的队列中,然后多消费者单线程消费制定队列。
4. 如何处理消息堆积?
业务场景:
生产者生产速度 与 消费者消费速度不匹配、消息消费失败返回重试。
解决过程:
定位消费慢的原因,如果是 bug 则处理 bug ,如果是本身消费能力较弱,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。假如逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加 Topic 的队列数和消费者数量,
实战所遇问题:
一定要增加队列数,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者。
5. 如何写一个消息中间件?
标签:01,消费者,队列,Broker,---,面试,消费,消息 From: https://blog.csdn.net/m0_74119287/article/details/1426386091. 消息中间件主要包括:生产者、消费者、Broker、注册中心。
2. 主要思路就是:
(1)生产者生产消息,发送至 Broker,Broker暂缓消息,消费者再从 Broker 获取消息,用于消费。
(2)注册中心用于服务的发现包括:Broker 的发现、生产者的发现、消费者的发现,当然还包括下线。
(3)各模块的通信可以基于 Netty自定义协议来实现,注册中心可以利用Zookeeper、Eureka、Nacos 等。
(4)考虑扩容和整体的性能,采用分布式的思想,像 Kafka 一样采取分区理念,一个 Topic 分为多个Partition,
(5)为保证数据可靠性,采取多副本存储,即 Leader 和 Follower,根据性能和数据可靠的权衡,提供异步和同步的刷盘存储。(6)利用选举算法( Bully 算法、Raft 算法、ZAB 算法)保证 Leader 挂了之后 Follower 可以顶上,保证消息队列的高可用。
(7)为提高消息队列可靠性,利用本地文件系统来存储消息,并采用顺序写入方式来提高性能。( 可根据消息队列的特性利用内存映射、零拷贝进一步的提升性能,还可利用像 Kafka 这种批处理思想提高整体的吞吐。)