首页 > 编程语言 >Netty源码分析——客户端接入accept过程

Netty源码分析——客户端接入accept过程

时间:2022-10-17 12:08:35浏览次数:80  
标签:Netty pipeline ch accept 源码 线程 new final channel

基于Netty源代码版本:netty-all-4.1.33.Final

netty中的reactor线程

netty中最核心的东西莫过于两种类型的reactor线程,可以看作netty中两种类型的发动机,驱动着netty整个框架的运转。

一种类型的reactor线程是boos线程组,专门用来接受新的连接,然后封装成channel对象扔给worker线程组;还有一种类型的reactor线程是worker线程组,专门用来处理连接的读写。

不管是boos线程还是worker线程,所做的事情均分为以下三个步骤

  • 轮询出注册在selector上面的IO事件(select)
  • 处理这些IO事件(process selected keys)
  • 执行异步task

对于boos线程来说,第一步轮询出来的基本都是 accept 事件,表示有新的连接,而worker线程轮询出来的基本都是read/write事件,表示网络的读写事件

新连接的建立

简单来说,新连接的建立可以分为三个步骤:

  • 1、检测到有新的连接
  • 2、将新的连接注册到worker线程组
  • 3、注册新连接的读事件

检测到有新连接进入

我们已经知道,当服务端绑启动之后,服务端的channel已经注册到boos reactor线程中,reactor不断检测有新的事件,直到检测出有accept事件发生 NioEventLoop.java

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
	final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
	//检查该SelectionKey是否有效,如果无效,则关闭channel
	if (!k.isValid()) {
		final EventLoop eventLoop;
		try {
			eventLoop = ch.eventLoop();
		} catch (Throwable ignored) {
			// If the channel implementation throws an exception because there is no event loop, we ignore this
			// because we are only trying to determine if ch is registered to this event loop and thus has authority
			// to close ch.
			return;
		}
		// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
		// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
		// still healthy and should not be closed.
		// See https://github.com/netty/netty/issues/5125
		if (eventLoop != this || eventLoop == null) {
			return;
		}
		// close the channel if the key is not valid anymore
		unsafe.close(unsafe.voidPromise());
		return;
	}

	try {
		int readyOps = k.readyOps();
		// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
		// the NIO JDK channel implementation may throw a NotYetConnectedException.
		// 如果是OP_CONNECT,则需要移除OP_CONNECT否则Selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100%
		if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
			// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
			// See https://github.com/netty/netty/issues/924
			int ops = k.interestOps();
			ops &= ~SelectionKey.OP_CONNECT;
			k.interestOps(ops);

			unsafe.finishConnect();
		}

		// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
		// 如果准备好了WRITE则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的OP_WRITE标记
		if ((readyOps & SelectionKey.OP_WRITE) != 0) {
			// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
			ch.unsafe().forceFlush();
		}

		// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
		// to a spin loop
		// 如果准备好READ或ACCEPT则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决JDK可能会产生死循环的一个bug。
		if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
			unsafe.read();
		}
	} catch (CancelledKeyException ignored) {
		unsafe.close(unsafe.voidPromise());
	}
}

该方法主要是对SelectionKey k进行了检查,有如下几种不同的情况

  • 1)OP_ACCEPT,接受客户端连接
  • 2)OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取。
  • 3)OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据。
  • 4)OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态。

主要来看下当boss线程 selector检测到OP_ACCEPT事件时,内部干了些什么。

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 如果准备好READ或ACCEPT则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决JDK可能会产生死循环的一个bug。
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
	unsafe.read();
}

boos reactor线程已经轮询到 SelectionKey.OP_ACCEPT 事件,说明有新的连接进入,此时将调用channel的 unsafe来进行实际的操作,此时的channel为 NioServerSocketChannel,则unsafe为NioServerSocketChannel的属性NioMessageUnsafe

 

那么,我们进入到它的read方法,进入新连接处理的第二步 注册到reactor线程 NioMessageUnsafe.java

private final List<Object> readBuf = new ArrayList<Object>();

