首页 > 编程语言 >Kafka日志压实算法

Kafka日志压实算法

时间:2023-12-16 14:45:21浏览次数:27  
标签:__ 12 log offsets kafka 日志 Kafka consumer 压实

概念介绍

我们有时候可以把Kafka当作key、value数据库用(当然kafka中的消息可以不指定key)。
__consumer_offsets 这个topic的数据,就是典型的key、value数据。

/usr/local/kafka2.8/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /mnt/kafka-disk/kafka-logs/__consumer_offsets-7/00000000000310221664.log --offsets-decoder

key中存储的是【消费者组、分区】信息,value中存储的是最新提交的offset。

显然,磁盘上只需要保留某一个key的最新值即可。即只需要知道某一个消费者组的最新提交的offset,并不关心昨天的offset。

针对这个场景,压实算法就派上用场了。

本文引用信息大多来自于《Kafka权威指南》一书
一般情况下,Kafka会根据设置的时间来保留数据,把超过时效的旧数据删除。但是,请试想一种场景,假设你用Kafka来保存客户的收货地址,那么保存客户的最新地址比保存客户上周甚至去年的地址更有意义,这样你就不用保留客户的旧地址了。另外一种场景是应用程序使用Kafka来保存它的当前状态,每次状态发生变化,就将新状态写入Kafka。当应用程序从故障中恢复时,它会从Kafka读取之前保存的消息,以便恢复到最近的状态。应用程序只关心发生崩溃前的那个状态,并不关心在运行过程中发生的所有状态变化。
Kafka通过改变主题的保留策略来满足这些应用场景。如果保留策略是delete,那么早于保留时间的旧事件将被删除;如果保留策略是compact(压实),那么只为每个键保留最新的值。很显然,只有当应用程序生成的事件里包含了键–值对时,设置compact才有意义。如果主题中包含了null键,那么这个策略就会失效。

以生产环境一次压实过程为例,分析算法执行步骤

1、压实__consumer_offsets-7 相关日志如下:

[2023-12-16 12:04:18,974] INFO [Log partition=__consumer_offsets-7, dir=/usr/local/kafka/kafka-logs] Rolled new log segment at offset 310221664 in 8 ms. (kafka.log.Log)
[2023-12-16 12:04:19,086] DEBUG Finding range of cleanable offsets for log=__consumer_offsets-7. Last clean offset=Some(309313806) now=1702699459085 => firstDirtyOffset=309313806 firstUncleanableOffset=310221664 activeSegment.baseOffset=310221664 (kafka.log.LogCleanerManager$)
2023-12-16 12:04:19,086] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-7. (kafka.log.LogCleaner)
[2023-12-16 12:04:19,087] INFO Cleaner 0: Building offset map for __consumer_offsets-7... (kafka.log.LogCleaner)
[2023-12-16 12:04:19,108] INFO Cleaner 0: Building offset map for log __consumer_offsets-7 for 1 segments in offset range [309313806, 310221664). (kafka.log.LogCleaner)
[2023-12-16 12:04:19,697] INFO Cleaner 0: Offset map for log __consumer_offsets-7 complete. (kafka.log.LogCleaner)
[2023-12-16 12:04:19,697] INFO Cleaner 0: Cleaning log __consumer_offsets-7 (cleaning prior to Sat Dec 16 12:04:18 CST 2023, discarding tombstones prior to Fri Dec 15 07:50:41 CST 2023)... (kafka.log.LogCleaner)
[2023-12-16 12:04:19,697] INFO Cleaner 0: Cleaning LogSegment(baseOffset=0, size=739, lastModifiedTime=1702669040000, largestRecordTimestamp=Some(1702040094754)) in log __consumer_offsets-7 into 0 with deletion horizon 1702597841000, retaining deletes. (kafka.log.LogCleaner)
[2023-12-16 12:04:19,699] INFO Cleaner 0: Cleaning LogSegment(baseOffset=308405948, size=693, lastModifiedTime=1702684241000, largestRecordTimestamp=Some(1702684241337)) in log __consumer_offsets-7 into 0 with deletion horizon 1702597841000, retaining deletes. (kafka.log.LogCleaner)
[2023-12-16 12:04:19,709] INFO Cleaner 0: Swapping in cleaned segment LogSegment(baseOffset=0, size=739, lastModifiedTime=1702684241000, largestRecordTimestamp=Some(1702040094754)) for segment(s) List(LogSegment(baseOffset=0, size=739, lastModifiedTime=1702669040000, largestRecordTimestamp=Some(1702040094754)), LogSegment(baseOffset=308405948, size=693, lastModifiedTime=1702684241000, largestRecordTimestamp=Some(1702684241337))) in log Log(dir=/usr/local/kafka/kafka-logs/__consumer_offsets-7, topic=__consumer_offsets, partition=7, highWatermark=310221708, lastStableOffset=310221708, logStartOffset=0, logEndOffset=310221710) (kafka.log.LogCleaner)
[2023-12-16 12:04:19,709] INFO Cleaner 0: Cleaning LogSegment(baseOffset=309313806, size=104857599, lastModifiedTime=1702699458000, largestRecordTimestamp=Some(1702699458895)) in log __consumer_offsets-7 into 309313806 with deletion horizon 1702597841000, retaining deletes. (kafka.log.LogCleaner)
[2023-12-16 12:04:20,235] INFO Cleaner 0: Swapping in cleaned segment LogSegment(baseOffset=309313806, size=693, lastModifiedTime=1702699458000, largestRecordTimestamp=Some(1702699458895)) for segment(s) List(LogSegment(baseOffset=309313806, size=104857599, lastModifiedTime=1702699458000, largestRecordTimestamp=Some(1702699458895))) in log Log(dir=/usr/local/kafka/kafka-logs/__consumer_offsets-7, topic=__consumer_offsets, partition=7, highWatermark=310221742, lastStableOffset=310221742, logStartOffset=0, logEndOffset=310221742) (kafka.log.LogCleaner)
[2023-12-16 12:04:20,235] INFO [kafka-log-cleaner-thread-0]:
        Log cleaner thread 0 cleaned log __consumer_offsets-7 (dirty section = [309313806, 310221664])
        100.0 MB of log processed in 1.1 seconds (87.1 MB/sec).
        Indexed 100.0 MB in 0.6 seconds (163.9 Mb/sec, 53.1% of total time)
        Buffer utilization: 0.0%
        Cleaned 100.0 MB in 0.5 seconds (185.9 Mb/sec, 46.9% of total time)
        Start size: 100.0 MB (907,866 messages)
        End size: 0.0 MB (8 messages)
        100.0% size reduction (100.0% fewer messages)
 (kafka.log.LogCleaner)

从日志中我们可以看到:
1、先是新创建一个段文件。offset从310221664开始。 这个段就是 __consumer_offsets-7的活跃段。活跃段不会进行压实,也不会清理。

2、之后,看到几个关键信息:

  • firstDirtyOffset=309313806
  • activeSegment.baseOffset=310221664

什么是DirtyOffset呢?

即未进行过压实操作的段的offset。

新段创建之后,就要对段文件00000000000309313806.log (offset范围【309313806, 310221664】) 进行压实操作。

经过压实操作之后,这个文件大小只有693b, 我们来看下这个文件的内容:

/usr/local/kafka2.8/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files /mnt/kafka-disk/kafka-logs/__consumer_offsets-7/00000000000309313806.log --offsets-decoder

段文件内容精简了很多,每个key只保留了一行数据。【309313806, 310221664】这么大的offset范围,只占不到1k空间。

总结

1、.log文件中的offset,必须是单调递增的。可以不连续(本来是连续的,由于压实算法的原因变的不连续),但必须是单调递增。
我们线上的业务,因为未知的原因,就出现了同一个段文件中offset突然变小导致脏数据,分区数据无法写入的问题。

