Java 8 之Stream Spliterator

定義

  • 用于遍歷和分割“源”元素的對(duì)象。

數(shù)據(jù)源

  • Spliterator的元素來源可能是一個(gè)數(shù)組,一個(gè)集合,一個(gè)IO通道,一個(gè)生成函數(shù)。

處理數(shù)據(jù)源的方式

  • Spliterator可以單獨(dú)或順序地批量地遍歷元素。
  • Spliterator也可以將其部分元素作為另一個(gè)Spliterator進(jìn)行分區(qū),為了并行化操作。使用不能拆分或以非常不平衡或低效的方式進(jìn)行拆分Spliterator的操作不太可能從并行中獲益。遍歷和分解流出的元素;每個(gè)Spliterator只對(duì)單個(gè)批量計(jì)算有用。

特征 characteristics

Spliterator 還聲明了 一組關(guān)于它的結(jié)構(gòu)和源的特征(characteristics),包含以下以下幾種:

  • ORDERED int 型 值為16 既定的順序,Spliterator保證拆分和遍歷時(shí)是按照這一順序。
  • DISTINCT int型 值為1 表示元素都不是重復(fù)的,對(duì)于每一對(duì)元素{ x, y},{ !x.equals(y)}。例如,這適用于基于{@link Set}的Spliterator。
  • SORTED int型 值為4 表示元素順序按照預(yù)定義的順序,可以通過getComparator 獲取排序器,若返回null ,則是按自然排序。
  • SIZED int型 值為64 表示在遍歷分隔之前 estimateSize() 返回的值代表一個(gè)有限的大小,在沒有修改結(jié)構(gòu)源的情況下,代表了一個(gè)完整遍歷時(shí)所遇到的元素?cái)?shù)量的精確計(jì)數(shù)。
  • NONNULL init型 值為256 表示數(shù)據(jù)源保證元素不會(huì)為空
  • IMMUTABLE int 型 值為1024 表示在遍歷的過程中不能添加、替換、刪除元素
  • CONCURRENT int型 值為4096 表示元素可以被多個(gè)線程安全并發(fā)得修改而不需要外部的同步。
  • SUBSIZED int型 值為16384 表示trySplit()返回的結(jié)果都是SIZED和SUBSIZED

