首页 > 其他分享 >JUC-CompletableFuture 类

JUC-CompletableFuture 类

时间:2024-08-22 17:50:54浏览次数:10  
标签:JUC get System CompletableFuture Override new public

1. CompletableFuture 简介

在Java中CompletableFuture用于异步编程,异步编程是编写非阻塞的代码,运行的任务在一个单独的线程,与主线程隔离,并且会通知主线程它的进度,成功或者失败。

在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。

使用这种并行方式,可以极大的提高程序的性能。

CompletableFuture 实现了 Future 和 CompletionStage接口,并且提供了许多关于创建,链式调用和组合多个 Future 的便利方法集,而且有广泛的异常处理支持。

CompletionStage 接口代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段

2. 问题场景

  1. 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。

  2. 等待Future集合中的所有任务都完成。

  3. 仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。

  4. 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。

  5. 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。

3. 方法详述

1. runAsync 和 supplyAsync方法

CompletableFuture 提供了四个静态方法来创建一个异步操作。

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

没有指定 Executor 的方法会使用 ForkjoinPool.commonPool(),使用线程池则使用指定的线程池。

  • runAsync 不支持返回值
  • supplyAsync 支持返回值

runAsync() 示例:

public void runAsyncTest() throws ExecutionException, InterruptedException {

    CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
        @Override
        public void run() {
            System.out.println("wpz");
        }
    });
    CompletableFuture<Void> f2 = CompletableFuture.runAsync(new Runnable() {
        @Override
        public void run() {
            System.out.println("pz");
        }
    });
    
    //调用 get 方法就阻塞在这里
    //f1.get();
    //f2.get();

    System.out.println(Thread.currentThread().getName() + ":main");
}

supplyAsync() 示例:

public void supplyAsyncTest() throws ExecutionException, InterruptedException {

    CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            return "wpz";
        }
    });
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            return "wpz";
        }
    });
    
    //get() 方法有返回值,同样会阻塞主线程
    System.out.println(future.get());
    System.out.println(future1.get());
}

2. whenComplete

当 CompletableFuture 的计算结果完成,可以执行特定的Action。

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

没有指定 Executor 的方法会使用 ForkjoinPool.commonPool(),使用线程池则使用指定的线程池

  • whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
  • whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

示例:

public void callback() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            System.out.println( "supplyAsync 线程:" + Thread.currentThread().getName() );
            return "z";
        }
    }).whenCompleteAsync(new BiConsumer<String, Throwable>() {
        @Override
        public void accept(String s, Throwable throwable) {
            System.out.println("whenCompleteAsync:" + Thread.currentThread().getName());
            System.out.println("whenCompleteAsync:" + s);
        }
    }).whenComplete(new BiConsumer<String, Throwable>() {
        @Override
        public void accept(String s, Throwable throwable) {
            System.out.println("whenComplete:" + Thread.currentThread().getName());
            System.out.println("whenCompleteAsync:" + s);
        }
    });
}

3. exceptionally

当 CompletableFuture 的计算结果抛出异常时,执行后续的回调

public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

非检查异常示例:

public void exceptionallyTest() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            System.out.println( "supplyAsync 线程:" + Thread.currentThread().getName() );
            System.out.println(1/0);
            return "wpz";
        }
    }).exceptionally(new Function<Throwable, String>() {
        @Override
        public String apply(Throwable throwable) {
            System.out.println(throwable.getMessage());
            return "xpz";
        }
    });
    
    //此时 get 的值,是 exceptionally() 方法的返回值
    System.out.println(future.get());
}

如果是检查异常,由于需要处理,所以需要封装成 new RuntimeException(new IOException("wpz")) 运行时异常,或者方法上添加 @SneakyThrows 来骗过编译器

4. thenApply

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
  • thenApplyAsync 交由线程池处理
  • thenApply 当前主线程处理

示例:

public void thenApplyTest() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            return "wpz";
        }
    }).thenApply(new Function<String, String>() {
        @Override
        public String apply(String s) {
            System.out.println(Thread.currentThread().getName() + ":" + s);
            return s + "1994";
        }
    }).thenApplyAsync(new Function<String, String>() {
        @Override
        public String apply(String s) {
            System.out.println(Thread.currentThread().getName() + ":" + s);
            return s + "sx";
        }
    });
    System.out.println(future.get());
}

