首页 > 编程语言 >Netty源码学习7——netty是如何发送数据的

Netty源码学习7——netty是如何发送数据的

时间:2023-12-03 18:11:21浏览次数:42  
标签:Netty 调用 ChannelOutboundBuffer 写入 write 源码 flush 发送数据

零丶引入

系列文章目录和关于我
经过《Netty源码学习4——服务端是处理新连接的&netty的reactor模式《Netty源码学习5——服务端是如何读取数据的》,我们了解了netty服务端是如何建立连接,读取客户端数据的,通过《Netty源码学习6——netty编码解码器&粘包半包问题的解决》我们认识到编解码在网络编程中的作用以及netty是如何解决TCP粘包,半包问题的。

那么netty客户端是如何发送数据的,以及服务端是如何将响应发送给客户端的昵?

在我们之前编写的小demo当中,有如下代码:

image-20231203122424715

可以看到无论是客户端还是服务端发送数据均可调用Channel#writeAndFlush,在此之外netty还可以直接使用ChannelHandlerContext#writeAndFlush写数据,二者有什么区别昵?

这篇文章,我们将深入源码看看netty是如何write和flush的

一丶Write事件的产生和传播

在业务逻辑处理完毕后,需要调用write 或者 writeAndFlush方法

  • ChannelHandlerContext#write or writeAndFlush方法会从当前 ChannelHandler 开始在 pipeline 中向前传播 write 事件直到 HeadContext。
  • ChannelHandlerContext.channel()#write or writeAndFlush 方法则会从 pipeline 的尾结点 TailContext 开始在 pipeline 中向前传播 write 事件直到 HeadContext 。

write 方法并不是真将数据写到socket缓存区,而是写道Netty的ChannelOutBoundBuffer中,调用flush方法才会真正调用JDK SockectChannel将数据写入。

如下是pipeline#writeAndFlush,可以看到直接调用TailContext#writeAndFlush进行处理

image-20231203150809488

关键源码如下:

private void write(Object msg, boolean flush, ChannelPromise promise) {
   // 省略 部分
	
    //flush 表示是否需要flush,调用writeAndFlush的时候为true
    // 找到下一个ChannelHandlerContext
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    // 在eventLoop 中
    if (executor.inEventLoop()) {
        // 需要flush 那么调用invokeWriteAndFlush
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        // 在eventLoop 中 那么提交一个WriteTask到eventLoop中
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}

可以看到TailContext会通过 findContextOutbound 方法在当前 ChannelHandler 的前边找到 ChannelOutboundHandler 类型并且覆盖实现 write 回调方法的 ChannelHandler 作为下一个要执行的对象。

然后如果当前执行的线程就是EventLoop线程,那么直接调用,反之提交一个异步任务,从而保证执行write的一定是 reactor 线程——保证线程安全性

如下是next.invokeWriteAndFlush的源码

image-20231203152143663

最终事件会传播到HeadContext进行处理(如果中间的ChannelHandler不截胡的话)

图片

二丶write 源码解析

write 事件最终会由HeadContext进行处理

image-20231203152515740

可以看到HeadContext#write其实就是使用Channel的Unsafe#write,其主要逻辑如下

image-20231203153403361

ChannelOutboundBuffer#addMessage

ChannelOutboundBuffer 是 Netty 内部使用的一个数据结构,它用于存储待发送的出站数据。在 Netty 的网络框架中,当需要写数据到网络时,数据并不会立即被发送出去,而是首先被放入一个出站缓冲区中,即 ChannelOutboundBuffer。这个缓冲区负责管理和存储所有待写入通道的数据。

  • 批量发送优化: ChannelOutboundBuffer 允许 Netty 批量地发送数据,而不是每次写操作都立即进行网络发送。这样可以减少系统调用次数,提高网络效率。
  • 流量控制: 它有助于实现流量控制,防止数据发送过快,导致接收方处理不过来。
  • 缓冲区管理: 可以有效地管理内存,当数据被写入网络后,及时释放相应的内存。
  • 异步处理: Netty 是异步事件驱动的框架,使用 ChannelOutboundBuffer 可以将数据发送的异步化,提升处理性能

下面是向ChannelOutboundBuffer写入messge的源码

image-20231203163632745

可与看到ChannelOutboundBuffer会将msg和promise包装为Entry,然后改变tailEntry,flushedEntry,unflushedEntry指针的指向

img

然后incrementPendingOutboundBytes将记录下待写出数据size,如果大于高水位还会触发channelWritabilityChanged事件

image-20231203165712236

channelWritabilityChanged会在pipeline上传播,并触发ChannelInboundHandler#channelWritabilityChanged,我们可以实现此方法调用flush将数据写出

三丶flush源码解析

上面看了write将待发送的数据缓存到ChannelOutboundBuffer中,正真将数据写到SocketChannel中的是flush方法

image-20231203170748620

1.addFlush

此方法只是负责更改flushedEntry 和 unflushedEntry 指针指向

image-20231203171224099

将 flushedEntry 指针指向 unflushedEntry 指针表示的第一个未被 flush 的 Entry 节点。并将 unflushedEntry 指针置为空,准备开始 flush 发送数据流程。

这样在 flushedEntry 与 tailEntry 之间的 Entry 节点即为本次 flush 操作需要发送的数据范围。

public void addFlush() {
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                flushedEntry = entry;
            }
            do {
                flushed ++;
                //如果当前entry对应的write操作被用户取消,则释放msg,并降低channelOutboundBuffer水位线
                if (!entry.promise.setUncancellable()) {
                  
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }

2.flush0

image-20231203171814884

可以看到如果注册了write到selector上,那么不会进行flush,

如下是NioSockectChannel发送数据的源码

@Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        //获取jdk nio底层socketChannel
        SocketChannel ch = javaChannel();
        //最大写入次数 默认为16 ,因为EventLoop可能单线程处理多Channel,需要雨露均沾
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // 如果全部数据已经写完 则移除OP_WRITE事件并直接退出writeLoop
                clearOpWrite();             
                return;
            }

            // 获取单次发送最大字节数
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            //Netty的DirectBuffer底层就是JDK的DirectByteBuffer
            // 将ChannelOutboundBuffer中缓存的DirectBuffer转换成JDK的ByteBuffer,
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            // ChannelOutboundBuffer中总共的DirectBuffer数
            int nioBufferCnt = in.nioBufferCount();

            switch (nioBufferCnt) {
                    // 真正进行发送
               //java.nio.channels.SocketChannel#write(java.nio.ByteBuffer)进行写回
            }
        } while (writeSpinCount > 0);
        // 处理Socket可写但已经写满16次还没写完的情况
     incompleteWrite(writeSpinCount < 0);
    }

