幾個概念
需要了解Stream、Pipeline、Stage、Sink幾個概念。
Stream
Stream就是一系列元素。是怎樣的一系列元素呢?是支持順序和并行聚集操作的一系列元素。
Stream定義了一些中間操作(Intermediate operations)和結(jié)束操作(Terminal operations),
中間操作包括無狀態(tài)(Stateless)操作比如:filter, map, flatMap等,有狀態(tài)(Stateful)操作比如:distinct, sorted, limit等;
結(jié)束操作(Terminal operations)包括非短路操作(short-circuiting)比如:forEach, reduce, collect等和短路操作如:findFirst, findAny;
這些操作需要被按順序記錄下來,這就需要Pipeline了。
Pipeline
Pipeline就是管道的概念。
管道有一個基類PipelineHelper,他是執(zhí)行Stream管道的一個helper,將Stream的所有信息收集到一個地方。
上面所說的操作其實都定義在PipelineHelper的一個子類ReferencePipeline中,包括Head(Source stage of a ReferencePipeline)、StatelessOp(Base class for a stateless intermediate stage of a Stream.)、StatefulOp(Base class for a stateful intermediate stage of a Stream.)靜態(tài)內(nèi)部類。
Stage
Stream中使用Stage的概念來描述一個完整的操作,并用某種實例化后的PipelineHelper來代表Stage,將具有先后順序的各個Stage連到一起,就構(gòu)成了整個流水線。
簡單理解就是每一個操作就會產(chǎn)生一個
stage

Sink
現(xiàn)在有了Stage了,如何串聯(lián)起這些stage呢?通過Sink。
拿java.util.stream.Sink.ChainedReference來看,里面有一個downstream的實例成員,只要上游的sink調(diào)用下游的accept()方法,即可保證串聯(lián)起來執(zhí)行:即每一個上游的sink執(zhí)行完自己的accept()邏輯以后,調(diào)用下游sink的accept()方法(downstream.accept())。
通過
stage構(gòu)建起來的雙向鏈表只是給所有stage關(guān)聯(lián)在一起,每個stage都知道自己前后有沒有stage存在,卻沒法知道后面stage到底執(zhí)行了哪種操作,以及回調(diào)函數(shù)是哪種形式。這就好像一個網(wǎng)絡(luò)環(huán)境,根據(jù)iptables我知道了我下游的路由器在哪里,那我怎么把數(shù)據(jù)傳遞給它呢?通過協(xié)議,TCP協(xié)議!這么看的話,Sink更像是一個協(xié)議,讓每一個stage執(zhí)行完以后,知道調(diào)用某個方法來將數(shù)據(jù)傳遞給下一個stage,也就是Sink#accept()。

