首页 > 其他分享 >【Netty】「萌新入门」(三)ChannelFuture 与 CloseFuture

【Netty】「萌新入门」(三)ChannelFuture 与 CloseFuture

时间:2023-07-04 15:31:51浏览次数:62  
标签:Netty 关闭 ChannelFuture sync CloseFuture 线程 萌新 操作 channel

前言

本篇博文是《从0到1学习 Netty》中入门系列的第三篇博文,主要内容是介绍 Netty 中 ChannelFuture 与 CloseFuture 的使用,解决连接问题与关闭问题,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;


连接问题与 ChannelFuture

在 Netty 中,所有的 I/O 操作都是异步的,因此当你发起一个 I/O 操作时,它会立即返回一个 ChannelFuture 对象,该对象代表了尚未完成的操作。ChannelFuture 提供了一种在操作完成时通知应用程序的机制,以便应用程序可以执行某些操作或检索操作的结果。

例如,在写入数据到 Channel 时,调用 write() 方法将立即返回一个 ChannelFuture 对象,而不是等待数据实际被写入。通过添加侦听器(Listener)到 ChannelFuture,当写操作完成时,侦听器将被通知,从而使应用程序能够对写入数据的结果做出响应。

sync

sync() 方法是 ChannelFuture 接口中的一个同步方法,它将阻塞当前线程,直到这个 ChannelFuture 执行完毕。调用 sync() 方法后会等待对应的 I/O 操作完成,如果操作失败则会抛出异常。

复用上篇博文 从0到1(七):入门-EventLoop 中的服务端代码,略微调整一下客户端代码如下:

@Slf4j
public class ChannelFutureClient {
    public static void main(String[] args) throws InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress(7999));

        channelFuture.sync();
        Channel channel = channelFuture.channel();
        log.debug(channel.toString());
        channel.writeAndFlush("sidiot.");
    }
}

服务端运行结果:

20:24:04 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: sidiot..
20:24:09 [DEBUG] [defaultEventLoopGroup-2-1] c.s.n.c.EventLoopServer - h2: sidiot..

但如果将 channelFuture.sync(); 注释掉后,会发现客户端运行之后,服务端并没有像之前一样接收到消息。

客户端运行结果:

# 存在 sync()
20:24:04 [DEBUG] [main] c.s.n.c.ChannelFutureClient - [id: 0x473d8e1a, L:/169.254.80.84:57837 - R:IDIOT/169.254.80.84:7999]

# 注释 sync()
20:24:14 [DEBUG] [main] c.s.n.c.ChannelFutureClient - [id: 0x871ab919]

这是因为 ChannelFuture 是用于异步操作结果通知的类。调用 sync() 将会阻塞当前线程,等待异步操作完成并获取其结果。如果注释掉了 sync() 方法,则程序不会等到连接建立成功后再向服务端发送消息,而是直接执行 writeAndFlush() 方法,此时连接还没有建立成功,所以服务端收不到客户端发的消息。

使用 sync() 方法可以保证在后续代码执行之前,完成当前的操作,这样可以避免一些并发问题。但是需要注意的是,由于 sync() 方法会阻塞当前线程,因此应该尽可能地避免在 I/O 线程中调用 sync() 方法,以免影响整个系统的性能表现。


addListener

除了 sync() 方法之外,我们还可以使用 addListener() 方法来处理结果。

在 Netty 中,addListener() 方法是异步方法,其作用是向 ChannelFuture 添加一个或多个 GenericFutureListener 监听器,用于监听异步操作(例如网络 I/O 操作)执行完成时的事件。当异步操作完成后,这些监听器会被通知,并且可以获取到操作的结果。

channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Channel channel = future.channel();
        log.debug(channel.toString());
        channel.writeAndFlush("sidiot.");
    }
});

运行结果:

# 服务端
21:21:03 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: sidiot..
21:21:08 [DEBUG] [defaultEventLoopGroup-2-6] c.s.n.c.EventLoopServer - h2: sidiot..

# 客户端
21:21:03 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.ChannelFutureClient - [id: 0xc4465d09, L:/169.254.80.84:58393 - R:IDIOT/169.254.80.84:7999]

对比使用 sync()addListener() 两个方法的客户端结果可以发现,使用 sync() 的客户端的处理线程是当前线程,即 main 线程,而 addListener() 因为是异步方法的关系,其客户端的处理线程就不是当前线程,而是 NIO 线程 nioEventLoopGroup-2-1


