首页 > 编程语言 >Kafka源码分析(十九)——Broker:日志子系统——Log

Kafka源码分析(十九)——Broker:日志子系统——Log

时间:2024-06-10 18:28:33浏览次数:13  
标签:Log appendInfo segment Broker 源码 日志 config val

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

学习必须往深处挖,挖的越深,基础越扎实!

阶段1、深入多线程

阶段2、深入多线程设计模式

阶段3、深入juc源码解析


阶段4、深入jdk其余源码解析


阶段5、深入jvm源码解析

码哥源码部分

码哥讲源码-原理源码篇【2024年最新大厂关于线程池使用的场景题】

码哥讲源码【炸雷啦!炸雷啦!黄光头他终于跑路啦!】

码哥讲源码-【jvm课程前置知识及c/c++调试环境搭建】

​​​​​​码哥讲源码-原理源码篇【揭秘join方法的唤醒本质上决定于jvm的底层析构函数】

码哥源码-原理源码篇【Doug Lea为什么要将成员变量赋值给局部变量后再操作?】

码哥讲源码【你水不是你的错,但是你胡说八道就是你不对了!】

码哥讲源码【谁再说Spring不支持多线程事务,你给我抽他!】

终结B站没人能讲清楚红黑树的历史,不服等你来踢馆!

打脸系列【020-3小时讲解MESI协议和volatile之间的关系,那些将x86下的验证结果当作最终结果的水货们请闭嘴】

Log是LogSegment日志段的容器,里面定义了很多管理日志段的操作。Log 对象是 Kafka Broker最核心的部分:

    // Log.scala
    
    class Log(@volatile var dir: File,
              @volatile var config: LogConfig,
              @volatile var recoveryPoint: Long = 0L,
              scheduler: Scheduler,
              time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
    }

Log中包含两个核心属性: dir 和 logStartOffset 。dir是主题分区日志所在的文件夹路径,比如"topic-0"、"topic-1";而 logStartOffset,表示日志的当前最早位移。

一、核心方法

Log 的常见操作分为 4 大部分:

  • 高水位管理操作: 高水位(HW)的概念在 Kafka 中举足轻重,对它的管理,是 Log 最重要的功能之一;
  • 日志段管理: Log 是日志段的容器,高效组织与管理其下辖的所有日志段对象,是源码要解决的核心问题;
  • 关键位移值管理: 日志定义了很多重要的位移值,比如 Log Start Offset 和 LEO 等,确保这些位移值的正确性,是构建消息引擎一致性的基础;
  • 读写操作: 所谓的操作日志,大体上就是指读写日志,读写操作的作用之大,不言而喻。

 

1.1 日志段管理

Log 是LogSegment的容器,它使用 J.U.C中的 ConcurrentSkipListMap 类来保存所有日志段对象。Kafka 将每个日志段的 起始位移值作为 Key ,这样一来,我们就能够很方便地根据所有日志段的起始位移值对它们进行排序和比较,同时还能快速地找到与给定位移值相近的前后两个日志段:

    // Log.scala
    
    private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

对于ConcurrentSkipListMap不了解的童鞋,可以去看我的专栏:《透彻理解Java并发编程》。

Kafka 提供了很多策略(包括基于时间维度、空间维度、Log Start Offset 维度),可以根据一定的规则决定哪些日志段可以删除:

1.2 写日志操作

