前言
在上一篇随笔中,我们探讨了如何使用 Netty 处理自定义协议中的粘包和拆包问题。Netty 提供了高度封装的 API,帮助开发者轻松应对这一挑战,因此很多人都对其解决方案非常熟悉。
但如果我们直接使用 Java NIO 来实现类似的功能,应该怎么做呢?
Kafka,作为一个成熟的分布式消息队列系统,正是直接基于 Java NIO 实现的,它的设计与实现方式值得我们深入学习。今天我们就来看看,Kafka 是如何使用 Java NIO 处理粘包拆包问题的。
注:以下内容基于 Kafka 3.9.0 源码进行分析。
Kafka协议格式
Kafka 消息的协议格式比较简单,采用了经典的消息头+消息体格式。消息头包含一个 4 字节的 int
,表示消息体的长度。剩下的部分就是实际的消息体。
简而言之,Kafka 协议的消息格式大致如下:
- 消息头:4 字节,表示消息体的长度。
- 消息体:根据消息头的长度字段读取相应大小的数据。
这种简单的设计方式使得 Kafka 消息的解码变得非常高效,但也带来了一个需要解决的关键问题:如何处理粘包和拆包?
架构背景
为了理解 Kafka 如何处理消息的粘包拆包问题,我们首先来看看 Kafka 的NIO架构。下图展示了 Kafka 的基本架构,其中 Processor
组件负责接收和解析数据包,并将请求写入请求队列(RequestQueue
)等待进一步处理。既然能写入请求队列,则解码操作肯定已经完成了,下面我们具体看看Processor的代码。
Processor处理线程
Kafka 的 Processor
线程负责从网络通道中读取消息,解码并处理请求。以下是 Processor
的 run
函数实现,我们可以看到它执行了一些关键操作:
// SocketServer.scala
override def run(): Unit = { try { while (shouldRun.get()) { try { // 设置新连接 configureNewConnections() // 处理新响应 processNewResponses() // 轮询处理网络事件 poll() // 处理完成的接收操作 processCompletedReceives() // 处理完成的发送操作 processCompletedSends() // 处理断开连接的操作 processDisconnected() // 关闭多余的连接 closeExcessConnections() } catch { case e: Throwable => processException("Processor got uncaught exception.", e) } } } finally { debug(s"Closing selector - processor $id") CoreUtils.swallow(closeAll(), this, Level.ERROR) } }
在 run
方法中,Processor
线程会执行一系列的操作来管理网络连接和消息的处理。可以看到,整个处理过程是通过轮询(poll
)和不同的处理方法(如 processCompletedReceives
和 processCompletedSends
)来实现的。
定位解码逻辑
根据方法名,解包操作应该是在 processCompletedReceives
方法中进行的。然而,实际代码中,我们发现 processCompletedReceives
方法在处理时,已经能够读取到请求头信息,这意味着消息已经是完整的。如果消息还不完整,肯定会存在边界问题。
因此,可以推测,completedReceives
中的消息已经经过了解码处理。接下来,我们只需要找到 completedReceives
是何时写入的,便能清楚了解解码的时机。
//SocketServer.scala private def processCompletedReceives(): Unit = { // 遍历所有已完成的接收(消息) selector.completedReceives.forEach { receive => try { // 根据消息的源(source)获取通道,判断该通道是否处于打开或关闭状态 openOrClosingChannel(receive.source) match { case Some(channel) => // 解析请求头(消息头部分) val header = parseRequestHeader(receive.payload) // 如果消息类型是 SASL 握手请求,可能需要开始重新认证 if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, () => time.nanoseconds())) { trace(s"Begin re-authentication: $channel") } else { // 获取当前时间戳(用于后续操作) val nowNanos = time.nanoseconds() // 判断通道的认证会话是否过期 if (channel.serverAuthenticationSessionExpired(nowNanos)) { // 如果过期,关闭连接并记录过期连接数 debug(s"Disconnecting expired channel: $channel : $header") close(channel.id) expiredConnectionsKilledCount.record(null, 1, 0) } else { // 获取连接ID(消息来源) val connectionId = receive.source // 构建请求上下文 val context = new RequestContext(header, connectionId, channel.socketAddress, Optional.of(channel.socketPort()), channel.principal, listenerName, securityProtocol, channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde) // 创建请求对象 val req = new RequestChannel.Request(processor = id, context = context, startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None) // KIP-511: 拦截 API_VERSIONS 请求以捕获客户端软件名和版本信息 if (header.apiKey == ApiKeys.API_VERSIONS) { val apiVersionsRequest = req.body[ApiVersionsRequest] if (apiVersionsRequest.isValid) { // 注册客户端信息 channel.channelMetadataRegistry.registerClientInformation(new ClientInformation( apiVersionsRequest.data.clientSoftwareName, apiVersionsRequest.data.clientSoftwareVersion)) } } // 将请求发送到请求通道 requestChannel.sendRequest(req) // 将通道设置为静默状态,防止进一步的操作,直到请求处理完毕 selector.mute(connectionId) // 处理通道的静默事件 handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED) } } case None => // 如果通道没有在选择器中找到,这应该永远不会发生 // 因为已完成的接收总是会在 `poll()` 后立即处理 throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive") } } catch { // 如果处理请求时发生异常,捕获异常并继续处理 case e: Throwable => processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e) } } // 清空已完成的接收队列,确保下次可以处理新的消息 selector.clearCompletedReceives() }
进一步深入代码后,发现解码操作实际上是在 poll()
方法中完成的。其中具体执行流程如下:
Processor
通过 KafkaChannel
和 NetworkReceive
来处理每一条消息。NetworkReceive
是 Kafka 用来包装完整消息的对象,它包含了两个缓冲区(ByteBuffer
):
- 消息头缓冲区:用于存放固定大小的消息头(4 字节)。
- 消息体缓冲区:用于存放动态大小的消息体。
KafkaChannel
任何时候都只持有一个NetworkReceive
对象。意味着它缓存的数据不会超过1条消息。
NetworkReceive类的属性如下:
//NetworkReceive.java
//KafkaChannel id private final String source; //消息头缓存,new的时候固定4字节 private final ByteBuffer size; //最大值,消息体不能超过这个大小。超过则报错 private final int maxSize; //内存分配器,使用这个分配消息体缓存空间 private final MemoryPool memoryPool; //消息体请求分配的空间大小 private int requestedBufferSize = -1; //消息体缓存 private ByteBuffer buffer;
数据读取过程
Kafka 的数据读取过程分为几个步骤:
-
读取消息头:Kafka 会首先尝试从
SocketChannel
中读取消息头。消息头固定为 4 字节,表示消息体的大小。每次读取时,Kafka 会优先读取这 4 字节的数据。 -
计算消息体大小:根据读取到的消息头(消息体的长度),Kafka 会分配一个新的缓冲区用于存放消息体数据。
-
读取消息体:接下来,Kafka 会继续从
SocketChannel
中读取剩余的数据,直到消息体完全被读取到新的缓冲区中。
//NetworkReceive.java public long readFrom(ScatteringByteChannel channel) throws IOException { int read = 0; // 初始化已读取字节数 // 首先检查 size 缓冲区是否还有剩余字节可读 if (size.hasRemaining()) { // 从 channel 中读取数据到 size 缓冲区 int bytesRead = channel.read(size); // 如果读取到文件末尾,抛出 EOFException 异常 if (bytesRead < 0) throw new EOFException(); read += bytesRead; // 累加已读取的字节数 // 如果 size 缓冲区已读取完成(没有剩余字节) if (!size.hasRemaining()) { size.rewind(); // 重置 position 为 0,准备读取数据 // 获取消息体的大小 int receiveSize = size.getInt(); // 如果消息体大小无效(小于 0),抛出异常 if (receiveSize < 0) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")"); // 如果消息体大小超过了允许的最大值,抛出异常 if (maxSize != UNLIMITED && receiveSize > maxSize) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); // 设置请求的缓冲区大小,可能是 0(例如用于处理某些 SASL 消息) requestedBufferSize = receiveSize; // 如果消息体大小为 0,表示没有有效的消息体,则使用空缓冲区 if (receiveSize == 0) { buffer = EMPTY_BUFFER; } } } // 如果 buffer 仍然为 null 且 requestedBufferSize 不是 -1,表示还没有为消息体分配缓冲区 if (buffer == null && requestedBufferSize != -1) { // 尝试为消息体分配请求的缓冲区大小 buffer = memoryPool.tryAllocate(requestedBufferSize); // 如果内存池没有足够的空间分配缓冲区,记录警告日志 if (buffer == null) log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source); } // 如果缓冲区已分配 if (buffer != null) { // 从 channel 中读取数据到 buffer 缓冲区 int bytesRead = channel.read(buffer); // 如果读取到文件末尾,抛出 EOFException 异常 if (bytesRead < 0) throw new EOFException(); read += bytesRead; // 累加已读取的字节数 } return read; // 返回已读取的总字节数 }
NetworkReceive
并不主动判断数据是否已经准备就绪,它的生命周期和状态完全由外部组件(如 Processor
线程)来管理。它只是一个数据容器,负责保存消息头和消息体。
//判断是否已完成的方式,就是看看两个缓冲区是否都已填满 @Override public boolean complete() { return !size.hasRemaining() && buffer != null && !buffer.hasRemaining(); }
与 Netty 的比较
读取方式:Netty 提供了更高层次的封装,自动化地从 SocketChannel
中读取数据,并将其存储在 ByteBuf
中,极大地简化了开发者的工作。而Kafka则需要手动从SocketChannel中读取数据,给开发者带来了更多的控制力,但也增加了代码的复杂度。
解码方式:Netty 提供了 ByteToMessageDecoder
这样的解码器来处理粘包和拆包问题,帮助开发者轻松应对消息边界问题。与此相比,Kafka需要开发者手动实现更多的解码逻辑,因此代码量较大,当然灵活性也更高。
读取数据量:Kafka 每次最多从 Socket
的内核缓冲区读取一条完整的消息,这种设计确保了每次读取的数据都是单一的完整消息,有助于提高处理的精确性和效率。相比之下,Netty 则可能在一次读取中处理多个消息,这取决于数据的大小和 ByteBuf
的容量。Kafka 的这种方式能避免一次读取过多数据,减少了内存的压力,同时也能给其他 Channel
留出更多的处理机会,提升系统的响应性。
标签:读取,处理,Kafka,源码,消息,缓冲区,channel,拆包 From: https://www.cnblogs.com/longfurcat/p/18664750