問(wèn)題
在學(xué)習(xí) Java Stream 的過(guò)程中遇到了一段代碼:
int sum = IntStream.range(1, 20)
.peek(n -> System.out.print(n + "\t"))
.skip(5)
.limit(5)
.sum();
System.out.println();
System.out.println("sum is :" + sum);
運(yùn)行結(jié)果:
1 2 3 4 5 6 7 8 9 10
sum is :40
改為并行流:
int sum = IntStream.range(1, 20)
.parallel()
.peek(n -> System.out.print(n + "\t"))
.skip(5)
.limit(5)
.sum();
System.out.println();
System.out.println("sum is :" + sum);
原本以為被打印出來(lái)的數(shù)字應(yīng)該是無(wú)序的 1~10 的數(shù)字排列,但運(yùn)行結(jié)果出乎我的意料:每次都是只打印 5~10 這6個(gè)數(shù)字,卻沒(méi)有打印 1~4。運(yùn)行結(jié)果如下:
7 9 5 10 8 6
sum is :40
就算是 JDK 做了優(yōu)化,那為何不是跳過(guò)前 5 個(gè)數(shù)字,只打印 5 個(gè)數(shù)字,而是打印了 6 個(gè)數(shù)字?
后來(lái)又發(fā)現(xiàn)更加奇怪的現(xiàn)象,將 skip 由 5 改為 8 ,每次都能打印 1~13 全部數(shù)字:
12 11 13 1 10 2 3 4 5 6 7 8 9
sum is :55
skip 改為7,每次則只打印8~12這5個(gè)數(shù)字,跳過(guò)了 1~7 這7個(gè)數(shù)字:
8 9 11 10 12
sum is :50
這三次 skip 參數(shù)的變動(dòng),打印出來(lái)的結(jié)果截然不同,也沒(méi)有看出什么規(guī)律,那就只能從源碼中找答案。
原因
流與操作
在上面那段代碼中,我們一共會(huì)遇到3種類型的流(Head、StatelessOp、StatefulOp)及1種終止操作(ReduceOp)。
- range(...)是一個(gè)創(chuàng)建流的動(dòng)作,創(chuàng)建了一個(gè)Head類型(IntPipeline.Head)的流,是其他流的源頭。
- peek(...) 是屬于無(wú)狀態(tài)的操作(S[圖片上傳中...(類關(guān)系圖.png-e38fae-1519806177856-0)]
tatelessOp),本質(zhì)是創(chuàng)建了一個(gè)IntPipeline.StatelessOp類型的流,是用于處理流(BaseStream)的中間操作管道(Pipeline)。 - skip(...) 和 limit(...) 是屬于有狀態(tài)的操作(StatefulOp),本質(zhì)是創(chuàng)建了一個(gè)IntPipeline.StatefulOp類型的流,是用于處理流(BaseStream)的中間操作管道(Pipeline)。
- sum(...) 本質(zhì)是創(chuàng)建了一個(gè)ReduceOp,是屬于終止操作(TerminalOp)的一種,遇到終止操作的時(shí)候,才會(huì)觸發(fā)對(duì)整個(gè)流的求值(AbstractPipeline#evaluate)。
以上幾種流以及終止操作的類關(guān)系圖如下所示:

- BaseStream:提供流的基本接口,如迭代器(iterator、spliterator),流的類型及判斷(isParallel、sequential、parallel、unordered)等。
- IntStream:定義了流的各種最常見(jiàn)的操作接口,如skip、limit、peek等。
- PipelineHelper:定義了流水線過(guò)程中需要用到的輔助方法,如evaluate、exactOutputSizeIfKnown、copyInto等。
- AbstractPipeline:繼承了PipelineHelper,包含了基本的管道操作實(shí)現(xiàn)。里面包含了sourceStage、previousStage、nextStage,由此可以組成一個(gè)雙向鏈表。
可以看出,Head、StatefulOp、StatelessOp都屬于BaseStream,并且也都繼承了AbstractPipeline。而終止操作既不屬于流(BaseStream),也不屬于管道(AbstractPipeline)。
了解了這些關(guān)系,再來(lái)看一下上述代碼各個(gè)管道組成的流水線:

Sum觸發(fā)了整個(gè)流的求值,在這個(gè)流中,除了Head之外,還有三個(gè)操作,即peek、skip、limit。peek其本質(zhì)是一個(gè)StatelessOp,僅將遍歷到的值按照給定的Consumer執(zhí)行,再將值傳遞給下游,peek方法的源碼如下:

在 skip 值等于 5 和 7 的時(shí)候,并行流中之所以沒(méi)有打印出部分?jǐn)?shù)字,說(shuō)明這部分?jǐn)?shù)字根本就沒(méi)有傳遞給 peek ,但為何會(huì)沒(méi)有遍歷到這部分?jǐn)?shù)字呢?那就只能猜測(cè)是 skip 和 limit 對(duì)流的源頭產(chǎn)生了影響。
SliceOps
再仔細(xì)翻看代碼,在AbstractPipeline的evaluate方法中發(fā)現(xiàn)了sourceSpliterator方法:

