淺析Java8 Stream原理

文章目錄

  • 操作符的分類
  • 流水線的結(jié)構(gòu)
  • AbstractPipeline
  • Stream的生成源碼分析
  • 添加中間操作
  • 萬事俱備,只欠東風(fēng)

上一篇文章中大體的介紹了Stream的概念和基本API的使用,Stream用起來的確非常的爽。這一篇文章將會講述Stream的底層實現(xiàn)原理。

操作符的分類

Stream中的操作可以分為兩大類:中間操作與結(jié)束操作,中間操作只是對操作進(jìn)行了記錄,只有結(jié)束操作才會觸發(fā)實際的計算(即惰性求值),這也是Stream在迭代大集合時高效的原因之一。
中間操作又可以分為無狀態(tài)(Stateless)操作與有狀態(tài)(Stateful)操作,前者是指元素的處理不受之前元素的影響;后者是指該操作只有拿到所有元素之后才能繼續(xù)下去。
結(jié)束操作又可以分為短路與非短路操作,這個應(yīng)該很好理解,前者是指遇到某些符合條件的元素就可以得到最終結(jié)果;而后者是指必須處理所有元素才能得到最終結(jié)果。

流水線的結(jié)構(gòu)

Stream的主要接口類的關(guān)系如下圖:


類關(guān)系圖
  • BaseStream規(guī)定了流的基本接口
  • Stream中定義了map、filter、flatmap等用戶關(guān)注的常用操作;
  • Int~ Long~ Double~是針對于基本類型的特化 方法與Stream中大致對應(yīng),當(dāng)然也有一些差別
  • BaseStream Stream IntStream LongStream DoubleStream 組建了Java的流體系根基
  • PipelineHelper主要用于Stream執(zhí)行過程中相關(guān)結(jié)構(gòu)的構(gòu)建ReferencePipeline和AbstractPipeline
  • AbstractPipeline是流水線的核心抽象類,用于構(gòu)建和管理流水線。它的實現(xiàn)類就是流水線的節(jié)點。
  • Head、StatelessOp、StatefulOp為ReferencePipeline中的內(nèi)部類,[Int | Long | Double]Pipeline 內(nèi)部也都是定義了這三個內(nèi)部類。

IntPipeline, LongPipeline, DoublePipeline這三個類專門為三種基本類型而定制的,Int、long、double進(jìn)行了優(yōu)化,主要用于頻繁的拆裝箱。三者跟ReferencePipeline是并列關(guān)系,
StatefulOp、StatelessOp分別對應(yīng)有狀態(tài)和無狀態(tài)中間操作。,很多Stream操作會需要一個回調(diào)函數(shù)(Lambda表達(dá)式),一個完整的操作是<數(shù)據(jù)來源,操作,回調(diào)函數(shù)>構(gòu)成的三元組。

Stream中使用Stage的概念來描述一個完整的操作,將具有先后順序的各個Stage連到一起,就構(gòu)成了整個流水線。

AbstractPipeline

前面說到AbstractPipeline是流水線的核心。AbstractPipeline中定義了三個AbstractPipeline類型的變量:sourceStage(源階段),previousStage(上游pipeline,前一階段),nextStage(下一階段)。

/**
 * Backlink to the head of the pipeline chain (self if this is the source stage).
 */
private final AbstractPipeline sourceStage;
/**
* The "upstream" pipeline, or null if this is the source stage.
*/
private final AbstractPipeline previousStage;
/**
 * The next stage in the pipeline, or null if this is the last stage.
 * Effectively final at the point of linking to the next pipeline.
 */
private AbstractPipeline nextStage;

它的直接實現(xiàn)類為ReferencePipeline,而Head 、StatefulOp 、StatelessOp又繼承了ReferencePipeline類。因此Head StatefulOp StatelessOp 他們本身也是AbstractPipeline類型的。
每一個stage就是一個AbstractPipeline的實例,注意,剛開始筆者和Netty中的Pipelie做以類比,其實根本不是一回事。這里的每一個pipeline都是一個節(jié)點。

Head用于表示第一個Stage,也就是source stage,調(diào)用諸如Collection.stream()方法產(chǎn)生的Stage,很顯然這個Stage里不包含任何操作;StatelessOp和StatefulOp分別表示無狀態(tài)和有狀態(tài)的Stage,對應(yīng)于無狀態(tài)和有狀態(tài)的中間操作。
注意:終結(jié)操作不會添加節(jié)點。


Pipeline

Collection.stream()方法得到Head也就是stage0,緊接著調(diào)用一系列的中間操作,不斷產(chǎn)生新的Stream。這些Stream對象以雙向鏈表的形式組織在一起,構(gòu)成整個流水線。
由于每個Stage都記錄了前一個Stage和本次的操作以及回調(diào)函數(shù),依靠這種結(jié)構(gòu)就能建立起對數(shù)據(jù)源的所有操作。