可以看到

  • 如果数据全部写完了,会调用clearOpWrite清除当前 Channel 在 Reactor 上注册的 OP_WRITE 事件

    image-20231203173230345

    这意味着,不需要再监听write来触发flush了

  • 写入的过程会写入多次,并控制自旋次数,做到雨露均沾

image-20231203173115191

如上是写入的过程

  • 如果ByteBuffer个数为0,说明发送的是FileRegion 类型,case 0 的分支主要就是用于处理网络文件传输的情况

    image-20231203173837962

  • case1 和 default则调用jdk SocketChannel#write进行数据发送,如果写入的数据小于等于0,说明当前Socket发送缓冲区满了写不进去了,则注册OP_WRITE事件,等待Socket发送缓冲区可写时再写

    image-20231203173938710

    触发Write后,再Sockect写缓冲区可写后,会触发对应事件,即可再NioEventLoop中进行处理,如下图中会直接调用forceFlush

    image-20231203174154759

  • 完成发送会调用adjustMaxBytesPerGatheringWrite进行调整

两个分支分别表示

  • 期望写入和真正写入的相等,说明数据能全部写入到 Socket 的写缓冲区中了,那么下次 write loop 就应该尝试去写入更多的数据。

    本次写入的数量x2>maxBytesPerGatheringWrite 说明要写的数据很多,那么更新为本次 write loop 两倍的写入量大小

  • 如果本次写入的数据还不及尝试写入数据的一半,说明Socket写缓冲区容量不多了,尝试缩容为一半

