首页 > 编程语言 >【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程

【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程

时间:2023-07-11 22:33:00浏览次数:49  
标签:Netty ChannelFuture sync 源码 线程 超时 连接

前言

本篇博文是《从0到1学习 Netty》中源码系列的第三篇博文,主要内容是深入分析连接超时的实现原理,包括了 connect 方法的源码解析和 ChannelFuture.sync() 执行过程的解析。,往期系列文章请访问博主的 Netty 专栏,博文中的所有代码全部收集在博主的 GitHub 仓库中;

介绍

在实际应用中,当客户端尝试连接服务器时,可能会面临多种原因导致连接失败的情况。为了避免无限等待,我们可以在客户端代码中设置一个超时连接时间 CONNECT_TIMEOUT_MILLIS,该时间表示客户端尝试连接服务器的最长时间限制,如果在指定的超时时间内未能成功建立连接,客户端应该主动抛出连接超时的异常。

.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)

上述代码的作用是设置连接超时时间为 1000 毫秒,这个选项用于指定连接建立的最大时间,如果超过该时间仍未建立连接,则会放弃连接尝试。

运行结果:

【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程_java

然而,当服务器没有启动时,且连接超时时间大于 2 秒钟时,则会抛出连接被拒绝的异常,运行结果如下所示:

【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程_后端_02

这是 Java 底层的网络异常。

需要完整代码的读者请访问博主的 Github:TestConnectionTimeout

接下来让我们探索 connect()ChannelFuture.sync() 的执行过程。

connect 源码解析

我们先来探究成功执行连接超时所进行的过程,核心方法 connect() 的部分源码如下所示:

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    try {
        ...
        // Schedule connect timeout.
        int connectTimeoutMillis = config().getConnectTimeoutMillis();
        if (connectTimeoutMillis > 0) {
            connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                    ConnectTimeoutException cause =
                            new ConnectTimeoutException("connection timed out: " + remoteAddress);
                    if (connectPromise != null && connectPromise.tryFailure(cause)) {
                        close(voidPromise());
                    }
                }
            }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        ...
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

上述代码的主要内容是根据配置获取连接超时时间,并使用事件循环调度一个定时任务,在指定的时间内检查连接是否超时。如果连接超时,会触发一个 ConnectTimeoutException 异常,并尝试向 connectPromise 发送连接超时的失败信息;否则,连接超时任务被取消,通道关闭。

那主线程是如何知道消息的呢?其实是通过 connectPromise 进行传递消息,我们可以在主线程中标记一下 future,如下图所示:

【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程_连接超时_03

然后切换至 NIO 线程,可以发现 connectPromise 也被标记了,说明他们共属于一个主体,如下图所示:

【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程_java_04

如果不是很了解 FuturePromise 之间的联系的话,可以阅读博主的另一篇文章:异步编程模型:利用 Future 和 Promise 提高性能与响应能力

在上述事例中,我们设置了两秒钟的连接超时时间,由于两秒钟内客户端并没有与服务器建立连接,因此触发了定时任务,执行了 run() 方法,抛出了连接超时异常 ConnectTimeoutException

ChannelFuture.sync() 执行过程解析

下面是 ChannelFuture.sync() 方法的执行过程:

  1. 调用 ChannelFuture.sync() 方法将当前线程阻塞,直到对应的操作完成或发生异常。
  2. sync() 方法内部,会获取当前线程绑定的 EventLoop 对象,然后将当前任务包装成一个特殊的 Promise 对象。
  3. Promise 对象会被注册到 EventLoop 中的任务队列中,等待执行。EventLoop 会按顺序从任务队列中取出任务并执行。
  4. 一旦 Promise 执行完成,即异步操作完成或发生异常,sync() 方法会解除当前线程的阻塞状态,并返回操作的结果或抛出异常。
  5. 操作成功完成,可以通过 ChannelFuture.isSuccess() 方法检查操作是否成功。如果成功,可以继续执行后续的操作;如果失败,可以通过 ChannelFuture.cause() 方法获取失败的原因。

