首页 > 其他分享 >4.step into netty

4.step into netty

时间:2024-11-01 21:49:55浏览次数:1  
标签:netty var1 ChannelHandlerContext void EventLoop throws step into Channel

1.NIO现存的问题

1.1 客户端中断导致死循环

详情在3.网络多路复用通信模型中

1.2 粘包/拆包问题

  1. 可能P1和P2被合在一起发送给了服务端(粘包现象)
  2. 可能P1和P2的前半部分合在一起发送给了服务端(拆包现象)
  3. 可能P1的前半部分就被单独作为一个部分发给了服务端,后面的和P2一起发给服务端(也是拆包现象)

解决方案:

  • 消息定长,发送方和接收方规定固定大小的消息长度,例如每个数据包大小固定为200字节,如果不够,空位补空格,只有接收了200个字节之后,作为一个完整的数据包进行处理。(声明一个持续进行存储的buffer)
  • 在每个包的末尾使用固定的分隔符,比如每个数据包末尾都是\r\n,这样就一定需要读取到这样的分隔符才能将前面所有的数据作为一个完整的数据包进行处理。
  • 将消息分为头部和本体,在头部中保存有当前整个数据包的长度,只有在读到足够长度之后才算是读到了一个完整的数据包。

2.使用netty-服务端

  • 依赖

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.76.Final</version>
        </dependency>
    </dependencies>
    
  • 代码

    public static void main(String[] args) {
        //这里我们使用NioEventLoopGroup实现类即可,创建BossGroup和WorkerGroup
        //当然还有EpollEventLoopGroup,但是仅支持Linux,这是Netty基于Linux底层Epoll单独编写的一套本地实现,没有使用NIO那套
        EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup();
    
        //创建服务端启动引导类
        ServerBootstrap bootstrap = new ServerBootstrap();
        //可链式,就很棒
        bootstrap
                .group(bossGroup, workerGroup)   //指定事件循环组
                .channel(NioServerSocketChannel.class)   //指定为NIO的ServerSocketChannel
                .childHandler(new ChannelInitializer<SocketChannel>() {   //注意,这里的SocketChannel不是我们NIO里面的,是Netty的
                    @Override
                    protected void initChannel(SocketChannel channel) {
                        //获取流水线,当我们需要处理客户端的数据时,实际上是像流水线一样在处理,这个流水线上可以有很多Handler
                        channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){   //添加一个Handler,这里使用ChannelInboundHandlerAdapter
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {  //ctx是上下文,msg是收到的消息,默认以ByteBuf形式(也可以是其他形式,后面再说)
                                ByteBuf buf = (ByteBuf) msg;   //类型转换一下
                                System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8));
                                //通过上下文可以直接发送数据回去,注意要writeAndFlush才能让客户端立即收到
                                ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes()));
                            }
                        });
                    }
                });
        //最后绑定端口,启动
        bootstrap.bind(8080);
    }
    

