Java Stream的并行實現(xiàn)

作者: 一字馬胡
轉(zhuǎn)載標(biāo)志 【2017-11-03】

更新日志

日期 更新內(nèi)容 備注
2017-11-03 添加轉(zhuǎn)載標(biāo)志 持續(xù)更新

并行與并發(fā)

關(guān)于并發(fā)與并行,需要弄清楚的是,并行關(guān)注于多個任務(wù)同時進行,而并發(fā)則通過調(diào)度來不停的切換多個任務(wù)執(zhí)行,而實質(zhì)上多個任務(wù)不是同時執(zhí)的。并發(fā),英文單詞為:Concurrent。并行的英文單詞為:parallel。如果想對并發(fā)和并行有一個比較直觀的認識,可以參考下面這張圖片:

并行與并發(fā)

Fork/Join 框架與 Java Stream API

Fork/Join框架屬于并行框架,關(guān)于Fork/Join框架的一些內(nèi)容,可以參考這篇文章:Java Fork/Join并行框架。簡單來說,F(xiàn)ork/Join框架可以將大的任務(wù)切分為足夠小的任務(wù),然后將小任務(wù)分配給不同的線程來執(zhí)行,而線程之間通過工作竊取算法來協(xié)調(diào)資源,提前昨晚任務(wù)的線程可以去“竊取”其他還沒有做完任務(wù)的線程的任務(wù),而每一個線程都會持有一個雙端隊列,里面存儲著分配給自己的任務(wù),F(xiàn)ork/Join框架在實現(xiàn)上,為了防止線程之間的競爭,線程在消費分配給自己的任務(wù)時,是從隊列頭取任務(wù)的,而“竊取”線程則從隊列尾部取任務(wù)。
Fork/Join框架通過fork方法來分割大任務(wù),通過使用join來獲取小任務(wù)的結(jié)果,然后組合成大任務(wù)的結(jié)果。關(guān)于Fork/Join任務(wù)模型,可以參考下面的圖片:

Fork/Join的任務(wù)模型

關(guān)于Java Stream API的相關(guān)內(nèi)容,可以參考該文章:Java Streams API。

Stream在實現(xiàn)上使用了Fork/Join框架來實現(xiàn)并發(fā),所以使用Stream我們可以在不知不覺間就使得我們的程序跑得飛快,究其原因就是Stream使用了Fork/Join并發(fā)框架來處理任務(wù),當(dāng)然,你需要顯示的指定Stream為parallel,否則Stream默認都是串行流。比如對于Collection,你可以使用parallelStream來轉(zhuǎn)換為一個并發(fā)流,或者使用stream方法轉(zhuǎn)換為串行流,然后使用parallel操作使得串行流變?yōu)椴l(fā)流。本文的重點是剖析Stream是如何使用Fork/Join來做并發(fā)的。

Stream的并發(fā)實現(xiàn)細節(jié)

在了解了Fork/Join并發(fā)框架和Java Stream之后,首要的問題就是:Stream是如何使用Fork/Join框架來做到并發(fā)的?其實對于使用者來說,了解Stream就是通過Fork/Join框架來做的就好了,但是如果想要深入了解一下Fork/Join框架的實踐,以及Java Stream的設(shè)計方法,那么去讀一下實現(xiàn)的源碼還是很有必要的,下文中的分析僅代表個人觀點!

需要注意的一點是,Java Stream的操作分為兩類,也可以分為三類,具體的細節(jié)可以參考該文章:Java Streams API。一個簡單的判斷一個操作是否是Terminal操作還是Intermediate操作的方法是,如果操作返回的是一個新的Stream,那么就是一個Intermediate操作,否則就是一個Terminal操作。

  • Intermediate:一個流可以后面跟隨零個或多個 intermediate 操作。其目的主要是打開流,做出某種程度的數(shù)據(jù)操作,然后返回一個新的流,交給下一個操作使用。這類操作都是惰性化的(lazy),就是說,僅僅調(diào)用到這類方法,并沒有真正開始流的遍歷。

  • Terminal:一個流只能有一個 terminal 操作,當(dāng)這個操作執(zhí)行后,流就被使用“光”了,無法再被操作。所以這必定是流的最后一個操作。Terminal 操作的執(zhí)行,才會真正開始流的遍歷,并且會生成一個結(jié)果,或者一個 side effect。

  • 還有一種操作被稱為 short-circuiting。用以指:

    • 對于一個 intermediate 操作,如果它接受的是一個無限大(infinite/unbounded)的 Stream,但返回一個 有限的新 Stream。
    • 對于一個 terminal 操作,如果它接受的是一個無限大的 Stream,但能在有限的時間計算出結(jié)果。

Java Stream對四種類型的Terminal操作使用了Fork/Join實現(xiàn)了并發(fā)操作,下面的圖片展示了這四種操作類型:

支持并行的四種Stream操作

我們首先來走一遍Stream操作的執(zhí)行路徑,下面的代碼是我們想要做的操作流,下文會根據(jù)該代碼示例來跟蹤Stream的執(zhí)行路徑:

        Stream.of(1,2,3,4)
                .parallel()
                .map(n -> n*2)
                .collect(Collectors.toCollection(ArrayList::new));

