首页 > 编程语言 >java8新特性-Stream基础

java8新特性-Stream基础

时间:2023-03-04 20:34:54浏览次数:50  
标签:Stream downstream void 特性 public Sink accept Override java8

Stream是跟随Lambda表达式一起发布的java8新特性。是支持串行和并行处理数据的工具。有四种类型的Stream。在StreamShape枚举中定义了Stream的类型。分别是REFERENCE(引用流,对应的是类BaseStream),INT_VALUE(元素是int的Stream,对应类IntStream),LONG_VALUE(元素是long的Stream,对应类LongStream),DOUBLE_VALUE(元素是double的Stream,对应类DoubleStream)。同时还要考虑中间操作的数据传递,每个操作之间如何聚合以及聚合后数据如何处理等。Stream主要分为三个部分,分别是头部,中间操作和终止操作。头部主要是提供数据以及数据的特征。中间操作是对数据处理方式。终止操作时才对数据进行数据。即惰性求值。中间操作有两种特征,分别是无状态操作和有状态操作。无状态操作是元素的处理不受之前元素的的影响,有状态操作是拿到所有元素后才能操作下一步。终止操作有短路操作和非短路操作,短路操作是求得满足条件的值之后就不再处理,非短路操作是处理所有元素后才返回。

Stream操作分类如下:

BaseStream类是所有流类的顶级接口。

    public interface BaseStream<T, S extends BaseStream<T, S>>
        extends AutoCloseable {
   
    Iterator<T> iterator();

   
    Spliterator<T> spliterator();

   
    boolean isParallel();

   
    S sequential();

    
    S parallel();

   
    S unordered();

    S onClose(Runnable closeHandler);

   
    @Override
    void close();
}
函数名 作用
iterator() 返回此流元素的迭代器。
spliterator() 返回此流元素的拆分器。
isParallel(); 如果返回true表示流并行执行。
sequential(); 返回顺序流。
parallel(); 返回并行流。
unordered(); 返回无序流。
onClose(Runnable closeHandler); 返回具有附加关闭处理程序的等效流。当close()方法在流上调用,并按它们的顺序执行。
close(); 关闭流。

操作之间的数据聚合由Sink表示。

interface Sink<T> extends Consumer<T> {
   
    default void begin(long size) {}

   
    default void end() {}

   
    default boolean cancellationRequested() {
        return false;
    }

    default void accept(int value) {
        throw new IllegalStateException("called wrong accept method");
    }

    
    default void accept(long value) {
        throw new IllegalStateException("called wrong accept method");
    }

   
    default void accept(double value) {
        throw new IllegalStateException("called wrong accept method");
    }

    
    interface OfInt extends Sink<Integer>, IntConsumer {
        @Override
        void accept(int value);

        @Override
        default void accept(Integer i) {
            if (Tripwire.ENABLED)
                Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
            accept(i.intValue());
        }
    }

   
    interface OfLong extends Sink<Long>, LongConsumer {
        @Override
        void accept(long value);

        @Override
        default void accept(Long i) {
            if (Tripwire.ENABLED)
                Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
            accept(i.longValue());
        }
    }

  
    interface OfDouble extends Sink<Double>, DoubleConsumer {
        @Override
        void accept(double value);