@Override
public void read() {
	assert eventLoop().inEventLoop();
	final ChannelConfig config = config();
	final ChannelPipeline pipeline = pipeline();
	final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
	allocHandle.reset(config);

	boolean closed = false;
	Throwable exception = null;
	try {
		try {
			do {
				int localRead = doReadMessages(readBuf);
				if (localRead == 0) {
					break;
				}
				if (localRead < 0) {
					closed = true;
					break;
				}

				allocHandle.incMessagesRead(localRead);
			} while (allocHandle.continueReading());
		} catch (Throwable t) {
			exception = t;
		}

		int size = readBuf.size();
		for (int i = 0; i < size; i ++) {
			readPending = false;
			pipeline.fireChannelRead(readBuf.get(i));
		}
		readBuf.clear();
		allocHandle.readComplete();
		pipeline.fireChannelReadComplete();

		if (exception != null) {
			closed = closeOnReadError(exception);

			pipeline.fireExceptionCaught(exception);
		}

		if (closed) {
			inputShutdown = true;
			if (isOpen()) {
				close(voidPromise());
			}
		}
	} finally {
		// Check if there is a readPending which was not processed yet.
		// This could be for two reasons:
		// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
		// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
		//
		// See https://github.com/netty/netty/issues/2254
		if (!readPending && !config.isAutoRead()) {
			removeReadOp();
		}
	}
}

调用 doReadMessages 方法不断地读取消息,用 readBuf 作为容器,这里,其实可以猜到读取的是一个个连接,然后调用 pipeline.fireChannelRead(),将每条新连接经过一层服务端channel的洗礼,之后清理容器,触发 pipeline.fireChannelReadComplete() 下面我们具体看下这两个方法:

  • 1、doReadMessages(List)
  • 2、pipeline.fireChannelRead(NioSocketChannel)

doReadMessages()

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
	SocketChannel ch = SocketUtils.accept(javaChannel());

	try {
		if (ch != null) {
			buf.add(new NioSocketChannel(this, ch));
			return 1;
		}
	} catch (Throwable t) {
		logger.warn("Failed to create a new channel from an accepted socket.", t);

		try {
			ch.close();
		} catch (Throwable t2) {
			logger.warn("Failed to close a socket.", t2);
		}
	}

	return 0;
}

@Override
protected ServerSocketChannel javaChannel() {
	return (ServerSocketChannel) super.javaChannel();
}

public final class SocketUtils {
	@Override
	protected int doReadMessages(List<Object> buf) throws Exception {
		SocketChannel ch = SocketUtils.accept(javaChannel());

		try {
			if (ch != null) {
				buf.add(new NioSocketChannel(this, ch));
				return 1;
			}
		} catch (Throwable t) {
			logger.warn("Failed to create a new channel from an accepted socket.", t);

			try {
				ch.close();
			} catch (Throwable t2) {
				logger.warn("Failed to close a socket.", t2);
			}
		}

		return 0;
	}
}

我们终于窥探到netty调用jdk底层nio的边界serverSocketChannel.accept();,由于netty中reactor线程第一步就扫描到有accept事件发生,因此,这里的accept方法是立即返回的,返回jdk底层nio创建的一条channel ServerSocketChannel有阻塞和非阻塞两种模式:

  • a、阻塞模式:ServerSocketChannel.accept() 方法监听新进来的连接,当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel。阻塞模式下, accept()方法会一直阻塞到有新连接到达。
  • b、非阻塞模式:accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null.

在NioServerSocketChannel的构造函数分析中,我们知道,其通过ch.configureBlocking(false); 语句设置当前的ServerSocketChannel为非阻塞的。

 

在NioServerSocketChannel的构造函数分析中,我们知道,在其父类AbstractNioChannel的构造函数中通过ch.configureBlocking(false);语句设置当前的ServerSocketChannel为非阻塞的。

netty将jdk的 SocketChannel 封装成自定义的 NioSocketChannel,加入到list里面,这样外层就可以遍历该list,做后续处理

从上一篇文章中,我们已经知道服务端的创建过程中会创建netty中一系列的核心组件,包括pipeline,unsafe等等,那么,接受一条新连接的时候是否也会创建这一系列的组件呢?

带着这个疑问,我们跟进去 NioSocketChannel.java

public NioSocketChannel(Channel parent, SocketChannel socket) {
	super(parent, socket);
	config = new NioSocketChannelConfig(this, socket.socket());
}

我们重点分析 super(parent, socket),NioSocketChannel的父类为 AbstractNioByteChannel AbstractNioByteChannel.java

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
	super(parent, ch, SelectionKey.OP_READ);
}

这里,我们看到jdk nio里面熟悉的影子—— SelectionKey.OP_READ,一般在原生的jdk nio编程中,也会注册这样一个事件,表示对channel的读感兴趣

