Stream是跟随Lambda表达式一起发布的java8新特性。是支持串行和并行处理数据的工具。有四种类型的Stream。在StreamShape枚举中定义了Stream的类型。分别是REFERENCE(引用流,对应的是类BaseStream),INT_VALUE(元素是int的Stream,对应类IntStream),LONG_VALUE(元素是long的Stream,对应类LongStream),DOUBLE_VALUE(元素是double的Stream,对应类DoubleStream)。同时还要考虑中间操作的数据传递,每个操作之间如何聚合以及聚合后数据如何处理等。Stream主要分为三个部分,分别是头部,中间操作和终止操作。头部主要是提供数据以及数据的特征。中间操作是对数据处理方式。终止操作时才对数据进行数据。即惰性求值。中间操作有两种特征,分别是无状态操作和有状态操作。无状态操作是元素的处理不受之前元素的的影响,有状态操作是拿到所有元素后才能操作下一步。终止操作有短路操作和非短路操作,短路操作是求得满足条件的值之后就不再处理,非短路操作是处理所有元素后才返回。
Stream操作分类如下:
BaseStream类是所有流类的顶级接口。
public interface BaseStream<T, S extends BaseStream<T, S>>
extends AutoCloseable {
Iterator<T> iterator();
Spliterator<T> spliterator();
boolean isParallel();
S sequential();
S parallel();
S unordered();
S onClose(Runnable closeHandler);
@Override
void close();
}
函数名 | 作用 |
---|---|
iterator() | 返回此流元素的迭代器。 |
spliterator() | 返回此流元素的拆分器。 |
isParallel(); | 如果返回true表示流并行执行。 |
sequential(); | 返回顺序流。 |
parallel(); | 返回并行流。 |
unordered(); | 返回无序流。 |
onClose(Runnable closeHandler); | 返回具有附加关闭处理程序的等效流。当close()方法在流上调用,并按它们的顺序执行。 |
close(); | 关闭流。 |
操作之间的数据聚合由Sink表示。
interface Sink<T> extends Consumer<T> {
default void begin(long size) {}
default void end() {}
default boolean cancellationRequested() {
return false;
}
default void accept(int value) {
throw new IllegalStateException("called wrong accept method");
}
default void accept(long value) {
throw new IllegalStateException("called wrong accept method");
}
default void accept(double value) {
throw new IllegalStateException("called wrong accept method");
}
interface OfInt extends Sink<Integer>, IntConsumer {
@Override
void accept(int value);
@Override
default void accept(Integer i) {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
accept(i.intValue());
}
}
interface OfLong extends Sink<Long>, LongConsumer {
@Override
void accept(long value);
@Override
default void accept(Long i) {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
accept(i.longValue());
}
}
interface OfDouble extends Sink<Double>, DoubleConsumer {
@Override
void accept(double value);
@Override
default void accept(Double i) {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
accept(i.doubleValue());
}
}
static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
protected final Sink<? super E_OUT> downstream;
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
static abstract class ChainedInt<E_OUT> implements Sink.OfInt {
protected final Sink<? super E_OUT> downstream;
public ChainedInt(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
static abstract class ChainedLong<E_OUT> implements Sink.OfLong {
protected final Sink<? super E_OUT> downstream;
public ChainedLong(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
static abstract class ChainedDouble<E_OUT> implements Sink.OfDouble {
protected final Sink<? super E_OUT> downstream;
public ChainedDouble(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
}
Consumer接口是Lambda表达式用于处理不返回值的操作。Sink是Consumer的一个扩展,用于在具有管理大小信息的附加方法的流管道,在调用accept()方法之前,必须首先调用begin()方法通知它数据即将到来,并且在所有数据发送后,您必须调用end()方法。调用end()后,不应调用accept()。Sink也是提供了一种机制,通过调用cancelueRequest()方法,接收器可以协同发出信号不希望接收更多数据,源可以在将更多数据发送到Sink。
TerminalOp表示终止操作,对数据进行实际处理。
interface TerminalOp<E_IN, R> {
default StreamShape inputShape() { return StreamShape.REFERENCE; }
default int getOpFlags() { return 0; }
default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
Spliterator<P_IN> spliterator) {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
return evaluateSequential(helper, spliterator);
}
<P_IN> R evaluateSequential(PipelineHelper<E_IN> helper,
Spliterator<P_IN> spliterator);
}
函数名 | 作用 |
---|---|
inputShape() | 返回流类型 |
getOpFlags() | 返回流操作标志 |
evaluateParallel | 使用PipelineHelper并行执行流操作 |
evaluateSequential | 使用PipelineHelper顺序执行流操作 |