該方法的文檔上寫著:
Get the source spliterator for this pipeline stage. For a sequential or stateless parallel pipeline, this is the source spliterator. For a stateful parallel pipeline, this is a spliterator describing the results of all computations up to and including the most recent stateful operation.
很明顯,在我們的流中包含了 2 個(gè)有狀態(tài)的操作: skip 和 limit 。那么最終返回的Spliterator就不是Source Spliterator(Head)了,而是一個(gè)包含了最近的 StatefulOps 所有計(jì)算結(jié)果的Spliterator。也就是說(shuō),有狀態(tài)的操作在并行流求值中,可能會(huì)改變?cè)剂鳌?/p>

上圖是 sourceSpliterator 中的代碼片段,可以看出,如果是有狀態(tài)的操作,則會(huì)調(diào)用 StatefulOp 的 opEvaluateParallelLazy 方法計(jì)算出新的 spliterator 。

再看 skip 和 limit 方法的代碼,可以發(fā)現(xiàn)他們都是通過(guò) SliceOps#makeInt(...)方法創(chuàng)建出來(lái)的。

當(dāng)原始流是有邊界(SIZED、SUBSIZED)的時(shí)候,會(huì)使用SliceSpliterator將原始流包裹起來(lái),并設(shè)置 SliceSpliterator 的 sliceOrigin 和 sliceFence 屬性。這是兩個(gè)對(duì)原始流做切片的位置標(biāo)識(shí),在原始流的基礎(chǔ)上,標(biāo)明了要切片的起始位置和終止位置,最終在遍歷的時(shí)候,會(huì)根據(jù) sliceOrigin 和 sliceFence 來(lái)控制流的起止。大致流程如下所示:


至此大致可以確定部分?jǐn)?shù)字未打印出來(lái),是跟有狀態(tài)的管道操作(StatefulOp)有關(guān)。但仍然不能解釋:為何當(dāng)skip=7時(shí),不會(huì)打印前7個(gè)數(shù),而當(dāng)skip=5時(shí),為何只跳過(guò)了前4個(gè)數(shù);當(dāng)skip=8時(shí),又為何能打印出全部數(shù)字?
ForkJoinTask & Spliterator
在并行流中,終止操作會(huì)調(diào)用 evaluateParallel 方法來(lái)對(duì)整個(gè)流求值,在這個(gè)方法中會(huì)創(chuàng)建一個(gè)ReduceTask并獲取其結(jié)果。
@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
ReduceTask繼承于AbstractTask,而AbstractTask則是一種ForkJoinTask。類關(guān)系圖如下所示:

Fork/Join框架是Java7提供了的一個(gè)用于并行執(zhí)行任務(wù)的框架, 是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。
...
ForkJoinTask與一般的任務(wù)的主要區(qū)別在于它需要實(shí)現(xiàn)compute方法,在這個(gè)方法里,首先需要判斷任務(wù)是否足夠小,如果足夠小就直接執(zhí)行任務(wù)。如果不足夠小,就必須分割成兩個(gè)子任務(wù),每個(gè)子任務(wù)在調(diào)用fork方法時(shí),又會(huì)進(jìn)入compute方法,看看當(dāng)前子任務(wù)是否需要繼續(xù)分割成孫任務(wù),如果不需要繼續(xù)分割,則執(zhí)行當(dāng)前子任務(wù)并返回結(jié)果。使用join方法會(huì)等待子任務(wù)執(zhí)行完并得到其結(jié)果。
from Fork/Join框架介紹

