首页 > 编程语言 >Netty源码-05-EventLoop

Netty源码-05-EventLoop

时间:2022-11-16 22:37:51浏览次数:72  
标签:Netty task args EventLoop 源码 线程 executor new null

前文已经了解过了NioEventLoopGroupNioEventLoop

在Netty中是用的是Reactor线程模型(IO多路复用器+多个线程),真正处理业务流程的worker线程都是单个线程,一个线程处理多个Channel,一个Channel始终都是由特定的线程进行处理。

在这样的情况下,如果某个Channel的业务流程耗时较久或者阻塞,那么绑定在当前线程上所有的任务都会受到影响,这样的场景如何处理呢

再看一下EventLoop的继承关系

从名字就可以看出来EventLoop实现分为两类

  • 非IO事件循环线程 DefaultEventLoop
  • IO事件循环线程 在非IO事件循环基础之上增加了对网络IO多路复用器的支持 NioEventLoop 从API上看就是多了register(...)Channel的支持
    • KQueueEventLoop
    • EpollEventLoop
    • ...

NioEventLoop的具体实现依赖操作系统

  • MacOSX -> KQueueEventLoop
  • Linux -> EPollEventLoop
  • Windows -> PollEventLoop

一 Demo

package io.netty.example.basic.eventloop;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 *
 * @since 2022/11/8
 * @author dingrui
 */
public class EventLoopGroupTest01 {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bizGroup = new DefaultEventLoopGroup();
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast("handler1", new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        // TODO: 2022/11/8 业务处理1
                                        ctx.fireChannelRead(msg);
                                    }
                                })
                                .addLast(bizGroup, "handler1", new ChannelInboundHandlerAdapter() {
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        // TODO: 2022/11/8 业务处理2
                                    }
                                });
                    }
                })
                .bind(8080)
                .sync();
    }
}

大抵的示例就是将一整条业务流程上比较耗时的部分拆开,使用适当的EventLoop来处理,尽量让每个线程处理的内容都短小,提升处理效率。

二 DefaultEventLoop

package io.netty.example.basic.eventloop;

import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;

/**
 *
 * @since 2022/11/8
 * @author dingrui
 */
public class EventLoopGroupTest00 {

    public static void main(String[] args) {
        EventLoopGroup group = new DefaultEventLoopGroup();
        group.next().execute(()-> System.out.println("execute..."));
        System.out.println();
    }
}

NioEventLoop的实现是比DefaultEventLoop更丰富的,因此跟踪DefaultEventLoop源码就会简单很多。

1 DefaultEventLoopGroup构造方法

// DefaultEventLoopGroup.java
public DefaultEventLoopGroup() {
        this(0);
    }

public DefaultEventLoopGroup(int nThreads) {
        this(nThreads, (ThreadFactory) null);
    }

public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        super(nThreads, threadFactory);
    }
// MultithreadEventLoopGroup.java
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }
// MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
    }

// MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads,
                                            Executor executor, // null
                                            Object... args // [SelectorProvider SelectStrategyFactory RejectedExecutionHandlers]
    ) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

// MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, // 标识着group中有几个EventLoop
                                            Executor executor, // null
                                            EventExecutorChooserFactory chooserFactory, // DefaultEventExecutorChooserFactory.INSTANCE
                                            Object... args // [SelectorProvider SelectStrategyFactory RejectedExecutionHandlers]
    ) {
        if (executor == null) // 线程执行器 非守护线程(main线程退出可以继续执行)
            executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory()); // 构造一个executor线程执行器 一个任务对应一个线程(线程:任务=1:n)

        /**
         * 构建NioEventLoop
         * NioEventLoop children数组 线程池中的线程数组
         */
        this.children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) { // 根据NioEventLoopGroup构造器指定的数量创建NioEventLoop 也就是指定数量的线程数(线程的创建动作延迟到任务提交时)
            boolean success = false;
            try {
                /**
                 * 初始化NioEventLoop事件循环器集合 也就是多个线程
                 */
                children[i] = this.newChild(executor, args); // args=[SelectorProvider SelectStrategyFactory RejectedExecutionHandlers]
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) { // 但凡有一个child实例化失败 就把已经成功实例化的线程进行shutdown shutdown是异步操作
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt(); // 把中断状态设置回去 交给关心的线程来处理
                            break;
                        }
                    }
                }
            }
        }

        /**
         * 创建线程选择器
         * 线程选择策略
         * NioEventLoopGroup都绑定一个chooser对象 作为线程选择器 通过这个线程选择器 为每一个channel发生的读写IO分配不同的线程进行处理
         */
        this.chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() { // 设置一个listener用来监听线程池中的termination事件 给线程池中的每一个线程都设置这个listener 当监听到所有线程都terminate以后 这个线程池就算真正的terminate了
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length)
                    terminationFuture.setSuccess(null);
            }
        };

        for (EventExecutor e: children)
            e.terminationFuture().addListener(terminationListener);

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet); // 只读集合
    }

MultithreadEventExecutorGroup是父类,因此整体流程都是一样的,区别在于创建EventLoop的实现上

// MultithreadEventExecutorGroup.java
    protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

// DefaultEventExecutorGroup.java
@Override
    protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
        return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);
    }
// DefaultEventExecutor.java
public DefaultEventExecutor(EventExecutorGroup parent, Executor executor, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, true, maxPendingTasks, rejectedExecutionHandler);
    }
