首页 > 编程语言 >并发编程-CompletableFuture解析 | 京东物流技术团队

并发编程-CompletableFuture解析 | 京东物流技术团队

时间:2023-07-30 10:01:47浏览次数:37  
标签:执行 return 编程 任务 CompletableFuture 京东 null public

1、CompletableFuture介绍

CompletableFuture对象是JDK1.8版本新引入的类,这个类实现了两个接口,一个是Future接口,一个是CompletionStage接口。

CompletionStage接口是JDK1.8版本提供的接口,用于异步执行中的阶段处理,CompletionStage定义了一组接口用于在一个阶段执行结束之后,要么继续执行下一个阶段,要么对结果进行转换产生新的结果等,一般来说要执行下一个阶段都需要上一个阶段正常完成,这个类也提供了对异常结果的处理接口

2、CompletableFuture的API

2.1 提交任务

在CompletableFuture中提交任务有以下几种方式:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

这四个方法都是用来提交任务的,不同的是supplyAsync提交的任务有返回值,runAsync提交的任务没有返回值。两个接口都有一个重载的方法,第二个入参为指定的线程池,如果不指定,则默认使用ForkJoinPool.commonPool()线程池。在使用的过程中尽量根据不同的业务来指定不同的线程池,方便对不同线程池进行监控,同时避免业务共用线程池相互影响。

2.2 结果转换

2.2.1 thenApply
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

thenApply这一组函数入参是Function,意思是将上一个CompletableFuture执行结果作为入参,再次进行转换或者计算,重新返回一个新的值。

2.2.2 handle
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

handle这一组函数入参是BiFunction,该函数式接口有两个入参一个返回值,意思是处理上一个CompletableFuture的处理结果,同时如果有异常,需要手动处理异常。

2.2.3 thenRun
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

thenRun这一组函数入参是Runnable函数式接口,该接口无需入参和出参,这一组函数是在上一个CompletableFuture任务执行完成后,在执行另外一个接口,不需要上一个任务的结果,也不需要返回值,只需要在上一个任务执行完成后执行即可。

2.2.4 thenAccept
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

thenAccept这一组函数的入参是Consumer,该函数式接口有一个入参,没有返回值,所以这一组接口的意思是处理上一个CompletableFuture的处理结果,但是不返回结果。

2.2.5 thenAcceptBoth
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

thenAcceptBoth这一组函数入参包括CompletionStage以及BiConsumer,CompletionStage是JDK1.8新增的接口,在JDK中只有一个实现类:CompletableFuture,所以第一个入参就是CompletableFuture,这一组函数是用来接受两个CompletableFuture的返回值,并将其组合到一起。BiConsumer这个函数式接口有两个入参,并且没有返回值,BiConsumer的第一个入参就是调用方CompletableFuture的执行结果,第二个入参就是thenAcceptBoth接口入参的CompletableFuture的执行结果。所以这一组函数意思是将两个CompletableFuture执行结果合并到一起。

2.2.6 thenCombine
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

thenCombine这一组函数和thenAcceptBoth类似,入参都包含一个CompletionStage,也就是CompletableFuture对象,意思也是组合两个CompletableFuture的执行结果,不同的是thenCombine的第二个入参为BiFunction,该函数式接口有两个入参,同时有一个返回值。所以与thenAcceptBoth不同的是,thenCombine将两个任务结果合并后会返回一个全新的值作为出参。

2.2.7 thenCompose
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

thenCompose这一组函数意思是将调用方的执行结果作为Function函数的入参,同时返回一个新的CompletableFuture对象。

2.3 回调方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

whenComplete方法意思是当上一个CompletableFuture对象任务执行完成后执行该方法。BiConsumer函数式接口有两个入参没有返回值,这两个入参第一个是CompletableFuture任务的执行结果,第二个是异常信息。表示处理上一个任务的结果,如果有异常,则需要手动处理异常,与handle方法的区别在于,handle方法的BiFunction是有返回值的,而BiConsumer是没有返回值的。

以上方法都有一个带有Async的方法,带有Async的方法表示是异步执行的,会将该任务放到线程池中执行,同时该方法会有一个重载的方法,最后一个参数为Executor,表示异步执行可以指定线程池执行。为了方便进行控制,最好在使用CompletableFuture时手动指定我们的线程池。

2.4 异常处理

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

exceptionally是用来处理异常的,当任务抛出异常后,可以通过exceptionally来进行处理,也可以选择使用handle来进行处理,不过两者有些不同,hand是用来处理上一个任务的结果,如果有异常情况,就处理异常。而exceptionally可以放在CompletableFuture处理的最后,作为兜底逻辑来处理未知异常。

2.5 获取结果

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf是需要入参中所有的CompletableFuture任务执行完成,才会进行下一步;

anyOf是入参中任何一个CompletableFuture任务执行完成都可以执行下一步。

public T get() throws InterruptedException, ExecutionException
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
public T getNow(T valueIfAbsent)
public T join()