5. handle

handle 是执行任务完成时对结果的处理。
handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

示例:就是 thenApply() 添加了对异常的感知处理

public void handleTest() throws ExecutionException, InterruptedException {
     CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            System.out.println(1 / 0);
            return "wpz";
        }
    }).handle(new BiFunction<String, Throwable, String>() {
        @Override
        public String apply(String s, Throwable throwable) {
            if (throwable != null) {
                return s + "sx";
            }
            return s + "1994";
        }
    });
    System.out.println(future.get());
}

6. thenAccept

接收任务的处理结果,并消费处理,无返回结果。

public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

与 whenComplete 相比:

    1. 返回值被消费了,返回值变成了 CompletableFuture<Void>,whenComplete 不影响原来的返回值(引用类型除外,这就是另一个问题了)
    1. 专注于结果,对异常无法感知。whenComplete 则会感知异常。

示例:

public void thenAcceptTest() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            //无法感知异常,如果这里有异常则直接报错
            //System.out.print(1/0);
            return "wpz";
        }
    }).thenAccept(new Consumer<String>() {
        @Override
        public void accept(String s) {
            System.out.println(s);
        }
    });
    future.get();
}

7. thenRun

跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenRun 。

public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);

示例:

public void thenRunTest() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            return "wpz";
        }
    }).thenRun(new Runnable() {
        @Override
        public void run() {
            System.out.println("没有结果,反正完事了");
        }
    });
    future.get();
}

8. thenCombine

thenCombine 会把 两个 CompletableFuture 的任务都执行完成后,把两个任务的结果一块交给 thenCombine 来处理。

public <U,V> CompletableFuture <V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture <V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture <V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

示例:

public void thenCombineTest() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            System.out.println(Thread.currentThread().getName());
            return 1;
        }
    });
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            System.out.println(Thread.currentThread().getName());
            return 2;
        }
    });
    CompletableFuture<Integer> future = future1.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer integer, Integer integer2) {
            System.out.println(Thread.currentThread().getName());
            return integer + integer2;
        }
    });

    System.out.println( future.get());
}

9. thenAcceptBoth

当两个 CompletionStage 都执行完成后,把结果一块交给 thenAcceptBoth 来进行消耗

public <U> CompletableFuture <Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture <Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture <Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

示例:

public void thenAcceptBothTest() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            System.out.println(Thread.currentThread().getName());
            return 1;
        }
    });
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            System.out.println(Thread.currentThread().getName());
            return 2;
        }
    });

    CompletableFuture<Void> future = future1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {
        @Override
        public void accept(Integer integer, Integer integer2) {
            System.out.println(integer + integer2);
        }
    });
    future.get();
}

10. applyToEither

两个 CompletableFuture,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作,有种竞标的感觉。

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

示例:

public void applyToEither() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 1;
        }
    });
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 2;
        }
    });

    CompletableFuture<Integer> future = future1.applyToEither(future2, new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer integer) {
            return integer;
        }
    });
    System.out.println( future.get() );
}

11. acceptEither

两个 CompletableFuture,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的消耗操作。

public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

示例:

public void acceptEither() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 1;
        }
    });
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 2;
        }
    });

    CompletableFuture<Void> future = future1.acceptEither(future2, new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) {
            System.out.println(integer);
        }
    });
    future.get();
}

12. runAfterEither

两个 CompletableFuture,任何一个完成了都会执行下一步的操作(Runnable)

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

示例:

public void runAfterEither() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 1;
        }
    });
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 2;
        }
    });

    CompletableFuture<Void> future = future1.runAfterEither(future2, new Runnable() {
        @Override
        public void run() {
            System.out.println("pz 来了");
        }
    });
    future.get();
}

13. runAfterBoth

两个 CompletableFuture,都完成了计算才会执行下一步的操作(Runnable)

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

示例:

public void runAfterBoth() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 1;
        }
    });
    CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 2;
        }
    });
    CompletableFuture<Void> future = future1.runAfterBoth(future2, new Runnable() {
        @Override
        public void run() {
            System.out.println("呵呵");
        }
    });
    future.get();
}

14. thenCompose

thenCompose 方法允许你对两个 CompletableFuture 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

