首页 > 其他分享 >CompletableFuture使用详解

CompletableFuture使用详解

时间:2023-06-21 11:22:25浏览次数:58  
标签:java util concurrent 详解 CompletableFuture 使用 import public

一、介绍

简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Fueture 实现,是非常麻烦的。

CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。

更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。

 CompletionStage接口: 执行某一个阶段,可向下执行后续阶段。异步执行,默认线程池是ForkJoinPool.commonPool()

二、应用场景

1、描述依赖关系

thenApply() 把前面异步任务的结果,交给后面的Function

thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回

2、描述and聚合关系

thenCombine 任务合并,有返回值

thenAccepetBoth 两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值

runAfterBoth 两个任务都执行完成后,执行下一步操作(Runnable)

3、描述or聚合关系

applyToEither 两个任务谁执行的快,就使用那一个结果,有返回值

acceptEither 两个任务谁执行的快,就消耗那一个结果,无返回值

runAfterEither 任意一个任务执行完成,进行下一步操作(Runnable)

4、并行执行

CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行。

三、常用操作

1、创建异步操作

CompletableFuture 提供了四个静态方法来创建一个异步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

这四个方法区别在于:

(1)、runAsync 方法以Runnable函数式接口类型为参数,没有返回结果,supplyAsync 方法以Supplier函数式接口类型为参数,返回结果类型为U;Supplier 接口的 get() 方法是有返回值的(会阻塞)

(2)、没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。

(3)、默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

案例1:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Test1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Runnable runnable = ()-> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("[" + Thread.currentThread().getName() + "]" + "执行无返回结果的异步任务");
        };
        CompletableFuture.runAsync(runnable); // 创建异步操作

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("[" + Thread.currentThread().getName() + "]" + "执行有返回值的异步任务...");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "[" + Thread.currentThread().getName() + "]" + "Hello World";
        });

        String s = future.get();
        System.out.println(s);
    }
}

结果如下:

[ForkJoinPool.commonPool-worker-2]执行有返回值的异步任务...
[ForkJoinPool.commonPool-worker-1]执行无返回结果的异步任务
[ForkJoinPool.commonPool-worker-2]Hello World

2、获取结果

join()和get()方法都是用来获取CompletableFuture异步之后的返回值。

  • join() 方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。
  • get() 方法抛出的是经过检查的异常,ExecutionException。

InterruptedException 需要用户手动处理(抛出或者 try catch)。

 

3、结果处理

当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的 Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

(1)、Action的类型是BiConsumer<? super T,? super Throwable>,它可以处理正常的计算结果,或者异常情况。

(2)、方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。

(3)、这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。

案例2:

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;

public class Test2 {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            if (new Random().nextInt(10) % 2 == 0) {
                int i = 12 / 0;
            }

            System.out.println("执行结束!");
            return "test";
        });

        future.whenComplete(new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String s, Throwable throwable) {
                System.out.println(s + "执行完成!");
            }
        });

        future.exceptionally(new Function<Throwable, String>() {
            @Override
            public String apply(Throwable throwable) {
                System.out.println("执行失败:" + throwable.getMessage());
                return "异常xxxx";
            }
        });
        TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
    }
}

正常执行的结果:

执行结束!
test执行完成!

异常执行的结果:

执行失败:java.lang.ArithmeticException: / by zero
null执行完成!

可以改造成如下所示:

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;

public class Test3 {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            if (new Random().nextInt(10) % 2 == 0) {
                int i = 12 / 0;
            }

            System.out.println("执行结束!");
            return "test";
        }).whenComplete(new BiConsumer<String, Throwable>() {
            @Override
            public void accept(String s, Throwable throwable) {
                System.out.println(s + "执行完成!");
            }
        }).exceptionally(new Function<Throwable, String>() {
            @Override
            public String apply(Throwable throwable) {
                System.out.println("执行失败:" + throwable.getMessage());
                return "异常xxxx";
            }
        });
        TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
    }
}

4、结果转换

所谓结果转换,就是将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。

(1)、thenApply

thenApply 接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)

 

 

 

案例3:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class Test4 {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            int result = 100;
            System.out.println("一阶段:" + result);

            return result;
        }).thenApply(number -> { // 这里的number是上个方法返回的结果
            int result = number * 3;
            System.out.println("二阶段:" + result);
            return result;
        });

        TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
    }
}

结果如下:

一阶段:100
二阶段:300

(2)、thenCompose

thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;

 

案例四

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
public class Test5 {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture.supplyAsync(() -> {
            int number = new Random().nextInt(30);
            System.out.println("第一阶段:" + number);
            return number;
        }).thenCompose(new Function<Integer, CompletionStage<Integer>>() {
            @Override
            public CompletionStage<Integer> apply(Integer number) {
                return CompletableFuture.supplyAsync(new Supplier<Integer>() {

                    @Override
                    public Integer get() {
                        int number2 = number * 2;
                        System.out.println("第二阶段:" + number2);
                        return number2;
                    }
                });
            }
        });
        TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
    }
}

