流基础
流是数据管道,表示一系列数据。流的操作是针对数据源来说的,但是流的操作不会改变数据源的数据,只会产生新的流。
最基础的流是 BaseStream 接口。
interface BaseStream<T, S extends BaseStream<T, S>>
T 表示流中数据的类型,S 表示扩展了 BaseStream 的流。BaseStream 扩展了 AutoCloseable 接口。
Stream 接口继承了 BaseStream 接口。
interface Stream<T>
T 表示流中数据的类型,任何引用类型数据都可以使用这个流。
流的操作分为两种,第一种是终止(terminal)操作,该操作会消耗流,产生一个结果,被消耗的流不能再次使用;另一种是中间(intermediate)操作,该操作会产生另一个流,可以用于创建管道,进行多个操作。中间操作不会立即执行,只有当终止操作需要执行时,前面的中间操作才会执行,称为 lazy behavior,这个机制可以提高执行效率。
中间操作又可以分为无状态(stateless)和有状态的(stateful)两种。无状态表示流中每个元素独立处理,与其他元素无关;有状态表示流中一个元素的处理依赖于其他元素。在并发处理流时操作有无状态很重要。
Stream 流中的元素必须是引用类型,为了处理原始类型,提供了三种继承自 BaseStream 的流接口,如下所示
- DoubleStream
- IntStream
- LongStream
这些流除了用于处理原始类型,其他方面与 Stream 接口相似。
JDK8 开始,Collection 接口引入了 stream()
方法用于获取流。
// 返回流
default Stream<E> stream();
// 如果可以,返回并发流;否则返回流
default Stream<E> parallelStream();
Arrays 类的方法 stream()
方法将数组作为流的数据源。形式之一为
static <T> Stream<T> stream(T[] array);
其他形式的 stream()
可以返回用于原始类型的三种流。
BufferedReader 的 lines()
方法返回流。
public static void m() {
var list = new ArrayList<Integer>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
Stream<Integer> s = list.stream();
Optional<Integer> min = s.min(Integer::compare);
if (min.isPresent()) {
min.get();
}
s = list.stream();
Optional<Integer> max = s.max(Integer::compare);
if (max.isPresent()) {
max.get();
}
list.stream().sorted().forEach(e -> System.out.print(e + " "));
System.out.println();
list.stream().filter(e -> (e % 2) == 1).filter(e -> e > 5)
.forEach(e -> System.out.print(e + " "));
System.out.println();
}
Optional 对象要么包含一个值要么为空。值的类型由 T 指定。
class Optional<T>
归约操作
归约操作(reduction operations)是对流进行处理,最终得到一个值的操作类型。流的 count()
方法返回流中数据的个数。reduce()
方法可以使用指定条件对流进行处理得到一个值。
Optional<T> reduce(BinaryOperator<T> accu);
T reduce(T i, BinaryOperator<T> accu);
T 表示流中数据类型,i
必须满足流中任一数据和它执行 accu 操作得到的结果是该数据。例如,若 accu 为加法,则 i = 0;如果 accu 为乘法,则 i = 1。BinaryOperator 接口继承自 BiFunction 接口,BiFunction 的抽象方法为
R apply(T v1, U v2)
BinaryOperator 的抽象方法为
T apply(T v1, T v2)
v1 根据 reduce()
方法参数的不同首次表示第一个元素或者 i
的值,v2 表示下一个元素。之后,v1 表示最近计算得到的结果,v2 表示下一个元素。accu 操作必须满足三个要求
- 无状态
- 无干扰:流不改变数据源
- 结合性:类比数学中的结合律
结合性对于并发流的操作很重要。
public static void m() {
var list = new ArrayList<Integer>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
Optional<Integer> r1 = list.stream().reduce((a, b) -> a * b);
if (r1.isPresent()) {
r1.get();
}
int r2 = list.stream().reduce(1, (a, b) -> a * b);
}
除了 Collection 接口的 parallelStream()
方法获取并发流之外,流调用 BaseStream 接口的 parallel()
方法也可以获得并发流。当环境支持时,才能使用并发流;否则,并发流自动转变为流。
一般情况下,对并发流执行的操作必须满足结合性要求,其他两个要求也应该满足。
并发流一个特有的 reduce()
方法为
<U> U reduce(U i, BiFunction<U, ? super T, U> accu, BinaryOperator<U> comb);
其中,accu 用于归约操作,comb 用于将部分结果合并,得到最终的结果。
public static void m() {
var list = new ArrayList<Integer>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
double r = list.parallelStream().reduce(1.0,
(a, b) -> a * Math.sqrt(b),
(a, b) -> a * b);
}
并发流可以调用 BaseStream 的 sequential()
方法获得流。
使用并发流时,如果数据源的数据有序,那么流中数据有序。如果对并发流进行操作时顺序不重要,可以调用 BaseStream 接口的 unordered()
方法获得一个无序流,对无序流使用并发操作可以提高性能。
并发流使用 forEach()
方法不会保留流中数据的顺序,如果想按照顺序执行,使用 forEachOrdered()
方法。
Mapping
映射指通过某种规则将一个元素转换成另一个元素。通用方法为 map()
,形式为
<R> Stream<R> map(Function<? super T, ? extends R> mapF);
T 表示当前流中元素类型,R 表示返回流中元素类型。mapF 函数必须是无状态和无干扰的,Function 的抽象方法为
R apply(T val)
public static void m() {
var list = new ArrayList<Integer>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
double r = list.stream().map(e -> Math.sqrt(e)).reduce(1.0, (a, b) -> a * b);
}
map()
方法的其他版本如下
IntStream mapToInt(ToIntFunction<? super T> mapF);
LongStream mapToLong(ToLongFunction<? super T> mapF);
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapF);
ToIntFunction 的方法为 applyAsInt(T v)
,必须返回 int 类型的结果。
public static void m() {
var list = new ArrayList<Double>();
list.add(1.2);
list.add(3.9);
list.add(8.1);
list.add(2.4);
list.stream()
.mapToInt(e -> (int)Math.ceil(e)).forEach(e -> System.out.print(e + " "));
System.out.println();
}
flatMap()
方法用于将一个由聚合元素组成的流拆分成由单个元素组成的流。有 flatMapToXX()
方法,XX 表示 Int/Long/Double。
flatMap(Function<? super T,? extends Stream<? extends R>> mapper)
public static void m() {
String[][] a = new String[][] {
{"a", "b"},
{"c", "d"}
};
var r = Arrays.stream(a).flatMap(Arrays::stream).toArray(String[]::new);
Arrays.stream(a).flatMap(Arrays::stream).forEach(e -> System.out.print(e + " "));
}
Collecting
流的 collect()
方法提供了从流中获取数据构建集合的功能。一种形式如下
<R, A> R collect(Collector<? super T, A, R> colFun);
R 表示结果类型,A 表示中间结果类型,T 表示流中数据类型。Collector 接口定义如下,类型参数同前述
interface Collector<T, A, R>;
Collectors 类定义了若干返回 colletor 的静态方法。例如
static <T> Collector<T, ?, List<T>> toList();
static <T> Collector<T, ?, Set<T>> toSet();
分别返回将流中数据构建 List 和 Set 的 collector。
public static void m() {
var list = new ArrayList<Integer>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
var stream = list.stream().filter(e -> e % 2 == 0);
List<Integer> r1 = stream.collect(Collectors.toList());
Set<Integer> r2 = stream.collect(Collectors.toSet());
}
collect()
方法的另一种形式为
<R> R
collect(Supplier<R> target, BiConsumer<R, ? super T> accu, BiConsumer<R, R> comb);
类似于 reduce()
方法。target 表示创建集合的方式,Supplier 的方法 get()
返回一个 R 类型的引用,accu 表示的 accept(T v1, U v2)
中,v1 表示集合,v2 表示元素;comb 表示的accept(T v1, U v2)
中,v1 和 v2 都表示集合。
LinkedList<Integer> r3 = stream.collect(() -> new LinkedList(),
(a, b) -> a.add(b),
(a, b) -> a.addAll(b));
HashSet<Integer> r4 = stream.collect(HashSet::new,
HashSet::add,
HashSet::addAll);
迭代器和流
流的 iterator()
方法返回和流关联的迭代器。
Iterator<T> iterator();
如果流中数据类型是原始类型,则返回与原始类型对应的流。
public static void m() {
var list = new ArrayList<String>();
list.add("al");
list.add("bc");
list.add("ed");
Iterator<String> ite = list.stream().iterator();
while (ite.hasNext()) {
System.out.print(ite.next() + " ");
}
System.out.println();
}
使用 spliterator 迭代器遍历元素时,使用 tryAdvance()
方法,当有元素需要遍历时,执行遍历操作,遍历操作由参数提供,返回 true;没有下一个元素时,返回 false。
boolean tryAdvance(Consumer<? super T> action);
public static void m() {
var list = new ArrayList<String>();
list.add("al");
list.add("bc");
list.add("ed");
Spliterator<String> ite = list.stream().spliterator();
while (ite.tryAdvance(e -> System.out.print(e + " ")));
System.out.println();
}
Spliterator 接口的 forEachRemaining()
方法接收一个 Consumer 函数式接口对象,对剩余的每个元素执行对象表示的操作。和 tryAdvance()
相比,不需要使用 while 语句。
default void forEachRemaining(Consumer<? super T> action);
Spliterator 接口的 trySplit()
方法将迭代器关联的数据划分成两份,一份由原迭代器关联,另一个份由返回的迭代器关联,并发编程时加快数据的处理。当迭代器不允许分割时,返回 null。
public static void m() {
var list = new ArrayList<String>();
list.add("al");
list.add("bc");
list.add("ed");
Spliterator<String> ite = list.stream().spliterator();
Spliterator<String> it2 = ite.trySplit();
if (it2 != null) {
it2.forEachRemaining(e -> System.out.println(e));
}
System.out.println();
ite.forEachRemaining(e -> System.out.println(e));
}
参考
[1] Herbert Schildt, Java The Complete Reference 11th, 2019.
[2] https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html
[3] java-8-flatmap-example