Stream原理研究

幾個概念

需要了解Stream、Pipeline、Stage、Sink幾個概念。

Stream

Stream就是一系列元素。是怎樣的一系列元素呢?是支持順序和并行聚集操作的一系列元素。
Stream定義了一些中間操作(Intermediate operations)和結(jié)束操作(Terminal operations),
中間操作包括無狀態(tài)(Stateless)操作比如:filter, map, flatMap等,有狀態(tài)(Stateful)操作比如:distinct, sorted, limit等;
結(jié)束操作(Terminal operations)包括非短路操作(short-circuiting)比如:forEach, reduce, collect等和短路操作如:findFirst, findAny;

這些操作需要被按順序記錄下來,這就需要Pipeline了。

Pipeline

Pipeline就是管道的概念。
管道有一個基類PipelineHelper,他是執(zhí)行Stream管道的一個helper,將Stream的所有信息收集到一個地方。

上面所說的操作其實都定義在PipelineHelper的一個子類ReferencePipeline中,包括Head(Source stage of a ReferencePipeline)、StatelessOp(Base class for a stateless intermediate stage of a Stream.)、StatefulOp(Base class for a stateful intermediate stage of a Stream.)靜態(tài)內(nèi)部類。

Stage

Stream中使用Stage的概念來描述一個完整的操作,并用某種實例化后的PipelineHelper來代表Stage,將具有先后順序的各個Stage連到一起,就構(gòu)成了整個流水線。

簡單理解就是每一個操作就會產(chǎn)生一個stage

image.png

Sink

現(xiàn)在有了Stage了,如何串聯(lián)起這些stage呢?通過Sink。
java.util.stream.Sink.ChainedReference來看,里面有一個downstream的實例成員,只要上游的sink調(diào)用下游的accept()方法,即可保證串聯(lián)起來執(zhí)行:即每一個上游的sink執(zhí)行完自己的accept()邏輯以后,調(diào)用下游sinkaccept()方法(downstream.accept())。

通過stage構(gòu)建起來的雙向鏈表只是給所有stage關(guān)聯(lián)在一起,每個stage都知道自己前后有沒有stage存在,卻沒法知道后面stage到底執(zhí)行了哪種操作,以及回調(diào)函數(shù)是哪種形式。這就好像一個網(wǎng)絡(luò)環(huán)境,根據(jù)iptables我知道了我下游的路由器在哪里,那我怎么把數(shù)據(jù)傳遞給它呢?通過協(xié)議,TCP協(xié)議!這么看的話,Sink更像是一個協(xié)議,讓每一個stage執(zhí)行完以后,知道調(diào)用某個方法來將數(shù)據(jù)傳遞給下一個stage,也就是Sink#accept()。

類圖1

通過代碼來講講:

@Test
public void testFilter() {
    Stream.of(1, 2, 3, 4, 5)
            .filter(item -> item > 3)
            .forEach(System.out::println);// 打印結(jié)果:4,5
}

其實這段代碼等價于:

@Test
public void testFilter() {
    Stream<Integer> head = Stream.of(1, 2, 3, 4, 5);
    Stream<Integer> filterStream = head.filter(i -> i > 3);
    filterStream.forEach(System.out::println);
    filterStream.count();
}

可以看到,前面每一步都會返回一個新的Stream,嚴(yán)格的說,其實是Stream的子類,更嚴(yán)格說,是ReferencePipeline的子類,參見上圖類圖1。具體如下:

// java.util.Arrays#stream(T[], int, int)
// 創(chuàng)建一個Stream實例
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
    //  spliterator方法創(chuàng)建一個ArraySpliterator,后面會關(guān)注到ArraySpliterator#forEachRemaining
    return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}

// java.util.stream.StreamSupport#stream(java.util.Spliterator<T>, boolean)
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    // 創(chuàng)建一個管道(pipeline)的頭節(jié)點
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

通過上面的代碼可以看到,真正返回的是ReferencePipeline.Head的實例,當(dāng)然,就是Stream的子類。
接著看filter()方法:

// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    // filter是一個無狀態(tài)操作(StatelessOp),StatelessOp是管道的一個節(jié)點,也是Stream的子類
    // 這段代碼重點關(guān)注返回的StatelessOp實例,this是當(dāng)前Stream實例,即上面代碼Stream#of方法產(chǎn)生的實例,也就是ReferencePipeline.Head實例
    // 通過這個方法串聯(lián)起新建節(jié)點StatelessOp與ReferencePipeline.Head的關(guān)系,即StatelessOp#previousStage=ReferencePipeline.Head
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        // 這個方法先不關(guān)注,后面還會說到
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

filter方法的默認(rèn)實現(xiàn)是在ReferencePipeline中實現(xiàn)的。上面這段代碼我們需要重點關(guān)注的是,返回了一個StatelessOp對象,也是ReferencePipeline的子類。關(guān)注一下構(gòu)造StatelessOp的過程:

