首页 > 编程语言 >CompletableFuture异步编程

CompletableFuture异步编程

时间:2022-11-22 14:01:57浏览次数:51  
标签:异步 编程 System 任务 CompletableFuture println out

1、创建

/**
     * public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
     * public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
     * public static CompletableFuture<Void> runAsync(Runnable runnable){..}
     * public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}
     *
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void testCreateCompletableFuture() throws ExecutionException, InterruptedException {
        // 方式一:
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " ===> 执行任务,有返回值");
            return "qiu";
        });
        String result = future1.get();
        System.out.println("result = " + result);

        // 方式二:
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " ===> 执行任务,有返回值,自定义线程池");
            return "qiu";
        }, pool);
        result = future2.get();
        System.out.println("future2 = " + future2);

        // 方式三:
        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " ===> 执行任务,无返回值");
        });

        // 方式四:
        CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName() + " ===> 执行任务,有返回值,自定义线程池");
        }, pool);
    }
  • supplyAsync :执行任务,支持返回值。
  • runAsync:执行任务,没有返回值。

2、结果获取

/**
     * //方式一
     * public T get()
     * //方式二
     * public T get(long timeout, TimeUnit unit)
     * //方式三
     * public T getNow(T valueIfAbsent)
     * //方式四
     * public T join()
     */
@SneakyThrows
@Test
public void testGet(){

	CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
		System.out.println("执行了一个任务");
		return "qiu";
	});

	// getNow()
	String futureNow = future.getNow("立刻获取");
	System.out.println("futureNow = " + futureNow);

	// get(time):超时抛出异常
	// String r = future.get(10, TimeUnit.MICROSECONDS);
	// System.out.println("r = " + r);

	// get()
	String result = future.get();
	System.out.println("result = " + result);

	// join()
	CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println(1 / 0));
	future1.join();

}
  • get()和get(long timeout, TimeUnit unit) => 在Future中就已经提供了,后者提供超时处理,如果在指定时间内未获取结果将抛出超时异常
  • getNow => 立即获取结果不阻塞,结果计算已完成将返回结果或计算过程中的异常,如果未计算完成将返回设定的valueIfAbsent值
  • join => 方法里不会抛出异常

3、异步回调方法

thenRun()|thenRunAsync()
/**
		无参无返回值 : 做完第一个任务后,再做第二个任务,第二个任务也没有返回值
     * thenRun()
     * thenRunAsync()
     * @throws InterruptedException
     */
@Test
public void testUse() throws InterruptedException {
	ExecutorService pool = Executors.newFixedThreadPool(1);

	LocalDateTime startTime = LocalDateTime.now();

	CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
		try {
			Thread.sleep(500);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
		System.out.println(Thread.currentThread().getName() + "第一个任务");
	});
	// thenRun()
	CompletableFuture<Void> future1 = future.thenRun(() -> {
		try {
			Thread.sleep(400);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
		System.out.println(Thread.currentThread().getName() + "第二个任务");
	});
	// thenRunAsync()
	future1.thenRunAsync(() -> {
		try {
			Thread.sleep(300);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
		System.out.println(Thread.currentThread().getName() + "第三个任务");
	},pool);
	Thread.sleep(1300);

	LocalDateTime endTime = LocalDateTime.now();
	Duration duration = Duration.between(startTime, endTime);
	System.out.println("共耗: " + duration.toHours() + " 小时, " + duration.toMinutes() + " 分钟, "
					   + duration.getSeconds() + " 秒, " + duration.toMillis() + " 毫秒");
}
如果你执行第一个任务的时候,传入了一个自定义线程池:
  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
  • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池。
thenAccept()|thenAcceptAsync()
/**
     * 有参无返回值:第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。
     * thenAccept(params)
     * thenAcceptAsync(params)
     */
    @Test
    public void testAccept() throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "qiu");

        CompletableFuture<Void> accept = future.thenAccept((e) -> {
            System.out.println("上一个任务的返回值 = " + e);
        });

        // 无返回值 null
        System.out.println("accept.get() = " + accept.get());

        CompletableFuture<Void> accept2 = future.thenAcceptAsync((e) -> {
            System.out.println("上一个任务的返回值 = " + e);
        });
        System.out.println("accept2.get() = " + accept2.get());
    }