标签:__,12,log,offsets,kafka,日志,Kafka,consumer,压实
From: https://www.cnblogs.com/xushengbin/p/17904792.html

相关文章

  • 归档日志清理后还能不能闪回
    在Oracle数据库中,归档日志的清理不会影响数据库的闪回功能。归档日志是用于数据库恢复的重要组成部分,特别是在进行数据库闪回操作时。如果你已经启用了Oracle数据库的归档模式,数据库会自动将归档日志保存在指定的归档目录中。即使你清理了一些旧的归档日志,数据库仍然会保留足够的......
  • 快速打印docker容器日志
    有的时候需要在服务器上查看日志信息。往往敲命令又太多,觉得麻烦,所以写了一个这个脚本。赋权之后,这个脚本里面丢到/usr/local/bin/下面。就可以在任何地方使用lgs,然后输入容器部分的名字。如果有多个输入序号就可以打印日志啦。#/bin/bashread-p"entername:"contain......
  • MySQL如何输出发生死锁的SQL到日志文件
    一、背景首先我们在日常的开发中,大概率会使用批量更新,或者在一个事务里面做增删改查,那么就有可能不同事务之间导致死锁的发生。这里主要讲的是如何将当时发生死锁的信息输出到日志文件中,以及具体的SQL打印。二、如何实现查了很多网上的文章,都是使用什么下面之类的命令showengin......
  • 60道KafKa高频题整理(附答案背诵版)
    废话不多说,直接上干货简述什么是Kafka的Topic?Kafka的Topic是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到Kafka集群的消息都有一个类别,这个类别就是Topic。物理上来说,不同的Topic的消息是分开存储的,每个Topic可以有多个生产者向它发送消息,也可......
  • Mysql慢日志getshell
    Mysql慢日志getshellshowvariableslike'%slow%';Variable_nameValuelog_slow_queriesOFFslow_launch_time2slow_query_logOFFslow_query_log_fileC:\phpStudy\PHPTutorial\MySQL\data\WIN-374NAWYudt-slow.logsetGLOBALsl......
  • Kafka 分布式消息系统
    文章目录消息中间件对比Kafka概述kafka安装和配置kafka入门生产者发送消息消费者接收消息Kafka高可用设计集群备份机制(Replication)备份机制(Replication)-同步方式kafka生产者详解同步发送异步发送参数详解(ack)参数详解(retries)参数详解-消息压缩kafka消费者详解消费者组消息有......
  • php tp框架 自定义日志
    调用方法$file_log=['order_id'=>123,];(newLogs('log'))->infos('日志文案',$file_log);[2023-12-1415:24:13][INFO][log]{"msg":"日志文案","params":{"order_id":123},"file......
  • kafka kafka-ui 单机部署
    1.下载https://kafka.apache.org/downloads解压压缩包,以/root/kafka为例2.启动zk/root/kafka/bin/zookeeper-server-start.sh/root/kafka/config/zookeeper.properties3.启动broker/root/kafka/bin/kafka-server-start.sh/root/kafka/config/server.properties4.do......
  • Docker安装Kafka安装zookeeper教程(超详细)
    1Docker安装Kafka安装zookeeper教程(超详细)2app-tier:网络名称3-driver:网络类型为bridge41.dockernetworkcreateapp-tier--driverbridge561、安装zookeeper7Kafka依赖zookeeper所以先安装zookeeper8-p:设置映射端口(默认2181)9-d:后台启动101......
  • Java GC日志分析
    阅读分析虚拟机和垃圾收集器的日志是处理Java虚拟机内存问题必备的基础技能。一、JDK统一日志格式垃圾收集器日志是一系列人为设定的规则,多少有点随开发者编码时的心情而定,没有任何的“业界标准”可言,换句话说,每个收集器的日志格式都可能不一样。除此以外还有一个麻烦,在JDK9以前......