3.ByteBuf介绍

  • netty中自定义了一个ByteBuf类,相比于NIO中的ByteBuffer,它具有以下优点:

    • 写操作完成后无需进行flip()翻转。
    • 具有比ByteBuffer更快的响应速度。
    • 动态扩容。
  • 源码分析

    public abstract class AbstractByteBuf extends ByteBuf {
        ...
        int readerIndex;   //index被分为了读和写,是两个指针在同时工作
        int writerIndex;
        private int markedReaderIndex;    //mark操作也分两种
        private int markedWriterIndex;
        private int maxCapacity;    //最大容量,没错,这玩意能动态扩容
    

    由此将读和写的指针分为两个单独的变量,这样就不用在读写切换的时候手动调用flip了

  • 动态扩容:

    • 如果写入后的尺寸没有超过16则扩容到16、超过16没有超过64则扩容到64

    • 写入后容量大于64则扩容到下一个2^n

    • 扩容最大不超过maxCapacity

  • 复合模式

    • CompositeByteBuf.addComponent(ByteBuf)拼接缓冲区
  • 池化缓冲区和非池化缓冲区:池化思想-设置内存池进行内存复用从而减少申请内存带来的性能损耗

4.零拷贝

  • 零拷贝是一种I/O操作优化技术,可以快速高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间

  • 传统io的流程:硬盘数据->页缓存(内核空间)->用户空间->socket缓存(内核空间)->网络,一共四次拷贝

  • 零拷贝的实现方案

    1. 虚拟内存方案

      现在的操作系统基本都是支持虚拟内存的,我们可以让内核空间和用户空间的虚拟地址指向同一个物理地址,这样就相当于是直接共用了这一块区域,也就谈不上拷贝操作了

    2. 使用mmap/write内存映射

      实际上这种方式就是将内核空间中的缓存直接映射到用户空间缓存,比如我们之前在学习NIO中使用的MappedByteBuffer,就是直接作为映射存在,当我们需要将数据发送到Socket缓冲区时,直接在内核空间中进行操作就行了(仍然存在用户态和内核态的切换)

    3. sendfile方式

      在Linux2.1开始,引入了sendfile方式来简化操作,我们可以直接告诉内核要把哪个文件数据拷贝拷贝到Socket上,直接在内核空间中一步到位(NIO中的transferTo()就是这种方式)

5.netty工作模型

  • 模型:clients ----> Boss Group(EventLoopGroup-accept) ---->register ----> Worker Group(EventLoopGroup) ----> Handler

  • 设计思路:

    • Netty 抽象出两组线程池BossGroup和WorkerGroup,BossGroup专门负责接受客户端的连接, WorkerGroup专门负读写,就像前面说的主从Reactor一样。
    • 无论是BossGroup还是WorkerGroup,都是使用EventLoop来进行事件监听的,整个Netty也是使用事件驱动来运作的,比如当客户端已经准备好读写、连接建立时,都会进行事件通知,说白了就像NIO多路复用那样,只不过这里换成EventLoop了而已,它已经帮助我们封装好了一些常用操作,而且我们可以自己添加一些额外的任务,如果有多个EventLoop,会存放在EventLoopGroup中,EventLoopGroup就是BossGroup和WorkerGroup的具体实现。
    • 在BossGroup之后,会正常将SocketChannel绑定到WorkerGroup中的其中一个EventLoop上,进行后续的读写操作监听。

6.channel & pipeline详解

6.1 channel

  • netty中重新实现了channel类型

    public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
        ChannelId id();   //通道ID
        EventLoop eventLoop();   //获取此通道所属的EventLoop,因为一个Channel在它的生命周期内只能注册到一个EventLoop中
        Channel parent();   //Channel是具有层级关系的,这里是返回父Channel
        ChannelConfig config();
        boolean isOpen();   //通道当前的相关状态
        boolean isRegistered();
        boolean isActive();
        ChannelMetadata metadata();   //通道相关信息
        SocketAddress localAddress(); 
        SocketAddress remoteAddress();
        ChannelFuture closeFuture();  //关闭通道,但是会用到ChannelFuture,后面说
        boolean isWritable();
        long bytesBeforeUnwritable();
        long bytesBeforeWritable();
        Unsafe unsafe();
        ChannelPipeline pipeline();   //流水线,之后也会说
        ByteBufAllocator alloc();   //可以直接从Channel拿到ByteBufAllocator的实例,来分配ByteBuf
        Channel read();
        Channel flush();   //刷新
    }
    
  • 注:channel中的所有io操作都是异步的