// SingleThreadEventExecutor.java
protected SingleThreadEventExecutor(EventExecutorGroup parent, // EventLoop线程归属的管理器
                                        Executor executor, // 线程执行器
                                        boolean addTaskWakesUp, // EventLoop是单线程 不能让一个线程没有任务时候处于空转状态 以事件响应机制来驱动线程执行 所以需要一定机制让那个线程阻塞/唤起 在NioEventLoop中利用IO多路复用器机制实现 在DefaultEventLoop中使用阻塞队列机制实现 addTaskWakesUp为true表示使用阻塞队列实现
                                        int maxPendingTasks,
                                        RejectedExecutionHandler
                                                rejectedHandler
    ) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp; // NioEventLoop和DefaultEventLoop差异
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ThreadExecutorMap.apply(executor, this);
        this.taskQueue = this.newTaskQueue(this.maxPendingTasks); // NioEventLoop和DefaultEventLoop差异
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

// SingleThreadEventExecutor.java
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { // DefaultEventLoop不支持复用器 阻塞点发生在任务队列的存取上 因此任务队列的实现使用阻塞队列 NioEventLoop阻塞点发生在复用器上 因此不需要依赖阻塞队列 自己单独去实现
        return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
    }

NioEventLoop中队列实现跟DefaultEventLoop中队列实现不同

2 线程轮询

// DefaultEventLoop.java
@Override
    protected void run() {
        for (;;) {
            Runnable task = this.takeTask();
            if (task != null) {
                task.run();
                updateLastExecutionTime();
            }

            if (confirmShutdown()) {
                break;
            }
        }
    }
// SingleThreadEventExecutor.java
protected Runnable takeTask() {
        assert inEventLoop();
        if (!(taskQueue instanceof BlockingQueue)) {
            throw new UnsupportedOperationException();
        }

        BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
        for (;;) {
            ScheduledFutureTask<?> scheduledTask = super.peekScheduledTask();
            if (scheduledTask == null) {
                Runnable task = null;
                try {
                    task = taskQueue.take(); // 阻塞点 阻塞队列为空了发生线程阻塞
                    if (task == WAKEUP_TASK) {
                        task = null;
                    }
                } catch (InterruptedException e) {
                    // Ignore
                }
                return task;
            } else {
                long delayNanos = scheduledTask.delayNanos();
                Runnable task = null;
                if (delayNanos > 0) {
                    try {
                        task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS); // 阻塞点
                    } catch (InterruptedException e) {
                        // Waken up.
                        return null;
                    }
                }
                if (task == null) {
                    // We need to fetch the scheduled tasks now as otherwise there may be a chance that
                    // scheduled tasks are never executed if there is always one task in the taskQueue.
                    // This is for example true for the read task of OIO Transport
                    // See https://github.com/netty/netty/issues/1614
                    fetchFromScheduledTaskQueue();
                    task = taskQueue.poll(); // 非阻塞方式
                }

                if (task != null) {
                    return task;
                }
            }
        }
    }

三 工作流程图

标签:Netty,task,args,EventLoop,源码,线程,executor,new,null
From: https://www.cnblogs.com/miss-u/p/16897734.html

相关文章

  • Netty源码-06-MpscQueue
    在IO线程NioEventLoop中维护了一个队列实现,用于存放非IO任务,一个IO线程负责N个Channel,为了保证一个线程贯穿始终负责一个Channel的所有任务(任务执行次序有先后区分需要),因......
  • Netty源码-07-Channel
    一类图关系在Java的NIO体系中定义了ServerSocketChannel和SocketChannelNetty为了支持Reactor线程模型和异步编程,自己也实现了与Java中对应的两个实现NioServerSocke......
  • Netty源码-09-ServerBootstrapAcceptor
    在ServerBootstrapAcceptor启用之前,此刻Reactor状态应该是NioServerSocketChannel在IO多路复用器上关注着Accept(16)事件pipeline中有4个handlerheadbossHandlerSer......
  • Netty源码-08-ChannelInitializer
    一回顾几个时机点pipeline的初始化用户向pipeline添加ChannelInitializer辅助实例Channel注册到复用器之后回调1pipeline的初始化初始化Channel的时候触发了pipel......
  • Netty源码-10-ChannelFuture
    Netty为了提高系统的吞吐,大量使用异步线程模型一DemopublicclassFutureTest00{publicstaticvoidmain(String[]args)throwsInterruptedException,Execut......
  • Netty源码-01-NioEventLoopGroup
    一定义摘自源码JavaDoc/***The{@linkEventExecutorGroup}isresponsibleforprovidingthe{@linkEventExecutor}'stouse*viaits{@link#next()}method......
  • Netty源码-02-FastThreadLocalThread
    一DemopublicclassFastThreadLocalTest00{privatefinalstaticFastThreadLocal<Long>v=newFastThreadLocal<Long>(){@Overrideprotec......
  • apache启动遇到phpinfo只显示源码问题
    在安装php和apache的时候,会遇到只显示源码的问题网上找了好多帖子都是在改php.ini的东西,但是改了半天也不对,发现我安装的wordpress目录也打不开,所以我认为这就是apache服......
  • python源码通过词语标记化器tokenize提取注释并正则匹配测试用例作者名
    提取代码如下importtokenizeimportrewithtokenize.open('readcomment.py')asf:list=[]fortoktype,tok,start,end,lineintokenize.generate_t......
  • SpringBoot源码解析
    1.创建SpringApplication对象1.1webApplicationType设置(SERVLET)1.2setInitializers设置通过getSpringFactoriesInstances方法从META-INF/spring.factories下获取k......