小蒋:
在java开发的不同场景下,会出现很多种可能性。所以掌握多种编程的方法可以让开发更多可能。
Java中的Stream流操作是Java 8及更高版本中引入的一种高级数据处理机制,它提供了一种声明式的编程模型,用于处理集合、数组或其他数据源中的数据序列。Stream API使得开发者能够以函数式编程风格编写代码,简洁高效地执行批量数据处理任务,比如过滤、映射、排序、统计、聚合以及并行计算等。
Stream流的开发使用场景是什么?
Java 8 中引入的Stream API提供了一种声明式的编程模型来处理集合数据。Stream流的开发使用场景非常广泛,主要集中在批量数据处理、聚合操作、并行计算以及简化代码等方面。以下是几个典型的Stream流使用场景:
数据过滤:
从一个集合中筛选出满足特定条件的元素,例如获取所有年龄大于18岁的用户列表。
List<User> adults = userList.stream() .filter(user -> user.getAge() > 18) .collect(Collectors.toList());
数据映射
对集合中的每个元素应用某种转换操作,例如将用户列表中的用户名转换为大写
List<String> uppercaseNames = userList.stream()
.map(User::getName)
.map(String::toUpperCase)
.collect(Collectors.toList());
数据排序
对集合中的元素进行排序,如按照年龄升序或降序排列用户列表
List<User> sortedUsers = userList.stream()
.sorted(Comparator.comparing(User::getAge))
.collect(Collectors.toList());
数据聚合
计算集合中的统计信息,例如求所有用户年龄的总和、最大值或平均值
int totalAge = userList.stream()
.mapToInt(User::getAge)
.sum();
double averageAge = userList.stream()
.mapToInt(User::getAge)
.average()
.orElse(0);
连接多个数据源:
将两个或多个集合合并或按某种逻辑关联起来,例如通过姓名将用户列表与另一个包含用户地址的列表关联起来
Map<String, Address> userAddresses = users.stream()
.flatMap(user -> user.getAddresses().stream()
.map(addr -> new AbstractMap.SimpleEntry<>(user.getName(), addr)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
并行处理
利用多核处理器的优势,在多个线程间划分工作负载,加速大规模数据集的处理
List<Integer> processedData = dataList.parallelStream()
.map(data -> expensiveComputation(data))
.collect(Collectors.toList());
文件/IO操作
虽然题目中有提到“流”在文件操作中的概念,但这里的“流”指的是I/O流,而非Java 8的Stream API。不过,Java 8 Stream API也可以用于处理文件内容,例如逐行读取文本文件并进行处理
Files.lines(Paths.get("input.txt"))
.filter(line -> line.startsWith("//")) // 过滤注释行
.map(line -> line.substring(2)) // 移除注释符号
.forEach(System.out::println); // 输出非注释行
Stream流的运行性能解析:
Java Stream API 自从Java 8首次引入以来,其设计目标之一就是提高数据处理效率,尤其是针对大量数据时可以通过并行流实现高性能。在不同JDK版本中,Stream API的性能表现会随着JDK的优化和改进而有所提升
单线程运行性能解析:
在单线程环境下,Stream API的核心在于它的内部优化和延迟计算(lazy evaluation)。这意味着只有当实际需要结果的时候,Stream的操作才会真正执行,从而减少不必要的计算。例如,filter() 和 map() 操作会尽可能地合并在一起,减少迭代次数。不过,单线程流的性能提升更多依赖于算法优化和底层JVM的改进。
并发线程运行性能解析:
Java 8及后续版本中,Stream API支持并行流(parallel streams),可以利用多核处理器的优势来分散计算任务。在并行流中,数据源会被分成多个部分,然后在不同的线程上同时处理,最后再合并结果。为了实现高效的并行处理,JDK内部对分割任务、调度线程池、合并结果等环节做了大量的优化工作。
然而,并不是所有的Stream操作都适合并行化,也不是所有数据集都能从并行处理中获得显著的性能提升。较小的数据集由于线程创建和上下文切换的开销,可能并行处理反而不如单线程快。此外,如果数据集的内在结构不利于切分或者中间操作存在过多的状态共享,也可能影响到并行流的性能。
随着时间推移,JDK的更新不断优化了Stream API的并行处理能力,包括但不限于:
在JDK 9中,对ForkJoinPool进行了调整,改进了线程池管理机制。
后续版本中可能进一步优化了任务调度策略,减少了内存分配和同步开销,提高了CPU缓存利用率等。
Stream流的核心概念包括:
1.流(Stream): 不是数据结构,而是一个可以从支持数据源获取数据的通道。它可以是从集合、数组、I/O资源等产生的连续数据序列。
2.数据源(Source): 创建Stream的源头,可以是集合(如List、Set)、数组、文件、生成器函数(generate)等。
3.中间操作(Intermediate Operations): 一系列不会立即执行的惰性操作,它们会构建一个流水线,比如filter(过滤)、map(映射)、sorted(排序)、distinct(去重)等。这些操作会创建一个新的Stream,并且原Stream保持不变。
4.终止操作(Terminal Operations): 执行中间操作后最终触发流上数据处理的操作,如collect(收集到集合中)、forEach(遍历执行消费者函数)、reduce(归约求和、求最大值等)、anyMatch(是否存在满足条件的元素)等。一旦执行了终止操作,Stream管道就会被执行,并产生最终结果。
5.并行流(Parallel Streams): Stream API还支持自动并行化处理,当对大量数据进行操作时,可以利用多核处理器的优势,显著提高处理速度。
Stream的基础语法
Stream API的设计围绕三个核心概念:数据源(source)、中间操作(intermediate operations)和终止操作(terminal operations)。下面是一些关键点的概述:
数据源(Source)
首先需要创建一个Stream来表示数据序列。常见的创建Stream的方式有:
从集合创建
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> streamFromList = list.stream();
从数组创建
String[] array = {"d", "e", "f"};
Stream<String> streamFromArray = Arrays.stream(array);
使用Stream.of()静态方法
Stream<String> streamFromStaticMethod = Stream.of("g", "h", "i");
Stream.of("g", "h", "i") 是Java 8引入的一个静态工厂方法,用于创建一个包含指定元素的串行流(顺序流)。
在底层实现上,java.util.stream.Stream.of() 方法其实构建了一个固定大小的数组来存储提供的元素,然后返回一个基于该数组的 Stream 实例。这个 Stream 的 spliterator() 方法被设计为按顺序遍历数组元素。
我们可以简单剖析一下 Stream.of() 的行为,尽管具体的实现细节依赖于JDK的源码,大致过程如下:
1.创建一个足够容纳传递给 of() 方法的所有元素的数组(对于字符串来说,是一个 String[] 数组)。
2.将每个传入的元素复制到数组中。
3.创建一个 Arrays.Spliterator 对象,它是数组的分割迭代器,负责按照数组的顺序遍历元素。
4.基于这个 Spliterator 构建一个 Stream 实例,并返回给调用者。
中间操作(Intermediate Operations)
中间操作是链式调用的一部分,它们不会直接计算结果,而是创建新的Stream,其中包含了原始数据经过特定转换后的元素。中间操作通常都是懒加载执行的,只有在遇到终止操作时才会真正开始处理数据。常见的中间操作包括:
过滤(Filtering)
Stream<String> filteredStream = streamFromList.filter(s -> s.startsWith("a"));
映射(Mapping)
Stream<Integer> mappedStream = streamFromArray.map(String::length);
排序(Sorting)
Stream<String> sortedStream = streamFromList.sorted();
// 或者按自定义比较器排序
Stream<String> customSortedStream = streamFromList.sorted((s1, s2) -> s1.compareTo(s2));
终止操作(Terminal Operations)
终止操作会触发中间操作的执行,并得出最终结果。执行终止操作后,Stream pipeline 不可再使用。常见的终止操作包括:
收集(Collecting)
将Stream中的元素转换为集合、数组或其他形式的结果。
List<String> collectedList = streamFromList.collect(Collectors.toList());
查找与匹配(Find and Match)
Optional<String> firstMatch = streamFromList.findAny();
boolean allMatchCondition = streamFromList.allMatch(s -> s.length() > 1);
使用了Java 8中的Stream API来处理集合流的操作。假设streamFromList是从某个列表(例如ArrayList、LinkedList等)生成的流,里面包含多个字符串元素。接下来解释每行代码的作用:
1.Optional<String> firstMatch = streamFromList.findAny();
1.1.findAny() 是Stream API中的一个终端操作,它从流中任意返回一个满足条件的元素(如果流为空,则返回一个空的Optional)。在这里,它没有指定任何特定的筛选条件,因此它会随机选择流中的一个字符串元素并将其封装在Optional对象中。这意味着firstMatch变量可能包含流中的任意一个字符串,或者如果流为空则为Optional.empty()。
2.boolean allMatchCondition = streamFromList.allMatch(s -> s.length() > 1);
2.1.allMatch() 同样是一个终端操作,它检查流中的所有元素是否都满足给定的布尔条件。这里的条件是一个Lambda表达式 s -> s.length() > 1,表示字符串s的长度是否大于1。
2.2.执行这行代码后,allMatchCondition 变量将会是true,当且仅当流中的每一个字符串元素的长度都大于1。如果流中有任何一个字符串长度不大于或等于1,那么 allMatchCondition 将会是false。
综上所述,这段代码的目的一个是获取流中的任意一个字符串,另一个则是判断流中的所有字符串是否都是至少包含两个字符的。
归约(Reducing)
int sum = streamFromArray.mapToInt(Integer::intValue).sum();
完整的 描述一下这个代码:
int[] array = ... // 假设这里有一个整数数组
IntStream streamFromArray = Arrays.stream(array); // 将数组转换为IntStream
int sum = streamFromArray
.mapToInt(Integer::intValue)
.sum();
// 或者更简洁地直接从数组创建IntStream:
int sum = Arrays.stream(array)
.mapToInt(Integer::intValue)
.sum();
--- Arrays.stream(array) 创建了一个 IntStream,它是专门用于处理整数类型的流,这里的流包含了数组中的所有元素。
--- .mapToInt(Integer::intValue) 是一个映射(map)操作,它将流中的每个元素(在此例中实际上是自动装箱后的 Integer 对象)转换为其对应的原始 int 类型值。这里 Integer::intValue 是一个方法引用来指向 Integer 类的 intValue() 方法,此方法就是将 Integer 对象转换为 int 类型的。
--- .sum() 是一个归约(reduction)操作,它将流中的所有 int 值累加起来,最终得到它们的总和。
关于 :: 符号,这是Java 8引入的方法引用(Method Reference)语法的一部分,它允许我们直接引用某个类或对象的方法作为函数式接口的实现。在这个例子中,Integer::intValue 就是指向 Integer.intValue() 方法的一个简写形式。
forEach(迭代)
streamFromList.forEach(System.out::println);
小重点:
并行流(Parallel Streams)
Stream API也支持并行处理,通过.parallel()
方法可以将顺序流转换为并行流,从而利用多核处理器的优势提升性能。
Stream<String> parallelStream = streamFromList.parallel();
在整个Stream流中并行流是加速的很好的方式。
所以这里延伸一下。
Java 8 引入了并行流(Parallel Streams),它可以充分利用现代多核处理器的优势,在处理大量数据时显著提高性能。并行流会在后台自动把大的数据集分割成多个部分,然后在不同的线程上同时处理这些部分,最后合并结果。
以下是并行流的一些基本用法:
创建并行流
从现有的流转换: 如果你已经有了一个顺序流,可以通过调用 parallel() 方法将其转换为并行流。
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, ...); // 大量数据
Stream<Integer> sequentialStream = numbers.stream();
Stream<Integer> parallelStream = sequentialStream.parallel();
直接从集合创建并行流: 也可以直接从集合创建并行流。
Stream<Integer> parallelStreamDirectly = numbers.parallelStream();
使用并行流进行操作
假设我们有一个需求是计算一个整数列表所有数字的和,我们可以对比顺序流和并行流的实现:
// 序列流求和
long sumSequential = numbers.stream()
.mapToInt(Integer::intValue)
.sum();
// 并行流求和
long sumParallel = numbers.parallelStream()
.mapToInt(Integer::intValue)
.sum();
在上面的例子中,mapToInt 和 sum 都是终止操作,当使用并行流时,它们会在多个线程上分别对数据的不同部分进行映射和求和,然后再合并结果。
注意事项
·数据依赖与线程安全性:对于包含状态的非线程安全对象的操作,需要确保线程安全或者避免数据依赖性,否则可能会导致错误的结果。
·负载均衡:并行流内部会尝试平衡不同线程间的负载,但并不是所有的操作都能从中受益,特别是当数据集较小或操作本身开销较大时,可能并行处理并不会比顺序处理更快。
·性能评估:使用并行流前,最好先进行性能测试,因为并行化并不总是能带来性能提升,取决于数据规模、处理器核心数量以及具体操作的性质。
·短小的任务不适合并行化:对于非常小的数据集或者非常快就能完成的任务,由于线程创建和上下文切换的成本,使用并行流可能反而会导致效率下降。
因此,正确地使用并行流要求开发者了解其内部原理,并针对具体的场景选择合适的方法。在实践中,往往需要结合Spliterator的特性和底层硬件资源来进行有效的并行处理。