6.2 ChannelHandler

  • ChannelHandler其实就是Reactor模型中的Handler,不过这里进行了抽象,我们只需要实现对应的对象就行。ChannelHandler的两个主要子接口分别是

    • interface ChannelInBoundHandler extends ChannelHandler - 入站
    • interface ChannelOutBoundHandler extends ChannelHandler - 出站

    同时以上二者分别有自身的适配器,ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter,使用适配器的好处是适配器实现了接口中的所有方法,那么使用时就不需要开发人员再把所有的方法都实现了,只需要重写部分需要的方法。

  • 看源码

    public interface ChannelHandler {
      	//当ChannelHandler被添加到流水线中时调用
        void handlerAdded(ChannelHandlerContext var1) throws Exception;
    		//当ChannelHandler从流水线中移除时调用
        void handlerRemoved(ChannelHandlerContext var1) throws Exception;
    
        /** @deprecated 已过时那咱就不管了 */
        @Deprecated
        void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
    
        @Inherited
        @Documented
        @Target({ElementType.TYPE})
        @Retention(RetentionPolicy.RUNTIME)
        public @interface Sharable {
        }
    }
    
    //ChannelInboundHandler用于处理入站相关事件
    public interface ChannelInboundHandler extends ChannelHandler {
      	//当Channel已经注册到自己的EventLoop上时调用,前面我们说了,一个Channel只会注册到一个EventLoop上,注册到			EventLoop后,这样才会在发生对应事件时被通知。
        void channelRegistered(ChannelHandlerContext var1) throws Exception;
    		//从EventLoop上取消注册时
        void channelUnregistered(ChannelHandlerContext var1) throws Exception;
    		//当Channel已经处于活跃状态时被调用,此时Channel已经连接/绑定,并且已经就绪
        void channelActive(ChannelHandlerContext var1) throws Exception;
    		//跟上面相反,不再活跃了,并且不在连接它的远程节点
        void channelInactive(ChannelHandlerContext var1) throws Exception;
    		//当从Channel读取数据时被调用,可以看到数据被自动包装成了一个Object(默认是ByteBuf)
        void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
    		//上一个读取操作完成后调用
        void channelReadComplete(ChannelHandlerContext var1) throws Exception;
    		//暂时不介绍
        void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
    		//当Channel的可写状态发生改变时被调用
        void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
    		//出现异常时被调用
        void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
    }
    

6.3 ChannelPipeline

  • 在Channel初始化的时候会自动创建一个ChannelPipeline(ChannelHandlerContext构成的双向链表),就像它的名字一样,它就像一个流水线一样,这个‘流水线’上会有若干个handler用来处理响应的数据(从时机上分为入站操作和出站操作),此外,‘流水线’两端有两个默认的处理器用于一些预置操作和后续操作,开发者只需要关心中间操作即可。

  • pipeline模型:HeadContext <=> e.g.in1 <=> e.g.out1 <=> e.g.in2 <=> ... <=> TailContext

  • 事件触发时机

    • 入站事件

      • 客户端触发unsafe.read() - inBoundHandler需要调用ctx.fireChannelRead(msg)才能传递到下一个入站handler

      • 传递方法:

        ChannelHandlerContext.fireChannelRegistered()
        ChannelHandlerContext.fireChannelActive()
        ChannelHandlerContext.fireChannelRead(Object)
        ChannelHandlerContext.fireChannelReadComplete()
        ChannelHandlerContext.fireExceptionCaught(Throwable)
        ChannelHandlerContext.fireUserEventTriggered(Object)
        ChannelHandlerContext.fireChannelWritabilityChanged()
        ChannelHandlerContext.fireChannelInactive()
        ChannelHandlerContext.fireChannelUnregistered()
        
    • 出站事件

      • 调用writeAndFlush()时 - 向前选择身份为outBound(outBound=true)的执行

      • 传递方法:

        ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
        ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
        ChannelHandlerContext.write(Object, ChannelPromise)
        ChannelHandlerContext.flush()
        ChannelHandlerContext.read()
        ChannelHandlerContext.disconnect(ChannelPromise)
        ChannelHandlerContext.close(ChannelPromise)
        
  • ctx.channel().writeAndFlush()和ctx.writeAndFlush()

    • ctx.writeAndFlush()是从当前handler向前执行outboundHandler 入站事件不会传递到tailcontext

    • ctx.channel().writeAndFlush()是从tail开始向前执行outBoundHandler 入站事件传递到tailcontext

      @Override
      public final ChannelFuture writeAndFlush(Object msg) {
          return tail.writeAndFlush(msg); // 注解@2
      }
      

7.EventLoop

标签:netty,var1,ChannelHandlerContext,void,EventLoop,throws,step,into,Channel
From: https://www.cnblogs.com/yuqiu2004/p/18521356

