概述
Kafka作为商业级消息中间件,消息的可靠性保障是非常重要。那Kafka是怎么保障消息的可靠性的呢?
上图是Kafka的消息发送基础架构,一条消息的完整生命周期是:
- 生产者发送消息至Kafka集群
- Kafka集群保存消息至磁盘
- 消费拉取消息,成功消费后提交位移Offset
Kafka作为分布式系统,通过数据副本持久化在不同的节点上来保障数据的可靠性。当某一个节点上存储的数据丢失时,可以从副本上读取该数据,这也是解决分布式系统数据丢失问题最有效的手段。那么,Kafka里面副本是怎么设计的?副本之间的数据同步是怎么进行的?多副本之间数据的一致性是如何解决的?
副本基础概念
- Kafka里面副本是相对分区而言的,也就是说只有分区有副本
- 副本分leader副本和follower副本,每个副本处于不同broker节点上,只有leader副本对外提供读写服务,follower副本只负责从leader副本同步数据
- 分区中所有的副本统称为AR
- 与leader副本数据保持同步状态的副本集合叫ISR,leader副本本身也是ISR中的一员
- 副本还分为本地副本和远程副本,本地副本是指对应的Log分配在当前的broker节点上,远程副本是指对应的Log分配在其他broker节点上
高水位HW&LEO
首先说起水位,在教科书里面,是这么描述水位的:
在时刻T,任意创建时间(Event Time)为T',且T' <= T的所有事件都已经到达或被观测到,那么T就被定义为水位
如图,蓝色标注的"Completed"表示已完成的工作,红色标注的"In-Flight"表示正在进行中的工作,俩者的边界就是水位。水位一词多用在流式处理领域,比如,在Spark Streaming或Flink里面都有水位的概念。
在Kafka的世界中,水位用来消息位移来表征的。而且在Kafka里面,通常只说高水位(High Watermark)
高水位的作用
在Kafka中,高水位主要有俩个作用:
- 定义消息可见性,即标识分区下哪些消息是可以被消费者获取到的
- 帮助Kafka完成副本同步
HW和LEO是副本对象俩个很重要的属性,每个副本都有自己的HW和LEO。Kafka用leader副本的HW来定义分区的HW。
高水位更新机制
在分区副本中,所有副本都保存自己的HW和LEO,而leader副本则还保存了远程副本的LEO,如下图:
leader副本保存所有副本的LEO主要的目的是确定leader的HW,也就是分区的HW
每个副本HW和LEO更新机制表如下:
更新对象 | 更新时机 |
leader副本的LEO | leader副本接收到生产者发送的消息,写入本地磁盘后,会更新其LEO值 |
leader副本的HW | 主要是俩个时机: 1.leader副本更新其LEO之后 2.更新完远程副本的LEO 算法:取leader副本和所有远程副本中最小的LEO |
leader副本上远程副本的LEO | follower副本在拉取数据的时候,会带上自己的LEO,leader副本会用这个值来更新远程副本LEO |
follower副本的LEO | follower副本从leader副本拉取消息,写入本地副本后,更新其LEO |
follower副本的HW | follower副本在更新完自己的LEO后,会比较leader返回的HW和本地LEO,取较小的值来更新HW |
在每个broker上Kafka的根目录上有俩个文件:
- recovery-point-offset-checkpoint:保存每个分区的LEO,Kafka中有一个定时任务会将所有分区的LEO刷写到恢复点文件recovery-point-offset-checkpoint中。
- replication-offset-checkpoint: 保存每个分区的HW,Kafka中有一个定时任务会将所有分区的HW刷写到恢复点文件replication-offset-checkpoint中。
副本同步机制解析
- 首先,生产者向leader写入一条消息,leader先将本地的LEO更新为1
- follower副本向leader发送拉去数据请求(延迟操作),这个时候会带上自己LEO=0(通过fetchOffset传递)
- leader收到请求后,会更新自己RemoteLEO=0,然后取min(LEO, RemoteLEO) = 0 来更新自己HW=0
- leader返回数据和自己的HW=0给follower,follower收到数据后,将数据写入本地,更新LEO=1,然后取leader发来的HW和本地LEO最小值来更新自己的HW
可以从上面一轮请求看出,这个时候消息已经同步完成了,但是副本各自的HW还没有完成更新,也就是消息对消费者是不可见的,这个时候需要在下一轮的拉取中被更新,如下图:
- 在新一轮的拉去请求中,由于位移值是0的消息已经拉取成功,因此follower副本这次请求拉去的位移值为1的消息
- leader副本接收此请求后,更新Remote LEO=1,然后更新leader的HW=1,然后才会将更新过的高水位值1发送给Follower副本
- Follower副本接收到以后,也将自己的高水位值更新为1
至此,一个完整的消息同步周期就结束了。
消息丢失&消息一致性问题
以下场景假设min.insync.replicas参数配置为1
消息丢失场景
通过上面副本同步机制可以看到,在B副本写入消息m2后,需要新的一轮FetchRequest/FetchResponse才能更新自己的HW为2
如果在这个时候B副本发生了重启,那么在重启之后,B副本会根据自己之前的HW位置(这个值会存入本地的复制点文件replication-offset-checkpoint)进行日志截断,这样便会吧m2消息删除掉,然后在向A副本发送请求拉取消息
这个时候,如果A宕机了,那么B就会被选为leader。A恢复后会成为follower,由于follower副本的HW不能比leader副本的HW高,所以这里会做一次日志截断,这样A就会将自己的HW调整为1。
那么,这样一来,m2消息就丢失了。
数据不一致场景
如上图,A副本里面有俩条消息m1和m2,并且HW和LEO都为2;B 副本里面有一条消息m1,并且HW和LEO都为1,假设A和B同时挂掉
然后B副本先恢复过来并成为了leader,这个时候B写入了消息m3,并将LEO和HW都更新为2了
这个时候A恢复过来了,成为了follower副本,这个时候发现本地HW和leader的HW都为2,正好不用做日志截断
如此一来,副本中offset为1存储的消息不一样,这个时候就出现了数据不一致的情况。
Leader Epoch机制引入
为了解决上述的俩种问题,Kafka从0.11.0.0开始引入了leader epoch的概念,在需要截断数据的时候使用leader epoch作为参考依据而不是原本的HW。
leader epoch代表leader的纪元信息,初始值为0。每当leader变更一次,leader epoch的值就会加1,相当于为leader增设了一个版本号。与此同时,每个副本中还会增设一个矢量<LeaderEpoch => StartOffset>,其中StartOffset表示当前Leader Epoch下写入的第一条消息的偏移量。
每个副本的Log下都有一个leader-epoch-checkpoint文件,在发生leader epoch变更时,会将对应的矢量对追加到这个文件中。
下面看看leader epoch怎么解决数据丢失和数据不一致的问题。
首先看下数据丢失场景:
- 同样B发生重启,之后B不是先根据自身的HW截断日志,而是先发送OffsetsForLeaderEpochRequest请求给A,其中包含了自身的LE=0
- 如果A本身的LE和B发送过来的LE相等,将自己的LEO返回给B即可;如果A本身的LE和B不同,那边A会查找LE_B+1对应的Start Offset返回给A,也就是LE_B对应的LEO,所以我们可以将OffsetForLeaderEpochRequest的请求看作用来查找follower副本当前LeaderEpoch的LEO。
- 如图,B收到A返回的LE_B=2,然后对比自身的LEO,发现相同,也就不需要截断日志了,这样消息m2就得到保留了
之后,如果A发生了宕机,这个时候B成为了leader,不管后面A是否恢复,后续的消息都可以已LE1为Leader Epoch陆续追加到B中。
下面看下Leader Epoch如何应对数据不一致的场景:
- 首先当A为leader,B为follower的时候,A中有俩条消息m1和m2,B中有一条消息m1,假设A和B同时挂掉,然后B第一个恢复过来,成为了新的leader
- 之后B写入消息m2,并将LEO和HW更新为2,不过,此时是以Leader Epoch = 1写入的
- 紧接着A恢复过来,成为了follower,并向B发送了OffsetsForLeaderEpochRequest请求,这个时候A的Leader Epoch为LE0
- B收到请求后,查找LE0的LEO是多少,也就是LE1的start Offset返回给A,也就是1
- A收到自己所属的Leader Epoch的LEO后,截断自身的日志,也就是把消息 m2截断了,之后在向B发送FetchRequest请求同步数据
- 最终,A和B中都有俩条消息m1和m3,HW和LEO都为2,并且Leader Epoch都为1
如此,便解决了数据不一致的问题。
总结
- 就可靠性本身而言,并不是一个可用简单用是或否描述的,一般是采用几个9来衡量的
- 生产者客户端参数acks,相比于0和1,acks=-1可以最大程度的提高消息的可靠性
- 对于Kafka服务端而言,分区的副本数越多越能够保障数据的可靠性,不过副本数越多也会引起磁盘、网络带宽的浪费,同时会引起性能的下降。这是一个需要根据实际场景平衡的选项
- 对于消费者而言,如果是自动提交消费位移的话,会发生消息消费失败但是位移提交成功的情况,这样会造成数据丢失,这个时候需要消费者手动提交位移,并且保证在位移提交之前消息消费成功。当然,可能会发生重复消费,消息幂等性需要消费者自行保证