首页 > 其他分享 >3分钟白话RocketMQ系列—— 如何消费消息

3分钟白话RocketMQ系列—— 如何消费消息

时间:2023-11-13 12:03:11浏览次数:35  
标签:消费 消费者 白话 分钟 队列 消息 拉取 RocketMQ

=



白话3分钟,快速了解RocketMQ如何消费消息。

看完如果不了解,欢迎来打我。

我们知道RocketMQ主要分为消息 生产、存储(消息堆积)、消费 三大块领域。

前面已经介绍了 生产消息、存储消息 两大块内容,那接下来,我们白话一下RocketMQ是如何消费消息的,揭秘消息消费全过程。

注意,如果白话中不小心提到相关代码配置与类名,请参考RocketMQ 4.9.4版本



关键字摘要

  • 核心概念:消费者与消费组、订阅关系、消费模式
  • 核心流程:消费拉取、负载均衡、消息消费


Q1: 消息消费有哪些核心概念?

3分钟白话RocketMQ系列—— 如何消费消息_队列queue


消费者与消费组、订阅关系

1)消费者与消费组

消息消费以 组 的模式开展。每个消费组ConsumerGroup可以包含多个消费者Consumer,并且可以订阅多个主题Topic。如果多个消费者设置了相同的ConsumerGroup,我们认为这些消费者在同一个消费组ConsumerGroup内。


2)订阅关系

订阅关系Subscription由消费者组ConsumerGroup动态注册到服务端系统,并在后续的消息传输中按照订阅关系中的过滤规则进行 消息过滤与匹配

原则:

  • 不同消费组

ConsumerGroup

  • 对于同一个

Topic

  • 的订阅相互独立
  • 同一个消费组

ConsumerGroup

  • 对于不同

Topic

  • 的订阅也相互独立
  • 同一消费组

ConsumerGroup

  • 内的多个消费者

Consumer

  • 的订阅关系必须保持一致!否则可能会导致部分消息消费不到


3)消费模式

消费组之间有两种消费模式:「集群模式」和「广播模式」。

在「集群模式」下,同一主题下的消息只能被消费组内的某一个消费者处理,一条消息会被 1 个消费组内的 N 个消费者消费 1 次。

在「广播模式」下,同一主题下的消息将会被消费组内的所有消费者处理一次,一条消息会被 1 个消费组内的 N 个消费者消费 N 次。

如果消息消费是「集群模式」,那么消息进度保存在Broker上; 如果是「广播模式」,那么消息消费进度存储在Consumer端本地。



Q2:消费者怎么拉取消息?

整体流程包括:

  • 消费者启动。主要包括订阅

Topic

  • 、初始化消息进度。
  • 消费者发送拉取请求。主要查询路由表找到目标

Broker

  • 发送请求。

Broker

  • 查找并返回消息。根据订阅关系

Subscription

  • 和 消息进度 进行消息过滤和匹配,然后返回消息。
  • 消费者接收并处理消息。

消息服务器与消费者之间有两种消息传送方式:「推模式」和「拉模式」。

「拉模式」是消费者主动向消息服务器请求拉取消息。「推模式」是消息到达消息服务器后,由服务器主动推送给消息消费者。

在 RocketMQ 中,Consumer端的两种消费模式(Push/Pull)底层其实都是基于「拉模式」来获取消息的。

具体实现方式是,消息拉取线程从服务器 拉取 一批消息后,将其提交给消息消费线程池,并立即继续向服务器尝试拉取消息,以保持消息的连续性。

那如果拉取消息时,Broker端暂时没有新消息可以返回怎么办?会一直无脑发送拉取请求吗?

嗯,一定不会啦。

RocketMQ默认会开启「长轮询机制」,这个机制能够平衡 轮询压力新消息的实时性

  • 消费者发送拉取请求到

Broker

  • ,如果没有新消息,

Broker

  • 会暂时 挂起 请求不返回

Broker

  • 每隔5s检查一次挂起的请求,是否有满足条件的新消息,如果有就返回,如果没有就继续挂起,直到超时返回
  • 如果在挂起的过程中,有满足条件的新消息写入

commitLog

  • ,也会立即返回新消息


Q3:消费者怎么知道去哪里拉取消息?

这就需要聊一聊消息消费的「负载均衡机制」了。