相关文章

  • Java语言的Netty框架+云快充协议1.5+充电桩系统+新能源汽车充电桩系统源码
    云快充协议+云快充1.5协议+云快充1.6+云快充协议开源代码+云快充底层协议+云快充桩直连+桩直连协议+充电桩协议+云快充源码介绍云快充协议+云快充1.5协议+云快充1.6+云快充协议开源代码+云快充底层协议+云快充桩直连+桩直连协议+充电桩协议+云快充源码软件架构1、提供云快......
  • antdesign vue 步骤条a-step按审核人员节点排序显示逻辑
    一、需求内容目前审核人员角色有:学术、法务、售后,串行执行审核流程。审核流程:发起/修改审核-》审核节点审核节点规则:学术-》法务-》售后,每个节点均可以审核或修改。审核状态:发起、修改、待审核、已审核。因此前端根据节点规则来展示审核步骤给用户。二......
  • 处理容器报错:[ERROR] .. Get “http://safeline-fvm/skynetinto“: dial tp: lookup s
    雷池社区版(WAF)是基于容器部署的在容器化应用的部署和运行过程中,我们常常会遇到各种报错信息。其中,形如“[ERROR]detect/skynet.go:114Get“http://safeline-fvm/skynetinto":dialtp:lookupsafeline-fvmon127.0.0.11:53:servermisbehaving”以及“panic:Get......
  • Netty、Go、Apache Tomcat、grpc-go、jetty、nghttp2、Apache Traffic Server是什么
    这些都是与网络编程和服务器应用相关的技术,下面我将分别简要介绍它们:Netty:Netty是一个异步事件驱动的网络应用程序框架,用于快速开发高性能、高可靠性的网络服务器和客户端程序。它支持多种协议,包括HTTP、HTTPS、FTP、SMTP等,广泛应用于游戏、移动、物联网、大数据等领域。......
  • 谈一谈 Netty 的内存管理 —— 且看 Netty 如何实现 Java 版的 Jemalloc
    本文基于Netty4.1.112.Final版本进行讨论在之前的Netty系列中,笔者是以4.1.56.Final版本为基础和大家讨论的,那么从本文开始,笔者将用最新版本4.1.112.Final对Netty的相关设计展开解析,之所以这么做的原因是Netty的内存池设计一直在不断地演进优化。在4.1.52.Final......
  • PyTorchStepByStep - Chapter 6: Rock, Paper, Scissors…
     https://storage.googleapis.com/download.tensorflow.org/data/rps.ziphttps://storage.googleapis.com/download.tensorflow.org/data/rps-test-set.zip ......
  • 二、Netty核心组件
    Netty的核心组件有:BootstrapEventLoopGroupChannelChannelHandlerChannelPipelineChannelHandlerContextChannelOptionByteBufChannelFutureBootstrapBootstrap负责装配Netty的其他组件和启动服务。从上个例子可以看到,Netty的组件较多,如果不使用Bootstrap而是自己负......
  • Let’s Verify Step by Step
    本文是LLM系列文章,针对《Let’sVerifyStepbyStep》的翻译。让我们一步一步地验证摘要1引言2方法3大规模监督4小规模合成监督5OOD泛化6讨论7相关工作8结论摘要近年来,大型语言模型在执行复杂多步推理的能力方面有了很大提高。然而,即使是最先进......
  • 【Coroutines】Deep and Deep Into Kotlin Coroutines
    StructureofCoroutineFramworkcoroutineframworkconsistoftwopartsBasicCoroutineLibrary,whichisnaturallyintegratedinkotlinlanguageCoroutineApplicationFramwork,whichisorganizedtosimplifyadvancedusageofcoroutinesThefirstpartis......
  • PyTorchStepByStep - Chapter 5: Convolutions
     single=np.array([[[[5,0,8,7,8,1],[1,9,5,0,7,7],[6,0,2,4,6,6],[9,7,6,6,8,4],[8,3,8,5,1,3],[7,2,7,0,1,0]]]])single.shape#(1,1,6,6)identity=np.array([[[[0,0,......