1. 工作流程以及文件存储机制
kafka 中的消息是以topic进行分类的,生产消费消息都是面向topic。
topic是逻辑上的概念,partition 分区是物理上的概念,每个分区对应一个log文件,该log文件存储的就是producer 生产的log 数据。producer生产的数据会追加到文件末端。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错时恢复从上次的位置继续消费。
存储结构:
存储结构类似于rocketMQ。一个topic 对应多个partion 分区,每个分区对应一个目录,命名为topicName-indexNum。
kafka采取分片和索引机制,每个分区下面分为多个Segment片段(文件大小超过1GB自动创建下一个Segment),分为多个Segment的目的是解决单个Log文件过大的问题。每个Segment 对应两个文件,".index"和".log" 文件。
每个Segment包含一个log文件、一个index文件。文件名称是20位,以当前 segment 的第一条消息的 offset 命名。 其内容以及作用:
“.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元 数据指向对应数据文件中 message 的物理偏移地址。
log文件存放的是真实的数据(按固定位数添加一些消息整体偏移量、消息长度、消息体等属性,然后顺序追加到文件末尾)。
index: 其实就是一个索引,记录了一条消息在log文件中的位置,查找消息的时候先从index获取位置,然后就可以定位到消息在log文件具体哪个地方
index采用了稀疏索引的方式去存储,不是每来一条消息就记录一个索引,而是当消息大于某个值的时候,就会记录一次索引,默认是4KB
稀疏存储也就是选取一些消息的offset以及position进行存储,因为如果把对应片段的所有消息的索引都存储,那么必然会占用大量的内存。
index文件和log文件结构示意图:
测试:
# 创建topic
[root@VM-8-16-centos kafka_2.13-3.3.1]# bin/kafka-topics.sh --create --topic test001 --bootstrap-server localhost:9092 --partitions 4 --replication-factor 1
Created topic test001.
# 查看目录
[root@VM-8-16-centos kafka_2.13-3.3.1]# ls -l /tmp/kraft-combined-logs/ | grep 001
drwxr-xr-x 2 root root 4096 1月 17 14:55 test001-0
drwxr-xr-x 2 root root 4096 1月 17 14:55 test001-1
drwxr-xr-x 2 root root 4096 1月 17 14:55 test001-2
drwxr-xr-x 2 root root 4096 1月 17 14:55 test001-3
# 查看某一个目录
[root@VM-8-16-centos test001-0]# ll -h /tmp/kraft-combined-logs/test001-0/
总用量 1.1G
-rw-r--r-- 1 root root 513K 1月 17 15:36 00000000000000000000.index
-rw-r--r-- 1 root root 1.0G 1月 17 15:36 00000000000000000000.log
-rw-r--r-- 1 root root 747K 1月 17 15:36 00000000000000000000.timeindex
-rw-r--r-- 1 root root 10M 1月 17 15:37 00000000000045646443.index
-rw-r--r-- 1 root root 91M 1月 17 15:37 00000000000045646443.log
-rw-r--r-- 1 root root 10 1月 17 15:36 00000000000045646443.snapshot
-rw-r--r-- 1 root root 10M 1月 17 15:37 00000000000045646443.timeindex
-rw-r--r-- 1 root root 8 1月 17 15:22 leader-epoch-checkpoint
-rw-r--r-- 1 root root 43 1月 17 15:22 partition.metadata
2. 生产者
1. 分区策略
- 分区原因
(1). 方便在集群中扩展,每个partition可以调整以适应所在的机器;一个topic 可以由多个partition 组成,因此整个集群就可以适应任意大小的数据。
(2). 可以提高并发,可以以partition 为单位进行读写。
- 分区原则
(1). 生产的时候指定了partition 直接按partition 发送
(2). 生产时候指定了key,hash(key)%length(partition) 得到partition 值
(3) 未指定partition 也未传key的情况下,第一次随机生成一个整数(后续每次都递增这个数), num % length(partition) 得到partition 值。 类似于round-robin 轮询算法。
2. 数据可靠性保证
为保证producer发送的数据能可靠的到达指定的topic。topic的每个partition 在收到producer 的消息之后都需要向producer 发送ack(acknowledgement 确认收到),如果producer 收到ack 则进行下一轮消息的发送、否则重新发送消息。
1. 副本数据同步策略
- 半数以上同步完成就发送ack。 优点是延迟低;缺点是:选举新的leader 时,容忍n台节点的故障,需要2N+1个副本。
- 全部同步完成才发生ack。优点是:选举新的leader 时,容忍n台节点的故障,需要n+1 个副本;缺点是延迟高。
kafka 选取的是方式二,原因是:
- 同样为了容忍n台节点的故障,方案一需要2n+1个副本,方案二需要n+1个副本。对于kafka 来说,每个partition 都有大量的数据,方案一会造成大量冗余数据。
- 方案二虽然网络延迟比较高,网络延迟对kafka 的影响比较小。
ISR: (In-sync-replica set)
假设如下场景:
leader 收到数据,所有follower 都开始同步数据,但有一个follower 因为故障一直未能同步成功,按照方案二leader 就要一直等下去。
解决方案:
leader 维护了一个ISR,意为和leader 保持同步的follower 集合。当ISR 中的follower 完成数据的同步之后,leader 就会给producer 发送ack。如果follower 长时间未向leader 同步数据,则将该follower 剔出ISR。该时间由replica.lag.time.max.ms 控制。 leader 发生故障之后,从ISR中重新选择leader。
2. ack 应答机制
0: producer 不等待broker 的ack,最低延迟,broker 收到消息未落盘就ack,当broker 故障时可能造成数据丢失。只会发生一次,At Most Once。
1:producer 等待broker 的ack。leader 落盘成功才ack,如果在follower 同步成功之前leader 发生故障,有可能数据丢失。
-1:all, leader和follower 都落盘成功才ack。如果follower 同步之后,broker发送ack之前,leader 发生故障,有可能数据重复。不会丢失数据, At Least Once.
3. 故障处理
LEO:每个副本的最大offset
HW:消费者能见到的最大的offset,ISR队列中最小的LEO
- follower 故障
follower 故障后会被临时剔出ISR,恢复后读取本地磁盘记录的上次的HW,并将log 文件高于HW部分的截掉,从HW开始向leader 进行同步。 等待follower 的LEO大于等于该partition 的HW,即follower 追上leader 之后即可重新加入ISR。
- leader 故障
从ISR 重新选一个新leader。为保证数据一致性,其余的follower 会先将各自的log文件高于HW部分的截掉,然后重新向leader同步数据。
这里只保证副本之间的数据一致性,不能保证数据不丢失或数据重复。
4. 幂等性
producer 不论向server 发送多少条消息,server 端只会持久化一条。
At least once + 幂等性 = Exactly Once
要启动幂等性,只需要将producer 的参数中的 enable.idompotence 设置为true 即可。kafka 实现是将去重放在了上游。 开启幂等性的producer 会在初始化的时候分配一个PID,发往同一partition 的消息会附带Sequence Number。 而broker 会对<pid, partition, SequenceNumber> 做缓存,当具有相同主键的消息提交时,broker 只会持久化一条。
pid 重启就会变,同时不同的partition 也具有不同的主键, 所以幂等性无法保证跨分区跨回话的Exactly Once。
3. 消费者
kafka 一个consumer group 的consumer 可以订阅多个topic,比如:
# terminal1
bin/kafka-console-consumer.sh --topic myTest1 --group g1 --bootstrap-server localhost:9092
# terminal2
bin/kafka-console-consumer.sh --topic myTopic1 --group g1 --bootstrap-server localhost:9092
# 查看group 信息
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
g1 myTopic1 2 0 0 0 console-consumer-0076a6e5-3137-4ea1-9c13-152ef28769a5 /43.143.155.103 console-consumer
g1 myTopic1 5 0 0 0 console-consumer-0076a6e5-3137-4ea1-9c13-152ef28769a5 /43.143.155.103 console-consumer
g1 myTopic1 8 0 0 0 console-consumer-0076a6e5-3137-4ea1-9c13-152ef28769a5 /43.143.155.103 console-consumer
g1 myTopic1 9 0 0 0 console-consumer-0076a6e5-3137-4ea1-9c13-152ef28769a5 /43.143.155.103 console-consumer
g1 myTopic1 3 0 0 0 console-consumer-0076a6e5-3137-4ea1-9c13-152ef28769a5 /43.143.155.103 console-consumer
g1 myTopic1 10 0 0 0 console-consumer-0076a6e5-3137-4ea1-9c13-152ef28769a5 /43.143.155.103 console-consumer
g1 myTopic1 0 5 5 0 console-consumer-0076a6e5-3137-4ea1-9c13-152ef28769a5 /43.143.155.103 console-consumer
g1 myTopic1 4 0 0 0 console-consumer-0076a6e5-3137-4ea1-9c13-152ef28769a5 /43.143.155.103 console-consumer
g1 myTopic1 1 1 1 0 console-consumer-0076a6e5-3137-4ea1-9c13-152ef28769a5 /43.143.155.103 console-consumer
g1 myTopic1 7 0 0 0 console-consumer-0076a6e5-3137-4ea1-9c13-152ef28769a5 /43.143.155.103 console-consumer
g1 myTopic1 11 1 1 0 console-consumer-0076a6e5-3137-4ea1-9c13-152ef28769a5 /43.143.155.103 console-consumer
g1 myTopic1 6 0 0 0 console-consumer-0076a6e5-3137-4ea1-9c13-152ef28769a5 /43.143.155.103 console-consumer
g1 myTest1 0 3 3 0 console-consumer-22834513-e91b-4c76-9b01-351f51cd5285 /43.143.155.103 console-consumer
1. 消费模式
推拉一般说的是consumer和broker 之间的数据交互,kafka 是拉模式。
push 模式很难适应效率速率不同的消费者,因为消息发送速率是由broker 决定的。它的目标是以最快的速度投递消息,但是这样很容易造成消费者来不及处理消息。典型的表现就是拒绝服务或者网络拥塞。
pull模式则可以根据consumer 的消费能力拉取消息。pull 模式的不足之处是,kafka 中没有数据时,消费者可能陷入空循环,一直返回空数据。针对这一点,kafka 的消费者在消费数据时会传入一个时长参数timeout,如果没有数据可消费则等等timeout 之后再返回。
2. 分区分配策略
一个consumer group 有多个consumer,一个topic 有多个partition。所以需要分配partition,即决定哪个partition 由哪个consumer 消费。kafka 有两种策略,RoundRobin 和 Range。
- range: 对同一个topic中的partition按照序号排序,并对consumer按照字典顺序排序。假设分区:0, 1, 2, 3, 4, 5, 6, 7, 8, 9;CG1中消费者线程为C0-0、C1-0、C1-1。然后因为 10除3除不尽,那么消费者线程C0-0将会多分配分区,所以分区分配之后结果如下:
C0-0 将消费0、1、2、3分区 C1-0 将消费4、5、6分区 C1-1 将消费7、8、9分区
- RoundRobin:如果同一个消费组内所有的消费者的订阅信息都是相同的,那么RoundRobinAssignor策略的分区分配会是均匀的。举例,假设消费组中有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有3个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为
消费者C0:t0p0、t0p2、t1p1 消费者C1:t0p1、t1p0、t1p2
3. Offset 维护
Offset 用于consumer 实时记录自己的消费位置,便于故障恢复后继续消费。
kafka从0.9 版本之后,将offset 保存在一个内置的topic 中,该topic名为 __consumer_offsets。
4. 高效读写数据
- 顺序写磁盘
kafka的producer生产数据,要写入到log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能达到600M/S,随机写只有100K/S。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
- 零拷贝
5. 事务
事务可以保证kafka 在Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。 kafka中的事务可以使应用程序将消费消息,生产消息、提交消费位移当作原子操作来处理,同时成功或者失败,即使该生产或消费跨越多个分区。
为了实现EOS(exactly once semantics,精确一次处理语义)karka从0.11.0.0版本开始引入了幂等性和事务两个特性来支撑。
1. producer 事务
为了实现跨分区跨会话的事务,需要引入一个全局唯一的TransactionID,并将producer 获得的PID和TransactionID 进行绑定。这样当Producer 重启后就可以通过正在进行的TransactionID 获得原来的PID。
为了管理Transaction,kafka 引入了新的组件Transaction Coordinator。Producer 就是和Coordinator 交互获得TransactionID 对应的任务状态。Coordinator 还负责将事务写入kafka 的一个topic,这样即使服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
2. consumer 事务
对于consumer 而言,事务的保证相对较弱,无法保证commit 的信息被精确消费。
6. kafka rebalance 机制
1. 发生时机
同一个consumer 消费者组 group.id 中,新增了消费者
消费者组中有消费者下线
topic 的分区数量发生变化
消费者主动取消订阅
2. relabance 过程
- 所有成员向Coordinator 发送请求,请求入组。 一旦所有成员都发送了请求,Coordinator 会从中选择一个consumer 担任leader 的角色,并把组成员信息以及订阅信息发给leader。
- leader 开始分配消费方案,指明哪个consumer 负责消费哪些topic 的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。
【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】