首页 > 编程语言 >【源码】Kafka订制协议如何处理粘拆包

【源码】Kafka订制协议如何处理粘拆包

时间:2025-01-11 14:00:11浏览次数:1  
标签:读取 处理 Kafka 源码 消息 缓冲区 channel 拆包

前言

在上一篇随笔中,我们探讨了如何使用 Netty 处理自定义协议中的粘包和拆包问题。Netty 提供了高度封装的 API,帮助开发者轻松应对这一挑战,因此很多人都对其解决方案非常熟悉。

但如果我们直接使用 Java NIO 来实现类似的功能,应该怎么做呢?

Kafka,作为一个成熟的分布式消息队列系统,正是直接基于 Java NIO 实现的,它的设计与实现方式值得我们深入学习。今天我们就来看看,Kafka 是如何使用 Java NIO 处理粘包拆包问题的。

注:以下内容基于 Kafka 3.9.0 源码进行分析。

Kafka协议格式

Kafka 消息的协议格式比较简单,采用了经典的消息头+消息体格式。消息头包含一个 4 字节的 int,表示消息体的长度。剩下的部分就是实际的消息体。

简而言之,Kafka 协议的消息格式大致如下:

  1. 消息头:4 字节,表示消息体的长度。
  2. 消息体:根据消息头的长度字段读取相应大小的数据。

这种简单的设计方式使得 Kafka 消息的解码变得非常高效,但也带来了一个需要解决的关键问题:如何处理粘包和拆包?

架构背景

为了理解 Kafka 如何处理消息的粘包拆包问题,我们首先来看看 Kafka 的NIO架构。下图展示了 Kafka 的基本架构,其中 Processor 组件负责接收和解析数据包,并将请求写入请求队列(RequestQueue)等待进一步处理。既然能写入请求队列,则解码操作肯定已经完成了,下面我们具体看看Processor的代码。

 Processor处理线程

Kafka 的 Processor 线程负责从网络通道中读取消息,解码并处理请求。以下是 Processorrun 函数实现,我们可以看到它执行了一些关键操作:

// SocketServer.scala
override def run(): Unit = {
    try {
      while (shouldRun.get()) {
        try {
          // 设置新连接
          configureNewConnections()
          // 处理新响应
          processNewResponses()
          // 轮询处理网络事件
          poll()
          // 处理完成的接收操作
          processCompletedReceives()
          // 处理完成的发送操作
          processCompletedSends()
          // 处理断开连接的操作
          processDisconnected()
          // 关闭多余的连接
          closeExcessConnections()
        } catch {
          case e: Throwable => processException("Processor got uncaught exception.", e)
        }
      }
    } finally {
      debug(s"Closing selector - processor $id")
      CoreUtils.swallow(closeAll(), this, Level.ERROR)
    }
}

run 方法中,Processor 线程会执行一系列的操作来管理网络连接和消息的处理。可以看到,整个处理过程是通过轮询(poll)和不同的处理方法(如 processCompletedReceivesprocessCompletedSends)来实现的。

定位解码逻辑

根据方法名,解包操作应该是在 processCompletedReceives 方法中进行的。然而,实际代码中,我们发现 processCompletedReceives 方法在处理时,已经能够读取到请求头信息,这意味着消息已经是完整的。如果消息还不完整,肯定会存在边界问题。

因此,可以推测,completedReceives 中的消息已经经过了解码处理。接下来,我们只需要找到 completedReceives 是何时写入的,便能清楚了解解码的时机。

//SocketServer.scala

private def processCompletedReceives(): Unit = {
  // 遍历所有已完成的接收(消息)
  selector.completedReceives.forEach { receive =>
    try {
      // 根据消息的源(source)获取通道,判断该通道是否处于打开或关闭状态
      openOrClosingChannel(receive.source) match {
        case Some(channel) =>
          // 解析请求头(消息头部分)
          val header = parseRequestHeader(receive.payload)

          // 如果消息类型是 SASL 握手请求,可能需要开始重新认证
          if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive,
            () => time.nanoseconds())) {
            trace(s"Begin re-authentication: $channel")
          } else {
            // 获取当前时间戳(用于后续操作)
            val nowNanos = time.nanoseconds()

            // 判断通道的认证会话是否过期
            if (channel.serverAuthenticationSessionExpired(nowNanos)) {
              // 如果过期,关闭连接并记录过期连接数
              debug(s"Disconnecting expired channel: $channel : $header")
              close(channel.id)
              expiredConnectionsKilledCount.record(null, 1, 0)
            } else {
              // 获取连接ID(消息来源)
              val connectionId = receive.source

              // 构建请求上下文
              val context = new RequestContext(header, connectionId, channel.socketAddress, Optional.of(channel.socketPort()),
                channel.principal, listenerName, securityProtocol, channel.channelMetadataRegistry.clientInformation,
                isPrivilegedListener, channel.principalSerde)

              // 创建请求对象
              val req = new RequestChannel.Request(processor = id, context = context,
                startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)

              // KIP-511: 拦截 API_VERSIONS 请求以捕获客户端软件名和版本信息
              if (header.apiKey == ApiKeys.API_VERSIONS) {
                val apiVersionsRequest = req.body[ApiVersionsRequest]
                if (apiVersionsRequest.isValid) {
                  // 注册客户端信息
                  channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
                    apiVersionsRequest.data.clientSoftwareName,
                    apiVersionsRequest.data.clientSoftwareVersion))
                }
              }

              // 将请求发送到请求通道
              requestChannel.sendRequest(req)

              // 将通道设置为静默状态,防止进一步的操作,直到请求处理完毕
              selector.mute(connectionId)

              // 处理通道的静默事件
              handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)
            }
          }
        case None =>
          // 如果通道没有在选择器中找到,这应该永远不会发生
          // 因为已完成的接收总是会在 `poll()` 后立即处理
          throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
      }
    } catch {
      // 如果处理请求时发生异常,捕获异常并继续处理
      case e: Throwable =>
        processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
    }
  }

  // 清空已完成的接收队列,确保下次可以处理新的消息
  selector.clearCompletedReceives()
}

