首页 > 编程语言 >Netty源码-10-ChannelFuture

Netty源码-10-ChannelFuture

时间:2022-11-16 22:34:42浏览次数:39  
标签:异步 return Netty ChannelFuture listeners 源码 线程 result 监听器

Netty为了提高系统的吞吐,大量使用异步线程模型

一 Demo

public class FutureTest00 {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        CountDownLatch latch = new CountDownLatch(3);
        EventLoopGroup group = new DefaultEventLoopGroup(3);

        Future<Long> f = group.submit(() -> {
            System.out.println("task...");
            Thread.sleep(100_000);
            return 100L;
        });

        new Thread(() -> {
            try {
                Long ans = f.get();
                System.out.println("get..." + Thread.currentThread().getName() + " " + ans);
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            } finally {
                latch.countDown();
            }
        }, "get").start();

        new Thread(() -> {
            try {
                Long ans = f.sync().getNow();
                System.out.println("sync..." + Thread.currentThread().getName() + " " + ans);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                latch.countDown();
            }
        }, "sync").start();

        new Thread(() -> {
            f.addListener(future -> {
                System.out.println("future..." + Thread.currentThread().getName() + " " + f.get());
                latch.countDown();
            });
        }, "listen").start();

        latch.await();
        group.shutdownGracefully();
    }
}

异步线程模型一定是依托于多线程实现的

提交任务的线程负责提交任务,有专门的线程去关注任务过程,对于结果而言就有两种方式获取

  • 提交任务的线程自己去取,但是不知道什么时候执行线程才执行结束,所以可以阻塞等待执行线程的结果
  • 任务提交线程不要干等,通过监听器的回调机制,执行线程负责执行过程,自然知道什么时候执行结束,所以主动权交给执行线程,等有结果了让执行线程按照监听器的定义处理结果

二 类图

三 任务提交流程

// AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

我们要关注ftask的实现类型是什么

// AbstractEventExecutor.java
@Override
    protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new PromiseTask<T>(this, callable); // 提交给EventLoop线程的任务被封装称PromiseTask实现
    }

四 sync阻塞等待

