首页 > 编程语言 >java自定义stream

java自定义stream

时间:2022-08-27 11:56:02浏览次数:76  
标签:java 自定义 stream 管道 sink return SelfSink public

一、流程

1

        // 自定义集合,继承ArrayList,与ArrayList没啥区别
        SelfList<Apple> appleList = new SelfList<>();
import java.util.ArrayList;
import java.util.Iterator;

/**
 * 自定义集合,继承ArrayList,与ArrayList没啥区别
 */
public class SelfList<T> extends ArrayList<T> {
    /**
     * 数据源放进头部管道,头部管段很窄,深度为0,没有sink操作
     *
     * @return 头部管道(作用是持有数据源)
     */
    public SelfStream<T> selfStream() {
        Iterator<T> listIterator = super.iterator();
        return new SelfPipeline.SelfHead<>(listIterator);
    }
}
    /**
     * 头部管道:继承管道,属于管道的一种
     */
    static class SelfHead<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
        SelfHead(Iterator<?> source) {
            // 初始化头部管道
            super(source);
    /**
     * 初始化头部管道
     *
     * @param source 数据源
     */
    SelfPipeline(Iterator<?> source) {
        // 数据源
        this.sourceIterator = source;
        // 数据源Op,即是头部管道自身,后面每一段管道通过持有“头部管道”获取数据源
        this.sourceStage = this;

        // 上段管道,头部管道没有上段管道
        this.previousStage = null;

        // 头部管道的深度=0,下面的每段管道的深度依次+1
        this.depth = 0;
    }
        }

        // 头部管道无需sink
        @Override
        final SelfSink<P_IN> opWrapSink(SelfSink<P_OUT> sink) {
            throw new UnsupportedOperationException();
        }
    }
        appleList.add(new Apple(1, "青色"));
        appleList.add(new Apple(2, "橙色"));
        appleList.add(new Apple(3, "红色"));
        appleList.add(new Apple(4, "绿色"));
        appleList.add(new Apple(5, "绿色"));
        appleList.add(new Apple(6, "紫色"));

2

        // 把数据源(appleList)放进头部管道(pipelineHead),接下来每段管道都持有头部管道以获取数据源
        SelfStream<Apple> pipelineHead = appleList.selfStream();
见SelfList

3

        // 线性拼接下一段管道(FilterOp),并定义这段管道的职责(计划由FilterSink执行)
        SelfStream<Apple> statelessOpFilter = pipelineHead.filter(item -> "绿色".equals(item.getColor()));
    /**
     * 生成Op管道(FilterOp),定义这段管道的职责(并由FilterSink执行该职责)
     *
     * @param predicate 断言型函数式接口
     * @return Op管道(FilterOp)
     */
    @Override
    public final SelfStream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        return new SelfStatelessOp<P_OUT, P_OUT>(this) {
    /**
     * Op管道:继承管道,属于管道的一种
     */
    abstract static class SelfStatelessOp<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
        /**
         * @param upstream 上段管道
         */
        SelfStatelessOp(SelfPipeline<?, P_IN> upstream) {
            // 初始化Op管道
            super(upstream);
    /**
     * 初始化Op管道
     *
     * @param upstream 上段管道
     */
    SelfPipeline(SelfPipeline<?, P_IN> upstream) {
        // 自己的上段管道
        this.previousStage = upstream;
        // 上段管道的下段是自己
        upstream.nextStage = this;

        // 每一段管道都持有“头部管道”
        this.sourceStage = upstream.sourceStage;

        // 深度+1
        this.depth = upstream.depth + 1;
    }
        }
    }
       // 定义这段管道的职责(并由FilterSink执行该职责)
            @Override
            SelfSink<P_OUT> opWrapSink(SelfSink<P_OUT> sink) {
                // FilterSink
                return new SelfSink.Chain<P_OUT, P_OUT>(sink) {
    /**
     * sink链
     */
    abstract class Chain<T, E_OUT> implements SelfSink<T> {
        // 下个sink
        protected final SelfSink<? super E_OUT> downstream;

        public Chain(SelfSink<? super E_OUT> downstream) {
            // 向下的单向链条
            this.downstream = downstream;
        }

        // 本例:单向链条传递到最后一个ReducingSink,初始化结果集合
        @Override
        public void begin(long size) {
            downstream.begin(size);
        }
    }

                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u)) {
                            // 触发下一段管道的动作,下沉
                            downstream.accept(u);
                        }
                    }
                };
            }
        };
    }

