首页 > 其他分享 >【Netty】「萌新入门」(二)剖析 EventLoop

【Netty】「萌新入门」(二)剖析 EventLoop

时间:2023-06-15 20:31:43浏览次数:44  
标签:Netty EventLoop nioEventLoopGroup EventLoopServer 萌新 DEBUG 线程 EventLoopGroup

前言

本篇博文是《从0到1学习 Netty》中入门系列的第二篇博文,主要内容是介绍 Netty 中 EventLoop 的使用,优化及源码解析,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;


概述

事件循环对象 EventLoop

在 Netty 中,EventLoop 是用于处理 I/O 事件的线程。它允许我们在单线程中同时处理多个连接,避免了阻塞和等待。

简单来说,Netty 的 EventLoop 具有以下几个特点:

  1. 单线程执行:每个 EventLoop 都是由一个线程负责执行,这样可以减少线程切换开销,提高网络应用程序的性能。
  2. 多路复用:EventLoop 使用底层操作系统提供的 I/O 模型(例如 Java NIO)实现多路复用,即一条线程可以监听多个 Channel,从而实现并发处理多个连接的能力。
  3. 事件驱动:EventLoop 通过监听与 Channel 相关的 I/O 事件(例如读、写、连接等),并将其转化为事件对象(例如 ChannelReadEventChannelWriteEvent 等)进行处理。这种事件驱动模型可以让 Netty 高效地响应 I/O 事件,避免了轮询等不必要的操作。
  4. 非阻塞操作:EventLoop 中所有的操作都是非阻塞的,包括 I/O 操作和异步任务的执行。这样可以确保整个系统始终处于高效运行状态,并提高了系统的可伸缩性。

其中,EventLoop 的继承关系如下:

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    EventLoopGroup parent();
}

public interface EventLoopGroup extends EventExecutorGroup {...}

public interface EventExecutorGroup extends 
    ScheduledExecutorService, Iterable<EventExecutor> {...}
  • 继承自 java.util.concurrent.ScheduledExecutorService; 因此包含了线程池中所有的方法;
  • 继承自 Netty 自己的 OrderedEventExecutor
  • 提供了 boolean inEventLoop(Thread var1) 方法判断一个线程是否属于此 EventLoop;
  • 提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup;

总之,Netty 中的 EventLoop 提供了一种高性能、高可靠性的网络编程模型,它解决了网络应用程序中的并发、高负载等问题,是构建高性能网络应用程序的重要组成部分。


事件循环组 EventLoopGroup

在 Netty 中,EventLoopGroup 是用于处理 I/O 操作和任务执行的线程池。它负责管理一个或多个 EventLoop 实例,每个 EventLoop 负责处理其分配的所有 Channel 的生命周期中发生的事件。

EventLoopGroup 通常会创建两种类型的线程池:Boss GroupWorker Group。Boss Group 负责监听传入连接的请求,而 Worker Group 则负责处理已经建立的连接的读写操作

当一个新的连接到来时,Boss Group 会将连接请求注册到某个 EventLoop 的 Selector 上,并将其关联到对应的 Channel 对象。随后,Worker Group 将负责处理新连接上的所有 I/O 操作。

EventLoopGroup 还提供了一些方法,例如 shutdownGracefully(),可以优雅关闭 EventLoopGroup 的所有线程,并等待它们完成未完成的任务。这对于保证程序的安全关闭非常有用。

其中,EventLoopGroup 的继承关系如下:

public interface EventLoopGroup extends EventExecutorGroup {...}

public interface EventExecutorGroup extends 
    ScheduledExecutorService, Iterable<EventExecutor> {...}
  • 继承自 Netty 自己的 EventExecutorGroup
  • 实现了 Iterable 接口提供遍历 EventLoop 的能力;
  • 另有 next 方法获取集合中下一个 EventLoop;

执行任务

1、创建事件循环组,线程数 nThreads 可采用默认设置或者根据需求自定义:

EventLoopGroup group = new NioEventLoopGroup(2);

2、获取下一个事件循环对象:

group.next();

这里采用的是轮询的方式进行分配,主要是为了任务分工比较均匀:

System.out.println(group.next());  
System.out.println(group.next());  
System.out.println(group.next());  
System.out.println(group.next());

运行结果:

io.netty.channel.nio.NioEventLoop@4b14c583
io.netty.channel.nio.NioEventLoop@65466a6a
io.netty.channel.nio.NioEventLoop@4b14c583
io.netty.channel.nio.NioEventLoop@65466a6a

3、执行普通任务,使用 submit

group.next().submit(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    log.debug(Thread.currentThread().getName());
});

log.debug(Thread.currentThread().getName());

运行结果:

15:49:31 [DEBUG] [main] c.s.n.c.TestEventLoop - main
15:49:32 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - nioEventLoopGroup-2-1

4、执行定时任务,使用 scheduleAtFixedRate

