首页 > 其他分享 >图解Kafka | 28张图彻底搞懂消费者

图解Kafka | 28张图彻底搞懂消费者

时间:2024-08-26 15:52:14浏览次数:13  
标签:消费 消费者 分区 28 偏移量 Kafka 消息 搞懂

消费者

消费者角色

Kafka 消费者与生产者一样,是优化 Kafka 数据处理的重要角色。消费者的主要任务是与 Kafka 集群建立连接,并基于配置的消费者属性从相应的 Kafka broker读取记录。
外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

多应用消费

多个应用程序可以从同一个 Kafka 主题中消费记录,每个应用程序都会获取该数据的一个独立副本,并按照各自的节奏进行读取。换句话说,一个应用程序的偏移量可能与另一个应用程序的偏移量不同。Kafka在内部的“__consumer_offset”主题中跟踪每个应用程序消费偏移量。

在这里插入图片描述

消费者组和消费者

在Kafka中,消费者组(Consumer Group)是一个非常重要的概念,消费者组是由一个或多个消费者实例组成的集合,这些消费者共同消费一个或多个主题中的消息。

每个消费 Kafka 数据的应用程序都被视为一个消费者组,应用程序的每个实例就是一个消费者。

下图描绘了一个只有一个消费者的消费者组。
在这里插入图片描述

一个消费者可以消费一个主题的多个分区消息,下图中,Consumer 1消费了Topic A中的Partition 0和Partition 1.
在这里插入图片描述

一个分区只能被同一个消费组中的一个消费者消费,换句话说,同一个分区不会由两个消费者处理,理解这一点非常重要,如下图所示:
在这里插入图片描述

因此当消费者组中的消费者多于主题中的分区时,多余的消费者将会空闲,不被使用,下图中,Topic A有1个分区,但是 Consumer Group 1有2个消费者,但是只有1个消费者能够消费Topic A的这个分区消息。
在这里插入图片描述

当应用程序想要提高处理速度时,可以向消费者组添加更多消费者。Kafka负责管理每个消费者组中消费者的偏移量以及在添加或删除消费者时重新分配分区,下面的例子展示了往消费者组中新增消费者时重新分配分区的过程。

当Consumer Group 1只包含一个消费者时,这个消费者会消费 Topic T1中所有分区的消息。
在这里插入图片描述

如果新增一个消费者加入到Consumer Group 1,此时消费者组包含2个消费者,组中的每个消费者都被分配到Topic T1中一半的分区。
在这里插入图片描述

随着消费者的源源不断加入,Topic T1中的分区会尽可能被均衡分配到每个消费者上供其消费。直到消费者数量与分区数量一致。
在这里插入图片描述

一旦消费者数量超过分区数量,多余的消费者将处于闲置状态,不会收到任何消息,因为同一个消费组中的不同消费者不能消费同一个分区的消息。
在这里插入图片描述

如果需要,您还可以让多个消费者组读取同一Topic的消息。每个消费者组将维护自己的一组消费者偏移量,简单地说,Consumer Group 1 收到的消息也会被Consumer Group 2 收到。但同一个消费组里不同的消费者不能消费相同的消息,这里再次强调这点。
在这里插入图片描述

消费者偏移(Consumer Offset)

在 Kafka 中,消费者偏移(Consumer Offset)是一个很关键的概念,它是用于管理消费者在主题(Topic)分区(Partition)中的消费进度和状态,通过它可以了解到每个消费者已经从分区中消费了多少条消息以及下一步从哪里开始。

消费者偏移量是消费者在分区内读取的最后一条记录的位置的数值标识符。消费者需要定期提交它们到Broker上。Kafka 在内部维护了一个名为 “__consumer_offsets” 的主题,Kafka 就是将消费者位移存储在这个主题中。

假设你有一个 Kafka 主题叫 MyTopic,用来记录订单信息。这个主题有两个分区(分区 0 和分区 1)。你有两个消费者组:一个叫 Consumer Group A,负责处理订单;另一个叫 Consumer Group B,负责审计订单。

  • Consumer Group A 消费者组:这个消费者组的一个消费者从 MyTopic 的分区 0 中读取了消息,并处理到了偏移量 2。它提交了这个偏移量给 Kafka,Kafka 便把这个信息存储在 __consumer_offsets 主题中。于是,__consumer_offsets 记录了 Consumer Group A 组在 MyTopic 的分区 0 中最新的偏移量是 2。

  • Consumer Group B 消费者组:与此同时,Consumer Group B 组可能在审计数据,它的一个消费者从 MyTopic的分区 0 中读取并处理到了偏移量 8。它提交了这个偏移量,Kafka 同样把这个信息写入 __consumer_offsets 主题中。

在这里插入图片描述

消费者偏移量提交方式

Kafka 提供了两种主要的提交方式:自动提交和手动提交。

手动提交(Manual Commit)