我们重点看下Log的写日志操作,也就是append方法。整个流程可以用下面这张图表述:

    // Log.scala
    
    def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
      // 1.分析和校验待写入消息集合,并返回校验结果
      val appendInfo = analyzeAndValidateRecords(records)
    
      // 如果不需要写入任何消息,直接返回
      if (appendInfo.shallowCount == 0)
        return appendInfo
    
      // 2.消息格式规整,即删除无效格式消息或无效字节
      var validRecords = trimInvalidBytes(records, appendInfo)
    
      try {
        lock synchronized {
          if (assignOffsets) {
            // 3.使用当前LEO值作为待写入消息集合中第一条消息的位移值
            val offset = new LongRef(nextOffsetMetadata.messageOffset)
            appendInfo.firstOffset = offset.value
            val now = time.milliseconds
            val validateAndOffsetAssignResult = try {
              LogValidator.validateMessagesAndAssignOffsets(validRecords,
                                                            offset,
                                                            now,
                                                            appendInfo.sourceCodec,
                                                            appendInfo.targetCodec,
                                                            config.compact,
                                                            config.messageFormatVersion.messageFormatVersion,
                                                            config.messageTimestampType,
                                                            config.messageTimestampDifferenceMaxMs)
            } catch {
              case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
            }
            // 更新校验结果对象类LogAppendInfo
            validRecords = validateAndOffsetAssignResult.validatedRecords
            appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
            appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
            appendInfo.lastOffset = offset.value - 1
            if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
              appendInfo.logAppendTime = now
    
            // 4.验证消息,确保消息大小不超限
            if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
              for (logEntry <- validRecords.shallowEntries.asScala) {
                if (logEntry.sizeInBytes > config.maxMessageSize) { BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                  BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                  throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d."
                    .format(logEntry.sizeInBytes, config.maxMessageSize))
                }
              }
            }
    
          } else {    // 直接使用给定的位移值,无需自己分配位移值;
                      // 确保消息位移值的单调递增性
            if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
              throw new IllegalArgumentException("Out of order offsets found in " + records.deepEntries.asScala.map(_.offset))
          }
    
          // 5.确保消息大小不超限
          if (validRecords.sizeInBytes > config.segmentSize) {
            throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d."
              .format(validRecords.sizeInBytes, config.segmentSize))
          }
    
          // 6.执行日志切分,当前日志段剩余容量可能无法容纳新消息集合,因此有必要创建一个新的日志段来保存待写入的所有消息
          val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
            maxTimestampInMessages = appendInfo.maxTimestamp,
            maxOffsetInMessages = appendInfo.lastOffset)
    
          // 7.执行真正的消息写入操作,主要调用LogSegment.append方法实现
          segment.append(firstOffset = appendInfo.firstOffset,
            largestOffset = appendInfo.lastOffset,
            largestTimestamp = appendInfo.maxTimestamp,
            shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
            records = validRecords)
    
          // 8.更新LEO对象,其中,LEO值是消息集合中最后一条消息位移值+1
          updateLogEndOffset(appendInfo.lastOffset + 1)
    
          // 9.是否需要手动落盘(根据消息数判断)
          // 一般情况下我们不需要设置Broker端参数log.flush.interval.messages, 落盘操作由OS完成
          // 但某些情况下,可以设置该参数来确保高可靠性
          if (unflushedMessages >= config.flushInterval)
            flush()
    
          appendInfo
        }
      } catch {
        case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
      }
    }

日志的写入操作,是通过LogSegment.append()方法完成的,我下一章节会对LogSegment进行分析。

1.3 日志切分

Log在写日志的过程中,有一个很重要的maybeRoll方法,负责进行 日志切分 。所谓日志切分,就是说如果当前日志段的剩余容量无法容纳新消息时,就需要新创建一个日志段:

    // Log.scala
    
    private def maybeRoll(messagesSize: Int, maxTimestampInMessages: Long, maxOffsetInMessages: Long): LogSegment = {
      val segment = activeSegment
      val now = time.milliseconds
      val reachedRollMs = segment.timeWaitedForRoll(now, maxTimestampInMessages) > config.segmentMs - segment.rollJitterMs
      // 容量不足,执行切分
      if (segment.size > config.segmentSize - messagesSize ||
          (segment.size > 0 && reachedRollMs) ||
          segment.index.isFull || segment.timeIndex.isFull || !segment.canConvertToRelativeOffset(maxOffsetInMessages)) {
        roll(maxOffsetInMessages - Integer.MAX_VALUE)
      } else {
        segment
      }
    }
    
    def roll(expectedNextOffset: Long = 0): LogSegment = {
      val start = time.nanoseconds
      lock synchronized {
        val newOffset = Math.max(expectedNextOffset, logEndOffset)
        val logFile = Log.logFile(dir, newOffset)
        val indexFile = indexFilename(dir, newOffset)
        val timeIndexFile = timeIndexFilename(dir, newOffset)
        for(file <- List(logFile, indexFile, timeIndexFile); if file.exists) {
          warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
          file.delete()
        }
    
        segments.lastEntry() match {
          case null =>
          case entry => {
            val seg = entry.getValue
            seg.onBecomeInactiveSegment()
            seg.index.trimToValidSize()
            seg.timeIndex.trimToValidSize()
            seg.log.trim()
          }
        }
        // 新建一个日志段
        val segment = new LogSegment(dir,
                                     startOffset = newOffset,
                                     indexIntervalBytes = config.indexInterval,
                                     maxIndexSize = config.maxIndexSize,
                                     rollJitterMs = config.randomSegmentJitter,
                                     time = time,
                                     fileAlreadyExists = false,
                                     initFileSize = initFileSize,
                                     preallocate = config.preallocate)
        val prev = addSegment(segment)
        if(prev != null)
          throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
        updateLogEndOffset(nextOffsetMetadata.messageOffset)
        scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
        segment
      }
    }

