使用场景
消息队列:削峰,解耦(服务间调用从直接的rpc、http调用改为主动拉取)技术对比
类似技术方案:rabbitMQ、 memcache、 rocketMQ kafka 优点-
高吞吐量:
单机每秒处理几十上百万的消息量。即使存储了TB及消息,也保持稳定的性能。
- 零拷贝 减少内核态到用户态的拷贝,磁盘通过sendfile实现 DMA 拷贝Socket buffer
- 顺序读写 充分利用磁盘顺序读写的超高性能
- 页缓存mmap ,将磁盘文件 映射 到内存, 用户通过修改内存就能修改磁盘文件。
- 高性能: 单节点支持上千个客户端,并保证零停机和零数据丢失。
- 持久化: 将消息持久化到磁盘。通过将数据持久化到硬盘以及replication防止数据丢失。
- 分布式系统 ,易扩展。所有的组件均为分布式的,无需停机即可扩展机器。
- 可靠性 - Kafka是分布式,分区,复制和容错的。
- 客户端状态维护: 消息被处理的状态是在Consumer端维护,当失败时能自动平衡。
架构
物理架构&逻辑架构
核心概念
- producer:消息生产者。
- consumer:消息生产者。
- 消费组ComsumerGroup :同一个消费组中的消费者共享消费偏移,用于分布式架构下的并发消费,但同时只有1个选举出的leader消费者在实际消费对应partition中的消息(避免相同消息重复消费)。
- 生产者偏移offset: 消息写入的时候,每一个分区都有一个offset,即每个分区的最新最大的offset。
- 消费组偏移offset: 不同消费组中的消费者可以针对一个分区存储不同的Offset,互不影响。
- broker:kafka运行进程实例。
-
控制器(Kafka Controller):
负责管理整个集群中所有分区和副本的状态的主节点broker。
- 当某个分区的leader副本(一个分区会有多个副本,其中只有leader副本对外提供读写服务)出现故障时,由控制器负责为该分区选举新的leader副本;
- 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息;
- 当为某个Topic增加分区数量时,由控制器负责分区的重新分配。
- topic:消息集合, 类似数据库中的表
- partition:topic子概念-分区, 单个分区内是有序的,partition设置为一才能保证全局有序
-
LogSegment:
kafka内部消息存储结构,追加写,磁盘顺序写入
- 一个分区由多个LogSegment组成
- 一个LogSegment由 .log .index .timeindex 组成
- .log 追加是顺序写入的,文件名是以文件中第一条message的offset来命名的
- .Index 进行日志删除的时候和数据查找的时候可以快速定位。
- .timeStamp 则根据 时间戳查找对应的偏移量 。
- AR(Assigned Replicas): 是指为每个分区分配的副本集合。在Kafka中,每个分区可以有多个副本,其中一个副本被选举为leader,其他副本为follower。AR是指包括leader副本在内的所有副本的集合。
- ISR( in-sync replica) : 已同步的可用副本, 是指与leader副本保持同步的follower副本集合。只有处于ISR中的副本才会被认为是同步的,其他副本将被视为不可靠的。当follower副本无法及时跟上leader副本的同步进度时,它将被移出ISR,直到它能够追赶上来。ISR机制确保了数据的一致性和可靠性。
- OSR(Out-of-Sync Replicas): 是指与leader副本不同步的follower副本集合。当follower副本无法及时跟上leader副本的同步进度时,它将被移出ISR,并被标记为OSR。OSR副本将尝试追赶上来,一旦追赶上来并与leader副本保持同步,它将被重新添加到ISR中。
- HW(High Watermark):是指已经被确认的最高偏移量,代表了消费者可以安全地读取的消息位置。消费者只能消费高于HW的消息,确保消息的可靠性。
- LEO(Log End Offset):是指当前分区中最新的消息位置,包括已经写入但尚未被确认的消息。LEO是一个动态的值,随着消息的写入和确认而变化。
核心流程
生产者发送消息过程
主要流程:kafka 生产者客户端SDK -> kafka集群broker接受tcp请求生成logSegment落库 -> 副本同步 -> 更新偏移- Producer创建时,会创建一个Sender线程并设置为守护线程。
- 生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区。
- 批次发送的条件为:缓冲区数据大小达到 batch.size 或者 linger.ms 达到上限。
-
批次发送后,发往指定分区,然后落盘到broker;
- acks=0 只要将消息放到缓冲区,就认为消息已经发送完成。
- acks=1 表示消息 只需要写到主分区 即可。在该情形下,如果主分区收到消息确认之后就宕机了,而副本分区还没来得及同步该消息,则该消息丢失。
- acks=all (默认) 首领分区会等待 所有的ISR副本分区确认记录 。该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。
- 如果生产者配置了 retrires参数大于0并且未收到确认 ,那么客户端会对该消息进行重试。
- 落盘到broker成功,返回生产元数据给生产者。
集群
选举机制
1、老版本基于zookeeper 。 目前,Kafka 使用 ZooKeeper 存放集群元数据、成员管理、Controller 选举,以及其他一些管理类任务。- 存放元数据 是指主题分区的所有数据都保存在 ZooKeeper 中,其他“人”都要与它保持对齐。
- 成员管理 是指 Broker 节点的注册、注销以及属性变更等 。
- Controller 选举 是指选举集群 Controller,包括但不限于主题删除、参数配置等。
Kafka控制器选举原理
Kafka中的控制器选举工作依赖于Zookeeper,成功竞选成为控制器的broker会在Zookeeper中创建/controller临时节点。 每个broker会对/controller节点添加监听器,以此来监昕此节点的数据变化,当/controller节点发生变更,就会触发新一轮的选举。 临时节点的内容: {"version":1,"brokerid":0,"timestamp":"1593330804078"}- version:Kafka版本相关,对同一个Kafka版本来说为固定值。
- brokerid:表示成为控制器的broker的id编号。
- timestamp:表示竞选成为控制器时的时间戳(精确到毫秒)。
Broker选举
在任意时刻,集群中有且只有一个控制器,每个broker都会在内存中保存当前控制器的brokerid值,这个值标识activeControllerId。启动时选举
集群中第一个启动的broker会通过在zookeeper中创建临时节点/controller来让自己成为控制器,其他broker启动时 会去尝试读取/controller节点的brokerid的值,读取到的brokerid的值不为-1知道已经有其他broker节点成功竞选为控制器,就会在zookeeper中创建watch对象,便于它们收到控制器变更的通知。leader异常选举
那么如果broker由于网络原因与zookeeper断开连接或者异常退出,那么其他broker通过watch收到控制器变更的通知,就会去尝试创建临时节点/controller,如果有一个broker创建成功,那么其他broker就会收到创建异常通知,也就意味着集群中已经有了控制器,其他broker只需创建watch对象即可。follower异常
如果集群中有一个broker发生异常退出了,那么控制器就会检查这个broker是否有分区的副本leader,如果有那么这个分区就需要一个新的leader,此时控制器就会去遍历其他副本,决定哪一个成为新的leader,同时更新分区的ISR集合。broker加入
如果有一个broker加入集群中,那么控制器就会通过brokerid去判断新加入的broker中是否含有现有分区的副本,如果有,就会从分区副本中去同步数据。epoch防止脑裂
Kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。- controller_epoch是一个整型值,存放在Zookeeper的/controller_epoch这个持久节点中;
- controller_epoch值用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器;
- controller_epoch的初始值为1,当控制器发生变更时,就将该字段值加1。
- 如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器发送的请求,那么这个请求会被认定为无效的请求。
- 如果请求的controller_epoch值大于内存中的controller_epoch值,那么说明已经有新的控制器当选了。
分区Leader的选举
controller感知到分区leader所在的broker挂了,controller会从replicas副本列表(同时在ISR列表里)中取出第一个broker作为leader。leader副本介绍
- leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态,当follower副本落后太多或失效时,leader副本会把它从ISR集合中剔除。
- 当OSR集合中有follower副本“追上”了leader副本,那么leader副本会把它从OSR集合转移至ISR集合。
- 默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader。
消费组Leader的选举
GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,这个选举的算法很简单,当消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader,如果当前leader退出消费组,则会挑选以HashMap结构保存的消费者节点数据中,第一个键值对来作为leader。 2、Kafka 2.8.0,移除了对Zookeeper的依赖,通过KRaft进行自己的集群管理数据一致性
主副本机制、leader机制( ISR(in-sync replica)元数据维护)数据一致性等级ack
- acks=0 只要将消息放到缓冲区,就认为消息已经发送完成。
- acks=1 表示消息 只需要写到主分区 即可。在该情形下,如果主分区收到消息确认之后就宕机了,而副本分区还没来得及同步该消息,则该消息丢失。
- acks=all (默认) 首领分区会等待 所有的ISR副本分区确认记录 。该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。
副本消息同步
首先,Follower 发送 FETCH 请求给 Leader。接着,Leader 会读取底层日志文件中的消 息数据,再更新它内存中的 Follower 副本的 LEO 值,更新为 FETCH 请求中的 fetchOffset 值。最后,尝试更新分区高水位值。Follower 接收到 FETCH 响应之后,会把消息写入到底层日志,接着更新 LEO 和 HW 值。数据一致性-leader机制
- Kafka会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica)的集合;
- 当集合中副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交;
- 只有这些跟Leader保持同步的Follower才应该被选作新的Leader;
-
假设某个topic有N+1个副本,kafka可以容忍N个服务器不可用,冗余度较低
如果ISR中的副本都丢失了,则:
- 可以等待ISR中的副本任何一个恢复,接着对外提供服务,需要时间等待;
- 从OSR中选出一个副本做Leader副本,此时会造成数据丢失;
- 原理是按照消费者总数和分区总数进行整除运算平均分配给所有的消费者;
- 订阅Topic的消费者按照名称的字典序排序,分均分配,剩下的字典序从前往后分配;
节点变动
消费组Rebalance
- 组成员数量发生变化
- 订阅主题数量发生变化
- 订阅主题的分区数发生变化