手动提交消费者偏移量(enable.auto.commit参数设置为fale)就像手动保存文件,你可以完全控制什么时候保存进度。你可以在确保消息处理完毕后再提交偏移量。

在手动提交偏移量情况下,如果消费者在处理消息后但在提交其偏移量之前发生故障,就会导致重复消费消息,下图中,CONSUMER-1成功消费了PARTITION 0的消息0-4,但在提交偏移量之前crash了,这时经过重新平衡之后,组内的CONSUMER-2接管了PARTITION 0,重新从消息0开始消费。
在这里插入图片描述

自动提交(Auto Commit)

如果消费者设置为自动提交(enable.auto.commit参数设置为true),则意味着偏移量在poll()函数成功返回时已经提交了,而不管消费者是否对消息是否已经成功进行了业务处理,因此如果消费者在处理此消息时崩溃,会导致消息丢失。在下图中,消费者设置了自动提交,CONSUMER-1成功拉取到了PARTITION 0的消息0-4,消费偏移量已经自动提交了,但是在真正处理消息的时候,CONSUMER-1宕机了,经过重新平衡之后,组内的CONSUMER-2接管了PARTITION 0,将会从消息5开始拉取消息,这样消息0-4用于也没机会消费了。
在这里插入图片描述

消费者启动和偏移量

当 Kafka 消费者启动时,它需要决定从哪个位置开始消费消息。这主要取决于两个因素:

  1. 消费者组的已提交偏移量
  2. auto.offset.reset 属性
1. 消费者组的已提交偏移量

如果消费者组之前已经消费过该分区的数据,Kafka 会查看 __consumer_offsets 主题中记录的最新偏移量。这是消费者上次停止时的进度,消费者会从这个位置开始继续消费,以确保不会重复消费消息。

2. auto.offset.reset 属性

如果消费者组是新的,或者消费者组从未处理过该分区的数据(例如,第一次消费该主题),Kafka 会根据 auto.offset.reset 属性来决定从哪个位置开始消费。

  • earliest(最早):如果 auto.offset.reset 设置为 earliest,消费者会从分区中最早的消息开始消费。这意味着它会从分区的起始位置开始读取所有可用的消息,包括消费者组未处理的旧消息。

  • latest(最新):如果 auto.offset.reset 设置为 latest,消费者会从分区中最新的消息开始消费。这意味着它会跳过已经存在的消息,直接从分区中新到达的消息开始读取。

消费者滞后

消费者滞后是指 Kafka 中消费者处理消息的速度跟不上生产者生产消息的速度,从而导致消息堆积在 Kafka 中的现象。通俗地讲,就像你在处理一堆快递包裹,而包裹送来的速度远远超过你拆包裹的速度,于是未拆的包裹越堆越多,这就是“滞后”。

消费者滞后可能给系统带来丢失消息的风险,broker在工作过程中可能移除旧日志段,如果消费者滞后过大,可能会导致一些旧消息还没被消费就已经被删除了。
在这里插入图片描述

可以使用kafka-consumer-groups工具查看和管理消费者滞后。

如何减少滞后?
  • 提高消费者的处理速度:优化消费逻辑或增加消费者数量。
  • 增加分区数量:通过更多分区,分散消息压力,增加并行处理能力。
  • 监控滞后:使用 Kafka 提供的监控工具,及时发现和处理滞后问题。

Kafka消费者和协调器协议

在这里插入图片描述

协调员的角色

当一个消费者想要加入或创建一个消费者群组时,它首先需要找到 Kafka 集群中负责管理该消费者群组的协调员。这个过程开始于消费者向一个引导服务器(Bootstrap Server)发送一个“查找协调员”的请求。

如果消费者群组的协调员还没有确定,Kafka 会根据特定的哈希公式计算出一个合适的 broker 作为该群组的协调员。然后,这个协调员的地址将作为对“查找协调员”请求的响应返回给消费者。

加入组

一旦消费者得知了协调员的地址,它就会向协调员发送一个“join group”的请求。协调员会处理这个请求,并返回消费者群组的领导者和其他相关的元数据详细信息。如果这个群组还没有领导者,那么第一个加入的消费者会被选为领导者。

此外,消费应用程序可以通过特定的配置来控制哪个消费者会被选为领导者
在这里插入图片描述

同步组

在消费者收到领导者的详细信息后,他们会向协调员发送一个“sync group”的请求。这一请求会触发消费者组内部的重新平衡过程,分配给消费者的分区将在“sync group”请求后发生变化。
在这里插入图片描述

重新平衡

重新平衡是在以下几种情况下触发的:当有新的消费者加入或已有的消费者退出群组,或者当某个消费者发送了“sync group”请求时。

在重新平衡过程中,消费者组中的所有消费者都会收到更新后的分区分配。这意味着消费者在重新平衡完成之前将暂停数据消费,以确保数据分配的有序性和一致性。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

心跳

