首页 > 编程语言 >Netty源码学习5——服务端是如何读取数据的

Netty源码学习5——服务端是如何读取数据的

时间:2023-11-26 15:33:25浏览次数:49  
标签:Netty ByteBufAllocator 读取数据 allocHandle 源码 内存 ByteBuf 读取

系列文章目录和关于我

零丶引入

在前面《Netty源码学习4——服务端是处理新连接的&netty的reactor模式》的学习中,我们了解到服务端是如何处理新连接的,即注册ServerSocketChannel对accept事件感兴趣,然后包装ServerSocketChannel为NioServerSockectChannel,最后由主Reactor在循环中利用selector进行IO多路复用产生事件,如果产生accept事件那么调用ServerSocketChannel#accept将其结果(SocketChannel)包装为NioSockectChannel,然后传播channelRead事件,然后由ServerBootstrapAcceptor 将NioSockectChannel注册到子Reactor中。

图片

也就是说ServerBootstrapAcceptor 是派活的大哥,属于main reactor,而真正干活的是子reactor中的NioEventLoop,它们会负责后续的数据读写与写入。

这一篇我们就来学习NioSockectChannel是如何读取数据的。

一丶子Reactor打工人NioEventLoop处理Read事件

源码学习的入口和服务端处理accept事件一致

image-20231119144133597

区别在于这里的Channel是NioSocketChannel,而不是NioServerSockectChannel,并且就绪的事件是READ(这是因为注册到Selector上附件是包装了SocketChannel的NioSocketChannel,感兴趣的事件是read)并且这里的线程是worker NioEventLoopGroup中的线程!

image-20231126120615227

二丶NioSockectChannelUnsafe读取数据

可以看到子reactor线程读取客户端发送的数据,使用的是NioSockectChannelUnsafe#read方法。如下是read方法源码:

public final void read() {
    final ChannelConfig config = config();
    // 省略read-half相关处理
    final ChannelPipeline pipeline = pipeline();
    // ByteBuf分配器,默认为堆外内存+池化分配器
    final ByteBufAllocator allocator = config.getAllocator();
    // allocHandle用来控制下面读取流程
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            // allocHandle使用allocator来分配内存给byteBuf
            byteBuf = allocHandle.allocate(allocator);
            // doReadBytes读取数据到byteBuf,记录最后一次读取的字节数
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            // 小于0==>通道已到达流结束
            if (allocHandle.lastBytesRead() <= 0) {
                // 释放byteBuf
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }
			
            // 记录读取次数+1
            allocHandle.incMessagesRead(1);
            readPending = false;
            // 触发channelRead
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());//判断是否继续读
		
        allocHandle.readComplete();
        // 触发readComplete
        pipeline.fireChannelReadComplete();
		
        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        // 省略
    } finally {
       // 省略
    }
}
  • 可以看到每次读取到数据都会触发channelRead,读取完毕后会触发readComplete

    我们的业务逻辑就需要自己实现ChannelHandler#channelRead和channelReadComplete进行数据处理(解码,执行业务操作,编码,写回)

  • 可以看到是否继续读取客户端发送数据,是由allocHandle.continueReading()决定的,并且读取客户端的数据会存放到ByteBuf中,ByteBuf的分配是allocHandle.allocate(allocator)来控制

2.0 读取客户端发送的数据

我们先忽略ByteBufAllocator和RecvByteBufAllocator ,直接看看doReadBytes(byteBuf)是如何读取的数据

image-20231126142223614

最终调用setBytes进行读取

image-20231126142431394

下面我们看看ByteBufAllocator和RecvByteBufAllocator 在这个过程中取到了什么作用

2.1 ByteBufAllocator

ByteBufAllocator的主要作用是分配和回收ByteBuf对象,以及管理内存的分配和释放。

  • 内存池的管理机制,用于提高内存分配和回收的效率。
  • 支持可选的内存池类型,如池化和非池化等,以根据应用程序的需求进行灵活的内存管理。

image-20231126135012827

如上是ByteBufAllocator的具体实现

  • AbstractByteBufAllocator:这是一个抽象类,提供了一些通用的方法和逻辑,用于创建和管理ByteBuf实例。它是UnpooledByteBufAllocator和PooledByteBufAllocator的基类。

  • UnpooledByteBufAllocator:这是ByteBufAllocator的默认实现。它采用非池化的方式进行内存分配,每次都会创建新的ByteBuf对象,不会使用内存池。

  • PooledByteBufAllocator:这是使用内存池的ByteBufAllocator实现。它通过重用内存池中的ByteBuf对象来提高性能和内存利用率。PooledByteBufAllocator可以根据需求使用池化和非池化的ByteBuf实例。

  • PreferredDirectByteBufAllocator:偏好使用直接内存的分配器,ByteBufAllocator#buffer并没有说明是堆内还是堆外,PreferredDirectByteBufAllocator会优先使用堆内(装饰器模式)image-20231126140416886

  • PreferHeapByteBufAllocator:偏好使用堆内内存的分配器