group.next().scheduleAtFixedRate(() -> {
    log.debug("sidiot.");
}, 0, 1, TimeUnit.SECONDS);

log.debug(Thread.currentThread().getName());

运行结果:

16:26:44 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.
16:26:44 [DEBUG] [main] c.s.n.c.TestEventLoop - main
16:26:45 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.
16:26:46 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.
16:26:47 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.

5、关闭 EventLoopGroup

group.shutdownGracefully();

shutdownGracefully 方法会优雅地关闭 EventLoopGroup。该方法首先会切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行,从而确保整体应用是在正常有序的状态下退出的。


IO 任务

在上篇博文 从0到1(六):入门-Hello World 中有详细的解析,因此这里就不展开叙述,直接给出代码;

服务端

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(7999);
    }
}

客户端

public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress(7999))
                .sync()
                .channel();

        System.out.println(channel);
    }
}

运行结果:

17:57:12 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.EventLoopServer - C1: sidiot.
17:57:34 [DEBUG] [nioEventLoopGroup-2-3] c.s.n.c.EventLoopServer - C2: sidiot.
17:57:50 [DEBUG] [nioEventLoopGroup-2-3] c.s.n.c.EventLoopServer - C2: sid10t.
17:57:58 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.EventLoopServer - C1: sid10t.

从运行结果中不难发现,每个客户端都会绑定一个固定的线程进行处理,这是因为当一个客户端连接到服务器时,Netty 会创建一个新的 Channel,并将其注册到一个 EventLoop 上。

每个客户端所对应的 Channel 将由同一个 EventLoop 来处理,这样可以保证处理消息的顺序,并且避免了线程上下文切换的开销。

【Netty】「萌新入门」(二)剖析 EventLoop_java


细化分工

在上述 IO 任务中,我们只使用了一个 NioEventLoopGroup 对所有事件进行处理。为了提高效率,需要进一步的细化分工,我们将使用两个 EventLoopGroup 分别作为 BossWorkerBoss 负责处理 Accept 事件,而 Worker 负责 Read & Write 事件;

.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))

同时考虑到,如果 handler 的执行时间过长,则会占用 Worker 的 NIO 线程,即会影响 Worker 的读写效率,因此还需要再次进行细分,创建一个独立的 EventLoopGroup 用来处理比较耗时的 handler;

在不细分的情况下,运行结果如下:

10:30:19 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C1: sid10t.
10:30:20 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C3: sid10t.
10:30:24 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C1: sid10t.
10:30:24 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C2: sid10t.
10:30:25 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h2: C3: sid10t.
10:30:29 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C2: sid10t.
10:30:29 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C4: sid10t.
10:30:34 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C4: sid10t.

修改后的代码如下:

EventLoopGroup group = new DefaultEventLoopGroup();

.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ch.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf buf = (ByteBuf) msg;
                log.debug("h1: {}.", buf.toString(StandardCharsets.UTF_8));
                ctx.fireChannelRead(msg);
            }
        }).addLast(group, "h2", new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                ByteBuf buf = (ByteBuf) msg;
                log.debug("h2: {}.", buf.toString(StandardCharsets.UTF_8));
            }
        });
    }
})

运行结果:

10:35:31 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C1: sidiot..
10:35:32 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C2: sidiot..
10:35:32 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C3: sidiot..
10:35:33 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C4: sidiot..
10:35:36 [DEBUG] [defaultEventLoopGroup-2-2] c.s.n.c.EventLoopServer - h2: C1: sidiot..
10:35:37 [DEBUG] [defaultEventLoopGroup-2-3] c.s.n.c.EventLoopServer - h2: C2: sidiot..
10:35:37 [DEBUG] [defaultEventLoopGroup-2-4] c.s.n.c.EventLoopServer - h2: C3: sidiot..
10:35:38 [DEBUG] [defaultEventLoopGroup-2-1] c.s.n.c.EventLoopServer - h2: C4: sidiot..

通过前后的结果对比,可以看出,在不使用独立的 EventLoopGroup 进行细分的情况下,耗时的 handler 会一直占用 NIO 线程,从而使得其他的 channel 需要进行等待,导致效率低下;

【Netty】「萌新入门」(二)剖析 EventLoop_后端_02


源码浅析

在上述过程中,多个 handler 用的是不同的 EventLoop,那它们是如何进行切换的呢?

在 Netty 框架中,当有数据需要被读取时,会将读取操作封装成一个 ChannelRead 事件,并通过 ChannelHandlerContext 传递给后续的处理器来处理。

其中,有一个重要的方法 invokeChannelRead,用于调用下一个 ChannelHandler 的 channelRead 方法,即实现数据的传递和处理的过程。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

在上述代码中,抽象类 AbstractChannelHandlerContext 表示了一个 ChannelHandler 上下文对象,它记录了处理当前 ChannelHandler 的信息,以及与之前后的 ChannelHandler 的关系等。

