Kafka 日志存储及其清除策略
日志存储结构
Kafka存储结构图:
kafka 中消息是以主题 topic 为基本单位进行归类的,这里的 topic 是逻辑上的概念,实际上在磁盘存储是根据分区存储的,每个主题可以分为多个分区、分区的数量可以在主题创建的时候进行指定。例如下面 kafka 命令创建了一个 topic 为 test 的主题、该主题下有 4 个分区、每个分区有两个副本保证高可用。
./bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 4 --topic test
分区的修改除了在创建的时候指定。还可以动态的修改。如下将 kafka 的 test 主题分区数修改为 12 个
./kafka-topics.sh --alter --zookeeper 127.0.0.1:2181 --topic test --partitions 12
分区内每条消息都会被分配一个唯一的消息 id,也就是我们通常所说的 offset, 因此 kafak 只能保证每一个分区内部有序性,不能保证全局有序性。
如果分区设置的合理,那么所有的消息都可以均匀的分布到不同的分区中去,这样可以实现水平扩展。不考虑多副本的情况下,一个分区对应一个 log 日志、如上图所示。为了防止 log 日志过大,kafka 又引入了日志分段(LogSegment)的概念,将 log 切分为多个 LogSegement,相当于一个巨型文件被平均分配为相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和 LogSegement 也不是纯粹物理意义上的概念,Log 在物理上只是以文件夹的形式存储,而每个 LogSegement 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以".txindex"为后缀的事务索引文件)。
kafak 中的 Log 对应了一个命名为<topic>-<partition>
的文件夹。举个例子、假如有一个 test 主题,此主题下游 3 个分区,那么在实际物理上的存储就是 "test-0","test-1","test-2" 这三个文件夹。
向 Log 中写入消息是顺序写入的。只有最后一个 LogSegement 才能执行写入操作,在此之前的所有 LogSegement 都不能执行写入操作。为了方便描述,我们将最后一个 LogSegement 成为"ActiveSegement",即表示当前活跃的日志分段。随着消息的不断写入,当 ActiveSegement 满足一定的条件时,就需要创建新的 activeSegement,之后在追加的消息写入新的 activeSegement。
为了便于消息的检索,每个 LogSegement 中的日志文件(以".log" 为文件后缀)都有对应的两个文件索引:偏移量索引文件(以".index" 为文件后缀)和时间戳索引文件(以".timeindex"为文件后缀)。每个 LogSegement 都有一个“基准偏移量” baseOffset,用来标识当前 LogSegement 中第一条消息的 offset。偏移量是一个 64 位的长整形。日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为 20 位数字,没有达到的位数则用 0 填充。比如第一个 LogSegment 的基准偏移量为 0,对应的日志文件为 00000000000000000000.log
示例中第 2 个 LogSegment 对应的基准位移是 256,也说明了该 LogSegment 中的第一条消息的偏移量为 256,同时可以反映出第一个 LogSegment 中共有 256 条消息(偏移量从 0 至 255 的消息)。
注意每个 LogSegment 中不只包含“.log”“.index”“.timeindex”这 3 种文件,还可能包含“.deleted”“.cleaned”“.swap”等临时文件,以及可能的“.snapshot”“.txnindex”“leader-epoch-checkpoint”等文件。
日志索引
kafka的日志文件索引是用来快速检索日志的,在kafka中日志索引分为2种类,kafka中索引以稀疏索引的方式构建索引,它不保证每个消息在索引文件中都存在索引。
每当写入一定数量log.index.interval.bytes default(4KB = 4096)的时候,偏移量索引以及时间戳索引各自创建一个对应的索引项,我们可以通过该参数调整索引的密度,通过MappedByteBuffer将索引文件映射到内存中。
日志分段文件切分条件如下
- 当日志分段文件的大小超过log.segment.bytes=1073741824(1GB)时;
- 当日志分段中的最大时间戳与当前系统的差值大于log.roll.ms或log.roll.hours,默认只配置了log.roll.hours =168(7天),前者优先级高
- 偏移量索引文件或者时间戳索引文件大小超过brokerlog.index.size.max.bytes=10MB
- 新追加消息的offset-baseOffset > Integer.MAX_VALUE时,也就是相对位移过大,用Integer-4个字节存不下了。
offset索引
offset索引格式
偏移量索引分为2个部分,总共占8个字节
具体的偏移量索引项如下图
- relativeOffset(4B)
- 消息的相对偏移量,即
offset - baseOffset
,其中baseOffset为整个segmentLogFile的起始消息的offset。 - 平常的offset占用8个Byte,而ralativeOffset只需要占用4个Byte
- 消息的相对偏移量,即
- position(4B)
- 物理地址,也就是日志在分段日志文件中的实际位置。
查看日志以及索引文件的方式
#以下2种方式都行
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /cxxxxxx.log
bin/kafka-dump-log.sh --files /xxxxx.index
>>>>>>
(base) bogon:topic1-0 shufang$ kafka-dump-log.sh --files 00000000000000000000.index
Dumping 00000000000000000000.index
offset: 45 position: 4140 #代表团一个RecordBatch
offset: 90 position: 8266 #代表另一个RecordBatch
offset索引的检索流程
假如有以下索引文件与分段日志文件,我们该如何找到偏移量为23的消息数据?
- 首先通过二分法找到不大于23的最大偏移量索引【22,656】;
- 然后从position(656)开始顺序查找偏移量为23的消息。
注意⚠️:log.index.size.max.bytes
必须是8的整数倍,如果你设置成67,那么系统会默认帮你纠正成64。
时间戳索引
时间戳索引格式
时间戳索引分为2部分,公占12个字节,具体索引样式如下
- timestamp
- 当前日志分段文件中建立索引的消息的时间戳
- 为了保证时间戳大的单调递增,我们可以将
log.message.timestamp.type
设置成logApendTime,而CreateTime不能保证
- relativeoffset
- 时间戳对应消息的相对偏移量
具体时间戳索引的检索流程
- 首先在时间戳索引文件中找到不大于该时间戳的最大时间戳对应的最大索引项【1526384718283,28】;
- 然后在偏移量索引文件中检索不超过对应relativeoffset(28)的最大偏移量索引的项【26,838】;
- 然后按照偏移量索引的检索方式找到对应的具体消息。
注意⚠️:时间戳索引文件的大小必须为12B的倍数。
Kafka消息格式
日志清除策略
Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka中每一个分区partition都对应一个日志文件,而日志文件又可以分为多个日志分段文件,这样也便于日志的清理操作。Kafka提供了两种日志清理策略:
日志删除(Log Deletion):按照一定的保留策略来直接删除不符合条件的日志分段。
日志压缩(Log Compaction):针对每个消息的key进行整合,对于有相同key的的不同value值,只保留最后一个版本。
我们可以通过broker端参数log.cleanup.policy来设置日志清理策略,此参数默认值为“delete”,即采用日志删除的清理策略。如果要采用日志压缩的清理策略的话,就需要将log.cleanup.policy设置为“compact”,并且还需要将log.cleaner.enable(默认值为true)设定为true。通过将log.cleanup.policy参数设置为“delete,compact”还可以同时支持日志删除和日志压缩两种策略。日志清理的粒度可以控制到topic级别,比如与log.cleanup.policy对应的主题级别的参数为cleanup.policy,为了简化说明,本文只采用broker端参数做陈述,如若需要topic级别的参数可以查看官方文档。
日志删除-Log Deletion
Kafka日志管理器中会有一个专门的日志删除任务来周期性检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300,000,即5分钟。当前日志分段的保留策略有3种:基于时间的保留策略、基于日志大小的保留策略以及基于日志起始偏移量的保留策略。
基于时间
日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值retentionMs来寻找可删除的的日志分段文件集合deletableSegments,参考下图所示。retentionMs可以通过broker端参数log.retention.hours、log.retention.minutes以及log.retention.ms来配置,其中log.retention.ms的优先级最高,log.retention.minutes次之,log.retention.hours最低。默认情况下只配置了log.retention.hours参数,其值为168,故默认情况下日志分段文件的保留时间为7天。
查找过期的日志分段文件,并不是简单地根据日志分段的最近修改时间lastModifiedTime来计算,而是根据日志分段中最大的时间戳largestTimeStamp来计算。因为日志分段的lastModifiedTime可以被有意或者无意的修改,比如执行了touch操作,或者分区副本进行了重新分配,lastModifiedTime并不能真实地反映出日志分段在磁盘的保留时间。要获取日志分段中的最大时间戳largestTimeStamp的值,首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于0,则取其值,否则才设置为最近修改时间lastModifiedTime。
若待删除的日志分段的总数等于该日志文件中所有的日志分段的数量,那么说明所有的日志分段都已过期,但是该日志文件中还要有一个日志分段来用于接收消息的写入,即必须要保证有一个活跃的日志分段activeSegment,在此种情况下,会先切分出一个新的日志分段作为activeSegment,然后再执行删除操作。
删除日志分段时,首先会从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。然后将日志分段文件添加上“.deleted”的后缀,当然也包括日志分段对应的索引文件。最后交由一个以“delete-file”命名的延迟任务来删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为60000,即1分钟。
基于日志大小
日志删除任务会检查当前日志的大小是否超过设定的阈值retentionSize来寻找可删除的日志分段的文件集合deletableSegments,参考下图所示。retentionSize可以通过broker端参数log.retention.bytes来配置,默认值为-1,表示无穷大。注意log.retention.bytes配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。
基于日志大小的保留策略与基于时间的保留策略类似,其首先计算日志文件的总大小size和retentionSize的差值diff,即计算需要删除的日志总大小,然后从日志文件中的第一个日志分段开始进行查找可删除的日志分段的文件集合deletableSegments。查找出deletableSegments之后就执行删除操作,这个删除操作和基于时间的保留策略的删除操作相同,这里不再赘述。
基于日志起始偏移量
一般情况下日志文件的起始偏移量logStartOffset等于第一个日志分段的baseOffset,但是这并不是绝对的,logStartOffset的值可以通过DeleteRecordsRequest请求、日志的清理和截断等操作修改。
基于日志起始偏移量的删除策略的判断依据是某日志分段的下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset,若是则可以删除此日志分段。参考上图,假设logStartOffset等于25,日志分段1的起始偏移量为0,日志分段2的起始偏移量为11,日志分段3的起始偏移为23,那么我们通过如下动作收集可删除的日志分段的文件集合deletableSegments:
- 从头开始遍历每个日志分段,日志分段1的下一个日志分段的起始偏移量为11,小于logStartOffset的大小,将日志分段1加入到deletableSegments中;
- 日志分段2的下一个日志偏移量的起始偏移量为23,也小于logStartOffset的大小,将日志分段2页加入到deletableSegments中;
- 日志分段3的下一个日志偏移量在logStartOffset的右侧,故从日志分段3开始的所有日志分段都不会被加入到deletableSegments中。
收集完可删除的日志分段的文件集合之后的删除操作同基于日志大小的保留策略和基于时间的保留策略相同,这里不再赘述。
日志压缩-Log Compaction
Kafka中的Log Compaction是指在默认的日志删除(Log Deletion)规则之外提供的一种清理过时数据的方式。Log Compaction对于有相同key的的不同value值,只保留最后一个版本。如果应用只关心key对应的最新value值,可以开启Kafka的日志清理功能,Kafka会定期将相同key的消息进行合并,只保留最新的value值。
Log Compaction执行前后,日志分段中的每条消息的偏移量和写入时的保持一致。Log Compaction会生成新的日志分段文件,日志分段中每条消息的物理位置会重新按照新文件来组织。Log Compaction执行过后的偏移量不再是连续的,不过这并不影响日志的查询。
Kafka的Log Compaction可以类比于Redis的SNAPSHOTTING的持久化模式。试想一下,如果一个系统使用Kafka来保存状态,每次有状态变更都会将其写入Kafka中。在某一时刻此系统异常崩溃,进而在恢复时通过读取Kafka中的消息来恢复其应有的状态,那么此系统关心的是它原本的最新状态而不是历史时刻中的每一个状态。如果Kafka的日志保存策略是日志删除(Log Deletion),那么系统势必要一股脑的读取Kafka中的所有数据来恢复,而如果日志保存策略是Log Compaction,那么可以减少数据的加载量进而加快系统的恢复速度。Log Compaction在某些应用场景下可以简化技术栈,提高系统整体的质量。
我们知道可以通过配置log.dir或者log.dirs参数来设置Kafka日志的存放目录,而对于每一个日志目录下都有一个名为“cleaner-offset-checkpoint”的文件,这个文件就是清理检查点文件,用来记录每个主题的每个分区中已清理的偏移量。通过清理检查点文件可以将日志文件(Log)分成两个部分,参考下图,通过检查点cleaner checkpoint来划分出一个已经清理过的clean部分和一个还未清理过的dirty部分。在日志清理的同时,客户端也会读取日志。dirty部分的消息偏移量是逐一递增的,而clean部分的消息偏移量是断续的,如果客户端总能赶上dirty部分,它就能读取到日志的所有消息,反之,就不可能读到全部的消息。
上图中firstDirtyOffset(与cleaner checkpoint相等)表示dirty部分的起始偏移量,而firstUncleanableOffset为dirty部分的截止偏移量,整个dirty部分的偏移量范围为[firstDirtyOffset, firstUncleanableOffset),注意这里是左闭右开区间。为了避免当前活跃的日志分段activeSegment成为热点文件,activeSegment不会参与Log Compaction的操作。同时Kafka支持通过参数log.cleaner.min.compaction.lag.ms(默认值为0)来配置消息在被清理前的最小保留时间,默认情况下firstUncleanableOffset等于activeSegment的baseOffset。
注意Log Compaction是针对key的,所以在使用时应注意每个消息的key值不为null。每个broker会启动log.cleaner.thread(默认值为1)个日志清理线程负责执行清理任务,这些线程会选择“污浊率”最高的日志文件进行清理。用cleanBytes表示clean部分的日志占用大小,dirtyBytes表示dirty部分的日志占用大小,那么这个日志的污浊率(dirtyRatio)为:
dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes)
为了防止日志不必要的频繁清理操作,Kafka还使用了参数log.cleaner.min.cleanable.ratio(默认值为0.5)来限定可进行清理操作的最小污浊率。
Kafka中用于保存消费者消费位移的主题“__consumer_offsets”使用的就是Log Compaction策略。
这里我们已经知道了怎样选择合适的日志文件做清理操作,然而我们怎么对日志文件中消息的key进行筛选操作呢?Kafka中的每个日志清理线程会使用一个名为“SkimpyOffsetMap”的对象来构建key与offset的映射关系的哈希表。
日志清理需要遍历两次日志文件,第一次遍历把每个key的哈希值和最后出现的offset都保存在SkimpyOffsetMap中,映射模型如下图所示。
第二次遍历检查每个消息是否符合保留条件,如果符合就保留下来,否则就会被清理掉。假设一条消息的offset为O1,这条消息的key在SkimpyOffsetMap中所对应的offset为O2,如果O1>=O2即为满足保留条件。
默认情况下SkimpyOffsetMap使用MD5来计算key的哈希值,占用空间大小为16B,根据这个哈希值来从SkimpyOffsetMap中找到对应的槽位,如果发生冲突则用线性探测法处理。为了防止哈希冲突过于频繁,我们也可以通过broker端参数log.cleaner.io.buffer.load.factor(默认值为0.9)来调整负载因子。偏移量占用空间大小为8B,故一个映射项占用大小为24B。每个日志清理线程的SkimpyOffsetMap的内存占用大小为log.cleaner.dedupe.buffer.size / log.cleaner.thread,默认值为 = 128MB/1 = 128MB。所以默认情况下SkimpyOffsetMap可以保存128MB * 0.9 /24B ≈ 5033164个key的记录。假设每条消息的大小为1KB,那么这个SkimpyOffsetMap可以用来映射4.8GB的日志文件,而如果有重复的key,那么这个数值还会增大,整体上来说SkimpyOffsetMap极大的节省了内存空间且非常高效。
“SkimpyOffsetMap”这个取名也很有意思,“Skimpy”可以直译为“不足的”,可以看出它最初的设计者也认为这种实现不够严谨。如果遇到两个不同的key但哈希值相同的情况,那么其中一个key所对应的消息就会丢失。虽然说MD5这类摘要算法的冲突概率非常小,但根据墨菲定律,任何一个事件,只要具有大于0的几率,就不能假设它不会发生,所以在使用Log Compaction策略时要注意这一点。
Log Compaction会为我们保留key相应的最新value值,那么当我们需要删除一个key怎么办?Kafka中提供了一个墓碑消息(tombstone)的概念,如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息。日志清理线程发现墓碑消息时会先进行常规的清理,并保留墓碑消息一段时间。墓碑消息的保留条件是当前墓碑消息所在的日志分段的最近修改时间lastModifiedTime大于deleteHorizonMs,参考图2,这个deleteHorizonMs的计算方式为clean部分中最后一个日志分段的最近修改时间减去保留阈值deleteRetionMs(通过broker端参数log.cleaner.delete.retention.ms配置,默认值为86400000,即24小时)的大小,即:
deleteHorizonMs = clean部分中最后一个LogSegment的lastModifiedTime - deleteRetionMs
所以墓碑消息的保留条件为:
所在LogSegment的lastModifiedTime > deleteHorizonMs
=> 所在LogSegment的lastModifiedTime > clean部分中最后一个LogSegment的lastModifiedTime - deleteRetionMs
=> 所在LogSegment的lastModifiedTime + deleteRetionMs > clean部分中最后一个LogSegment的lastModifiedTime
(可以对照图2中的deleteRetionMs所标记的位置去理解)
Log Compaction执行过后的日志分段的大小会比原先的日志分段的要小,为了防止出现太多的小文件,Kafka在实际清理过程中并不对单个的日志分段进行单独清理,而是会将日志文件中offset从0至firstUncleanableOffset的所有日志分段进行分组,每个日志分段只属于一组,分组策略为:按照日志分段的顺序遍历,每组中日志分段的占用空间大小之和不超过segmentSize(可以通过broker端参数log.segments.bytes设置,默认值为1GB),且对应的索引文件占用大小之和不超过maxIndexSize(可以通过broker端参数log.index.interval.bytes设置,默认值为10MB)。同一个组的多个日志分段清理过后,只会生成一个新的日志分段。
参考上图,假设所有的参数配置都为默认值,在Log Compaction之前checkpoint的初始值为0。执行第一次Log Compaction之后,每个非活跃的日志分段的大小都有所缩减,checkpoint的值也有所变化。执行第二次Log Compaction时会将组队成[0.4GB, 0.4GB]、[0.3GB, 0.7GB]、[0.3GB]、[1GB]这4个分组,并且从第二次Log Compaction开始还会涉及墓碑消息的清除。同理,第三次Log Compaction过后的情形可参考上图尾部。Log Compaction过程中会将对每个日志分组中需要保留的消息拷贝到一个以“.clean”为后缀的临时文件中,此临时文件以当前日志分组中第一个日志分段的文件名命名,例如:00000000000000000000.log.clean。Log Compaction过后将“.clean”的文件修改为以“.swap”后缀的文件,例如:00000000000000000000.log.swap,然后删除掉原本的日志文件,最后才把文件的“.swap”后缀去掉,整个过程中的索引文件的变换也是如此,至此一个完整Log Compaction操作才算完成。