【Netty】「源码解析」(三)设置连接超时:深入分析 ChannelFuture.sync() 的执行过程_后端_05

需要注意的是,由于 ChannelFuture.sync() 是一个同步阻塞方法,如果在事件循环线程中调用该方法,可能会导致死锁或性能问题。因此,通常建议在其他线程中使用 ChannelFuture.addListener() 方法注册监听器来处理异步操作的结果,而不是直接使用 sync() 方法。

sync 源码解析

首先使用 super.sync() 调用了父类的 sync() 方法,将当前对象作为结果返回。

@Override  
public ChannelPromise sync() throws InterruptedException {  
    super.sync();  
    return this;  
}

上述代码的目的是在执行特定的同步操作后,返回当前的 ChannelPromise 对象。在这种情况下,子类通过调用父类的 sync() 方法来实现同步操作,并在执行完成后返回当前对象,以便支持链式调用或其他需要获取该对象的操作。

然后在父类的 sync() 方法中,调用 await()rethrowIfFailed() 来实现同步等待和异常检查,并返回当前对象。

@Override  
public Promise<V> sync() throws InterruptedException {  
    await();  
    rethrowIfFailed();  
    return this;  
}

在之后的几个方法中,就不对子类做过多的介绍了。

await 源码解析

await 方法是一种等待机制的实现,它通过检查承诺是否已完成,处理中断异常以及使用同步块和等待机制来让线程等待承诺的完成。

@Override  
public Promise<V> await() throws InterruptedException {  
    if (isDone()) {  
        return this;  
    }  
    // 处理线程中断
    if (Thread.interrupted()) {  
        throw new InterruptedException(toString());  
    }  
    // 检查死锁
    checkDeadLock();  

    synchronized (this) {  
        while (!isDone()) {  
            incWaiters();  
            try {  
                wait();  
            } finally {  
                decWaiters();  
            }  
        }  
    }  
    return this;  
}

在上述代码中,如果 isDone() 方法返回 true,说明该承诺已经完成,直接返回当前对象。

Thread.interrupted() 用于检查当前线程是否被中断,如果是,则抛出 InterruptedException 异常,并将当前对象的字符串表示作为异常消息。

checkDeadLock() 方法用于检查是否存在死锁情况。

对于 synchronized (this) {...} 代码块,使用当前对象作为同步锁,确保在多线程环境下只有一个线程可以进入代码块。其中,该代码块核心为当承诺未完成时,一直执行循环。

在循环内部,调用 incWaiters() 方法增加等待中的线程计数器。同时,调用 wait() 方法,使当前线程进入等待状态,直到其他线程调用该对象的 notify()notifyAll() 方法唤醒。但无论如何,最终都会执行 decWaiters() 方法来减少等待中的线程计数器。

接下来,我们看看 isDone() 方法的具体实现。

isDone 源码解析

private static boolean isDone0(Object result) {  
    return result != null && result != UNCANCELLABLE;  
}

上述代码主要作用是判断给定的 result 是否满足完成的条件。

后记

我们深入分析了 ChannelFuture.sync() 方法的执行过程,通过对 connect 源码的解析,我们了解到它在超时连接设置中的作用。而在 ChannelFuture.sync() 的执行过程中,我们进一步解析了 syncawaitisDone 的源码。

这些源码解析的过程帮助我们更好地理解了 ChannelFuture.sync() 方法的执行流程,并且使我们能够更好地降低意外情况的发生率,并提高系统的稳定性和可靠性。

以上就是 设置连接超时:深入分析 ChannelFuture.sync() 的执行过程 的所有内容了,希望本篇博文对大家有所帮助!

参考:


标签:Netty,ChannelFuture,sync,源码,线程,超时,连接
From: https://blog.51cto.com/sidiot/6681779

