MQ使用过程中可能出现的问题以及解决方案 一、MQ如何避免消息堆积的问题: 1)产生背景: producer发送消息的速率远大于consumer消费消息的速率,从而导致消息堆积在mq服务端中; 2)需要注意的是:rocketmq或kafka如果消息消费成功,消息是不会立即从mq服务端中被删除的;rabbitmq的消费者如果消息消费成功,消息会被立即删除; 3)解决办法: 首先,可以通过提高消费者消费速率的方法,消费者实现集群,开启多个消费者服务,只需要保证多个消费者在同一个消费组中,多个消费者不会重复消费同一条信息; 其次,消费者在mq服务端拉取信息时,应该使用批量方式获取,减少网络连接交互成本; 二、MQ如何保证消息不丢失 分布式系统中一个重要的前提假设是所有的网络传输都不可靠,在网络传输不可靠的情况下保证消息传输的可靠性,除了重试投递别无他法,大多数消息队列在消息传输上只能保证至少成功一次,而不能保证只传输成功一次(这就需要保证消息幂等问题,纺织重复消费)。由于分布式系统网络的不可靠,可能会出现消息丢失的现象,那么如何保证RocketMq最大限度的消息不丢失?那就需要从消息的产生到消息被消费整个过程来分析,消息分完整链路可分为三个阶段: 1)生产阶段(Producer):消息通过producer发送出来,通过网络传输发送到Broker存储端; 2)存储阶段(Broker):消息在broker端存储,如果是主备或者多副本,消息会在这个阶段被复制到其他节点或者副本上; 3)消费阶段(Consumer):Consumer消费端从Broker存储端拉取消息,通过网络传输发送到Consumer消费端,并通过重试来保证消息消费不丢失; 4)NameServer:它是RocketMq的服务注册中心,用来保存Broker的相关元信息并为Producer和Consumer查找Broker信息。每个Broker在启动的时候会到NameServer进行注册,Producer在发送信息前会根据Topic到NameServer获取Broker的路由信息,进而和Broker连接。Consumer也会定时根据Topic从Nameserver获取Broker的路由信息,从功能上和zookeeper有点类似;NameServer被设计成几乎无状态的,可以横向扩展,节点相互之间无通信,可集群部署;
1、发送端发送信息的可靠性:
发送端发送消息的核心逻辑如下图所示:
消息的发送方式主要有:同步发送,异步发送,单向发送,业务具体选择哪种发送方式需要根据业务场景判断:
1)同步发送(ASK):
同步发送是指发送端在发送消息时,阻塞线程进行等待,直到服务器返回发送的结果。发送端如果需要保证消息发送的可靠性,防止发送失败,可以采用同步阻塞式的发送,然后同步检查Broker返回的状态来判断消息是否持久化成功。如果发送失败则会默认重试2次,RocketMq选择至少传输成功一次的模型,因为网络传输不可靠可能出现抖动延迟,从而可能重复投递。
2)异步发送:
异步发送是producer在发送消息时,传入回调接口实现类,调用该接口后不会阻塞,发送方法会立即返回,回调任务会在另一个线程中执行,消息发送结果会回传给回调函数。具体的业务实现可以根据发送结果的信息来判断是否需要重试来保证发送消息的可靠性;
3)单向发送:
单向发送是发送消息时,不会阻塞立即返回,返回信息没有发送信息的结果状态,无法根据发送的状态来判断消息是否发送成功,单向发送相对前两种是一种不可靠的信息发送方式,有点类似于UDP通信,发送快但是安全性低,因此要保证消息发送的可靠性,不推荐采用这种方式
2、存储端消息可靠性
RocketMQ的消息存储结构如下图所示:
消息队列存储的最小单位是消息Message。
同一个Topic下的消息映射成多个逻辑队列。
不同Topic的消息按照到达broker的先后顺序以Append的方式添加至CommitLog,顺序写,随机读。
目前RocketMQ存储模型使用本地磁盘进行存储,数据写入为producer -> direct memory -> pagecache -> 磁盘,数据读取如果pagecache有数据则直接从pagecache读,否则需要先从磁盘加载到pagecache中。Broker存储节点的文件存储模式如下图所示:
Broker端CommitLog采用顺序写,可以大大提高写入效率,同时采用不同的刷盘模式提供不同的数据可靠性保证,此外采用了ConsumeQueue中间结构来存储偏移量信息,实现消息的分发。由于ConsumeQueue结构固定且大小有限,在实际情况中,大部分的ConsumeQueue 能够被全部读入内存,可以达到内存读取的速度。此外为了保证CommitLog和ConsumeQueue的一致性, CommitLog里存储了Consume Queues 、Message Key、Tag等所有信息,即使ConsumeQueue丢失,也可以通过 commitLog完全恢复出来,这样只要保证commitLog数据的可靠性,就可以保证Consume Queue的可靠性。
RocketMQ存储端采用本地磁盘进行CommitLog消息数据的存储,不可避免的就会带来存储可靠性的挑战,如何保证消息不丢失,RocketMQ消息服务一直在不断提高数据的可靠性。
1 )存储可靠性挑战
RocketMQ存储端也即Broker端在存储消息的时候会面临以下的存储可靠性挑战:Broker正常关闭
Broker异常Crash
OS Crash
机器掉电,但是能立即恢复供电情况
机器无法开机(可能是cpu、主板、内存等关键设备损坏)
磁盘设备损坏
1正常关闭,Broker 可以正常启动并恢复所有数据。2、3、4同步刷盘可以保证数据不丢失,异步刷盘可能导致少量数据丢失。5、6属于单点故障,且无法恢复。解决单点故障可以采用增加Slave节点,主从异步复制仍然可能有极少量数据丢失,同步复制可以完全避免单点问题。
这里一般来说就需要在性能和可靠性之间做出取舍,对于RocketMQ来说,Broker的可靠性主要由两个方面保障:单机的刷盘机制
主从之间的数据复制
如果设置为每条消息都强制刷盘、主从复制,那么性能无疑会降低;如果不这样设置,就会有一定的可能性丢失消息。RocketMQ一般都是先把消息写到PageCache中,然后再持久化到磁盘上,数据从pagecache刷新到磁盘有两种方式,同步和异步。整体的消息写入和读取如下图所示:
针对broker端单机存储可靠性,主要依赖单机的刷盘策略。
2 )同步刷盘
消息写入内存的 PageCache后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式可以保证数据绝对安全,但是吞吐量不大。
3 )异步刷盘(默认)
消息写入到内存的 PageCache中,就立刻给客户端返回写操作成功,当 PageCache中的消息积累到一定的量时,触发一次写操作,或者定时等策略将 PageCache中的消息写入到磁盘中。这种方式吞吐量大,性能高,但是 PageCache中的数据可能丢失,不能保证数据绝对的安全。
实际应用中要结合业务场景,合理设置刷盘方式,尤其是同步刷盘的方式,由于频繁的触发磁盘写动作,会明显降低性能。
4 )过期文件删除
由于RocketMQ操作CommitLog、ConsumeQueue文件是基于文件内存映射机制,并且在启动的时候会将所有的文件加载,为了避免内存与磁盘的浪费、能够让磁盘能够循环利用、避免因为磁盘不足导致消息无法写入等引入了文件过期删除机制。最终使得磁盘水位保持在一定水平,最终保证新写入消息的可靠存储。
什么时候清理物理消息文件?那消息文件到底删不删,什么时候删?
消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog):
1)消息文件过期(默认48小时),且到达清理时点(默认是凌晨4点),删除过期文件。
2)消息文件过期(默认48小时),且磁盘空间达到了水位线(默认75%),删除过期文件。
3)磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。
4)注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。
3、 消费端消息可靠性
消费者从broker拉取消息,然后进行相应的业务的消费,消费成功会返回一个消费成功的状态给broker,broker如果没收到确认信息,消费者下次拉取重新拉取该消息
consumer自身可以维护一个持久化的offset,对应MessageQueue里面的min offset,标记已经成功消费或者已经成功发回到broker的消息下标
如果consumer消费失败,会把这个消息发回给broker,发回成功后,更新自己的offset
如果发回给broker时,broker挂掉了,那么consumer也会定时重试这个操作
即使consumer和broker一起挂掉了,消息也不会丢失,因为consumer里面的offset会定时持久化,重启之后,继续拉取offset之前的消息到本地,重新消费