Tips

  • 一個(gè)late-binding Spliterator 在第一次遍歷、分隔或者查詢?nèi)魏喂烙?jì)的大小時(shí)綁定 ,而不是在創(chuàng)建的時(shí)候綁定。
  • 非后期綁定的Spliterator在構(gòu)建或在任何方法的第一次調(diào)用時(shí)綁定到數(shù)據(jù)源。在綁定之前對(duì)源進(jìn)行的修改將在遍歷Spliterator時(shí)反映出來,在綁定源之后,發(fā)現(xiàn) structural interference應(yīng)立即拋出ConcurrentModificationException 異常,這稱為快速失敗。
  • Spliterator的批量遍歷方法({@link # forEachRemaining()})可以在遍歷完所有元素之后優(yōu)化遍歷并檢查 structural interference,而不是檢查每個(gè)元素并立即失敗。
  • Spliterator 提供估計(jì)剩余多少元素的方法,即estimateSize()方法,理想情況下,正如在characteristics SIZED反應(yīng)的那樣,這個(gè)值會(huì)與成功遍歷所遇到的數(shù)量完全一致。但是,即使不知道確切的值,估計(jì)值對(duì)于在數(shù)據(jù)源上執(zhí)行的操作來說仍然是有用的,例如幫助確定是進(jìn)一步分割還是按順序遍歷其余的元素。

并行的實(shí)現(xiàn)

盡管在并行算法中有明顯的實(shí)用功能,但spliterator并不向我們期望的那樣是線程安全的;相反,使用spliterator的并行算法的實(shí)現(xiàn)應(yīng)該確保spliterator一次只使用一個(gè)線程。這個(gè)通常很容易通過 串行線程封閉 來實(shí)現(xiàn):通常使用遞歸分解這個(gè)經(jīng)典的并行算法。調(diào)用{@link #trySplit()}的線程可以將返回的Spliterator傳遞給另一個(gè)線程,而這個(gè)線程又可以遍歷或進(jìn)一步拆分這個(gè)Spliterator。如果兩個(gè)或多個(gè)線程在同一個(gè)Spliterator上同時(shí)操作,則不定義分割和遍歷的行為。如果原始線程將一個(gè)spliterator傳遞給另一個(gè)線程進(jìn)行處理,那么最好是在使用{@link #tryAdvance(Consumer) tryAdvance()}的任何元素之前進(jìn)行切換,因?yàn)槟承┍WC(例如{@link #estimateSize()}對(duì)于{@code size}spliterator的精度)只有在遍歷開始之前才有效。


Spliterator分割圖.png

Spliterator通過支持分割和單元素迭代,除了支持串行遍歷,還支持高效的并行遍歷。另外,Spliterator 不像Iterator設(shè)計(jì)的那樣設(shè)計(jì)兩個(gè)方法hasNext 判斷是否有元素和next() 返回元素進(jìn)行消費(fèi),Spliterator 設(shè)計(jì)一個(gè)tryAdvance方法,消費(fèi)元素,如果有就消費(fèi)并返回true,如果沒有則返回false,不需要兩個(gè)獨(dú)立的方法。
對(duì)于可變?cè)矗绻赟pliterator綁定到其數(shù)據(jù)源和遍歷結(jié)束之間對(duì)源進(jìn)行結(jié)構(gòu)上的干擾(添加、替換或刪除元素),可能會(huì)出現(xiàn)隨機(jī)和不確定的影響。
對(duì)于structurally interfered 可以有一下幾個(gè)方法避免:

  • 數(shù)據(jù)源為java.util包的CopyOnWriteArrayList ,它是不可變的,數(shù)據(jù)源為該類實(shí)例的Spliterator同樣會(huì)將characteristics聲明為IMMUTABLE

  • 數(shù)據(jù)源為java.util包的ConcurrentHashMap, 數(shù)據(jù)源為該類實(shí)例的Spliterator會(huì)將特性(characteristics) 聲明為CONCURRENT。
    可變的數(shù)據(jù)源會(huì)提供一個(gè) late-binding 和快速失敗的Spliterator。

    這里有一個(gè)類(除了當(dāng)做例子之外,它不是一個(gè)非常有用的類),它維護(hù)一個(gè)數(shù)組,其中實(shí)際數(shù)據(jù)保存在偶數(shù)位置,而不相關(guān)的標(biāo)記數(shù)據(jù)保存在奇數(shù)位置。它的Spliterator會(huì)忽略標(biāo)記數(shù)據(jù)。

/**
 * @Author unyielding
 * @date 2018/7/26 0026 19:48
 * @desc 一個(gè)類(除了當(dāng)做例子之外,它不是一個(gè)非常有用的類),
 * 它維護(hù)一個(gè)數(shù)組, 其中實(shí)際數(shù)據(jù)保存在偶數(shù)位置,而不相關(guān)的標(biāo)記數(shù)據(jù)保存在奇數(shù)位置。
 * 它的Spliterator會(huì)忽略標(biāo)記數(shù)據(jù)。
 */
public class TaggedArray<T> {
    private final Object[] elements;//創(chuàng)建后,不可變的
    /**
     * 構(gòu)造方法
     *
     * @param data 實(shí)際數(shù)據(jù)
     * @param tags 標(biāo)記數(shù)據(jù)
     */
    TaggedArray(T[] data, Object[] tags) {
        int size = data.length;
        //保證實(shí)際數(shù)據(jù)數(shù)組和標(biāo)記數(shù)據(jù)數(shù)組的大小相同
        if (tags.length != size) throw new IllegalArgumentException();
        this.elements = new Object[2 * size];
        //初始化elements 數(shù)組
        for (int i = 0, j = 0; i < size; ++i) {
            elements[j++] = data[i];
            elements[j++] = tags[i];
        }
    }

    public Spliterator<T> spliterator() {
        return new TaggedArraySpliterator<>(elements, 0, elements.length);
    }

    static class TaggedArraySpliterator<T> implements Spliterator<T> {
        private final Object[] array;

        private int origin; //當(dāng)前索引,在分割或者遍歷時(shí)使用

        private final int fence;//最大的下標(biāo)加一

        TaggedArraySpliterator(Object[] array, int origin, int fence) {
            this.array = array;
            this.origin = origin;
            this.fence = fence;
        }

        /**
         *  批量遍歷
         * @param action 消費(fèi)函數(shù) {@link Consumer} 的子類,可以通過lambda表達(dá)式表示
         */
        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            for (; origin < fence; origin += 2) {
                action.accept((T) array[origin]);
            }
        }

        /**
         *  處理單個(gè)元素
         * @param action 消費(fèi)函數(shù) {@link Consumer} 的子類,可以通過lambda表達(dá)式表示
         * @return 如果有元素消費(fèi)就返回true,如果沒有就直接返回false
         */
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (origin < fence) {
                action.accept((T) array[origin]);
                origin += 2;
                return true;
            }
            return false;
        }

        /**
         * 分割數(shù)據(jù)源
         * @return 返回分割后生成的Spliterator
         */
        @Override
        public Spliterator<T> trySplit() {
            int lo = origin;
            int mid = ((lo + fence) >> 1) & 1;//強(qiáng)制中點(diǎn)數(shù)為偶數(shù)
            if (lo < mid) {
                origin = mid;//重置Spliterator的 當(dāng)前下標(biāo)
                return new TaggedArraySpliterator<>(array, lo, mid);
            }//太小不需要拆分
            return null;
        }

        /**
         * 估計(jì)剩余還有多少元素
         * @return 剩余還有多少元素
         */
        @Override
        public long estimateSize() {
            return (long) ((fence - origin) / 2);
        }

        /**
         * 獲取特征值 用戶可以根據(jù) 特征值 ,
         * 用戶可以根據(jù) 配置更好的控制和優(yōu)化它的使用
         * @return
         */
        @Override
        public int characteristics() {
            return ORDERED | IMMUTABLE | SIZED | SUBSIZED;
        }
    }

    /**
     * 并行遍歷
     * @param a 一個(gè){@link TaggedArray} 實(shí)例
     * @param action
     * @param <T> 每個(gè)元素的值
     */
    static <T> void parEach(TaggedArray<T> a, Consumer<T> action) {
        Spliterator<T> spliterator = a.spliterator();
        long targetBatchSize = spliterator.estimateSize()
                / (ForkJoinPool.getCommonPoolParallelism() * 8);
        new ParEach<>(null, spliterator, action, targetBatchSize).invoke();
    }
}

并行計(jì)算器 ,其實(shí)就是繼承CountedCompleter 一個(gè)可以放到forlk/join 線程池里的類

    /**
     * 并行計(jì)算器
     * @param <T> 元素的類型
     */
    static class ParEach<T> extends CountedCompleter<T> {
        final Spliterator<T> spliterator;
        final Consumer<T> action;
        final long targetBatchSize;

        ParEach(ParEach<T> parent, Spliterator<T> spliterator,
                Consumer<T> action, long targetBatchSize) {
            super(parent);
            this.spliterator = spliterator;
            this.action = action;
            this.targetBatchSize = targetBatchSize;
        }

        @Override
        public void compute() {
            Spliterator<T> sub;
            while (spliterator.estimateSize() > targetBatchSize
                    && (sub = spliterator.trySplit()) != null) {
                addToPendingCount(1);
                new ParEach<>(this, sub, action, targetBatchSize).fork();
            }
            spliterator.forEachRemaining(action);
            propagateCompletion();
        }
    }

至于生成Stream 的姿勢(shì) 詳見Java 之Stream 生成姿勢(shì)
代碼地址
推薦閱讀
http://www.itdecent.cn/p/af22a9d8ce98

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容