相关文章

  • 使用LabVIEW实现 DeepLabv3+ 语义分割含源码
    (文章目录)前言图像分割可以分为两类:语义分割(SemanticSegmentation)和实例分割(InstanceSegmentation),前面已经给大家介绍过两者的区别,并就如何在labview上实现相关模型的部署也给大家做了讲解,今天和大家分享如何使用labview实现deeplabv3+的语义分割,并就PascalVOC2012(DeepL......
  • 老杜 JavaWeb 讲解(九) ——模板方法设计模式、HttpServlet源码分析
    (十一)模板方法设计模式、HttpServlet源码分析对应视频:20-HttpServlet源码分析及web欢迎页11.1模板方法设计模式不用使用在上面右侧表格中,Person就是模板方法设计模式当中的模板类,通常是抽象类。day()方法就是模板方法设计模式当中的模板方法。模......
  • 【.NET源码解读】深入剖析中间件的设计与实现
    合集-.NET源码解读系列(4) 1..NET通过源码深究依赖注入原理05-172.【.NET源码解读】Configuration组件及自动更新05-303..NET源码解读kestrel服务器及创建HttpContext对象流程06-164.【.NET源码解读】深入剖析中间件的设计与实现06-29收起 .NET本身就是一个基于......
  • day-3 路由底层源码
    1.定义路由本质比如在url.py定义以下路由,浏览器中输入http://192.168.0.1:8000/user/2003-04-21可以访问意味着此urlhttp://192.168.0.1:8000/user/2003-04-21和url.py里的路由们做了路由匹配如果匹配成功找到相应的试图函数  源码解析ctrl+鼠标左键点进re_path,会发......
  • 使用LabVIEW实现 DeepLabv3+ 语义分割含源码
    前言图像分割可以分为两类:语义分割(SemanticSegmentation)和实例分割(InstanceSegmentation),前面已经给大家介绍过两者的区别,并就如何在labview上实现相关模型的部署也给大家做了讲解,今天和大家分享如何使用labview实现deeplabv3+的语义分割,并就PascalVOC2012(DeepLabv3Plus-Mo......
  • 视频直播源码,调整颜色,附颜色大全
    视频直播源码,调整颜色,附颜色大全使用示范:importmatplotlib.pyplotaspltplt.imshow(data.images[0],   #负责对图像进行处理 imge类型:<class'numpy.ndarray'>      cmap=plt.cm.gray_r,    #cmap参数:为调整显示颜色 gray为黑白色,加_r取反为......
  • 禁忌搜索算法解决配电网无功优化问题对应的MATLAB源码,有对应的参考资料。
    禁忌搜索算法解决配电网无功优化问题对应的MATLAB源码,有对应的参考资料。电力系统配电网的无功优化规划是保证配电网安全、经济运行的一项有效手段,是降低网损、提高电压质量的重要措施。因此,电力系统配电网无功优化规划问题的研究,既具有理论意义,又具有工程实际应用价值。配电系统......
  • readability-lxml 源码解析(三):`readability.py`
    #!/usr/bin/envpythonfrom__future__importprint_functionimportloggingimportreimportsysfromlxml.etreeimporttounicodefromlxml.etreeimport_ElementTreefromlxml.htmlimportdocument_fromstringfromlxml.htmlimportfragment_fromstringfrom......
  • readability-lxml 源码解析(一)
    browser.pydefopen_in_browser(html):"""OpentheHTMLdocumentinawebbrowser,savingittoatemporaryfiletoopenit.Notethatthisdoesnotdeletethefileafteruse.Thisismainlymeantfordebugging."......
  • GGTalk 开源即时通讯系统源码剖析之:虚拟数据库
    继上篇《GGTalk开源即时通讯系统源码剖析之:服务端全局缓存》详细介绍了GGTalk对需要频繁查询数据库的数据做了服务端全局缓存处理,以降低数据库的读取压力以及加快客户端请求的响应,接下来我们将进入GGTalk服务端的虚拟数据库。GGTalkV8.0除了支持真实的数据库外,还内置了虚......