2.2 RecvByteBufAllocator 与 RecvByteBufAllocator.Handler

ByteBufAllocator是真正分配内存产生ByteBuf的分配器,但是在网络io中通常需要根据读取数据的多少动态调整ByteBuf的。默认情况下netty在读取客户端数据的时候使用的是AdaptiveRecvByteBufAllocator,顾名思义可以调整ByteBuf的RecvByteBufAllocator 实现。

也就是说,ByteBufAllocator是真正负责内存分配的,RecvByteBufAllocator是负责根据网络IO情况去调用ByteBufAllocator调整ByteBuf的。

image-20231126141034205

  • FixedRecvByteBufAllocator:固定分配器。该实现分配固定大小的ByteBuf,不受网络环境和应用程序需求的影响。适用于在已知数据量的情况下进行分配,不需要动态调整大小。
  • AdaptiveRecvByteBufAllocator:自适应分配器。该实现根据当前的网络环境和应用程序的处理能力动态地调整ByteBuf的大小。可以根据实际情况自动增加或减少分配的大小,以优化性能。
  • DefaultMaxMessagesRecvByteBufAllocator:根据最大消息数量的分配器。该实现根据应用程序的需求来控制ByteBuf的分配。可以设置最大消息数量,当达到该数量时,不再分配ByteBuf,以控制内存的使用。
  • DefaultMaxBytesRecvByteBufAllocator:主要根据最大字节数来控制ByteBuf的分配。它与DefaultMaxMessagesRecvByteBufAllocator类似,但是以字节数为基础而不是消息数量。
  • ServerChannelRecvByteBufAllocator:控制服务端接收缓冲区大小

其内部还有一个Handler,Handler才是真正实现这些逻辑的类,这样做法的好处在于解耦合——RecvByteBufAllocator和Handler是松耦合的,多个RecvByteBufAllocator可以基于相同的Handler。

三丶 AdaptiveRecvByteBufAllocator

如下是 AdaptiveRecvByteBufAllocator#HandleImpl在读取客户端数据的过程中取到的作用:

image-20231126143057181

3.1 分配ByteBuf&控制ByteBuf大小

image-20231126143209048

可以看到真正分配ByteBuf的是ByteBufAllocator,而大小是AdaptiveRecvByteBufAllocator#HandleImpl使用guess方法猜测出来的

首次guess会返回预设的值(2048)后续该方法根据之前读取数据的多少来“猜”这次使用多大ByteBuf比较合适

image-20231126144411201

这个猜其实就是返回内存中记录的下一次大小,那么是怎么实现猜测的过程的昵?

3.2 “猜“——动态调整ByteBuf大小

image-20231126144343564

可以看到在记录读取数量的时候,如果是满载而归(比如上一次猜需要2048字节,由于客户端发送数据很多,读满了ByteBuf)会调用record进行记录和调整

image-20231126144838342

可以看到容量大小记录在了SIZE_TABLE中,SIZE_TABLE的初始化如下

image-20231126145231870

可以看到AdaptiveRecvByteBufAllocator#HandlerImpl调整的策略有以下特点

  • 小于512的适合,容量大小增长缓慢,大于521的容量翻倍增加
  • 扩容大胆,缩容需要两次判断

3.3 是否继续读取

image-20231126145610438

continueReading会判断是否继续读取:需要开启自动读,且maybe存在更多数据需要读取,且累计读取消息数小于最大消息数,且上一次读到了数据

  • 自动读:默认情况下,Netty的Channel是处于自动读取模式的。这意味着当有新数据可读时,Netty会自动触发读事件,从Channel中读取数据并传递给下一个处理器进行处理。自动读适合在高吞吐量的场景开启,但是如果处理数据的速度跟不上读取数据速度会出现数据堆积,内存占用过高,rt增加的问题。

  • maybe存在更多数据需要读取:

    image-20231126150906833

    其实就是判断上一次读取的字节数和预估的数量是否相等,也就是是否满载而归

  • 累计读取消息数小于最大消息数

    虽然一个NioServerChannel只会绑定到一个线程,但是一个线程可以注册多个NioServerChannel,so如果一个客户端疯狂发数据, 服务端不做干预,将导致这个线程上的其他Channel永远得不到处理

    so netty设置maxMessagePerRead(单次read最多可以读取多少消息——指循环读取ServerChannel多少次)

四丶总结&启下

1.总结

这一篇我们看了NioServerChannel是如何读取数据的,其Unsafe依赖JDK原生的SocketChannel#read(ByteBuffer)来读取数据,但是netty在此之上做了如下优化

  • 使用ByteBufAllocator优化ByteBuf的分配,默认使用池化的直接内存策略

    内存池这一篇没用做过多学习,后续单独学习

  • 使用AdaptiveRecvByteBufAllocator对读取过程进行优化

    • guess会猜测多大的ByteBuf合适(每次读取后进行扩容or缩容)
    • 内部是SIZE_TABLE记录容量大小,小于512的适合,容量大小增长缓慢,大于521的容量翻倍增加
    • 扩容大胆——容量小了1次那么下一次使用SIZE_TABLE下一个下标对应的容量,缩容需要两次判断,连续两次不满足大小才进行缩容
    • 在是否继续读取上雨露均沾——控制最多读取16次,并且会根据读取数据是否满载而归判断是否需要继续读取