我们继续往上,追踪到AbstractNioByteChannel的父类 AbstractNioChannel, 这里,我相信读了上一篇文章你对于这部分代码肯定是有印象的

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
	super(parent);
	this.ch = ch;
	this.readInterestOp = readInterestOp;
	try {
		ch.configureBlocking(false);
	} catch (IOException e) {
		try {
			ch.close();
		} catch (IOException e2) {
			if (logger.isWarnEnabled()) {
				logger.warn(
						"Failed to close a partially initialized socket.", e2);
			}
		}

		throw new ChannelException("Failed to enter non-blocking mode.", e);
	}
}

在创建服务端channel的时候,最终也会进入到这个方法,super(parent), 便是在AbstractChannel中创建一系列和该channel绑定的组件,如下

protected AbstractChannel(Channel parent) {
	this.parent = parent;
	id = newId();
	unsafe = newUnsafe();
	pipeline = newChannelPipeline();
}

而这里的 readInterestOp 表示该channel关心的事件是 SelectionKey.OP_READ,后续会将该事件注册到selector,之后设置该通道为非阻塞模式,在channel中创建 unsafe 和一条 pipeline

pipeline.fireChannelRead(NioSocketChannel)

对于 pipeline我们前面已经了解过,在netty的各种类型的channel中,都会包含一个pipeline,字面意思是管道,我们可以理解为一条流水线工艺,流水线工艺有起点,有结束,中间还有各种各样的流水线关卡,一件物品,在流水线起点开始处理,经过各个流水线关卡的加工,最终到流水线结束

对应到netty里面,流水线的开始就是HeadContxt,流水线的结束就是TailConext,HeadContxt中调用Unsafe做具体的操作,TailConext中用于向用户抛出pipeline中未处理异常以及对未处理消息的警告

 

通过前面的文章中,我们已经知道在服务端的channel初始化时,在pipeline中,已经自动添加了一个pipeline处理器 ServerBootstrapAcceptor, 并已经将用户代码中设置的一系列的参数传入了构造函数,接下来,我们就来看下ServerBootstrapAcceptor ServerBootstrapAcceptor.java

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

	private final EventLoopGroup childGroup;
	private final ChannelHandler childHandler;
	private final Entry<ChannelOption<?>, Object>[] childOptions;
	private final Entry<AttributeKey<?>, Object>[] childAttrs;
	private final Runnable enableAutoReadTask;

	ServerBootstrapAcceptor(
			final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
			Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
		this.childGroup = childGroup;
		this.childHandler = childHandler;
		this.childOptions = childOptions;
		this.childAttrs = childAttrs;

		// Task which is scheduled to re-enable auto-read.
		// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
		// not be able to load the class because of the file limit it already reached.
		//
		// See https://github.com/netty/netty/issues/1328
		enableAutoReadTask = new Runnable() {
			@Override
			public void run() {
				channel.config().setAutoRead(true);
			}
		};
	}

	@Override
	@SuppressWarnings("unchecked")
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		final Channel child = (Channel) msg;

		child.pipeline().addLast(childHandler);

		setChannelOptions(child, childOptions, logger);

		for (Entry<AttributeKey<?>, Object> e: childAttrs) {
			child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
		}

		try {
			childGroup.register(child).addListener(new ChannelFutureListener() {
				@Override
				public void operationComplete(ChannelFuture future) throws Exception {
					if (!future.isSuccess()) {
						forceClose(child, future.cause());
					}
				}
			});
		} catch (Throwable t) {
			forceClose(child, t);
		}
	}
}

前面的 pipeline.fireChannelRead(NioSocketChannel); 最终通过head->unsafe->ServerBootstrapAcceptor的调用链,调用到这里的 ServerBootstrapAcceptor 的channelRead方法,而 channelRead 一上来就把这里的msg强制转换为 Channel

然后,拿到该channel,也就是我们之前new出来的 NioSocketChannel中对应的pipeline,将用户代码中的 childHandler,添加到pipeline,这里的 childHandler 在用户代码中的体现为

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline pipeline = ch.pipeline();
         pipeline.addLast(new EchoServerHandler());
     }
 });

其实对应的是 ChannelInitializer,到了这里,NioSocketChannel中pipeline对应的处理器为 head->ChannelInitializer->tail,牢记,后面会再次提到!

