EventLoop
- EventLoop本质上是一个单线程执行器,里面有run方法处理Channel上源源不断的IO事件。
- EventLoop继承了ScheduledExecutorService中的所有方法。
常用方法
- Future<?> submit(Runnable task) 提交任务
- ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) 循环执行,执行前需要等待延时
EventLoopGroup
- EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register方法来绑定到其中某个EventLoop,后续该Channel的所有io事件都由它处理。
- 常用实现:NioEventLoopGroup
常用方法
- Future<?> shutdownGracefully() 停止所有EventLoop
Channel
常用方法
- ChannelFuture close() 关闭
- ChannelFuture closeFuture() 获取一个ChannelFuture,当通道关闭后会被通知
- ChannelPipeline pipeline() 获取管道,用以操作
- ChannelFuture write(Object msg) 写数据
- ChannelFuture writeAndFlush(Object msg) 写数据并冲刷
可以使用EmbeddedChannel测试Channel
Future & Promise
- Future是对 JDK Future(
java.util.concurrent.Future
) 接口的一个扩展,以实现异步操作操作。 - Promise又对Future进行了扩展,允许手动设置异步操作的结果。常用实现为DefaultPromise。
JDK Future方法
- V get() 阻塞获取结果
- boolean isDone() 任务是否完成
- boolean isCancelled() 任务是否被取消
Future方法
- V getNow() 获取结果,非阻塞,没有结果返回null
- Future
sync() 阻塞等待任务结束 - boolean isSuccess() 任务是否成功
- Throwable cause() 获取失败的信息,非阻塞,没有失败返回null
- Future
addListener(GenericFutureListener<? extends Future<? super V>> listener) 添加监听器,有结果时会通知,异步获取结果
Promise方法
- Promise
setSuccess(V result) 标记为成功,设置结果并通知监听器 - Promise
setFailure(Throwable cause) 标记为失败,设置失败原因并通知监听器
Handler & Pipeline
- ChannelHandler用来处理Channel上的各种事件,分为入站和出站两种。
- Pipeline由一组ChannelHandler串起来组成。
- 入站处理器通常是ChannelInboundHandlerAdapter的子类,主要用来读取客户端数据,写回结果。(子类调用super.channelRead(ctx,msg)继续向下传递)
- 出站处理器通常是ChannelOutboundHandlerAdapter的子类,主要对写回结果进行加工
- 入站处理器从最上面的处理器往下传递运行,出站处理器从最底下的处理器往上传递运行。注意:nioSocketChannel.writeAndFlush(msg)是从Pipeline的末尾往前找Handler运行,而channelHandlerContext.writeAndFlush(msg)是从当前Handler往前找Handler运行。
ByteBuf
来处理字节数据。
特点
- 支持动态扩容
- 提供了零拷贝相关方法
- 堆缓冲区和直接缓冲区: 直接缓冲区性能高,但是分配与释放慢。
- 支持内存池
内存分配器
内存池减少了频繁分配和释放内存的开销,从而提高了性能。
ByteBufAllocator bba = ByteBufAllocator.DEFAULT; //默认分配器(池化) -Dio.netty.allocator.type={unpooled | pooled} 配置默认方式
ByteBufAllocator bba = PooledByteBufAllocator.DEFAULT; //池化
ByteBufAllocator bba = UnpooledByteBufAllocator.DEFAULT; //非池化
堆缓冲区与直接缓冲区
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();//堆缓冲区
ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer();//直接缓冲区
操作
索引:
readerIndex
writerIndex
markedReaderIndex
markedWriterIndex
capacity 容量,容量不够会扩容,超过maxCapacity会报错
maxCapacity 最大容量
写:writerIndex随着写移动
- ByteBuf writeBoolean(boolean value);
- ByteBuf writeByte(int value);
- ByteBuf writeShort(int value);
- ByteBuf writeInt(int value);
- ByteBuf writeLong(long value);
- ByteBuf writeChar(int value);
- ByteBuf writeFloat(float value);
- ByteBuf writeDouble(double value);
- ByteBuf writeBytes(ByteBuf src);
- ByteBuf writeBytes(byte[] src);
- int writeCharSequence(CharSequence sequence, Charset charset);
读:readerIndex随着read相关操作移动
- boolean readBoolean();
- byte readByte();
- short readShort();
- int readInt();
- ......
get/set相关操作:获取或设置指定索引的数据
标记:
- ByteBuf markReaderIndex(); 读标记
- ByteBuf resetReaderIndex(); 将读索引跳转到读标记
其他:
- ByteBuf slice(int index, int length); 切割
释放
基于引用计数器,引用计数器到0时释放内存
boolean release(); 引用计数器-1
ByteBuf retain(); 引用计数器+1
释放时机为最后使用ByteBuf的Handler负责释放