Stream流水線原理

1.無狀態(tài)、有狀態(tài)操作及短路、非短路操作

Stream上的所有操作分為兩類:中間操作和終止操作。中間操作只是一種標(biāo)記,只有終止操作才會(huì)觸發(fā)實(shí)際計(jì)算。


中間操作又可以分為無狀態(tài)的(Stateless)和有狀態(tài)的(Stateful):

  • 無狀態(tài)中間操作是指元素的處理不受前面元素的影響
  • 有狀態(tài)的中間操作必須等到所有元素處理之后才知道最終結(jié)果,比如排序是有狀態(tài)操作,在讀取所有元素之前并不能確定排序結(jié)果,

結(jié)束操作又可以分為短路操作和非短路操作:

  • 短路操作是指不用處理全部元素就可以返回結(jié)果,比如找到第一個(gè)滿足條件的元素。

之所以要進(jìn)行如此精細(xì)的劃分,是因?yàn)榈讓訉?duì)每一種情況的處理方式不同。

2.迭代實(shí)現(xiàn)

        OptionalInt longestStringLengthStartingWithA
                = strings.stream()
                .filter(s -> s.startsWith("A"))
                .mapToInt(String::length)
                .max();

這種實(shí)現(xiàn)的兩個(gè)弊端:

  • 迭代次數(shù)多。迭代次數(shù)跟函數(shù)調(diào)用的次數(shù)相等。
  • 頻繁產(chǎn)生中間結(jié)果。每次函數(shù)調(diào)用都產(chǎn)生一次中間結(jié)果,存儲(chǔ)開銷無法接受。

使用一次迭代實(shí)現(xiàn):

int longest = 0;
for(String str : strings){
    if(str.startsWith("A")){ // 1. filter()
        int len = str.length(); // 2. mapToInt()
        longest = Math.max(len, longest); // 3. max()
    }
}

3.流水線怎樣實(shí)現(xiàn)盡可能減少迭代次數(shù)?

應(yīng)該采用某種方式記錄用戶每一步的操作,當(dāng)用戶調(diào)用結(jié)束操作時(shí)將之前記錄的操作疊加到一起在一次迭代中全部執(zhí)行掉。有幾個(gè)問題需要解決:

  • 用戶的操作如何記錄?
  • 操作如何疊加?
  • 疊加之后的操作如何執(zhí)行?
  • 執(zhí)行后的結(jié)果(如果有)在哪里?

3.1 操作如何記錄

指的是Stream中間操作,很多Stream操作會(huì)需要一個(gè)回調(diào)函數(shù)(Lambda表達(dá)式),因此一個(gè)完整的操作是<數(shù)據(jù)源,操作,回調(diào)函數(shù)>構(gòu)成的三元組。Stream中使用Stage的概念來描述一個(gè)完整的操作,并用某種實(shí)例化后的PipelineHelper來代表Stage,將具有先后順序的各個(gè)Stage連到一起,就構(gòu)成了整個(gè)流水線。跟Stream相關(guān)類和接口的繼承關(guān)系圖示。


IntPipeline, LongPipeline, DoublePipeline沒三個(gè)類專門為三種基本類型(不是包裝類型)而定制的,跟ReferencePipeline是并列關(guān)系。圖中Head用于表示第一個(gè)Stage,即調(diào)用調(diào)用諸如Collection.stream()方法產(chǎn)生的Stage,很顯然這個(gè)Stage里不包含任何操作;StatelessOp和StatefulOp分別表示無狀態(tài)和有狀態(tài)的Stage,對(duì)應(yīng)于無狀態(tài)和有狀態(tài)的中間操作。

通過Collection.stream()方法得到Head也就是stage0,緊接著調(diào)用一系列的中間操作,不斷產(chǎn)生新的Stream。這些Stream對(duì)象以雙向鏈表的形式組織在一起,構(gòu)成整個(gè)流水線,由于每個(gè)Stage都記錄了前一個(gè)Stage和本次的操作以及回調(diào)函數(shù),依靠這種結(jié)構(gòu)就能建立起對(duì)數(shù)據(jù)源的所有操作。這就是Stream記錄操作的方式。

3.2 操作如何疊加

以上只是解決了操作記錄的問題,要想讓流水線起到應(yīng)有的作用我們需要一種將所有操作疊加到一起的方案。你可能會(huì)覺得這很簡單,只需要從流水線的head開始依次執(zhí)行每一步的操作(包括回調(diào)函數(shù))就行了。這聽起來似乎是可行的,但是你忽略了前面的Stage并不知道后面Stage到底執(zhí)行了哪種操作,以及回調(diào)函數(shù)是哪種形式。換句話說,只有當(dāng)前Stage本身才知道該如何執(zhí)行自己包含的動(dòng)作。這就需要有某種協(xié)議來協(xié)調(diào)相鄰Stage之間的調(diào)用關(guān)系。

這種協(xié)議由Sink接口完成,Sink接口包含的方法如下表所示:


有了上面的協(xié)議,相鄰Stage之間調(diào)用就很方便了,每個(gè)Stage都會(huì)將自己的操作封裝到一個(gè)Sink里,前一個(gè)Stage只需調(diào)用后一個(gè)Stage的accept()方法即可,并不需要知道其內(nèi)部是如何處理的。當(dāng)然對(duì)于有狀態(tài)的操作,Sink的begin()和end()方法也是必須實(shí)現(xiàn)的。比如Stream.sorted()是一個(gè)有狀態(tài)的中間操作,其對(duì)應(yīng)的Sink.begin()方法可能創(chuàng)建一個(gè)乘放結(jié)果的容器,而accept()方法負(fù)責(zé)將元素添加到該容器,最后end()負(fù)責(zé)對(duì)容器進(jìn)行排序。對(duì)于短路操作,Sink.cancellationRequested()也是必須實(shí)現(xiàn)的,比如Stream.findFirst()是短路操作,只要找到一個(gè)元素,cancellationRequested()就應(yīng)該返回true,以便調(diào)用者盡快結(jié)束查找。Sink的四個(gè)接口方法常常相互協(xié)作,共同完成計(jì)算任務(wù)。實(shí)際上Stream API內(nèi)部實(shí)現(xiàn)的的本質(zhì),就是如何重載Sink的這四個(gè)接口方法。

有了Sink對(duì)操作的包裝,Stage之間的調(diào)用問題就解決了,執(zhí)行時(shí)只需要從流水線的head開始對(duì)數(shù)據(jù)源依次調(diào)用每個(gè)Stage對(duì)應(yīng)的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一種可能的Sink.accept()方法流程是這樣的:

void accept(U u){
    1. 使用當(dāng)前Sink包裝的回調(diào)函數(shù)處理u
    2. 將處理結(jié)果傳遞給流水線下游的Sink
}

Sink接口的其他幾個(gè)方法也是按照這種[處理->轉(zhuǎn)發(fā)]的模型實(shí)現(xiàn)。下面我們結(jié)合具體例子看看Stream的中間操作是如何將自身的操作包裝成Sink以及Sink是如何將處理結(jié)果轉(zhuǎn)發(fā)給下一個(gè)Sink的。

3.2.1 無狀態(tài)的Stream.map()方法

// Stream.map(),調(diào)用該方法將產(chǎn)生一個(gè)新的Stream
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    ...
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override /*opWripSink()方法返回由回調(diào)函數(shù)包裝而成Sink*/
        Sink<P_OUT> opWrapSink(int flags, Sink<R> downstream) {
            return new Sink.ChainedReference<P_OUT, R>(downstream) {
                @Override
                public void accept(P_OUT u) {
                    R r = mapper.apply(u);// 1. 使用當(dāng)前Sink包裝的回調(diào)函數(shù)mapper處理u
                    downstream.accept(r);// 2. 將處理結(jié)果傳遞給流水線下游的Sink
                }
            };
        }
    };
}

上述代碼看似復(fù)雜,其實(shí)邏輯很簡單,就是將回調(diào)函數(shù)mapper包裝到一個(gè)Sink當(dāng)中。由于Stream.map()是一個(gè)無狀態(tài)的中間操作,所以map()方法返回了一個(gè)StatelessOp內(nèi)部類對(duì)象(一個(gè)新的Stream),調(diào)用這個(gè)新Stream的opWripSink()方法將得到一個(gè)包裝了當(dāng)前回調(diào)函數(shù)的Sink。

3.2.2 有狀態(tài)的Stream.sorted()方法

Stream.sorted()方法將對(duì)Stream中的元素進(jìn)行排序,顯然這是一個(gè)有狀態(tài)的中間操作,因?yàn)樽x取所有元素之前是沒法得到最終順序的。拋開模板代碼直接進(jìn)入問題本質(zhì),sorted()方法是如何將操作封裝成Sink的呢?sorted()一種可能封裝的Sink代碼如下:

// Stream.sort()方法用到的Sink實(shí)現(xiàn)
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    private ArrayList<T> list;// 存放用于排序的元素
    RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
        super(downstream, comparator);
    }
    @Override
    public void begin(long size) {
        ...
        // 創(chuàng)建一個(gè)存放排序元素的列表
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }
    @Override
    public void end() {
        list.sort(comparator);// 只有元素全部接收之后才能開始排序
        downstream.begin(list.size());
        if (!cancellationWasRequested) {// 下游Sink不包含短路操作
            list.forEach(downstream::accept);// 2. 將處理結(jié)果傳遞給流水線下游的Sink
        }
        else {// 下游Sink包含短路操作
            for (T t : list) {// 每次都調(diào)用cancellationRequested()詢問是否可以結(jié)束處理。
                if (downstream.cancellationRequested()) break;
                downstream.accept(t);// 2. 將處理結(jié)果傳遞給流水線下游的Sink
            }
        }
        downstream.end();
        list = null;
    }
    @Override
    public void accept(T t) {
        list.add(t);// 1. 使用當(dāng)前Sink包裝動(dòng)作處理t,只是簡單的將元素添加到中間列表當(dāng)中
    }
}

上述代碼完美的展現(xiàn)了Sink的四個(gè)接口方法是如何協(xié)同工作的:

  • 首先beging()方法告訴Sink參與排序的元素個(gè)數(shù),方便確定中間結(jié)果容器的的大??;
  • 之后通過accept()方法將元素添加到中間結(jié)果當(dāng)中,最終執(zhí)行時(shí)調(diào)用者會(huì)不斷調(diào)用該方法,直到遍歷所有元素;
  • 最后end()方法告訴Sink所有元素遍歷完畢,啟動(dòng)排序步驟,排序完成后將結(jié)果傳遞給下游的Sink;
  • 如果下游的Sink是短路操作,將結(jié)果傳遞給下游時(shí)不斷詢問下游cancellationRequested()是否可以結(jié)束處理。

3.3 疊加之后的操作如何執(zhí)行

Sink完美封裝了Stream每一步操作,并給出了[處理->轉(zhuǎn)發(fā)]的模式來疊加操作。這一連串的齒輪已經(jīng)咬合,就差最后一步撥動(dòng)齒輪啟動(dòng)執(zhí)行。是什么啟動(dòng)這一連串的操作呢?也許你已經(jīng)想到了啟動(dòng)的原始動(dòng)力就是終止操作(Terminal Operation),一旦調(diào)用某個(gè)終止操作,就會(huì)觸發(fā)整個(gè)流水線的執(zhí)行。

終止操作之后不能再有別的操作,所以終止操作不會(huì)創(chuàng)建新的流水線階段(Stage),直觀的說就是流水線的鏈表不會(huì)在往后延伸了。終止操作會(huì)創(chuàng)建一個(gè)包裝了自己操作的Sink,這也是流水線中最后一個(gè)Sink,這個(gè)Sink只需要處理數(shù)據(jù)而不需要將結(jié)果傳遞給下游的Sink(因?yàn)闆]有下游)。對(duì)于Sink的[處理->轉(zhuǎn)發(fā)]模型,終止操作的Sink就是調(diào)用鏈的出口。


我們再來考察一下上游的Sink是如何找到下游Sink的。一種可選的方案是在PipelineHelper中設(shè)置一個(gè)Sink字段,在流水線中找到下游Stage并訪問Sink字段即可。但Stream類庫的設(shè)計(jì)者沒有這么做,而是設(shè)置了一個(gè)Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法來得到Sink,該方法的作用是返回一個(gè)新的包含了當(dāng)前Stage代表的操作以及能夠?qū)⒔Y(jié)果傳遞給downstream的Sink對(duì)象。為什么要產(chǎn)生一個(gè)新對(duì)象而不是返回一個(gè)Sink字段?這是因?yàn)槭褂胦pWrapSink()可以將當(dāng)前操作與下游Sink(上文中的downstream參數(shù))結(jié)合成新Sink。試想只要從流水線的最后一個(gè)Stage開始,不斷調(diào)用上一個(gè)Stage的opWrapSink()方法直到最開始(不包括stage0,因?yàn)閟tage0代表數(shù)據(jù)源,不包含操作),就可以得到一個(gè)代表了流水線上所有操作的Sink,用代碼表示就是這樣:

// AbstractPipeline.wrapSink()
// 從下游向上游不斷包裝Sink。如果最初傳入的sink代表結(jié)束操作,
// 函數(shù)返回時(shí)就可以得到一個(gè)代表了流水線上所有操作的Sink。
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    ...
    for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

現(xiàn)在流水線上從開始到結(jié)束的所有的操作都被包裝到了一個(gè)Sink里,執(zhí)行這個(gè)Sink就相當(dāng)于執(zhí)行整個(gè)流水線,執(zhí)行Sink的代碼如下:

// AbstractPipeline.copyInto(), 對(duì)spliterator代表的數(shù)據(jù)執(zhí)行wrappedSink代表的操作。
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    ...
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知開始遍歷
        spliterator.forEachRemaining(wrappedSink);// 迭代
        wrappedSink.end();// 通知遍歷結(jié)束
    }
    ...
}

上述代碼首先調(diào)用wrappedSink.begin()方法告訴Sink數(shù)據(jù)即將到來,然后調(diào)用spliterator.forEachRemaining()方法對(duì)數(shù)據(jù)進(jìn)行迭代(Spliterator是容器的一種迭代器,參閱),最后調(diào)用wrappedSink.end()方法通知Sink數(shù)據(jù)處理結(jié)束。邏輯如此清晰。

3.4 執(zhí)行后的結(jié)果在哪里

流水線上所有操作都執(zhí)行后,用戶所需要的結(jié)果(如果有)在哪里?首先要說明的是不是所有的Stream結(jié)束操作都需要返回結(jié)果,有些操作只是為了使用其副作用(Side-effects),比如使用Stream.forEach()方法將結(jié)果打印出來就是常見的使用副作用的場景(事實(shí)上,除了打印之外其他場景都應(yīng)避免使用副作用),對(duì)于真正需要返回結(jié)果的結(jié)束操作結(jié)果存在哪里呢?

特別說明:副作用不應(yīng)該被濫用,也許你會(huì)覺得在Stream.forEach()里進(jìn)行元素收集是個(gè)不錯(cuò)的選擇,就像下面代碼中那樣,但遺憾的是這樣使用的正確性和效率都無法保證,因?yàn)镾tream可能會(huì)并行執(zhí)行。大多數(shù)使用副作用的地方都可以使用歸約操作更安全和有效的完成。

// 錯(cuò)誤的收集方式
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
      .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
// 正確的收集方式
List<String>results =
     stream.filter(s -> pattern.matcher(s).matches())
             .collect(Collectors.toList());  // No side-effects!

需要返回結(jié)果的流水線結(jié)果存在哪里呢?這要分不同的情況討論,下表給出了各種有返回結(jié)果的Stream結(jié)束操作。

  • 1.對(duì)于表中返回boolean或者Optional的操作的操作,由于值返回一個(gè)值,只需要在對(duì)應(yīng)的Sink中記錄這個(gè)值,等到執(zhí)行結(jié)束時(shí)返回就可以了。
  • 2.對(duì)于歸約操作,最終結(jié)果放在用戶調(diào)用時(shí)指定的容器中(容器類型通過收集器指定)。collect(), reduce(), max(), min()都是歸約操作,雖然max()和min()也是返回一個(gè)Optional,但事實(shí)上底層是通過調(diào)用reduce()方法實(shí)現(xiàn)的。
  • 3.對(duì)于返回是數(shù)組的情況,毫無疑問的結(jié)果會(huì)放在數(shù)組當(dāng)中。這么說當(dāng)然是對(duì)的,但在最終返回?cái)?shù)組之前,結(jié)果其實(shí)是存儲(chǔ)在一種叫做Node的數(shù)據(jù)結(jié)構(gòu)中的。Node是一種多叉樹結(jié)構(gòu),元素存儲(chǔ)在樹的葉子當(dāng)中,并且一個(gè)葉子節(jié)點(diǎn)可以存放多個(gè)元素。這樣做是為了并行執(zhí)行方便。

4.示例

public class StreamTest {
    public static void main(String[] args) {
        String[] strs = {"ABS", "dedfa", "American", "Becareful"};
        ArrayList<String> strings = new ArrayList<>(Arrays.asList(strs));
        OptionalInt longestStringLengthStartingWithA
                = strings.stream()
                .filter(s -> s.startsWith("A"))
                .mapToInt(String::length)
                .max();
        System.out.println(longestStringLengthStartingWithA);
    }
}

4.1 Colletion.stream()

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

調(diào)用ArrayList.spliterator():

    @Override
    public Spliterator<E> spliterator() {
        return new ArrayListSpliterator<>(this, 0, -1, 0);
    }
    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

4.2 filter

使用了匿名類的技巧:

  • 構(gòu)建StatelessOp
  • 構(gòu)建Sink.ChainedReference
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

4.3 mapToInt

    @Override
    public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
        Objects.requireNonNull(mapper);
        return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
                                              StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
                return new Sink.ChainedReference<P_OUT, Integer>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.applyAsInt(u));
                    }
                };
            }
        };
    }

4.4 IntPipeline.max

    @Override
    public final OptionalInt max() {
        return reduce(Math::max);
    }
    @Override
    public final OptionalInt reduce(IntBinaryOperator op) {
        return evaluate(ReduceOps.makeInt(op));
    }

4.4.1 構(gòu)造ReduceOp

    public static TerminalOp<Integer, OptionalInt>
    makeInt(IntBinaryOperator operator) {
        Objects.requireNonNull(operator);
        class ReducingSink
                implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
            private boolean empty;
            private int state;

            public void begin(long size) {
                empty = true;
                state = 0;
            }

            @Override
            public void accept(int t) {
                if (empty) {
                    empty = false;
                    state = t;
                }
                else {
                    state = operator.applyAsInt(state, t);
                }
            }

            @Override
            public OptionalInt get() {
                return empty ? OptionalInt.empty() : OptionalInt.of(state);
            }

            @Override
            public void combine(ReducingSink other) {
                if (!other.empty)
                    accept(other.state);
            }
        }
        return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

4.4.2 evaluate

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
   /**
     * Get the source spliterator for this pipeline stage.  For a sequential or
     * stateless parallel pipeline, this is the source spliterator.  For a
     * stateful parallel pipeline, this is a spliterator describing the results
     * of all computations up to and including the most recent stateful
     * operation.
     */
    @SuppressWarnings("unchecked")
    private Spliterator<?> sourceSpliterator(int terminalFlags) {
        // Get the source spliterator of the pipeline
        Spliterator<?> spliterator = null;
        if (sourceStage.sourceSpliterator != null) {
            spliterator = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
        }
        else if (sourceStage.sourceSupplier != null) {
            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }

        if (isParallel() && sourceStage.sourceAnyStateful) {
            // Adapt the source spliterator, evaluating each stateful op
            // in the pipeline up to and including this pipeline stage.
            // The depth and flags of each pipeline stage are adjusted accordingly.
            int depth = 1;
            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                 u != e;
                 u = p, p = p.nextStage) {

                int thisOpFlags = p.sourceOrOpFlags;
                if (p.opIsStateful()) {
                    depth = 0;

                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                        // Clear the short circuit flag for next pipeline stage
                        // This stage encapsulates short-circuiting, the next
                        // stage may not have any short-circuit operations, and
                        // if so spliterator.forEachRemaining should be used
                        // for traversal
                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                    }

                    spliterator = p.opEvaluateParallelLazy(u, spliterator);

                    // Inject or clear SIZED on the source pipeline stage
                    // based on the stage's spliterator
                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                            ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
                            : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
                }
                p.depth = depth++;
                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
            }
        }

        if (terminalFlags != 0)  {
            // Apply flags from the terminal operation to last pipeline stage
            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
        }

        return spliterator;
    }

        @Override
        public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }
    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }
    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }
    static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
        protected final Sink<? super E_OUT> downstream;

        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

使用opWrapSink()可以將當(dāng)前操作與下游Sink(上文中的downstream參數(shù))結(jié)合成新Sink。

這里調(diào)用的opWrapSink是上面mapToInt和filter中重寫的方法。,在這里調(diào)用是為了構(gòu)造Sink.ChainedReference,通過downstream將其單向鏈接起來。

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

其中forEachRemaining的調(diào)用:

        public void forEachRemaining(Consumer<? super E> action) {
            int i, hi, mc; // hoist accesses and checks from loop
            ArrayList<E> lst; Object[] a;
            if (action == null)
                throw new NullPointerException();
            if ((lst = list) != null && (a = lst.elementData) != null) {
                if ((hi = fence) < 0) {
                    mc = lst.modCount;
                    hi = lst.size;
                }
                else
                    mc = expectedModCount;
                if ((i = index) >= 0 && (index = hi) <= a.length) {
                    for (; i < hi; ++i) {
                        @SuppressWarnings("unchecked") E e = (E) a[i];
                        action.accept(e);
                    }
                    if (lst.modCount == mc)
                        return;
                }
            }
            throw new ConcurrentModificationException();
        }

4.5 總結(jié)

核心都是在spliterator.forEachRemaining(wrappedSink):

  • 1)迭代器spliterator遍歷流的每個(gè)元素
  • 2)針對(duì)每個(gè)元素調(diào)用action.accept(e)

前面的操作都是在組合這個(gè)wrappedSink也即action,將所有action從前到后串成一個(gè)單向鏈表。

該例子實(shí)現(xiàn)了一次迭代實(shí)現(xiàn)所有操作的功能。

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Int Double Long 設(shè)置特定的stream類型, 提高性能,增加特定的函數(shù) 無存儲(chǔ)。stream不是一...
    patrick002閱讀 1,322評(píng)論 0 0
  • 文章目錄 操作符的分類 流水線的結(jié)構(gòu) AbstractPipeline Stream的生成源碼分析 添加中間操作 ...
    TheLudlows_閱讀 14,368評(píng)論 1 12
  • Lambda表達(dá)式的作用就相當(dāng)于一個(gè)回調(diào)方法,Stream API中大量使用Lambda表達(dá)式作為回調(diào)方法。 再談...
    紹圣閱讀 811評(píng)論 0 0
  • λ表達(dá)式 什么是λ表達(dá)式 λ表達(dá)式有三部分組成:參數(shù)列表,箭頭(->),以及一個(gè)表達(dá)式或者語句塊。 轉(zhuǎn)換為λ表達(dá)式...
    輕狂丨書生閱讀 2,088評(píng)論 1 11
  • Java8 Stream原理深度解析 Author:DoraeDate:2017年11月2日19:10:39轉(zhuǎn)載請(qǐng)...
    Dorae132閱讀 3,215評(píng)論 0 13

友情鏈接更多精彩內(nèi)容