// Demo.java
Long ans = f.get();
// DefaultPromise.java
@Override
    public V get() throws InterruptedException, ExecutionException {
        Object result = this.result;
        if (!isDone0(result)) { // 异步任务执行完了就直接返回
            this.await(); // 异步任务还没执行完 取结果的线程需要阻塞等待异步结果的到来
            result = this.result; // 阻塞等待异步结果的线程被唤醒了 说明异步线程已经将执行结果放到了result阈上
        }
        if (result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        Throwable cause = cause0(result);
        if (cause == null) {
            return (V) result;
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        throw new ExecutionException(cause);
    }

@Override
    public Promise<V> await() throws InterruptedException {
        if (this.isDone()) { // 异步任务已经执行结束了 直接返回
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        checkDeadLock(); // 避免EventLoop线程自锁

        synchronized (this) { // 多线程获取同一个异步任务的执行结果
            while (!isDone()) {
                incWaiters(); // 阻塞获取异步任务的线程计数
                try {
                    wait(); // 阻塞住线程等待被notify唤醒 获取异步结果的线程释放了管程锁 进入了当前promise的阻塞列表
                } finally {
                    decWaiters(); // 阻塞在promise上的线程会唤醒 说明异步结果已经被异步线程放回了promise 更新阻塞获取异步结果的线程数量
                }
            }
        }
        return this;
    }

现在获取任务的线程已经阻塞了,只能等待异步线程执行完任务之后,通过notify或者notifyAll唤醒这个阻塞线程了

// PromiseTask.java
@Override
    public void run() {
        try {
            if (setUncancellableInternal()) { // 设置任务不可取消
                V result = runTask(); // 任务执行结果
                /**
                 * 将异步结果设置到DefaultPromise的result阈上
                 * 后置动作
                 *     - 唤醒所有阻塞在等待异步结果上的线程
                 *     - 执行监听器的回调
                 */
                setSuccessInternal(result);
            }
        } catch (Throwable e) {
            setFailureInternal(e);
        }
    }

protected final Promise<V> setSuccessInternal(V result) {
        /**
         * 将异步结果设置到DefaultPromise的result阈上
         * 后置动作
         *     - 唤醒所有阻塞在等待异步结果上的线程
         *     - 执行监听器的回调
         */
        super.setSuccess(result);
        clearTaskAfterCompletion(true, COMPLETED);
        return this;
    }
// DefaultPromise.java
/**
     * 将异步结果设置到DefaultPromise的result阈上
     * 后置动作
     *     - 唤醒所有阻塞在等待异步结果上的线程
     *     - 执行监听器的回调
     */
    @Override
    public Promise<V> setSuccess(V result) {
        if (this.setSuccess0(result)) {
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

/**
     * 将异步结果设置到DefaultPromise的result阈上
     * 后置动作
     *     - 唤醒所有阻塞在等待异步结果上的线程
     *     - 执行监听器的回调
     */
    private boolean setSuccess0(V result) {
        return this.setValue0(result == null ? SUCCESS : result); // CAS方式将异步任务结果设置到result阈上
    }

/**
     * 将异步结果设置到DefaultPromise的result阈上
     * 后置动作
     *     - 唤醒所有阻塞在等待异步结果上的线程
     *     - 执行监听器的回调
     */
    private boolean setValue0(Object objResult) { // 设置好值然后执行监听者的回调方法
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { // CAS将异步结果设置到result阈上
            if (checkNotifyWaiters()) { // 唤醒所有阻塞等待异步结果的线程
                this.notifyListeners(); // 如果还有监听器 执行监听器的回调
            }
            return true;
        }
        return false;
    }

唤醒阻塞的线程

// DefaultPromise.java
private synchronized boolean checkNotifyWaiters() {
        if (waiters > 0) { // 阻塞等待异步结果的线程数量
            notifyAll(); // 唤醒所有阻塞等待异步结果的线程
        }
        return listeners != null;
    }

五 监听器回调

// DefaultPromise.java
private void notifyListeners() {
        EventExecutor executor = executor();
        // 线程切换 确保回调监听器的线程就是执行异步任务的线程
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }

private void notifyListenersNow() {
        Object listeners;
        synchronized (this) {
            // Only proceed if there are listeners to notify and we are not already notifying listeners.
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            listeners = this.listeners;
            this.listeners = null;
        }
        for (;;) { // 回调监听器
            if (listeners instanceof DefaultFutureListeners) {
                notifyListeners0((DefaultFutureListeners) listeners);
            } else {
                notifyListener0(this, (GenericFutureListener<?>) listeners);
            }
            synchronized (this) {
                if (this.listeners == null) {
                    // Nothing can throw from within this method, so setting notifyingListeners back to false does not
                    // need to be in a finally block.
                    notifyingListeners = false;
                    return;
                }
                listeners = this.listeners;
                this.listeners = null;
            }
        }
    }

private void notifyListeners0(DefaultFutureListeners listeners) {
        GenericFutureListener<?>[] a = listeners.listeners();
        int size = listeners.size();
        for (int i = 0; i < size; i ++) {
            notifyListener0(this, a[i]);
        }
    }

private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future); // 回调执行监听器的operationComplete方法 这个方法是放置监听器的线程自定义的
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
            }
        }
    }

标签:异步,return,Netty,ChannelFuture,listeners,源码,线程,result,监听器
From: https://www.cnblogs.com/miss-u/p/16897742.html

相关文章

  • Netty源码-01-NioEventLoopGroup
    一定义摘自源码JavaDoc/***The{@linkEventExecutorGroup}isresponsibleforprovidingthe{@linkEventExecutor}'stouse*viaits{@link#next()}method......
  • Netty源码-02-FastThreadLocalThread
    一DemopublicclassFastThreadLocalTest00{privatefinalstaticFastThreadLocal<Long>v=newFastThreadLocal<Long>(){@Overrideprotec......
  • apache启动遇到phpinfo只显示源码问题
    在安装php和apache的时候,会遇到只显示源码的问题网上找了好多帖子都是在改php.ini的东西,但是改了半天也不对,发现我安装的wordpress目录也打不开,所以我认为这就是apache服......
  • python源码通过词语标记化器tokenize提取注释并正则匹配测试用例作者名
    提取代码如下importtokenizeimportrewithtokenize.open('readcomment.py')asf:list=[]fortoktype,tok,start,end,lineintokenize.generate_t......
  • SpringBoot源码解析
    1.创建SpringApplication对象1.1webApplicationType设置(SERVLET)1.2setInitializers设置通过getSpringFactoriesInstances方法从META-INF/spring.factories下获取k......
  • (笔者推荐)【Java权威指南】「官方文档-中英互译」AQS的源码注释分析,贯穿总体核心流程
    前提说明本文主要针对于Java官方文档中的先关的官方注释进行中英文互译,保证了源码坐着的设计思路以及相关知识技能介绍分析等,本文主要进行介绍AQS的源码官方注释的含义介绍,......
  • vue源码分析-插槽原理
    Vue组件的另一个重要概念是插槽,它允许你以一种不同于严格的父子关系的方式组合组件。插槽为你提供了一个将内容放置到新位置或使组件更通用的出口。这一节将围绕官网对插......
  • Spring源码整体脉络
    BeanFactory:Spring顶层核心接口,使用了简单工厂模式,负责生产Bean。BeanDefinition:Spring顶层核心接口,封装了生产Bean的一切原料。从读取配置到扫描到注册bean,主要用到......
  • @PreDestroy与@PostConstruct关键源码实现
    CommonAnnotationBeanPostProcessor(继承InitDestroyAnnotationBeanPostProcessor)在InitDestroyAnnotationBeanPostProcessor中,buildLifecycleMetadata方法会将目标类中......
  • MySQL 源码解读之-语法解析(三)
    MySQL源码解读之-语法解析(三)在前两篇文章中已经讲述了bison如何解析sql语句并生成AST树。那么MySQL是如何和bison的程序关联起来的呢,并通过gdb调试一下。在MyS......