结果如下:

第一阶段:12
第二阶段:24

thenApply 和 thenCompose的区别

(1)、thenApply 转换的是泛型中的类型,返回的是同一个CompletableFuture;

(2)、thenCompose 将内部的 CompletableFuture 调用展开来并使用上一个CompletableFutre 调用的结果在下一步的 CompletableFuture 调用中进行运算,是生成一个新的CompletableFuture。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class Test6 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> result1 = future.thenApply(param -> param + " World!");

        CompletableFuture<String> result2 =
                future.thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World!"));

        System.out.println(result1.get());
        System.out.println(result2.get());

        TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
    }
}

结果如下:

Hello World!
Hello World!

 

5、结果消费

与结果处理和结果转换系列函数返回一个新的 CompletableFuture 不同,结果消费系列函数只对结果执行Action,而不返回新的计算值。

 

根据对结果的处理方式,结果消费函数又分为:

  • thenAccept系列:对单个结果进行消费
  • thenAcceptBoth系列:对两个结果进行消费
  • thenRun系列:不关心结果,只对结果执行Action

 

(1)、thenAccept

通过观察该系列函数的参数类型可知,它们是函数式接口Consumer,这个接口只有输入,没有返回值。

 

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

(2)、thenAcceptBoth

thenAcceptBoth 函数的作用是,当两个 CompletionStage 都正常完成计算的时候,就会执行提供的action消费两个异步的结果。

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);

案例

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class Test8 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(() -> {
            int number1 = new Random().nextInt(3) + 1;
            try {
                TimeUnit.SECONDS.sleep(number1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第一阶段:" + number1);
            return number1;
        });

        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            int number2 = new Random().nextInt(3) + 1;
            try {
                TimeUnit.SECONDS.sleep(number2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("第二阶段:" + number2);
            return number2;
        });

        futrue1.thenAcceptBoth(future2, (number1, number2) -> System.out.println("最终结果:" + (number1 + number2)));

        TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
    }
}

结果如下:

第二阶段:1
第一阶段:1
最终结果:2

 

(3)、thenRun

thenRun 也是对线程任务结果的一种消费函数,与thenAccept不同的是,thenRun 会在上一阶段 CompletableFuture 计算完成的时候执行一个Runnable,Runnable并不使用该 CompletableFuture 计算的结果。

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);

案例

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class Test9 {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture.supplyAsync(()-> {
            int number = new Random().nextInt(10);
            System.out.println("第一阶段:" + number);
            return number;
        }).thenRun(()-> System.out.println("thenRun()执行..."));

        TimeUnit.SECONDS.sleep(3);//线程阻塞的方法

    }
}

 

6、结果组合

(1)、thenCombine

thenCombine 方法,合并两个线程任务的结果,并进一步处理。

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);

案例

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Test10 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture
                .supplyAsync(() -> {
                    int number1 = new Random().nextInt(10);
                    System.out.println("第一阶段:" + number1);
                    return number1;
                });

        CompletableFuture<Integer> future2 = CompletableFuture
                .supplyAsync(() -> {
                    int number2 = new Random().nextInt(10);
                    System.out.println("第二阶段:" + number2);
                    return number2;
                });

        CompletableFuture<Integer> result = future1
                .thenCombine(future2, (number1, number2) -> number1 + number2);
        System.out.println("最终结果:" + result.get());

        TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
    }
}

结果:

第一阶段:5
第二阶段:9
最终结果:14

7、任务交互

所谓线程交互,是指将两个线程任务获取结果的速度相比较,按一定的规则进行下一步处理。

(1)、applyToEither

两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);

案例

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Test11 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture
                .supplyAsync(() -> {
                    int number = new Random().nextInt(10);
                    System.out.println("第一阶段start:" + number);
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("第一阶段end:" + number);
                    return number;
                });

        CompletableFuture<Integer> future2 = CompletableFuture
                .supplyAsync(() -> {
                    int number = new Random().nextInt(10);
                    System.out.println("第二阶段start:" + number);
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("第二阶段end:" + number);
                    return number;
                });

        future1.applyToEither(future2, number -> {
            System.out.println("最快结果:" + number);
            return number * 2;
        });
        TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
    }
}

结果;

第一阶段start:2
第二阶段start:9
第一阶段end:2
最快结果:2

 

(2)、acceptEither

两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);