示例:

public void thenCompose() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return 1;
        }
    });

    CompletableFuture<Integer> future = future1.thenCompose(new Function<Integer, CompletionStage<Integer>>() {
        @Override
        public CompletionStage<Integer> apply(Integer integer) {
            return CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    return integer + 2;
                }
            });
        }
    });

    System.out.println( future.get() );
}

标签:JUC,get,System,CompletableFuture,Override,new,public
From: https://www.cnblogs.com/cnff/p/18373892

相关文章

  • 异步编程CompletableFuture的一些使用demo
      publicstaticThreadPoolExecutorexecutor=newThreadPoolExecutor(5,5,5L,TimeUnit.SECONDS,newLinkedBlockingQueue<>(1000),newThreadPoolExecutor.CallerRunsPolicy());publicstaticvoidmain(String[]args)throwsException{Complet......
  • Java并发编程 - JUC介绍、JUC锁(公平锁、非公平锁、可重入锁/递归锁、自旋锁、Reentran
    Java并发编程中的java.util.concurrent(简称JUC)包提供了许多高级并发工具和类,使得开发人员能够更加方便地编写高性能的并发程序。下面将详细介绍JUC包中的一些锁相关的概念和类。JUC介绍java.util.concurrent包提供了许多高级并发工具类,包括但不限于Executor框架......
  • Java 使用 CompletableFuture 简化异步调用
    使用CompletableFuture可以大大简化处理多线程之间的异步调用关系,如串行依赖、并行、聚合等等。CompletableFuture是对Future接口的扩展和增强,进行了丰富的接口方法扩展,完美的弥补了Future的不足。本篇博客通过代码的方式,展示CompletableFuture的常用方法,体验其强大灵......
  • JUC4-共享模型之内存
    目录Java内存模型(JMM)可见性退不出的循环解决方法可见性&原子性终止模式之两阶段终止模式同步模式之Balking有序性原理:指令级并行指令重排序优化支持流水线的处理器多线程下的问题原理:volatile如何保证可见性如何保证有序性double-checkedlockinghappens-befo......
  • JUC3-共享模型之管程
    目录共享带来的问题分析临界区CriticalSection竞态条件RaceConditionsynchronized解决方案语法1语法2线程八锁变量的线程安全成员变量和静态变量是否线程安全?局部变量是否线程安全?常用线程安全类MonitorJava对象头Monitor(监视器/管程)原理:synchronized轻量级......
  • 如何使用Java CompletableFuture
    Java的CompletableFuture是处理异步编程的利器。它不仅简化了异步任务的执行,还提供了丰富的API来支持任务的组合、异常处理、以及多任务并行。1.CompletableFuture的基础概念CompletableFuture是Java8中引入的,属于java.util.concurrent包。它实现了Future接口,允......
  • 【JUC】读写锁+邮戳锁
    文章目录关于锁的面试题简单聊聊ReentrantReadWriteLock是什么?锁的演变读写锁案例读写锁特点锁降级案例演示为什么设计锁降级(源码分析)邮戳锁StampedLock(比读写锁更快的锁)邮戳锁是什么?它是由饥饿问题引出StampedLock的特点乐观读模式Code演示传统的读写锁模式----读的......
  • JUC锁-AQS源码解读
    JUC锁-Java8中AbstractQueuedSynchronizer源码解读分析总体介绍AbstractQueuedSynchronizer概述在锁框架中,AbstractQueuedSynchronizer抽象类可以毫不夸张的说,占据着核心地位,它提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。所以很有必要好好......
  • jUC中的锁
    在JUC中可以使用synchronized关键字进行加锁如下所示Objectobject=newObject();synchronized(object){//TODO}synchronized关键字所加的锁是逐步升级的,顺序是无锁->偏向锁->轻量级锁->重量级锁、随着锁等级的提高,所带来的消耗也会越大。在介绍......
  • Java中的异步编程模型与事件处理框架:从CompletableFuture到Reactive Streams
    Java中的异步编程模型与事件处理框架:从CompletableFuture到ReactiveStreams大家好,我是微赚淘客系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿!在现代软件开发中,异步编程和事件驱动架构变得越来越重要。它们能有效提高应用程序的性能和响应速度,特别是在处理高并发和I......