thenApply()|thenApplyAsync()
/**
     * 有参有返回值:表示第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的
     * thenApply()
     * thenApplyAsync()
     */
    @Test
    public void testApply() throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "qiu");

        CompletableFuture<String> future1 = future.thenApply((e) -> e);
        System.out.println("上一个任务的返回值future = " + future1.get());

        CompletableFuture<String> future2 = future1.thenApplyAsync((e) -> e, pool);
        System.out.println("上一个任务的返回值future1 = " + future2.get());

    }

4、异常回调

whenComplete:当CompletableFuture的任务不论是正常完成还是出现异常它都会调用「whenComplete」这个回调函数。
  • 正常完成:whenComplete返回结果和上级任务一致,异常为null;
  • 出现异常:whenComplete返回结果为null,异常为上级任务的异常;
    即调用get()时,正常完成时就获取到结果,出现异常时就会抛出异常,需要你处理该异常
/**
     * whenComplete:
     */
    @Test
    public void testWhenComplete() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("出错了!!!");
            }
            System.out.println("正常结束");
            return 11;
        }).whenComplete((e, t) -> {
            if (e == null) {
                System.out.println("whenComplete e is null");
            } else {
                System.out.println("whenComplete e is " + e);
            }
            if (t == null) {
                System.out.println("whenComplete t is null");
            } else {
                System.out.println("whenComplete t is " + t.getMessage());
            }
        });
        System.out.println("结果 = " + future.get());
    }
whenComplete + exceptionally exceptionally会捕获任务执行中的异常,然后给一个默认的返回值
@Test
    public void testWhenComplete() throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("出错了!!!");
            }
            System.out.println("正常结束");
            return 11;
        }).whenComplete((e, t) -> {
            if (e == null) {
                System.out.println("whenComplete e is null");
            } else {
                System.out.println("whenComplete e is " + e);
            }
            if (t == null) {
                System.out.println("whenComplete t is null");
            } else {
                System.out.println("whenComplete t is " + t.getMessage());
            }
        }).exceptionally(t -> {
            System.out.println("出现异常了:" + t.getMessage());
            return 0;
        });
        System.out.println("结果 = " + future.get());
    }

结果:

whenComplete e is null
whenComplete t is java.lang.RuntimeException: 出错了!!!
出现异常了:java.lang.RuntimeException: 出错了!!!
结果 = 0

5、多任务组合之 AND

/**
     * 多任务组合之 AND
     * thenCombine / thenAcceptBoth / runAfterBoth都表示:「当任务一和任务二都完成再执行任务三」。
     * 区别在于:
     *   runAfterBoth: 不会把执行结果当做方法入参,且没有返回值
     *   thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
     *   thenCombine: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
     */
    @Test
    public void testAnd() throws ExecutionException, InterruptedException {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //开启异步任务1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("异步任务1结束");
            return result;
        }, executorService);

        //开启异步任务2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("异步任务2结束");
            return result;
        }, executorService);

        //任务组合
        CompletableFuture<Integer> task3 = task.thenCombineAsync(task2, (f1, f2) -> {
            System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());
            System.out.println("任务1返回值:" + f1);
            System.out.println("任务2返回值:" + f2);
            return f1 + f2;
        }, executorService);

        Integer res = task3.get();
        System.out.println("最终结果:" + res);
    }

6、多任务组合之 OR

/**
     * 多任务组合之 OR
     * applyToEither / acceptEither / runAfterEither 都表示:「两个任务,只要有一个任务完成,就执行任务三」。
     * 区别在于:
     * runAfterEither:不会把执行结果当做方法入参,且没有返回值
     * acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
     * applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
     */
    @Test
    public void testOR() {
        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        // 开启异步任务1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());

            int result = 1 + 1;
            System.out.println("异步任务1结束");
            return result;
        }, executorService);

        // 开启异步任务2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 2;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务2结束");
            return result;
        }, executorService);

        // 任务组合
        task.acceptEitherAsync(task2, (res) -> {
            System.out.println("执行任务3,当前线程是:" + Thread.currentThread().getId());
            System.out.println("上一个任务的结果为:" + res);
        }, executorService);
    }

