reduce操作用于对数据进行聚合,比如求和等。
一、reduce(BinaryOperator
例子:
List<User> users = new ArrayList<>();
users.add(new User("张三",30));
users.add(new User("李四",39));
users.add(new User("王五",20));
Optional<Integer> reduce = users.stream().map(User::getAge).reduce((a, b) -> a + b);
System.out.println(reduce.get());
输出:
源码分析:
ReferencePipeline#reduce(BinaryOperator<P_OUT> accumulator)
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
return evaluate(ReduceOps.makeRef(accumulator));
}
进入ReduceOps.makeRef(BinaryOperator
ReduceOps#makeRef(BinaryOperator
public static <T> TerminalOp<T, Optional<T>>
makeRef(BinaryOperator<T> operator) {
Objects.requireNonNull(operator);
class ReducingSink
implements AccumulatingSink<T, Optional<T>, ReducingSink> {
private boolean empty;
private T state;
public void begin(long size) {
empty = true;
state = null;
}
@Override
public void accept(T t) {
if (empty) {
empty = false;
state = t;
} else {
state = operator.apply(state, t);
}
}
@Override
public Optional<T> get() {
return empty ? Optional.empty() : Optional.of(state);
}
@Override
public void combine(ReducingSink other) {
if (!other.empty)
accept(other.state);
}
}
return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
};
}
ReduceOps会调用makeSink获取Sink。begin方法设置empty和state的初始值。看看ReducingSink里的accept方法。第一次进入,empty为true,将state设置t,以后将t和state运行BinaryOperator聚合数据且将结果赋给state。如果Stream中没有元素则empty为true,state为null。
二、 reduce(T identity, BinaryOperator
identity是初始值,功能和reduce(BinaryOperator
例子:
List<User> users = new ArrayList<>();
users.add(new User("张三",30));
users.add(new User("李四",39));
users.add(new User("王五",20));
Integer reduce = users.stream().map(User::getAge).reduce(0,(a, b) -> a + b);
System.out.println(reduce);
输出:
源码分析:
ReferencePipeline#reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator)
public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
}
进入ReduceOps.makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner )
方法
makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
Objects.requireNonNull(reducer);
Objects.requireNonNull(combiner);
class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
@Override
public void begin(long size) {
state = seed;
}
@Override
public void accept(T t) {
state = reducer.apply(state, t);
}
@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
};
}
ReduceOps会调用makeSink获取Sink。ReducingSink的begin将state设置为seed,seed就是传进去的初始值。每次调用accept都会运行BiFunction将state 和 t执行聚合。
三、reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)
第一个参数是返回值的初始值,accumulator是对Stream中每个元素执行的聚合操作,combiner是在并行流中对每个线程的执行结果进行合并。
例子
List<User> users = new ArrayList<>();
users.add(new User("张三",30));
users.add(new User("李四",34));
users.add(new User("王五",20));
ArrayList<User> reduce1 = users.stream().reduce(new ArrayList<User>(), (list, user) -> {
if (user.getAge() >= 30){
list.add(user);
}
System.out.println("执行accumulator");
return list;
}, ((users1, users2) -> {
System.out.println("执行combiner");
users1.addAll(users2);
return users1;
}));
System.out.println(reduce1);
输出:
源码分析:
ReferencePipeline#reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator
public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
}
进入ReduceOps.makeRef。
ReduceOps#makeRef
public static <T, U> TerminalOp<T, U>
makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
Objects.requireNonNull(reducer);
Objects.requireNonNull(combiner);
class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
@Override
public void begin(long size) {
state = seed;
}
@Override
public void accept(T t) {
state = reducer.apply(state, t);
}
@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
};
}
关注ReducingSink。ReducingSink#begin将state初始化为seed,accept对Stream中的每个元素执行reducer操作。
标签:BinaryOperator,ReducingSink,reduce,public,state,引用,new,java8 From: https://www.cnblogs.com/shigongp/p/17224405.html