接着,设置 NioSocketChannel 对应的 attr和option,然后进入到 childGroup.register(child),这里的childGroup就是我们在启动代码中new出来的NioEventLoopGroup

我们进入到NioEventLoopGroup的register方法,代理到其父类MultithreadEventLoopGroup MultithreadEventLoopGroup.java

@Override
public ChannelFuture register(Channel channel) {
	return next().register(channel);
}

这里又扯出来一个 next()方法,我们跟进去

@Override
public EventLoop next() {
	return (EventLoop) super.next();
}

回到其父类 MultithreadEventExecutorGroup.java

private final EventExecutorChooserFactory.EventExecutorChooser chooser;

@Override
public EventExecutor next() {
	return chooser.next();
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
	chooser = chooserFactory.newChooser(children);
}

这里的chooser对应的类为 EventExecutorChooser,字面意思为事件执行器选择器,放到我们这里的上下文中的作用就是从worker reactor线程组中选择一个reactor线程

@UnstableApi
public interface EventExecutorChooserFactory {

    /**
     * Returns a new {@link EventExecutorChooser}.
     */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /**
     * Chooses the next {@link EventExecutor} to use.
     */
    @UnstableApi
    interface EventExecutorChooser {

        /**
         * Returns the new {@link EventExecutor} to use.
         */
        EventExecutor next();
    }
}

chooser的实现有两种

@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

默认情况下,chooser通过 DefaultEventExecutorChooserFactory被创建,在创建reactor线程选择器的时候,会判断reactor线程的个数,如果是2的幂,就创建PowerOfTowEventExecutorChooser,否则,创建GenericEventExecutorChooser

两种类型的选择器在选择reactor线程的时候,都是通过Round-Robin的方式选择reactor线程,唯一不同的是,PowerOfTowEventExecutorChooser是通过与运算,而GenericEventExecutorChooser是通过取余运算,与运算的效率要高于求余运算

选择完一个reactor线程,即 NioEventLoop 之后,我们回到注册的地方

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

SingleThreadEventLoop.java

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

其实,这里已经和服务端启动的过程一样了,可以参考我前面的文章

AbstractChannel$AbstractUnsafe

private void register0(ChannelPromise promise) {
	try {
		// check if the channel is still open as it could be closed in the mean time when the register
		// call was outside of the eventLoop
		if (!promise.setUncancellable() || !ensureOpen(promise)) {
			return;
		}
		boolean firstRegistration = neverRegistered;
		doRegister();
		neverRegistered = false;
		registered = true;

		// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
		// user may already fire events through the pipeline in the ChannelFutureListener.
		pipeline.invokeHandlerAddedIfNeeded();

		safeSetSuccess(promise);
		pipeline.fireChannelRegistered();
		// Only fire a channelActive if the channel has never been registered. This prevents firing
		// multiple channel actives if the channel is deregistered and re-registered.
		if (isActive()) {
			if (firstRegistration) {
				pipeline.fireChannelActive();
			} else if (config().isAutoRead()) {
				// This channel was registered before and autoRead() is set. This means we need to begin read
				// again so that we process inbound data.
				//
				// See https://github.com/netty/netty/issues/4805
				beginRead();
			}
		}
	} catch (Throwable t) {
		// Close the channel directly to avoid FD leak.
		closeForcibly();
		closeFuture.setClosed();
		safeSetFailure(promise, t);
	}
}

和服务端启动过程一样,先是调用 doRegister();做真正的注册过程,如下 AbstractNioChannel

@Override
protected void doRegister() throws Exception {
	boolean selected = false;
	for (;;) {
		try {
			selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
			return;
		} catch (CancelledKeyException e) {
			if (!selected) {
				// Force the Selector to select now as the "canceled" SelectionKey may still be
				// cached and not removed because no Select.select(..) operation was called yet.
				eventLoop().selectNow();
				selected = true;
			} else {
				// We forced a select operation on the selector before but the SelectionKey is still cached
				// for whatever reason. JDK bug ?
				throw e;
			}
		}
	}
}

将该条channel绑定到一个selector上去,一个selector被一个reactor线程使用,后续该channel的事件轮询,以及事件处理,异步task执行都是由此reactor线程来负责

绑定完reactor线程之后,调用 pipeline.invokeHandlerAddedIfNeeded()

