KAFKA 2.3 以后,consumer 分为 dynamic 和 static,以是否设置了 group.instance.id 属性区分。
以默认的 consumer 为例,即 dynamic consumer,以下图描述其正常的生命周期:
依赖 FindCoordinator, JoinGroup, SyncGroup, Heatbeat, LeaveGroup 等接口,kafka consumer 和 broker 联合完成了 group 管理。
协议命令表:
命令 | consumer | broker |
FindCoordinator | consumer 启动时,从配置的 bootstrap server 中选择一个负载较小的 broker,向其发送查找 coordinator 的请求 | |
JoinGroup |
consumer 查询到 coordinator 后,开始加入组,join group 时携带了客户端的配置信息,具体内容参见 JoinGroupRequestData |
|
SyncGroup |
consumer 接收到 join group 响应后,如果当前实例分配到了 leader,则按照分配策略分配分区给各消费者,并将分配结果以 SyncGroup 请求发送给 broker。如果当前实例非 leader,则发送一个不带内容的 SyncGroup 请求。具体内容参见 SyncGroupRequestData |
|
Heatbeat | 从某种程度上看,一次 poll 算是一次心跳,poll 时会更新心跳计时器的时刻,如果 consumer 一直没有 poll 动作,HeartbeatThread 则自己发起心跳请求 | |
LeaveGroup |
配置信息表:
属性 | 默认值 | 别名 | consumer | broker |
group.id | groupId | |||
group.instance.id | groupInstanceId | 如果提供了值, 则为 static consumer,当 consumer close 时,不会主动 leave group。可避免滚动发布时,频繁的 rebalance | ||
max.poll.interval.ms | 300000 | rebalanceTimeoutMs | HeartbeatThread 检查 consumer 两次 poll 的时间间隔,如果超过了配置值,则主动 leave group | |
session.timeout.ms | 10000 | sessionTimeoutMs | HeartbeatThread 检查 session 超过了配置的值,则标记 coordinator 为 unkonwn,并主动断开连接,重新开始下一轮 group 流程,即 rebalance | |
heartbeat.interval.ms | 3000 | heartbeatIntervalMs | 以默认值为例,假定 consumer 在没有 poll 行为且还没超过 rebalance 间隔时,HeartbeatThread 每隔 3 秒向 broker 发送一次心跳,成功接收到心跳响应后,则更新 seesion 的时刻 | |
partition.assignment.strategy | RangeAssignor | consumer 在发送 join group 请求时,会携带分配的策略 |
梳理完上面的表格,得出结论:在 HeartbeatThread 无限循环中, consumer 利用心跳来维持 session,当 session 过期时触发 rebalance,当 poll 时间过期时,触发 rebalance
分析 2 种常见的异常情况:
1. consumer 发送心跳,由于网络原因,或者 coordinator broker 宕机,consumer 一直没有接收到心跳响应,则 session 随之会过期,会触发重新加入组,即 reblance
2. consumer 和 coordinator 心跳正常,但是 consumer 一直没有 poll 动作,此时 consumer 会主动离开 group