可以通过Kafka Broker端的参数segment.bytes设置日志分段的大小,默认为1G,当前正在写入的LogSegment称为Active LogSegment

二、总结

本章,我对Log对象进行了简单的讲解。Log是LogSegment日志段的容器,里面定义了很多管理日志段的操作,我们目前只需要关注它的写日志方法即可。

标签:Log,appendInfo,segment,Broker,源码,日志,config,val
From: https://blog.csdn.net/smart_an/article/details/139580192

相关文章

  • 【源码】企业发卡系统源码蓝色UI模板+商户+手机端+对接易支付+搭建教程
    企业发卡系统源码蓝色UI模板+商户+手机端+对接易支付,是基于Thinkphp5开发的后台管理系统,集成后台系统常用功能,拥有多用户,多支付通道对接功能,全响应式界面,简约易操作,一站式寄售卡系统。史上最强大的企业自动发卡平台,支持多用户入驻,后台功能超强大!支持虚拟卡密自动发卡,支持优......
  • 【源码】源码物品销售系统多种支付接口出售源码轻松赚钱
    源码物品销售系统,多种支付接口,出售源码轻松赚钱。一款基于php+mysql开发的内容付费管理系统。系统支持多种收费方式,免签收款,三级分销,实名认证,用户投稿/奖励,自动升级,佣金提现等。高度开源:除核心授权文件外全部开源,二开方便。文章内容收费:可设置部分内容收费或自动截取部分内......
  • Zgo - custom_log.go
     packagemainimport("fmt""io""log""os""path")funcmain(){flag:=os.O_APPEND|os.O_CREATE|os.O_WRONLYlogFile:=path.Join(os.TempDir(),"mGo.log")......
  • 最新AI系统源码,ChatGPT运营网站H5系统源码,支持GPTs应用、AI绘画、文档分析、GPT语音对
    一、文章前言SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统,支持OpenAI-GPT全模型+国内AI全模型。支持GPT-4o大模型、文档分析、识图图片理解、GPTs应用、GPT语音对话、联网提问、GPT-4全模型、DALL-E3文生图、GPT4-All联网搜索模型、思维导图、......
  • Tomcat源码解析(八):一个请求的执行流程(附Tomcat整体总结)
    Tomcat源码系列文章Tomcat源码解析(一):Tomcat整体架构Tomcat源码解析(二):Bootstrap和CatalinaTomcat源码解析(三):LifeCycle生命周期管理Tomcat源码解析(四):StandardServer和StandardServiceTomcat源码解析(五):StandardEngine、StandardHost、StandardContext、Standard......
  • 1900springboot VUE 生态菜园管理系统开发mysql数据库web结构java编程计算机网页源码m
    一、源码特点 springbootVUE生态菜园管理系统是一套完善的完整信息管理类型系统,结合springboot框架和VUE完成本系统,对理解JSPjava编程开发语言有帮助系统采用springboot框架(MVC模式开发),系统具有完整的源代码和数据库,系统主要采用B/S模式开发。前段主要技术vue 后端主......
  • SpringBoot个人网盘系统-计算机毕业设计源码92922
    摘 要随着科学技术的飞速发展,社会的方方面面、各行各业都在努力与现代的先进技术接轨,通过科技手段来提高自身的优势或改善自身的缺点,互联网的发展文件管理带来了福音。个人网盘系统是以实际运用为开发背景,运用软件工程原理和开发方法,采用Java技术构建的一个线上系统。整个......
  • springboot高校运动会信息管理系统设计与实现-计算机毕业设计源码92968
    摘 要本论文介绍了一个高校运动会信息管理系统的设计和实现过程。首先是高校运动会的需求分析和可行性分析,通过比较运动会的各个工作流程,确定了系统的数据流程和数据库结构,然后介绍了高校运动会信息管理系统开发所使用的软件开发工具,最后描述了系统的详细设计与实现。本系统......
  • WebLogic XMLDecoder反序列化漏洞
    目录前言XMLDecoder概述XMLDecoder反序列化漏洞漏洞复现前言上篇复现了T3反序列化漏洞,XMLDecoder反序列化在WebLogic中也是一类影响很大的反序化漏洞。XMLDecoder概述XMLDecoder是JDK自带的以SAX方式解析xml的类,实现java对象和xml文件之间的转化。其中序列化过程是将java对象......
  • 仿饿了么的谁去拿外卖游戏源码
    源码介绍喝酒没有游戏玩?懒得下床不想出去那么好这个游戏会满足你!玩法每人都选择一个序号4个人为例张三选第①李四选第②王五选第③赵前选第④然后就按4下其中最小的数对应的序号就是他输了就去拿外卖!源码下载仿饿了么的谁去拿外卖游戏源码......