parallelStream和ForkJoinPool的使用風(fēng)險(xiǎn)

1.parallelStream和ForkJoinPool

parallelStream是JDK官方在1.8版本中增加的語(yǔ)法級(jí)新特性,主要的特點(diǎn)就是可以幫助用戶(hù)在流式開(kāi)發(fā)時(shí)快速實(shí)現(xiàn)并行編程,從而快速簡(jiǎn)單的實(shí)現(xiàn)多線(xiàn)程運(yùn)行,同時(shí)降低用戶(hù)對(duì)于線(xiàn)程池維護(hù)帶來(lái)的復(fù)雜性。

使用parallelStream的典型代碼:
list.parallelStream().forEach(o -> {
    o.doSomething();
    ...
});

ForkJoinPool是JDK官方在1.7版本中引入的特定線(xiàn)程池,主要應(yīng)用于基于遞歸調(diào)用策略的任務(wù)流多線(xiàn)程調(diào)用場(chǎng)景。

2.風(fēng)險(xiǎn)點(diǎn)

雖然parallelStream的流式編程帶來(lái)的極大的多線(xiàn)程開(kāi)發(fā)便利性,但同時(shí)也帶來(lái)了一個(gè)隱含的邏輯,且并未在接口注釋中說(shuō)明:

    /**
     * Returns a possibly parallel {@code Stream} with this collection as its
     * source.  It is allowable for this method to return a sequential stream.
     *
     * <p>This method should be overridden when the {@link #spliterator()}
     * method cannot return a spliterator that is {@code IMMUTABLE},
     * {@code CONCURRENT}, or <em>late-binding</em>. (See {@link #spliterator()}
     * for details.)
     *
     * @implSpec
     * The default implementation creates a parallel {@code Stream} from the
     * collection's {@code Spliterator}.
     *
     * @return a possibly parallel {@code Stream} over the elements in this
     * collection
     * @since 1.8
     */

以上是該接口的全部注釋?zhuān)@里所謂的隱含邏輯是,并非每一個(gè)獨(dú)立調(diào)用parallelStream的代碼都會(huì)獨(dú)立維護(hù)運(yùn)行一個(gè)多線(xiàn)程的策略,而是JDK默認(rèn)會(huì)調(diào)用同一個(gè)由運(yùn)行環(huán)境維護(hù)的ForkJoinPool線(xiàn)程池,也就是說(shuō),無(wú)論在哪個(gè)地方寫(xiě)了list.parallelStream().forEach();這樣一段代碼,底層實(shí)際都會(huì)由一套ForkJoinPool的線(xiàn)程池進(jìn)行運(yùn)行,一般線(xiàn)程池運(yùn)行會(huì)遇到的沖突、排隊(duì)等問(wèn)題,這里同樣會(huì)遇到,且會(huì)被隱藏在代碼邏輯中。

這里最危險(xiǎn)的當(dāng)然就是線(xiàn)程池的deadlock,一旦發(fā)生deadlock,所有調(diào)用parallelStream的地方都會(huì)被阻塞,無(wú)論你是否知道其他人是否這樣書(shū)寫(xiě)了代碼。

3.會(huì)引起線(xiàn)程池deadlock的場(chǎng)景

3.1 最常見(jiàn)的線(xiàn)程池內(nèi)部阻塞

以這段代碼為例
list.parallelStream().forEach(o -> {
    o.doSomething();
    ...
});

只要在doSomething()中有任何導(dǎo)致當(dāng)前執(zhí)行被hold住的情況,則由于parallelStream完成時(shí)會(huì)執(zhí)行join操作,任何一個(gè)沒(méi)有完成迭代都會(huì)導(dǎo)致join操作被hold住,進(jìn)而導(dǎo)致當(dāng)前線(xiàn)程被卡住。
典型的操作有:線(xiàn)程被wait,鎖,循環(huán)鎖,外部操作卡住等。

3.2 迭代時(shí)對(duì)象被修改

list.parallelStream().forEach()時(shí),如果不甚修改了list對(duì)象的長(zhǎng)度,則也有可能導(dǎo)致join操作無(wú)法完成。

3.3 static代碼塊中執(zhí)行迭代

如果你在一個(gè)類(lèi)的static代碼塊中寫(xiě)了迭代,并且執(zhí)行了lambda表達(dá)式,則也會(huì)導(dǎo)致線(xiàn)程被鎖住。
class A {
    static {
        list.parallelStream().forEach(n -> {
            n.doSomething();
        })
    }
}

這里的原因是執(zhí)行l(wèi)ambda表達(dá)式的前提是當(dāng)前類(lèi)A必須完成類(lèi)初始化,但初始化又由于static代碼塊無(wú)法執(zhí)行,而導(dǎo)致程序互鎖,最終導(dǎo)致卡住

4.建議

1、如果你開(kāi)發(fā)的是V5這樣的復(fù)雜系統(tǒng),不建議直接使用parallelStream執(zhí)行多線(xiàn)程操作
2、如果你真的希望在V5這樣的復(fù)雜系統(tǒng)中使用parallelStream,請(qǐng)考慮構(gòu)建獨(dú)立的ForkJoinPool,使用如下姿勢(shì)調(diào)用:
ForkJoinPool forkJoinPool1 = new ForkJoinPool(20);
ForkJoinTask<?> fs = forkJoinPool.submit(() -> list.parallelStream().forEach((n) -> {
        n.doSomething();
    }));
try {
    result = fs.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e){
    e.printStackTrace();
}
forkJoinPool.shutdown();
 
即手動(dòng)制定線(xiàn)程池,不過(guò)個(gè)人建議,這樣的寫(xiě)法還不如自己寫(xiě)一般的多線(xiàn)程代碼來(lái)得簡(jiǎn)單。

3、當(dāng)然,如果你開(kāi)發(fā)的微服務(wù)等類(lèi)似的相對(duì)簡(jiǎn)單系統(tǒng),則可以直接使用parallelStream,因?yàn)橄到y(tǒng)的簡(jiǎn)單性,相關(guān)風(fēng)險(xiǎn)會(huì)很低。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容