注意,RocketMQ 5.x版本,对「推模式」底层增加了一种「Pop模式」的实现。Pop和Pull区别在于,Pop消费的重平衡是在 Broker 端做的,而之前的 Pull

消费端的负载均衡是指将Broker端中多个队列queue按照某种算法分配给同一个消费组中的不同消费者,负载均衡是客户端开始消费的起点。

注意,从RocketMQ服务端5.0版本开始额外支持了「消息粒度」的负载均衡策略,4.x/3.x版本仅支持「队列粒度」的负载均衡策略。本文只介绍4.x的「队列粒度」的。

RocketMQ「队列粒度」的负载均衡的核心设计理念是:

  • 消费队列在同一时间只允许被同一消费组内的一个消费者消费
  • 一个消费者能同时消费多个消息队列

负载均衡基本流程:

Consumer

  • 启动后,它就会通过定时任务向所有

Broker

  • 实例发送心跳包(包含消费分组名称、订阅关系集合、消息通信模式和客户端id等信息),

Broker

  • 会缓存这些信息。

Consumer

  • 每隔10ms从

Nameserver

  • 获取Topic与队列

queue

  • 的路由信息,缓存本地
  • 每隔20s,

Consumer

  • 端会请求

Broekr

  • 获取该消费组下消费者Id列表,然后根据

Topic

  • 下的队列

queue

  • 、消费组下消费者Id进行排序,计算出待拉取的队列

queue

  • 根据新算出的本地应该消费队列

queue

  • ,重新计算本地队列消费任务。

特别注意,无论是消息粒度负载均衡策略还是队列粒度负载均衡策略,在消费者上线或下线、服务端扩缩容等场景下,都会触发短暂的重新负载均衡动作,可能会存在短暂的负载不一致情况,出现少量消息重复的现象。

因此,需要在下游消费逻辑中做好消息「幂等去重」处理。



Q4: 消费者拉到消息了,怎么消费呢?

消息消费,主要关注两个事情:

  • 会不会消息丢失?
  • 会不会消费重复?

怎么保证消息消费不丢失?

其实思路是比较直接的,就是 「消息确认机制」和「失败重试机制」

消费者从RocketMQ拉取消息后,需要返回"CONSUME_SUCCESS"来表示业务方已经正常完成消费。只有返回"CONSUME_SUCCESS"才算作消费完成。这就是消费时的「消息确认机制」。

如果返回"CONSUME_LATER",则会按照不同的消息延迟级别进行再次消费,延迟级别从秒到小时不等,最长延迟时间为2个小时后再次尝试消费。这就是消费时的「失败重试机制」。

重试消息会被存入名为 "%RETRY%+消费组名称"Topic中,原始主题Topic会存入属性中。然后会基于定时任务机制,在到期时将任务再次拉取出来。

注意,从重试Topic的名称我们可以了解到,RocketMQ消息重试是以消费组为单位,而不是Topic。

另外,RocketMQ跟kafka不同的是,天然支持了 「死信队列机制」

如果在尝试消费的过程中达到了最大重试次数(通常为16次),仍然无法成功消费,则消息将被发送到死信队列,以确保消息存储的可靠性。后续业务可以根据死信队列,来做相关补偿措施。

怎么保证消息消费不重复?

其实思路也很直接,就是不保证不重复。

所有消息队列的设计,都是不保证消息消费不重复的。所以使用消息队列时,要特别注意,如果有唯一性要求,必须做好消费端的「幂等设计」。



总结

  • 消息拉取:「推模式」与「拉模式」本质都是「拉模式」、「长轮询机制」平衡 轮询压力 与 新消息的实时性。
  • 消息消费负载均衡:定时获取

Topic

  • 下的队列

queue

  • 、消费组下消费者Id等信息,本地计算负载均衡策略,存在消息重复的可能性。
  • 消息消费:「消息确认机制」和「失败重试机制」 保证消息不丢失、消息队列都存在重复消费。

3分钟到了吗?应该对RocketMQ如何消费消息有全面了解了吧。
如果还想了解更多,欢迎关注下一期内容。






阿丸笔记(微信公众号:aone_note)



标签:消费,消费者,白话,分钟,队列,消息,拉取,RocketMQ
From: https://blog.51cto.com/u_15270048/8340474

