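In a suitable environment, and with an appropriate level of parallelism, a parallel stream can improve performance in some cases.
If a program creates a custom thread pool, it must call the pool's shutdown() method to avoid a memory leak.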
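The default thread pool used by parallel streams
By default, parallel streams run their work on ForkJoinPool.commonPool(), a single thread pool shared by the entire application. The test below only confirms that parallelStream() returns a parallel stream. It does not show which pool is used.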
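```java
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
import org.junit.Test; // JUnit 4 assumed

@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream() {
    List<Long> aList = new ArrayList<>();
    Stream<Long> parallelStream = aList.parallelStream();
    assertTrue(parallelStream.isParallel());
}
```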
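The test above only checks that the stream is parallel. The following is a minimal plain-Java sketch (no JUnit). It uses the hypothetical class CommonPoolCheck to record which threads process the elements when a parallel stream is started from an ordinary thread. Each element should be handled either by the calling thread or by a worker of ForkJoinPool.commonPool(). The common pool's size is typically one less than the number of available processors. You can change it with the system property java.util.concurrent.ForkJoinPool.common.parallelism.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.IntStream;

// Hypothetical class for illustration; not part of the original tests.
class CommonPoolCheck {

    // Starts a parallel stream from the current (non-pool) thread and returns true if every
    // element was processed either by the calling thread or by a common-pool worker.
    static boolean runsOnCallerOrCommonPool() {
        Thread caller = Thread.currentThread();
        Set<Boolean> results = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 10_000).parallel().forEach(i -> {
            boolean onCaller = Thread.currentThread() == caller;
            // ForkJoinTask.getPool() returns the pool hosting the current thread, or null
            boolean onCommonPool = ForkJoinTask.getPool() == ForkJoinPool.commonPool();
            results.add(onCaller || onCommonPool);
        });
        return results.size() == 1 && results.contains(Boolean.TRUE);
    }

    public static void main(String[] args) {
        // Machine-dependent value
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("caller or common pool only: " + runsOnCallerOrCommonPool());
    }
}
```

Run from an ordinary main thread, the second line should print true.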
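How to use a custom thread pool
A simple example
Notes on the code below:
- The ForkJoinPool is created with a parallelism level of 4. Finding the best value for a given environment takes some experimentation, but a good rule of thumb is to base it on the number of CPU cores.
- The parallel stream is then submitted to that pool, and its elements are summed in the reduce call. The stream's terminal operation runs on one of the pool's worker threads, so its subtasks are forked into that same pool instead of the common pool (ForkJoinTask.fork() uses the pool of the current worker thread). The Stream API does not specify this, so the technique relies on how the JDK implements parallel streams.
This simple example may not fully show why a custom thread pool is useful. The benefit becomes clear in two situations: when you do not want to tie up the common pool with long-running tasks (for example, processing data from a network source), and when other components of the application are also using the common pool.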
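```java
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.Test; // JUnit 4 assumed

@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
        throws InterruptedException, ExecutionException {
    long firstNum = 1;
    long lastNum = 1_000_000;
    List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
            .collect(Collectors.toList());
    // Parallelism level 4; the parallel stream runs inside this pool, not the common pool
    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    long actualTotal = customThreadPool.submit(
            () -> aList.parallelStream().reduce(0L, Long::sum)).get();
    assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
    // Note: the pool is never shut down here; see the memory-leak caution below
}
```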
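The rule of thumb about CPU cores can be applied directly. The following is a minimal plain-Java sketch with hypothetical names (CustomPoolSum, sumInPool). It sizes the pool from Runtime.getRuntime().availableProcessors() and runs the same sum inside that pool. It then uses the static ForkJoinTask.getPool() method to check that every element was processed by the custom pool rather than the common pool. That method returns the pool hosting the current thread, or null.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

// Hypothetical class for illustration; not part of the original tests.
class CustomPoolSum {

    // Sums 1..n with a parallel stream that is started inside `pool`.
    // For every element, `onPool` records whether it was processed by a worker of `pool`.
    static long sumInPool(ForkJoinPool pool, long n, Set<Boolean> onPool) {
        return pool.submit(() -> LongStream.rangeClosed(1, n)
                        .parallel()
                        .map(x -> {
                            onPool.add(ForkJoinTask.getPool() == pool);
                            return x;
                        })
                        .sum())
                .join(); // join() rethrows a failure as an unchecked exception
    }

    public static void main(String[] args) {
        // Rule of thumb: start from the number of CPU cores, then measure
        int parallelism = Runtime.getRuntime().availableProcessors();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Set<Boolean> onPool = ConcurrentHashMap.newKeySet();
            long total = sumInPool(pool, 1_000_000L, onPool);
            System.out.println("total = " + total);
            System.out.println("only custom-pool workers: " + !onPool.contains(Boolean.FALSE));
        } finally {
            pool.shutdown();
        }
    }
}
```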
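A more involved example
The example below runs two parallel streams in two custom pools, with parallelism 3 and 10. The thread names in the log are by default of the form ForkJoinPool-N-worker-M, and they show that each stream runs on the workers of its own custom pool. Whether this is actually faster than the common pool depends on the workload and should be measured.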
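```java
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.Test; // JUnit 4 assumed

@Test
public void testCustomThreadPool() throws ExecutionException, InterruptedException {
    List<Long> firstRange = LongStream.rangeClosed(1, 10).boxed()
            .collect(Collectors.toList());
    List<Long> secondRange = LongStream.rangeClosed(5000, 6000).boxed()
            .collect(Collectors.toList());

    // Pool 1 (parallelism 3): sum 1..10, sleeping 5 ms per element
    ForkJoinPool forkJoinPool = new ForkJoinPool(3);
    try {
        Future<Long> future = forkJoinPool.submit(() ->
                firstRange.parallelStream().map(number -> {
                    try {
                        print(Thread.currentThread().getName() + " processing " + number);
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // restore the interrupt flag
                    }
                    return number; // no return inside finally, so exceptions are not swallowed
                }).reduce(0L, Long::sum));
        // Cast avoids the ambiguous assertEquals(long, long) / assertEquals(Object, Object) call
        assertEquals((1L + 10) * 10 / 2, (long) future.get());
    } finally {
        forkJoinPool.shutdown();
    }

    // Pool 2 (parallelism 10): print each element of 5000..6000, sleeping 1 ms per element
    ForkJoinPool forkJoinPool2 = new ForkJoinPool(10);
    forkJoinPool2.submit(() -> secondRange.parallelStream().forEach(number -> {
        try {
            print(Thread.currentThread().getName() + " processing " + number);
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }));
    // shutdown() still lets the submitted task finish; wait for it instead of a fixed 2 s sleep
    forkJoinPool2.shutdown();
    forkJoinPool2.awaitTermination(10, TimeUnit.SECONDS);
}

private static void print(String msg) {
    System.out.println(msg);
}
```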
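The pools in the example are easier to tell apart in the log if their threads have recognizable names. The following is a minimal sketch. It assumes you pass your own ForkJoinPool.ForkJoinWorkerThreadFactory to the four-argument ForkJoinPool constructor. The factory delegates to ForkJoinPool.defaultForkJoinWorkerThreadFactory and only renames the thread. The class NamedPoolDemo and the prefix strings are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical class for illustration; not part of the original tests.
class NamedPoolDemo {

    // Creates a pool whose workers are named "<prefix>1", "<prefix>2", ...
    static ForkJoinPool namedPool(int parallelism, String prefix) {
        AtomicInteger counter = new AtomicInteger();
        ForkJoinPool.ForkJoinWorkerThreadFactory factory = p -> {
            // Let the default factory build the worker, then only change its name
            ForkJoinWorkerThread worker =
                    ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(p);
            worker.setName(prefix + counter.incrementAndGet());
            return worker;
        };
        // null handler = default uncaught-exception handling; false = default (LIFO) mode
        return new ForkJoinPool(parallelism, factory, null, false);
    }

    // Runs a parallel forEach inside `pool` and collects the names of the threads used.
    static Set<String> threadNames(ForkJoinPool pool, int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        pool.submit(() -> IntStream.range(0, elements).parallel()
                        .forEach(i -> names.add(Thread.currentThread().getName())))
                .join();
        return names;
    }

    public static void main(String[] args) {
        ForkJoinPool first = namedPool(3, "first-pool-");
        ForkJoinPool second = namedPool(10, "second-pool-");
        try {
            System.out.println(threadNames(first, 10));
            System.out.println(threadNames(second, 1_001));
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }
}
```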
Finding primes
There are two ways to do this:
- Submit the parallel task directly to a custom ForkJoinPool
- Pass the custom thread pool to CompletableFuture.supplyAsync
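```java
import static java.lang.Math.sqrt;
import static java.util.stream.Collectors.toList;
import static java.util.stream.LongStream.range;
import static java.util.stream.LongStream.rangeClosed;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test; // JUnit 4 assumed

public class StreamTest {
    @Test
    public void testCompletableFuture() throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(2);
        try {
            // Pass the custom pool to supplyAsync as its Executor
            CompletableFuture<List<Long>> primes = CompletableFuture.supplyAsync(() ->
                    // parallel task here, for example
                    range(1, 1_000_000).parallel().filter(StreamTest::isPrime).boxed().collect(toList()),
                    forkJoinPool);
            System.out.println(primes.get().size()); // 78498 primes below 1,000,000
        } finally {
            // Shut down only after the result has been retrieved
            forkJoinPool.shutdown();
        }
    }

    @Test
    public void testCustomForkJoinPool() throws InterruptedException {
        final int parallelism = 4;
        ForkJoinPool forkJoinPool = null;
        try {
            forkJoinPool = new ForkJoinPool(parallelism);
            // Submit the parallel task directly to the custom pool
            final List<Integer> primes = forkJoinPool.submit(() ->
                    // parallel task here, for example
                    IntStream.range(1, 1_000_000).parallel()
                            .filter(StreamTest::isPrime)
                            .boxed().collect(Collectors.toList())
            ).get();
            System.out.println(primes.size());
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown();
            }
        }
    }

    // n is prime if it is > 1 and has no divisor between 2 and sqrt(n)
    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}
```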
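For the supplyAsync approach, CompletableFuture.supplyAsync(Supplier, Executor) accepts any Executor, and a ForkJoinPool is one. The following is a minimal sketch with hypothetical names (PrimesOnPool, supplierRunsOn, primesBelow). It first checks that the supplier really executes on the custom pool. It then reuses the isPrime logic from StreamTest on a small range.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical class for illustration; not part of the original tests.
class PrimesOnPool {

    // Same check as StreamTest.isPrime: no divisor between 2 and sqrt(n)
    static boolean isPrime(long n) {
        return n > 1 && LongStream.rangeClosed(2, (long) Math.sqrt(n)).noneMatch(d -> n % d == 0);
    }

    // True if a supplier passed to supplyAsync(..., pool) executes on a worker of `pool`
    static boolean supplierRunsOn(ForkJoinPool pool) {
        return CompletableFuture.supplyAsync(() -> ForkJoinTask.getPool() == pool, pool).join();
    }

    // The parallel stream is started inside the supplier, so its subtasks go to `pool`
    static List<Long> primesBelow(long limit, ForkJoinPool pool) {
        return CompletableFuture.supplyAsync(() ->
                        LongStream.range(1, limit).parallel()
                                .filter(PrimesOnPool::isPrime)
                                .boxed()
                                .collect(Collectors.toList()),
                        pool)
                .join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            System.out.println("supplier on custom pool: " + supplierRunsOn(pool));
            System.out.println("primes below 100: " + primesBelow(100, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Because collect(Collectors.toList()) keeps the encounter order even in a parallel stream, primesBelow(100, pool) returns the 25 primes from 2 to 97 in ascending order.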
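Caution: beware of memory leaks
As discussed above, by default the whole application shares the common pool, which is a static ForkJoinPool instance.
Using the default pool therefore does not cause a memory leak.
A custom pool is different: the customThreadPool object is not necessarily dereferenced and garbage collected when the method returns. Its worker threads may stay alive waiting for new tasks to be assigned, and as long as they are alive they keep the pool reachable.
In other words, every run of the test method creates a new customThreadPool object that may never be released.
The fix is simple: shut down the customThreadPool object once the work is done. Do it in a finally block so the pool is also released when the task or the assertion fails: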
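```java
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.Test; // JUnit 4 assumed

@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
        throws InterruptedException, ExecutionException {
    long firstNum = 1;
    long lastNum = 1_000_000;
    List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
            .collect(Collectors.toList());
    ForkJoinPool customThreadPool = new ForkJoinPool(4);
    try {
        long actualTotal = customThreadPool.submit(
                () -> aList.parallelStream().reduce(0L, Long::sum)).get();
        assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
    } finally {
        // Always release the pool, even if the task or the assertion fails
        customThreadPool.shutdown();
    }
}
```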
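If custom pools are created in several places, the create/submit/shutdown pattern can be wrapped once. The following is a minimal sketch with a hypothetical PoolRunner helper. It creates a pool and waits for the task with join(), which rethrows failures as unchecked exceptions. It then shuts the pool down in finally, whatever the outcome.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

// Hypothetical helper for illustration; not part of the original tests.
final class PoolRunner {
    private PoolRunner() {}

    // Runs `task` in a fresh pool and always shuts the pool down afterwards.
    static <T> T runInPool(int parallelism, Callable<T> task) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // join() waits for the result and rethrows failures as unchecked exceptions
            return pool.submit(task).join();
        } finally {
            // No new tasks are accepted; once idle, the pool terminates and its workers exit
            pool.shutdown();
        }
    }

    // Usage: the same parallel sum as in the test above, without leaving a pool behind
    static long parallelSum(int parallelism, long n) {
        return PoolRunner.<Long>runInPool(parallelism,
                () -> LongStream.rangeClosed(1, n).parallel().sum());
    }

    public static void main(String[] args) {
        System.out.println(parallelSum(4, 1_000_000L)); // prints 500000500000
    }
}
```

On JDK 19 and later, ExecutorService (and therefore ForkJoinPool) also implements AutoCloseable, so try-with-resources is an alternative there. The try/finally form also works on Java 8.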