常用交换机
DirectExchange直连交换机消费方式为一对一,即每个消息只会被消费一次,当有多个消费者时,消费方式为轮询。
TopicExchange主题交换机,可以绑定一个路由,路由可以是固定的也可以是通配符,当发送消息的路由同时满足时,都可以收到消息,多个消费者时,消费方式为轮询。
FanoutExchange扇形交换机,一个交换机绑定多个队列,多个队列都会消费消息,多个消费者时,消费方式为轮询。
消息确认机制
服务端推送消息的四种情况:
1. 消息推送到server,但是没有找到交换机,触发ConfirmCallback回调。
2. 消息推送到server,找到了交换机,但是没有找到队列,触发ConfirmCallback和RetrunCallback两个回调函数。
3. 消息推送到server,交换机和队列都找不到,触发ConfirmCallback回调。
4. 消息推送成功,触发ConfirmCallback回调。
消费者的消息确认机制:
1. 自动确认,AcknowledgeMode.NONE,消息成功发出后就认为是正确处理,不管消费者是否成功处理,所以当消费者出现异常时,就相当于丢失了消息。
2. 手动确认,消费者收到消息后,手动调用basic.ack/nackreject后,rabbitmq收到消息后,才认为是正确处理
消息队列的应用场景
异步处理:发送短信、发送消息等与主业务无关联的功能
应用解耦:可以减小应用之间的耦合
流量削锋:如果平台某一时间流量很大时,可能会导致平台挂掉。可以放入消息队列中,并控制访问个数,缓解平台压力。
日志处理:比如ELK,kafka用来接收用户日志,logstash用来解析日志统一转为JSON格式,ES用来存储日志并提供查询功能。
消息通讯:比如聊天室,点对点通讯,两个用户在同一个队列中进行消息发送;聊天室通讯,多个客户端订阅同一个主题后,消息会同时发送给多个客户端。
Rabbitmq构造
1. 生产者:生产消息
2. 消费者:接收消息
3. Broker:服务器实体
4. Queue:消息队列,用来存放消息,一个消息可进行一个或多个队列中,多个消费者订阅同一个队列,会轮询消费消息。
5. Exchange:交换机,接收生产者发送的消息,根据路由将消息存放到队列中。
6. Routing key:路由关键字,用来指定这个消息的路由规则,需要交换机来将路由和队列绑定。
AMQP的理解
Amqp协议是高级消息队列协议,就像http协议一样,只是amqp协议是用来规范消息队列的,涵盖了消息队列各个组件的定义,以及消息发送消费流程。
消息是如何路由的
消息生产者发送消息并携带路由到交换机,由交换机去匹配路由和队列的绑定key,匹配到则将消息发送到队列中,常用的有三种模式:
Fanout:无绑定key,收到消息后,会广播到绑定的所有队列。
Direct:收到消息后,只有路由完全匹配的,才会将消息发送到相应的队列。
Topic:收到消息后,路由完全匹配或者匹配通配符,就会将消息发送到相应队列。
死信交换机(延迟队列)
专门用来处理死了的信息,被拒绝可以重新投递的消息不算,消息变成死信一般是一下三种情况:
1. 消息被拒绝,并且设置requeue参数为false
2. 消息过期,默认没有过期时间,但是可以设置
3. 队列达到最大长度
消息传输的模式有哪些
1. 简单模式:一个生产者、一个消费者和一个队列,一对一的关系。
2. 工作模式:一个生产者、多个消费者和一个队列,消费者之间是竞争关系,一个消息只会被一个消费者消费。
3. 发布/订阅模式:一个生产者、一个交换机、多个队列、多个消费者,发送消息后,交换机绑定的每个队列都会收到消息,然后每个消费者都会消费。
4. 路由模式:通过路由,选择性的给多个消费者发送消息,交换机通过路由匹配绑定键,匹配成功后将消息发送到队列中。
5. 通配符模式:和路由模式差不多,通配符是使用*或#来匹配一个或多个单词,匹配成功发送到对应的队列中。
消息分发策略有哪些
1. 轮询分发:rabbitmq模式采用的的轮询分发。
2. 不公平分发:通过设置参数channel.basicQos(1)。
3. 预值分发:当消息被消费者接收后,但是没有确认,此时会被存储在一个未确认的缓冲区中。
Rabbitmq消息处理流程
生产者发送消息流程:
生产者连接到broker建立一个连接,并开启一个信道,然后生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等,然后生产者再声明一个队列,并设置相关属性,在通过绑定键将交换机和队列绑定。生产者发送消息到broker,对应的交换机根据接收到的路由去匹配队列的绑定键,如果一致则将消息存入队列中,如果没有匹配到,则根据生产者配置的属性选择丢弃还是回退给生产者。关闭信道,关闭连接。
消费者消费消息过程:
消费者连接到broker,建立一个连接,并开启一个信道,消费者再请求对应对应队列里的消息,消费者收到消息后,确认接收到的消息,从队列中删除已经被确认的消息,关闭信道,关闭连接。
如何保证消息的顺序消费
消息投放到队列中是有序的,如果只是单个消费者来消费,是不会出现顺序错乱的,但是如果有多个消费者来消费同一个队列,由于每个消费者消费消息的时间不通,导致消息未按照预期的顺序执行。解决方法为一个队列就一个消费者,消费者内部维护多个内存队列,然后根据业务关键值将消息加入到不同的队列中,然后由内存队列处理对应的业务,这样就可以保证业务中的操作时有序的,不同业务有不同的队列,不同业务可以同时执行,也保证了一定的效率。
消息队列如何保证消息的可靠性传输
消息的可靠性传输分为两个问题:消息重复消费、消息丢失。
解决消息重复消费,就是需要保证消息的幂等性问题,在mq中,消息只会被消费一次,没有重复消费的问题。
第一个消息丢失情况,可能发送在发送消息的过程中,出现网络问题导致mq接收不到消息,解决这个问题首先可以采用事务机制,在发送消息的时候实现事务机制,若是发送失败,则进行回滚并重新发送消息,但是开启了事务,发送方就需要等待事务执行完毕或者回滚,导致消息一朵,性能就会下降。更好的办法是使用mq的消息确认机制,发送方可以设置发送消息的回调方法,发送失败时,重新发送,确认机制是可以采用异步的,这样就可以在保证效率的情况下防止消息丢失。
第二个消息丢失情况,可能发生在mq,如果mq没有进行持久化,出现了宕机的情况,消息就会丢失,解决方法就是将消息持久化,这样宕机重启后消息可以回复。
第三个消息丢失情况,可能发生在消费方,和发送方丢失消息类似,解决这个问题也是使用消息确认机制。
但是解决了消息丢失问题,就会产生幂等性问题,mq发送消息给消费者后,如果消费者处理时间比较长,超过了mq的等待时间,就会认为mq发送消息失败,然后发送者重新发送消息,但是消费者可能是没有失败的,只是因为某个原因导致消费时间超过了mq的等待时间,这时候mq再次发送一次消息,就会导致重复消费。解决方法就是加一个全局id,配合redis,发送消息时在redis中添加一个全局id,消费时判断该id是否有消费记录,如果有则直接跳过,如果没有消费过,则进行处理,修改该id的消费记录,消费完之后将该id添加一个过期时间。
如何处理消息堆积情况
出现该问题的原因:
生产者生产消息的数据远远大于消费者消费消息的速度,有可能是消费者消费能力低,或因消息消费失败反反复复重新消费,或是消费端出现问题,导致不消费了或消费很慢,比如消费者依赖的一个组件挂了,导致消息消费失败。优化方法是处理bug或者优化消费者处理逻辑。
临时扩容,快速处理堆积消息:
1. 先修复消费者的问题,确保其回复速度,然后将所有消费者停掉。
2. 临时创建原先N倍的队列,并部署一个临时的分发消息的消费者,用来消费所有消息,消费后不做任何操作,只均匀的把消息写入临时建立好的N倍的队列中。
3. 然后临时启动N倍的消费者,每个消费者消费一个临时队列的数据。
4. 等堆积的消息消费完后,恢复原先部署架构。重新用原先的消费者来消费。
MQ长时间未处理导致MQ写满的问题:
如果消息都堆积在mq中,导致mq快写满了,说明是第一个方案执行太慢了,这种情况只能采用“丢弃+批量重导”的方法,首先临时写一个程序,连接到mq里面消费数据,消费一个丢一个,快速消费堆积的消息,然后在流量低峰期的时候手动查询丢失的数据重新导入mq中。
如何保证消息队列的高可用
Rabbitmq是基于主从来实现高可用的,有三种模式:单机模式、普通集群模式、镜像集群模式。
普通集群模式:通过添加节点来提高mq的吞吐量,启动多个rabbitmq实例,队列queue的数据消息只会存放在其中一个实例上,其他实例会同步queue的元数据(queue的一些配置信息,通过元数据可以找到queue所在的实例),消费的时候,如果连接到其他实例,那么该实例会把queue的数据拉取过来,也就是让多个节点来服务某个queue的读写操作。但是如果queue所在的节点宕机,其他实例就无法从那个实例拉取数据(只有持久化队列无法读写,非持久化队列访问到其他实例还是可以正常创建)。
镜像集群模式:真正的高可用模式,集群中包含一个master和多个slave,如果master挂掉,slave会按照加入时间排序,将最早的slave提升为master。每次读写都只访问master节点,处理完成后会将消息同步到各个slave实例,如果消费者与slave建立连接来访问,实质上也是从master上获取消息。但是该模式新能开销大,因为要同步消息到所有机器;非分布式,没有扩展性,如果queue的数据量大到服务器无法容纳就会出现问题。
Rabbitmq持久化机制
Rabbitmq的消息默认存放在内存上,如果节点重启消息就会丢失,所以需要对消息进行持久化处理,这样重启之后,会自动拉取持久化的数据加载到内存中。
Rabbitmq的持久化分为队列持久化、消息持久化和交换器持久化。一般设置持久化,三个都需要设置。
Rabbitmq可以对所有消息都持久化吗
不可用,因为持久化是将数据写到磁盘中,效率会比写到内存慢很多,所有持久化需要根据具体业务来设置,重要的不希望丢失的消息添加持久化。
Rabbitmq的消息应答机制
当rabbitmq向消费者发送一条消息后,会立即将该消息标记为删除,如果这是消费者挂掉,那么这个消息相当于丢失了。为了保证消息在发送过程中不丢失,rabbitmq引入了应答机制,消费者再处理完消息后,会告诉rabbitmq已经处理完了,然后rabbitmq才会将该消息删除。
自动应答就是消息发送后就删除,如果消费者出问题,就会导致消息丢失。
手动应答需要手动返回是否成功,返回成功rabbitmq才会删除消息。
Rabbitmq的消息确认机制
用来确认生产者是否将消息发送到了rabbitmq,生产者可以根据确认情况来做不同的操作,比如消息没有发送到rabbitmq,则可以重新发送,发送成功则跳过。
让你来设计一个消息队列
1. 支持动态扩展,横向纵向扩展,这样可以增加吞吐量。
2. 数据持久化,数据写到磁盘才能保证服务宕机的时候消息不会丢失,使用顺序写磁盘的方式,这样就没有磁盘读写的寻址开销。
3. 支持高可用,一个leader多个follower,leader挂掉后,follower重新选举一个leader。
标签:面试题,消费,消费者,队列,RabbitMQ,发送,交换机,理论知识,消息 From: https://blog.51cto.com/u_16441183/8925943