解釋一下,上面的代碼想要實現(xiàn)的功能是將(1,2,3,4)這四個數(shù)字每一個都變?yōu)槠渥陨淼膬杀叮缓笫占@些元素到一個ArrayList中返回。這是一個非常簡單的功能,下面是上面的操作流的執(zhí)行路徑:


    step 1:
    
    public static<T> Stream<T> of(T... values) {
        return Arrays.stream(values);
    }
    
    step 2:
    
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }
    
    step 3:
    
        public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
            ...
            container = evaluate(ReduceOps.makeRef(collector));
            ...
    }
    
    step 4:
    
        final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
    
    step 5:
    
    使用Fork/Join框架執(zhí)行操作。
    

上面的五個步驟是經(jīng)過一些省略的,需要注意的一點是,intermediate類型的操作僅僅將操作加到一個upstream里面,具體的原文描述如下:


Construct a new Stream by appending a stateless intermediate operation to an existing stream.

比如上面我們的操作中的map操作,實際上只是將操作加到一個intermediate鏈條上面,不會立刻執(zhí)行。重點是第五步,Stream是如何使用Fork/Join來實現(xiàn)并發(fā)的。evaluate這個方法至關(guān)重要,在方法里面會分開處理,對于設(shè)置了并發(fā)標(biāo)志的操作流,會使用Fork/Join來并發(fā)執(zhí)行操作任務(wù),而對于沒有打開并發(fā)標(biāo)志的操作流,則串行執(zhí)行操作。

Fork/Join框架的核心方法是一個叫做compute的方法,下面分析一個forEach操作如何通過Fork/Join框架來實現(xiàn)并發(fā),通過追蹤代碼,可以發(fā)現(xiàn)forEach的并發(fā)版本其實是一個交由一個ForEachTask對象來做,而ForEachTask類中實現(xiàn)了compute方法:

// Similar to AbstractTask but doesn't need to track child tasks
        public void compute() {
            Spliterator<S> rightSplit = spliterator, leftSplit;
            long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
            if ((sizeThreshold = targetSize) == 0L)
                targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
            boolean forkRight = false;
            Sink<S> taskSink = sink;
            ForEachTask<S, T> task = this;
            while (!isShortCircuit || !taskSink.cancellationRequested()) {
                if (sizeEstimate <= sizeThreshold ||
                    (leftSplit = rightSplit.trySplit()) == null) {
                    task.helper.copyInto(taskSink, rightSplit);
                    break;
                }
                ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
                task.addToPendingCount(1);
                ForEachTask<S, T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    taskToFork = task;
                    task = leftTask;
                }
                else {
                    forkRight = true;
                    taskToFork = leftTask;
                }
                taskToFork.fork();
                sizeEstimate = rightSplit.estimateSize();
            }
            task.spliterator = null;
            task.propagateCompletion();
        }
    }

在上面的代碼中將大任務(wù)拆成成了小任務(wù),那哪里收集了這些小任務(wù)呢?看下面的代碼:

        @Override
        public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<S> spliterator) {
            if (ordered)
                new ForEachOrderedTask<>(helper, spliterator, this).invoke();
            else
                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
            return null;
        }

可以看到調(diào)用了invoke方法,而對invoke的描述如下:

     * Commences performing this task, awaits its completion if
     * necessary, and returns its result, or throws an (unchecked)
     * {@code RuntimeException} or {@code Error} if the underlying
     * computation did so.

不是說Fork/Join框架嘛?那有了fork為什么沒有join而是invoke呢?下面是對join方法的描述:


     * Returns the result of the computation when it {@link #isDone is
     * done}.  This method differs from {@link #get()} in that
     * abnormal completion results in {@code RuntimeException} or
     * {@code Error}, not {@code ExecutionException}, and that
     * interrupts of the calling thread do <em>not</em> cause the
     * method to abruptly return by throwing {@code
     * InterruptedException}.
     

根據(jù)join的描述,我們知道還可以使用get方法來獲取結(jié)果,但是get方法會拋出異常而join和invoke方法都不會拋出異常,而是將異常報告給ForkJoinTask,讓ForkJoinTask來拋出異常。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 轉(zhuǎn)自: Java 8 中的 Streams API 詳解 為什么需要 Stream Stream 作為 Java ...
    普度眾生的面癱青年閱讀 2,978評論 0 11
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,578評論 19 139
  • 為什么需要 Stream Stream 作為 Java 8 的一大亮點,它與 java.io 包里的 InputS...
    鐵鋼0閱讀 158評論 0 0
  • 本文采用實例驅(qū)動的方式,對JAVA8的stream API進行一個深入的介紹。雖然JAVA8中的stream AP...
    浮梁翁閱讀 26,143評論 3 50
  • 伴隨著一場淋漓的大雨,呼吸著新鮮的空氣,開始了小歐的第一天的正課。最開始是入學(xué)儀式,陽光挺拔的男生、端莊美...
    王延旭閱讀 673評論 0 2

友情鏈接更多精彩內(nèi)容