4

        // 线性拼接下一段管道(MapOp),并定义这段管道的职责(计划由MapSink执行)
        SelfStream<Integer> statelessOpMap = statelessOpFilter.map(Apple::getWeight);
    /**
     * 生成Op管道(MapOp),定义这段管道的职责(并由MapSink执行该职责)
     *
     * @param mapper 函数型函数式接口:有入参和返回值
     * @return Op管道(MapOp)
     */
    @Override
    public final <R> SelfStream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        return new SelfStatelessOp<P_OUT, R>(this) {
如上FilterOp
       // 定义这段管道的职责(并由MapSink执行该职责)
            @Override
            SelfSink<P_OUT> opWrapSink(SelfSink<R> sink) {
                // MapSink
                return new SelfSink.Chain<P_OUT, R>(sink) {
如上FilterOp
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

5

        /*
         * 执行终结操作:
         * 1,生成ReducingSink(设计为收集汇聚上游的流);
         * 2,生成MapSink,让MapSink向下链接ReducingSink;
         * 3,通过MapOp来到FilterOp,生成FilterSink,让FilterSink向下链接MapSink;
         * 4,遍历数据源的每一个元素,让元素依次流经FilterSink->MapSink->ReducingSink,汇聚成最终形态
         */
        SelfList<Apple> terminalOpCollect = statelessOpMap.collect(SelfCollectors.toList());
    /**
     * 终结操作
     *
     * @param collector 此例子是Collectors.toList(),定义了结果集和操作结果集的方法
     * @return 最终结果:即汇聚最终结果都是在此方法中进行的
     */
    @Override
    public final <R, A> R collect(SelfCollector<? super P_OUT, A> collector) {
        // 在终结操作里尾部管道(TerminalOp)
        SelfTerminalOp<P_OUT, A> terminalOp = SelfReduceOpsFactory.makeRef(collector);
    // 生成尾部管道
    public static <T, I> SelfTerminalOp<T, I> makeRef(SelfCollector<? super T, I> collector) {
        // 此例子是(Supplier<List<T>>) ArrayList::new
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        // 此例子是List::add
        BiConsumer<I, ? super T> accumulator = collector.accumulator();

        // 定义了ReducingSink的职责
        class ReducingSink extends SelfBox<I> implements SelfTerminalSink<T, I> {
            @Override
            public void begin(long size) {
                // state = ArrayList::new = 初始化的空集合
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                // 向集合state中添加元素t
                accumulator.accept(state, t);
            }
        }

        // 生成ReduceOp(尾部管道接口的实现类),定义尾部管道的职责(并由ReducingSink执行该职责)
        return new SelfReduceOp<T, I, ReducingSink>() {
            // ReducingSink
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

     // 拉网
        A container = evaluate(terminalOp);
    /**
     * 使用terminalOp拉网
     *
     * @param terminalOp 使用statelessOp.collect方法的参数构造出来的
     * @param <R>
     * @return 拉网结果,ReducingSink的get方法得到最终集合
     */
    final <R> R evaluate(SelfTerminalOp<P_OUT, R> terminalOp) {
        // 当前对象为map的statelessOp
        Iterator<?> iterator = sourceStage.sourceIterator;
        // 按顺序拉网:terminalOp是使用statelessOp.collect方法的参数构造出来的
        return terminalOp.evaluateSequential(this, iterator);
        /**
         * 顺序评估管道
         */
        @Override
        public <P_IN> R evaluateSequential(SelfPipelineHelper<T> helper, Iterator<P_IN> iterator) {
            // 生成尾部管道的sink
            S reducingSink = makeSink();
如上makeRef
       // 管道包装sink:从ReducingSink(4)开始,按照3,4;2,3;1,2的顺序构建sink链,按照1,2,3,4的顺序执行流转,由4收集最终结果
            S wrappedReducingSink = helper.wrapAndCopyInto(reducingSink, iterator);
    /**
     * 管道包装sink
     *
     * @return 评估管道结束后,返回终结操作管道的sink
     */
    @Override
    final <P_IN, S extends SelfSink<P_OUT>> S wrapAndCopyInto(S sink, Iterator<P_IN> iterator) {
        // 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来;返回sink链的第一个对象
        SelfSink<P_IN> firstSinkLink = wrapSink(Objects.requireNonNull(sink));
    /**
     * 管道包装sink
     *
     * @param sink 此例子是:传入的是ReducingSink
     * @return sink链的第一个对象
     */
    @Override
    final <P_IN> SelfSink<P_IN> wrapSink(SelfSink<P_OUT> sink) {
        // 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来
        for (SelfPipeline p = SelfPipeline.this; p.depth > 0; p = p.previousStage) {
            sink = p.opWrapSink(sink);
见FilterOp、MapOp的职责定义部分
        }
        return (SelfSink<P_IN>) sink;
    }

     // 数据源的每一个元素都从sink链的第一个对象流转到最后一个对象,并被最后一个对象收集
        copyInto(firstSinkLink, iterator);
    /**
     * 遍历数据源元素,顺序执行sink的accept方法(consumer)
     *
     * @param wrappedSink sink链的第一个对象
     * @param iterator    数据源
     */
    @Override
    final <P_IN> void copyInto(SelfSink<P_IN> wrappedSink, Iterator<P_IN> iterator) {
        // 准备好尾部管道的收集装置
        wrappedSink.begin(-1);

        // 遍历流转和收集
        iterator.forEachRemaining(wrappedSink);
// 收尾工作(无)
        wrappedSink.end();
    }

     // 返回ReducingSink
        return sink;
    }

       // 返回ReducingSink持有和可以supplier的集合
            return wrappedReducingSink.get();
        }
    }
return (R) container;
    }

6

        System.out.println(terminalOpCollect);

7

        // 简写
        SelfList<Apple> apples = appleList.selfStream()
                .filter(item -> "绿色".equals(item.getColor()))
                .map(Apple::getWeight)
                .collect(SelfCollectors.toList());
        System.out.println(apples);

二、完整代码

package com.simple.boot.java_skill.selfλ;

import com.simple.boot.java_skill.functionprograming.Apple;

/**
 * main函数
 */
public class SelfLambdaTest {
    public static void main(String[] args) {
        // 自定义集合,继承ArrayList,与ArrayList没啥区别
        SelfList<Apple> appleList = new SelfList<>();

        appleList.add(new Apple(1, "青色"));
        appleList.add(new Apple(2, "橙色"));
        appleList.add(new Apple(3, "红色"));
        appleList.add(new Apple(4, "绿色"));
        appleList.add(new Apple(5, "绿色"));
        appleList.add(new Apple(6, "紫色"));

        // 把数据源(appleList)放进头部管道(pipelineHead),接下来每段管道都持有头部管道以获取数据源
        SelfStream<Apple> pipelineHead = appleList.selfStream();
        // 线性拼接下一段管道(FilterOp),并定义这段管道的职责(计划由FilterSink执行)
        SelfStream<Apple> statelessOpFilter = pipelineHead.filter(item -> "绿色".equals(item.getColor()));
        // 线性拼接下一段管道(MapOp),并定义这段管道的职责(计划由MapSink执行)
        SelfStream<Integer> statelessOpMap = statelessOpFilter.map(Apple::getWeight);
        /*
         * 执行终结操作:
         * 1,生成ReducingSink(设计为收集汇聚上游的流);
         * 2,生成MapSink,让MapSink向下链接ReducingSink;
         * 3,通过MapOp来到FilterOp,生成FilterSink,让FilterSink向下链接MapSink;
         * 4,遍历数据源的每一个元素,让元素依次流经FilterSink->MapSink->ReducingSink,汇聚成最终形态
         */
        SelfList<Apple> terminalOpCollect = statelessOpMap.collect(SelfCollectors.toList());
        System.out.println(terminalOpCollect);

        // 简写
        SelfList<Apple> apples = appleList.selfStream()
                .filter(item -> "绿色".equals(item.getColor()))
                .map(Apple::getWeight)
                .collect(SelfCollectors.toList());
        System.out.println(apples);
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.ArrayList;
import java.util.Iterator;

/**
 * 自定义集合,继承ArrayList,与ArrayList没啥区别
 */
public class SelfList<T> extends ArrayList<T> {
    /**
     * 数据源放进头部管道,头部管段很窄,深度为0,没有sink操作
     *
     * @return 头部管道(作用是持有数据源)
     */
    public SelfStream<T> selfStream() {
        Iterator<T> listIterator = super.iterator();
        return new SelfPipeline.SelfHead<>(listIterator);
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.Iterator;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;

/**
 * 管道:流的一种具化实现
 */
public abstract class SelfPipeline<P_IN, P_OUT>
        extends SelfPipelineHelper<P_OUT> implements SelfStream<P_OUT> {
    // 数据源
    private Iterator<?> sourceIterator;
    // 头部管道(持有数据源)
    private final SelfPipeline sourceStage;

    // 上段管道
    private final SelfPipeline previousStage;

    // 下段管道
    private SelfPipeline nextStage;

    // 本段管道深度
    private int depth;

    /**
     * 初始化Op管道
     *
     * @param upstream 上段管道
     */
    SelfPipeline(SelfPipeline<?, P_IN> upstream) {
        // 自己的上段管道
        this.previousStage = upstream;
        // 上段管道的下段是自己
        upstream.nextStage = this;

        // 每一段管道都持有“头部管道”
        this.sourceStage = upstream.sourceStage;

        // 深度+1
        this.depth = upstream.depth + 1;
    }

    /**
     * 初始化头部管道
     *
     * @param source 数据源
     */
    SelfPipeline(Iterator<?> source) {
        // 数据源
        this.sourceIterator = source;
        // 数据源Op,即是头部管道自身,后面每一段管道通过持有“头部管道”获取数据源
        this.sourceStage = this;

        // 上段管道,头部管道没有上段管道
        this.previousStage = null;

        // 头部管道的深度=0,下面的每段管道的深度依次+1
        this.depth = 0;
    }

    /**
     * 头部管道:继承管道,属于管道的一种
     */
    static class SelfHead<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
        SelfHead(Iterator<?> source) {
            // 初始化头部管道
            super(source);
        }

        // 头部管道无需sink
        @Override
        final SelfSink<P_IN> opWrapSink(SelfSink<P_OUT> sink) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Op管道:继承管道,属于管道的一种
     */
    abstract static class SelfStatelessOp<P_IN, P_OUT> extends SelfPipeline<P_IN, P_OUT> {
        /**
         * @param upstream 上段管道
         */
        SelfStatelessOp(SelfPipeline<?, P_IN> upstream) {
            // 初始化Op管道
            super(upstream);
        }
    }

    abstract SelfSink<P_IN> opWrapSink(SelfSink<P_OUT> sink);

    /**
     * 生成Op管道(FilterOp),定义这段管道的职责(并由FilterSink执行该职责)
     *
     * @param predicate 断言型函数式接口
     * @return Op管道(FilterOp)
     */
    @Override
    public final SelfStream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        return new SelfStatelessOp<P_OUT, P_OUT>(this) {
            // 定义这段管道的职责(并由FilterSink执行该职责)
            @Override
            SelfSink<P_OUT> opWrapSink(SelfSink<P_OUT> sink) {
                // FilterSink
                return new SelfSink.Chain<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u)) {
                            // 触发下一段管道的动作,下沉
                            downstream.accept(u);
                        }
                    }
                };
            }
        };
    }

    /**
     * 生成Op管道(MapOp),定义这段管道的职责(并由MapSink执行该职责)
     *
     * @param mapper 函数型函数式接口:有入参和返回值
     * @return Op管道(MapOp)
     */
    @Override
    public final <R> SelfStream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        return new SelfStatelessOp<P_OUT, R>(this) {
            // 定义这段管道的职责(并由MapSink执行该职责)
            @Override
            SelfSink<P_OUT> opWrapSink(SelfSink<R> sink) {
                // MapSink
                return new SelfSink.Chain<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

    /**
     * 终结操作
     *
     * @param collector 此例子是Collectors.toList(),定义了结果集和操作结果集的方法
     * @return 最终结果:即汇聚最终结果都是在此方法中进行的
     */
    @Override
    public final <R, A> R collect(SelfCollector<? super P_OUT, A> collector) {
        // 在终结操作里尾部管道(TerminalOp)
        SelfTerminalOp<P_OUT, A> terminalOp = SelfReduceOpsFactory.makeRef(collector);

        // 拉网
        A container = evaluate(terminalOp);

        return (R) container;
    }


    /**
     * 使用terminalOp拉网
     *
     * @param terminalOp 使用statelessOp.collect方法的参数构造出来的
     * @param <R>
     * @return 拉网结果,ReducingSink的get方法得到最终集合
     */
    final <R> R evaluate(SelfTerminalOp<P_OUT, R> terminalOp) {
        // 当前对象为map的statelessOp
        Iterator<?> iterator = sourceStage.sourceIterator;
        // 按顺序拉网:terminalOp是使用statelessOp.collect方法的参数构造出来的
        return terminalOp.evaluateSequential(this, iterator);
    }

    /**
     * 管道包装sink
     *
     * @return 评估管道结束后,返回终结操作管道的sink
     */
    @Override
    final <P_IN, S extends SelfSink<P_OUT>> S wrapAndCopyInto(S sink, Iterator<P_IN> iterator) {
        // 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来;返回sink链的第一个对象
        SelfSink<P_IN> firstSinkLink = wrapSink(Objects.requireNonNull(sink));

        // 数据源的每一个元素都从sink链的第一个对象流转到最后一个对象,并被最后一个对象收集
        copyInto(firstSinkLink, iterator);

        // 返回ReducingSink
        return sink;
    }

    /**
     * 遍历数据源元素,顺序执行sink的accept方法(consumer)
     *
     * @param wrappedSink sink链的第一个对象
     * @param iterator    数据源
     */
    @Override
    final <P_IN> void copyInto(SelfSink<P_IN> wrappedSink, Iterator<P_IN> iterator) {
        // 准备好尾部管道的收集装置
        wrappedSink.begin(-1);

        // 遍历流转和收集
        iterator.forEachRemaining(wrappedSink);

        // 收尾工作(无)
        wrappedSink.end();
    }

    /**
     * 管道包装sink
     *
     * @param sink 此例子是:传入的是ReducingSink
     * @return sink链的第一个对象
     */
    @Override
    final <P_IN> SelfSink<P_IN> wrapSink(SelfSink<P_OUT> sink) {
        // 向前遍历管道,根据管道之前的职责定义,包装生成对应的sink,并使用链条串联起来
        for (SelfPipeline p = SelfPipeline.this; p.depth > 0; p = p.previousStage) {
            sink = p.opWrapSink(sink);
        }
        return (SelfSink<P_IN>) sink;
    }

}
package com.simple.boot.java_skill.selfλ;

import java.util.function.BiConsumer;
import java.util.function.Supplier;

public interface SelfCollector<T, A> {

    Supplier<A> supplier();

    BiConsumer<A, T> accumulator();
}
package com.simple.boot.java_skill.selfλ;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public final class SelfCollectors {

    public static <T> SelfCollector<T, ?> toList() {
        return new SelfCollectorImpl<>((Supplier<List<T>>) SelfList::new, List::add);
    }

    static class SelfCollectorImpl<T, A> implements SelfCollector<T, A> {
        private final Supplier<A> supplier;
        private final BiConsumer<A, T> accumulator;

        SelfCollectorImpl(Supplier<A> supplier, BiConsumer<A, T> accumulator) {
            this.supplier = supplier;
            this.accumulator = accumulator;
        }

        @Override
        public BiConsumer<A, T> accumulator() {
            return accumulator;
        }

        @Override
        public Supplier<A> supplier() {
            return supplier;
        }
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.Iterator;

/**
 * 管道工具类
 */
public abstract class SelfPipelineHelper<P_OUT> {

    abstract <P_IN, S extends SelfSink<P_OUT>> S wrapAndCopyInto(S sink, Iterator<P_IN> iterator);

    abstract <P_IN> void copyInto(SelfSink<P_IN> wrappedSink, Iterator<P_IN> iterator);

    abstract <P_IN> SelfSink<P_IN> wrapSink(SelfSink<P_OUT> sink);
}
package com.simple.boot.java_skill.selfλ;

import java.util.Iterator;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/**
 * 尾部管道工厂模式
 */
public class SelfReduceOpsFactory {
    // 生成尾部管道
    public static <T, I> SelfTerminalOp<T, I> makeRef(SelfCollector<? super T, I> collector) {
        // 此例子是(Supplier<List<T>>) ArrayList::new
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        // 此例子是List::add
        BiConsumer<I, ? super T> accumulator = collector.accumulator();

        // 定义了ReducingSink的职责
        class ReducingSink extends SelfBox<I> implements SelfTerminalSink<T, I> {
            @Override
            public void begin(long size) {
                // state = ArrayList::new = 初始化的空集合
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                // 向集合state中添加元素t
                accumulator.accept(state, t);
            }
        }

        // 生成ReduceOp(尾部管道接口的实现类),定义尾部管道的职责(并由ReducingSink执行该职责)
        return new SelfReduceOp<T, I, ReducingSink>() {
            // ReducingSink
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

    /**
     * 结果集
     */
    private static abstract class SelfBox<U> {
        U state;

        SelfBox() {
        }

        public U get() {
            return state;
        }
    }

    /**
     * 尾部管道
     */
    private static abstract class SelfReduceOp<T, R, S extends SelfTerminalSink<T, R>>
            implements SelfTerminalOp<T, R> {
        SelfReduceOp() {
        }

        // 生成尾部管道的sink,定义为抽象方法,终结操作终结结果有多种
        public abstract S makeSink();

        /**
         * 顺序评估管道
         */
        @Override
        public <P_IN> R evaluateSequential(SelfPipelineHelper<T> helper, Iterator<P_IN> iterator) {
            // 生成尾部管道的sink
            S reducingSink = makeSink();

            // 管道包装sink:从ReducingSink(4)开始,按照3,4;2,3;1,2的顺序构建sink链,按照1,2,3,4的顺序执行流转,由4收集最终结果
            S wrappedReducingSink = helper.wrapAndCopyInto(reducingSink, iterator);

            // 返回ReducingSink持有和可以supplier的集合
            return wrappedReducingSink.get();
        }
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.function.Consumer;

/**
 * 下沉:一种Consumer函数式接口,执行指定动作
 */
public interface SelfSink<T> extends Consumer<T> {
    default void begin(long size) {
    }

    default void end() {
    }

    /**
     * sink链
     */
    abstract class Chain<T, E_OUT> implements SelfSink<T> {
        // 下个sink
        protected final SelfSink<? super E_OUT> downstream;

        public Chain(SelfSink<? super E_OUT> downstream) {
            // 向下的单向链条
            this.downstream = downstream;
        }

        // 本例:单向链条传递到最后一个ReducingSink,初始化结果集合
        @Override
        public void begin(long size) {
            downstream.begin(size);
        }
    }
}
package com.simple.boot.java_skill.selfλ;

import java.util.function.Function;
import java.util.function.Predicate;

public interface SelfStream<T> {
    SelfStream<T> filter(Predicate<? super T> predicate);

    <R> SelfStream<R> map(Function<? super T, ? extends R> mapper);

    <R, A> R collect(SelfCollector<? super T, A> collector);
}
package com.simple.boot.java_skill.selfλ;

import java.util.Iterator;

public interface SelfTerminalOp<E_IN, R> {
    <P_IN> R evaluateSequential(SelfPipelineHelper<E_IN> helper, Iterator<P_IN> iterator);
}
package com.simple.boot.java_skill.selfλ;

import java.util.function.Supplier;

/**
 * 尾部管道的sink
 */
public interface SelfTerminalSink<T, R> extends SelfSink<T>, Supplier<R> {
}

 

标签:java,自定义,stream,管道,sink,return,SelfSink,public
From: https://www.cnblogs.com/seeall/p/16630280.html

相关文章

  • Java调用Windows系统命令CMD
    Java的Runtime.getRuntime().exec(commandStr)可以调用执行cmd指令。cmd/cdir是执行完dir命令后关闭命令窗口。 cmd/kdir是执行完dir命令后不关闭命令窗口。 ......
  • win10环境安装vs2015的问题:缺少JavaScript_ProjectSystem.msi和JavaScript_LanguageSe
    最近有同事在win10下安装vs2015总是报错,安装中途报缺少文件JavaScript_ProjectSystem.msi和JavaScript_LanguageService.msi。想想看微软发布的产品应该不至于丢三落四,缺......
  • D11(java基础)
    D11(java基础)流概念:内存与存储设备之间传输数据的通道。流的分类按方向:输入流:将存储设备中的内容读入到内存中。输出流:将内存中的内容写入到存储设备中。......
  • JavaSE-Day02-Java方法
    Java方法什么是方法System.out.println() 类.对象.方法()Java方法是语句的集合,他们在一起执行一个功能方法是解决一类问题的步骤的有序集合方法包含于类或对象之中......
  • 【FAQ】【JAVA UI】HarmonyOS 如何获取uid和pid
    ​ 【问题描述】鸿蒙中怎么样可以获取Uid和Pid 【解决方案】try{BundleInfobundleInfo=getContext().getBundleManager().getBundleInfo(getBund......
  • KingbaseES V8R3集群运维案例之---用户自定义表空间管理
    ​案例说明:KingbaseES数据库支持用户自定义表空间的创建,并建议表空间的文件存储路径配置到数据库的data目录之外。本案例复现了,当用户自定义表空间存储路径配置到data下......
  • 肖sir__java__02基本
    注意点:1、java中首字母大小写区分2、类名首个字母大写3、方法名,首字母小写,多个单词后面大写4.文件名和类名一致,后缀名为.java5.出现波浪线就市报错6、都是main方法......
  • java如何实现对List集合进行分页
    对List集合进行分页:private<T>Page<T>listToPage(List<T>dataList,IntegerpageSize,IntegerpageNumber){Page<T>result=newPage<T>();List<T>monit......
  • java前前期复习
     1配置环境:a去官网或者下载之家下载jdk8安装      b配置环境变量首先JAVA_HOME然后进入path增加%JAVA_HOME%\bin和%JAVA_HOME%\bin\jre     ......
  • Java Servlet 入门: 问题系列:Filter中通过HttpServletRequest.getParts()获取不到上传
    问题:一开始以为Servlet 没有提供对文件读取的相关内容。后来发现,HttpServletRequest中有getParts方法,可以获取上传的文件。再后发,经过反复测试,发现都读不到相关内容。......