get方法一个是不带超时时间的,一个是带有超时时间的。

getNow方法则是立即返回结果,如果还没有结果,则返回默认值,也就是该方法的入参。

join方法是不带超时时间的等待任务完成。

3、CompletableFuture原理

join方法同样表示获取结果,但是join与get方法有什么区别呢。

public T join() {
    Object r;
    return reportJoin((r = result) == null ? waitingGet(false) : r);
}

public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}

public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        Object r;
        long nanos = unit.toNanos(timeout);
        return reportGet((r = result) == null ? timedGet(nanos) : r);
}

public T getNow(T valueIfAbsent) {
        Object r;
        return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}

以上是CompletableFuture类中两个方法的代码,可以看到两个方法几乎一样。区别在于reportJoin/reportGet,waitingGet方法是一致的,只不过参数不一样,我们在看下reportGet与reportJoin方法。

private static <T> T reportGet(Object r)
        throws InterruptedException, ExecutionException {
        if (r == null) // by convention below, null means interrupted
            throw new InterruptedException();
        if (r instanceof AltResult) {
            Throwable x, cause;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if ((x instanceof CompletionException) &&
                (cause = x.getCause()) != null)
                x = cause;
            throw new ExecutionException(x);
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }
private static <T> T reportJoin(Object r) {
        if (r instanceof AltResult) {
            Throwable x;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if (x instanceof CompletionException)
                throw (CompletionException)x;
            throw new CompletionException(x);
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }

可以看到这两个方法很相近,reportGet方法判断了r对象是否为空,并抛出了中断异常,而reportJoin方法没有判断,同时reportJoin抛出的都是运行时异常,所以join方法也是无需手动捕获异常的。

我们在看下waitingGet方法

private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
            if (spins < 0)
                spins = SPINS;
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if (q == null)
                q = new Signaller(interruptible, 0L, 0L);
            else if (!queued)
                queued = tryPushStack(q);
            else if (interruptible && q.interruptControl < 0) {
                q.thread = null;
                cleanStack();
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt();
            }
        }
        postComplete();
        return r;
    }

该waitingGet方法是通过while的方式循环判断是否任务已经完成并产生结果,如果结果为空,则会一直在这里循环,这里需要注意的是在这里初始化了一下spins=-1,当第一次进入while循环的时候,spins是-1,这时会将spins赋值为一个常量,该常量为SPINS。

private static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
                                      1 << 8 : 0);

这里判断可用CPU数是否大于1,如果大于1,则该常量为 1<< 8,也就是256,否则该常量为0。

第二次进入while循环的时候,spins是256大于0,这里做了减一的操作,下次进入while循环,如果还没有结果,依然是大于0继续做减一的操作,此处用来做短时间的自旋等待结果,只有当spins等于0,后续会进入正常流程判断。

我们在看下timedGet方法的源码

private Object timedGet(long nanos) throws TimeoutException {
        if (Thread.interrupted())
            return null;
        if (nanos <= 0L)
            throw new TimeoutException();
        long d = System.nanoTime() + nanos;
        Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
        boolean queued = false;
        Object r;
        // We intentionally don't spin here (as waitingGet does) because
        // the call to nanoTime() above acts much like a spin.
        while ((r = result) == null) {
            if (!queued)
                queued = tryPushStack(q);
            else if (q.interruptControl < 0 || q.nanos <= 0L) {
                q.thread = null;
                cleanStack();
                if (q.interruptControl < 0)
                    return null;
                throw new TimeoutException();
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q.interruptControl < 0)
            r = null;
        q.thread = null;
        postComplete();
        return r;
    }

timedGet方法依然是通过while循环的方式来判断是否已经完成,timedGet方法入参为一个纳秒值,并通过该值计算出一个deadline截止时间,当while循环还未获取到任务结果且已经达到截止时间,则抛出一个TimeoutException异常。

4、CompletableFuture实现多线程任务

这里我们通过CompletableFuture来实现一个多线程处理异步任务的例子。

这里我们创建10个任务提交到我们指定的线程池中执行,并等待这10个任务全部执行完毕。

每个任务的执行流程为第一次先执行加法,第二次执行乘法,如果发生异常则返回默认值,当10个任务执行完成后依次打印每个任务的结果。

public void demo() throws InterruptedException, ExecutionException, TimeoutException {
        // 1、自定义线程池
        ExecutorService executorService = new ThreadPoolExecutor(5, 10,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100));

        // 2、集合保存future对象
        List<CompletableFuture<Integer>> futures = new ArrayList<>(10);
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            CompletableFuture<Integer> future = CompletableFuture
                    // 提交任务到指定线程池
                    .supplyAsync(() -> this.addValue(finalI), executorService)
                    // 第一个任务执行结果在此处进行处理
                    .thenApplyAsync(k -> this.plusValue(finalI, k), executorService)
                    // 任务执行异常时处理异常并返回默认值
                    .exceptionally(e -> this.defaultValue(finalI, e));
            // future对象添加到集合中
            futures.add(future);
        }

        // 3、等待所有任务执行完成,此处最好加超时时间
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);
        for (CompletableFuture<Integer> future : futures) {
            Integer num = future.get();
            System.out.println("任务执行结果为:" + num);
        }
        System.out.println("任务全部执行完成!");
    }

    private Integer addValue(Integer index) {
        System.out.println("第" + index + "个任务第一次执行");
        if (index == 3) {
            int value = index / 0;
        }
        return index + 3;
    }

    private Integer plusValue(Integer index, Integer num) {
        System.out.println("第" + index + "个任务第二次执行,上次执行结果:" + num);
        return num * 10;
    }

    private Integer defaultValue(Integer index, Throwable e) {
        System.out.println("第" + index + "个任务执行异常!" + e.getMessage());
        e.printStackTrace();
        return 10;
    }

