多线程查询结果合并
使用CompletableFuture
来实现多线程查询和结果合并。CompletableFuture
提供了一种方便的方式来协调异步任务并处理其结果。下面是一个使用CompletableFuture
的示例:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class MultiThreadQueryExample {
public static void main(String[] args) {
// 模拟要查询的数据
List<String> data = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
data.add("Data " + i);
}
// 创建CompletableFuture列表,用于存储每个查询的结果
List<CompletableFuture<List<String>>> futures = new ArrayList<>();
// 每个CompletableFuture负责查询一部分数据
int batchSize = 20;
for (int i = 0; i < data.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, data.size());
List<String> subList = data.subList(i, endIndex);
CompletableFuture<List<String>> future = CompletableFuture.supplyAsync(() -> performQuery(subList));
futures.add(future);
}
// 使用CompletableFuture的allOf方法等待所有查询完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
// 当所有查询完成时,对所有结果进行合并
CompletableFuture<List<String>> mergedResult = allFutures.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList())
);
// 阻塞等待合并结果
List<String> result = mergedResult.join();
// 输出合并后的结果
System.out.println("Merged Result:");
for (String r : result) {
System.out.println(r);
}
}
private static List<String> performQuery(List<String> data) {
// 在这里进行查询操作,返回查询结果
// 模拟查询耗时
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 返回查询结果
List<String> result = new ArrayList<>();
for (String item : data) {
result.add("Result for " + item);
}
return result;
}
}
在这个示例中,我们使用CompletableFuture.supplyAsync()
方法将查询任务提交给线程池执行异步查询操作,并返回一个CompletableFuture
对象表示查询的结果。所有的CompletableFuture
对象存储在futures
列表中。
然后,我们使用CompletableFuture.allOf()
方法等待所有查询任务完成。当所有任务都完成时,我们使用thenApply()
方法对所有结果进行合并操作,最终得到一个合并后的CompletableFuture<List<String>>
对象。
通过调用join()
方法,我们可以阻塞等待合并结果的完成,并获取最终的查询结果。
请注意,CompletableFuture
还提供了其他一些方法,如thenCompose()
、thenCombine()
等,可以进一步实现异步任务的流水线处理和组合操作,以满足不同的业务需求。
future.join()与future.get()的区别
future.join()
和 future.get()
都可以用于获取 CompletableFuture
的结果,但在具体的使用上有一些差异。
- 异常处理:
join()
方法在遇到异常时会将其包装在CompletionException
中直接抛出;而get()
方法会抛出InterruptedException
(需要处理线程中断)和ExecutionException
(需要处理任务执行过程中发生的异常)。因此,在使用时需要根据具体情况选择合适的异常处理方式。 - 受检异常:
get()
方法声明了抛出InterruptedException
和ExecutionException
这两个受检异常,因此在使用get()
方法时必须显式地处理这两种异常,或者将它们往上层抛出。相比之下,join()
方法没有声明受检异常,所以不需要在代码中强制处理。 - 线程中断:
get()
方法会抛出InterruptedException
,这意味着在调用get()
时,如果当前线程被中断,就会抛出该异常。这要求在使用get()
方法时必须处理线程中断的情况。而join()
方法并不抛出InterruptedException
,所以在调用join()
时不会对线程中断状态进行检查和处理。 - 返回值和异常封装:
get()
方法返回的是Future
的结果对象,通过调用get()
方法可以获取这个结果,或者通过isDone()
方法判断是否完成。而join()
方法直接返回结果值,并且如果遇到异常,会将异常包装在CompletionException
中抛出。
总结来说,join()
方法更加简洁,不需要显式处理受检异常和线程中断,但对于异常处理,其抛出的异常类型相对固定,无法区分具体的异常类型。而 get()
方法需要显式处理受检异常和线程中断,但可以更细粒度地对不同类型的异常进行处理。