ServerBootstrap
主要介绍服务端的启动流程以及如何绑定端口号、开启服务端 Socket 并让其进入接收连接状态的
启动模板如下;
try {
ChannelFuture future = new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel sc) {
if (serverProperties.isSsl()) {
sc.pipeline().addFirst(sslContext.newHandler(sc.alloc()));
}
sc.pipeline().addLast(new IdleStateHandler(0, 0, serverProperties.getIdleTime()));
sc.pipeline().addLast(channelInitializer);
sc.pipeline().addLast(codecExceptionHandler);
sc.pipeline().addLast(serviceHandler);
}
})
.option(ChannelOption.SO_BACKLOG, 4096)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.bind(port);
future.sync().addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
log.info("Started {} with Port {} in {} ms", serverName, port, System.currentTimeMillis() - startTime);
}
});
serverChannels.add(future.channel());
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("Failed to start server. port:'{}'", port, e);
} finally {
log.info("{} - Shutdown initiated...", serverName);
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
log.info("{} - Shutdown completed.", serverName);
}
NioServerSocketChannel 启动过程
简略过程
- 建立 Java 的 ServerSocketChannel 和 Selector,封装为 NioServerSocketChannel
- 为 NioServerSocketChannel pipeline 添加 ServerBootstrapAcceptor(负责处理传入的 NioSocketChannel,为其添加启动模板的 childHandler)
- 将其注册到 NioEventLoopGroup 中的一个 NioEventLoop 中(也就是将 serverSocketChannel 注册到这个 NioEventLoop 创建的 selector 上)
详细过程
其他方法都是进行配置,核心在 bind 方法,之后进入 io.netty.bootstrap.AbstractBootstrap#doBind 方法,它有两个核心方法,一个是 initAndRegister 方法,它负责创建 NioServerSocketChannel、初始化 Pipeline 以及将其注册到 bossGroup 其中的一个 NioEventLoop 中;另一个方法是 doBind0,它负责在前面所有流程完成后将 ServerSocket 绑定到端口号上,开始正式工作。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 没完成就异步注册,其实也是调用 doBind0,省略...
});
return promise;
}
}
initAndRegister
首先看 initAndRegister 方法
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
// init 方法对 nioServerSocketChannel 的 pipeline 添加了一个 ServerBootstrapAcceptor(ChannelInboundHandlerAdapter 类型),它的构造函数里有 workerGroup,并重写了 channelRead 方法,在其触发时将 channel(这个 channel 其实是传递进来的 msg 强转得到的,只不过对于 bossGroup 来说,传进来的消息一定是建立连接后的 nioSocketChannel)
init(channel);
} catch (Throwable t) {
// 省略...
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
可以看到通过 initAndRegister 方法内通过 newChannel 创建了 NioServerSocketChannel(启动模板配置的 channel 参数,然后通过反射创建的),然后使用 group().register(channel) 对 NioServerSocketChannel 进行了注册,接着选择了 bossGroup 其中的一个 NioEventLoop 进行 register,毫无疑问,最终的调用是指向 AbstractUnsafe 的 register 方法:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
可以看到 register 内调用了有 register0,调用是进行了 inEventLoop 判断,此时是主线程启动的,所以实际是异步执行的,先不管异步同步,看看 register0 的实现:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// 省略...
} catch (Throwable t) {
// 省略...
}
}
可以看到 register0 内部又调用了 io.netty.channel.nio.AbstractNioChannel#doRegister,其中终于发现 javaChannel 被注册到 selector 了,不过它并没有注册兴趣
protected void doRegister() throws Exception {
// 省略...
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
// 省略...
}
register0 还调用了 invokeHandlerAddedIfNeeded,因为前面是通过 ChannelInitializer 添加 ServerBootstrapAcceptor 的,所以需要 invokeHandlerAddedIfNeeded 来使用 PendingHandlerAddedTask 进行 ctx.handlerAdded 通知,此时 ChannelInitializer 的 handlerAdded 被调用就会让 ServerBootstrapAcceptor 真正添加到 pipeline 中。
自此,NioServerSocketChannel(ServerSocket)的创建、pipeline ChannelInitializer 的添加、注册到 selector 就算完成了。
doBind0
这一步很简单,就是将刚才的 NioServerSocketChannel 绑定到真正的地址。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
channel 的 bind 是调用的 pipline 的,而 bind 是出站事件,因此 pipeline 的实现是从 tailContext 开始的,随着一路向前调,最后进入 headContext 的 bind,而后进入 io.netty.channel.AbstractChannel.AbstractUnsafe#bind:
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// 省略...
boolean wasActive = isActive(); // NioServerSocketChannel 的 isActive 实现是 isOpen() && javaChannel().socket().isBound()
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
doBind 方法将本地地址真正的绑定上了,此时的状态表示已经激活了,所以 pipeline.fireChannelActive() 触发,进而给 selectorKey 注册了 Accept 兴趣:
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) { // NioServerSocketChannel 和 NioSocketChannel 都是 true
channel.read();
}
}
readIfIsAutoRead 后面说过了,再说一遍,它通过 channel.read() -> pipeline.read() -> tailContext.read() 一系列消息传递最终将其 传递给 headContext.read(),它的 read 又将其转向 io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead 的 read:
// headContext.read()
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
// AbstractUnsafe#beginRead
public final void beginRead() {
assertEventLoop();
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
// io.netty.channel.nio.AbstractNioChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
最终在 AbstractNioChannel 的 doBeginRead 的结尾判断,如果兴趣中没有 readInterestOp 就添加,而 readInterestOp 这个值是通过 AbstractNioChannel 的构造函数传入的,它的两个 Nio 实现类 NioServerSocketChannel 和 NioSocketChannel 分别传入了 SelectionKey.OP_ACCEPT 和 SelectionKey.OP_READ,很明显嘛,NioServerSocketChannel 是用来接收连接的,所以兴趣是 SelectionKey.OP_ACCEPT,而 NioSocketChannel 是与客户端对应的服务端连接,是用来接收和发送客户端消息的,所以兴趣是读数据。
自此,绑定地址以及兴趣注册就完成了
NioSocketChannel 启动过程
首先是连接建立,建立的过程一定是在 NioEventLoop 的死循环由 NioServerSocketChannel 负责的,即 NioEventLoop 的 run 方法的 io.netty.channel.nio.NioEventLoop#processSelectedKeys,里面发现 selectorKey 兴趣是 accept(或 read)就会调用 unsafe 的 read 方法,而 nioServerSocketChannel 的 unsafe 调用的是 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read:
public void read() {
// 省略...
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
// 省略...
}
其他地方全部省略,主要关注 doReadMessages 和 fireChannelRead,doReadMessages 的实现在 NioServerSocketChannel 中,它通过 javaChannel(ServerSocketChannel)serverSocketChannel.accept() 接收了一个连接,并将其封装为 NioSocketChannel 后添加到 readBuf,之后 fireChannelRead 处理每一个刚才接收的 NioSocketChannel,而 nioServerSocketChannel 的 pipeline 中的 ServerBootstrapAcceptor 就重写了该方法:
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}
它首先将启动模板中设置 childHandler 添加到了 nioSocketChannel(nioServerSocketChannel 传进来的 msg 当然是子连接啦) 的 pipeline 中(完成 childHandler 到 nioSocketChannel 的绑定),又将 msg(NioSocketChannel)注册到 WorkerGroup 的其中一个 EventLoop 中,到此为止就完成了连接的建立以及如何将建立后的连接绑定到 EventLoop。
与 NioServerSocketChannel 在 channelActive 中触发 readIfIsAutoRead 来完成兴趣注册相同,NioSocketChannel 也是在 channelActive 完成兴趣注册的,只是兴趣变成了 Read。
NioEventLoop 事件循环 run 方法
启动 EventLoop
NioEventLoop 是一个 SingleThreadEventExecutor,它的 execute 方法如下
@Override
public void execute(Runnable task) {
execute0(task);
}
private void execute0(@Schedule Runnable task) {
ObjectUtil.checkNotNull(task, "task");
// wakesUpForTask 直接返回的 true,是用于子类扩展的,比如通过 task instanceof 某个类型来决定是否理解唤醒 EventLoop
execute(task, wakesUpForTask(task));
}
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) { // 后面讲到唤醒时会说到
wakeup(inEventLoop);
}
}
当不在 eventLoop 中执行时就会判断线程是否启动,如果没有启动的话就进行启动线程:
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // 经典 cas 判断是否执行
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() { // executor 是 ThreadPerTaskExecutor,就是每次都创建一个线程执行任务
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 省略...
}
}
});
}
事件循环
可以看到调用到了 run 方法,这个 run 方法是在 NioEventLoop 中实现的,它是重点!重点!重点!
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
// hasTasks() 判断 taskQueue 中是否有任务
// calculateStrategy 方法返回值如下:hasTasks ? selectSupplier.get() : SelectStrategy.SELECT
// 如果 hasTasks 为 true 的话就表示应该立即执行,所以 selectSupplier.get() 即实际调用的 selectNow() 是立即返回 selector 中
// 准备好的 IO 事件,否则 hasTasks 为 false 就表示没有任务可以执行,从 selector 中获取 IO 事件可以是阻塞的(避免 CPU 自旋过度)
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
// 先看 hasTasks 为 true,此时返回的 strategy 是大于等于 0 的,即跳过 switch,直接进入下一阶段
switch (strategy) {
case SelectStrategy.CONTINUE: // -2
continue;
case SelectStrategy.BUSY_WAIT: // -3
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: // -1
// 此时表示任务队列中没任务且此时 selector 没有准备好的 io 事件,即可以阻塞的获取 selector 中准备好的 io 事件
// 阻塞的时间由最将要发生的定时任务的执行时间执行,阻塞的时间如果到了,起码是可以执行这个定时任务的
// 如果没有定时任务,curDeadlineNanos 就为 -1,表示可以无限期阻塞,也就是 selector.select()
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
// NONE 是 Long.MAX_VALUE
curDeadlineNanos = NONE; // nothing on the calendar
}
// nextWakeupNanos 的作用是记录什么时间后 eventLoop 不在阻塞状态,用于 eventLoop 的唤醒
// 比如 selector.select(5000),即 阻塞 5 秒后再返回,而此时提交了一个新的定时任务 A
// 这个定时任务 A 的执行时间就在发生阻塞时的 2s 后执行,如果不中断 select 的话,这个定时任务 A 就超时执行了
// 为了避免这种情况发生,就需要记录阻塞的时间,并提供通过判断阻塞时间判断是否可以中断(selector.wakeup())select 的手段
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) { // 进行二次判断,如果还没有的话就阻塞获取准备好的 io 事件
// 如果 curDeadlineNanos 是 NONE 的话就调用 selector.select() 进行无限期的阻塞获取 io 事件
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
// 此时已经不阻塞了,但 nextWakeupNanos 没有被更新
// 因此要更新这个状态,避免后面的 execute 提交任务时对 selector 进行 weakup 唤醒
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
// 此时要么是有新任务导致被唤醒
// 要么是 selector 阻塞获取到了 io 事件
// 要么就是 switch 走的 default,即 selectNow 直接就获取到了 io 事件
if (ioRatio == 100) {
// ioRatio 为 100 表示不平衡 io 事件和非 io 事件执行的事件,
// 每次都把所有可以处理的 io 事件、任务队列和定时任务队列的任务处理完再进行下一轮循环
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
// 判断 strategy > 0 即判断是否存在 io 事件,如果存在就先处理 io 事件,然后根据 ioRatio 计算非 io 事件执行时间
// ioRatio 不为 100 就表示要平衡 io 事件和非 io 事件,该值越小 io 事件执行占的总时间就越小
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys(); // 首先处理 io 事件
} finally {
// Ensure we always run tasks.
// 处理完 io 事件就计算其占用事件,根据该时间再反向计算出非 io 事件可以占用的执行时间
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
// 此时表示不存在 io 事件,尝试运行任务队列中的任务
// 因为传的时间的 0,也就是说只运行最少数量的任务(大于 64 就是最多运行 64 个任务,详见后面 runAllTasks 方法的分析)
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) { // 最后进行 EventLoop 是否关闭的检测
closeAll(); // 先关闭所有连接
if (confirmShutdown()) { // 取消所有定时任务,执行所有任务队列中的任务
return;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
唤醒
NioEventLoop 的唤醒方法如下:
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) { // 如果不是唤醒状态就唤醒,AWAKE 等于 -1
selector.wakeup();
}
}
回顾前面的 NioEventLoop execute 方法,如果方法需要立即执行,调用的就是 execute(Runnable)(包括提交的是定时任务,比如该定时任务提交了就发现已经可以执行了这种),最后的 !addTaskWakesUp && immediate 也就为 true,即尝试唤醒:
@Override
public void execute(Runnable task) {
execute0(task);
}
@Override
public void lazyExecute(Runnable task) {
lazyExecute0(task);
}
private void execute0(@Schedule Runnable task) {
ObjectUtil.checkNotNull(task, "task");
// wakesUpForTask 直接返回的 true,是用于子类扩展的,比如通过 task instanceof 某个类型来决定是否理解唤醒 EventLoop
execute(task, wakesUpForTask(task));
}
private void lazyExecute0(@Schedule Runnable task) {
execute(ObjectUtil.checkNotNull(task, "task"), false); // 此时不进行唤醒
}
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
而如果提交的是定时任务时就对其时间进行判断,如果执行时间在唤醒的时间前就提交任务并主动唤醒,否则就只提交任务,不主动唤醒:
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduleFromEventLoop(task); // 不明白为什么在 EventLoop 中就直接插到定时任务队列中,而不是像下面一样
} else {
final long deadlineNanos = task.deadlineNanos();
// task will add itself to scheduled task queue when run if not expired
if (beforeScheduledTaskSubmitted(deadlineNanos)) { // 如果是在阻塞唤醒的时间前执行,就直接执行,并主动唤醒
execute(task);
} else {
lazyExecute(task); // 否则提交到任务队列就好,不需要主动唤醒(话说为什么要提交到任务队列而不是定时任务队列???)
// Second hook after scheduling to facilitate race-avoidance
if (afterScheduledTaskSubmitted(deadlineNanos)) {
// 可能这个任务 B 只比前面的任务 A 晚一毫秒执行,但前面的任务执行 A 完这个任务才添加进去,导致这个定时任务 B 需要等到下一个定时任务 C 时间触发才执行,为了避免这种情况,需要添加到队列后再检查一遍执行时间
execute(WAKEUP_TASK);
}
}
}
return task;
}
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
// Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
return deadlineNanos < nextWakeupNanos.get(); // 根据阻塞的唤醒时间判断定时任务是否应该在其之前执行
}
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
// Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
return deadlineNanos < nextWakeupNanos.get();
}
处理 io 事件
主要的流程说完了,但如何处理的 io 事件和非 io 事件还没解释,那么首先看 io 事件的处理
private void processSelectedKeys() {
if (selectedKeys != null) {
// processSelectedKeysOptimized 处理的是 netty 优化后的 selector,它与 java 原生不同的是存储 selectionKey 的数据结构
// java 原生的是 hashSet,而 netty 的是数组,而数组的遍历是比 hashSet 快的
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
最终的调用指向 io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
unsafe.forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 这里进行了兴趣判断,对于 NioServerSocketChannel 来说,readyOps & SelectionKey.OP_ACCEPT 为 true
// 对于 NioSocketChannel 来说,readyOps & SelectionKey.OP_READ 为 true
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 重点
// 重点
// 重点
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
上面的 unsafe.read() 是重点!是重点!是重点!
- 对于 NioServerSocketChannel 来说,调用的是 NioMessageUnsafe#read,里面完成了 NioSocketChannel 的创建、初始化、绑定以及兴趣注册(包括 pipeline 的 fireChannelRead 和 fireChannelReadComplete,只不过它的兴趣注册是在绑定后由 channelActive 注册的,前面提到过)
- 对于 NioSocketChannel 来说,调用的是 NioByteUnsafe#read,里面完成了消息读取(pipeline 的 fireChannelRead 和 fireChannelReadComplete,其中 fireChannelReadComplete 调用 HeadContext 完成了 readIfIsAutoRead,即 Read 兴趣的注册)
处理非 io 事件
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue(); // 将所有到达执行时间的定时任务全部迁移到 taskQueue 中
Runnable task = pollTask(); // 从 taskQueue 中取一个任务
if (task == null) {
afterRunningAllTasks();
return false;
}
// 根据 timeoutNanos 计算执行的截止时间,到达这个时间后就不再继续执行,剩余的任务等下次再执行
final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task); // 执行任务,如果有异常就只打印日志,不抛异常
runTasks ++; // 对任务计数
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) { // 每 64 个任务检测一次截止时间,因为 getCurrentTimeNanos() 是比较昂贵(耗时)的操作
lastExecutionTime = getCurrentTimeNanos();
if (lastExecutionTime >= deadline) { // 如果超过截止时间,就立即返回
break;
}
}
task = pollTask(); // 一直从任务队列中获取并执行
if (task == null) {
lastExecutionTime = getCurrentTimeNanos();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
EventLoop
EventExecutorGroup
io.netty.util.concurrent 包下,继承 ScheduledExecutorService
- next 方法:用于获取一个 EventExecutor
- execute 方法:next 获取一个 EventExecutor 并提交给它
- schedule 方法:因为实现了 ScheduledExecutorService 接口,所以可以实现定时任务,同样是交给 EventExecutor 执行。
主要负责管理 EventExecutor 生命周期,其实它还重写了 shutdown 和 shutdownNow 将它们标记为废弃,并新建了一个 shutdownGracefully 来保证安全关闭所有的 EventExector。
EventExecutor
继承 EventExecutorGroup,很奇怪,如果是我的话我不这样写。
-
execute:真正的执行在这里,具体由子类指定
-
inEventLoop 方法:判断当前线程是否是在 EventExecutor 中执行。
主要负责任务的执行,以及判断线程是否是事件循环的线程。
EventLoopGroup
io.netty.channel 包下,EventLoopGroup 和 EventLoop 继承关系如下:
- register 方法:注册一个 Channel 到 next 方法提供的 EventLoop 中
它主要是负责将 Channel 到注册 EventLoop
EventLoop
处理 Channel 有关的所有 IO 操作,一个 EventLoop 对应多个 Channel。
- register 方法:注册一个 Channel 到当前 EventLoop。
register 方法虽然是从 EventLoopGroup 继承来的,但 EventLoop 才是真正的核心实现。
NioEventLoop
NioEventLoop 继承的比较多,核心是下面几个:
- AbstractScheduledEventExecutor:netty 的 EventExeutor,提供定时任务的支持
- SingleThreadEventExecutor:单线程执行所有提交的任务
- SingleThreadEventLoop:单线程的 EventLoop
因此 NioEventLoop 是单线程处理它负责的 Channel 的 IO 等任务的!!!
AbstractEventExecutor
继承 AbstractExecutorService,并实现 EventExecutor 接口。
- 重写 AbstractExecutorService submit 方法:修改返回类型为 Netty 自己实现的 Future(当然这个 Future 是继承的 java 的,否则重写改不了返回类型)
- 重写 newTaskFor 方法:AbstractExecutorService 允许自定义 Task,因此从写为 Netty 的 PromiseTask
- 重写一堆定时任务方法为 UnsupportedOperationException 不可用,这些方法是 EventExecutor 接口继承来的(会在 AbstractEventExecutor 的子类 AbstractScheduledEventExecutor 中重新支持)
AbstractScheduledEventExecutor
继承 AbstractExecutorService,提供了对定时任务的支持
-
增加 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue 字段用来存放定时任务
-
再次重写 AbstractEventExecutor 中被定义为不可用的定时任务方法。添加定时任务时如果不在事件循环中就需要判断一下事件,如果当前时间晚于任务时间,那么应该立即自行,否则提交到 执行 execute 方法,(由子类实现来确定如何执行,如何提交定时任务?channel.executor().schedule())
-
增加一堆 scheduledTaskQueue 相关的任务方法,比如 pollScheduledTask(队头定时任务符合达到执行时间就返回,否则返回 null)等
SingleThreadEventExecutor
- 增加 Queue<Runnable> taskQueue 字段:任务队列。提交任务的线程不是 EventLoop 线程,就放在这里。默认 LinkedBlockingQueue 类型,也预留了 protected 的 newTaskQueue 方法用于自定义队列
- 重写 execute 方法:将任务提交到 taskQueue 字段,等待执行,如果 EventLoop 正在阻塞等待执行的状态,此时还有是否将 EventLoop 唤醒的功能(通过提交一个空的 Runnable 到 taskQueue 中),如果是 execute 方法,
- 和 AbstractScheduledEventExecutor 对 scheduledTaskQueue 的支持一样,增加了一堆 taskQueue 相关方法,比如 pollTask 等(获取一个任务)
- 增加 takeTask 方法:核心方法,但 NioEventLoop 从不调用该方法哦!!!作用是获取任务,来源包括 taskQueue 和 scheduledTaskQueue,如果线程被中断或者被唤醒,返回 null。方法如下:
protected Runnable takeTask() {
assert inEventLoop(); // 断言保证是在事件循环中
if (!(taskQueue instanceof BlockingQueue)) { // 必须是阻塞队列,可以让线程阻塞,阻塞天然线程安全,非常好用
throw new UnsupportedOperationException();
}
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();// 先获取定时任务,因为可能已经过期了
if (scheduledTask == null) { // 如果没有就获取任务队列中的
Runnable task = null;
try {
task = taskQueue.take();
if (task == WAKEUP_TASK) { // 是被唤醒,返回空
task = null;
}
} catch (InterruptedException e) {
// Ignore
}
return task;
} else {
long delayNanos = scheduledTask.delayNanos();
Runnable task = null;
if (delayNanos > 0) { // 有定时任务,但没到执行时间,从任务队列中获取
try {
// 尝试在该定时任务执行前获取任务队列的任务,如果期间有新任务,poll 就返回该任务了
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// Waken up.
// 这里的阻塞是可能被中断的
// 因为新加入定时任务可能比该定时任务还早执行,不能接着等任务队列的 poll 被新任务触发
// 所以需要中断
return null;
}
}
if (task == null) {
// We need to fetch the scheduled tasks now as otherwise there may be a chance that
// scheduled tasks are never executed if there is always one task in the taskQueue.
// This is for example true for the read task of OIO Transport
// See https://github.com/netty/netty/issues/1614
// 如果到该定时任务执行时间任务队列还没有新任务,
// 就将 scheduledTaskQueue 中满足执行条件的定时任务转移到任务队列 taskQueue 中,包括当前这个定时任务
fetchFromScheduledTaskQueue();
// 这个 task 还是可能为空的,因为前面转移可能失败
task = taskQueue.poll();
}
if (task != null) {// 如果为 null 就一直获取,直到成功
return task;
}
}
}
}
- wake up 机制:有一个叫 WAKEUP_TASK 的 Runable 字段,什么都不做,目的是唤醒正在阻塞的任务队列 poll,在任务队列 poll 的时候也会忽略这个 Runnable,队列任务不在 eventLoop 线程且需要立即执行的才会触发提交 WAKEUP_TASK(提交定时任务,此时定时已经到期,也会触发;队列任务不在 EventLoop 线程也会触发)。下面是相关方法:
@Override
public void execute(Runnable task) {
execute0(task);
}
@Override
public void lazyExecute(Runnable task) {
lazyExecute0(task);
}
private void execute0(@Schedule Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, wakesUpForTask(task)); // wakesUpForTask 默认为 true,可以重写。
}
private void lazyExecute0(@Schedule Runnable task) {
execute(ObjectUtil.checkNotNull(task, "task"), false);
}
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
// ...
if (!addTaskWakesUp && immediate) { // NioEventLoop 的 addTaskWakesUp 默认为 false
wakeup(inEventLoop);
}
}
protected boolean wakesUpForTask(Runnable task) { // 子类可以用该方法判断 Runnable 是否应该唤醒 EventLoop
return true;
}
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop) {
// Use offer as we actually only need this to unblock the thread and if offer fails we do not care as there
// is already something in the queue.
// 唤醒线程
taskQueue.offer(WAKEUP_TASK);
}
}
SingleThreadEventLoop
继承 SingleThreadEventExecutor,实现 EventLoop
主要是重写了 EventLoop 的 register 方法,将其调用转向 Channel 的 io.netty.channel.Channel.Unsafe 的 register(最后转到具体的 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#doRegister 实现),其中 java channel 注册的 selector 由 NioEventLoop 的 unwrappedSelector() 方法提供
Channel
netty 中所有 channel 的 IO 操作全是异步的,因此提供了 ChannelFuture 进行 IO 操作成功、失败和取消相关的通知。
Channel 的 实现非常多,最常用的 NioServerSocketChannel 和 NioSocketChannel 的继承结构如下:
-
channel:定义基本的操作,比如 id()、eventLoop()、pipeline()。
-
ServerChannel:表示它可以接受连接并创建子 Channel。
-
ServerSocketChannel:表示可以接受 TCP/IP 的服务器通道
-
AbstractChannel:Channel骨架,抽象类,实现 id()、eventLoop()、pipeline() 等方法,还有重要的 doRegister 等方法
-
AbstractNioChannel:基于Selector 的Nio Channel 实现,比如它的 doRegister 实现了将 javaChannel 注册到 Selector 的操作
-
SocketChannel:表示 TCP/IP socket 通道
Netty 的 channel 是如何避免被 gc 的呢,在 register 向 selector 注册时,待完成!!!
Unsafe
这个类不允许用户使用
- read:负责消息的读取,并将拿到的消息通过 pipeline 的 fireChannelRead 和 fireChannelReadComplete 通知给 channelHandler,在 NioEventLoop 中如果触发了 selector 的兴趣(Accept 或 Read)就会触发这个方法。实现类是 NioByteUnsafe 就是在读取 NioSocketChannel 客户端发来的数据;实现类是 NioMessageUnsafe 就是 serverSocket.accept() 接收一个新链接。
- register(EventLoop, ChannelPromise):负责真正的注册
- write:写到一个缓存区(io.netty.channel.ChannelOutboundBuffer,负责放置 ByteBuffer 对象)
- flush:从缓存区取数据并写入连接通道
ChannelHandler
处理或拦截 IO 事件,并将其转发到 ChannelPipline 中的下一个 ChannelHandler
- handlerAdded 方法:将 ChannelHandler 添加到实际 ChannelHandlerContext(ChannelPipeline) 后调用。ChannelInitializer 就是通过它来调用 initChannel 方法来注册其中的 channelHandler 的,最后将其自身 remove(this) 了
- handlerRemoved 方法:将 ChannelHandler 添加到实际 ChannelHandlerContext 后调用
- @ChannelHandler.Sharable 注解:文档型注释,表示被标注的 ChannelHandler 类可以共享(即单例),运行时,Netty 会给每个 ChannlHandler 加一个用于检测的 added 变量(在 ChannelHandlerAdapter 中定义的,这个变量不适用 volatile,只作为健康性检查,所以没问题),每次添加 ChannlHandler 时都会校验依次这个变量,如果重复添加(即 added 为 true)且没被这个注解标注,就会报错显示这个 ChannelHandler 被重复添加到不同的 ChannelPipline 里了
ChannelHandlerAdapter
是适配器,所以把上面的方法都用空实现代替了
ChannelInboundHandler
入站 ChannelHandler,负责状态变动的通知回调
- channelRegistered 方法:ChannelHandlerContext(实际是 ChannelPipeline,下面省略)的 Channel 被注册到 EventLoop
- channelUnregistered 方法:从 EventLoop 中注销
- channelActive 方法:被激活
- channelRead 方法:读取到一个消息
- channelReadComplete 方法:一个消息很长,channelRead 调用了 n 次才读完,读完整个消息时调用该方法
- userEventTriggered:用户事件被触发
- exceptionCaught:入站出现异常时。注意!!!出站不调用该方法,出站的 write 需要用添加回调来判断异常是否存在
ChannelInboundHandlerAdapter
继承自 ChannelHandlerAdapter,实现 ChannelInboundHandler。
ChannelInboundHandler 的适配器,提供默认实现,即 ctx.fireXXX
ChannelOutboundHandler
出站 ChannelHandler,收到 IO 出站操作时通知此 ChannelHandler
- read 方法:比较神奇的是,出站处理器竟然有 read 方法?实际 Netty 连接成功后对 selector 注册读事件就是通过它实现的,具体是在连接是 Accept 即接收就绪时触发 channelActive,首先来到 DefaultChannelPipeline 的 第一个节点 HeadContext,触发它的 channelActive ,此时会先向后面的 节点 fire 进行通知,然后就会检测 Channel 是否 autoRead(默认是),如果是就调用 channel.read() 方法,channel.read() 方法实际调用的是 pipeline 的 read,而 pineline 实际是调用的 tailContext 的read,这个 read 是 outbound 即出站处理器的方法,所以向前传递,具体方法如下:
final ChannelHandler handler = handler();
final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
if (handler == headContext) {
headContext.read(this);
} else if (handler instanceof ChannelDuplexHandler) {
((ChannelDuplexHandler) handler).read(this);
} else {
((ChannelOutboundHandler) handler).read(this);
}
可以看到判断了如果属于出站处理器就调用,因此如果你重写了 ChannelOutboundHandler 的 read 方法,并且没有调用 ctx.read() 来向前传递通知,那么头结点 HeadContext 就收不到 read 通知,因此一般该方法和其他方法一样都是默认的 fire 调用,接着说头结点 HeadContext 的 read 收到通知(被调用)时,会调用 channel 的 unsafe 字段进行 beginRead 向实际的 selectionKey (AbstractNioChannel)注册 read 类型事件(这个 unsafe 也检测了 inEventLoop,只是它用的是 assert 强制检测,这样挺合理的,因为 read 是只有自己才关心的,和其他人没关系)
ChannelOutboundHandlerAdapter
与 ChannelInboundHandlerAdapter 类似,继承自 ChannelHandlerAdapter,实现 ChannelOutboundHandler。
ChannelOutboundHandler 的适配器,提供默认实现,即 ctx.fireXXX
ChannelDuplexHandler
同时继承了 ChannelInboundHandlerApapter (默认都是 ctx.fire 向下传递)并实现 ChannelOutboundHandler(没继承ChannelOutboundHandlerAdapter 是因为 java 不能继承多个类),实现的 ChannelOutboundHandler 和 ChannelOutboundHandlerAdapter 一样都是调用 ctx 向上传递
常用的 SimpleChannelInboundHandler 和 MessageToMessageCodec
两个抽象出来的类,把常见的用法封装到一起,并提供抽象方法使用,比如 SimpleChannelInboundHandler 的 channelRead0 方法,自行后会自动释放 channelRead0 参数(如果这个参数对象属于引用计数类型)。
还比如 MessageToMessageCodec 就是 ChannelDuplexHandler 的一种常见的编解码封装。
它们的继承关系如下:
特殊的 ChannelInitializer
继承自 ChannelInboundHandlerAdapter,负责将一些 channelHandler 添加到 pipeline 中,在 channel 注册到 eventLoop 时执行,当其添加到 pipeline 后,handlerAdded 方法被通知执行,之后 ChannelInitializer 调用 initChannel 完成其中一些 ChannelHandler 的注册,全部添加后再将自身 remove(remove(this))
注意这个类被 @ChannelHandler.Sharable 标注,因此它必须是“单例”的
ChannelHandlerContext
- channel 方法:获取其 pipeline 中的 channel,作为便捷方法
- executor 方法:获取其 channel 中的 eventLoop,作为便捷方法
- name 方法:获取设置到 pipeline 时的名字,不能重复,不写就用默认命名规则 simpleClassName + #0,默认的还重复(有缓存)就生成(生成规则忘了,不重要,忘了就忘了)
- handler 方法:获取具体的 ChannelHandler
- alloc 方法:分配的 ByteBufAllocator,用于分配 ByteBuf
它实现了 ChannelInboundInvoker 接口(传到尾节点 TailContext,如果最后传到这里了,参数如果属于计数类型就会释放,保证内存安全):
- fireChannelActive:传递给 pipeline 中当前 ctx 下一个包含 ChannelInboundHandler 的 ctx
- fireChannelRead:和上面相同,传递读取到的消息
- fireExceptionCaught:只有它和其他 fire 方法不同,异常如果处理完了就不会传递了!!!
因为是双向链表的节点,所以还实现了 ChannelOutboundInvoker 接口(头结点 HeadContext):
- read 方法:前面提到过,请求从通道中读取数据到入站缓冲区,真的读到数据了就触发 ChannelInboundHandler 的 channelRead 方法
- write 方法:传递给 pipeline 中当前 ctx 前一个包含 ChannelOutboundHandler 的 ctx(到头结点 HeadContext 停,该调用 unsafe 的调用 unsafe,比如 write、flush 等)
AbstractChannelHandlerContext
定义前后指针 prev/next、以及所述的 pipeline
DefaultChannelHandlerContext
定义 handler 字段,重写 handler() 方法,约等于什么都没干。
HeadContext 和 TailContext
在 DefaultChannelPipeline 中定义的双向链表头尾节点(当然继承了 AbstractChannelHandlerContext)
-
HeadContext:实现 ChannelInboundHandler 和 ChannelOutboundHandler,实现 Inbound 是负责日常入站的最初的 ctx.fire 以及其他自动操作,实现 outbound 是将出站的最后操作委托给 unsafe。比如在 io.netty.channel.DefaultChannelPipeline.HeadContext#channelRegistered 时检查是否需要为当前 pipeline 添加 channelInitializer(实际是委托给 PendingHandlerCallback,由其添加 channelInitializer 添加到 pipeline,进而触发 handlerAdded 来执行 initChannel 里 ChannelHandler 注册的操作)。还比如 channelActive 中的自动 read,详见前面提到 HeadContext 中 channel.read() 调到 TailContext 后再往回调最后触发 head.read() 即 unsafe.read() 的解释
-
TailContext:实现 ChannelInboundHandler,因为是尾节点,所以默认将一些传递到此的计数资源释放,以及异常到此处时告知用户“你没有默认的异常处理”,对于其他不需要释放计数资源的方法就是空实现
ChannelPipeline
Channelhandler 列表,用于处理入站和出站信息,入站从左向右,出站从右向左
实现 ChannelInboundInvoker 和 ChannelOutboundInvoker 接口来完成入站和出站处理
- addLast 方法:添加 channelHandler 到 pipeline 的最后一个位置,最常使用的添加 handler 方法,名字不允许重复,否则报 IllegalArgumentException 异常
- remove 方法:从 pipeline 中删除一个 channelHandler
- fireXXX 方法:实现 ChannelInboundInvoker 的一堆入站通知
- write、writeAndFlush、read 等方法:实现 ChannelOutboundInvoker 的一堆出站方法
DefaultChannelPipeline
channelpipeline 的默认实现,实现入站和出站 ChannelHandler 的双向链表,每个节点是 ChannelHandlerContext 类型,由 ctx 中的 handler 确定是否向右通知入站或向左通知出站。
- head 字段:HeadContext 类型,作为双向链表的头指针,作用详见前面
- tail 字段:TailContext 类型,作为双向链表的尾指针,作用详见前面
- pendingHandlerCallbackHead 字段:PendingHandlerCallback 类型,在 channel 在 boss 注册时设置为 PendingHandlerAddedTask(存疑),在 io.netty.channel.AbstractChannel.AbstractUnsafe#register0 中间接(一层调一层)调用了 io.netty.channel.DefaultChannelPipeline#callHandlerAddedForAllHandlers,进而执行了 pendingHandlerAddedTask 的 executre 方法将 ChannelInitializer 中待注册的 handler 全部添加 pipeline
ByteBuf
java.nio 下的 ByteBuffer 不好用,所以 Netty 自己搞了一个。
介绍
特性
几个重要的特性:池化、引用计数、零拷贝(不是)
- 继承了 ReferenceCounted 接口实现引用计数。refCnt 方法获取对象的引用计数,为 0 表示对象解除占用;retain 方法对引用计数加 1;release 方法对引用计数减 1,减完为 0 时返回 true 表示对象解除占用
- 继承了 ByteBufConvertible 接口实现 asByteBuf 方法将当前对象转为 ByteBuf
实现
ByteBuf 的实现主要是 ByteBuf -> AbstractByteBuf -> AbstractReferenceCountedByteBuf
- AbstractByteBuf:ByteBuf 骨架,实现 readerIndex、writerIndex、maxCapacity 等基础功能
- AbstractReferenceCountedByteBuf:实现了 ReferenceCounted 接口的引用计数功能,比如 refCnt、retain 和 release
分类
实现类有两种分类方法,按照内存分配方式不同分为:
- XxxHeapByteBuf:堆内存分配的 ByteBuf
- XxxDirectByteBuf:直接内存分配的 ByteBuf
按照池化非池化又分为:
- PooledByteBuf:池化的,PooledDirectByteBuf、PooledHeapByteBuf、PooledUnsafeDirectByteBuf 和 PooledUnsafeHeapByteBuf
- UnpooledXxxByteBuf:非池化的
特殊的 ByteBuf
UnreleasableByteBuf
包装一个 ByteBuf 后返回,目的是防止用户增加或减少其包装的 ByteBuf 引用计数,实际上就是重写了控制引用技术的相关方法(比如 retain、release 等),什么都不做而已,还有 slice 分片、duplicate 复制等新建副本之类的也用 UnreleasableByteBuf 包装后再返回。
可以用 io.netty.buffer.Unpooled#unreleasableBuffer 方法调用,通常用于一些特殊的常量 ByteBuf,将其包装为 UnreleasableByteBuf 防止其他人错误的销毁
CompositeByteBuf
组合多个 ByteBuf,将其抽象成一个
有趣的 ByteBufHolder
不是 ByteBuf 的子类,而是继承 ReferenceCounted,表示发送和接收的数据包,默认的 DefaultByteBufHolder 里有一个 ByteBuf 类型的 data 字段作为组合设计模式使用,WebSocketFrame 等数据包就是它的子类
- content 方法:返回此 ByteBufHolder 持有的 ByteBuf
- copy 方法:返回一个深拷贝 ByteBuf 副本的新 ByteBufHolder
- duplicate 方法:返回一个浅拷贝 ByteBuf(只新建了读写指针,数据还是同一个对象)的新 ByteBufHolder
- retainedDuplicate 方法:返回一个浅拷贝 ByteBuf(只新建了读写指针,数据还是同一个对象)的新 ByteBufHolder,同时对 ByteBuf 进行 retain 引用计数加 1
池化
本地线程缓存加 PoolArena 数组,太复杂,没看
引用计数
AbstractReferenceCountedByteBuf 是实现引用计数的 ByteBuf 抽象类,继承自 AbstractByteBuf
- updater 字段:ReferenceCountUpdater<AbstractReferenceCountedByteBuf> 类型,实际的实现是 AtomicIntegerFieldUpdater<AbstractReferenceCounted> 来 cas 更新 volatile 类型 refCnt 字段,cas 加 volatile 很明显是为了不加锁进行更新,减少并发竞争锁资源以及其带来的上下文切换时间
- retain 方法:利用上面的 updater 进行 cas 加 1
- release 方法:同样是用 updater 进行 cas 减 1
- deallocate 方法:用于泄露检测,release 如果返回 true,即引用计数为 0 时,触发该方法
ByteBuf 泄露检测
ByteBuf 把释放权利交给了用户(池化的 ByteBuf 以及非池化的 DirectByteBuf),而用户可能在不再持有该 ByteBuf 后没有将其引用计数清零归还到池中,从而导致 jvm 把 ByteBuf gc 后让 ByteBuf 池发生了泄漏,最后导致 ByteBuf 池无内存可用,因此需要泄漏检测,检测的原理很简单,利用 ReferenceQueue 和 WeakReference,在 ByteBuf release 时判断如果引用计数为 0 了,就对其标记为已释放,在 ReferenceQueue 接到 WeakReference<ByteBuf> 时(gc 回收了该对象)检测该对象是否被标记为已释放,如果没被标记为已释放就表示放生了内存泄漏。
下面先介绍下相关类:
- ResourceLeakDetector:资源泄露检测器,增加泄漏检测的 api 入口。track 方法用于创建一个 ResourceLeakTracker 进行资源跟踪,同时检查是否有泄漏的资源要报告(调用 reportLeak 方法)
- ResourceLeakTracker:具体的跟踪者,close 方法用于关闭泄漏,即正确的释放资源;record 方法用于记录调用方的当前堆栈,Throwable.getStackTrace() 和 Thread.currentThread().getStackTrace() 都可以获取堆栈,不过后者在 jdk6前是直接 dump 的堆栈,因为打印堆栈的目标线程和执行打印的工作线程不一定是一个,这样会很慢(需要进入安全点),所以后面优化为如果工作线程和目标线程是同一个线程,就使用 new Exception().getStackTrace() 快速获取堆栈
- DefaultResourceLeak:ResourceLeakTracker 的默认实现,继承 WeakReference
- TraceRecord:记录一次使用的位置,继承 Throwable
接着开始说流程,我们以 PooledByteBufAllocator 为例:
- 分配时:如果是直接内存,buffer() 最终调用的就是 newDirectBuffer(),获取到的 ByteBuf 是被 toLeakAwareBuffer() 处理了一遍才返回的,其中是 AbstractByteBuf.leakDetector.track(buf) 将其包裹为 DefaultResourceLeak(WeakReference类型,创建时添加到 Set<DefaultResourceLeak<?>> allLeaks 表示跟踪),并将 defaultResourceLeak 与 buf 封装为 SimpleLeakAwareByteBuf(只以默认的这个 Simple 级别的讲述)返回,利用的是组合加继承的设计模式,目的是保持其作为 ByteBuf 的功能,同时在其 release 时判断如果引用技术为 0 则调用 defaultResourceLeak 的 close (从 中删除 defaultResourceLeak)告知其正确的释放了资源
- 检测时:检测的时机发生在 AbstractByteBuf.leakDetector.track() 方法中,每次创建新的 DefaultResourceLeak 时都会检测,过程是从 resourceLeakDetector 的 referenceQueue 取元素,如果元素不为空且 dispose 为 true 就表示检测到泄漏(dispose 方法见下方),而在前面“分配时”讲到如果正确释放了资源 allLeaks 是不会有这个元素的,即返回 false。
boolean dispose() {
clear();
return allLeaks.remove(this);
}
最后说一下为什么用 allLeaks 这个 set 存放 DefaultResourceLeak,因为 gc 后 ByteBuf 和 SimpleLeakAwareByteBuf 都被 gc 了,需要一个和他们同样引用关系的对象(同时记载这个 ByteBuf 使用的记录)由 set 强引用,直到泄漏检测完成后再完全释放引用交给 gc。
netty中的内存泄漏检测机制ResourceLeakDetector - 简书 (jianshu.com)
零拷贝
传统的 Zero-Copy 是 IO 数据传输无需由内核态到用户态,再从用户态到内核态,较少拷贝次数
Netty 的 Zero-Copy 是完全在用户态下的,即传输层的零拷贝机制,比如在拆包、合并包时,常见的做法是 System.arrayCopy 拷贝数据,但这样有一定的开销,而 Netty 通过 slice、wrapedBuffer 等操作拆分合并 ByteBuf,从而无需进行数据拷贝
标签:分析,Netty,task,read,任务,源码,io,方法,channel From: https://www.cnblogs.com/hligy/p/17858932.html