首页 > 其他分享 >go高并发之路——消息中间件kafka(中)

go高并发之路——消息中间件kafka(中)

时间:2024-08-04 17:06:51浏览次数:20  
标签:Group 消费者 Rebalance 分区 kafka 实例 消息中间件 go Consumer

接着上篇,我们继续聊聊kafka的那些事儿。

一、消费者组

消费者组,即 Consumer Group,是 Kafka 的一大亮点设计。一个组内可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(topic)的所有分区(Partition)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费
理解消费者组,我们应该记住下面三个特性:
1、 Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例一般是一个单独的进程,比如一个脚本起n个进程去消费,那么 Consumer就有n个。
2、Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
3、Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这是一个很重要的概念,还记得我们上一篇讲到将用户的消息按顺序推入同一个分区,就能够保证顺序消费吗?就是因为这个机制,一个分区的数据只能被同一个消费者组的一个消费者实例消费

那么在实际项目中,我们怎么知道一个Group下应该设置多少个Consumer呢? 理想情况下,Consumer实例的数量应该等于该Group订阅topic的分区总数。举个例子,假设一个 Consumer Group 订阅了 4 个主题,分别是 A、B、C、D, 它们的分区数依次是 1、2、3、2,那么通常情况下,为该 Group 设置 8 个 Consumer 实例是比较理想的情形,因为它能最大限度地实现高伸缩性,并将性能最大化。

提到消费者组,有个消费方式的概念需要补充一下:
常见的消费方式有两种,一个是点对点的消费方式,又被称为消息队列,即一个主题的消息只能被一个消费者消费,比如redis的list类型中使用 lpush+brpop 即可实现这种消息队列方式,这种传统的消息队列有一种缺陷在于消息一旦被消费,就会从队列中被删除,而且只能被下游的一个Consumer消费。而另一种消费方式则很好地弥补了消息队列的缺陷,这就是发布-订阅模式,一个主题中的消息被多个消费者群组共同消费,kafka就是这种发布-订阅模式的典型。

下面给大家画了一些图,帮助大家理解消费者组、消费者和主题、分区的关系:
(1)一个消费者组中只有一个消费者去 消费Topic1中的四个分区数据:

(2)上面的图中只有一个消费者去消费四个分区,显然速度是很慢的,需要帮手,于是就演变成了下图这样,一个消费者组有两个消费者去 消费Topic1中的四个分区数据:

(3)上面的图中有两个消费者去消费四个分区数据了,平摊了一部分消费压力,但可能还不够快,于是继续增加消费者,于是就演变成了下图这样,一个消费者组有四个消费者去 消费Topic1中的四个分区数据,这种就是理想的生产消费模型了,多少个分区就对应多少个消费者

(4)Topic1只有四个分区,那么上面四个消费者已经足够了,假设搞五个、六个甚至更多的消费者还有意义吗?那就会出现下面图示的情况,多余的消费者会被闲置,消费者5和消费者6会永远处于空闲状态,这就造成了消费资源的浪费。

(5)上面讲了,kafka是因为有消费者组的概念,是天然支持发布-订阅模式的,即生产者只需写入一次消息,就可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费者组,而每个消费者组之间是互不影响的。对于上面的例子,假如我们新增了一个新的消费者组 2,而这个消费组有三个消费者,那么就演变为下图这样:

二、消费者组重平衡

消费者组重平衡,英文名也叫做 Rebalance。在kafka这个圈子算是一个大(臭)名(名)鼎(昭)鼎(著)的概念,使用过kafka的同学一定或多或少地听过这个概念。那我们究竟怎么去理解这个重平衡呢?

首先,我们从上面的消费者演变图中可以知道这么一个过程:最初是一个Consumer订阅一个Topic并消费其全部分区的消息,后来有一个Consumer加入Group,随后又有更多的Consumer加入Group,而新加入的Consumer实例分摊了最初消费者的部分消息。这种机制类似于一种协议,规定了一个Group 下的所有Consumer如何达成一致,来分配订阅Topic 的每个分区。举个更简单的例子:比如某个 Group下有 20 个Consumer实例,它订阅了一个具有 100 个分区的Topic,那么正常情况下,Kafka 平均会为每个Consumer实例 分配5个分区。

那么 Consumer Group 何时进行 Rebalance 呢?Rebalance 的触发条件主要有 3 个
1、 组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组。这个很好理解,比如新增或者删除了某个消费脚本都会出现这种情况。
2、订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如
topicRegex, err := regexp.Compile(^.*c$) 就表明该 Group 订阅所有以字母 c 结尾的topic。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
3、订阅Topic的分区数发生变更。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。

可能有些人还会列举更多引起Rebalance的情况,但其实本质上就是上面这3个变化。最终引起了资源的不协调,触发了Rebalance。

重平衡非常重要,它为消费者组带来了高可用性 和 高伸缩性,我们可以放心的添加消费者或者移除消费者,不过在正常情况下我们并不希望发生这样的行为。因为他也有几个被人诟病的地方:
1、首先,Rebalance 过程对 Consumer Group 消费过程有极大的影响。在 Rebalance 过程中,所有 Consumer 实例都会停止消费(Java中万物静止的概念),等待 Rebalance 完成。
2、Rebalance的过程很慢,因为要做一系列的动作,分区算法分配、分区消息中断处理等等。反正慢就对了,据说,有个国外用户的Group 内有几百个 Consumer 实例,成功 Rebalance 一次要几个小时!这对于实时性要求高点的项目,是完全不能忍受的。Rebalance确实不是一个很好的东西,虽然结果很美好(让各方面资源都得到了协调),但过程太艰辛了(Rebalance过程什么都干不了)。