小结

sync()addListener() 都是用于在不同组件之间进行通信的方法,但它们的实现方式略有不同。

sync() 是一种通过将属性绑定到一个共享状态来实现组件之间通信的方法。当某个组件更改该绑定的属性时,其他所有使用该属性的组件都会自动更新。这种方法的优点是简单直接,能够快速实现组件之间的数据同步,但缺点是对于大型应用程序,使用全局状态管理可能会变得复杂和混乱。

相比之下,addListener() 则是一种更加灵活的方法,它允许组件之间精确地控制何时以及如何进行通信。addListener() 可以被用于创建事件监听器,使得一个组件可以注册到另一个组件中发生的事件的通知。当事件发生时,触发监听器并向其传递相应的数据。这种方法的优点是,更容易实现针对特定事件的精细控制,并且可以减少对全局状态的依赖。

因此,总的来说,addListener() 更灵活,并且可以更好地适应复杂的应用程序需求,而 sync() 则更适合简单的应用场景。

关闭问题与 CloseFuture

在前面的博文中,博主都是以 DEBUG 的形式来操作客户端的,但这时的客户端都不是被正常关闭的,因此,接下来修改一下代码,使得客户端能够不断向服务端发送消息,并在某一时刻能够被关闭:

Channel channel = channelFuture.sync().channel();
log.debug(channel.toString());

new Thread(() -> {
    Scanner scanner = new Scanner(System.in);
    while (true) {
        String line = scanner.nextLine();
        if ("quit".equals(line)) {
            channel.close();
            break;
        }
        channel.writeAndFlush(line);
    }
}, "input").start();

log.debug("处理 channel 关闭之后的操作");

运行结果:

【Netty】「萌新入门」(三)ChannelFuture 与 CloseFuture_服务端

可以发现 “处理 channel 关闭之后的操作” 并没有等 channel 关闭之后再进行,这是因为在 input 线程运行过程中并没有阻塞主线程,因此,主线程就会继续向下运行,造成了上面的情况;

那如果将 “处理 channel 关闭之后的操作” 移动到 channel.close(); 后面是不是就可以了呢?

if ("quit".equals(line)) {  
    channel.close();  
    log.debug("处理 channel 关闭之后的操作");  
    break;  
}

接下来我们进行验证,在 pipeline 中新增一个 handler:

ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

同时需要在配置文件 logback.xml 中增加下述代码:

<logger name="io.netty.handler.logging.LoggingHandler" level="DEBUG" additivity="false">  
    <appender-ref ref="STDOUT" />  
</logger>

运行结果:

【Netty】「萌新入门」(三)ChannelFuture 与 CloseFuture_服务端_02

根据运行结果可以发现,将 “处理 channel 关闭之后的操作” 移动到 channel.close(); 后面的方法也是行不通的,因为这两个操作不属于同一个线程;

“处理 channel 关闭之后的操作” 是在 input 线程中执行的,而 channel.close(); 则是在 NIO 线程 nioEventLoopGroup-2-1 中所执行的,因此两个线程谁先谁后是不一定的,这是由 CPU 调度器决定的;


这里,我们可以使用 closeFuture() 来解决问题,closeFuture() 方法可以让我们监听 Channel 关闭事件,从而在 Channel 关闭后执行一些特定的逻辑。例如,在处理连接断开的情况下,我们可以等待 closeFuture() 的完成,并在其完成后释放资源或清理状态。

closeFuture()ChannelFuture() 相似,同样是有同步方法 sync 和异步方法 addaddListener 两种方式;

sync

ChannelFuture closeFuture = channel.closeFuture();
System.out.println("Waiting Close...");
closeFuture.sync();
log.debug("处理 channel 关闭之后的操作");

运行结果:

【Netty】「萌新入门」(三)ChannelFuture 与 CloseFuture_客户端_03

addaddListener

ChannelFuture closeFuture = channel.closeFuture();
System.out.println("Waiting Close...");
closeFuture.addListener((ChannelFutureListener) future -> {  
    log.debug("处理 channel 关闭之后的操作");  
    group.shutdownGracefully();  
});

运行结果:

【Netty】「萌新入门」(三)ChannelFuture 与 CloseFuture_服务端_04


后记