为了保持与协调员的连接,消费者组中的每个消费者都会定期向协调员发送心跳信号。如果协调员在一段时间内未收到某个消费者的心跳信号,就会认为该消费者已失联,并启动重新平衡过程,以重新分配该消费者负责的分区给其他活跃的消费者。
在这里插入图片描述

离开组

消费者可以随时通过发送“leave group”请求离开组。协调器将确认请求并启动重新平衡。如果消费者组中的领导节点离开组,组内将选出一个新的领导者并启动重新平衡。
外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

标签:消费,消费者,分区,28,偏移量,Kafka,消息,搞懂
From: https://blog.csdn.net/weixin_42627385/article/details/141531868

相关文章

  • 利用kafka和kafka connect插件debezium实现oracle表同步
    1.kafka安装1.1.java安装openjdk下载,建议使用17,至少应该高于版本11#进入家目录,解压下载的java包,配置环境变量tarvxfopenjdk-20.0.1_linux-x64_bin.tar.gz-C/usr/local/vi.bash_profile#注意要把JAVA的目录放到$PATH之前exportJAVA_HOME=/usr/local/jdk-20exportP......
  • 网站提示428 Precondition Required:必须在请求中设置先决条件怎么办
    当遇到“428PreconditionRequired”错误时,这意味着服务器要求客户端在请求中包含特定的先决条件(precondition)。这种错误通常出现在客户端尝试执行某项操作时,服务器需要确认某些条件得到满足。解决方案检查请求头确认请求头中是否包含了服务器要求的先决条件。例如,服务器......
  • day9第四章 字符串part02| 151.翻转字符串里的单词 |卡码网:55.右旋转字符串|28. 实现
    151.翻转字符串里的单词classSolution{publicStringreverseWords(Strings){////删除首尾空格,分割字符串String[]str=s.trim().split("");StringBuildersb=newStringBuilder();////倒序遍历单词列表for(inti......
  • 异源数据同步 → DataX 为什么要支持 kafka?
    开心一刻昨天发了一条朋友圈:酒吧有什么好去的,上个月在酒吧当服务员兼职,一位大姐看上了我,说一个月给我10万,要我陪她去上海,我没同意朋友评论道:你没同意,为什么在上海?我回复到:上个月没同意前情回顾关于DataX,官网有很详细的介绍,鄙人不才,也写过几篇文章异构数据源同步之数据......
  • adc-ads1281驱动流程
            ADS1281是一款高性能、低功耗的模拟-数字转换器(ADC),关于其数据读写,从数据手册中获取的一些重要信息。1.时序        同步信号:上电SYNC引脚给出一个高低脉冲后挂载多个ADS1281后,同一个DREADY信号控制从机数据同步。2.连续读数据模式     ......
  • FastAdmin目录穿越 CVE-2024-7928
    0x01漏洞描述:FastAdmin是一款基于ThinkPHP+Bootstrap开发的快速后台开发框架。FastAdmin基于Apache2.0开源协议发布,免费且不限制商业使用,目前被广泛应用于各大行业应用后台管理。其接口lang存在目录穿越漏洞,攻击者可通过该漏洞获取系统库敏感信息。0x02影响版本:FastAdmin......
  • Kubernetes v1.28.2 & Calico eBPF
    集群初始化简略步骤初始化集群kubeadminit\--skip-phases=addon/kube-proxy\--apiserver-cert-extra-sans=35.229.220.159,127.0.0.1,10.0.0.3,10.0.0.4,10.0.0.5,10.254.0.2\--control-plane-endpoint=apiserver.unlimit.club\--apiserver-advertis......
  • linux下试验中间件canal的example示例-binlog日志的实时获取显示以及阿里巴巴中间件ca
    一、linux下试验中间件canal的example示例-binlog日志的实时获取显示    今天重装mysql后,进行了canal的再次试验,原来用的mysql5.7,今天重装直接换了5.6算了。反正测试服务器的mysql也不常用。canal启动后日志显示examplepreparetofindstartpositionjustshowmaste......
  • rocketmq 是参考了 kafka架构, 为什么rocketmq吞吐量是10万/秒, kafka吞吐量是17万/秒
    我们都知道,为了防止消息在服务器丢失,一般都是进行持久化(保存在磁盘),在发送消失时那就涉及到从磁盘拷贝到内核空间,从内核空间到用户态,再从用户态到socket缓存区,从socket缓存区到网卡四次拷贝。kafka使用的是零拷贝-sendfile,把内核态数据发送到网卡,减少两次拷......
  • kafka
    消息队列的流派MQ是什么MessageQueue(MQ)是一种消息队列中间件。MQ的主要作用是通过分离消息的发送和接收来实现应用程序的异步和解耦。然而,MQ的核心目的是通信:它屏蔽了底层复杂的通信协议,并定义了一套更简单的应用层通信协议。在分布式系统中,模块间通信通常使用HTTP......