那么我们如何尽量去避免触发Rebalance呢

其实Rebalance触发条件中的第2、3个都很好知道,一般都是人为的,比如运维手动操作,肯定是在业务低峰期去操作的,所以基本对业务无影响。真正有影响是第1点,组成员数变更。组成员数增加或者减少,其实就是增加或者减少了一个消费者实例,比如机器弹性伸缩等原因导致消费脚本新增或者减少了一台机器去消费Topic,那么Rebalance机制会根据分配规则,给这个新Consumer实例分配分区。这个时候就要注意对现网数据的影响了,主要考虑业务的延迟带来的影响,还有异常情况下可能会出现消息丢失或者消息重复等情况

kafka是如何知道某个消费者组断开或连接的呢?其实是依赖心跳机制,就是消费者组会定时向broker的Coordinator组件发送心跳,告诉broker我这个消费者组还存活着,不要把我踢掉,而这个心跳的时间是可以设置的,包括session.timeout.ms、heartbeat.interval.ms等参数。事实上,kafka有很多很多参数配置,但是一般不建议去动它们,尤其是使用腾讯云、阿里云这些云厂商的kafka的话,基本用默认配置就好了。

以上,就是跟大家分享的kafka消费者这块的一些知识点了,这些都是我们用好kafka所必须掌握的东西。好了,下篇文章见~

标签:Group,消费者,Rebalance,分区,kafka,实例,消息中间件,go,Consumer
From: https://www.cnblogs.com/lmz-blogs/p/18341969

相关文章

  • 完全用python 实现消息中间件2
    为了完善这个简单的消息中间件,我们可以添加以下功能:消息持久化:虽然在这个示例中我们不会使用数据库,但我们可以将消息保存到文件中,以模拟持久化存储。消息确认:添加一个机制来确认消息已经被消费。并发控制:确保在多线程或多进程环境中消息的安全处理。以下是更新后的代码:fr......
  • 在 Python 中从 HTML 中抓取嵌入的 Google Sheet
    这对我来说相对棘手。我正在尝试提取来自python中的google工作表的嵌入表。这是链接我不拥有该工作表,但它是公开可用的。这是迄今为止我的代码,当我输出标题时,它向我显示“”。任何帮助将不胜感激。最终目标是将此表转换为pandasDF。多谢你们importlx......
  • 使用 django 的 EmailMessage 发送波斯语电子邮件时出现问题
    我对django相当陌生,并尝试使用django.core.mail.EmailMessage发送包含波斯语字母的电子邮件。这是我的代码:fromdjango.core.mailimportEmailMessagefromdjango.confimportsettingsdefcustom_sender(subject:str,body:str,recipient_list:list[str],......
  • 如何使用 Python 在 Google 或 DuckDuckGo 中快速获取答案
    我有一个人工智能助手项目,我希望它在互联网上搜索。我想使用适用于Python的GoogleQuickAnswerBox或DuckDuckGoInstantAnswerAPI。我看到了其他问题,但它们对我没有多大帮助。这是我想要实现的一个示例:问题:什么是长颈鹿?Google的答案:DuckDuckGo的......
  • django增删改查
    1.增classRole(models.Model):title=models.CharField(verbose_name="角色",max_length=32)od=models.IntegerField(verbose_name="排序",default=0)defstr(self):return"{}-{}-{}".format(self.id,self.title,self.od)方法一:obj1......
  • django多数据库操作
    1.读写分离192.168.1.2default主数据库负责写入192.168.1.3slave从数据库负责读取2.生成数据库表pythonmanage.pymakemigrations分别迁移到主从数据库pythonmanage.pymigrate--database=defaultpythonmanage.pymigrate--database=slave多个app分......
  • 实现一个终端文本编辑器来学习golang语言:第二章Raw模式下的输入输出
    从第二章开始,在每个小节的最后都会有一些代码实操作业,你可以选择自己完成(比较推荐),再对照我的实现方式,当然也可以直接看我的代码实现。不过,之后的各个功能实现,我都会基于我先前的代码实现版本,在它的基础上进行扩展。首先,我们先来解决第一章遗留的第一个问题:输入数据会被stdin缓存......
  • ] Failed to execute goal org.mybatis.generator:mybatis-generator-maven-plugin Ca
    [ERROR]Failedtoexecutegoalorg.mybatis.generator:mybatis-generator-maven-plugin:1.3.7:generate(default-cli)onprojectcom-zhianchen-pgsql:Executiondefault-cliofgoalorg.mybatis.generator:mybatis-generator-maven-plugin:1.3.7:generatefailed:Cann......
  • gogs的安装和使用(docker)
    1.docker安装gogs1.1拉取gogs镜像dockerpullgogs/gogs#也可改为自己需要的版本号 1.2创建存储目录mkdir/root/gogs 1.3 运行镜像 dockerrun--name=gogs-d-p10022:22-p13000:3000-v/root/gogs:/datagogs/gogs1.3.1指令解析 --name=gogs:指......
  • go pkg包名,如何在pkg.go.dev发布golang软件包, package包名, import包名, git库名, g
    golang的包名在不同的地方作用不同,用途不同。在我们通过pkg.go.dev发布自己的包的时候,了解这些包名和他们的用途就很有必要了。下面我们以go语言中的pkg库 github.com/tekintian/strutils为例说明各种名称的区别和用途。如何在pkg.go.dev发布golang软件包要在pkg.go.dev发......