// java.util.stream.ReferencePipeline.StatelessOp#StatelessOp
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
            StreamShape inputShape,
            int opFlags) {
    super(upstream, opFlags);
    assert upstream.getOutputShape() == inputShape;
}

// java.util.stream.ReferencePipeline#ReferencePipeline(java.util.stream.AbstractPipeline<?,P_IN,?>, int)
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
    super(upstream, opFlags);
}

// java.util.stream.AbstractPipeline#AbstractPipeline(java.util.stream.AbstractPipeline<?,E_IN,?>, int)
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    // 這里是關(guān)聯(lián)stage
    previousStage.nextStage = this;
    this.previousStage = previousStage;
    
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;// sourceStage是ReferencePipeline.Head
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}

通過上面的操作,形成了這樣的雙向鏈表:
ReferencePipeline.Head <-> StatelessOp
一般后面在有其他操作,例如map()、sort()、limit()等,都是返回一個ReferencePipeline實例(StatefulOp/StatelessOp),在構(gòu)造方法里面將雙向鏈表串聯(lián)起來。

最后來看forEach()方法:

// java.util.stream.ReferencePipeline#forEach
@Override
public void forEach(Consumer<? super P_OUT> action) {
    // 通過工廠ForEachOps返回一個ForEachOp實例(下面會提到),本質(zhì)是一個TerminalOp,即:構(gòu)造了一個終止操作實例
    evaluate(ForEachOps.makeRef(action, false));
}

evaluate()方法就是一個執(zhí)行Stream的操作。

// java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           // 關(guān)注順序執(zhí)行這里
           // this是由filter方法返回的Stream,嚴(yán)格說是PipelineHelper,terminalOp是上面提到的ForEachOp實例
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

現(xiàn)在我們只關(guān)注順序執(zhí)行這一段:

// java.util.stream.ForEachOps.ForEachOp#evaluateSequential
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<S> spliterator) {
    // this是上面提到的ForEachOp實例,helper是filter方法返回的PipelineHelper
    return helper.wrapAndCopyInto(this, spliterator).get();
}

wrapAndCopyInto方法主要干了兩件事:

  1. 封裝sink鏈;
  2. 將封裝好的sink鏈從頭到尾執(zhí)行。
// 串聯(lián)sink并執(zhí)行
// 前面提到反向串聯(lián),這里是順序串聯(lián)
// 這個是helper的實例方法,也就是filter()方法構(gòu)造出來的實例對象
// java.util.stream.AbstractPipeline#wrapAndCopyInto
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    // 這里的sink是上面提到的ForEachOp實例
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

先看看對sink鏈的封裝:

// java.util.stream.AbstractPipeline#wrapSink
// sink是ForEachOp實例
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    // 這里的sink是上面提到的ForEachOp實例
    Objects.requireNonNull(sink);

    // 不斷的將p設(shè)置為前一個stage節(jié)點,然后調(diào)用p#opWrapSink構(gòu)造出上一個stage對應(yīng)的sink實例,這樣p就知道了它的downstream是sink了
    // 初始化的時候,AbstractPipeline.this對應(yīng)的是filter()方法返回的實例對象,sink是TerminalOp實例
    // p.depth > 0意味著sourceStage,即ReferencePipeline.Head實例不會包含到sink鏈中,因為sourceStage.depth = 0
    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        // 關(guān)聯(lián)好后,sink變成前一個stage
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;// 這個sink已經(jīng)是sink鏈的鏈頭了
}

image.png

回看前面提到的filter()方法,重點關(guān)注opWrapSink()方法:

// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        // 重點關(guān)注這里
        // opWrapSink的作用是創(chuàng)建當(dāng)前節(jié)點的Sink實例。
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            // 將當(dāng)前的stage封裝成Sink實例,實例的downstream就是參數(shù)sink
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    // 這里先不關(guān)注,待會還要再提到
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

// java.util.stream.Sink.ChainedReference#ChainedReference
// 構(gòu)造方法,創(chuàng)建sink實例,同時告知downstream,這樣當(dāng)前sink實例和downstream實例就串聯(lián)起來了
public ChainedReference(Sink<? super E_OUT> downstream) {
    this.downstream = Objects.requireNonNull(downstream);
}

可以看到上面方法就是構(gòu)造filter()方法這一步stage對應(yīng)的Sink實例,并且串聯(lián)起當(dāng)前sink實例與downstream。這樣在執(zhí)行的時候就知道了,當(dāng)前sink執(zhí)行完了,就調(diào)用downstream的方法執(zhí)行。
構(gòu)造好的sink鏈如下:

image.png

sink鏈構(gòu)造好了,下面就是執(zhí)行的過程:

// java.util.stream.AbstractPipeline#copyInto
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    // 此時的wrappedSink已經(jīng)是鏈表頭了
    Objects.requireNonNull(wrappedSink);

    // 沒有短路操作,執(zhí)行這里
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());// 執(zhí)行Sink#begin
        // 這里的spliterator是ArraySpliterator,前面提到過
        // 對spliterator實例里面的元素逐個執(zhí)行
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

