消息队列的基本作用
- 解耦
- 异步
- 削峰
引入消息队列会导致什么
- 1.降低系统的可用性:系统引入的外部依赖越多,越容易挂掉
- 2.系统的复杂度变高:使用 MQ 后可能需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题
- 3.一致性问题:A 系统处理完了直接返回成功了,但问题是:要是 B、C、D 三个系统那里,B 和 D 两个系统写库成功了,结果 C 系统写库失败了,就造成数据不一致了。
如何保证消息队列的高可用
RabbitMQ:镜像集群模式
rabbit是基于主从做高可用的,有三种模式
- 单机模式:生产环境很少用
- 普通集群模式:让集群中多节点服务某个queue的读写操作,只是提高了系统的吞吐量
- 镜像集群模式:
创建的Queue,无论元数据还是queue里的消息都会存在多个实例中,每次写消息到Queue时,都会自动和多个实例的Queue进行消息同步,这样设计的好处:
任何一个机器宕机不影响其他机器的使用。坏处在于:1. 性能开销太大:消息同步所有机器,导致网络带宽压力和消耗很重;2. 扩展性差:如果某个 Queue 负载很重,即便加机器,新增的机器也包含了这个 Queue 的所有数据,并没有办法线性扩展你的 Queue。
kafka:partition和 replica(副本)机制
Kafka 基本架构是多个 broker 组成,每个 broker 是一个节点。创建一个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据,这就是天然的分布式消息队列。就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据。
Kafka 0.8 以前,是没有 HA 机制的,任何一个 broker 宕机了,它的 partition 就没法写也没法读了,没有什么高可用性可言。
Kafka 0.8 以后,提供了 HA 机制,就是 replica 副本机制。每个 partition 的数据都会同步到其他机器上,形成自己的多个 replica 副本。然后所有 replica 会选举一个 leader 出来,生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上数据即可。Kafka 会均匀的将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。
如何保证消息不被重复消费/幂等性
- 写数据时,先根据主键查询数据是否存在,如果已经存在,则更新数据
- 数据库的唯一键约束
- 写redis,分布式锁
如何保证消息不丢失
rabbitMQ:
- 生产者
- 1.开启rabbitmq事务(不推荐)太耗性能
- 2.confirm模式:异步回调 ack消息
- mq本身丢失:开启rabbitmq持久化(开启2个才行)
- 1.创建queue时将其设置为持久化,只会持久化queue的元数据
- 2.将消息的deliverymode设置为2:持久化到磁盘
- 消费端丢失:关闭rabbitmq的自动ack机制
kafka
- 生产者:设置acks =all,一定不会丢,无限重试
- kafka本身丢失
- 1.给topic设置replication.factor参数,大于1,保证每个partition必须有2个副本
- 2.在kafka服务端设置 min.insync.replicas,必须大于1,保证leader至少有一个followe保持联系
- 3.生产端设置acks=all,保证数据必须写入所有replica,才算成功
- 4.生产端设置retries=max,设置无限重试
- 消费者丢失:关闭自动提交offset
如何保证消息的顺序性
RabbitMQ:
拆分多个 Queue,每个 Queue一个 Consumer,就是多一些 Queue 而已,确实是麻烦点;或者就一个 Queue 但是对应一个 Consumer,然后这个 Consumer 内部用内存队列做排队,然后分发给底层不同的 Worker 来处理。
Kafka:
-
一个 Topic,一个 Partition,一个 Consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
-
写 N 个内存 Queue,具有相同 key 的数据都到同一个内存 Queue;然后对于 N 个线程,每个线程分别消费一个内存 Queue 即可,这样就能保证顺序性。
大量消息在mq里长时间积压,如何解决
一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:
-
- 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉;
-
- 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量;
-
- 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue;
-
- 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据;
-
- 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
MQ中的消息过期失效怎么办
假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 Queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。这时的问题就不是数据会大量积压在 MQ 里,而是大量的数据会直接搞丢。这个情况下,就不是说要增加 Consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。
我们可以采取一个方案,就是批量重导。就是大量积压的时候,直接丢弃数据了,然后等过了高峰期以后开始写程序,将丢失的那批数据一点一点的查出来,然后重新灌入 MQ 里面去,把丢的数据给补回来。
RabbitMQ的重要组件
-
- ConnectionFactory(连接管理器):应用程序与 rabbit 之间建立连接的管理器,程序代码中使用;
-
- Channel(信道):消息推送使用的通道;
-
- Exchange(交换器):用于接受、分配消息;
-
- Queue(队列):用于存储生产者的消息;
-
- RoutingKey(路由键):用于把生成者的数据分配到交换器上;
-
- BindingKey(绑定键):用于把交换器的消息绑定到队列上。
RabbitMQ有几种广播类型
-
- fanout:所有 bind 到此 exchange 的 queue 都可以接收消息;很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 交换机转发消息是最快的。
-
- direct:通过 routingKey 和 exchange 中的 bindingKey 决定的那个唯一的 queue 可以接收消息;
-
- topic:所有符合 routingKey 所 bind 的 queue 可以接收消息。
kafka
kafka不能脱离zookeeper单独使用,因为kafka使用zookeeper管理和协调kafka的节点服务器
kafka有两种数据保存策略:
- 按照过期时间保留
- 按照存储的消息大小保留
kafka的分区策略
所谓分区策略就是决定生产者将消息发送到那个分区的算法
- 轮询策略:(默认)能保证消息最大限度的平均分配到所有分区上
- 随机策略:实现随机策略版的partition方法
- 按消息键保序策略:(key-ordering)可以保证同一个key的所有消息都进入到相同的分区,由于分区下的消息处理是有顺序的,所以称为消息键保序策略
- 自定义分区策略: