基于JDK12
Stream的獲取過程
import java.util.stream.Stream;
// 創(chuàng)建一個String流
Stream<String> xx = Stream.of("xx", "xx");
//Stream.of方法調用鏈
public static<T> Stream<T> of(T... values) {
return Arrays.stream(values);
}
import java.util.Arrays;
public static <T> Stream<T> stream(T[] array) {
return stream(array, 0, array.length);
}
public static <T> Stream<T> stream(T[] array, int startInclusive,
int endExclusive) {
return StreamSupport.stream(
spliterator(array, startInclusive, endExclusive), false);
}
Q1:什么是ArraySpliterator?
A1:ArraySpliterator是一個維護一個數組的可分割的迭代器,可用于并行迭代。
// spliterator方法的調用鏈
import java.util.Arrays;
public static <T> Spliterator<T> spliterator(T[] array, int startInclusive,
int endExclusive) {
return Spliterators.spliterator(array, startInclusive, endExclusive,
Spliterator.ORDERED | Spliterator.IMMUTABLE);
}
import java.util.Spliterators;
public static <T> Spliterator<T> spliterator(Object[] array, int fromIndex,
int toIndex, int additionalCharacteristics) {
checkFromToBounds(Objects.requireNonNull(array).length,fromIndex,toIndex);
// 最后返回一個數組分離迭代器
return new ArraySpliterator<>(array, fromIndex, toIndex,
additionalCharacteristics);
}

package java.util;
public interface Spliterator<T> {
// 定義許多特征量表示此分離迭代器維護的數據的特征和操作數據的特征
/* ORDERED == 1 << 4,代表數組元素根據索引順序訪問是有有意義的,
例如List而hash-based的集合根據索引順序訪問是沒有意義的
*/
public static final int ORDERED = 0x00000010;
// DISTINCT == 1,表示元素不重復,x.equals(y) == false;
public static final int DISTINCT = 0x00000001;
/* SORTED == 1 << 2,代表元素是有序的(可能是自然順序),
getComparator方法返回null時,表示自然排序
必須同時具有ORDERED特征
*/
public static final int SORTED = 0x00000004;
/* SIZED == 1 << 6,表示元素的數量是有限的大小
從estimateSize()遍歷或分割之前返回的值是有限且精確的
*/
public static final int SIZED = 0x00000040;
// NONNULL == 1 << 8,表示元素不能為null
public static final int NONNULL = 0x00000100;
// IMMUTABLE == 1 << 10,表示不能CRUD
public static final int IMMUTABLE = 0x00000400;
/* CONCURRENT == 1 << 12,代表并發(fā)CURD
在最上層分離器中不能和SIZED同時出現,子分離器可以
在最上層分離器中不能和IMMUTABLE同時出現,子分離器可以
*/
public static final int CONCURRENT = 0x00001000;
/* SUBSIZED == 1 << 14代表trySplit()分離出來的子分離器都將是SIZED和SUBSIZED
但是沒有SIZED,卻有SUBSIZED是不恰當的
*/
public static final int SUBSIZED = 0x00004000
// 在Spliterator,判斷是否具有某個特征
default boolean hasCharacteristics(int characteristics) {
return (characteristics() & characteristics) == characteristics;
}
// 在Spliterator,存在SIZED特征返回estimateSize(),否則返回-1
default long getExactSizeIfKnown() {
return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
}
}
package java.util;
static final class ArraySpliterator<T> implements Spliterator<T> {
/* array表示分離迭代器維護的數組
index至fence表示當前分離迭代器維護array的范圍(不包括fence)
characteristics表示特征量
*/
private final Object[] array;
private int index;
private final int fence;
private final int characteristics;
public ArraySpliterator(Object[] array, int additionalCharacteristics) {
this(array, 0, array.length, additionalCharacteristics);
}
public ArraySpliterator(Object[] array, int origin, int fence,
int additionalCharacteristics) {
/* this.origin = 0, this.fence = array.length,
也就是本分離器維護整個數組
this.characteristics特征為Spliterator.SIZED | Spliterator.SUBSIZED
| Spliterator.ORDERED | Spliterator.IMMUTABLE
*/
this.array = array;
this.index = origin;
this.fence = fence;
this.characteristics = additionalCharacteristics | Spliterator.SIZED
| Spliterator.SUBSIZED;
}
/* 此方法將創(chuàng)一個新的分離迭代器用于維護原分離迭代器的一半范圍的元素
并將原分離迭代器的范圍設置成原來的一半
*/
@Override
public Spliterator<T> trySplit() {
int lo = index, mid = (lo + fence) >>> 1; // 中間索引
/* lo >= mid 維護的索引范圍為1,或者出現lo > fence的錯誤,返回null
否則,創(chuàng)建一個新的ArraySpliterator對象,
index = mid, 原分離迭代維護mid至fence范圍,新分離迭代器維護lo至mid范圍
*/
return (lo >= mid)
? null
: new ArraySpliterator<>(array, lo, index = mid,
characteristics);
}
/* 這個是一個終止操作類型的方法,使用在流操作的最后一個操作
對維護范圍內的元素進行消費,index = fence導致分離迭器代維護0個元素
也就是分離迭代器不能使用了
*/
@SuppressWarnings("unchecked")
@Override
public void forEachRemaining(Consumer<? super T> action) {
Object[] a; int i, hi;
if (action == null)
throw new NullPointerException();
/* 判斷fence是否<= array.length
判斷index是否>=0并且<fence
最后index=fence,終結迭代器
*/
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
// 消費元素
do { action.accept((T)a[i]); } while (++i < hi);
}
}
/* 每次消費一個元素位于index處,并且index += 1
對比forEachRemaining不是一個終止操作的方法,
但是index == fence 的時候必定終結
*/
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (action == null)
throw new NullPointerException();
if (index >= 0 && index < fence) {
@SuppressWarnings("unchecked") T e = (T) array[index++];
action.accept(e);
return true;
}
return false;
}
// 迭代器維護的元素的數量
@Override
public long estimateSize() { return (long)(fence - index); }
// 迭代器的特征
@Override
public int characteristics() {
return characteristics;
}
/* 獲取元素比較器,具有SORTED特征則返回null
由于構造鏈中沒有傳遞SORTED特征,所以永遠返回null
*/
@Override
public Comparator<? super T> getComparator() {
if (hasCharacteristics(Spliterator.SORTED))
return null;
throw new IllegalStateException();
}
}
Q2:StreamOpFlag是什么?
A2:StreamOpFlag是流特征及流操作特征的標志的枚舉類。用流的優(yōu)化計算的狀態(tài)控制以及優(yōu)化計算。
import java.util.stream.StreamSupport;
public static <T> Stream<T> stream(Spliterator<T> spliterator,
boolean parallel) {
Objects.requireNonNull(spliterator);
/* StreamOpFlag.fromCharacteristics(spliterator) == 0b0101_0101
parallel == false
*/
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
package java.util.stream;
enum StreamOpFlag {
/*
StreamOpFlag類似于Spliterator的特征量,表示stream特征和stream操作的特征,
主要用于優(yōu)化運算
Stream對象內部使用int類型變量維護特征的集合
一個stream的一生分為3個階段:
1. stream sources, 資源階段,e.g. 一個Stream.of('xx', 'xx')生成一個stream
2. intermediate operations, 中間操作階段,e.g. .filter(x -> x > 5)
3. terminal operations, 終止階段,e.g. .foreach(System.out::println)
分離器的一些特征是與Stream的特征相匹配的, 共有特征包括
1. DISTINCT, 表示內部元素不重復
2. SORTED, 表示內部元素是有序的
3. ORDERED, 表示元素是操作元素是自然順序,
4. SIZED, 表示元素數量有限且精確
SHORT_CIRCUIT是流獨有的特征,‘短路’, 不會遍歷所有元素的操作的特征
.e.g. findAny方法在匹配到元素的就結束遍歷
*/
// Matches Spliterator.DISTINCT
DISTINCT(0, set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),
// Matches Spliterator.SORTED
SORTED(1, set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),
// Matches Spliterator.ORDERED
ORDERED(2, set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)
.clear(Type.TERMINAL_OP.clear(Type.UPSTREAM_TERMINAL_OP)),
// Matches Spliterator.SIZED
SIZED(3, set(Type.SPLITERATOR).set(Type.STREAM).clear(Type.OP)),
SHORT_CIRCUIT(12, set(Type.OP).set(Type.TERMINAL_OP));
/*
枚舉特征量的構造方法
1. position
與Splitrerator特征不同的是:stream特征做了更細化的分類
e.g. 只有DISTINCT特征時,
spl有:0b01, spl無:0b00
stream有:0b01, stream無:0b10, stream未知:0b11
stream使用兩個bit存儲一個特征
使用DISTINCT.set存儲特征值‘有’
使用DISTINCT.clear存儲特征值‘無’
使用DISTINCT.preserve存儲未知,也是該特征值的掩碼,
一個特征值即沒有set,也沒有clear,那么就應該是preserve
*/
private StreamOpFlag(int position, MaskBuilder maskBuilder) {
this.maskTable = maskBuilder.build();
// Two bits per flag
position *= 2;
// bitPosition是存儲位置.
this.bitPosition = position;
this.set = SET_BITS << position;
this.clear = CLEAR_BITS << position;
this.preserve = PRESERVE_BITS << position;
}
private static final int SET_BITS = 0b01;
private static final int CLEAR_BITS = 0b10;
private static final int PRESERVE_BITS = 0b11;
private final Map<Type, Integer> maskTable;
private final int bitPosition;
private final int set;
private final int clear;
private final int preserve;
/*
2. maskBuilder 用來生成不同時期的特征的掩碼
思考一個問題:stream有三個階段,特征set,clear在這三個階段是否都是有意義的?
e.g. SIZED表示容量有限且精確,
如果使用了filter方法,那么SIZED就不精確,可以設置成SIZED.clear
如果SIZED特征本身就是preserve,只有可能在中間操作之后設置成SIZED.clear
也就是無法將一個非SIZED數據源操作成SIZED.
所以SIZED.set在中間操作時期是沒有意義的
因此,特征在不同的時期有不同的具體表現
enum Type表示更細化的時期
*/
enum Type {
SPLITERATOR, //分離器時期
STREAM, // 分離器剛轉化為流,還沒有操作的時期,
OP, // 中間操作時期
TERMINAL_OP, // 終止操作時期
UPSTREAM_TERMINAL_OP // 略,未應用
}
/*
以DISTINCT(0, set(Type.SPLITERATOR).set(Type.STREAM)
. setAndClear(Type.OP)),為例
set(Type.SPLITERATOR)創(chuàng)建了MaskBuilder代理了一個EnumMap
以Type枚舉值為key,
Maskbuilder.this.set(t) k = t, v = SET_BITS
Maskbuilder.this.clear(t) k = t, v = CLEAR_BITS
Maskbuilder.this.setAndClear(t) k = t, v = PRESERVE_BITS
// 表示set和clear都有可能
意義是,不同的時期(Type), 由不同的具體特征值
DISTINCT.maskBuilder {Type.SPLITERATOR = SET_BITS,
TYPE.STREAM = SET_BITS,
Type.OP = PRESERVE_BITS}
this.maskTable = DISTINCT.maskBuilder.build()方法將
沒有加入的Type枚舉值的v設置成0,表示此時期無效
總結:就是為了生成不同時期的所有特征的掩碼
每一列表示特征在不同時期的具體值
每一行表示時期內不同特征的具體值
01 = set
10= clear
11 = preserve
*/
| DISTINCT | SORTED | ORDERED | SIZED | SHORT_CIRCUIT | |
|---|---|---|---|---|---|
| SPLITERATOR | 01 | 01 | 01 | 01 | 00 |
| STREAM | 01 | 01 | 01 | 01 | 00 |
| OP | 11 | 11 | 11 | 10 | 01 |
| TERMINAL_OP | 00 | 00 | 10 | 00 | 10 |
| UPSTREAM_TERMINAL_OP | 00 | 00 | 10 | 00 | 00 |
private static MaskBuilder set(Type t) {
return new MaskBuilder(new EnumMap<>(Type.class)).set(t);
}
private static class MaskBuilder {
final Map<Type, Integer> map;
MaskBuilder(Map<Type, Integer> map) { this.map = map; }
MaskBuilder mask(Type t, Integer i) { map.put(t, i); return this; }
MaskBuilder set(Type t) { return mask(t, SET_BITS); }
MaskBuilder clear(Type t) { return mask(t, CLEAR_BITS); }
MaskBuilder setAndClear(Type t) { return mask(t, PRESERVE_BITS); }
Map<Type, Integer> build() {
for (Type t : Type.values()) {
map.putIfAbsent(t, 0b00);
}
return map;
}
}
// 不同時期的掩碼
static final int SPLITERATOR_CHARACTERISTICS_MASK = createMask(
Type.SPLITERATOR); // 0b0101_0101
static final int STREAM_MASK = createMask(Type.STREAM); // 0b0101_0101
static final int OP_MASK = createMask(
Type.OP); // 0b01_0000_0000_0000_0000_1011_1111
static final int TERMINAL_OP_MASK = createMask(
Type.TERMINAL_OP); // 0b01_0000_0000_0000_0000_1010_0000
static final int UPSTREAM_TERMINAL_OP_MASK = createMask(
Type.UPSTREAM_TERMINAL_OP); // 未應用
private static int createMask(Type t) {
int mask = 0;
for (StreamOpFlag flag : StreamOpFlag.values()) {
mask |= flag.maskTable.get(t) << flag.bitPosition;
}
return mask;
}
// 該特征是否是Stream時期有效的特征
boolean isStreamFlag() { return maskTable.get(Type.STREAM) > 0; }
// flags是否是包含該特征的set
boolean isKnown(int flags) { return (flags & preserve) == set; }
// flags是否是包含該特征的clear
boolean isCleared(int flags) { return (flags & preserve) == clear; }
// flags是否包含該特征的preserve
boolean isPreserved(int flags) { return (flags & preserve) == preserve; }
// 該特征在t時期能否設置成set
boolean canSet(Type t) { return (maskTable.get(t) & SET_BITS) > 0; }
// 全特征掩碼,0b11_0000_0000_0000_0000_1111_1111
private static final int FLAG_MASK = createFlagMask();
private static int createFlagMask() {
int mask = 0;
for (StreamOpFlag flag : StreamOpFlag.values()) {
mask |= flag.preserve;
}
return mask;
}
/*
FlAG_MASK_IS,0b0101_0101,取流特征中的所有set的掩碼
FLAG_MASK_NOT, 0b1010_1010, 取流特征中的所有的clear的掩碼
INITAL_OPS_VALUE, 0b1111_1111, 分離器特征轉化成流特征的掩碼
*/
private static final int FLAG_MASK_IS = STREAM_MASK;
private static final int FLAG_MASK_NOT = STREAM_MASK << 1;
static final int INITIAL_OPS_VALUE = FLAG_MASK_IS | FLAG_MASK_NOT;
// 用于設置流特征
static final int IS_DISTINCT = DISTINCT.set;
static final int NOT_DISTINCT = DISTINCT.clear;
static final int IS_SORTED = SORTED.set;
static final int NOT_SORTED = SORTED.clear;
static final int IS_ORDERED = ORDERED.set;
static final int NOT_ORDERED = ORDERED.clear;
static final int IS_SIZED = SIZED.set;
static final int NOT_SIZED = SIZED.clear;
static final int IS_SHORT_CIRCUIT = SHORT_CIRCUIT.set;
/*
從分離器特征轉化為流的分離器時期特征
分離器具有SORTED和對象比較器,那么分離器SORTED特征不會被傳遞到分離器時期特征
characteristics & 0b0101_0101 保證了特征為spl和streamopflag的交集
*/
static int fromCharacteristics(Spliterator<?> spliterator) {
int characteristics = spliterator.characteristics();
if ((characteristics & Spliterator.SORTED) != 0
&& spliterator.getComparator() != null) {
return characteristics & SPLITERATOR_CHARACTERISTICS_MASK
& ~Spliterator.SORTED;
}
else {
return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
}
}
static int fromCharacteristics(int characteristics) {
return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
}
// Stream特征轉化回分離器特征
static int toCharacteristics(int streamFlags) {
return streamFlags & SPLITERATOR_CHARACTERISTICS_MASK;
}
/*
中間操作和終止操作使得流特征發(fā)生變化,combineOpFlags結合原特征與新產生的特征
e.g. 原0b0101_0101 新0b0000_0010
需要把DISTINCT從set變成clear
getMask(0b0000_0010)
如果flags == 0;
return 0b11_0000_0000_0000_0000_1111_11111, 全掩碼
否則((FLAG_MASK_IS & flags) << 1) | ((FLAG_MASK_NOT & flags) >> 1)
將低位特征碼中存在 set,clear,preserve碼
全變成presreve碼 0b0000_0011
| flags 目的是取高位特征SHORT_CIRCUIT,
~ 將新特征中為00(沒有設置set,clear,preserve)
取反為11生成掩碼 0b1111...1111_1100
prevCombOpFlags & StreamOpFlag.getMask(newStreamOrOpFlags)
原特征中取新特征沒有設置的特征(00)
| newStreamOrOpFlags 生成新的特征
整體邏輯:生成的特征碼是新特征碼和原特征碼中新特征中沒有的設置的特征 的并集
*/
static int combineOpFlags(int newStreamOrOpFlags, int prevCombOpFlags) {
return (prevCombOpFlags & StreamOpFlag.getMask(newStreamOrOpFlags))
| newStreamOrOpFlags;
}
private static int getMask(int flags) {
return (flags == 0)
? FLAG_MASK
: ~(flags | ((FLAG_MASK_IS & flags) << 1)
| ((FLAG_MASK_NOT & flags) >> 1));
}
// 重后期特征碼轉換成stream時期特征碼
static int toStreamFlags(int combOpFlags) {
// By flipping the nibbles 0x11 become 0x00 and 0x01 become 0x10
return ((~combOpFlags) >> 1) & FLAG_MASK_IS & combOpFlags;
}
}
Q3:ReferencePipeline.Head是什么?
A3:ReferencePipeline.Head是一個數據源Stream對象。

package java.util.stream;
// Reference.Head對象的構造方法鏈
Head(Spliterator<?> source, int sourceFlags, boolean parallel) {
// sourceFlags == 0b0101_0000,parallel == false
super(source, sourceFlags, parallel);
}
// super
ReferencePipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
// super
AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
/*
StreamOpFlag.STREAM_MASK表示數據源流特征掩碼 == 0b0101_0101
StreamOpFlag.INITIAL_OPS_VALUE表示初始化操作流掩碼 == 0b1111_1111
Stream.of方法生成的資源流的特征是sourceFlags == 0b0101_0000
this.sourceOrFlags = 0b0101_0000
this.combinedFlags = 0b0101_1111
combinedFlags對應了StreamOpFlag有效特征未知予以保留為preserve
本資源流的特征是:
DISTINCT.preserve
SORTED.preserve
SIZED.set
ORDERED.set
*/
// 上一個階段的流對象,這是一個數據源流對象,沒有上一個階段
this.previousStage = null;
// 數據源分離迭代器
this.sourceSpliterator = source;
// 指向資源階段的流對象
this.sourceStage = this;
// 資源流或操作流特征
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// 結合特征,下一個操作流特征與其結合生成操作流的結合特征
this.combinedFlags = (~(sourceOrOpFlags << 1))
& StreamOpFlag.INITIAL_OPS_VALUE;
// 操作深度,資源流沒有操作
this.depth = 0;
// 是否并發(fā)處理流,false
this.parallel = parallel;
}
Stream的非并發(fā)執(zhí)行流程
Q4:什么是StatelessOp,StatefulOp?
A4:都是中間操作流,StatelessOp為無狀態(tài)中間操作流(操作元素時,元素之間沒有影響,例如map),StatefulOp為有狀態(tài)中間操作流(操作元素時,元素之間有影響,例如distinct)。

package java.util.stream;
abstract static class StatelessOp<E_IN, E_OUT> extends
ReferencePipeline<E_IN, E_OUT> {
/*
使用無狀態(tài)中間方法的時候,就會直接返回一個StatelessOp流
e.g. Stream.of("xx","xx").filter(x -> x > 5).map(x -> x - 5)
upstream在filter方法中就是Stream.of產生的資源流
在map方法中就是filter生產的StatelessOp流
中間操作方法的基本邏輯就是流對流的包裹
enum StreamShape {
REFERENCE,
INT_VALUE,
LONG_VALUE,
DOUBLE_VALUE
}
Stream.of生產的資源都是Object[], 也就是REFERENCE引用類型
assert upstream.getOutputShape() == inputShape; // 沒意義
opFlags為本此操作對流產生的特征
e.g. filter方法導致流的數據size不精確了,
opFlags可以是StreamOpFlag.NOT_SIZED
*/
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape, int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
// 是否是有狀態(tài)的流,當然不是
@Override
final boolean opIsStateful() { return false; }
}
// super
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
super(upstream, opFlags);
}
// super
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
// linkedOrConsumed默認為false, 流被鏈接或被消費為true
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
// 鏈接兩個流,可以想象多個中間操作會產生一條流鏈表
previousStage.nextStage = this;
this.previousStage = previousStage;
/* StreamOpFlag.OP_MASK == // 0b01_0000_0000_0000_0000_1011_1111
這個掩碼可以看出SIZED的為10,表示操作特征永遠不能是IS_SIZED
只能是NOT_SIZED,或者是無效00,也就是流操作默認不是IS_SIZED
this.combinedFlags由被鏈接的previousStage的結合特征及操作特征產生
資源流為head流,其sourceStage指向自己
this.sourceStage也是指向Head,就是說中間操作流的sourceStage都是指向Head流
那么sourceStage.sourceAnyStateful永遠是在Head流中更新
這是一個對并發(fā)操作有影響的變量,暫且不提
*/
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags,
previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
abstract static class StatefulOp<E_IN, E_OUT> extends
ReferencePipeline<E_IN, E_OUT> {
/*
與StatelessOp基本一致
opIsStateful返回true導致sourceStage.sourceAnyStateful = true;
opEvaluateParallel方法為并發(fā)處理的方法
*/
StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape, int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful() { return true; }
@Override
abstract <P_IN> Node<E_OUT>
opEvaluateParallel(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator,
IntFunction<E_OUT[]> generator);
}
Q5:什么是Sink,什么是Sink.ChainedReference?
A5:Sink是Consumer的擴展接口,Sink.ChainedReference是Sink的具體實現類。