下面分析Head節(jié)點的構(gòu)建。

Stream的生成源碼分析

不管是Collection中調(diào)用StreamSupport.stream()還是Stream的of方法 都是調(diào)用了StreamSupport.stream方法。以Collecton.stream()為例

default Stream<E> parallelStream() {
    return StreamSupport.stream(spliterator(), true);
}

這是一個接口方法的默認(rèn)實現(xiàn),第一個參數(shù)是獲取一個 Spliterator的實例,它表示從數(shù)據(jù)源中獲取元素的方式。相當(dāng)于升級版的Iterator。第二個參數(shù)是是否并行。
繼續(xù)進(jìn)入StreamSupport類中:

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

看到這里和前面說的pipeline有點聯(lián)系了,此處構(gòu)造Head節(jié)點。我們看一下Head它的構(gòu)造函數(shù)鏈:

Head(Spliterator<?> source,
  int sourceFlags, boolean parallel) {
  super(source, sourceFlags, parallel);//source - 描述流的源 sourceFlags - 流的來原標(biāo)志 parallel - 是否為并行流
}
ReferencePipeline(Spliterator<?> source,
              int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
AbstractPipeline(Spliterator<?> source,
                 int sourceFlags, boolean parallel) {
    this.previousStage = null; // 上游管道,第一次創(chuàng)建流則為null
    this.sourceSpliterator = source; // 源分裂器。僅對頭管道有效。
    this.sourceStage = this; // 反向鏈接到管道鏈的頭部(如果這是源階段,則為它本身)
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK; //此管道對象中表示的中間操作的操作標(biāo)志。
    // The following is an optimization of:
    //源以及所有操作的組合源的操作標(biāo)志,包括此管道對象表示的操作。在評估管道準(zhǔn)備時有效。
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0; //此管道對象與流源(如果是順序)之間的中間操作數(shù),或之前的有狀態(tài)(如果并行)。在評估管道準(zhǔn)備時有效。
    this.parallel = parallel;//如果管道是并行的,則為真,否則管道是順序的;僅對源階段有效.
}

第一個構(gòu)造函數(shù)對應(yīng)的類為:static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>,
注意這一有一個細(xì)節(jié):范型的命名,<E_IN>上游源中的元素類型 <E_OUT>此階段生成的元素類型
ReferencePipeline這個類由繼承了AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>,同時實現(xiàn)了Stream<P_OUT>的各種操作符。
至此頭結(jié)點已經(jīng)構(gòu)造完成。

添加中間操作

看看filter的代碼:

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

可以看到返回了一個無狀態(tài)stage,也是一個AbstractPipeline、stream,即是流水線的一個階段。同時還實現(xiàn)了AbstractPipeline定義的opWrapSink方法。
看到這里似乎有些困惑,一片Override方法到底什么意思?

Sink
Stream中將操作抽象化為stage 每個stage 也就是一個AbstractPipeline,每個stage 相當(dāng)于一個雙向鏈表的節(jié)點 ,每個節(jié)點都保存Head然后保存著上一個和下一個節(jié)點。這個雙向鏈表就構(gòu)成了整個流水線。

但是似乎有一個最重要的東西沒有提到,那么就是每一個階段的操作。如何將多個操作疊加到一起呢?
你可能會覺得這很簡單,只需要從流水線的head開始依次執(zhí)行每一步的操作(包括回調(diào)函數(shù))就行了。
這聽起來似乎是可行的,但是你忽略了前面的Stage并不知道后面Stage到底執(zhí)行了哪種操作,以及回調(diào)函數(shù)是哪種形式。換句話說,只有當(dāng)前Stage本身才知道該如何執(zhí)行自己包含的動作。這就需要有某種協(xié)議來協(xié)調(diào)相鄰Stage之間的調(diào)用關(guān)系。這就是Sink接口存在的意義。
創(chuàng)建的Sink.ChainedReference類構(gòu)造方法如下:

public ChainedReference(Sink<? super E_OUT> downstream) {
           this.downstream = Objects.requireNonNull(downstream);
       }

Sink接口相當(dāng)于對操作(我們實現(xiàn)的函數(shù)式接口)封裝了一層,每一個階段只需要調(diào)用自己的Sink的accept方法,accept內(nèi)部只要調(diào)用下一個階段的accept
不需要知道下一個極端的操作類型是什么。

當(dāng)然Sink添加進(jìn)行一些擴展功能,比如:begin表示開始遍歷元素前的方法,相當(dāng)于AOP。end表示元素遍歷結(jié)束之后,cancellationRequested表示是否可以結(jié)束操作,可以讓短路操作盡早結(jié)束。

實際上Stream 操作符內(nèi)部實現(xiàn)的的本質(zhì),就是實現(xiàn)Sink的這四個接口方法。
Sink接口的方法幾乎都是按照這種[處理->轉(zhuǎn)發(fā)]的模型實現(xiàn),如上面的accept

1. 使用當(dāng)前Sink包裝的回調(diào)函數(shù)處理u
2. 將處理結(jié)果傳遞給流水線下游的Sink

當(dāng)添加了中間操作符之后的鏈表結(jié)構(gòu)如下,Head中沒有任何操作,因此也沒有實現(xiàn)Sink。


pipeline-sink

在這里產(chǎn)生一個疑問,重寫的opWrapSink什么時候被調(diào)用?以及Sink的accept什么時候被調(diào)用?正如圖中所示,我們只是重寫了opWrapSink方法,保存在每一個節(jié)點中。每一個Sink都是獨立的,它的downstream還沒有賦值。

萬事俱備,只欠東風(fēng)

何為東風(fēng)?顯然是終結(jié)操作符,以forEach為例,實現(xiàn)在ReferencePipeline中:

@Override
public void forEach(Consumer<? super P_OUT> action) {
    evaluate(ForEachOps.makeRef(action, false));
}

ForEachOps是用戶創(chuàng)建TerminalOp實例的工廠類。TerminalOp是終止操作最頂層的一個接口。TerminalOp接口的實現(xiàn)類有ForEachOp, ReduceOp,FindOp, MatchOp。
先看ForEachOps.makeRef()方法:

public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                              boolean ordered) {
    Objects.requireNonNull(action);
    return new ForEachOp.OfRef<>(action, ordered);
}

OfRef是引用流的默認(rèn)實現(xiàn)類,這里新建了一個OfRef的實例,構(gòu)造方法如下:

OfRef(Consumer<? super T> consumer, boolean ordered) {
    super(ordered);// 父類ForEachOp,參數(shù)表述遍歷是否有序,前面?zhèn)魅氲膄alse
    this.consumer = consumer;
}

將我們實現(xiàn)的Consumer函數(shù)式接口賦值給成員變量?;氐絜valuate方法:

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

跟到串行流的實現(xiàn),實現(xiàn)在ForEachOp中:

@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<S> spliterator) {
    return helper.wrapAndCopyInto(this, spliterator).get();
}

數(shù)PipelineHelper類型其實是AbstractPipeline的父類,而AbstractPipeline又是ReferencePipeline的父類。再跟進(jìn)helper.wrapAndCopyInto方法,是現(xiàn)在AbstractPipeline中:

@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

可以看到通過ReferencePipeline的雙向鏈表,從最后一個操作(也就是終止操作)往前遍歷,將所有的操作都串聯(lián)起來,最終返回一個指向第一個操作的Sink引用。

這里有一個細(xì)節(jié)問題:
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator)方法的第一個入?yún)榉盒?,幾Sink的子類。對應(yīng)的有Sink.ofLong、Sink.ofInt等。并且這個參數(shù)也屬于TerminalOp類型,說白了終結(jié)操作符最終也被包裝成了Sink類型,這一切都通了,最后一個中間操作的downStream是終結(jié)操作符。

真的要執(zhí)行了
回到copyInto方法,wrapSink返回了Head后第一個中間操作的包裝Sink,繼續(xù)看copyInto的實現(xiàn):

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

第一個參數(shù)我們知道Head后的節(jié)點,spliterator可以看作數(shù)據(jù)源。邏輯分為短路操作和非短路操作,如果有短路操作就會執(zhí)行下面的copyIntoWithCancel方法,否則指向上面的邏輯,這里我們非常熟悉啊,begin、accept、end先后串聯(lián)執(zhí)行。

短路流在執(zhí)行遍歷的時候會調(diào)用Sink封裝的cancellationRequested方法,如果返回出就不會進(jìn)行后面的操作,過程稍微比非短路復(fù)雜一點,但是原理大致相同。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Int Double Long 設(shè)置特定的stream類型, 提高性能,增加特定的函數(shù) 無存儲。stream不是一...
    patrick002閱讀 1,322評論 0 0
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,533評論 19 139
  • /Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home...
    光劍書架上的書閱讀 4,183評論 2 8
  • Java8 Stream原理深度解析 Author:DoraeDate:2017年11月2日19:10:39轉(zhuǎn)載請...
    Dorae132閱讀 3,215評論 0 13
  • λ表達(dá)式 什么是λ表達(dá)式 λ表達(dá)式有三部分組成:參數(shù)列表,箭頭(->),以及一個表達(dá)式或者語句塊。 轉(zhuǎn)換為λ表達(dá)式...
    輕狂丨書生閱讀 2,088評論 1 11

友情鏈接更多精彩內(nèi)容