image-20231203174406700

  • 处理

    protected final void incompleteWrite(boolean setOpWrite) {
        
            if (setOpWrite) {
                //socket缓冲区已满写不进去的情况 注册write事件
                setOpWrite();
            } else {
                //处理socket缓冲区依然可写,但是写了16次还没写完,提交flushTask异步写
                clearOpWrite();
                eventLoop().execute(flushTask);
    
            }
    

四丶总结

这一节中我们学习了netty写入数据的流程,写入数据时出站事件,一般最终将有HeadContext进行处理

  • write方法将写入的数据转换为DirectByteBuf包装到ChannelOutboundBuffer中,并且记录了对应的Promise实现异步驱动,还可以减少系统调用

  • flush方法,调用jdk SocketChannel#write进行写入,使用自旋次数控制,让多个Channel的处理得到平衡,如果Socket 缓冲区满无法在继续写入那么会OP_WRITE 事件,等 Socket 缓冲区变的可写时,epoll 通知 EventLoop线程继续发送。

    Socket 缓冲区可写,写满 16 次但依然没有写完,这时候注册异步任务使用EventLoop线程进行异步发送。如果写的时FileRegion类型,那么会使用transferTo进行零拷贝写入。

标签:Netty,调用,ChannelOutboundBuffer,写入,write,源码,flush,发送数据
From: https://www.cnblogs.com/cuzzz/p/17873524.html

相关文章

  • Java智慧工地一体化解决方案(里程碑管理)源码
    智慧工地为管理人员提供及时、高效、优质的远程管理服务,提升安全管理水平,确保施工安全提高施工质量。实现对人、机、料、法、环的全方位实时监控,变被动“监督”为主动“监控”。一、建设背景施工现场有数量多、分布广,总部统一管理难度大;工地作业流程节点多,缺少过程可视化管理,成本......
  • linux中make编译源码包失败
    报错如下,gcc版本太低^server.c:5346:31:错误:‘structredisServer’没有名为‘server_cpulist’的成员redisSetCpuAffinity(server.server_cpulist);^server.c:在函数‘hasActiveChildProcess’中:server.c:1478:1:警告:在有返回值......
  • CentOS中安装redis源码包
    下载地址#将redis压缩包上传到服务器/home/software,并解压tar-zxvfredis-6.0.6.tar.gz#安装gccyuminstallgcc-c++-y#查看版本gcc-v#进入解压目录#编译make#安装(默认安装到/usr/local/bin,不建议默认安装)#makeinstall#指定安装路径安装(......
  • 基于FPGA的数字时钟设计与实现(含源码)
    随着数字电子技术的不断发展,基于FPGA(现场可编程门阵列)的数字时钟设计方案逐渐成为了一种流行的选择。本篇博客将详细介绍如何利用FPGA实现一个简单的数字时钟,涉及到分频器、数码管驱动、时分秒计数、三八译码器和扫描数码管等模块。1.系统设计概述在本设计中,我们将使用FPGA来实......
  • HashMap超详细源码解析
    原文链接:HashMap和HashSet源码解析1、HashMap概念HashMap实现了Map接口,是一种使用键值对存储数据的数据结构。HashMap允许null作为键和值。HashMap不保证元素的顺序,特别是不保证顺序恒定。HashMap是基于哈希表实现的数据结构,具有快速的插入、删除和查找操作。HashMap使用......
  • 如何开发互联网医院系统源码?互联网医院小程序开发全流程解析
    互联网医院系统源码的开发以及互联网医院小程序的设计是关键环节,本文将为您详细解析开发全流程。 一、需求分析与规划第一步,明确系统的功能模块。同时,规划系统的整体架构、技术栈,在这里需要想到系统的可扩展性和性能。二、数据库设计与建模建立互联网医院系统数据库是整个开发流程......
  • Vue源码学习(十七):实现computed计算属性
    好家伙,本章我们尝试实现computed属性 0.完整代码已开源https://github.com/Fattiger4399/analytic-vue.git 1.分析1.1computed的常见使用方法1.计算依赖数据:当某个数据发生变化时,computed属性可以自动更新,并返回计算结果。例如:<template><div><p>用户姓名:{{u......
  • linux源码趣读总结
    总结linux源码趣读花了半个月左右,看完了闪客的linux源码趣读。感觉之前上的操作系统原理课程只能给你一个模糊的印象,啊,有这个概念来着,有这个算法来着。比起从理论到实践的文字游戏,我还是更喜欢从实践讲理论的脚踏实地。从阅读linux-0.11源码,了解操作系统的构成。所谓的总结......
  • 小市值选股策略代码分享(附python源码)
    小市值选股策略的核心在于通过综合分析公司的基本面、行业定位、财务健康状况以及市场趋势,来寻找那些被市场低估但具备显著成长潜力的股票,同时也要重视风险管理和投资组合的多样化。 今天来给大家分享下小市值策略代码如下:#显式导入BigQuant相关SDK模块frombigdatas......
  • kprobes源码走读
    粗略看了下kernel/kprobes.c下的register_kprobe方法。逻辑:调用kprobe_addr方法来根据symbol或者addr+offset来获取需要劫持的地址,symbol和addr不能同时设置,symbol是利用kprobe_lookup_name->kallsyms_lookup_name来查找内核中的符号地址。检查这个kprobe是否重注册了?......