接着,使用了 pipeline.touch() 方法来标记并检查消息对象 msg 是否为空。该方法可以避免在处理过程中重复处理同一个消息,提高处理效率。如果 msg 为空,则会抛出异常提示 “msg 不能为空”。

然后,通过 next.executor() 获取到 EventExecutor,即执行 ChannelHandler 的线程池。如果当前线程为 Netty 线程(即 IO 线程),则直接调用 next.invokeChannelRead(m) 执行下一个 ChannelHandler 的 channelRead 方法;否则,将该任务加入到线程池中异步执行。

最后,使用匿名内部类实现 Runnable 接口,定义 run 方法来执行下一个 ChannelHandler 的 channelRead 方法。

总之,invokeChannelRead 方法的主要作用就是封装了 ChannelHandlerContext 的读取操作,根据当前线程的执行情况选择是否异步执行。这种机制可以提高网络读取的效率和性能,避免了阻塞 IO 操作带来的性能问题,并使得处理过程更加灵活和可控。


后记

以上就是 剖析 EventLoop 的所有内容了,希望本篇博文对大家有所帮助!

参考:


标签:Netty,EventLoop,nioEventLoopGroup,EventLoopServer,萌新,DEBUG,线程,EventLoopGroup
From: https://blog.51cto.com/sidiot/6476868

相关文章

  • 【Netty】Netty部分源码分析(启动流程,EventLoop,accept流程,read流程)
    源码分析Netty源码中调用链特别长,且涉及到线程切换等步骤,令人头大:)1启动剖析我们就来看看netty中对下面的代码是怎样进行处理的//1netty中使用NioEventLoopGroup(简称nioboss线程)来封装线程和selectorSelectorselector=Selector.open();//2创建NioServerSo......
  • 【Netty】一个RPC实例
    Netty实现简易RPC调用总体流程:客户端发起rpc调用请求,封装好调用的接口名,函数名,返回类型,函数参数类型,函数参数值等属性,将消息发送给服务器。服务器的handler解析rpc请求,调用对应方法,并将方法结果写回客户端。客户端在主线程发送消息后,准备一个空Promise对象,用来接收结果。在......
  • 事件队列(EventLoop)【宏任务,微任务】
    一、概念event:事件loop:循环,循环的是一个又一个的任务队列任务队列:是一个先进先出的数据结构,排在前面的事件,优先被主线程读取任务队列分为:宏队列,微队列,分别存放宏任务和微任务二、宏任务【多个】、微任务【1个】微任务一般比宏任务先执行,并且微任务队列只有一个,宏任务队列可......
  • 跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
    本文由will分享,个人博客zhangyaoo.github.io,原题“基于Netty的IM系统设计与实现”,有修订和重新排版。1、引言本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯......
  • 【Netty】02-入门
    二.Netty入门1.概述1.1Netty是什么?Nettyisanasynchronousevent-drivennetworkapplicationframeworkforrapiddevelopmentofmaintainablehighperformanceprotocolservers&clients.Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、......
  • 【Netty】从0到1(二):NIO-阻塞模式与非阻塞模式
    前言本篇博文是《从0到1学习Netty》系列的第二篇博文,主要内容是通过NIO来理解阻塞模式与非阻塞模式,往期系列文章请访问博主的Netty专栏,博文中的所有代码全部收集在博主的GitHub仓库中;介绍阻塞模式在JavaNIO中,阻塞模式是一种传统的I/O处理方式,当我们试图从通道进行读取......
  • 记一次线上问题,Netty接收到的报文一次有数据一次没有数据
    最近线上遇到一个问题,客户端发送的tcp报文第一次连接成功后没有数据,第二次连接后正常带数据,第三次又没有数据...问题排查1:是否有负载均衡,其中有一台机器出现了异常,会出现一次成功一次失败的情况经过排查,本服务是没有负载均衡的,排除问题排查2:抓包分析 根据抓包数据,异常情况时......
  • 【Netty】从0到1(一):NIO-认识 ByteBuffer
    前言本篇博文是《从0到1学习Netty》系列的第一篇博文,主要内容是介绍NIO的核心之一Buffer中的ByteBuffer,往期系列文章请访问博主的Netty专栏,博文中的所有代码全部收集在博主的GitHub仓库中;什么是Netty?Netty是一个高性能、异步事件驱动的网络应用程序框架,主要用于快速开......
  • 【Netty底层数据交互源码】
    (文章目录)如何学习Netty的底层深入了解Netty的底层实现需要对JavaNIO、OSI模型、TCP/IP协议栈等底层网络知识有一定的了解。下面是一些建议,可以帮助你更深入地了解Netty的底层实现:学习JavaNIO:JavaNIO是Java中用于处理I/O操作的一套库。在深入了解Netty的底层实现时,你需要......
  • netty入门demo
    参考博客:(14条消息)【Netty整理01-快速入门】Netty简单使用Demo(已验证)_the_fool_的博客-CSDN博客ServerHandler.javapackagecom.hmb;importio.netty.buffer.ByteBuf;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandlerAdapt......