消费者组 Consumer Group
- Kafka 提供的可扩展且具有容错性的消费者机制
- 共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)
- 每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数
- 多出分区数的消费者会处于空闲状态,造成资源浪费
三个特性
- Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。
- Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
- Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。
位移 Offset
- 针对 Consumer Group的位移
- 一组 KV 对,Key 是分区,V 对应 Consumer 消费该分区的最新位移。Map<TopicPartition, Long>,其中 TopicPartition 表示一个分区,而 Long 表示位移
- 采用了将位移保存在 Kafka 内部主题__consumer_offsets的方法,保存在 Broker 端
Rebalance
- 本质上是一种协议,就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程
- 比如一个消费组,订阅了100个partition的topic,20个consumer,每个consumer分到5个partition,这个分配的过程就是Rebalance
Rebalance触发条件
- 组成员数发生变更
- 比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组
- 订阅主题数量发生变化
- 订阅主题的分区数发生变化
负面影响
- Rebalance 影响 Consumer 端 TPS。类似于JVM在GC时的STW(Stop the World):所有应用线程都会停止工作,表现为整个应用程序僵在那边一动不动。Rebalance 过程中,所有 Consumer 实例共同参与,都会停止消费,等待 Rebalance 完成
- Rebalance 很慢。
- Rebalance 效率不高。
协调者 Coordinator
- 专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等
- Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移
- Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作
确定 Coordinator 所在的 Broker步骤
- 确定由位移主题的哪个分区来保存该 Group 数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
- 比如你有个 Group 的 group.id 设置成了“test-group”,那么它的 hashCode 值就应该是 627841412。其次,Kafka 会计算 __consumer_offsets 的分区数,通常是 50 个分区,之后将刚才那个哈希值对分区数进行取模加求绝对值计算,即 abs(627841412 % 50) = 12。此时,我们就知道了位移主题的分区 12 负责保存这个 Group 的数据
- 找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator。
解决思路
- ”本事大不如不摊上“。最好的解决方案就是避免 Rebalance 的发生
- 在真实的业务场景中,很多 Rebalance 都是计划外的或者说是不必要的。对于订阅主题数量发生变化,订阅主题的分区数发生变化一般属于运维主动操作,无可避免。重点需要解决避免组成员数发生变更的情况,而大部分Rebalance情况都是这个原因导致。
- Consumer 实例增加。增加 Consumer 实例的操作都是计划内的,可能是出于增加 TPS 或提高伸缩性的需要。也不属于不必要。
- Consumer 实例减少。某些情况下,Coordinator 错误地认为“已停止”从而被“踢出”Group。
session.timeout.ms
:Coordinator收到Consumer心跳的超时时间,决定了 Consumer 存活性的时间间隔,默认10s。完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer 已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。heartbeat.interval.ms
:Consumer发送心跳给Coordinator的时间间隔,值越小频率越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance,因为,目前 Coordinator 通知各个 Consumer 实例开启 Rebalance 的方法,就是将 REBALANCE_NEEDED 标志封装进心跳请求的响应体中max.poll.interval.ms
:限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。默认值是 5 分钟。表示Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。
- 非必要Rebalance解决
- 因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的
- 设置 session.timeout.ms = 6s。主要是为了让 Coordinator 能够更快地定位已经挂掉的 Consumer
- 设置 heartbeat.interval.ms = 2s。
- 要保证 Consumer 实例在被判定为“dead”之前,能够发送至少 3 轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
- Consumer 消费时间过长导致的,一般是业务处理时间过长
- 设置max.poll.interval.ms时间比业务处理时间长
- Consumer 端频繁GC导致停顿,影响心跳频率与业务处理时间
- 因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的
小结
- Rebalance本身是一个很慢、效率不高、影响Consumer TPS的过程。”本事大不如不摊上“,与其优化Rebalance本身,不如尽力避免
- Rebalance触发条件。对于订阅主题数量发生变化,订阅主题的分区数发生变化一般属于运维主动操作,无可避免。重点需要解决避免组成员数发生变更的情况,而大部分Rebalance情况都是这个原因导致
- 组成员数发生变更
- 订阅主题数量发生变化
- 订阅主题的分区数发生变化
- Consumer退组的触发条件
session.timeout.ms
:Coordinator收到Consumer心跳的超时时间,决定了 Consumer 存活性的时间间隔,默认10s。heartbeat.interval.ms
:Consumer发送心跳给Coordinator的时间间隔,值越小频率越高max.poll.interval.ms
:限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。默认值是 5 分钟。GC 参数
:频繁的GC会影响上面3个时间