案例

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Test12 {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future1 = CompletableFuture
                .supplyAsync(() -> {
                    int number = new Random().nextInt(10) + 1;
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("第一阶段:" + number);
                    return number;
                });

        CompletableFuture<Integer> future2 = CompletableFuture
                .supplyAsync(() -> {
                    int number = new Random().nextInt(10) + 1;
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("第二阶段:" + number);
                    return number;
                });

        future1.acceptEither(future2, number -> System.out.println("最快结果:" + number));
//        TimeUnit.SECONDS.sleep(3);//线程阻塞的方法
        countDownLatch.await();
    }

}

结果:

第一阶段:1
最快结果:1
第二阶段:9

(3)、runAfterEither

两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);

案例

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Test13 {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future1 = CompletableFuture
                .supplyAsync(() -> {
                    int number = new Random().nextInt(5);
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("第一阶段:" + number);
                    return number;
                });

        CompletableFuture<Integer> future2 = CompletableFuture
                .supplyAsync(() -> {
                    int number = new Random().nextInt(5);
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("第二阶段:" + number);
                    return number;
                });

        future1.runAfterEither(future2, () -> System.out.println("已经有一个任务完成了")).join();

        countDownLatch.await();
    }
}

结果:

第一阶段:1
已经有一个任务完成了
第二阶段:4

 

(4)、runAfterBoth

两个线程任务相比较,两个全部执行完成,才进行下一步操作,不关心运行结果。

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);

案例

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Test14 {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future1 = CompletableFuture
                .supplyAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("第一阶段:1");
                    return 1;
                });

        CompletableFuture<Integer> future2 = CompletableFuture
                .supplyAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("第二阶段:2");
                    return 2;
                });

        future1.runAfterBoth(future2, () -> System.out.println("上面两个任务都执行完成了。"));

        countDownLatch.await();
    }
}

(5)、anyOf

anyOf 方法的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个 CompletableFuture。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

案例:

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Test15 {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Random random = new Random();
        CompletableFuture<String> future1 = CompletableFuture
                .supplyAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(random.nextInt(5));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "hello";
                });

        CompletableFuture<String> future2 = CompletableFuture
                .supplyAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(random.nextInt(1));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "world";
                });
        CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
        System.out.println(result.get());

        countDownLatch.await();
    }
}

结果:

world

 

(6)、allOf

allOf方法用来实现多 CompletableFuture 的同时返回。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

案例

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class Test16 {
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future1 = CompletableFuture
                .supplyAsync(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("future1完成!");
                    return "future1完成!";
                });

        CompletableFuture<String> future2 = CompletableFuture
                .supplyAsync(() -> {
                    System.out.println("future2完成!");
                    return "future2完成!";
                });

        CompletableFuture<Void> combindFuture = CompletableFuture
                .allOf(future1, future2);
        try {
            combindFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        countDownLatch.await();
    }
}

结果:

future2完成!
future1完成!

CompletableFuture常用方法总结

四、For循环中使用CompletableFuture

有时候,我们需要在某个接口中,远程调用第三方的某个接口。

比如:在注册企业时,需要调用天眼查接口,查一下该企业的名称和统一社会信用代码是否正确。

这时候在企业注册接口中,不得不先调用天眼查接口校验数据。如果校验失败,则直接返回。如果校验成功,才允许注册。

如果只是一个企业还好,但如果某个请求有10个企业需要注册,是不是要在企业注册接口中,循环调用10次天眼查接口才能判断所有企业是否正常呢?

public void register(List<Corp> corpList) {
  for(Corp corp: corpList) {
      CorpInfo info = tianyanchaService.query(corp);  
      if(null == info) {
         throw new RuntimeException("企业名称或统一社会信用代码不正确");
      }
  }
  doRegister(corpList);
}

这样做可以,但会导致整个企业注册接口性能很差,极容易出现接口超时问题。

那么,如何解决这类在循环中调用远程接口的问题呢?

1、批量操作

远程接口支持批量操作,比如天眼查支持一次性查询多个企业的数据,这样就无需在循环中查询该接口了。

但实际场景中,有些第三方不愿意提供第三方接口。

2、并发操作