7、多任务组合

allOf:等待所有任务完成
@Test
    public void testallOf() throws ExecutionException, InterruptedException {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //开启异步任务1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("异步任务1结束");
            return result;
        }, executorService);

        //开启异步任务2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 2;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务2结束");
            return result;
        }, executorService);

        //开启异步任务3
        CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务3,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 3;
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务3结束");
            return result;
        }, executorService);

        //任务组合,这里可以理解为,当我们需要去其他系统调用数据时,组合所有任务
        CompletableFuture<Void> allOf = CompletableFuture.allOf(task, task2, task3);

        //等待所有任务完成
        allOf.get();

        //获取任务的返回结果
        System.out.println("task结果为:" + task.get());
        System.out.println("task2结果为:" + task2.get());
        System.out.println("task3结果为:" + task3.get());
    }
anyOf() : 只要有一个任务完成
@Test
    public void testAnyOf() throws ExecutionException, InterruptedException {
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //开启异步任务1
        CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务1,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 1;
            System.out.println("异步任务1结束");
            return result;
        }, executorService);

        //开启异步任务2
        CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务2,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 2;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务2结束");
            return result;
        }, executorService);

        //开启异步任务3
        CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("异步任务3,当前线程是:" + Thread.currentThread().getId());
            int result = 1 + 3;
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("异步任务3结束");
            return result;
        }, executorService);

        //任务组合,这里可以理解为,当我们需要去其他系统调用数据时,组合所有任务
        CompletableFuture<Object> future = CompletableFuture.anyOf(task, task2, task3);

        //某一人任务完成
        Object res = future.get();
        System.out.println("res = " + res);

        //获取任务的返回结果
        System.out.println("task结果为:" + task.get());
        System.out.println("task2结果为:" + task2.get());
        System.out.println("task3结果为:" + task3.get());
    }

标签:异步,编程,System,任务,CompletableFuture,println,out
From: https://www.cnblogs.com/qbbit/p/16912929.html

相关文章

  • NModbus4项目3——异步读写数据的方法
    使用NModbus4进行一部数据读取。以读寄存器数据和写寄存器数据为例,其他功能的使用方法类似。读寄存器数据:ushort[]datas=newushort[8];Task<ushort[]......
  • 用php入门网络编程
    学习思路以下是我对学习网络编程的一个简单的学习思路,之后我将会按照这个计划去逐步学习网络编程相关的知识。step1.原生php实现TCPServer->原生php实现http协议->掌......
  • NumPy笔记(2)—— 使用数组进行面向数组编程
    参考:《利用python进行数据分析》第4章注意,由于本文是jupyter文档转换来的,代码不一定可以直接运行,有些注释是jupyter给出的交互结果,而非运行结果!!文章目录​​1.生成网格数......
  • 极客编程python入门-切片
    切片取一个list或tuple的部分元素是非常常见的操作。>>>L=['Michael','Sarah','Tracy','Bob','Jack']>>>[L[0],L[1],L[2]]['Michael','Sarah','Tracy']Python提......
  • 22.大促期间网络编程与安全解读【双元】(1) _完全没用
               ......
  • Java NIO编程实例
     文章目录前言一、NIO与BIO的比较二、Buffer的机制及其子类1.Buffer的使用2.Buffer的四个基本类型三、Channel的使用1.Channel的特征2.Channel的子......
  • 自己学网页编程。
    自学网页编程的CSS代码展示:*{  margin:0;  padding:0;}.w{  width:1200px;  margin:auto;}body{  background-color:#f3f5f7; ......
  • 并发编程(完结)
    多进程实现TCP服务端并发服务端frommultiprocessingimportProcessimportsocket"""服务端的三个条件:1、有固定的IP和PORT。2、24小时不间断提供服务。......
  • 并发编程:多线程、GIL、协程
    目录一、多进程实现TCP服务器并发1.服务端2.客户端二、线程1.什么是线程2.进程与线程的关系3.创建线程的两种方式4.线程对象的其他方法5.同进程内多个线程数据共享三、互斥......
  • day39并发编程
    多进程实现TCP服务端并发importsocketfrommultiprocessingimportProcessdefget_server():server=socket.socket()server.bind(('127.0.0.1',8080))......