2.启下

这一篇我们看到每一次循环读取NioSocketChannel数据后会触发channelRead,读取完毕后会触发readComplete,

我们的业务逻辑就需要自己实现ChannelHandler#channelRead和channelReadComplete进行数据处理(解码,执行业务操作,编码,写回)

那么netty中有哪些内置的编码解码器昵?下一篇我们再来唠唠

标签:Netty,ByteBufAllocator,读取数据,allocHandle,源码,内存,ByteBuf,读取
From: https://www.cnblogs.com/cuzzz/p/17857310.html

相关文章

  • Java互联网+医院智能导诊系统源码 自动兼容H5小程序、Uniapp
    智能导诊系统是一种基于人工智能技术的医疗辅助系统,旨在帮助患者快速找到合适的就诊路径。患者可以通过系统了解医院的科室设置、医生排班、就诊流程等信息,并根据自己的症状和描述,系统会推荐合适的科室和医生。同时,智能导诊系统还可以为医院提供数据分析和决策支持,提高医院的管理效......
  • 一个关于用netty的小错误反思
    一个关于用netty的小认知在使用netty时,观看了黑马的netty网课,没想就直接用他的依赖了依赖如下<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.39.Final</version></dependency>不过幸运的是,我意识到了老师的没......
  • 基于python的计算机网络在线考试系统-计算机毕业设计源码+LW文档
    摘 要现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本基于python的计算机网络在线考试系统就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理完毕庞大的数据信息,使用这种软件工具可以帮助管理人员......
  • 汽车租赁管理网站-计算机毕业设计源码+LW文档
    摘 要社会发展日新月异,用计算机应用实现数据管理功能已经算是很完善的了,但是随着移动互联网的到来,处理信息不再受制于地理位置的限制,处理信息及时高效,备受人们的喜爱。本次开发一套汽车租赁管理网站有管理员和用户两个角色。管理员功能有个人中心,用户管理,汽车品牌管理,汽车信息管......
  • python021餐厅点餐系统-计算机毕业设计源码+LW文档
    一、课题简介在信息技术逐渐渗入到生活各个方面的今天,互联网在我们的生活中扮演着越来越重要的角色。现在信息化技术不仅可以提高我们的工作效率,而且能有效的规避错误风险,节约人力成本。美食行业现如今也越来越利用信息化技术来提高点餐效率和质量,餐厅点餐管理系统现在也越来越被......
  • 基于python的旅游景点推荐系统-计算机毕业设计源码+LW文档
    摘要 随着科技和网络的进步,计算机技术与网络、生活贴和的更加紧密。需要依靠客户端的单机系统逐渐被淘汰,利用互联网可以处理大量数据的新型系统如雨后春笋般迅速发展起来。这类系统和信息化时代的同步发展对传统的办公管理方式造成了很大的压力。当今时代,信息数据是一切的根本,是......
  • 基于django的4s店客户管理系统-计算机毕业设计源码+LW文档
    摘 要 进入21世纪网络和计算机得到了飞速发展,并和生活进行了紧密的结合。目前,网络的运行速度以达到了千兆,覆盖范围更是深入到生活中的角角落落。这就促使管理系统的发展。网上办公可以实现远程处理事务,远程提交工作和随时追踪工作的状态。网上管理系统给人们带来前所未有的体......
  • 超宽带无线通信技术(UWB)源码,室内定位系统
    UWB超宽带定位技术概念:   超宽带无线通信技术(UWB)是一种无载波通信技术,UWB不使用载波,而是使用短的能量脉冲序列,并通过正交频分调制或直接排序将脉冲扩展到一个频率范围内。UWB的主要特点是传输速率高、空间容量大、成本低、功耗低等,必将成为解决企业、家庭、公共场所等高速因......
  • 【Lustre相关】应用部署-01-源码编译IB驱动及lustre软件包
    一、编译安装系统版本:CentOSLinuxrelease7.9.2009(Core)内核版本:3.10.0-1160.el7.x86_64网卡型号:MellanoxTechnologiesMT2892Family[ConnectX-6Dx]软件版本:lfs2.12.9ib注:使用CentOS-7-x86_64-Everything-2009ISO,选择Minimalinstall安装,勾选Debugging......
  • spring boot工业互联网高精度位置信息服务平台源码
    UWB定位系统源码,UWB人员定位系统全套源码行业背景工业企业多存在很多有毒有害、高危高压等生产环境,带电设备众多,容易发生安全事故;人员只能凭记忆遵守各项生产安全规范,如某些危险区域范围、带电体的安全距离、各项作业的规范;一旦疏忽后果严重,安全作业无后盾;生产安全的重点区域,无全方......