1. 消息的生产过程
消息生产过程,经历如下过程:
1.向NameServer发出获取消息Topic的路由信息的请求
2.nameServer返回该Topic的路由表以及Broker列表
3.Producer根据代码中指定的Queue选择策略,从Queue中选择一个队列,用于存储消息
4.Producer对消息做一些处理,例如消息本身超过4M进行压缩
5.Producer向选择出的Queue所在的Broker发出RPC请求,将消息发送到选择出的Queue
跟踪源码可以从org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl 跟踪。
补充:
(1)路由表: 实际是一个Map, key 为Topic 名称,value 是一个QueueData 实例列表。 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo 先从本地map 获取,获取不到就从nameServer 获取到之后加入到本地缓存。
(2) org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue 选择一个队列, 会调用到上面封装的TopicPublishInfo 去选择org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue()
(3) org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync 发起RPC 调用发送消息
这里会调用netty的channel 与broker建立socker连接通道,第一次建立通道然后缓存到本地,建立长连接用于发送请求(有点类似于Dubbo 的请求流程)。
2. 消息的存储
rocketMQ的消息存储在本地文件系统中,默认在当前用户目录下的store 目录中
[root@k8smaster01 rocketmq-4.9.2]# ll ~/store/
total 12
-rw-r--r-- 1 root root 0 Jul 12 06:31 abort
-rw-r--r-- 1 root root 4096 Jul 12 08:48 checkpoint
drwxr-xr-x 2 root root 34 Apr 15 03:51 commitlog
drwxr-xr-x 2 root root 280 Jul 12 08:48 config
drwxr-xr-x 25 root root 4096 Apr 29 04:06 consumequeue
drwxr-xr-x 2 root root 31 Jul 12 06:15 index
-rw-r--r-- 1 root root 4 Jul 12 06:31 lock
abort:broker启动后自动创建,正常关闭Broker该文件会消失。异常关闭该文件是存在的。
checkpoint:存放着commitlog、consumerqueue、index 文件的最后刷盘时间戳
commitlog:其中存放着commitlog文件,而消息是写在commitlog 文件中的
config:存放broker运行期间的一些配置数据
consumequeue:存放着消费者队列信息,以Topic为目录进行区分
[root@redisnode01 consumequeue]# ls
DefaultCluster_REPLY_TOPIC RMQ_SYS_TRACE_TOPIC spring-batch-order spring-string syncTopic
mySprintbootTopic1 RMQ_SYS_TRANS_HALF_TOPIC spring-delay spring-trans TRANS_CHECK_MAX_TIME_TOPIC
mytopic1 RMQ_SYS_TRANS_OP_HALF_TOPIC spring-ext-string spring-transaction-topic user-topic
OFFSET_MOVED_EVENT SCHEDULE_TOPIC_XXXX spring-receive-obj spring-user
%RETRY%spring-self-ack-consumer spring-batch spring-self-ack string-topic
[root@redisnode01 consumequeue]# ls -R syncTopic/
syncTopic/:
0 1 2 3
syncTopic/0:
00000000000000000000
syncTopic/1:
00000000000000000000
syncTopic/2:
00000000000000000000
syncTopic/3:
00000000000000000000
index: 其中存放着消息索引文件indexFile
lock: 运行期间使用到的全局资源锁
1. commitlog 文件
commitlog文件也被命名为mappedFile
[root@redisnode01 store]# ll -h commitlog/
total 264K
-rw-r--r--. 1 root root 1.0G Jul 12 08:08 00000000000000000000
commitlog 目录中存放着很多mappedFile 文件。当前broker的所有消息都是罗盘到这些文件中。 mappedFile大小为1G(小于等于1G)。 文件名称由20位十进制数组成,表示当前文件的第一条消息的起始位移偏移量。(第一个文件一定是20个0构成的,因为第一条消息的偏移量offset 为0。第n个文件的名称是前n-1 个文件大小之和。 一个Broker 中的所有mappedFile 文件的commitLog offset 是连续的)
一个broker 仅包含一个commitlog 目录,无论多少topic 都被顺序写入到mappedFile 中。也就是说消息并没有按照topic 进行分类。
mappedFile 是顺序读写的文件,所以访问效率很高。
消息单元:
mappedFile 文件由很多消息单元组成。每个消息单元包括消息长度MsgLen,消息的物理偏移量,消息体内容,消息体长度,消息生产者BornHost,消息发送时间戳,主题,队列ID,所在队列的偏移量等信息。(消息单元中包含Queue 相关属性,后续解释)
2. consumequeue
[root@redisnode01 consumequeue]# ls
DefaultCluster_REPLY_TOPIC RMQ_SYS_TRACE_TOPIC spring-batch-order spring-string syncTopic
mySprintbootTopic1 RMQ_SYS_TRANS_HALF_TOPIC spring-delay spring-trans TRANS_CHECK_MAX_TIME_TOPIC
mytopic1 RMQ_SYS_TRANS_OP_HALF_TOPIC spring-ext-string spring-transaction-topic user-topic
OFFSET_MOVED_EVENT SCHEDULE_TOPIC_XXXX spring-receive-obj spring-user
%RETRY%spring-self-ack-consumer spring-batch spring-self-ack string-topic
[root@redisnode01 consumequeue]# ls syncTopic/
0 1 2 3
[root@redisnode01 consumequeue]# ls syncTopic/1/
00000000000000000000
消费者队列目录。每个topic 在consumequeue 都会创建一个目录,目录名称为topic 名称。在该topic 目录下,会再为每个queue 建立一个目录,目录为queueId。每个目录存放着若干个consumequeue 文件, consumequeue 是commitlog 的索引文件,可以根据consumequeue 文件定位到具体的消息。
consumequeue 文件名也是20位数字构成,表示当前文件的第一个索引条目的起始偏移位移量。与mappedFile 不同的是,后续文件名是固定的,因为consumequeue的文件大小是固定不变的。
索引条目如下:
每个consumequeue 文件可以包含30W个索引条目,每个索引条目包含了三个消息的重要属性: 消息在mappedFile文件中的偏移量、消息长度、消息Tag的hashcode值。这三个属性占20 个字节,所以每个文件的大小是固定的30W*20字节。
3. indexfile
indexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:~/store/index/{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。
3. 消息的写入和拉取
1. 消息的写入
消息到达Broker 之后经历下面几步然后最终被持久化:
1.broker根据queueId,获取到该消息对应索引条目要在consumequeue目录中的写入偏移量,即queueOffset
2.将queueId、queueOffset等数据与消息一起封装为消息单元
3.将消息单元写入commitlog,同时形成消息索引条目
4.将索引条目分发到相应的consumequeue
2. 消息拉取
当consumer 来拉取消息时会经历如下几步:
1.consumer 获取到其要消费消息所在的消费偏移量offset,计算出其要消费消息的消息offset(消费offset即消费进度,消息offset=消费offset+1)
2.consumer向broker发送拉取请求,其中会包含要拉取消息的queue、消息offset以及消息tag
3.broker计算在该consumequeue中的queueOffset。 queueOffset=消息offset*20字节
4.从该queueOffset处开始向后查找第一个指定Tag的索引条目
5.解析该索引条目的前8个字节,即可定位到该消息在commitlog中的commitlog offset;从对应commitlog offset 中读取消息,并发送给consumer
3. 性能提升
rocketmq无论是消息本身还是索引都是存储在磁盘上的。其通过一系列机制打打提升了性能。
首先读写操作是通过mmap 零拷贝实现的,将对文件的操作转换为直接对内存的操作。consumequeue 中的数据是顺序存放的,还引入了PageCache 的预读取机制(写的时候先写到pageCache,然后以异步方式由内核线程刷到物理磁盘; 读取的时候先读取pageCache,未命中就从物理磁盘读取并读取其相邻数据然后加载到pageCache)。
4. 消息的消费
消费者从broker获取消息的方式有两种:pull和push;消费者组对于消息消费的模式又分为两种:集群消费Clustering和广播模式Broadcasting。
1. 获取消息类型
pull类型
consumer 主动从broker拉取消息,主动权由consumer控制。一旦获取了批量消息,就会启动消费过程。有个缺点就是实时性较弱。
push模型:
broker收到消息推送给consumer,典型的发布-订阅模式,这些都是基于consumer与broker之间的长连接,长连接的维护需要消耗系统资源。也就是实时性强,但会占用较多的系统资源。
2. 消费模式
广播模式:相同consumer group 的每个consumer 实例都接收同一个Topic的全量消息。即每条消息都会被发送带ConsumerGroup 中的每个Consumer。(可以为每个consumergroup 都会收到相同的消息,多实例部署会收到一样的消息)
集群消费:集群消费模式下,相同consumer group 中的每个consumer 实例平均分摊同一个topic的消息。即每条消息只会被发送到consumer group 中的某个consumer。
两种模式的消费进度保存也略有不同:
广播模式:消息进度保存在consumer端。
集群模式:消费进度保存在broker中。保存在~/store/config/consumeroffset.json。
3. rebalance 机制
rebalance 针对的是集群消费。
rebalance 即再均衡,指的是,将一个topic下的多个queue在同一个consumer group 中的多个consumer间进行重新分配的过程。
本意是为了提升消息的并行消费能力。可以让多个消费者同时消费队列中的消息。
限制:
一个队列最多分配给一个消费者,当某个消费者组下的消费者实例数量大于队列的数量时多余的消费者实例将分配不到任何队列。
危害:
消费暂停: 发生rebalance 时,原consumer 就需要暂停部分队列的消费,等到这些队列分配给新的consumer后,这些暂停消费的队列才能继续被消费。
消费重复:consumer 在消费分类给自己的队列时,必须接着之前consumer提交的消费进度的offset继续消费。然而默认情况下,offset 是异步提交的,这个异步性导致提交的brker的offset与consumer实际消费的消息并不一致,这个不一致的差值就是可能会重复消费的消息。
消费突刺:由于rebalance 可能会导致重复消费,如果重复消费的消息过多或者由于暂停积压的消息过多,在恢复后就会瞬间消费大量的消息。
产生原因:
queue数量发生变化:broker发生变化、queue 发生变化、broker与nameserver网络抖动
消费者组中消费者的数量变化:consumer group 扩容缩容、consumer与nameserver 间网络抖动
rebalance过程:
在broker中维护着多个map集合,这些集合中动态存放着当前topic中queue 的信息、consumer group中consumer 实例的信息。一旦发现消费者所订阅的queue 数量发生变化,或者消费者组中的消费者数量发生变化,会立即向consumergroup中的每个实例发出rebalance通知。然后consumer 自己采用queue 分配算法自主进行rebalance。
4. queue 分配算法
一个topic中的queue只能由consumer group 中的一个consumer 进行消费,而一个consumer 可以同时消费多个queue 中的消息。常见的有四种策略:
假设queueId 为0-9, 消费者组为A-D, 四种策略如下:
平均分配:
avg = queuecount / consumercount; 多出的按照顺序进行逐个分配。
结果:A(012),B(345),C(67),D(89)
环形平均策略:
按照环形逐个分配
结果:A(048),B(159),C(26),D(37)
一致性hash:
该算法会将consumer的hash值作为node节点存放到hash 环上,然后将queue的hash 值也放到hash 换上,通过顺时针方向,距离queue最近的那个consumer就是该queue分配的consumer。存在的问题是: 分配不均匀。适用于consumer 数量频繁变化的场景。
同机房策略:
根据queue的部署机房位置和consumer的位置,过滤出当前consumer机房相同机房的queue。然后按照平均分配算法或者环形平均分配算法对同机房queue进行分配。如果没有同机房queue,则按照平均分配策略或环形平均策略对所有queue进行分配。
5.至少一次原则
rocketmq有一个原则: 每条消息必须要被成功消费一次, 也就是consumer消费完消息后向消费进度记录器。
消息进度记录器: 对于广播消费模式来说,consumer本身就是消费进度记录器;对于集群消费模式来说,broker 就是消费进度记录器。
5. 订阅关系一致性
订阅关系的一致性指的是,同一个消费者组(groupId相同)下所有consumer实例所订阅的topic与tag以及对消息的处理逻辑必须完全一致。否则消息的消费逻辑就会混乱,甚至导致消息丢失。
6.消费offset 管理
1.本地管理模式
广播消费时,offset采用本地模式存储。相当于每个消费者存储自己的消费进度。
默认文件再当前用户主目录下 .rocketmq_offsets/{clientId}/{group}; ${clientId} 为当前消费者id,默认为ip@DEFAULT
2.远程管理模式
集群消费模式时,存在broker端。在~/store/config/consumerOffset.json
3.offset 用途
通过该属性知道从哪里开始继续消费消息。在consumer 启动后,其要消费的第一条新消息的起始位置常有的有三种,参考下面:
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
package org.apache.rocketmq.common.consumer;
public enum ConsumeFromWhere {
CONSUME_FROM_LAST_OFFSET,
@Deprecated
CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
@Deprecated
CONSUME_FROM_MIN_OFFSET,
@Deprecated
CONSUME_FROM_MAX_OFFSET,
CONSUME_FROM_FIRST_OFFSET,
CONSUME_FROM_TIMESTAMP,
}
4.offset的同步提交与异步提交
集群消费模式下,consumer消费完消息后会向broker提交消费进度offset,其提交方式分为两种:
同步提交:消费完一批进行提交,等待broker响应后才继续消费;若没有收到就继续进行提交。会影响消息的吞吐量。
异步提交:提交后无需等待broker的响应。
7. 消费幂等
1. 消费幂等
当出现消息重复消费的情况,重复消费的结果与一次消费的结果相同,则该操作是幂等的。
2. 场景分析
发送消息重复(由于网络原因抖动消费者没有收到briker对producer的ack)
消费重复(consumer消费完给broker 发送ACK发生网络抖动,由于rocketMQ的最少消费一次原则会再次投递)
rebalance时发生重复:consumergroup中的consumer 数量发生变化或其订阅的topic的queue的数量发生变化触发rebalance
3. 通用解决方案
两要素:幂等令牌与唯一性处理、
幂等令牌:生产者和消费者约定的唯一业务标识的字符串,例如订单号、流水号
唯一性处理:服务端通过采用一定的算法策略,保证同一个业务逻辑不会被重复执行多次。比如订单只能一次支付。
8. 消息堆积与消费延迟
1. 概念
消息处理时,consumer的速度跟不上producer 的速度就会堆积消息。消息堆积进而造成消息的消费延迟。以下场景需要重点关注消息堆积和消费延迟问题:
(1)业务系统上下游能力不匹配造成的持续堆积,且无法自行恢复。
(2)业务系统对消息的消费实时性要求较高,即使是短暂的堆积造成的消费延迟也无法接受。
2. 产生原因
consumer使用长轮询pull模式消费消息时,分为两个阶段:
消息拉取:将拉取到的消息缓存到本地缓冲队列中。内网环境下会有很高的吞吐量。
消息消费:将缓存的消息提交到消费线程进行业务处理。此时消费能力完全依赖于消息的消费耗时和消费并发度。如果整体消费太慢,导致本地缓存队列达到上限就会停止从服务端拉取消息。
总结:消息堆积主要在于客户端的消费能力,由消费耗时和消费并发度决定。
3. 消费耗时和消费并发度
消费耗时:主要就是代码逻辑。一般两种:cpu计算密集、io操作密集。
并发度:并发度由单节点线程数和节点数量共同觉得。单机线程计算:
不能盲目调大线程数,设置过大反而会带来大量的线程切换的开销。单节点下最优线程数量的计算模型为:C(T1+T2)/T1=C+C(T2/T1)
C:CPU核数
T1:CPU内部逻辑计算耗时
T2:外部IO操作耗时
4. 如何避免
梳理消息的消费耗时和设置消息消费的并发度。
消费耗时:通过压测,判断代码是否存在无限循环和递归等;减少IO操作,用本地缓存规避;复杂操作能否异步处理
并发度:逐步调大consumer节点的线程数进行测试
9. 消息清理
消息被消费过后不会被清理。
消息是顺序存储在commitLog文件,且消息大小不定长,所以消息的清理不能以消息为单位进行,而是以commitlog文件为单位清理。
commitlog文件存在一个过期时间:默认为72小时。除了用户手动清理,一下情况也会自动清理,无论文件中的消息是否被消费过:
1.文件过期,且到达清理时间点(默认为凌晨4点)
2.文件过期,且磁盘空间占用率已达到过期清理警戒线(75%)
3.磁盘占用率达到清理警戒线(85%),无论是否过期都会从最老的文件开始清理
4.磁盘达到系统危险警戒线(90%),broker 拒绝消息写入
参考:https://github.com/apache/rocketmq/blob/master/docs/cn/design.md
【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】