进一步深入代码后,发现解码操作实际上是在 poll() 方法中完成的。其中具体执行流程如下:

 

Processor 通过 KafkaChannelNetworkReceive 来处理每一条消息。NetworkReceive 是 Kafka 用来包装完整消息的对象,它包含了两个缓冲区(ByteBuffer):

  • 消息头缓冲区:用于存放固定大小的消息头(4 字节)。
  • 消息体缓冲区:用于存放动态大小的消息体。

KafkaChannel 任何时候都只持有一个NetworkReceive 对象。意味着它缓存的数据不会超过1条消息。

NetworkReceive类的属性如下:

//NetworkReceive.java
//KafkaChannel id
private final String source;
//消息头缓存,new的时候固定4字节
private final ByteBuffer size;
//最大值,消息体不能超过这个大小。超过则报错
private final int maxSize;
//内存分配器,使用这个分配消息体缓存空间
private final MemoryPool memoryPool;
//消息体请求分配的空间大小
private int requestedBufferSize = -1;
//消息体缓存
private ByteBuffer buffer;

数据读取过程

Kafka 的数据读取过程分为几个步骤:

  1. 读取消息头:Kafka 会首先尝试从 SocketChannel 中读取消息头。消息头固定为 4 字节,表示消息体的大小。每次读取时,Kafka 会优先读取这 4 字节的数据。

  2. 计算消息体大小:根据读取到的消息头(消息体的长度),Kafka 会分配一个新的缓冲区用于存放消息体数据。

  3. 读取消息体:接下来,Kafka 会继续从 SocketChannel 中读取剩余的数据,直到消息体完全被读取到新的缓冲区中。

//NetworkReceive.java

public long readFrom(ScatteringByteChannel channel) throws IOException {
    int read = 0;  // 初始化已读取字节数

    // 首先检查 size 缓冲区是否还有剩余字节可读
    if (size.hasRemaining()) {
        // 从 channel 中读取数据到 size 缓冲区
        int bytesRead = channel.read(size);
        
        // 如果读取到文件末尾,抛出 EOFException 异常
        if (bytesRead < 0)
            throw new EOFException();
        
        read += bytesRead;  // 累加已读取的字节数
        
        // 如果 size 缓冲区已读取完成(没有剩余字节)
        if (!size.hasRemaining()) {
            size.rewind();  // 重置 position 为 0,准备读取数据
            
            // 获取消息体的大小
            int receiveSize = size.getInt();
            
            // 如果消息体大小无效(小于 0),抛出异常
            if (receiveSize < 0)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
            
            // 如果消息体大小超过了允许的最大值,抛出异常
            if (maxSize != UNLIMITED && receiveSize > maxSize)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
            
            // 设置请求的缓冲区大小,可能是 0(例如用于处理某些 SASL 消息)
            requestedBufferSize = receiveSize;
            
            // 如果消息体大小为 0,表示没有有效的消息体,则使用空缓冲区
            if (receiveSize == 0) {
                buffer = EMPTY_BUFFER;
            }
        }
    }

    // 如果 buffer 仍然为 null 且 requestedBufferSize 不是 -1,表示还没有为消息体分配缓冲区
    if (buffer == null && requestedBufferSize != -1) {
        // 尝试为消息体分配请求的缓冲区大小
        buffer = memoryPool.tryAllocate(requestedBufferSize);
        
        // 如果内存池没有足够的空间分配缓冲区,记录警告日志
        if (buffer == null)
            log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
    }

    // 如果缓冲区已分配
    if (buffer != null) {
        // 从 channel 中读取数据到 buffer 缓冲区
        int bytesRead = channel.read(buffer);
        
        // 如果读取到文件末尾,抛出 EOFException 异常
        if (bytesRead < 0)
            throw new EOFException();
        
        read += bytesRead;  // 累加已读取的字节数
    }

    return read;  // 返回已读取的总字节数
}

NetworkReceive 并不主动判断数据是否已经准备就绪,它的生命周期和状态完全由外部组件(如 Processor 线程)来管理。它只是一个数据容器,负责保存消息头和消息体。

