-
Future :获取异步返回的结果需要使用轮询的方式,消耗cup
ExecutorService executorService = Executors.newFixedThreadPool(10); Future<String> future = executorService.submit(()->{ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "future"; }); while(true){ if(future.isDone()){ System.out.println(future.get()); break; } }
-
CompletableFuture:采用观察者模式,阻塞获取异步返回的结果,性能得到优化
System.out.println("=============CompletableFuture==================="); CompletableFuture testFuture1 = CompletableFuture.supplyAsync(()->{ return "丽丽1"; }).thenApply((element)->{ System.out.println("testFuture1后续操作:"+element); return "丽丽2"; }); System.out.println(testFuture1.get()); System.out.println("=============CompletableFuture==================="); CompletableFuture testFuture2 = CompletableFuture.supplyAsync(()->{ return "丽丽1"; }).thenAccept((element)->{ System.out.println("testFuture2后续操作:"+element); }); System.out.println(testFuture2.get());
-
CompletableFuture的使用明细
- 官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/asyncio
* runAsync 无返回值 * supplyAsync 有返回值 * * thenAccept 无返回值 * thenApply 有返回值 * thenRun 不关心上一步执行结果,执行下一个操作 * get() 为阻塞获取 可设置超时时间 避免长时间阻塞
实现接口 AsyncFunction 用于请求分发 定义一个callback回调函数,该函数用于取出异步请求的返回结果,并将返回的结果传递给ResultFuture 对DataStream的数据使用Async操作
-
例子
/** * An implementation of the 'AsyncFunction' that sends requests and sets the callback. * 通过向数据库发送异步请求并设置回调方法 */ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> { /** The database specific client that can issue concurrent requests with callbacks 可以异步请求的特定数据库的客户端 */ private transient DatabaseClient client; @Override public void open(Configuration parameters) throws Exception { client = new DatabaseClient(host, post, credentials); } @Override public void close() throws Exception { client.close(); } @Override public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception { // issue the asynchronous request, receive a future for result // 发起一个异步请求,返回结果的 future final Future<String> result = client.query(key); // set the callback to be executed once the request by the client is complete // the callback simply forwards the result to the result future // 设置请求完成时的回调.将结果传递给 result future CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { return result.get(); } catch (InterruptedException | ExecutionException e) { // Normally handled explicitly. return null; } } }).thenAccept( (String dbResult) -> { resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult))); }); } } // create the original stream // 创建一个原始的流 DataStream<String> stream = ...; // apply the async I/O transformation // 添加一个 async I/O ,指定超时时间,和进行中的异步请求的最大数量 DataStream<Tuple2<String, String>> resultStream = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
-
注意事项
- Timeout:定义请求超时时间,异步请求多久没完成会被认为是超时了
- Capacity:定义了同时进行的异步请求的数量,可以限制并发请求数量,不会积压过多的请求
- 超时处理:默认当一个异步 I/O 请求超时时,会引发异常并重新启动作业。 如果要处理超时,可以覆盖该
AsyncFunction的timeout
方法来自定义超时之后的处理方式 - 响应结果的顺序:AsyncDataStream包含两种输出模式,
- unorderedWait无序:响应结果的顺序与异步请求的顺序不同
- orderedWait有序:响应结果的顺序与异步请求的顺序相同