1.NIO现存的问题
1.1 客户端中断导致死循环
详情在3.网络多路复用通信模型中
1.2 粘包/拆包问题
- 可能P1和P2被合在一起发送给了服务端(粘包现象)
- 可能P1和P2的前半部分合在一起发送给了服务端(拆包现象)
- 可能P1的前半部分就被单独作为一个部分发给了服务端,后面的和P2一起发给服务端(也是拆包现象)
解决方案:
- 消息定长,发送方和接收方规定固定大小的消息长度,例如每个数据包大小固定为200字节,如果不够,空位补空格,只有接收了200个字节之后,作为一个完整的数据包进行处理。(声明一个持续进行存储的buffer)
- 在每个包的末尾使用固定的分隔符,比如每个数据包末尾都是
\r\n
,这样就一定需要读取到这样的分隔符才能将前面所有的数据作为一个完整的数据包进行处理。 - 将消息分为头部和本体,在头部中保存有当前整个数据包的长度,只有在读到足够长度之后才算是读到了一个完整的数据包。
2.使用netty-服务端
-
依赖
<dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.76.Final</version> </dependency> </dependencies>
-
代码
public static void main(String[] args) { //这里我们使用NioEventLoopGroup实现类即可,创建BossGroup和WorkerGroup //当然还有EpollEventLoopGroup,但是仅支持Linux,这是Netty基于Linux底层Epoll单独编写的一套本地实现,没有使用NIO那套 EventLoopGroup bossGroup = new NioEventLoopGroup(), workerGroup = new NioEventLoopGroup(); //创建服务端启动引导类 ServerBootstrap bootstrap = new ServerBootstrap(); //可链式,就很棒 bootstrap .group(bossGroup, workerGroup) //指定事件循环组 .channel(NioServerSocketChannel.class) //指定为NIO的ServerSocketChannel .childHandler(new ChannelInitializer<SocketChannel>() { //注意,这里的SocketChannel不是我们NIO里面的,是Netty的 @Override protected void initChannel(SocketChannel channel) { //获取流水线,当我们需要处理客户端的数据时,实际上是像流水线一样在处理,这个流水线上可以有很多Handler channel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ //添加一个Handler,这里使用ChannelInboundHandlerAdapter @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { //ctx是上下文,msg是收到的消息,默认以ByteBuf形式(也可以是其他形式,后面再说) ByteBuf buf = (ByteBuf) msg; //类型转换一下 System.out.println(Thread.currentThread().getName()+" >> 接收到客户端发送的数据:"+buf.toString(StandardCharsets.UTF_8)); //通过上下文可以直接发送数据回去,注意要writeAndFlush才能让客户端立即收到 ctx.writeAndFlush(Unpooled.wrappedBuffer("已收到!".getBytes())); } }); } }); //最后绑定端口,启动 bootstrap.bind(8080); }
3.ByteBuf介绍
-
netty中自定义了一个ByteBuf类,相比于NIO中的ByteBuffer,它具有以下优点:
- 写操作完成后无需进行
flip()
翻转。 - 具有比ByteBuffer更快的响应速度。
- 动态扩容。
- 写操作完成后无需进行
-
源码分析
public abstract class AbstractByteBuf extends ByteBuf { ... int readerIndex; //index被分为了读和写,是两个指针在同时工作 int writerIndex; private int markedReaderIndex; //mark操作也分两种 private int markedWriterIndex; private int maxCapacity; //最大容量,没错,这玩意能动态扩容
由此将读和写的指针分为两个单独的变量,这样就不用在读写切换的时候手动调用flip了
-
动态扩容:
-
如果写入后的尺寸没有超过16则扩容到16、超过16没有超过64则扩容到64
-
写入后容量大于64则扩容到下一个2^n
-
扩容最大不超过maxCapacity
-
-
复合模式
- CompositeByteBuf.addComponent(ByteBuf)拼接缓冲区
-
池化缓冲区和非池化缓冲区:池化思想-设置内存池进行内存复用从而减少申请内存带来的性能损耗
4.零拷贝
-
零拷贝是一种I/O操作优化技术,可以快速高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间
-
传统io的流程:硬盘数据->页缓存(内核空间)->用户空间->socket缓存(内核空间)->网络,一共四次拷贝
-
零拷贝的实现方案
-
虚拟内存方案
现在的操作系统基本都是支持虚拟内存的,我们可以让内核空间和用户空间的虚拟地址指向同一个物理地址,这样就相当于是直接共用了这一块区域,也就谈不上拷贝操作了
-
使用mmap/write内存映射
实际上这种方式就是将内核空间中的缓存直接映射到用户空间缓存,比如我们之前在学习NIO中使用的MappedByteBuffer,就是直接作为映射存在,当我们需要将数据发送到Socket缓冲区时,直接在内核空间中进行操作就行了(仍然存在用户态和内核态的切换)
-
sendfile方式
在Linux2.1开始,引入了sendfile方式来简化操作,我们可以直接告诉内核要把哪个文件数据拷贝拷贝到Socket上,直接在内核空间中一步到位(NIO中的transferTo()就是这种方式)
-
5.netty工作模型
-
模型:clients ----> Boss Group(EventLoopGroup-accept) ---->register ----> Worker Group(EventLoopGroup) ----> Handler
-
设计思路:
- Netty 抽象出两组线程池BossGroup和WorkerGroup,BossGroup专门负责接受客户端的连接, WorkerGroup专门负读写,就像前面说的主从Reactor一样。
- 无论是BossGroup还是WorkerGroup,都是使用EventLoop来进行事件监听的,整个Netty也是使用事件驱动来运作的,比如当客户端已经准备好读写、连接建立时,都会进行事件通知,说白了就像NIO多路复用那样,只不过这里换成EventLoop了而已,它已经帮助我们封装好了一些常用操作,而且我们可以自己添加一些额外的任务,如果有多个EventLoop,会存放在EventLoopGroup中,EventLoopGroup就是BossGroup和WorkerGroup的具体实现。
- 在BossGroup之后,会正常将SocketChannel绑定到WorkerGroup中的其中一个EventLoop上,进行后续的读写操作监听。
6.channel & pipeline详解
6.1 channel
-
netty中重新实现了channel类型
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> { ChannelId id(); //通道ID EventLoop eventLoop(); //获取此通道所属的EventLoop,因为一个Channel在它的生命周期内只能注册到一个EventLoop中 Channel parent(); //Channel是具有层级关系的,这里是返回父Channel ChannelConfig config(); boolean isOpen(); //通道当前的相关状态 boolean isRegistered(); boolean isActive(); ChannelMetadata metadata(); //通道相关信息 SocketAddress localAddress(); SocketAddress remoteAddress(); ChannelFuture closeFuture(); //关闭通道,但是会用到ChannelFuture,后面说 boolean isWritable(); long bytesBeforeUnwritable(); long bytesBeforeWritable(); Unsafe unsafe(); ChannelPipeline pipeline(); //流水线,之后也会说 ByteBufAllocator alloc(); //可以直接从Channel拿到ByteBufAllocator的实例,来分配ByteBuf Channel read(); Channel flush(); //刷新 }
-
注:channel中的所有io操作都是异步的
6.2 ChannelHandler
-
ChannelHandler其实就是Reactor模型中的Handler,不过这里进行了抽象,我们只需要实现对应的对象就行。ChannelHandler的两个主要子接口分别是
- interface ChannelInBoundHandler extends ChannelHandler - 入站
- interface ChannelOutBoundHandler extends ChannelHandler - 出站
同时以上二者分别有自身的适配器,ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter,使用适配器的好处是适配器实现了接口中的所有方法,那么使用时就不需要开发人员再把所有的方法都实现了,只需要重写部分需要的方法。
-
看源码
public interface ChannelHandler { //当ChannelHandler被添加到流水线中时调用 void handlerAdded(ChannelHandlerContext var1) throws Exception; //当ChannelHandler从流水线中移除时调用 void handlerRemoved(ChannelHandlerContext var1) throws Exception; /** @deprecated 已过时那咱就不管了 */ @Deprecated void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception; @Inherited @Documented @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface Sharable { } }
//ChannelInboundHandler用于处理入站相关事件 public interface ChannelInboundHandler extends ChannelHandler { //当Channel已经注册到自己的EventLoop上时调用,前面我们说了,一个Channel只会注册到一个EventLoop上,注册到 EventLoop后,这样才会在发生对应事件时被通知。 void channelRegistered(ChannelHandlerContext var1) throws Exception; //从EventLoop上取消注册时 void channelUnregistered(ChannelHandlerContext var1) throws Exception; //当Channel已经处于活跃状态时被调用,此时Channel已经连接/绑定,并且已经就绪 void channelActive(ChannelHandlerContext var1) throws Exception; //跟上面相反,不再活跃了,并且不在连接它的远程节点 void channelInactive(ChannelHandlerContext var1) throws Exception; //当从Channel读取数据时被调用,可以看到数据被自动包装成了一个Object(默认是ByteBuf) void channelRead(ChannelHandlerContext var1, Object var2) throws Exception; //上一个读取操作完成后调用 void channelReadComplete(ChannelHandlerContext var1) throws Exception; //暂时不介绍 void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception; //当Channel的可写状态发生改变时被调用 void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception; //出现异常时被调用 void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception; }
6.3 ChannelPipeline
-
在Channel初始化的时候会自动创建一个ChannelPipeline(ChannelHandlerContext构成的双向链表),就像它的名字一样,它就像一个流水线一样,这个‘流水线’上会有若干个handler用来处理响应的数据(从时机上分为入站操作和出站操作),此外,‘流水线’两端有两个默认的处理器用于一些预置操作和后续操作,开发者只需要关心中间操作即可。
-
pipeline模型:HeadContext <=> e.g.in1 <=> e.g.out1 <=> e.g.in2 <=> ... <=> TailContext
-
事件触发时机
-
入站事件
-
客户端触发unsafe.read() - inBoundHandler需要调用ctx.fireChannelRead(msg)才能传递到下一个入站handler
-
传递方法:
ChannelHandlerContext.fireChannelRegistered() ChannelHandlerContext.fireChannelActive() ChannelHandlerContext.fireChannelRead(Object) ChannelHandlerContext.fireChannelReadComplete() ChannelHandlerContext.fireExceptionCaught(Throwable) ChannelHandlerContext.fireUserEventTriggered(Object) ChannelHandlerContext.fireChannelWritabilityChanged() ChannelHandlerContext.fireChannelInactive() ChannelHandlerContext.fireChannelUnregistered()
-
-
出站事件
-
调用writeAndFlush()时 - 向前选择身份为outBound(outBound=true)的执行
-
传递方法:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise) ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise) ChannelHandlerContext.write(Object, ChannelPromise) ChannelHandlerContext.flush() ChannelHandlerContext.read() ChannelHandlerContext.disconnect(ChannelPromise) ChannelHandlerContext.close(ChannelPromise)
-
-
-
ctx.channel().writeAndFlush()和ctx.writeAndFlush()
-
ctx.writeAndFlush()是从当前handler向前执行outboundHandler 入站事件不会传递到tailcontext
-
ctx.channel().writeAndFlush()是从tail开始向前执行outBoundHandler 入站事件传递到tailcontext
@Override public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); // 注解@2 }
-