Broker 集群工作原理
- broker 启动后,会向 zookeeper 注册,并记录在 Kafka 配置节点下的 /brokers/ids 节点下,之后抢占 /controller 节点,率先注册的节点的 Controller 就会负责 Leader 的选举
- 选举节点会监听 /brokers/ids 节点的变化,之后根据选举规则选举出 Leader,并将 Leader 信息记录在 /borkers/topics 节点对应主题下的对应分区的 /state 节点下
- 其他节点会读取 /borkers/topics 下的 Leader 信息并同步给自己的 Controller 用来在选举节点意外下线后重新抢占 /controller 节点以保证服务的正常运行
- 当生产者给 broker 集群的 Leader 节点发送消息后,集群中的 Follower 会主动向 Leader 同步信息,从而将信息存储在磁盘上(数据以 segment 形式存储,多个 segment 逻辑上统称为 log,每个 segment 除了本身的 .log 文件外还有采用稀疏索引方式记录索引编号范围的 .index文件)
- 当 Leader 意外下线时,选举节点会监听到 /brokers/ids 节点的变化,选举节点的 Controller 会从 /borkers/topics 节点下拉取对应的 Leader 信息和 ISR 信息,选举出新的 Leader 并将新 Leader 的信息和 ISR 信息写入到 /borkers/topics 节点对应主题下的对应分区的 /state 节点下
Producer(生产者)工作原理
- main 线程调用 send() 方法发送数据,期间经过拦截器(如果有配置)对数据进行处理
- 数据处理完毕后,会经过序列化器进行序列化
- 序列化完毕后会把数据传给分区器,由分区器来决定数据发往哪个分区
- 分区器会把数据分成多批次(默认16k)存储在一个缓存队列(双端队列,默认32m)中,等待 sender 线程读取
- sender 线程会创建一个网络客户端,在每批次数据到达 batch.size(默认16k) 或等待时间到达 linger.ms(默认0毫秒) 后读取缓存队列中的数据来准备发送给 Kafka 集群,最多缓存5批次
- 在发送给 Kafka 集群之前,会先由选择器打通输入输出流,之后再将数据发送给 Kafka 集群
- 当 Kafka 集群收到数据后会进行应答,有以下三种应答级别:
- 0:接收到数据后,无需落盘即可应答
- 1:接收到数据后,Leader 落盘完毕后即可应答
- -1(all):接收到数据后,Leader 和 ISR(可以理解为和 Leader 保持同步的所有副本的集合) 队列里所有节点落盘完毕后才可以应答
- 如果数据发送成功,sender 线程会清除网络客户端中缓存的已发送的数据,之后通知缓存队列清除已发送的数据
- 如果发送不成功,sender 线程会进行重试(重试次数默认为 int 的最大值,即 2147483647)
Consumer(消费者)工作原理
- broker 集群先选出一个 coordinator(辅助实现消费者组的初始化和分区的分配) 节点
- coordinator 节点选取是先通过 groupid 的 hashcode 值与 _consumer_offset 的分区数量(默认50)进行取余所得的值假设为 5,那么 _consumer_offset 主题的 5 号分区在哪个 broker 节点上,那么这个节点的 cooreinator 就作为消费者组的老大,消费者组中的所有消费者提交 offset 时都往这个分区提交()
- 消费者组中的所有消费者会向选出来的 coordinator 发送连接请求
- coordinator 会随机选择其中一个消费者作为 Leader,并把消费的 topic 相关信息发送给 Leader
- Leader 会根据分区分配策略制定一个消费方案,并把制定好的消费方案返回给 coordinator
- coordinator 会把收到的消费方案分发给消费者组中的每个消费者
- 每个消费者会和 coordinator 保持心跳(默认3秒)一旦超过 session.timeout.ms(默认45秒)没有没有发送心跳或消费者处理消息时间超过 max.poll.interval.ms(默认5分钟)都会触发再平衡,至此消费者初始化完毕
- 当消费者准备消费数据时,会先调用 sendFetches() 方法创建一个 ConsumerNetworkClient(消费者网络客户端)对象
- ConsumerNetworkClient 会根据 Fetch.min.bytes(默认1 byte)、Fetch.max.wait.ms(默认500毫秒)、Fetch.max.bytes(默认50m)三个参数调用 send() 方法发送发送请求,broker 收到请求后,会把数据通过 onSeccess() 回调方法把数据传给消费者
- 消费者会把数据放在消息队列中,消费者每次读取 Max.poll.records(默认500)条消息进行消费
- 消费消息首先会经过反序列化器对数据进行反序列化
- 接着会经过拦截器(如果有配置)对数据进行相关处理
- 之后会把数据交给下游服务来对数据进行处理
补充说明
Consumer Group:消费者组,简称 CG 是由多个 groupid 相同的 Consumer(消费者)组成
- 消费者必须存在消费者组内,哪怕这个消费者组中只有一个消费者
- 消费者组内每个消费者负责消费不同分区的数据,一个分区的数据只能被同一消费者组中的一个消费者消费,但一个消费者可以消费多个分区的数据
- 消费者组之间互不影响,即消费者组是逻辑上的订阅者