package java.util.stream;
interface Sink<T> extends Consumer<T> {
/*
begin必須在accept接收數據之前調用,完成某些狀態(tài)的初始化
arg. size是本Sink具體接收到的數據個數,未知或無限為-1
end方法必須在accept處理完所有數據后調用,完成一些收尾工作
end之后Sink對象可以再次復用
Sink對象是用來封裝本次操作的和下次操作之間的邏輯
*/
default void begin(long size) {}
default void end() {}
/*
取消被請求。
主要是一些短路操作使用,可以監(jiān)控下個Sink對象是否短路
如果短路,那么本Sink在有些情況下也應該短路。
*/
default boolean cancellationRequested() { return false; }
}
abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
/*
對Rederence流的函數式接口對象進行鏈式的連接
downstream是下個操作的Sink對象
e.g. aStreamObj.filter(predicate).map(function).forEach(consumer);
forEach的Sink.this.accept(T t) {
consumer.accept(t);
}
map的Sink.this.accept(T t) {
downstream.accept(function.apply(t))
}
filter的Sink.this.accept(T t) {
if (predicate) {
downstream.accept(t)
}
}
調用,predicate -> function -> consumer
可以看出整個操作流程成為一條執(zhí)行鏈
并且封裝所有的Sink為一個Sink,邏輯:
filterSink(mapSink(forEachSink))
begin,end,cancellationRequested方法也被封裝成一條調用鏈
downstream方法的調用必須有
否則,對一個Sink調用方法將無法傳播到下一個Sink
*/
protected final Sink<? super E_OUT> downstream;
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) { downstream.begin(size); }
@Override
public void end() { downstream.end(); }
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
Q6:什么是TerminalOp, TerminalSink?
A6:TerminalOp是終止操作對象,與中間操作建立一個操作流維護操作對象不同,終止操作并不建立一個流。TerminalSink是終止操作的Sink包裝。
package java.util.stream;
interface TerminalOp<E_IN, R> {
/*
由于流的惰性求值,終止操作必須對流進行
終止操作后面不能繼續(xù)使用流方法,所以終止操作不必是一個流對象
由此,該接口誕生。
*/
default StreamShape inputShape() { return StreamShape.REFERENCE; }
/*
返回終止操作特征,不同的終止操作有不同的結果。
終止操作特征只有:IS_SHORT_CIRCUIT 和 NOT_ORDERED有效
*/
default int getOpFlags() { return 0; }
// 并發(fā)處理
default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
Spliterator<P_IN> spliterator) {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(),
"{0} triggering TerminalOp.evaluateParallel serial default");
return evaluateSequential(helper, spliterator);
}
// 非并發(fā)時處理
<P_IN> R evaluateSequential(PipelineHelper<E_IN> helper,
Spliterator<P_IN> spliterator);
}
// 支持了Supplier接口,有些終結方法是有返回值的
interface TerminalSink<T, R> extends Sink<T>, Supplier<R> { }
Q7:舉例說明中間方法的具體實現
Q7:
// 一個無狀態(tài)的中間方法,在并發(fā)和非并發(fā)時邏輯是一致的
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
/*
e.g. Stream.of("x", "xx").filter(i -> i.length() >= 2).map(略)
filter方法生成了一個無狀態(tài)中間流,使得.map可以繼續(xù)
this是filter的調用流, 即Stream.of產生的流
StreamOpFlag.NOT_SIZED是新的操作特征
重寫了AbstractPipline的opWrapSink方法
該方法就是將filter的sink和map的sink鏈接
并且前面的sink是依賴后面的sink實例化的
惰性求值,會在終止操作的時候調用
*/
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
/* sink是下一個操作的sink對象,在ChainedReference是downstream變量
flags是上一次操作的結合特征
*/
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
/*
size可能是具體的值,但是經過fliter操作,size就不是精準的
所以傳遞給下一個方法的size為-1
-1表示容量是未知或無限的
*/
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
/*
predicate就是被保存操作流的opWrapSink方法內的匿名類中
downstream.accept(u)下一個sink的accept方法
邏輯上preidicate.test(u) == true, u才會繼續(xù)傳播
所以流操作是不對數據直接進行修改
*/
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
/* 一個有狀態(tài)的中間方法,任何短路方法和元素的之間互有影響的方法都是有狀態(tài)的
可能在accpet方法中收集元素,處理好收集好的元素,之后在向下一個sink傳播,例如sorted
也可能使用邏輯,滿足向下sink傳播,不滿足不傳播,例如distinct
在并發(fā)時邏輯可能和非并發(fā)時不一致
*/
private static final class SizedRefSortingSink<T>
extends AbstractRefSortingSink<T> {
private T[] array;
private int offset;
// 這是一排序方法
SizedRefSortingSink(Sink<? super T> sink,
Comparator<? super T> comparator) {
super(sink, comparator);
}
// 當中件方法執(zhí)行到這里的時候,所有的元素都被收集array中,并不向下執(zhí)行了
@Override
public void accept(T t) {
array[offset++] = t;
}
@Override
@SuppressWarnings("unchecked")
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
array = (T[]) new Object[(int) size];
}
/*
在end方法時才進行排序,
調用downstream繼續(xù)執(zhí)行
非短路調用流程:
sink.begin(-1);
spliterator.forEachRemaining(sink);
執(zhí)行完所有元素的sortedsink的accept方法,退出
sink.end(); // 調用排序方法,繼續(xù)向下一個sink執(zhí)行
*/
@Override
public void end() {
Arrays.sort(array, 0, offset, comparator);
downstream.begin(offset);
/*
cancellationRequestedCalled == false
說明sink鏈中是沒有短路方法的,可以將所有的元素向后傳播
== true的說面有短路情況,需要監(jiān)視傳播
*/
if (!cancellationRequestedCalled) {
for (int i = 0; i < offset; i++)
downstream.accept(array[i]);
}
else {
for (int i = 0; i < offset &&
!downstream.cancellationRequested(); i++)
downstream.accept(array[i]);
}
downstream.end();
array = null;
}
/*
sink鏈中有任何一個方法是短路方法時的執(zhí)行邏輯
do { } while (!(cancelled = sink.cancellationRequested())
&& spliterator.tryAdvance(sink));
sink.cancellationRequestedCalled方法永遠監(jiān)控不到sorted后面的短路情況
因為sortedsink.cancellationRequestedCalled方法是不向下傳播的
當sortedsink.cancellationRequestedCalled方法為被調用的時候,
說明sink鏈中是一定有短路的方法
sorted.cancellationRequested = true;說明要監(jiān)控后面sink的短路情況
*/
@Override
public final boolean cancellationRequested() {
cancellationRequestedCalled = true;
return false;
}
}
Q8:舉例說明終止方法的實現及執(zhí)行流程
A8:
// 終止方法forEach
@Override
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
// 終止執(zhí)行方式和終止sink的生成
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfRef<>(action, ordered);
}
/*
ForEachOp實現了TerminalOp是一個終止執(zhí)行對象
實現了TerminalSink也是一個終止sink對象
*/
abstract static class ForEachOp<T>
implements TerminalOp<T, Void>, TerminalSink<T, Void> {
/* ordered是終止操作的特征的邏輯值,true表示0無效,false表示NOT_ORDERED
*/ ordered在并行處理時有不同的意義
private final boolean ordered;
protected ForEachOp(boolean ordered) { this.ordered = ordered; }
@Override
public int getOpFlags() { return ordered ? 0 : StreamOpFlag.NOT_ORDERED; }
// 非并行處理的執(zhí)行方法
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
// TerminalSink也是Suppier,get是某些需要終止方法有返回值需要實現的
@Override
public Void get() { return null; }
static final class OfRef<T> extends ForEachOp<T> {
final Consumer<? super T> consumer;
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
// begin和end直接是默認的空方法,不需要向下傳播,也不是短路操作
@Override
public void accept(T t) {
consumer.accept(t);
}
}
}
// 執(zhí)行,執(zhí)行,執(zhí)行
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
/*
終止方法forEach的調用者是中間操作的最后一個流
也就是forEach的前一個方法產生的流對象
linkedOrConsumed必定是false
*/
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this,
sourceSpliterator(terminalOp.getOpFlags()))
/* 只討論非并發(fā)的處理方式
terminalOp.evaluateSequential是終止執(zhí)行對象的具體實現
*/
: terminalOp.evaluateSequential(this,
sourceSpliterator(terminalOp.getOpFlags()));
}
private Spliterator<?> sourceSpliterator(int terminalFlags) {
// 本方法是獲取資源分離迭代器
Spliterator<?> spliterator = null;
if (sourceStage.sourceSpliterator != null) {
spliterator = sourceStage.sourceSpliterator;
sourceStage.sourceSpliterator = null;
}
else if (sourceStage.sourceSupplier != null) {
spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
sourceStage.sourceSupplier = null;
}
else {
throw new IllegalStateException(MSG_CONSUMED);
}
/* 并行處理的情況下,如果有任何一個有狀態(tài)方法,才會特殊處理
因為無狀態(tài)方法,在并行時邏輯是沒有變化的
*/
if (isParallel() && sourceStage.sourceAnyStateful) { // 略 }
if (terminalFlags != 0) {
// 將終結操作特征與流結合特征結合
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags,
combinedFlags);
}
return spliterator;
}
// 調用具體的終止方法的shxian
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
/* helper就是forEach方法的調用流
this是終止對象,
wrapAndCopyInto返回this
get()是針對有結果的對象,這里沒意義
*/
return helper.wrapAndCopyInto(this, spliterator).get();
}
// wrap是將sink對象鏈接,copyInto是具體的執(zhí)行
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink,
Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
// 將sink對象從后向前鏈接
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
/*
中間操作的depth都是大于0的
p = p.previousStage從后向前
*/
for ( @SuppressWarnings("rawtypes") AbstractPipeline
p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
/*
所有的中間操作流都重協(xié)了opWrapSink方法用于生成具體的Sink
p.previousStage.combinedFlags是前一流的結合特征
對本流的結合特征可能是有幫助的
*/
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
// 這個sink是第一個中間操作的sink
return (Sink<P_IN>) sink;
}
// 執(zhí)行
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink,
Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
/*
短路操作是:只執(zhí)行一部分元素就可以停止
e.g. limit(100),只需要前一百個元素,那么就不需要從spl中迭代所有的元素
*/
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
/*
短路特征會在結合特征里一直向下傳播
沒有短路操作,forEachRemaining所有的元素
wrappedSink是sink鏈
end之后,流操作就已經完成了
*/
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
// 有短路操作
copyIntoWithCancel(wrappedSink, spliterator);
}
}
@Override
@SuppressWarnings("unchecked")
final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink,
Spliterator<P_IN> spliterator) {
@SuppressWarnings({"rawtypes","unchecked"})
AbstractPipeline p = AbstractPipeline.this;
/* 獲取到的p為資源流,沒有任何意義
可能是非REFERENCE流中有其他的forEachWithCancel實現
*/
while (p.depth > 0) {
p = p.previousStage;
}
wrappedSink.begin(spliterator.getExactSizeIfKnown());
boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);
wrappedSink.end();
return cancelled;
}
@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator,
Sink<P_OUT> sink) {
boolean cancelled;
/*
tryAdvance是一個一個迭代元素的
必須滿足sink.cancellationRequestedCalled() == false
當有一個短路操作滿足時,迭代分離器就停止迭代
e.g. limit(100).limit(50);
limit(50)滿足是就停止迭代
對于limit(100),邏輯上也是沒有問題的
任何的短路accept方法都會實現邏輯上的停止
e.g. limit.accept
limit = 50
accept(T t) {
if (limit > 0) {
limit--;
downstream.accept(t)
}
}
實現了只向下傳播50個數據
但是分離迭代器沒有停止,產生了沒有必要的迭代
cancellationRequested() {
return limit == 0 ? true : false;
}
所以,cancellationRequested是讓迭代器停止的信號
*/
do { } while (!(cancelled = sink.cancellationRequested())
&& spliterator.tryAdvance(sink));
return cancelled;
}
Stream的并發(fā)執(zhí)行流程
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this,
sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this,
sourceSpliterator(terminalOp.getOpFlags()));
}
// 在并發(fā)執(zhí)行執(zhí)行中重要是sourceSpliterator方法,不是evaluateParallel
private Spliterator<?> sourceSpliterator(int terminalFlags) {
Spliterator<?> spliterator = null;
// 獲取sourceStage.spliterator略
// 條件是并發(fā),并且有有狀態(tài)中間方法才進行特殊處理
if (isParallel() && sourceStage.sourceAnyStateful) {
int depth = 1;
// 遍歷所有的中間操作流,
for (@SuppressWarnings("rawtypes") AbstractPipeline
u = sourceStage, p = sourceStage.nextStage, e = this;
u != e;
u = p, p = p.nextStage) {
int thisOpFlags = p.sourceOrOpFlags;
/*
如果p是有狀態(tài)的流進行特殊處理,先看depth
e.g. 有-有狀態(tài)流
無-無狀態(tài)流
資源流 -> 無1 -> 無2 -> 有1 -> 無4 -> 有2 -> 無5 -> 終止操作
depth: 0 1 2 0 1 0 1
wrapSink方法:
for ( @SuppressWarnings("rawtypes") AbstractPipeline
p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
從‘有1’開始向前鏈接sink,可得 無1 - 無2 - 有1 sink鏈
從‘有2’開始向前鏈接sink,可得 無4 - 有2 sink鏈
從終止操作開始向前鏈接sink, 可的 無5 - 終止操作 sink鏈
至此引出了一個分段執(zhí)行的方法:
先執(zhí)行‘有1’sink鏈,得出結果封裝成Spliterator
交給‘有2’sink鏈執(zhí)行,得出Spliterator
交給terminalOp.evaluateParallel執(zhí)行,結束
為什么要分段執(zhí)行?
因為有狀態(tài)的中間方法大多數需要收集前面流過來的所有元素
才能繼續(xù)執(zhí)行,可能執(zhí)行完畢才能傳遞給下一個sink,從此邏輯,分段執(zhí)行。
*/
if (p.opIsStateful()) {
depth = 0;
/* 短路方法一定是有狀態(tài)方法,并行處理時,需要去掉短路特征
1. 并行時每個線程都維護了資源的一部分。不可能因為一個短路信號就中斷
2. 分段處理,短路信號較短,沒有很大的影響
*/
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
thisOpFlags = thisOpFlags &
~StreamOpFlag.IS_SHORT_CIRCUIT;
}
/* opEvaluateParallelLazy是分段處理重要方法
就是像上面分段處理一樣,執(zhí)行p前面sink的流程
有狀態(tài)方法是不調用非并行的sink邏輯的,也就是onWrapSink
p具體的并行邏輯,不同方法有不同的實現
StatefulOp必須實現的是opEvaluateParallelLazy,但是很少使用,
使用更多的是opEvaluateParallelLazy,
所以StatefulOp都實現了opEvaluateParallelLazy;
*/
spliterator = p.opEvaluateParallelLazy(u, spliterator);
// sized是很有用特征
thisOpFlags = spliterator.hasCharacteristics(
Spliterator.SIZED)
? (thisOpFlags & ~StreamOpFlag.NOT_SIZED)
| StreamOpFlag.IS_SIZED
: (thisOpFlags & ~StreamOpFlag.IS_SIZED)
| StreamOpFlag.NOT_SIZED;
}
p.depth = depth++;
// 去掉了短路特征,新的spl的Sized特征也可能發(fā)生了變化
p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags,
u.combinedFlags);
}
}
if (terminalFlags != 0) {
combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags,
combinedFlags);
}
return spliterator;
}
// 舉個例子distinct
static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?>
upstream) {
return new ReferencePipeline.StatefulOp<T, T>(upstream,
StreamShape.REFERENCE,
StreamOpFlag.IS_DISTINCT
| StreamOpFlag.NOT_SIZED) {
<P_IN> Node<T> reduce // 略
@Override
<P_IN> Node<T> opEvaluateParallel //略
@Override
<P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T>
helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags()))
{
/* helper是此中間方法流的前一個流
其特征是滿足DISTINCT,那么去重就不用做了
但是distinct前面的無狀態(tài)方法都沒有執(zhí)行阿!
wrapSpliterator就是將前面對sink鏈和spliterator封裝到一起
可以想象,使用靜態(tài)代理spliterator;
在迭代元素時,每個都經過sink鏈的處理,
那么,這個代理spl就是distinct方法sink鏈符合邏輯的spl
*/
return helper.wrapSpliterator(spliterator);
}
else if (StreamOpFlag.ORDERED.isKnown(
helper.getStreamAndOpFlags())) {
/*
如果helper有ORDERED特征,那么distinct方法沒有
NOT_ORDERED屬性就需要保持OREDERED
ORDERED是自然順序
必須在此以保持自然順序的方式得出此sink階段的結果spl
像的下面代理方式處理,是否保持ORERED特征,
只能看后面真正并行執(zhí)行時是不是保序方法了
所以有ORDERED特征的方法應該立即執(zhí)行,不去依賴后面的情況
reduce使用了ReduceOp,此sink使用了hashset接收元素
并行時,使用CountedCompleter任務
compute主要邏輯是:將spl分成小的spl,
每個小任務持有一個小spl,
每個小任務持有自己的sink鏈,保證了sink中hashset是單線程安全的
最后將所有任務對hashset合并成一個hashset
hashset在轉化成spl
*/
return reduce(helper, spliterator).spliterator();
}
else {
/*
沒有ORERED特征
wrapSpliterator代理了spl和前面的無狀態(tài)sink鏈
在使用一個DistinctSpliterator代理
DistinctSpliterator迭代時,先經過無狀態(tài)的sink,
再經過去重邏輯(ConcurrentHashMap)就符合去重邏輯了
這種代理的方式只是封裝了spl和sink鏈的邏輯,沒有執(zhí)行
再下一個StatefulOp處可能執(zhí)行,也可能繼續(xù)封裝sink邏輯代理,
那么只有在終結方法處執(zhí)行了
*/
return new StreamSpliterators.DistinctSpliterator<>(helper
.wrapSpliterator(spliterator));
}
}
@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) { //略 }
}
Q9:什么是SpinedBuffer?
A9:SpindedBuffer是二維數組結構的緩沖,優(yōu)勢是減小了擴容的次數和代價。
package java.util.stream
abstract class AbstractSpinedBuffer {
/*
SpinedBuffer維護了一個二維數組,可增、查,不可刪、改
SpinedBuffer -> [[], [], [], null, null]
外部數組叫spine,內部數組叫chunk
spine最小容量:8
chunk最小容量:1 << 4, 最大容量:1 << 30
chunk是填滿一個在建立下一個chunk
elementIndex是當前可寫的chunk的可寫位置的索引
spineIndex是當前可寫chunk在spine中的索引位置
priorElementCount數組索引i位置上是spine索引i位置之前的所有chunk的容量和
總容量:priorElementCount[spineIndex] + spine[spineIndex].length
總數量:priorElementCount[spineIndex] + elementIndex
*/
public static final int MIN_CHUNK_POWER = 4;
public static final int MIN_CHUNK_SIZE = 1 << MIN_CHUNK_POWER;
public static final int MAX_CHUNK_POWER = 30;
public static final int MIN_SPINE_SIZE = 8;
protected final int initialChunkPower;
protected int elementIndex;
protected int spineIndex;
protected long[] priorElementCount;
protected AbstractSpinedBuffer() {
this.initialChunkPower = MIN_CHUNK_POWER; // 4
}
protected AbstractSpinedBuffer(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal Capacity: "
+ initialCapacity);
/*
將容量換成大致的power
initialCapacity - 1是為了防止
e.g. 0b0001_0000不減1 power == 5
實際上power應該為4
減1,0b0000_1111 power == 4
只對特定的數字有優(yōu)化。所以最好的initialCapacity是2的n次方
*/
this.initialChunkPower = Math.max(MIN_CHUNK_POWER,
Integer.SIZE - Integer.numberOfLeadingZeros(initialCapacity - 1));
}
public boolean isEmpty() {
return (spineIndex == 0) && (elementIndex == 0);
}
public long count() {
return (spineIndex == 0)
? elementIndex
: priorElementCount[spineIndex] + elementIndex;
}
/*
chunk的大小是倍增的,n為spine索引
0或1,建立 1 << initialChunkPower
大于1,建立 1 << initialChunkPower + n - 1
e.g. spine[chunk[16], chunk[16], chunk[32], chunk[64], chunk[128]]
這種情況非常容易二分,任何一個位置chunk的容量都是其前面的chunk的容量和
*/
protected int chunkSize(int n) {
int power = (n == 0 || n == 1)
? initialChunkPower
: Math.min(initialChunkPower + n - 1,
AbstractSpinedBuffer.MAX_CHUNK_POWER);
return 1 << power;
}
public abstract void clear();
}
class SpinedBuffer<E> extends AbstractSpinedBuffer
implements Consumer<E>, Iterable<E> {
// 指向當前可寫的chunk
protected E[] curChunk;
// 二維數組
protected E[][] spine;
// 生成迭代分離器的特征
private static final int SPLITERATOR_CHARACTERISTICS
= Spliterator.SIZED | Spliterator.ORDERED | Spliterator.SUBSIZED;
@SuppressWarnings("unchecked")
SpinedBuffer(int initialCapacity) {
super(initialCapacity);
// 會直接初始化一個chunk
curChunk = (E[]) new Object[1 << initialChunkPower];
}
@SuppressWarnings("unchecked")
SpinedBuffer() {
super();
curChunk = (E[]) new Object[1 << initialChunkPower];
}
// 容量
protected long capacity() {
return (spineIndex == 0)
? curChunk.length
: priorElementCount[spineIndex] + spine[spineIndex].length;
}
// 初始化spine,priorElementCount,curChunk放在spine[0]
@SuppressWarnings("unchecked")
private void inflateSpine() {
if (spine == null) {
spine = (E[][]) new Object[MIN_SPINE_SIZE][];
priorElementCount = new long[MIN_SPINE_SIZE];
spine[0] = curChunk;
}
}
// 根據目標容量擴容
@SuppressWarnings("unchecked")
protected final void ensureCapacity(long targetSize) {
long capacity = capacity();
if (targetSize > capacity) {
inflateSpine();
for (int i=spineIndex+1; targetSize > capacity; i++) {
if (i >= spine.length) {
// 這種情況spine完全填滿,擴容spine
int newSpineSize = spine.length * 2;
spine = Arrays.copyOf(spine, newSpineSize);
priorElementCount = Arrays.copyOf(priorElementCount,
newSpineSize);
}
// chunk填滿,建立下一個chunk
int nextChunkSize = chunkSize(i);
spine[i] = (E[]) new Object[nextChunkSize];
priorElementCount[i] = priorElementCount[i-1]
+ spine[i-1].length;
capacity += nextChunkSize;
}
}
}
protected void increaseCapacity() {
ensureCapacity(capacity() + 1);
}
// 查找,index是針對總size的
public E get(long index) {
if (spineIndex == 0) {
if (index < elementIndex)
return curChunk[((int) index)];
else
throw new IndexOutOfBoundsException(Long.toString(index));
}
if (index >= count())
throw new IndexOutOfBoundsException(Long.toString(index));
for (int j=0; j <= spineIndex; j++)
if (index < priorElementCount[j] + spine[j].length)
return spine[j][((int) (index - priorElementCount[j]))];
throw new IndexOutOfBoundsException(Long.toString(index));
}
// 從索引offset開始拷貝進入array
public void copyInto(E[] array, int offset) {
long finalOffset = offset + count();
if (finalOffset > array.length || finalOffset < offset) {
throw new IndexOutOfBoundsException("does not fit");
}
if (spineIndex == 0)
System.arraycopy(curChunk, 0, array, offset, elementIndex);
else {
for (int i=0; i < spineIndex; i++) {
System.arraycopy(spine[i], 0, array, offset, spine[i].length);
offset += spine[i].length;
}
if (elementIndex > 0)
System.arraycopy(curChunk, 0, array, offset, elementIndex);
}
}
// 轉化為一維數組
public E[] asArray(IntFunction<E[]> arrayFactory) {
long size = count();
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
E[] result = arrayFactory.apply((int) size);
copyInto(result, 0);
return result;
}
// 清空,只保留spine[0]處的chunk為curChunk,并清空
@Override
public void clear() {
if (spine != null) {
curChunk = spine[0];
for (int i=0; i<curChunk.length; i++)
curChunk[i] = null;
spine = null;
priorElementCount = null;
}
else {
for (int i=0; i<elementIndex; i++)
curChunk[i] = null;
}
elementIndex = 0;
spineIndex = 0;
}
@Override
public Iterator<E> iterator() {
return Spliterators.iterator(spliterator());
}
@Override
public void forEach(Consumer<? super E> consumer) {
for (int j = 0; j < spineIndex; j++)
for (E t : spine[j])
consumer.accept(t);
for (int i=0; i<elementIndex; i++)
consumer.accept(curChunk[i]);
}
// 添加元素
@Override
public void accept(E e) {
// chunk填滿需要擴容
if (elementIndex == curChunk.length) {
// 可能只有curChunk,沒有spine,需要初始化
inflateSpine();
// 沒有意義的if
if (spineIndex+1 >= spine.length || spine[spineIndex+1] == null)
increaseCapacity();
elementIndex = 0;
++spineIndex;
curChunk = spine[spineIndex];
}
curChunk[elementIndex++] = e;
}
@Override
public String toString() {
List<E> list = new ArrayList<>();
forEach(list::add);
return "SpinedBuffer:" + list.toString();
}
// 生成分離迭代器
public Spliterator<E> spliterator() {
class Splitr implements Spliterator<E> {
// 維護的第一個chunk在spine中的Index
int splSpineIndex;
// 維護的最后一個chunk在spine中的Index
final int lastSpineIndex;
/*
分離器中的chunk在被進行消費時,可能需要經過多個chunk
splChunk指向當前被消費的chunk
splElementIndex被消費的元素在chunk中的Index
*/
E[] splChunk;
int splElementIndex;
// 維護的最后一個chunk的容量
final int lastSpineElementFence;
Splitr(int firstSpineIndex, int lastSpineIndex,
int firstSpineElementIndex, int lastSpineElementFence) {
this.splSpineIndex = firstSpineIndex;
this.lastSpineIndex = lastSpineIndex;
this.splElementIndex = firstSpineElementIndex;
this.lastSpineElementFence = lastSpineElementFence;
assert spine != null || firstSpineIndex == 0
&& lastSpineIndex == 0;
splChunk = (spine == null) ? curChunk :spine[firstSpineIndex];
}
// 分離器剩余未被消費的元素的數量
@Override
public long estimateSize() {
return (splSpineIndex == lastSpineIndex)
? (long) lastSpineElementFence - splElementIndex
: priorElementCount[lastSpineIndex]
+ lastSpineElementFence
- priorElementCount[splSpineIndex]
- splElementIndex;
}
@Override
public int characteristics() {
return SPLITERATOR_CHARACTERISTICS;
}
@Override
public boolean tryAdvance(Consumer<? super E> consumer) {
// 略
}
@Override
public void forEachRemaining(Consumer<? super E> consumer) {
// 略
}
// 這里就體現出了chunkSize()生成的chunk使spine容易二分
@Override
public Spliterator<E> trySplit() {
if (splSpineIndex < lastSpineIndex) {
Spliterator<E> ret = new Splitr(splSpineIndex,
lastSpineIndex - 1,
splElementIndex,
spine[lastSpineIndex-1].length);
splSpineIndex = lastSpineIndex;
splElementIndex = 0;
splChunk = spine[splSpineIndex];
return ret;
}
else if (splSpineIndex == lastSpineIndex) {
int t = (lastSpineElementFence - splElementIndex) / 2;
if (t == 0)
return null;
else {
// 只有一個chunk的時候生成ArraySpliterator
Spliterator<E> ret = Arrays.spliterator(splChunk,
splElementIndex, splElementIndex + t);
splElementIndex += t;
return ret;
}
}
else {
return null;
}
}
}
return new Splitr(0, spineIndex, 0, elementIndex);
}
}
Q10:什么是WrapSpliterator?
A10:有狀態(tài)方法并發(fā)時,lazy執(zhí)行時,使用WrapSpliterator代理spl并封裝有狀態(tài)方法之前的sink鏈,而有狀態(tài)方法的sink需要再使用其他的spl代理WrapSpliterator進行封裝。no-lazy執(zhí)行,使用Nodes類代理,只代理處理完的spl,在調用spliterator方法生成spl。
// 有狀態(tài)方法的helper(有狀態(tài)方法的前一個流)調用
final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN>
sourceSpliterator) {
/* 如果helper也是一個有狀態(tài)的流,depth等于0,
直接返回helper封裝好的sourceSpliterator
*/
if (depth == 0) {
return (Spliterator<E_OUT>) sourceSpliterator;
}
else {
// helper是無狀態(tài)的流,需要封裝無狀態(tài)的sink鏈
return wrap(this, () -> sourceSpliterator, isParallel());
}
}
@Override
final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
Supplier<Spliterator<P_IN>> supplier,
boolean isParallel) {
// 直接生成了一個代理對象
return new StreamSpliterators.WrappingSpliterator<>(ph, supplier,
isParallel);
}
private abstract static class AbstractWrappingSpliterator<P_IN, P_OUT,
T_BUFFER extends AbstractSpinedBuffer>
implements Spliterator<P_OUT> {
/* 只能由StatefulOp的helper創(chuàng)建的spl代理,
主要是將無狀態(tài)的sink鏈封裝進入迭代的邏輯中
由于size的不確定性,使用buffer緩沖容器
但是這里buffer最多只能有一個元素,真的不懂,有什么用
bufferSink是將buffer的accept方法鏈接到無狀態(tài)鏈最后
accept方法將經由無狀態(tài)鏈的元素放入buffer中
pusher是無狀態(tài)sink鏈的調用者,完成accept動作
finished在accept所有對元素之后為true
nextToConsume下一被消費的元素在buffer中的索引
*/
final boolean isParallel;
final PipelineHelper<P_OUT> ph;
private Supplier<Spliterator<P_IN>> spliteratorSupplier;
Spliterator<P_IN> spliterator;
Sink<P_IN> bufferSink;
BooleanSupplier pusher;
long nextToConsume;
T_BUFFER buffer;
boolean finished;
AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
Supplier<Spliterator<P_IN>> spliteratorSupplier,
boolean parallel) {
this.ph = ph;
this.spliteratorSupplier = spliteratorSupplier;
this.spliterator = null;
this.isParallel = parallel;
}
AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
Spliterator<P_IN> spliterator,
boolean parallel) {
this.ph = ph;
this.spliteratorSupplier = null;
this.spliterator = spliterator;
this.isParallel = parallel;
}
final void init() {
if (spliterator == null) {
spliterator = spliteratorSupplier.get();
spliteratorSupplier = null;
}
}
// 使用在tryAdvance中使用,時候buffer中有下一個元素
final boolean doAdvance() {
if (buffer == null) {
if (finished)
return false;
init();
initPartialTraversalState();
nextToConsume = 0;
bufferSink.begin(spliterator.getExactSizeIfKnown());
// buffer填添加一個元素,之后會被tryAdvance消耗
return fillBuffer();
}
else {
// buffer不為null
++nextToConsume; // 1
boolean hasNext = nextToConsume < buffer.count(); // 肯定是false
if (!hasNext) {
nextToConsume = 0;
buffer.clear(); // 清空
hasNext = fillBuffer(); // 添加一個
}
return hasNext;
}
}
abstract AbstractWrappingSpliterator<P_IN, P_OUT, ?>
wrap(Spliterator<P_IN> s);
abstract void initPartialTraversalState();
@Override
public Spliterator<P_OUT> trySplit() {
if (isParallel && buffer == null && !finished) {
init();
Spliterator<P_IN> split = spliterator.trySplit();
return (split == null) ? null : wrap(split);
}
else
return null;
}
private boolean fillBuffer() {
while (buffer.count() == 0) { // 這條件太苛刻
/* bufferSink都是無狀態(tài)sink,cancellationRequested肯定是false
pusher.getAsBoolean()向buffer中添加一個元素
如果是fliter之類的截停sink,while循環(huán)
*/
if (bufferSink.cancellationRequested() ||
!pusher.getAsBoolean()) {
if (finished)
return false;
else {
/* 無狀態(tài)sink中end也沒啥操作阿
如果sink鏈中有sorted,end填充buffer,這buffer還有點用
*/
bufferSink.end(); // might trigger more elements
finished = true;
}
}
}
return true;
}
@Override
public final long estimateSize() // 略
@Override
public final long getExactSizeIfKnown() // 略
@Override
public final int characteristics() // 略
@Override
public Comparator<? super P_OUT> getComparator() // 略
@Override
public final String toString() // 略
}
static final class WrappingSpliterator<P_IN, P_OUT> extends
AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> {
WrappingSpliterator(PipelineHelper<P_OUT> ph,
Supplier<Spliterator<P_IN>> supplier,
boolean parallel) {
super(ph, supplier, parallel);
}
WrappingSpliterator(PipelineHelper<P_OUT> ph,
Spliterator<P_IN> spliterator,
boolean parallel) {
super(ph, spliterator, parallel);
}
@Override
WrappingSpliterator<P_IN, P_OUT> wrap(Spliterator<P_IN> s) {
return new WrappingSpliterator<>(ph, s, isParallel);
}
@Override
void initPartialTraversalState() {
SpinedBuffer<P_OUT> b = new SpinedBuffer<>();
buffer = b;
bufferSink = ph.wrapSink(b::accept); // 鏈接sink
pusher = () -> spliterator.tryAdvance(bufferSink); // 填充buffer的方法
}
@Override
public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
Objects.requireNonNull(consumer);
boolean hasNext = doAdvance();
if (hasNext)
consumer.accept(buffer.get(nextToConsume));
return hasNext;
}
@Override
public void forEachRemaining(Consumer<? super P_OUT> consumer) {
// 沒有調用過tryAdvance,buffer == null,finished == false
if (buffer == null && !finished) {
Objects.requireNonNull(consumer);
init();
// 不經過buffer了,直接鏈接sink鏈執(zhí)行
ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator);
finished = true;
}
else {
// 有buffer
do { } while (tryAdvance(consumer));
}
}
}
Q11:并行任務的邏輯是什么?
A11:主要使用CountedCompleter任務。先將spl分割成多個spl,每個task持有一個spl,之后將spl使用sink消費迭代。有狀態(tài)方法,可能會對sink消費產生對元素進行另外的操作。
總結

并行處理ORDERED特征的spl時,每一段都需要并行處理之后傳遞給下一段再并行處理。處理NOT_ORDERED特征的spl,可以惰性求值,效率更高。使用unordered方法可以將特征變成NOT_ORDERED(結果不保持自然順序),應該使用在StatefulOp之前,sorted方法是特征變成IS_ORDERED的唯一方法,配合unordered使用需要注意邏輯。
雖然并行處理將所用StatefulOp的短路特征去掉,但是StatefulOp并行處理的sink邏輯都是特別實現的,其內部是完成邏輯上的短路,并且是分段執(zhí)行,前一段有短路邏輯,下一段的數據必然是比前一段沒有短路邏輯的要少。效率情況很模糊,和線程的切換、數據的總量、短路數據的量、OREDED特征的保持有關。并向效率和非并行效率的對比需要實測。
只有可以惰性求值的方法才需要重寫opEvaluateParallelLazy,無法惰性求值的方法(e.g. sorted)是調用默認的opEvalutateParallelLazy調用opEvalutateParallel這個非惰性方法直接求值。
附錄