相关文章

  • 3分钟白话RocketMQ系列—— 如何存储消息
    白话3分钟,快速了解RocketMQ如何存储消息。看完如果不了解,欢迎来打我。我们知道RocketMQ主要分为消息生产、存储(消息堆积)、消费三大块领域。那接下来,我们白话一下,RocketMQ是如何存储消息的,揭秘消息存储全过程。注意,如果白话中不小心提到相关代码配置与类名,请参考RocketMQ4.9.4......
  • WorkPlus即时通讯app:10分钟快速搭建,支持局域网私有化部署!
    随着数字通讯的飞速发展,“IM+办公”模式被越来越多的政企组织所接受和采用。然而,公有云IM服务的信息安全问题时有发生,这使得一些政府部门和事业单位对此存在着爱恨交加的复杂心态。在这样的背景下,私有化IM作为一种解决方案逐渐受到关注。私有化IM可以在企业自己的服务器上部署和运......
  • PHP函数封装分分钟帮你实现数据脱敏处理, 支持手机、邮箱、身份证号 中文字符串!
    ......
  • GPT最佳实践:五分钟打造你自己的GPT
    前几天OpenAI的MyGPTs栏目还是灰色的,就在今天已经开放使用了。有幸第一时间体验了一把生成自己的GPT,效果着实惊艳!!!我打造的GPT模型我会放到文章末尾,大家感兴趣也可以自己体验一下。打造自己的GPT模型点击CreateaGPT,可以进入到下面这个界面,左侧是一个GPTBuilder的对话框,右边......
  • 【视频课】纯新手如何快速掌握深度学习必备的Python基础能力,150分钟助你入门!...
    前言欢迎大家关注有三AI的视频课程系列,我们的视频课程系列共分为5层境界,内容和学习路线图如下:第1层:掌握学习算法必要的预备知识,包括Python编程,深度学习基础,数据使用,框架使用。第2层:掌握CV算法最底层的能力,包括模型设计基础,图像分类,模型分析。第3层:掌握CV算法最核心的方向,包括图像分......
  • 每天5分钟复习OpenStack(九)存储发展史
    上一章节我们介绍了使用本地硬盘做kvm的存储池,这章开始将介绍下存储的发展历程,并介绍什么是分布式存储,为什么HDFS为有中心节点的分布式存储?1、存储发展在单机计算时代(大型机、小型机、微机),内部存储器可以理解为内存(即Memory),外部存储器可以理解为物理硬盘(包括本地硬盘和通过......
  • 电子公章怎么制作?1分钟免费在线生成
    电子公章已经成为很多企业日常运营中不可或缺的一部分。那么,电子公章怎么制作呢?是否需要专业的电子公章制作工具?是否存在免费在线生成电子公章的选项?本文将为你揭示如何一分钟免费在线生成电子公章,解答你对电子公章如何制作的种种疑惑。首先,电子公章的制作需要依托于专业正规的电子......
  • 【Microsoft Azure 的1024种玩法】七十四.五分钟在Azure Virtual Machines中快速部署
    【简介】ApacheMaven由Apache软件基金会所提供的一个软件项目管理及自动构建工具,Maven为开发者提供了一套完整的构建生命周期框架。开发团队几乎不用花多少时间就能够自动完成工程的基础构建配置,因为Maven使用了一个标准的目录结构和一个默认的构建生命周期,Maven能够在很短......
  • 电子公章怎么制作?1分钟免费在线生成
    电子公章已经成为很多企业日常运营中不可或缺的一部分。那么,电子公章怎么制作呢?是否需要专业的电子公章制作工具?是否存在免费在线生成电子公章的选项?本文将为你揭示如何一分钟免费在线生成电子公章,解答你对电子公章如何制作的种种疑惑。首先,电子公章的制作需要依托于专业正规的电子......
  • 【面试题】消息队列面试题总结(RocketMQ版)
    自己整理、总结了一些消息队列相关面试题,并想了一些RocketMQ面试过程中可能会问的知识点。使用消息队列的优点系统解耦比如系统A产生的某个事件,系统B需要感知,简单实现就是在系统A产生事件之后,调用系统B的接口通知系统B,如果此时再增加一个系统C,还需要修改系统A的代码,再加入调用......