定義
- 用于遍歷和分割“源”元素的對(duì)象。
數(shù)據(jù)源
- Spliterator的元素來源可能是一個(gè)數(shù)組,一個(gè)集合,一個(gè)IO通道,一個(gè)生成函數(shù)。
處理數(shù)據(jù)源的方式
- Spliterator可以單獨(dú)或順序地批量地遍歷元素。
- Spliterator也可以將其部分元素作為另一個(gè)Spliterator進(jìn)行分區(qū),為了并行化操作。使用不能拆分或以非常不平衡或低效的方式進(jìn)行拆分Spliterator的操作不太可能從并行中獲益。遍歷和分解流出的元素;每個(gè)Spliterator只對(duì)單個(gè)批量計(jì)算有用。
特征 characteristics
Spliterator 還聲明了 一組關(guān)于它的結(jié)構(gòu)和源的特征(characteristics),包含以下以下幾種:
- ORDERED int 型 值為16 既定的順序,Spliterator保證拆分和遍歷時(shí)是按照這一順序。
- DISTINCT int型 值為1 表示元素都不是重復(fù)的,對(duì)于每一對(duì)元素{ x, y},{ !x.equals(y)}。例如,這適用于基于{@link Set}的Spliterator。
- SORTED int型 值為4 表示元素順序按照預(yù)定義的順序,可以通過getComparator 獲取排序器,若返回null ,則是按自然排序。
- SIZED int型 值為64 表示在遍歷分隔之前 estimateSize() 返回的值代表一個(gè)有限的大小,在沒有修改結(jié)構(gòu)源的情況下,代表了一個(gè)完整遍歷時(shí)所遇到的元素?cái)?shù)量的精確計(jì)數(shù)。
- NONNULL init型 值為256 表示數(shù)據(jù)源保證元素不會(huì)為空
- IMMUTABLE int 型 值為1024 表示在遍歷的過程中不能添加、替換、刪除元素
- CONCURRENT int型 值為4096 表示元素可以被多個(gè)線程安全并發(fā)得修改而不需要外部的同步。
- SUBSIZED int型 值為16384 表示trySplit()返回的結(jié)果都是SIZED和SUBSIZED
Tips
- 一個(gè)late-binding Spliterator 在第一次遍歷、分隔或者查詢?nèi)魏喂烙?jì)的大小時(shí)綁定 ,而不是在創(chuàng)建的時(shí)候綁定。
- 非后期綁定的Spliterator在構(gòu)建或在任何方法的第一次調(diào)用時(shí)綁定到數(shù)據(jù)源。在綁定之前對(duì)源進(jìn)行的修改將在遍歷Spliterator時(shí)反映出來,在綁定源之后,發(fā)現(xiàn) structural interference應(yīng)立即拋出ConcurrentModificationException 異常,這稱為快速失敗。
- Spliterator的批量遍歷方法({@link # forEachRemaining()})可以在遍歷完所有元素之后優(yōu)化遍歷并檢查 structural interference,而不是檢查每個(gè)元素并立即失敗。
- Spliterator 提供估計(jì)剩余多少元素的方法,即estimateSize()方法,理想情況下,正如在characteristics SIZED反應(yīng)的那樣,這個(gè)值會(huì)與成功遍歷所遇到的數(shù)量完全一致。但是,即使不知道確切的值,估計(jì)值對(duì)于在數(shù)據(jù)源上執(zhí)行的操作來說仍然是有用的,例如幫助確定是進(jìn)一步分割還是按順序遍歷其余的元素。
并行的實(shí)現(xiàn)
盡管在并行算法中有明顯的實(shí)用功能,但spliterator并不向我們期望的那樣是線程安全的;相反,使用spliterator的并行算法的實(shí)現(xiàn)應(yīng)該確保spliterator一次只使用一個(gè)線程。這個(gè)通常很容易通過 串行線程封閉 來實(shí)現(xiàn):通常使用遞歸分解這個(gè)經(jīng)典的并行算法。調(diào)用{@link #trySplit()}的線程可以將返回的Spliterator傳遞給另一個(gè)線程,而這個(gè)線程又可以遍歷或進(jìn)一步拆分這個(gè)Spliterator。如果兩個(gè)或多個(gè)線程在同一個(gè)Spliterator上同時(shí)操作,則不定義分割和遍歷的行為。如果原始線程將一個(gè)spliterator傳遞給另一個(gè)線程進(jìn)行處理,那么最好是在使用{@link #tryAdvance(Consumer) tryAdvance()}的任何元素之前進(jìn)行切換,因?yàn)槟承┍WC(例如{@link #estimateSize()}對(duì)于{@code size}spliterator的精度)只有在遍歷開始之前才有效。

Spliterator通過支持分割和單元素迭代,除了支持串行遍歷,還支持高效的并行遍歷。另外,Spliterator 不像Iterator設(shè)計(jì)的那樣設(shè)計(jì)兩個(gè)方法hasNext 判斷是否有元素和next() 返回元素進(jìn)行消費(fèi),Spliterator 設(shè)計(jì)一個(gè)tryAdvance方法,消費(fèi)元素,如果有就消費(fèi)并返回true,如果沒有則返回false,不需要兩個(gè)獨(dú)立的方法。
對(duì)于可變?cè)矗绻赟pliterator綁定到其數(shù)據(jù)源和遍歷結(jié)束之間對(duì)源進(jìn)行結(jié)構(gòu)上的干擾(添加、替換或刪除元素),可能會(huì)出現(xiàn)隨機(jī)和不確定的影響。
對(duì)于structurally interfered 可以有一下幾個(gè)方法避免:
數(shù)據(jù)源為java.util包的CopyOnWriteArrayList ,它是不可變的,數(shù)據(jù)源為該類實(shí)例的Spliterator同樣會(huì)將characteristics聲明為IMMUTABLE
-
數(shù)據(jù)源為java.util包的ConcurrentHashMap, 數(shù)據(jù)源為該類實(shí)例的Spliterator會(huì)將特性(characteristics) 聲明為CONCURRENT。
可變的數(shù)據(jù)源會(huì)提供一個(gè) late-binding 和快速失敗的Spliterator。這里有一個(gè)類(除了當(dāng)做例子之外,它不是一個(gè)非常有用的類),它維護(hù)一個(gè)數(shù)組,其中實(shí)際數(shù)據(jù)保存在偶數(shù)位置,而不相關(guān)的標(biāo)記數(shù)據(jù)保存在奇數(shù)位置。它的Spliterator會(huì)忽略標(biāo)記數(shù)據(jù)。
/**
* @Author unyielding
* @date 2018/7/26 0026 19:48
* @desc 一個(gè)類(除了當(dāng)做例子之外,它不是一個(gè)非常有用的類),
* 它維護(hù)一個(gè)數(shù)組, 其中實(shí)際數(shù)據(jù)保存在偶數(shù)位置,而不相關(guān)的標(biāo)記數(shù)據(jù)保存在奇數(shù)位置。
* 它的Spliterator會(huì)忽略標(biāo)記數(shù)據(jù)。
*/
public class TaggedArray<T> {
private final Object[] elements;//創(chuàng)建后,不可變的
/**
* 構(gòu)造方法
*
* @param data 實(shí)際數(shù)據(jù)
* @param tags 標(biāo)記數(shù)據(jù)
*/
TaggedArray(T[] data, Object[] tags) {
int size = data.length;
//保證實(shí)際數(shù)據(jù)數(shù)組和標(biāo)記數(shù)據(jù)數(shù)組的大小相同
if (tags.length != size) throw new IllegalArgumentException();
this.elements = new Object[2 * size];
//初始化elements 數(shù)組
for (int i = 0, j = 0; i < size; ++i) {
elements[j++] = data[i];
elements[j++] = tags[i];
}
}
public Spliterator<T> spliterator() {
return new TaggedArraySpliterator<>(elements, 0, elements.length);
}
static class TaggedArraySpliterator<T> implements Spliterator<T> {
private final Object[] array;
private int origin; //當(dāng)前索引,在分割或者遍歷時(shí)使用
private final int fence;//最大的下標(biāo)加一
TaggedArraySpliterator(Object[] array, int origin, int fence) {
this.array = array;
this.origin = origin;
this.fence = fence;
}
/**
* 批量遍歷
* @param action 消費(fèi)函數(shù) {@link Consumer} 的子類,可以通過lambda表達(dá)式表示
*/
@Override
public void forEachRemaining(Consumer<? super T> action) {
for (; origin < fence; origin += 2) {
action.accept((T) array[origin]);
}
}
/**
* 處理單個(gè)元素
* @param action 消費(fèi)函數(shù) {@link Consumer} 的子類,可以通過lambda表達(dá)式表示
* @return 如果有元素消費(fèi)就返回true,如果沒有就直接返回false
*/
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (origin < fence) {
action.accept((T) array[origin]);
origin += 2;
return true;
}
return false;
}
/**
* 分割數(shù)據(jù)源
* @return 返回分割后生成的Spliterator
*/
@Override
public Spliterator<T> trySplit() {
int lo = origin;
int mid = ((lo + fence) >> 1) & 1;//強(qiáng)制中點(diǎn)數(shù)為偶數(shù)
if (lo < mid) {
origin = mid;//重置Spliterator的 當(dāng)前下標(biāo)
return new TaggedArraySpliterator<>(array, lo, mid);
}//太小不需要拆分
return null;
}
/**
* 估計(jì)剩余還有多少元素
* @return 剩余還有多少元素
*/
@Override
public long estimateSize() {
return (long) ((fence - origin) / 2);
}
/**
* 獲取特征值 用戶可以根據(jù) 特征值 ,
* 用戶可以根據(jù) 配置更好的控制和優(yōu)化它的使用
* @return
*/
@Override
public int characteristics() {
return ORDERED | IMMUTABLE | SIZED | SUBSIZED;
}
}
/**
* 并行遍歷
* @param a 一個(gè){@link TaggedArray} 實(shí)例
* @param action
* @param <T> 每個(gè)元素的值
*/
static <T> void parEach(TaggedArray<T> a, Consumer<T> action) {
Spliterator<T> spliterator = a.spliterator();
long targetBatchSize = spliterator.estimateSize()
/ (ForkJoinPool.getCommonPoolParallelism() * 8);
new ParEach<>(null, spliterator, action, targetBatchSize).invoke();
}
}
并行計(jì)算器 ,其實(shí)就是繼承CountedCompleter 一個(gè)可以放到forlk/join 線程池里的類
/**
* 并行計(jì)算器
* @param <T> 元素的類型
*/
static class ParEach<T> extends CountedCompleter<T> {
final Spliterator<T> spliterator;
final Consumer<T> action;
final long targetBatchSize;
ParEach(ParEach<T> parent, Spliterator<T> spliterator,
Consumer<T> action, long targetBatchSize) {
super(parent);
this.spliterator = spliterator;
this.action = action;
this.targetBatchSize = targetBatchSize;
}
@Override
public void compute() {
Spliterator<T> sub;
while (spliterator.estimateSize() > targetBatchSize
&& (sub = spliterator.trySplit()) != null) {
addToPendingCount(1);
new ParEach<>(this, sub, action, targetBatchSize).fork();
}
spliterator.forEachRemaining(action);
propagateCompletion();
}
}
至于生成Stream 的姿勢(shì) 詳見Java 之Stream 生成姿勢(shì)
代碼地址
推薦閱讀
http://www.itdecent.cn/p/af22a9d8ce98