//判断是否已完成的方式,就是看看两个缓冲区是否都已填满    
@Override
public boolean complete() {
     return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}

与 Netty 的比较

读取方式:Netty 提供了更高层次的封装,自动化地从 SocketChannel 中读取数据,并将其存储在 ByteBuf 中,极大地简化了开发者的工作。而Kafka则需要手动从SocketChannel中读取数据,给开发者带来了更多的控制力,但也增加了代码的复杂度。

解码方式:Netty 提供了 ByteToMessageDecoder 这样的解码器来处理粘包和拆包问题,帮助开发者轻松应对消息边界问题。与此相比,Kafka需要开发者手动实现更多的解码逻辑,因此代码量较大,当然灵活性也更高。

读取数据量:Kafka 每次最多从 Socket 的内核缓冲区读取一条完整的消息,这种设计确保了每次读取的数据都是单一的完整消息,有助于提高处理的精确性和效率。相比之下,Netty 则可能在一次读取中处理多个消息,这取决于数据的大小和 ByteBuf 的容量。Kafka 的这种方式能避免一次读取过多数据,减少了内存的压力,同时也能给其他 Channel 留出更多的处理机会,提升系统的响应性。

 

标签:读取,处理,Kafka,源码,消息,缓冲区,channel,拆包
From: https://www.cnblogs.com/longfurcat/p/18664750

相关文章

  • springboot汽车租赁智慧管理-计算机毕业设计源码96317
    基于springboot的汽车租赁智慧管理设计与实现-可点击查看演示录像https://member.bilibili.com/platform/upload-manager/article目 录第1章引 言1.1选题背景1.2研究现状1.3论文结构安排第2章系统的需求分析2.1系统可行性分析2.1.1技术方面可行......
  • springboot食物营养分析平台-计算机毕业设计源码75335
    基于SpringBoot的食物营养分析平台-可点击查看演示录像https://www.bilibili.com/video/BV1LuCtYXE6i/?vd_source=72970c26ba7734ebd1a34aa537ef5301摘要随着我国经济的发展,人民生活水平的提高,人们的饮食己由温饱型转向营养型。因此,营养问题日益受到重视。食物营养分析平台......
  • 【精选】基于Java的新闻发布及管理系统设计与实现(源码+定制+开发)新闻发布管理系统、在
    博主介绍:  ✌我是阿龙,一名专注于Java技术领域的程序员,全网拥有10W+粉丝。作为CSDN特邀作者、博客专家、新星计划导师,我在计算机毕业设计开发方面积累了丰富的经验。同时,我也是掘金、华为云、阿里云、InfoQ等平台的优质作者。通过长期分享和实战指导,我致力于帮助更多学生......
  • 计算机毕设项目源码 大数据深度学习 教育机构信息管理系统
    标题:教育机构信息管理系统教育机构信息管理系统的基本功能主要包括以下几个方面:1.用户管理用户注册与登录:教职工、学生及家长可以注册账户并登录。角色管理:根据不同的用户角色(如管理员、教师、学生、家长)设置相应的权限。2.学生管理学生信息管理:记录学生的基本信......
  • 计算机毕业设计源码 大数据深度学习 健身俱乐部管理系统
    标题:健身俱乐部管理系统健身俱乐部管理系统的基本功能主要包括以下几个方面:1.用户管理用户注册与登录:会员可以注册新账号并登录,管理人员可以登录后台进行管理。会员信息管理:记录和管理会员的基本信息,包括姓名、联系方式、性别、年龄、入会日期等。2.会员管理会员档......
  • 一对一视频直播源码,scss的推展写法详解
    scss拓展写法.a{.b{&:hover{width:100px;}&.c{width:100px;}&-d{width:100px;font:{size:20px;}}......
  • 图像识别-迁移学习-AlexNet-AlexNet源码
    文章目录迁移学习深度学习框架中可用的分类预训练模型AlexNettransforms.ToTensor()**`transforms.ToTensor()`的作用****1.为什么需要`ToTensor()`?****2.`ToTensor()`转换内容****输入数据类型****输出****注意:通道顺序****3.示例代码****3.1转换PIL图片****......
  • 2025年毕设ssm网上订餐论文+源码
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容选题背景随着互联网技术的飞速发展,网上订餐已成为现代生活的重要组成部分,极大地便利了人们的日常生活。关于网上订餐系统的研究,现有研究主要以电商平台和餐饮......
  • 2025年毕设ssm网上花店管理系统论文+源码
    本系统(程序+源码)带文档lw万字以上 文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容选题背景随着互联网的普及和电子商务的迅猛发展,网上购物已成为人们日常生活的一部分。花店作为传统零售业的一部分,也逐渐开始转型,通过线上平台拓展销售渠道。......
  • 时装购物|时装购物系统|基于springboot的时装购物系统设计与实现(源码+数据库+文档)
    时装购物系统目录目录基于springboot的时装购物系统设计与实现一、前言 二、系统功能设计三、系统实现5.1管理员功能模块 四、数据库设计1、实体ER图  2、具体的表设计如下所示:五、核心代码 六、论文参考  七、最新计算机毕设选题推荐八、源码获取:博主......