消费方式
并发消费:一个消费者队列中的消息可以同时被消费者的多个线程并发消费
顺序消费:一个消费队列中的消息同一时间只能被一个消费者的一个线程消费
消费流程
消费流程主要分为以下几个步骤:队列分配、拉取消息、消息处理
队列分配
队列分配目的:指定消费者负责的队列集合,默认使用的是平均分配算法
由RebalanceService 组件定时触发,周期为20s一次。
分配流程如下
1 获取指定topic下的消息队列集合
2 如果是广播模式,则不需要进行负载均衡,每个消费者均负责所有消息队列
3 集群模式则根据负载均衡策略分配给消费者
4 消息队列分配完毕后,则需要给每个消息队列创建【任务队列】 ProcessQueue。
5 为每个【任务队列】创建【消息拉取任务】 PullRequest(一个topic创建一个消息拉取任务)
拉取消息
消息的拉取依赖PullMessageService,这是一个后台运行的服务,只要任务队列中存在任务就会拉取
如果拉取消息成功,则将消息加入到待处理任务队列中(ProcessQueue),并且提交一个消费请求给ConsumeMessageService,并且提交下一次拉取任务。
如果没有拉取成功,则根据服务端返回的offset进行矫正,并且提交下一次拉取任务。
注意:在默认情况下,一次拉取数据的条数最大为32
注意:拉取下来的消息,会存入一个有序的TreeMap中,key就是offsetId
消息处理
ConsumeMessageService有两个实现类
并发处理:ConsumeMessageConcurrentlyService
顺序处理:ConsumeMessageOrderlyService
并发消费
在拉取到消息之后,并且提交给ConsumeMessageConcurrentlyService ,会依次为每一条消息创建一个ConsumeRequest,然后交给一个线程池进行处理。
如果消费成功:则把此次消费的消息从任务队列中【移除】,并且【更新消费位点】。
如果消费失败:会把消息重发至broker,等待后续的重新消费。
如果重发broker失败:则会将消息重新提交到任务队列等待消费者处理。
注意:在获取需要更新的消费位点的时候,会获取TreeMap的第一条数据,换言之,就是如果offsetId大的消息先被消费,是无法被及时上报的,如果这个时候产生宕机,会存在重复消费的风险
顺序消费
RocketMQ可以通过将消息发送至同一个队列中,利用队列天然的有序性实现顺序消费。
但是,通过这种方式只能实现发送到同一个队列,在某些情况下还是会打破这个有序性。
例子
1 消费者A正在消费消息,消费者B发生了负载均衡,也分配到了A,这种情况下相当于有两个消费者同时进行消费。
2 当队列A有一个消费者,但是队列A内部可以进行并发消费。
在RocketMQz中还会通过锁机制来实现顺序消费。
同一时间,一个队列只能分配给一个消费者,通过给broker队列上锁进行实现
同一时间,一个队列只能有一个消费线程进行消费,需要给本地队列上锁
在顺序消费中如果消费失败,会直接将消息放回任务队列中等待重新消费
标签:消费,消费者,队列,拉取,任务,消息,简单,原理,RocketMQ From: https://blog.csdn.net/sjdxx/article/details/142943938