接下來(lái)看一下在AbstractTask中的compute方法。
首先會(huì)計(jì)算預(yù)測(cè)數(shù)據(jù)大?。╯izeEstimate),同時(shí)會(huì)根據(jù) sizeEstimate 設(shè)置一個(gè)用于拆分任務(wù)的最細(xì)粒度閾值(sizeThreshold),計(jì)算方式為:
sizeThreshold = sizeEstimate / commonParallelism * 4
- sizeEstimate:預(yù)測(cè)流的數(shù)據(jù)大小,不同流計(jì)算方式不一樣。
- commonParallelism:默認(rèn)ForkJoinPool的大小,默認(rèn)為Runtime.getRuntime().availableProcessors() - 1,最小值為1,最大值為32767(0x7fff),默認(rèn)為可以通過(guò)設(shè)置java.util.concurrent.ForkJoinPool.common.parallelism來(lái)指定。具體詳見(jiàn)java.util.concurrent.FockJoinPool#makeCommonPool()。
還記得上一節(jié)的 SliceOps 嗎?原始流受 skip 和 limit 的影響,在傳入 ReduceTask 的時(shí)候已經(jīng)被包裝成了 SliceSpliterator 。原始流的 sizeEstimate = 19 ,而經(jīng)過(guò) SliceSpliterator 包裝之后的 SliceSpliterator ,其 sizeEstimate = 5 ,如下圖第三步綠色部分所示:

我是在一臺(tái)8核的筆記本上運(yùn)行該程序,因此計(jì)算得到 sizeThreshold = 5 / ((8 - 1) * 4) = 5 / 28 ≈ 1 (最小值為1,不能為0),即用于拆分任務(wù)的最細(xì)粒度閾值(sizeThreshold)為 1 。
計(jì)算完sizeThreshold后,接著進(jìn)行任務(wù)拆分。
將Stream拆分成多個(gè)部分的算法是一個(gè)遞歸過(guò)程,如圖所示。第一步是對(duì)第一個(gè) Spliterator調(diào)用trySplit,生成第二個(gè)Spliterator。第二步對(duì)這兩個(gè)Spliterator調(diào)用 trysplit,這樣總共就有了四個(gè)Spliterator。這個(gè)框架不斷對(duì)Spliterator調(diào)用trySplit 直到它返回null,表明它處理的數(shù)據(jù)結(jié)構(gòu)不能再分割,如第三步所示。最后,這個(gè)遞歸拆分過(guò) 程到第四步就終止了,這時(shí)所有的Spliterator在調(diào)用trySplit時(shí)都返回了null。
image
from 《Java8實(shí)戰(zhàn)》Spliterator拆分過(guò)程
在 SliceSpliterator#trySplit() 中,會(huì)直接調(diào)用原始流的trySplit()方法,上述代碼用到的是range,則調(diào)用的是RangeIntSpliterator的trySplit()方法。
@Override
public Spliterator.OfInt trySplit() {
long size = estimateSize();
return size <= 1
? null
// Left split always has a half-open range
: new RangeIntSpliterator(from, from = from + splitPoint(size), 0);
}
private static final int BALANCED_SPLIT_THRESHOLD = 1 << 24;
private static final int RIGHT_BALANCED_SPLIT_RATIO = 1 << 3;
private int splitPoint(long size) {
int d = (size < BALANCED_SPLIT_THRESHOLD) ? 2 : RIGHT_BALANCED_SPLIT_RATIO;
// Cast to int is safe since:
// 2 <= size < 2^32
// 2 <= d <= 8
return (int) (size / d);
}
由于原始流的大小遠(yuǎn)遠(yuǎn)小于BALANCED_SPLIT_THRESHOLD,因此默認(rèn)是對(duì)半拆分。當(dāng)拆分到 預(yù)測(cè)大小小于或等于拆分最細(xì)粒度(estimateSize <= sizeThreshold)時(shí) ,則停止拆分,才進(jìn)行流的求值操作。
在測(cè)試代碼中,原始流是由RangeIntSpliterator生成,其 estimateSize 的計(jì)算方式如下,通俗地說(shuō)為range圈定的大小 [1,20):
@Override
public long estimateSize() {
return upTo - from + last;
}
但是,經(jīng)過(guò) SliceSpliterator 包裝后,其 estimateSize 的計(jì)算方式如下,通俗地說(shuō)為切片后的大?。?/p>
public long estimateSize() {
return (sliceOrigin < fence)
? fence - Math.max(sliceOrigin, index) : 0;
}
當(dāng)skip(5).limit(5)的時(shí)候,其切片過(guò)程如下所示:

最終會(huì)被分割成5個(gè)子任務(wù):
- 子任務(wù)1:[5,6], estimateSize = 1,因?yàn)橛行?shù)字是6,5是被skip掉的,所以estimateSize =1
- 子任務(wù)2:[7], estimateSize = 1
- 子任務(wù)3:[8], estimateSize = 1
- 子任務(wù)4:[9], estimateSize = 1
- 子任務(wù)5:[10,19], estimateSize = 1,數(shù)字10之后的數(shù)字會(huì)被切片切掉
注意,在[1,9]拆分的過(guò)程中,[1,4]由于剛好是在在切片范圍之外(estimateSize=0,被skip掉的部分),在trySplit方法中返回的是null,直接被丟棄掉,在后續(xù)對(duì)流切片進(jìn)行求值的時(shí)候,[1,4]也就永遠(yuǎn)不會(huì)被peek處理到。而切片1中的數(shù)字5剛好和數(shù)字6分配在了同一個(gè)切片中,在求值的時(shí)候,peek始終不會(huì)打印前4個(gè)數(shù)字,這也就是為什么當(dāng)skip(5)的時(shí)候,數(shù)字5能夠被peek出來(lái)。
同理,當(dāng)skip(7).limit(5)的時(shí)候,原本的[5,6]切片和[7]切片都因?yàn)閑stimateSize=0而被丟棄,所以peek始終不會(huì)打印前7個(gè)數(shù)字,而只會(huì)打印[8,12]這5個(gè)數(shù)字,最終子任務(wù)分割如下所示:
- 子任務(wù)1:[8], estimateSize = 1
- 子任務(wù)2:[9], estimateSize = 1
- 子任務(wù)3:[10], estimateSize = 1
- 子任務(wù)4:[11], estimateSize = 1
- 子任務(wù)5:[12], estimateSize = 1
最后再來(lái)看,當(dāng)skip(8).limit(5)的時(shí)候,最終子任務(wù)分割如下所示:
- 子任務(wù)1:[1,9], estimateSize = 1,因?yàn)閟kip前8個(gè)數(shù)字,有效數(shù)字僅剩數(shù)字9一個(gè)了
- 子任務(wù)2:[10], estimateSize = 1
- 子任務(wù)3:[11], estimateSize = 1
- 子任務(wù)4:[12], estimateSize = 1
- 子任務(wù)5:[13,14], estimateSize = 1,數(shù)字14會(huì)被切片切掉
很容易就可以看出,前8個(gè)數(shù)字剛好跟數(shù)字9分配在一個(gè)子任務(wù)中,且剛好構(gòu)成了最小子任務(wù)(estimateSize = 1),因此peek始終可以打印出[1,13]這13個(gè)數(shù)字。
總結(jié)
- 在并行流的情況下,有狀態(tài)操作(StatefulOp)可能會(huì)直接對(duì)原始流(Head)的遍歷產(chǎn)生影響,如 skip 和 limit 對(duì)原始流進(jìn)行切片(SliceOps)。
- 在并行流的情況下,會(huì)使用 ForkJoinTask 進(jìn)行并行計(jì)算,而 ForkJoinTask 會(huì)對(duì)任務(wù)(流)進(jìn)行分割,同時(shí)會(huì)丟棄掉estimateSize=0的子任務(wù)(流),做個(gè)不是很恰當(dāng)?shù)谋扔鳎嚎梢哉J(rèn)為是對(duì)整個(gè)流做一個(gè)不太精確的 “trim” 操作(如skip=5和skip=8的情況)。切片范圍不同,會(huì)對(duì) ForkJoinTask 分割子任務(wù)產(chǎn)生不同的影響,從而影響了程序運(yùn)行結(jié)果。
寫得有點(diǎn)啰嗦,不過(guò)總算是搞清楚問(wèn)題的原因了,如有理解不對(duì)的地方,還請(qǐng)批評(píng)指正。