        @Override
        default void accept(Double i) {
            if (Tripwire.ENABLED)
                Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
            accept(i.doubleValue());
        }
    }

  
    static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
        protected final Sink<? super E_OUT> downstream;

        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }

   
    static abstract class ChainedInt<E_OUT> implements Sink.OfInt {
        protected final Sink<? super E_OUT> downstream;

        public ChainedInt(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }

   
    static abstract class ChainedLong<E_OUT> implements Sink.OfLong {
        protected final Sink<? super E_OUT> downstream;

        public ChainedLong(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }

    
    static abstract class ChainedDouble<E_OUT> implements Sink.OfDouble {
        protected final Sink<? super E_OUT> downstream;

        public ChainedDouble(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }
}

Consumer接口是Lambda表达式用于处理不返回值的操作。Sink是Consumer的一个扩展,用于在具有管理大小信息的附加方法的流管道,在调用accept()方法之前,必须首先调用begin()方法通知它数据即将到来,并且在所有数据发送后,您必须调用end()方法。调用end()后,不应调用accept()。Sink也是提供了一种机制,通过调用cancelueRequest()方法,接收器可以协同发出信号不希望接收更多数据,源可以在将更多数据发送到Sink。

TerminalOp表示终止操作,对数据进行实际处理。

interface TerminalOp<E_IN, R> {

    default StreamShape inputShape() { return StreamShape.REFERENCE; }

    
    default int getOpFlags() { return 0; }

    
    default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
                                      Spliterator<P_IN> spliterator) {
        if (Tripwire.ENABLED)
            Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
        return evaluateSequential(helper, spliterator);
    }

   
    <P_IN> R evaluateSequential(PipelineHelper<E_IN> helper,
                                Spliterator<P_IN> spliterator);
}
函数名 作用
inputShape() 返回流类型
getOpFlags() 返回流操作标志
evaluateParallel 使用PipelineHelper并行执行流操作
evaluateSequential 使用PipelineHelper顺序执行流操作

标签:Stream,downstream,void,特性,public,Sink,accept,Override,java8
From: https://www.cnblogs.com/shigongp/p/17178888.html

相关文章

  • 提前中止Java Stream forEach的优雅解决方案
    1.概述JavaStreamforeach是一种声明式的,整洁的遍历循环。但是当我们满足某个条件想要中止这个循环的时候,通常却十分繁琐,比如使用异常,但通常这不是一种很好的实现方案......
  • 第125篇: 期约Promise基本特性
    好家伙,本篇为《JS高级程序设计》第十章“期约与异步函数”学习笔记 1.非重入期约1.1.可重入代码(百度百科)先来了解一个概念可重入代码(Reentrycode)也叫纯代码(Pure......
  • text/event-stream
    content-type为text/event-streamwebpack热更新需要向浏览器推送信息,一般都会想到websocket,但是还有一种方式,叫做Server-SentEvents(简称SSE)。SSE是websocket的一种轻型......
  • LT8619C-HDMI转LVDS/RGB/BT1120芯片功能特性及应用
    LT8619CHDMIToTTL/LVDSConverter 1.特性●HDMI/双模DP输入接口◆兼容DVIV1.0和HDMIV1.4◆支持高达1.65Gbps的DVI◆支持高达3.4Gbps的HDMI......
  • 【Dubbo RPC 框架-服务发现&常用特性】
    零、本文纲要一、服务发现二、Dubbo快速入门(Spring+SpringMVC)①dubbo-interface模块②dubbo-service模块③dubbo-web模块三、其他特性①序列化implementsSerial......
  • LT8911EXB-MIPI转EDP视频转换芯片功能特性及概述
    LT8911EXB:MIPI®DSI/CSIBridgetoeDP 1.特性●单端口MIPI®DSI接收器◆符合D-PHY1.2、DSI1.3、CSI1.3标准◆1个时钟通道和1~4个可配置的数据通道......
  • java 类的高级特性
    类的高级特性1类包1.1类名冲突类包的存在就是为了解决类名的冲突,就是重名。1.2完整的类路径例如:java.uitl.Date=newjava.util.Date();java.sql.Date=new......
  • 面向对象高级特性
    面向对象高级特性抽象为什么会有抽象类当子类中都有一个共同的方法,每一个子类都有不同的实现,在父类中又要体现所有子类的共同的特点,所以要体现有这个方法,但是在父类......
  • Java8中Stream详细用法大全
    Java8中Stream详细用法大全一、概述Stream是Java8中处理集合的关键抽象概念,它可以指定你希望对集合进行的操作,可以执行非常复杂的查找、过滤和映射数据等操......
  • sonar代码扫描bug:Use try-with-resources or close this "FileInputStream" in a "fi
      下面代码/***读取文件到byte数组**@paramtradeFile*@return*/publicstaticbyte[]file2byte(FiletradeFile){try{FileInputSt......