前言
Java 8 的 Stream 使得代碼更加簡潔易懂,本篇文章深入分析 Java Stream 的工作原理,并探討 Steam 的性能問題。
Java 8 集合中的 Stream 相當(dāng)于高級版的 Iterator,它可以通過 Lambda 表達(dá)式對集合進(jìn)行各種非常便利、高效的聚合操作(Aggregate Operation),或者大批量數(shù)據(jù)操作 (Bulk Data Operation)。
Stream 的聚合操作與數(shù)據(jù)庫 SQL 的聚合操作 sorted、filter、map 等類似。我們在應(yīng)用層就可以高效地實現(xiàn)類似數(shù)據(jù)庫 SQL 的聚合操作了,而在數(shù)據(jù)操作方面,Stream 不僅可以通過串行的方式實現(xiàn)數(shù)據(jù)操作,還可以通過并行的方式處理大批量數(shù)據(jù),提高數(shù)據(jù)的處理效率。
操作分類
官方將 Stream 中的操作分為兩大類:
-
中間操作(Intermediate operations),只對操作進(jìn)行了記錄,即只會返回一個流,不會進(jìn)行計算操作。 -
終結(jié)操作(Terminal operations),實現(xiàn)了計算操作。
中間操作又可以分為:
-
無狀態(tài)(Stateless)操作,元素的處理不受之前元素的影響。 -
有狀態(tài)(Stateful)操作,指該操作只有拿到所有元素之后才能繼續(xù)下去。
終結(jié)操作又可以分為:
-
短路(Short-circuiting)操作,指遇到某些符合條件的元素就可以得到最終結(jié)果 -
非短路(Unshort-circuiting)操作,指必須處理完所有元素才能得到最終結(jié)果。
操作分類詳情如下圖所示:
源碼結(jié)構(gòu)
Stream 相關(guān)類和接口的繼承關(guān)系如下圖所示:
BaseStream
最頂端的接口類,定義了流的基本接口方法,最主要的方法為 spliterator、isParallel。
Stream
最頂端的接口類。定義了流的常用方法,例如 map、filter、sorted、limit、skip、collect 等。
ReferencePipeline
ReferencePipeline 是一個結(jié)構(gòu)類,定義內(nèi)部類組裝了各種操作流,定義了Head、StatelessOp、StatefulOp三個內(nèi)部類,實現(xiàn)了 BaseStream 與 Stream 的接口方法。
Sink
Sink 接口定義了 Stream 之間的操作行為,包含 begin()、end()、cancellationRequested()、accpt()四個方法。ReferencePipeline 最終會將整個 Stream 流操作組裝成一個調(diào)用鏈,而這條調(diào)用鏈上的各個 Stream 操作的上下關(guān)系就是通過 Sink 接口協(xié)議來定義實現(xiàn)的。
操作疊加
Stream 的基礎(chǔ)用法就不再敘述了,這里從一段代碼開始,分析 Stream 的工作原理。
@Test
public void testStream() {
List<String> names = Arrays.asList("kotlin", "java", "go");
int maxLength = names.stream().filter(name -> name.length() <= 4).map(String::length)
.max(Comparator.naturalOrder()).orElse(-1);
System.out.println(maxLength);
}
當(dāng)使用 Stream 時,主要有 3 部分組成,下面一一講解。
加載數(shù)據(jù)源
調(diào)用 names.stream() 方法,會初次加載 ReferencePipeline 的 Head 對象,此時為加載數(shù)據(jù)源操作。
java.util.Collection#stream
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
StreamSupport 類中的 stream 方法,初始化了一個 ReferencePipeline 的 Head 內(nèi)部類對象。
java.util.stream.StreamSupport#stream(java.util.Spliterator<t style="margin: 0px; padding: 0px;">, boolean)</t>
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
中間操作
接著為 filter(name -> name.length() <= 4).mapToInt(String::length),是中間操作,分為無狀態(tài)中間操作 StatelessOp 對象和有狀態(tài)操作 StatefulOp 對象,此時的 Stage 并沒有執(zhí)行,而是通過 AbstractPipeline 生成了一個中間操作 Stage 鏈表。
java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
java.util.stream.ReferencePipeline#map
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
可以看到 filter 和 map 方法都返回了一個新的 StatelessOp 對象。new StatelessOp 將會調(diào)用父類 AbstractPipeline 的構(gòu)造函數(shù),這個構(gòu)造函數(shù)將前后的 Stage 聯(lián)系起來,生成一個 Stage 鏈表:
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
終結(jié)操作
最后為 max(Comparator.naturalOrder()),是終結(jié)操作,會生成一個最終的 Stage,通過這個 Stage 觸發(fā)之前的中間操作,從最后一個 Stage 開始,遞歸產(chǎn)生一個 Sink 鏈。
java.util.stream.ReferencePipeline#max
@Override
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
return reduce(BinaryOperator.maxBy(comparator));
}
最終調(diào)用到 java.util.stream.AbstractPipeline#wrapSink,這個方法會調(diào)用 opWrapSink 生成一個 Sink 鏈表,對應(yīng)到本文的例子,就是 filter 和 map 操作。
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
在上面 opWrapSink 上斷點調(diào)試,發(fā)現(xiàn)最終會調(diào)用到本例中的 filter 和 map 操作。
wrapAndCopyInto 生成 Sink 鏈表后,會通過 copyInfo 方法執(zhí)行 Sink 鏈表的具體操作。
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
上面的核心代碼是:
spliterator.forEachRemaining(wrappedSink);
java.util.Spliterators.ArraySpliterator#forEachRemaining
@Override
public void forEachRemaining(Consumer<? super T> action) {
Object[] a; int i, hi; // hoist accesses and checks from loop
if (action == null)
throw new NullPointerException();
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
do { action.accept((T)a[i]); } while (++i < hi);
}
}
斷點調(diào)試,可以發(fā)現(xiàn)首先進(jìn)入了 filter 的 Sink,其中 accept 方法的入?yún)⑹?list 中的第一個元素“kotlin”(代碼中的 3 個元素是:"kotlin", "java", "go")。filter 的傳入是一個 Lambda 表達(dá)式:
filter(name -> name.length() <= 4)
顯然這個第一個元素“kotlin”的 predicate 是不會進(jìn)入的。
對于第二個元素“java”,predicate.test 會返回 true(字符串“java”的長度<=4),則會進(jìn)入 map 的 accept 方法。
本次調(diào)用 accept 方法時,empty 為 false,會將 map 后的結(jié)果(int 類型的 4)賦值給 t。
public static <T> TerminalOp<T, Optional<T>>
makeRef(BinaryOperator<T> operator) {
Objects.requireNonNull(operator);
class ReducingSink
implements AccumulatingSink<T, Optional<T>, ReducingSink> {
private boolean empty;
private T state;
public void begin(long size) {
empty = true;
state = null;
}
@Override
public void accept(T t) {
if (empty) {
empty = false;
state = t;
} else {
state = operator.apply(state, t);
}
}
……
}
}
對于第三個元素“go”,也會進(jìn)入 accept 方法,此時 empty 為 true, map 后的結(jié)果(int 類型的 2)會與上次的結(jié)果 4 通過自定義的比較器相比較,存入符合結(jié)果的值。
public static <T> BinaryOperator<T> maxBy(Comparator<? super T> comparator) {
Objects.requireNonNull(comparator);
return (a, b) -> comparator.compare(a, b) >= 0 ? a : b;
}
本文代碼中的 max 傳入的比較器為:
max(Comparator.naturalOrder())
至此會返回 int 類型的 4。
并行處理
上面的例子是串行處理的,如果要改成并行也很簡單,只需要在 stream() 方法后加上 parallel() 就可以了,并行代碼可以寫成:
@Test
public void testStream() {
List<String> names = Arrays.asList("kotlin", "java", "go");
int maxLength = names.stream().parallel().filter(name -> name.length() <= 4)
.map(String::length).max(Comparator.naturalOrder()).orElse(-1);
System.out.println(maxLength);
}
Stream 的并行處理在執(zhí)行終結(jié)操作之前,跟串行處理的實現(xiàn)是一樣的。而在調(diào)用終結(jié)方法之后,實現(xiàn)的方式就有點不太一樣,會調(diào)用 TerminalOp 的 evaluateParallel 方法進(jìn)行并行處理。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
核心是使用了 ForkJoin 框架,對 Stream 處理進(jìn)行分片,最終會調(diào)用下面的代碼,這里就不展開分析了。
java.util.stream.AbstractTask#compute
@Override
public void compute() {
Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
long sizeEstimate = rs.estimateSize();
long sizeThreshold = getTargetSize(sizeEstimate);
boolean forkRight = false;
@SuppressWarnings("unchecked") K task = (K) this;
while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
K leftChild, rightChild, taskToFork;
task.leftChild = leftChild = task.makeChild(ls);
task.rightChild = rightChild = task.makeChild(rs);
task.setPendingCount(1);
if (forkRight) {
forkRight = false;
rs = ls;
task = leftChild;
taskToFork = rightChild;
}
else {
forkRight = true;
task = rightChild;
taskToFork = leftChild;
}
taskToFork.fork();
sizeEstimate = rs.estimateSize();
}
task.setLocalResult(task.doLeaf());
task.tryComplete();
}
并行錯誤的使用方法
@Test
public void testParallelWrong() {
List<Integer> parallelList = new ArrayList<>();
IntStream.range(0, 1000).boxed().parallel().filter(i -> i % 2 == 1)
.forEach(parallelList::add);
System.out.println(parallelList.size());
}
上面的輸出結(jié)果會經(jīng)常小于 500,這是因為 parallelList 的類型是 ArrayList,并不是線程安全的,在執(zhí)行 add 操作時,可能正好趕上擴容或者線程被占用,會覆蓋其他線程的賦好的值。
并行正確的使用方法
@Test
public void testParallelRight() {
List<Integer> parallelList = IntStream.range(0, 1000).boxed().parallel()
.filter(i -> i % 2 == 1).collect(Collectors.toList());
System.out.println(parallelList.size());
}
上面的輸出結(jié)果是 500。
性能
為保證測試結(jié)果真實可信,我們將 JVM 運行在-server模式下,測試數(shù)據(jù)在 GB 量級,測試機器采用常見的商用服務(wù)器,配置如下:
| OS | CentOS 6.7 x86_64 |
| CPU | Intel Xeon X5675, 12M Cache 3.06 GHz, 6 Cores 12 Threads |
| 內(nèi)存 | 96GB |
| JDK | java version 1.8.0_91, Java HotSpot(TM) 64-Bit Server VM |
測試方法和測試數(shù)據(jù)
性能測試并不是容易的事,Java 性能測試更費勁,因為虛擬機對性能的影響很大,JVM 對性能的影響有兩方面:
- GC 的影響。GC 的行為是 Java 中很不好控制的一塊,為增加確定性,我們手動指定使用 CMS 收集器,并使用 10GB 固定大小的堆內(nèi)存。具體到 JVM 參數(shù)就是
-XX:+UseConcMarkSweepGC -Xms10G -Xmx10G - JIT(Just-In-Time) 即時編譯技術(shù)。即時編譯技術(shù)會將熱點代碼在 JVM 運行的過程中編譯成本地代碼,測試時我們會先對程序預(yù)熱,觸發(fā)對測試函數(shù)的即時編譯。相關(guān)的 JVM 參數(shù)是
-XX:CompileThreshold=10000。
Stream 并行執(zhí)行時用到ForkJoinPool.commonPool()得到的線程池,為控制并行度我們使用 Linux 的taskset命令指定 JVM 可用的核數(shù)。
測試數(shù)據(jù)由程序隨機生成。為防止一次測試帶來的抖動,測試 4 次求出平均時間作為運行時間。
實驗一 基本類型迭代
測試內(nèi)容:找出整型數(shù)組中的最小值。對比 for 循環(huán)外部迭代和 Stream API 內(nèi)部迭代性能。
測試程序 IntTest,測試結(jié)果如下圖:
圖中展示的是 for 循環(huán)外部迭代耗時為基準(zhǔn)的時間比值。分析如下:
- 對于基本類型 Stream 串行迭代的性能開銷明顯高于外部迭代開銷(兩倍);
- Stream 并行迭代的性能比串行迭代和外部迭代都好。
并行迭代性能跟可利用的核數(shù)有關(guān),上圖中的并行迭代使用了全部 12 個核,為考察使用核數(shù)對性能的影響,我們專門測試了不同核數(shù)下的 Stream 并行迭代效果:
分析,對于基本類型:
- 使用 Stream 并行 API 在單核情況下性能很差,比 Stream 串行 API 的性能還差;
- 隨著使用核數(shù)的增加,Stream 并行效果逐漸變好,比使用 for 循環(huán)外部迭代的性能還好。
以上兩個測試說明,對于基本類型的簡單迭代,Stream 串行迭代性能更差,但多核情況下 Stream 迭代時性能較好。
實驗二 對象迭代
再來看對象的迭代效果。
測試內(nèi)容:找出字符串列表中最小的元素(自然順序),對比 for 循環(huán)外部迭代和 Stream API 內(nèi)部迭代性能。
測試程序 StringTest,測試結(jié)果如下圖:
結(jié)果分析如下:
- 對于對象類型 Stream 串行迭代的性能開銷仍然高于外部迭代開銷(1.5 倍),但差距沒有基本類型那么大。
- Stream 并行迭代的性能比串行迭代和外部迭代都好。
再來單獨考察 Stream 并行迭代效果:
分析,對于對象類型:
- 使用 Stream 并行 API 在單核情況下性能比 for 循環(huán)外部迭代差;
- 隨著使用核數(shù)的增加,Stream 并行效果逐漸變好,多核帶來的效果明顯。
以上兩個測試說明,對于對象類型的簡單迭代,Stream 串行迭代性能更差,但多核情況下 Stream 迭代時性能較好。
實驗三 復(fù)雜對象歸約
從實驗一、二的結(jié)果來看,Stream 串行執(zhí)行的效果都比外部迭代差(很多),是不是說明 Stream 真的不行了?先別下結(jié)論,我們再來考察一下更復(fù)雜的操作。
測試內(nèi)容:給定訂單列表,統(tǒng)計每個用戶的總交易額。對比使用外部迭代手動實現(xiàn)和 Stream API 之間的性能。
我們將訂單簡化為<userName, price, timeStamp>構(gòu)成的元組,并用Order對象來表示。測試程序 ReductionTest,測試結(jié)果如下圖:
分析,對于復(fù)雜的歸約操作:
- Stream API 的性能普遍好于外部手動迭代,并行 Stream 效果更佳;
再來考察并行度對并行效果的影響,測試結(jié)果如下:
分析,對于復(fù)雜的歸約操作:
- 使用 Stream 并行歸約在單核情況下性能比串行歸約以及手動歸約都要差,簡單說就是最差的;
- 隨著使用核數(shù)的增加,Stream 并行效果逐漸變好,多核帶來的效果明顯。
以上兩個實驗說明,對于復(fù)雜的歸約操作,Stream 串行歸約效果好于手動歸約,在多核情況下,并行歸約效果更佳。我們有理由相信,對于其他復(fù)雜的操作,Stream API 也能表現(xiàn)出相似的性能表現(xiàn)。
結(jié)論
上述三個實驗的結(jié)果可以總結(jié)如下:
- 對于
簡單操作,比如最簡單的遍歷,Stream 串行 API 性能明顯差于顯式迭代,但并行的 Stream API 能夠發(fā)揮多核特性。 - 對于
復(fù)雜操作,Stream 串行 API 性能可以和手動實現(xiàn)的效果匹敵,在并行執(zhí)行時 Stream API 效果遠(yuǎn)超手動實現(xiàn)。
所以,如果出于性能考慮:
- 對于簡單操作推薦使用外部迭代手動實現(xiàn)
- 對于復(fù)雜操作,推薦使用 Stream API
- 在多核情況下,推薦使用并行 Stream API 來發(fā)揮多核優(yōu)勢
- 單核情況下不建議使用并行 Stream API
為什么簡單操作,Stream API 不如迭代,而復(fù)雜操作性能差不多?
其實根據(jù)上面 Stream 源碼的分析,結(jié)合 Stream 的使用,可以看出如果是簡單操作的話,Stream 也會包括加載數(shù)據(jù)源、中間操作、終結(jié)操作等,所以簡單操作會慢于顯式迭代。