// java.util.Spliterators.ArraySpliterator#forEachRemaining
@SuppressWarnings("unchecked")
@Override
public void forEachRemaining(Consumer<? super T> action) {
    Object[] a; int i, hi; // hoist accesses and checks from loop
    if (action == null)
        throw new NullPointerException();
    if ((a = array).length >= (hi = fence) &&
        (i = index) >= 0 && i < (index = hi)) {
        // 對于串行執(zhí)行的,就是對數(shù)組里的每個元素執(zhí)行action.accept
        // 并行因為調(diào)用了trySplit,index和fence會不同,不是這里關(guān)注的重點。
        // 這里的action就是filter方法里的Sink.ChainedReference
        do { action.accept((T)a[i]); } while (++i < hi);
    }
}

上面的方法中調(diào)用了sink鏈的accept()方法,回看前面提到的filter()方法,重點關(guān)注accept()方法:

// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    // 重點關(guān)注這里
                    if (predicate.test(u))
                        // 調(diào)用downstream的accept,就串聯(lián)執(zhí)行了
                        downstream.accept(u);
                }
            };
        }
    };
}

// java.util.stream.ForEachOps.ForEachOp.OfRef#accept
@Override
public void accept(T t) {
    // comsumer就是forEach方法里的lambda表達(dá)式
    consumer.accept(t);
}

至此,一個完整的Stream流程執(zhí)行完畢。
附上Sink對應(yīng)的部分類圖:

image.png

這里XXXOps都是對應(yīng)XXXOp的工廠,如ReduceOpsReduceOp的工廠,用于生產(chǎn)ReduceOp實例。

總結(jié)

總結(jié)一下:

  1. Stream是支持順序和并行聚集操作的一系列元素。這里著重描述的是:Stream = 操作 + 元素
  2. Pipeline是管道,PipelineHelper是將Stream的信息收集到一個地方;
  3. Stage就是一個完整的操作,每一個PipelineHelper(或者說ReferencePipeline)的實例就是一個Stage,關(guān)聯(lián)起所有Stage將形成一個雙向鏈表。
  4. Sink鏈的主要作用是控制Stream中的每一個元素通過Stage形成的管道。
  5. Stream的邏輯是:
  • 構(gòu)造stage雙向鏈表
  • 利用stage雙向鏈表逆序構(gòu)造Sink
  • 通過Spliterator#forEachRemaining逐個處理元素,代碼類似于:
for (T element : allElements) {// 逐個元素處理
    Sink consumer = sinkHead;// sink鏈頭
    While (consumer != null) {
        consumer.accept(element);// 消費元素
        consumer = consumer.downstream;// 傳遞給下游處理
    }
}

關(guān)于Stream帶來好處的思考

首先是編程思維的轉(zhuǎn)變,原來是面向?qū)ο?,現(xiàn)在是面向數(shù)據(jù)。你需要考慮如何通過對手里的數(shù)據(jù)進(jìn)行操作得到預(yù)期的結(jié)果。這個有點想SQL。
其次是代碼解決性、可讀性,這個感覺更多是函數(shù)式編程帶來的好處
最后,也是最重要的,依托java.util.concurrent包,將并行化能力進(jìn)行封裝,也就是說,開發(fā)者可以通過最簡單的方式實現(xiàn)并行化數(shù)據(jù)處理,不在需要控制線程(新建、啟動、通信、異常處理)。

一些參考

深入理解Java Stream流水線
Java 8 Stream探秘
JAVA8中的stream原理解析——1(串行)
java8 Stream的實現(xiàn)原理 (從零開始實現(xiàn)一個stream流)
Stream源碼分析
jdk8中Spliterator的作用
java8 Stream Pipelines 淺析

Side-effect是只一個方法不只是返回一個結(jié)果,還會改變對象的狀態(tài)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Int Double Long 設(shè)置特定的stream類型, 提高性能,增加特定的函數(shù) 無存儲。stream不是一...
    patrick002閱讀 1,322評論 0 0
  • 1.無狀態(tài)、有狀態(tài)操作及短路、非短路操作 Stream上的所有操作分為兩類:中間操作和終止操作。中間操作只是一種標(biāo)...
    王偵閱讀 741評論 0 1
  • 文章目錄 操作符的分類 流水線的結(jié)構(gòu) AbstractPipeline Stream的生成源碼分析 添加中間操作 ...
    TheLudlows_閱讀 14,368評論 1 12
  • λ表達(dá)式 什么是λ表達(dá)式 λ表達(dá)式有三部分組成:參數(shù)列表,箭頭(->),以及一個表達(dá)式或者語句塊。 轉(zhuǎn)換為λ表達(dá)式...
    輕狂丨書生閱讀 2,088評論 1 11
  • 為了計算,“流”操作組成了一個流管道。一個流管道包括數(shù)據(jù)源、中間操作和終端操作。數(shù)據(jù)源可以是數(shù)組、集合、I/O通道...
    Unyielding_L閱讀 2,766評論 0 2

友情鏈接更多精彩內(nèi)容