作者:京东物流 丁冬

来源:京东云开发者社区 自猿其说Tech

标签:执行,return,编程,任务,CompletableFuture,京东,null,public
From: https://blog.51cto.com/u_15714439/6898786

相关文章

  • 三个编程思想:面向对象编程、面向接口编程、面向过程编程【概念解析系列_1】【C# 基础
    〇、前言对于.Net中的编程思想还是十分重要的,也是编码出高效的程序的基础!在使用之前了解其本质,那么用起来就游刃有余。下面来简单对比下三个编程思想,看下它们都是什么,它们之间又有什么关系。一、对象、接口、过程稍等,在介绍主角之前呐,先来了解下它们思想的基础是怎么回事。......
  • 埃斯顿机器人在线编程
    1,设置电脑IP与机器人控制器LAN2口为同一网段; 2,打开,点击连接;   3,下载和上载程序   4,如果想通过电脑控制埃斯顿机器人点动,需要下载单独的示教器demo,且官网下载不到; ......
  • 笔记|《面向对象编程技术与方法(C++)》电子工业出版社
    第一章概述C++多态:https://blog.csdn.net/K346K346/article/details/82774937第二章编程基础数据类型枚举:https://www.runoob.com/w3cnote/cpp-enum-intro.html联合:https://www.runoob.com/cprogramming/c-unions.html作用域运算符:c++入门学习篇(1)之::作用域符解析c++条......
  • 一位大咖写给软件编程新手的建议 - 经验谈
    今天逛微信公众号的时候看到一个文,关于给软件新手建议的事情。看了之后有一点想法,然后就思考了下,觉得有必要记录一下自己的建议,所以就有了此博文。这里提一下,笔者自己非计算机专业,但是在学校的时候就对编程感兴趣,毕业后也从事编程工作,累积了一定的经验。按笔者的理解,不管......
  • 面向对象编程的 SOLID 原则 - 里氏替换原则
    里氏替换原则里氏替换原则描述的是子类应该能替换为它的基类。意思是,给定classB是classA的子类,在预期传入classA的对象的任何方法传入classB的对象,方法都不应该有异常。这是一个预期的行为,因为继承假定子类继承了父类的一切。子类可以扩展行为但不会收窄。因此,当......
  • 面向对象编程的 SOLID 原则 - 依赖倒置原则
    依赖倒置原则依赖倒置原则描述的是我们的class应该依赖接口和抽象类而不是具体的类和函数。在这篇文章(2000)里,Bob大叔如下总结该原则:“如果OCP声明了OO体系结构的目标,那么DIP则声明了主要机制”。这两个原则的确息息相关,我们在讨论开闭原则之前也要用到这一模式。......
  • 面向对象编程的 SOLID 原则 - 接口隔离原则
    接口隔离原则隔离意味着保持独立,接口隔离原则是关于接口的独立。该原则描述了很多客户端特定的接口优于一个多用途接口。客户端不应该强制实现他们不需要的函数。这是一个简单的原则,很好理解和实践,直接看例子。publicinterfaceParkingLot{ voidparkCar(); //Decrease......
  • 面向对象编程的 SOLID 原则
    SOLID原则是面向对象class设计的五条原则。他们是设计class结构时应该遵守的准则和最佳实践。通常,这五个原则可以帮助我们了解设计模式和软件架构。这是每个开发人员都应该了解的主题。这篇文章介绍了在项目中使用SOLID原则的细节。首先我们先看一下SOLID原则的历史。......
  • Python面向对象编程-学习笔记(二)
    5.类的继承classEmployee:raise_amount=1.04def__init__(self,first,last,pay):self.first=firstself.last=lastself.pay=payself.email=first+'.'+last+'@company.com'cla......
  • 面向对象编程的 SOLID 原则 - 开闭原则
    开闭原则开闭原则要求“class应该对扩展开放对修改关闭”。修改意味着修改存在class的代码,扩展意味着添加新的功能。这个原则想要表达的是:我们应该能在不动class已经存在代码的前提下添加新的功能。这是因为当我们修改存在的代码时,我们就面临着创建潜在bug的风险。因此,......