ChannelFuture 表示一个操作的异步结果,它提供了一种可以等待操作完成的机制,并且可以注册监听器来处理操作完成后的回调;而 CloseFuture 则表示一个通道关闭的异步结果,它允许我们等待通道关闭操作的完成,并在关闭完成后执行相应的逻辑。

总之,ChannelFuture 和 CloseFuture 提供了强大的功能来处理连接问题和关闭问题,使得网络编程变得更加高效、可靠和易于管理。通过深入理解和灵活运用这些概念,我们可以更好地构建稳定和可靠的网络应用程序。

以上就是 ChannelFuture 与 CloseFuture 的所有内容了,希望本篇博文对大家有所帮助!

参考:


标签:Netty,关闭,ChannelFuture,sync,CloseFuture,线程,萌新,操作,channel
From: https://blog.51cto.com/sidiot/6615232

相关文章

  • Netty-LengthFieldBasedFrameDecoder-解决拆包粘包问题的解码器
    LengthFieldBasedFrameDecoder的构造器参数中包括:maxFrameLength:指定解码器所能处理的数据包的最大长度,超过该长度则抛出TooLongFrameException异常。lengthFieldOffset:指定长度字段的起始位置。lengthFieldLength:指定长度字段的长度。lengthAdjustment:指定长度字段所表示......
  • 和 @血源萌新☜ 的 论战帖
    是论战帖, 不是引战帖 。  其实我挺喜欢引战 的 。 本帖的起因是   《出个题:证明欧氏几何下两点之间线段最短》     https://tieba.baidu.com/p/8485185542   。 还有  《检验反相能力,一题不会者不配反相!》     https://ti......
  • Netty-TCP 01.编解码
    本文是使用Netty开发一个简单的TCP通讯(聊天)应用程序的第【1】部分,主要介绍编解码实现。定制协议一般来说,开发TCP通讯应用程序,定制通讯协议是不可避免的,这里以一种最简单的协议为例,假设一个TCP通讯数据包,包含三部分:[type][size][content]type:数据包类型(长度为一个字节,即1个by......
  • Netty-TCP 02.客户端
    本文是使用Netty开发一个简单的TCP通讯(聊天)应用程序的第【2】部分,主要介绍客户端的实现。模块划分TCP简单TCP通讯(聊天)应用程序客户端主要分为三个部分:心跳保活处理消息消费处理TCP连接实现心跳保活心跳保活是目的是告诉服务端客户端是在线的,当客户端空闲时,定时给服务端发......
  • Netty-TCP 03.服务端
    本文是使用Netty开发一个简单的TCP通讯(聊天)应用程序的第【3】部分,主要介绍服务端的实现。模块划分跟客户端类似,服务端也是主要分为三个部分:心跳检测处理消息消费处理TCP服务实现心跳检测服务端需要定时检测客户端是否在线(即是否发送心跳),如果没有,那么将客户端连接断开,同样......
  • Netty-TCP 04.发消息
    本文是使用Netty开发一个简单的TCP通讯(聊天)应用程序的第【4】部分,主要测试客户端和服务端的通讯。服务端下面是服务端测试代码:/***@authormichong*/publicclassTCPServer{publicstaticvoidmain(String[]args){TCPServerBootstrapbootstrap=ne......
  • Netty——5、源码分析
    1、启动剖析我们来看看netty中对下面的代码是怎样处理的。publicclassTest{publicstaticvoidmain(String[]args)throwsIOException{//1netty中使用NioEventLoop(简称nioboss线程)来封装线程和selectorSelectorselector=Selector.o......
  • @血源萌新☜ 怎么推导 黎曼几何 球面短程线 ?
    怎么推导黎曼几何球面短程线,  我问过   @血源萌新☜  两次, 一次是在反相吧, 一次是在高级民科吧,  见  反相吧 《【水】老杨终于露出了维相真面目》     https://tieba.baidu.com/p/8297248311   15楼, 高级民科吧   《4维度正方......
  • Netty
    Netty是一个高性能、异步事件驱动的NIO框架,基于JavaNIO提供的API实现。它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获取IO操作结果。Netty高性能在IO编程过程中,当需......
  • Netty——4、优化
    1、扩展序列化算法序列化、反序列化主要用在消息正文的转换上:序列化时,需要将Java对象变为要传输的数据(可以是byte[],或json等,最终都需要变成byte[]);反序列化时,需要将传入的正文数据还原成Java对象,便于处理。目前的代码仅支持Java自带的序列化,反序列化机制,核心代码如......