通過代碼來講講:
@Test
public void testFilter() {
Stream.of(1, 2, 3, 4, 5)
.filter(item -> item > 3)
.forEach(System.out::println);// 打印結(jié)果:4,5
}
其實這段代碼等價于:
@Test
public void testFilter() {
Stream<Integer> head = Stream.of(1, 2, 3, 4, 5);
Stream<Integer> filterStream = head.filter(i -> i > 3);
filterStream.forEach(System.out::println);
filterStream.count();
}
可以看到,前面每一步都會返回一個新的Stream,嚴(yán)格的說,其實是Stream的子類,更嚴(yán)格說,是ReferencePipeline的子類,參見上圖類圖1。具體如下:
// java.util.Arrays#stream(T[], int, int)
// 創(chuàng)建一個Stream實例
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
// spliterator方法創(chuàng)建一個ArraySpliterator,后面會關(guān)注到ArraySpliterator#forEachRemaining
return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}
// java.util.stream.StreamSupport#stream(java.util.Spliterator<T>, boolean)
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
// 創(chuàng)建一個管道(pipeline)的頭節(jié)點
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
通過上面的代碼可以看到,真正返回的是ReferencePipeline.Head的實例,當(dāng)然,就是Stream的子類。
接著看filter()方法:
// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
// filter是一個無狀態(tài)操作(StatelessOp),StatelessOp是管道的一個節(jié)點,也是Stream的子類
// 這段代碼重點關(guān)注返回的StatelessOp實例,this是當(dāng)前Stream實例,即上面代碼Stream#of方法產(chǎn)生的實例,也就是ReferencePipeline.Head實例
// 通過這個方法串聯(lián)起新建節(jié)點StatelessOp與ReferencePipeline.Head的關(guān)系,即StatelessOp#previousStage=ReferencePipeline.Head
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
// 這個方法先不關(guān)注,后面還會說到
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
filter方法的默認(rèn)實現(xiàn)是在ReferencePipeline中實現(xiàn)的。上面這段代碼我們需要重點關(guān)注的是,返回了一個StatelessOp對象,也是ReferencePipeline的子類。關(guān)注一下構(gòu)造StatelessOp的過程:
// java.util.stream.ReferencePipeline.StatelessOp#StatelessOp
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape,
int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
// java.util.stream.ReferencePipeline#ReferencePipeline(java.util.stream.AbstractPipeline<?,P_IN,?>, int)
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
super(upstream, opFlags);
}
// java.util.stream.AbstractPipeline#AbstractPipeline(java.util.stream.AbstractPipeline<?,E_IN,?>, int)
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
// 這里是關(guān)聯(lián)stage
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;// sourceStage是ReferencePipeline.Head
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
通過上面的操作,形成了這樣的雙向鏈表:
ReferencePipeline.Head <-> StatelessOp
一般后面在有其他操作,例如map()、sort()、limit()等,都是返回一個ReferencePipeline實例(StatefulOp/StatelessOp),在構(gòu)造方法里面將雙向鏈表串聯(lián)起來。
最后來看forEach()方法:
// java.util.stream.ReferencePipeline#forEach
@Override
public void forEach(Consumer<? super P_OUT> action) {
// 通過工廠ForEachOps返回一個ForEachOp實例(下面會提到),本質(zhì)是一個TerminalOp,即:構(gòu)造了一個終止操作實例
evaluate(ForEachOps.makeRef(action, false));
}
evaluate()方法就是一個執(zhí)行Stream的操作。
// java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
// 關(guān)注順序執(zhí)行這里
// this是由filter方法返回的Stream,嚴(yán)格說是PipelineHelper,terminalOp是上面提到的ForEachOp實例
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
現(xiàn)在我們只關(guān)注順序執(zhí)行這一段:
// java.util.stream.ForEachOps.ForEachOp#evaluateSequential
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
// this是上面提到的ForEachOp實例,helper是filter方法返回的PipelineHelper
return helper.wrapAndCopyInto(this, spliterator).get();
}
wrapAndCopyInto方法主要干了兩件事:
- 封裝
sink鏈; - 將封裝好的
sink鏈從頭到尾執(zhí)行。
// 串聯(lián)sink并執(zhí)行
// 前面提到反向串聯(lián),這里是順序串聯(lián)
// 這個是helper的實例方法,也就是filter()方法構(gòu)造出來的實例對象
// java.util.stream.AbstractPipeline#wrapAndCopyInto
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
// 這里的sink是上面提到的ForEachOp實例
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
先看看對sink鏈的封裝:
// java.util.stream.AbstractPipeline#wrapSink
// sink是ForEachOp實例
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
// 這里的sink是上面提到的ForEachOp實例
Objects.requireNonNull(sink);
// 不斷的將p設(shè)置為前一個stage節(jié)點,然后調(diào)用p#opWrapSink構(gòu)造出上一個stage對應(yīng)的sink實例,這樣p就知道了它的downstream是sink了
// 初始化的時候,AbstractPipeline.this對應(yīng)的是filter()方法返回的實例對象,sink是TerminalOp實例
// p.depth > 0意味著sourceStage,即ReferencePipeline.Head實例不會包含到sink鏈中,因為sourceStage.depth = 0
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
// 關(guān)聯(lián)好后,sink變成前一個stage
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;// 這個sink已經(jīng)是sink鏈的鏈頭了
}

回看前面提到的
filter()方法,重點關(guān)注opWrapSink()方法:
// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
// 重點關(guān)注這里
// opWrapSink的作用是創(chuàng)建當(dāng)前節(jié)點的Sink實例。
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
// 將當(dāng)前的stage封裝成Sink實例,實例的downstream就是參數(shù)sink
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
// 這里先不關(guān)注,待會還要再提到
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
// java.util.stream.Sink.ChainedReference#ChainedReference
// 構(gòu)造方法,創(chuàng)建sink實例,同時告知downstream,這樣當(dāng)前sink實例和downstream實例就串聯(lián)起來了
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
可以看到上面方法就是構(gòu)造filter()方法這一步stage對應(yīng)的Sink實例,并且串聯(lián)起當(dāng)前sink實例與downstream。這樣在執(zhí)行的時候就知道了,當(dāng)前sink執(zhí)行完了,就調(diào)用downstream的方法執(zhí)行。
構(gòu)造好的sink鏈如下:

sink鏈構(gòu)造好了,下面就是執(zhí)行的過程:
// java.util.stream.AbstractPipeline#copyInto
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
// 此時的wrappedSink已經(jīng)是鏈表頭了
Objects.requireNonNull(wrappedSink);
// 沒有短路操作,執(zhí)行這里
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());// 執(zhí)行Sink#begin
// 這里的spliterator是ArraySpliterator,前面提到過
// 對spliterator實例里面的元素逐個執(zhí)行
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
// java.util.Spliterators.ArraySpliterator#forEachRemaining
@SuppressWarnings("unchecked")
@Override
public void forEachRemaining(Consumer<? super T> action) {
Object[] a; int i, hi; // hoist accesses and checks from loop
if (action == null)
throw new NullPointerException();
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
// 對于串行執(zhí)行的,就是對數(shù)組里的每個元素執(zhí)行action.accept
// 并行因為調(diào)用了trySplit,index和fence會不同,不是這里關(guān)注的重點。
// 這里的action就是filter方法里的Sink.ChainedReference
do { action.accept((T)a[i]); } while (++i < hi);
}
}
上面的方法中調(diào)用了sink鏈的accept()方法,回看前面提到的filter()方法,重點關(guān)注accept()方法:
// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
// 重點關(guān)注這里
if (predicate.test(u))
// 調(diào)用downstream的accept,就串聯(lián)執(zhí)行了
downstream.accept(u);
}
};
}
};
}
// java.util.stream.ForEachOps.ForEachOp.OfRef#accept
@Override
public void accept(T t) {
// comsumer就是forEach方法里的lambda表達(dá)式
consumer.accept(t);
}
至此,一個完整的Stream流程執(zhí)行完畢。
附上Sink對應(yīng)的部分類圖:

這里
XXXOps都是對應(yīng)XXXOp的工廠,如ReduceOps是ReduceOp的工廠,用于生產(chǎn)ReduceOp實例。
總結(jié)
總結(jié)一下:
-
Stream是支持順序和并行聚集操作的一系列元素。這里著重描述的是:Stream=操作+元素; -
Pipeline是管道,PipelineHelper是將Stream的信息收集到一個地方; -
Stage就是一個完整的操作,每一個PipelineHelper(或者說ReferencePipeline)的實例就是一個Stage,關(guān)聯(lián)起所有Stage將形成一個雙向鏈表。 -
Sink鏈的主要作用是控制Stream中的每一個元素通過Stage形成的管道。 -
Stream的邏輯是:
- 構(gòu)造
stage雙向鏈表 - 利用
stage雙向鏈表逆序構(gòu)造Sink鏈 - 通過
Spliterator#forEachRemaining逐個處理元素,代碼類似于:
for (T element : allElements) {// 逐個元素處理
Sink consumer = sinkHead;// sink鏈頭
While (consumer != null) {
consumer.accept(element);// 消費元素
consumer = consumer.downstream;// 傳遞給下游處理
}
}
關(guān)于Stream帶來好處的思考
首先是編程思維的轉(zhuǎn)變,原來是面向?qū)ο?,現(xiàn)在是面向數(shù)據(jù)。你需要考慮如何通過對手里的數(shù)據(jù)進(jìn)行操作得到預(yù)期的結(jié)果。這個有點想SQL。
其次是代碼解決性、可讀性,這個感覺更多是函數(shù)式編程帶來的好處
最后,也是最重要的,依托java.util.concurrent包,將并行化能力進(jìn)行封裝,也就是說,開發(fā)者可以通過最簡單的方式實現(xiàn)并行化數(shù)據(jù)處理,不在需要控制線程(新建、啟動、通信、異常處理)。
一些參考
深入理解Java Stream流水線
Java 8 Stream探秘
JAVA8中的stream原理解析——1(串行)
java8 Stream的實現(xiàn)原理 (從零開始實現(xiàn)一個stream流)
Stream源碼分析
jdk8中Spliterator的作用
java8 Stream Pipelines 淺析
Side-effect是只一個方法不只是返回一個結(jié)果,還會改變對象的狀態(tài)。