java8以后通过CompletableFuture类,实现多个线程查天眼查接口,并且把查询结果统一汇总到一起。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureFor {
    public static void main(String[] args) {
        //循环调用方法
        List<CompletableFuture<?>> futures = new ArrayList<>();
        for(int y = 1;y < 10;y++){
            futures.add(CompletableFuture.supplyAsync(() ->{
                try {
                    Thread.sleep(1000);
                    List<String> result = query(y,1000);
                    return result;
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));
        }
        

        long start = System.currentTimeMillis();
        List<CompletableFuture<?>> futures = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            int finalI = i;
                futures.add(CompletableFuture.supplyAsync(() ->{
                    try {
                        Thread.sleep(1000);
                        System.out.println("线程:CompletableFuture" + Thread.currentThread().getName());
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return finalI;
                }));
        }
        //等待全部完成    
        CompletableFuture.allOf(futures.toArray(newCompletableFuture[0])).join();
        //获取内容    
        for (CompletableFuture future : futures) {
            try {
                Object s = future.get();
                System.out.println(s);
            }
            catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("主线程:" + Thread.currentThread().getName());
        System.out.println("cost" + (end - start));
    }
}

 

标签:java,util,concurrent,详解,CompletableFuture,使用,import,public
From: https://www.cnblogs.com/zwh0910/p/17483514.html

相关文章

  • 【Java】使用 validation 完成自定义校验注解
    总括:validation让我们简化了开发过程,可以使用简单的一个注解就实现了很多常见的检验数据的功能,同时支持自定义注解。spring-boot-starter-validation是由SpringBoot整合的一套用于处理 validation的约定化自动配置启动器。Spring系列框架通过简单的安装依赖即可直接使用......
  • 对imxrt 1050 flexspi 多设备的使用
    本文主要是对近期学习flexspi的一个大致总结对于imxrt1050系列,只提供了一个flexspi,而手册中说可接4个设备,听着很不错,但其实有个很大限制,必须是同时钟频率的。因为只有一个外设,其中的时钟配置也只有一个时钟配置。而且对于AHB和IP两种访问方式,同一时间只能一个有效,同时在wr......
  • 手把手教你使用Flex 3——《Flex 3程序设计》
     手把手教你使用Flex3——《Flex3 1954年Fortran语言的发明,使软件业跨入了高级语言时代;1972年Smalltalk的发布,标志着“面向对象”语言时代的到来;2004年Adobe公司推出的Flex框架,预示着富因特网应用程序(RIA)浓墨重彩地登上了历史舞台,从此网络应用程序的表示层只能......
  • 安全警告:使用非微软官方Windows镜像安装系统的公司或同学们请注意!
    杀毒软件公司DoctorWeb6月13发布公告:非官方发布的Windows系统安装镜像可能内置木ma,此木ma内置于系统的EFI分区,不容易被杀毒软件识别,专门用来盗取电脑使用者的加密钱包密码。公告链接:https://news.drweb.com/show/?i=14712在我所有视频课程中,我都强烈建议用到的软件尽量从官方下载......
  • os模块的使用
    路径的拼接importos​path=os.path.join("db",'root',"a.txt")print(path)上级目录importos​path=os.path.join("db",'root',"a.txt")print(path) #db/root/a.txt​​folder_path=os.path.dirname(path)p......
  • 《Linux命令详解手册》——Linux畅销书作家又一力作
     关注IT,更要关心IT人,让系统管理员以及程序员工作得更加轻松和快乐。鉴于此,图灵公司引进了国外知名出版社JohnWileyandSons出版的FedoraLinuxToolbox:1000+CommandsforFedora,CentOSandRedHatPowerUsers (中文名《 Linux命令详解手册》预计在9月份上市,敬请期待!内......
  • Ubuntu正常启动黑屏,但可以使用recovery resume启动
    硬件:huaweimatebookEi712代iT问题:安装Ubuntu系统grub正常工作,直接启动黑屏,需强制关机。尝试:grub使用recoveryresume启动成功,但分辨率、刷新率不可调整。分析:显卡兼容问题处理方法:百度解决决方案:首先recoveryresume启动,进入终端方案一:直接修改grub.cfgsudoge......
  • C#中的字符串格式化详解
    在日常使用中,对于字符串的格式化这块也仅止步于能用就行。如日期格式化,小数点格式化等。有时在MSDN上查看一些示例代码时,会看到一些没有见过的字符串格式化输出,这里做个详细的总结,以后需要用时,直接到这里来看就好了。 说明:本文全部以字符串内插(C#6.0)的形式实现,而不是使用S......
  • 开源独角数卡使用Brevo配置SMTP
    引用Sendinblue免费SMTP邮局,每天免费发送300封邮件,每个月免费发送9000封邮件!https://www.otakusay.com/527.html介绍Brevo原Sendinblue免费邮局,支持API、SMTP方式发送邮件。接下来我们使用Brevo来申请SMTP配置到独角数卡。请不要用作任何违法犯罪活动,本站与此教程......
  • MySQL 8 如何解决快速获取数据库中所有业务库表列的distinct 值,不使用SQL
    开头还是介绍一下群,如果感兴趣polardb,mongodb,mysql,postgresql,redis等有问题,有需求都可以加群群内有各大数据库行业大咖,CTO,可以解决你的问题。最近我们接到一个需求,在数据库内,无准确目标的寻找每个表中的字里面包含某些特殊字符的列。工作了快半辈子了,也是第一次听说这样......