前面我们说到,到目前为止NioSocketChannel 的pipeline中有三个处理器,head->ChannelInitializer->tail,最终会调用到 ChannelInitializer 的 handlerAdded 方法

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
	if (ctx.channel().isRegistered()) {
		// This should always be true with our current DefaultChannelPipeline implementation.
		// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
		// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
		// will be added in the expected order.
		if (initChannel(ctx)) {

			// We are done with init the Channel, removing the initializer now.
			removeState(ctx);
		}
	}
}

handlerAdded方法调用 initChannel 方法之后,调用remove(ctx);将自身删除

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
	if (initMap.add(ctx)) { // Guard against re-entrance.
		try {
			initChannel((C) ctx.channel());
		} catch (Throwable cause) {
			// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
			// We do so to prevent multiple calls to initChannel(...).
			exceptionCaught(ctx, cause);
		} finally {
			ChannelPipeline pipeline = ctx.pipeline();
			if (pipeline.context(this) != null) {
				pipeline.remove(this);
			}
		}
		return true;
	}
	return false;
}

而这里的 initChannel 方法又是神马玩意?让我们回到用户方法,比如下面这段用户代码 用户代码

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline pipeline = ch.pipeline();
         pipeline.addLast(new LoggingHandler(LogLevel.INFO));
         pipeline.addLast(new EchoServerHandler());
     }
 });

原来最终跑到我们自己的代码里去了啊!完了之后,NioSocketChannel绑定的pipeline的处理器就包括 head->LoggingHandler->EchoServerHandler->tail

注册读事件

接下来,我们还剩下这些代码没有分析完 AbstractChannel$AbstractUnsafe

private void register0(ChannelPromise promise) {
	try {
		// check if the channel is still open as it could be closed in the mean time when the register
		// call was outside of the eventLoop
		if (!promise.setUncancellable() || !ensureOpen(promise)) {
			return;
		}
		boolean firstRegistration = neverRegistered;
		doRegister();
		neverRegistered = false;
		registered = true;

		// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
		// user may already fire events through the pipeline in the ChannelFutureListener.
		pipeline.invokeHandlerAddedIfNeeded();

		safeSetSuccess(promise);
		pipeline.fireChannelRegistered();
		// Only fire a channelActive if the channel has never been registered. This prevents firing
		// multiple channel actives if the channel is deregistered and re-registered.
		if (isActive()) {
			if (firstRegistration) {
				pipeline.fireChannelActive();
			} else if (config().isAutoRead()) {
				// This channel was registered before and autoRead() is set. This means we need to begin read
				// again so that we process inbound data.
				//
				// See https://github.com/netty/netty/issues/4805
				beginRead();
			}
		}
	} catch (Throwable t) {
		// Close the channel directly to avoid FD leak.
		closeForcibly();
		closeFuture.setClosed();
		safeSetFailure(promise, t);
	}
}

pipeline.fireChannelRegistered();,其实没有干啥有意义的事情,最终无非是再调用一下业务pipeline中每个处理器的 ChannelHandlerAdded方法处理下回调

isActive()在连接已经建立的情况下返回true,所以进入方法块,进入到 pipeline.fireChannelActive();在这里我详细步骤先省略,直接进入到关键环节

@Override
public final void beginRead() {
	assertEventLoop();

	if (!isActive()) {
		return;
	}

	try {
		doBeginRead();
	} catch (final Exception e) {
		invokeLater(new Runnable() {
			@Override
			public void run() {
				pipeline.fireExceptionCaught(e);
			}
		});
		close(voidPromise());
	}
}

AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {    
	@Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
}

这里其实就是将 SelectionKey.OP_READ事件注册到selector中去,表示这条通道已经可以开始处理read事件了

至此,netty中关于新连接的处理已经向你展示完了,我们做下总结

  • 1、boos reactor线程轮询到有新的连接进入
  • 2、通过封装jdk底层的channel创建 NioSocketChannel以及一系列的netty核心组件
  • 3、将该条连接通过chooser,选择一条worker reactor线程绑定上去
  • 4、注册读事件,开始新连接的读写

 

参考: https://www.cnblogs.com/java-chen-hao/p/11477358.html

https://www.cnblogs.com/zhangboyu/p/7452611.html

https://www.cnblogs.com/cr1719/p/6360201.html

标签:Netty,pipeline,ch,accept,源码,线程,new,final,channel
From: https://blog.51cto.com/u_14014612/5762044

相关文章