第十四章 并發(fā)編程 1

愛麗絲:“我可不想到瘋子中間去”

貓咪:“啊,那沒轍了,我們這都是瘋子。我瘋了,你也瘋了”

愛麗絲:“你怎么知道我瘋了”。

貓咪:“你一定是瘋了,否則你就不會來這兒” ——愛麗絲夢游仙境 第 6 章。

在本章之前,我們慣用一種簡單順序的敘事方式來編程,有點類似文學(xué)中的意識流:第一件事發(fā)生了,然后是第二件,第三件……總之,我們完全掌握著事情發(fā)生的進展和順序。如果我們將一個值設(shè)置為 5,再看時它已變成 47 的話,這就令人匪夷所思了。

現(xiàn)在,我們來到了陌生的并發(fā)世界,在這里這樣的結(jié)果一點都不奇怪。你原來相信的一切都不再可靠。原有的規(guī)則可能生效也可能失效。更可能的是原有的規(guī)則只會在某些情況下生效。我們只有完全了解這些情況,才能決定我們處理事情的方式。

比如,我們正常的生活的世界是遵循經(jīng)典牛頓力學(xué)的。物體具有質(zhì)量:會墜落并且轉(zhuǎn)移動能。電線有電阻,光沿直線傳播。假如我們進入到極小、極大、極冷或者極熱(那些我們無法生存的世界),這些現(xiàn)象就會發(fā)生改變。我們無法判斷某物體是粒子還是波,光是否受到重力影響,一些物質(zhì)還會變?yōu)槌瑢?dǎo)體。

假設(shè)我們處在多條故事線并行的間諜小說里,非單一意識流地敘事:第一個間諜在巖石底留下了微縮膠片。當(dāng)?shù)诙€間諜來取時,膠片可能已被第三個間諜拿走。小說并沒有交代此處的細(xì)節(jié)。所以直到故事結(jié)尾,我們都沒搞清楚到底發(fā)生了什么。

構(gòu)建并發(fā)程序好比玩搭積木游戲。每拉出一塊放在塔頂時都有崩塌的可能。每個積木塔和應(yīng)用程序都是獨一無二的,有著自己的作用。你在某個系統(tǒng)構(gòu)建中學(xué)到的知識并不一定適用于下一個系統(tǒng)。

本章是對并發(fā)概念最基本的介紹。雖然我們用到了現(xiàn)代的 Java 8 工具來演示原理,但還遠(yuǎn)未及全面論述并發(fā)。我的目標(biāo)是為你提供足夠的基礎(chǔ)知識,使你能夠把握問題的復(fù)雜性和危險性,從而安全地渡過這片鯊魚肆虐的困難水域。

更多繁瑣和底層的細(xì)節(jié),請參閱附錄:并發(fā)底層原理。要進一步深入這個領(lǐng)域,你還必須閱讀 Brian Goetz 等人的 《Java Concurrency in Practice》。在撰寫本文時,該書已有十多年的歷史了,但它仍包含我們必須要了解和明白的知識要點。理想情況下,本章和上述附錄是閱讀該書的良好前提。另外,Bill Venner 的 《Inside the Java Virtual Machine》也很值得一看。它詳細(xì)描述了包括線程在內(nèi)的 JVM 的內(nèi)部工作方式。

術(shù)語問題

術(shù)語“并發(fā)”,“并行”,“多任務(wù)”,“多處理”,“多線程”,分布式系統(tǒng)(可能還有其他)在整個編程文獻中都以多種相互沖突的方式使用,并且經(jīng)常被混為一談。 Brian Goetz 在他 2016 年《從并發(fā)到并行》的演講中指出了這一點,之后提出了合理的區(qū)分:

  • 并發(fā)是關(guān)于正確有效地控制對共享資源的訪問。
  • 并行是使用額外的資源來更快地產(chǎn)生結(jié)果。

這些定義很好,但是我們已有幾十年混亂使用和抗拒解決此問題的歷史了。一般來說,當(dāng)人們使用“并發(fā)”這個詞時,他們的意思是“所有的一切”。事實上,我自己也經(jīng)常陷入這樣的想法。在大多數(shù)書籍中,包括 Brian Goetz 的 《Java Concurrency in Practice》,都在標(biāo)題中使用這個詞。

“并發(fā)”通常表示:”不止一個任務(wù)正在執(zhí)行“。而“并行”幾乎總是代表:”不止一個任務(wù)同時執(zhí)行“。現(xiàn)在我們能立即看出這些定義中的問題所在:“并行”也有不止一個任務(wù)正在執(zhí)行的語義在里面。區(qū)別就在于細(xì)節(jié):究竟是怎么“執(zhí)行”的。此外還有一些重疊:為并行編寫的程序依舊可以在單處理器上運行,而并發(fā)編寫的系統(tǒng)也可以利用多個處理器。

還有另一種方式,圍繞”緩慢“出現(xiàn)的情況寫下定義:

并發(fā)

同時完成多任務(wù)。無需等待當(dāng)前任務(wù)完成即可執(zhí)行其他任務(wù)。“并發(fā)”解決了程序因外部控制而無法進一步執(zhí)行的阻塞問題。最常見的例子就是 I/O 操作,任務(wù)必須等待數(shù)據(jù)輸入(在一些例子中也稱阻塞)。這個問題常見于 I/O 密集型任務(wù)。

并行

同時在多個位置完成多任務(wù)。這解決了所謂的 CPU 密集型問題:將程序分為多部分,在多個處理器上同時處理不同部分來加快程序執(zhí)行效率。

上面的定義說明了這兩個術(shù)語令人困惑的原因:兩者的核心都是“同時完成多個任務(wù)”,不過并行增加了跨多個處理器的分布。更重要的是,它們可以解決不同類型的問題:并行可能對解決 I / O 密集型問題沒有任何好處,因為問題不在于程序的整體執(zhí)行速度,而在于 I/O 阻塞。而嘗試在單個處理器上使用并發(fā)來解決計算密集型問題也可能是浪費時間。兩種方法都試圖在更短的時間內(nèi)完成更多工作,但是它們實現(xiàn)加速的方式有所不同,這取決于問題施加的約束。

這兩個概念混合在一起的一個主要原因是包括 Java 在內(nèi)的許多編程語言使用相同的機制 - 線程來實現(xiàn)并發(fā)和并行。

我們甚至可以嘗試以更細(xì)的粒度去進行定義(然而這并不是標(biāo)準(zhǔn)化的術(shù)語):

  • 純并發(fā):仍然在單個 CPU 上運行任務(wù)。純并發(fā)系統(tǒng)比時序系統(tǒng)更快地產(chǎn)生結(jié)果,但是它的運行速度不會因為處理器的增加而變得更快。

  • 并發(fā)-并行:使用并發(fā)技術(shù),結(jié)果程序可以利用多處理器更快地產(chǎn)生結(jié)果。

  • 并行-并發(fā):使用并行編程技術(shù)編寫,即使只有一個處理器,結(jié)果程序仍然可以運行(Java 8 Streams 就是一個很好的例子)。

  • 純并行:只有多個處理器的情況下才能運行。

在某些情況下,這是一個有效的分類法。

支持并發(fā)性的語言和庫似乎是抽象泄露(Leaky Abstraction一詞的完美候選。抽象的目標(biāo)是“抽象”掉那些對手頭的想法不重要的部分,以屏蔽不必要的細(xì)節(jié)所帶來的影響。如果抽象發(fā)生泄露,那么即使費很大功夫去隱藏它們,這些細(xì)枝末節(jié)也總會不斷凸顯出自己是重要的。

于是我開始懷疑是否真的有高度地抽象。因為當(dāng)編寫這類程序時,底層的系統(tǒng)、工具,甚至是關(guān)于 CPU 緩存如何工作的細(xì)節(jié),都永遠(yuǎn)不會被屏蔽。最后,即使你已非常謹(jǐn)慎,你開發(fā)的程序也不一定在所有情況下運行正常。有時是因為兩臺機器的配置不同,有時是程序的預(yù)計負(fù)載不同。這不是 Java 特有的 - 這是并發(fā)和并行編程的本質(zhì)。

你可能會認(rèn)為純函數(shù)式語言沒有這些限制。實際上,純函數(shù)式語言的確解決了大量并發(fā)問題。如果你正在解決一個困難的并發(fā)問題,可以考慮用純函數(shù)語言編寫這個部分。但是,如果你編寫一個使用隊列的系統(tǒng),例如,如果該系統(tǒng)沒有被合理地調(diào)優(yōu),并且輸入速率也沒有被正確地估計或限制(在不同的情況下,限制意味著具有不同的影響的不同東西),該隊列要么被填滿并阻塞,要么溢出。最后,你必須了解所有可能會破壞你的系統(tǒng)的細(xì)節(jié)和問題。這是一種非常不同的編程方式。

并發(fā)的新定義

幾十年來,我一直在努力解決各種形式的并發(fā)問題,其中一個最大的挑戰(zhàn)是簡潔的定義它。在撰寫本章的過程中,我終于有了這樣的洞察力,我將其定義為:

并發(fā)性是一系列專注于減少等待的性能技術(shù)

這實際上是一個相當(dāng)復(fù)雜的表述,所以我將其分解:

  • 這是一個集合:包含許多不同的方法來解決這個問題。因為技術(shù)差異很大,這是使定義并發(fā)性如此具有挑戰(zhàn)性的問題之一。
  • 這些是性能技術(shù):就是這樣。并發(fā)的關(guān)鍵點在于讓你的程序運行得更快。在 Java 中,并發(fā)是非常棘手和困難的,所以絕對不要使用它,除非你有一個重大的性能問題 - 即使這樣,使用最簡單的方法產(chǎn)生你需要的性能,因為并發(fā)很快變得難以管理。
  • “減少等待”部分很重要而且微妙。無論(例如)你的程序運行在多少個處理器上,你只能在等待發(fā)生時產(chǎn)生效益。如果你發(fā)起 I/O 請求并立即獲得結(jié)果,沒有延遲,因此無需改進。如果你在多個處理器上運行多個任務(wù),并且每個處理器都以滿容量運行,并且沒有任務(wù)需要等待其他任務(wù),那么嘗試提高吞吐量是沒有意義的。并發(fā)的唯一機會是程序的某些部分被迫等待。等待會以多種形式出現(xiàn) - 這解釋了為什么存在多種不同的并發(fā)方法。

值得強調(diào)的是,這個定義的有效性取決于“等待”這個詞。如果沒有什么可以等待,那就沒有機會去加速。如果有什么東西在等待,那么就會有很多方法可以加快速度,這取決于多種因素,包括系統(tǒng)運行的配置,你要解決的問題類型以及其他許多問題。

并發(fā)的超能力

想象一下,你置身于一部科幻電影。你必須在一棟大樓中找到一個東西,它被小心而巧妙地隱藏在大樓千萬個房間中的一間。你進入大樓,沿著走廊走下去。走廊是分開的。

一個人完成這項任務(wù)要花上一百輩子的時間。

現(xiàn)在假設(shè)你有一個奇怪的超能力。你可以將自己一分為二,然后在繼續(xù)前進的同時將另一半送到另一個走廊。每當(dāng)你在走廊或樓梯上遇到分隔到下一層時,你都會重復(fù)這個分裂的技巧。最終,整個建筑中的每個走廊的終點都有一個你。

每個走廊都有一千個房間。此時你的超能力變得弱了一點,你只能克隆 50 個自己來并發(fā)搜索走廊里面的房間。

一旦克隆體進入房間,它必須搜索房間的每個角落。這時它切換到了第二種超能力。它分裂成了一百萬個納米機器人,每個機器人都會飛到或爬到房間里一些看不見的地方。你不需要了解這種功能 - 一旦你開啟它就會自動工作。在他們自己的控制下,納米機器人開始行動,搜索房間然后回來重新組裝成你,突然間,不知怎么的,你就知道這間房間里有沒有那個東西。

我很想說,“并發(fā)就是剛才描述的置身于科幻電影中的超能力“。就像你自己可以一分為二然后解決更多的問題一樣簡單。但是問題在于,我們來描述這種現(xiàn)象的任何模型最終都是抽象泄露的(leaky abstraction)。

以下是其中一個泄露:在理想的世界中,每次克隆自己時,也會復(fù)制一個物理處理器來運行克隆搜索者。這當(dāng)然是不現(xiàn)實的——實際上,你的機器上一般只有 4 個或 8 個處理器核心(編寫本文時的典型情況)?;蛟S你擁有更多的處理器,但仍有很多情況下只有一個單核處理器。在關(guān)于抽象的討論中,分配物理處理器核心這本身就是抽象的泄露,甚至也可以支配你的決策。
?
讓我們在科幻電影中改變一些東西?,F(xiàn)在當(dāng)每個克隆搜索者最終到達(dá)一扇門時,他們必須敲門并等到有人開門。如果每個搜索者都有一個處理器核心,這沒有問題——只是空閑等待直到有人開門。但是如果我們只有 8 個處理器核心卻有幾千個搜索者,我們不希望處理器僅僅因為某個搜索者恰好在等待回答中被鎖住而閑置下來。相反,我們希望將處理器應(yīng)用于可以真正執(zhí)行工作的搜索者身上,因此需要將處理器從一個任務(wù)切換到另一個任務(wù)的機制。

許多模型能夠有效地隱藏處理器的數(shù)量,允許你假裝有很多個處理器。但在某些情況下,當(dāng)你必須明確知道處理器數(shù)量以便于工作的時候,這些模型就會失效。

最大的影響之一取決于是使用單核處理器還是多核處理器。如果你只有單核處理器,那么任務(wù)切換的成本也由該核心承擔(dān),將并發(fā)技術(shù)應(yīng)用于你的系統(tǒng)會使它運行得更慢。

這可能會讓你以為,在單核處理器的情況下,編寫并發(fā)代碼是沒有意義的。然而,有些情況下,并發(fā)模型會產(chǎn)生更簡單的代碼,光是為了這個目的就值得舍棄一些性能。

在克隆體敲門等待的情況下,即使單核處理器系統(tǒng)也能從并發(fā)中受益,因為它可以從等待(阻塞)的任務(wù)切換到準(zhǔn)備運行的任務(wù)。但是如果所有任務(wù)都可以一直運行那么切換的成本反而會使任務(wù)變慢,在這種情況下,并發(fā)只在如果你有多個處理器的情況下有意義。

假設(shè)你正在嘗試破解某種密碼,在同一時間內(nèi)參與破解的線程越多,你越快得到答案的可能性就越大。每個線程都能持續(xù)使用你所分配的處理器時間,在這種情況下(CPU 密集型問題),你代碼中的線程數(shù)應(yīng)該和你擁有的處理器的核心數(shù)保持一致。

在接聽電話的客戶服務(wù)部門,你只有一定數(shù)量的員工,但是你的部門可能會收到很多電話。這些員工(處理器)一次只能接聽一個電話直到打完,此時其它打來的電話必須排隊等待。

在“鞋匠和精靈”的童話故事中,鞋匠有很多工作要做,當(dāng)他睡著時,出現(xiàn)了一群精靈來為他制作鞋子。這里的工作是分布式的,但即使使用大量的物理處理器,在制造鞋子的某些部件時也會產(chǎn)生限制——例如,如果鞋底的制作時間最長,這就限制了鞋子的制作速度,這也會改變你設(shè)計解決方案的方式。

因此,你要解決的問題驅(qū)動了方案的設(shè)計。將一個問題分解成“獨立運行”的子任務(wù),這是一種美好的抽象,然后就是實際發(fā)生的現(xiàn)實:物理現(xiàn)實不斷干擾和動搖這個抽象。

這只是問題的一部分:考慮一個制作蛋糕的工廠。我們以某種方式把制作蛋糕的任務(wù)分給了工人們,現(xiàn)在是時候讓工人把蛋糕放在盒子里了。那里有一個準(zhǔn)備存放蛋糕的盒子。但是在一個工人把蛋糕放進盒子之前,另一個工人就沖過去,把蛋糕放進盒子里,砰!這兩個蛋糕撞到一起砸壞了。這是常見的“共享內(nèi)存”問題,會產(chǎn)生所謂的競態(tài)條件(race condition),其結(jié)果取決于哪個工人能先把蛋糕放進盒子里(通常使用鎖機制來解決問題,因此一個工作人員可以先抓住一個盒子并防止蛋糕被砸爛)。

當(dāng)“同時”執(zhí)行的任務(wù)相互干擾時,就會出現(xiàn)問題。這可能以一種微妙而偶然的方式發(fā)生,因此可以說并發(fā)是“可以論證的確定性,但實際上是不確定性的”。也就是說,假設(shè)你很小心地編寫并發(fā)程序,而且通過了代碼檢查可以正確運行。然而實際上,我們編寫的并發(fā)程序大部分情況下都能正常運行,但是在一些特定情況下會失敗。這些情況可能永遠(yuǎn)不會發(fā)生,或者在你在測試期間幾乎很難發(fā)現(xiàn)它們。實際上,編寫測試代碼通常無法為并發(fā)程序生成故障條件。由此產(chǎn)生的失敗只會偶爾發(fā)生,因此它們以客戶投訴的形式出現(xiàn)。這是學(xué)習(xí)并發(fā)中最強有力的論點之一:如果你忽略它,你可能會受傷。

因此,并發(fā)似乎充滿了危險,如果這讓你有點害怕,這可能是一件好事。盡管 Java 8 在并發(fā)性方面做出了很大改進,但仍然沒有像編譯時驗證 (compile-time verification) 或受檢查的異常 (checked exceptions) 那樣的安全網(wǎng)來告訴你何時出現(xiàn)錯誤。關(guān)于并發(fā),你只能依靠自己,只有知識淵博、保持懷疑和積極進取的人,才能用 Java 編寫可靠的并發(fā)代碼。


并發(fā)為速度而生

在聽說并發(fā)編程的問題之后,你可能會想知道它是否值得這么麻煩。答案是“不,除非你的程序運行速度不夠快?!辈⑶以跊Q定用它之前你會想要仔細(xì)思考。不要隨便跳進并發(fā)編程的悲痛之中。如果有一種方法可以在更快的機器上運行你的程序,或者如果你可以對其進行分析并發(fā)現(xiàn)瓶頸并在該位置替換更快的算法,那么請執(zhí)行此操作。只有在顯然沒有其他選擇時才開始使用并發(fā),然后僅在必要的地方去使用它。

速度問題一開始聽起來很簡單:如果你想要一個程序運行得更快,將其分解為多個部分,并在單獨的處理器上運行每個部分。隨著我們提高時鐘速度的能力耗盡(至少對傳統(tǒng)芯片而言),速度的提高是出現(xiàn)在多核處理器的形式而不是更快的芯片。為了使程序運行得更快,你必須學(xué)會利用那些額外的處理器(譯者注:處理器一般代表 CPU 的一個邏輯核心),這是并發(fā)所帶來的好處之一。

對于多處理器機器,可以在這些處理器之間分配多個任務(wù),這可以顯著提高吞吐量。強大的多處理器 Web 服務(wù)器通常就是這種情況,它可以在程序中為 CPU 分配大量用戶請求,每個請求分配一個線程。

但是,并發(fā)通常可以提高在單處理器上運行的程序的性能。這聽起來有點違反直覺。如果你仔細(xì)想想,由于上下文切換的成本增加(從一個任務(wù)切換到另一個任務(wù)),在單個處理器上運行的并發(fā)程序?qū)嶋H上應(yīng)該比程序的所有部分順序運行具有更多的開銷。從表面上看,將程序的所有部分作為單個任務(wù)運行,并且節(jié)省上下文切換的成本,這樣看似乎更劃算。

使這個問題變得有些不同的是阻塞。如果程序中的某個任務(wù)由于程序控制之外的某種情況而無法繼續(xù)(通常是 I/O),我們就稱該任務(wù)或線程已阻塞(在我們的科幻故事中,就是克隆人已經(jīng)敲門并等待它打開)。如果沒有并發(fā),整個程序就會停下來,直到外部條件發(fā)生變化。但是,如果使用并發(fā)編寫程序,則當(dāng)一個任務(wù)被阻塞時,程序中的其他任務(wù)可以繼續(xù)執(zhí)行,因此整個程序得以繼續(xù)運行。事實上,從性能的角度來看,如果沒有任務(wù)會阻塞,那么在單處理器機器上使用并發(fā)是沒有意義的。

單處理器系統(tǒng)中性能改進的一個常見例子是事件驅(qū)動編程,特別是用戶界面編程。考慮一個程序執(zhí)行一些耗時操作,最終忽略用戶輸入導(dǎo)致無響應(yīng)。如果你有一個“退出”按鈕,你不想在你編寫的每段代碼中都檢查它的狀態(tài)(輪詢)。這會產(chǎn)生笨拙的代碼,也無法保證程序員不會忘了檢查。沒有并發(fā),生成可響應(yīng)用戶界面的唯一方法是讓所有任務(wù)都定期檢查用戶輸入。通過創(chuàng)建單獨的線程以執(zhí)行用戶輸入的響應(yīng),能夠讓程序保證一定程度的響應(yīng)能力。

實現(xiàn)并發(fā)的一種簡單方式是使用操作系統(tǒng)級別的進程。與線程不同,進程是在其自己的地址空間中運行的獨立程序。進程的優(yōu)勢在于,因為操作系統(tǒng)通常將一個進程與另一個進程隔離,因此它們不會相互干擾,這使得進程編程相對容易。相比之下,線程之間會共享內(nèi)存和 I/O 等資源,因此編寫多線程程序最基本的困難,在于協(xié)調(diào)不同線程驅(qū)動的任務(wù)之間對這些資源的使用,以免這些資源同時被多個任務(wù)訪問。

有些人甚至提倡將進程作為唯一合理的并發(fā)實現(xiàn)方式[^1],但遺憾的是,通常存在數(shù)量和開銷方面的限制,從而阻止了進程在并發(fā)范圍內(nèi)的適用性(最終你會習(xí)慣標(biāo)準(zhǔn)的并發(fā)限制,“這種方法適用于一些情況但不適用于其他情況”)

一些編程語言旨在將并發(fā)任務(wù)彼此隔離。這些通常被稱為函數(shù)式語言,其中每個函數(shù)調(diào)用不產(chǎn)生副作用(不會干擾到其它函數(shù)),所以可以作為獨立的任務(wù)來驅(qū)動。Erlang 就是這樣一種語言,它包括一個任務(wù)與另一個任務(wù)進行通信的安全機制。如果發(fā)現(xiàn)程序的某一部分必須大量使用并發(fā),并且在嘗試構(gòu)建該部分時遇到了過多的問題,那么可以考慮使用這些專用的并發(fā)語言創(chuàng)建程序的這個部分。

Java 采用了更傳統(tǒng)的方法[^2],即在順序語言之上添加對線程的支持而不是在多任務(wù)操作系統(tǒng)中分叉外部進程,線程是在表示執(zhí)行程序的單個進程內(nèi)創(chuàng)建任務(wù)。

并發(fā)會帶來各種成本,包括復(fù)雜性成本,但可以被程序設(shè)計、資源平衡和用戶便利性方面的改進所抵消。通常,并發(fā)性使你能夠創(chuàng)建更低耦合的設(shè)計;另一方面,你必須特別關(guān)注那些使用了并發(fā)操作的代碼。

Java 并發(fā)的四句格言

在經(jīng)歷了多年 Java 并發(fā)的實踐之后,我總結(jié)了以下四個格言:

1.不要用它(避免使用并發(fā))

2.沒有什么是真的,一切可能都有問題

3.僅僅是它能運行,并不意味著它沒有問題

4.你必須理解它(逃不掉并發(fā))

這些格言專門針對 Java 的并發(fā)設(shè)計問題,盡管它們也可以適用于其他一些語言。但是,確實存在旨在防止這些問題的語言。

1.不要用它

(而且不要自己去實現(xiàn)它)

避免陷入并發(fā)所帶來的玄奧問題的最簡單方法就是不要用它。盡管嘗試一些簡單的東西可能很誘人,也似乎足夠安全,但是陷阱卻是無窮且微妙的。如果你能避免使用它,你的生活將會輕松得多。

使用并發(fā)唯一的正當(dāng)理由是速度。如果你的程序運行速度不夠快——這里要小心,因為僅僅想讓它運行得更快不是正當(dāng)理由——應(yīng)該首先用一個分析器(參見代碼校驗章中分析和優(yōu)化)來發(fā)現(xiàn)你是否可以執(zhí)行其他一些優(yōu)化。

如果你被迫使用并發(fā),請采取最簡單,最安全的方法來解決問題。使用知名的庫并盡可能少地自己編寫代碼。對于并發(fā),就沒有“太簡單了”——自作聰明是你的敵人。

2.沒有什么是真的,一切可能都有問題

不使用并發(fā)編程,你已經(jīng)預(yù)料到你的世界具有確定的順序和一致性。對于變量賦值這樣簡單的操作,很明顯它應(yīng)該總是能夠正常工作。

在并發(fā)領(lǐng)域,有些事情可能是真的而有些事情卻不是,以至于你必須假設(shè)沒有什么是真的。你必須質(zhì)疑一切。即使將變量設(shè)置為某個值也可能不會按預(yù)期的方式工作,事情從這里開始迅速惡化。我已經(jīng)熟悉了這樣一種感覺:我認(rèn)為應(yīng)該明顯奏效的東西,實際上卻行不通。

在非并發(fā)編程中你可以忽略的各種事情,在并發(fā)下突然變得很重要。例如,你必須了解處理器緩存以及保持本地緩存與主內(nèi)存一致的問題,你必須理解對象構(gòu)造的深層復(fù)雜性,這樣你的構(gòu)造函數(shù)就不會意外地暴露數(shù)據(jù),以致于被其它線程更改。這樣的例子不勝枚舉。

雖然這些主題過于復(fù)雜,無法在本章中給你提供專業(yè)知識(同樣,請參見 Java Concurrency in Practice),但你必須了解它們。

3.僅僅是它能運行,并不意味著它沒有問題

我們很容易編寫出一個看似正常實則有問題的并發(fā)程序,而且問題只有在極少的情況下才會顯現(xiàn)出來——在程序部署后不可避免地會成為用戶問題(投訴)。

  • 你不能驗證出并發(fā)程序是正確的,你只能(有時)驗證出它是不正確的。
  • 大多數(shù)情況下你甚至沒辦法驗證:如果它出問題了,你可能無法檢測到它。
  • 你通常無法編寫有用的測試,因此你必須依靠代碼檢查和對并發(fā)的深入了解來發(fā)現(xiàn)錯誤。
  • 即使是有效的程序也只能在其設(shè)計參數(shù)下工作。當(dāng)超出這些設(shè)計參數(shù)時,大多數(shù)并發(fā)程序會以某種方式失敗。

在其他 Java 主題中,我們養(yǎng)成了決定論的觀念。一切都按照語言的承諾的(或暗示的)發(fā)生,這是令人欣慰的也是人們所期待的——畢竟,編程語言的意義就是讓機器做我們想要它做的事情。從確定性編程的世界進入并發(fā)編程領(lǐng)域,我們遇到了一種稱為 鄧寧-克魯格效應(yīng) 的認(rèn)知偏差,可以概括為“無知者無畏”,意思是:“相對不熟練的人擁有著虛幻的優(yōu)越感,錯誤地評估他們的能力遠(yuǎn)高于實際。

我自己的經(jīng)驗是,無論你是多么確定你的代碼是線程安全的,它都可能是有問題的。你可以很容易地了解所有的問題,然后幾個月或幾年后你會發(fā)現(xiàn)一些概念,讓你意識到你編寫的大多數(shù)代碼實際上都容易受到并發(fā) bug 的影響。當(dāng)某些代碼不正確時,編譯器不會告訴你。為了使它正確,在研究代碼時,必須將并發(fā)性的所有問題都放在前腦中。

在 Java 的所有非并發(fā)領(lǐng)域,“沒有明顯的 bug 而且沒有編譯報錯“似乎意味著一切都好。但對于并發(fā),它沒有任何意義。在這種情況你最糟糕的表現(xiàn)就是“自信”。

4.你必須理解它

在格言 1-3 之后,你可能會對并發(fā)性感到害怕,并且認(rèn)為,“到目前為止,我已經(jīng)避免了它,也許我可以繼續(xù)避免它。

這是一種理性的反應(yīng)。你可能知道其他更好地被設(shè)計用于構(gòu)建并發(fā)程序的編程語言——甚至是在 JVM 上運行的語言(從而提供與 Java 的輕松通信),例如 Clojure 或 Scala。為什么不用這些語言來編寫并發(fā)部分,然后用Java來做其他的事情呢?

唉,你不能輕易逃脫:

  • 即使你從未顯示地創(chuàng)建一個線程,你使用的框架也可能——例如,Swing 圖形用戶界面(GUI)庫,或者像 Timer 類(計時器)那樣簡單的東西。
  • 最糟糕的是:當(dāng)你創(chuàng)建組件時,必須假設(shè)這些組件可能會在多線程環(huán)境中重用。即使你的解決方案是放棄并聲明你的組件是“非線程安全的”,你仍然必須充分了解這樣一個語句的重要性及其含義。

人們有時會認(rèn)為并發(fā)對于介紹語言的書來說太高級了,因此不適合放在其中。他們認(rèn)為并發(fā)是一個獨立的主題,并且對于少數(shù)出現(xiàn)在日常的程序設(shè)計中的情況(例如圖形用戶界面),可以用特殊的慣用法來處理。如果你可以回避,為什么還要介紹這么復(fù)雜的主題呢?

唉,如果是這樣就好了。遺憾的是,對于線程何時出現(xiàn)在 Java 程序中,這不是你能決定的。僅僅是你自己沒有啟動線程,并不代表你就可以回避編寫使用線程的代碼。例如,Web 系統(tǒng)是最常見的 Java 應(yīng)用之一,本質(zhì)上是多線程的 Web 服務(wù)器,通常包含多個處理器,而并行是利用這些處理器的理想方式。盡管這樣的系統(tǒng)看起來很簡單,但你必須理解并發(fā)才能正確地編寫它。

Java 是一種多線程語言,不管你有沒有意識到并發(fā)問題,它就在那里。因此,有很多使用并發(fā)的 Java 程序,要么只是偶然運行,要么大部分時間都在運行,并且會因為未被發(fā)現(xiàn)的并發(fā)缺陷而時不時地神秘崩潰。有時這種崩潰是相對溫和的,但有時它意味著丟失有價值的數(shù)據(jù),如果你沒有意識到并發(fā)問題,你最終可能會把問題歸咎于其他地方而不是你的代碼中。如果將程序移動到多處理器系統(tǒng)中,這些類型的問題還會被暴露或放大。基本上,了解并發(fā)可以使你意識到明顯正確的程序也可能會表現(xiàn)出錯誤的行為。

殘酷的真相

當(dāng)人類開始烹飪他們的食物時,他們大大減少了他們的身體分解和消化食物所需的能量。烹飪創(chuàng)造了一個“外化的胃”,從而釋放出能量去發(fā)展其他的能力?;鸬氖褂么俪闪宋拿?。

我們現(xiàn)在通過計算機和網(wǎng)絡(luò)技術(shù)創(chuàng)造了一個“外化大腦”,開始了第二次基本轉(zhuǎn)變。雖然我們只是觸及表面,但已經(jīng)引發(fā)了其他轉(zhuǎn)變,例如設(shè)計生物機制的能力,并且已經(jīng)看到文化演變的顯著加速(過去,人們通過旅游進行文化交流,但現(xiàn)在他們開始在互聯(lián)網(wǎng)上做這件事)。這些轉(zhuǎn)變的影響和好處已經(jīng)超出了科幻作家預(yù)測它們的能力(他們在預(yù)測文化和個人變化,甚至技術(shù)轉(zhuǎn)變的次要影響方面都特別困難)。

有了這種根本性的人類變化,看到許多破壞和失敗的實驗并不令人驚訝。實際上,進化依賴于無數(shù)的實驗,其中大多數(shù)都失敗了。這些實驗是向前發(fā)展的必要條件。

Java 是在充滿自信,熱情和睿智的氛圍中創(chuàng)建的。在發(fā)明一種編程語言時,很容易感覺語言的初始可塑性會持續(xù)存在一樣,你可以把某些東西拿出來,如果不能解決問題,那么就修復(fù)它。編程語言以這種方式是獨一無二的 - 它們經(jīng)歷了類似水的改變:氣態(tài),液態(tài)和最終的固態(tài)。在氣態(tài)階段,靈活性似乎是無限的,并且很容易認(rèn)為它總是那樣。一旦人們開始使用你的語言,變化就會變得更加嚴(yán)重,環(huán)境變得更加粘稠。語言設(shè)計的過程本身就是一門藝術(shù)。

緊迫感來自互聯(lián)網(wǎng)的最初興起。它似乎是一場比賽,第一個通過起跑線的人將“獲勝”(事實上,Java,JavaScript 和 PHP 等語言的流行程度可以證明這一點)。唉,通過匆忙設(shè)計語言而產(chǎn)生的認(rèn)知負(fù)荷和技術(shù)債務(wù)最終會趕上我們。

Turing completeness 是不足夠的;語言需要更多的東西:它們必須能夠創(chuàng)造性地表達(dá),而不是用不必要的東西來衡量我們。解放我們的心理能力只是為了扭轉(zhuǎn)并再次陷入困境,這是毫無意義的。我承認(rèn),盡管存在這些問題,我們已經(jīng)完成了令人驚奇的事情,但我也知道如果沒有這些問題我們能做得更多。

熱情使原始 Java 設(shè)計師加入了一些似乎有必要的特性。信心(以及氣態(tài)的初始語言)讓他們認(rèn)為任何問題隨后都可以解決。在時間軸的某個地方,有人認(rèn)為任何加入 Java 的東西是固定的和永久性的 -他們非常有信心,并相信第一個決定永遠(yuǎn)是正確的,因此我們看到 Java 的體系中充斥著糟糕的決策。其中一些決定最終沒有什么后果;例如,你可以告訴人們不要使用 Vector,但只能在語言中繼續(xù)保留它以便對之前版本的支持。

線程包含在 Java 1.0 中。當(dāng)然,對 java 來說支持并發(fā)是一個很基本的設(shè)計決定,該特性影響了這個語言的各個角落,我們很難想象以后在以后的版本添加它。公平地說,當(dāng)時并不清楚基本的并發(fā)性是多少。像 C 這樣的其他語言能夠?qū)⒕€程視為一個附加功能,因此 Java 設(shè)計師也紛紛效仿,包括一個 Thread 類和必要的 JVM 支持(這比你想象的要復(fù)雜得多)。

C 語言是面向過程語言,這限制了它的野心。這些限制使附加線程庫合理。當(dāng)采用原始模型并將其粘貼到復(fù)雜語言中時,Java 的大規(guī)模擴展迅速暴露了基本問題。在 Thread 類中的許多方法的棄用以及后續(xù)的高級庫浪潮中,這種情況變得明顯,這些庫試圖提供更好的并發(fā)抽象。

不幸的是,為了在更高級別的語言中獲得并發(fā)性,所有語言功能都會受到影響,包括最基本的功能,例如標(biāo)識符代表可變值。在簡化并發(fā)編程中,所有函數(shù)和方法中為了保持事物不變和防止副作用都要做出巨大的改變(這些是純函數(shù)式編程語言的基礎(chǔ)),但當(dāng)時對于主流語言的創(chuàng)建者來說似乎是奇怪的想法。最初的 Java 設(shè)計師要么沒有意識到這些選擇,要么認(rèn)為它們太不同了,并且會勸退許多潛在的語言使用者。我們可以慷慨地說,語言設(shè)計社區(qū)當(dāng)時根本沒有足夠的經(jīng)驗來理解調(diào)整在線程庫中的影響。

Java 實驗告訴我們,結(jié)果是悄然災(zāi)難性的。程序員很容易陷入認(rèn)為 Java 線程并不那么困難的陷阱。表面上看起來正常工作的程序?qū)嶋H上充滿了微妙的并發(fā) bug。

為了獲得正確的并發(fā)性,語言功能必須從頭開始設(shè)計并考慮并發(fā)性。木已成舟;Java 將不再是為并發(fā)而設(shè)計的語言,而只是一種允許并發(fā)的語言。

盡管有這些基本的不可修復(fù)的缺陷,但令人印象深刻的是它已經(jīng)走了這么遠(yuǎn)。Java 的后續(xù)版本添加了庫,以便在使用并發(fā)時提升抽象級別。事實上,我根本不會想到有可能在 Java 8 中進行改進:并行流和 CompletableFutures - 這是驚人的史詩般的變化,我會驚奇地重復(fù)的查看它[^3]。

這些改進非常有用,我們將在本章重點介紹并行流和 CompletableFutures 。雖然它們可以大大簡化你對并發(fā)和后續(xù)代碼的思考方式,但基本問題仍然存在:由于 Java 的原始設(shè)計,代碼的所有部分仍然很脆弱,你仍然必須理解這些復(fù)雜和微妙的問題。Java 中的線程絕不是簡單或安全的;那種經(jīng)歷必須降級為另一種更新的語言。

本章其余部分

這是我們將在本章的其余部分介紹的內(nèi)容。請記住,本章的重點是使用最新的高級 Java 并發(fā)結(jié)構(gòu)。相比于舊的替代品,使用這些會使你的生活更加輕松。但是,你仍會在遺留代碼中遇到一些低級工具。有時,你可能會被迫自己使用其中的一些。附錄:并發(fā)底層原理 包含一些更原始的 Java 并發(fā)元素的介紹。

  • Parallel Streams(并行流)
    到目前為止,我已經(jīng)強調(diào)了 Java 8 Streams 提供的改進語法?,F(xiàn)在該語法(作為一個粉絲,我希望)會使你感到舒適,你可以獲得額外的好處:你可以通過簡單地將 parallel() 添加到表達(dá)式來并行化流。這是一種簡單,強大,坦率地說是利用多處理器的驚人方式

添加 parallel() 來提高速度似乎是微不足道的,但是,唉,它就像你剛剛在殘酷的真相 中學(xué)到的那樣簡單。我將演示并解釋一些盲目添加 parallel() 到 Stream 表達(dá)式的缺陷。

  • 創(chuàng)建和運行任務(wù)
    任務(wù)是一段可以獨立運行的代碼。為了解釋創(chuàng)建和運行任務(wù)的一些基礎(chǔ)知識,本節(jié)介紹了一種比并行流或 CompletableFutures 更簡單的機制:Executor。執(zhí)行者管理一些低級 Thread 對象(Java 中最原始的并發(fā)形式)。你創(chuàng)建一個任務(wù),然后將其交給 Executor 去運行。

有多種類型的 Executor 用于不同的目的。在這里,我們將展示規(guī)范形式,代表創(chuàng)建和運行任務(wù)的最簡單和最佳方法。

  • 終止長時間運行的任務(wù)
    任務(wù)獨立運行,因此需要一種機制來關(guān)閉它們。典型的方法使用了一個標(biāo)志,這引入了共享內(nèi)存的問題,我們將使用 Java 的“Atomic”庫來回避它。
  • Completable Futures
    當(dāng)你將衣服帶到干洗店時,他們會給你一張收據(jù)。你繼續(xù)完成其他任務(wù),當(dāng)你的衣服洗干凈時你可以把它取走。收據(jù)是你與干洗店在后臺執(zhí)行的任務(wù)的連接。這是 Java 5 中引入的 Future 的方法。

Future 比以前的方法更方便,但你仍然必須出現(xiàn)并用收據(jù)取出干洗,如果任務(wù)沒有完成你還需要等待。對于一系列操作,F(xiàn)utures 并沒有真正幫助那么多。

Java 8 CompletableFuture 是一個更好的解決方案:它允許你將操作鏈接在一起,因此你不必將代碼寫入接口排序操作。有了 CompletableFuture 完美的結(jié)合,就可以更容易地做出“采購原料,組合成分,烹飪食物,提供食物,收拾餐具,儲存餐具”等一系列鏈?zhǔn)讲僮鳌?/p>

  • 死鎖
    某些任務(wù)必須去等待 - 阻塞來獲得其他任務(wù)的結(jié)果。被阻止的任務(wù)有可能等待另一個被阻止的任務(wù),另一個被阻止的任務(wù)也在等待其他任務(wù),等等。如果被阻止的任務(wù)鏈循環(huán)到第一個,沒有人可以取得任何進展,你就會陷入死鎖。

如果在運行程序時沒有立即出現(xiàn)死鎖,則會出現(xiàn)最大的問題。你的系統(tǒng)可能容易出現(xiàn)死鎖,并且只會在某些條件下死鎖。程序可能在某個平臺上(例如在你的開發(fā)機器)運行正常,但是當(dāng)你將其部署到不同的硬件時會開始死鎖。

死鎖通常源于細(xì)微的編程錯誤;一系列無辜的決定,最終意外地創(chuàng)建了一個依賴循環(huán)。本節(jié)包含一個經(jīng)典示例,演示了死鎖的特性。

  • 努力,復(fù)雜,成本

我們將通過模擬創(chuàng)建披薩的過程完成本章,首先使用并行流實現(xiàn)它,然后是 CompletableFutures。這不僅僅是兩種方法的比較,更重要的是探索你應(yīng)該投入多少工作來使你的程序變得更快。

并行流

Java 8 流的一個顯著優(yōu)點是,在某些情況下,它們可以很容易地并行化。這來自庫的仔細(xì)設(shè)計,特別是流使用內(nèi)部迭代的方式 - 也就是說,它們控制著自己的迭代器。特別是,他們使用一種特殊的迭代器,稱為 Spliterator,它被限制為易于自動分割。我們只需要念 .parallel() 就會產(chǎn)生魔法般的結(jié)果,流中的所有內(nèi)容都作為一組并行任務(wù)運行。如果你的代碼是使用 Streams 編寫的,那么并行化以提高速度似乎是一種瑣事

例如,考慮來自 Streams 的 Prime.java。查找質(zhì)數(shù)可能是一個耗時的過程,我們可以看到該程序的計時:

// concurrent/ParallelPrime.java
import java.util.*;
import java.util.stream.*;
import static java.util.stream.LongStream.*;
import java.io.*;
import java.nio.file.*;
import onjava.Timer;

public class ParallelPrime {
    static final int COUNT = 100_000;
    public static boolean isPrime(long n){
        return rangeClosed(2, (long)Math.sqrt(n)).noneMatch(i -> n % i == 0);
        }
    public static void main(String[] args)
        throws IOException {
        Timer timer = new Timer();
        List<String> primes =
            iterate(2, i -> i + 1)
                .parallel()              // [1]
                .filter(ParallelPrime::isPrime)
                .limit(COUNT)
                .mapToObj(Long::toString)
                .collect(Collectors.toList());
        System.out.println(timer.duration());
        Files.write(Paths.get("primes.txt"), primes, StandardOpenOption.CREATE);
        }
    }

輸出結(jié)果:

    Output:
    1224

請注意,這不是微基準(zhǔn)測試,因為我們計時整個程序。我們將數(shù)據(jù)保存在磁盤上以防止編譯器過激的優(yōu)化;如果我們沒有對結(jié)果做任何事情,那么一個高級的編譯器可能會觀察到程序沒有意義并且終止了計算(這不太可能,但并非不可能)。請注意使用 nio2 庫編寫文件的簡單性(在文件 一章中有描述)。

當(dāng)我注釋掉[1] parallel() 行時,我的結(jié)果用時大約是 parallel() 的三倍。

并行流似乎是一個甜蜜的交易。你所需要做的就是將編程問題轉(zhuǎn)換為流,然后插入 parallel() 以加快速度。實際上,有時候這很容易。但遺憾的是,有許多陷阱。

  • parallel() 不是靈丹妙藥

作為對流和并行流的不確定性的探索,讓我們看一個看似簡單的問題:對增長的數(shù)字序列進行求和。事實證明有大量的方式去實現(xiàn)它,并且我將冒險用計時器將它們進行比較 - 我會盡量小心,但我承認(rèn)我可能會在計時代碼執(zhí)行時遇到許多基本陷阱之一。結(jié)果可能有一些缺陷(例如 JVM 沒有“熱身”),但我認(rèn)為它仍然提供了一些有用的指示。

我將從一個計時方法 timeTest() 開始,它采用 LongSupplier ,測量 getAsLong() 調(diào)用的長度,將結(jié)果與 checkValue 進行比較并顯示結(jié)果。

請注意,一切都必須嚴(yán)格使用 long ;我花了一些時間發(fā)現(xiàn)隱蔽的溢出,然后才意識到在重要的地方錯過了 long 。

所有關(guān)于時間和內(nèi)存的數(shù)字和討論都是指“我的機器”。你的經(jīng)歷可能會有所不同。

// concurrent/Summing.java
import java.util.stream.*;
import java.util.function.*;
import onjava.Timer;
public class Summing {
    static void timeTest(String id, long checkValue,    LongSupplier operation){
        System.out.print(id + ": ");
        Timer timer = new Timer();
        long result = operation.getAsLong();
        if(result == checkValue)
            System.out.println(timer.duration() + "ms");
        else
            System.out.format("result: %d%ncheckValue: %d%n", result, checkValue);
        }
    public static final int SZ = 100_000_000;
    // This even works:
    // public static final int SZ = 1_000_000_000;
    public static final long CHECK = (long)SZ * ((long)SZ + 1)/2; // Gauss's formula
    public static void main(String[] args){
        System.out.println(CHECK);
        timeTest("Sum Stream", CHECK, () ->
        LongStream.rangeClosed(0, SZ).sum());
        timeTest("Sum Stream Parallel", CHECK, () ->
        LongStream.rangeClosed(0, SZ).parallel().sum());
        timeTest("Sum Iterated", CHECK, () ->
        LongStream.iterate(0, i -> i + 1)
        .limit(SZ+1).sum());
        // Slower & runs out of memory above 1_000_000:
        // timeTest("Sum Iterated Parallel", CHECK, () ->
        //   LongStream.iterate(0, i -> i + 1)
        //     .parallel()
        //     .limit(SZ+1).sum());
    }
}

輸出結(jié)果:

5000000050000000
Sum Stream: 167ms
Sum Stream Parallel: 46ms
Sum Iterated: 284ms

CHECK 值是使用 Carl Friedrich Gauss(高斯)在 1700 年代后期還在上小學(xué)的時候創(chuàng)建的公式計算出來的.

main() 的第一個版本使用直接生成 Stream 并調(diào)用 sum() 的方法。我們看到流的好處在于即使 SZ 為十億(1_000_000_000)程序也可以很好地處理而沒有溢出(為了讓程序運行得快一點,我使用了較小的數(shù)字)。使用 parallel() 的基本范圍操作明顯更快。

如果使用 iterate() 來生成序列,則減速是相當(dāng)明顯的,可能是因為每次生成數(shù)字時都必須調(diào)用 lambda。但是如果我們嘗試并行化,當(dāng) SZ 超過一百萬時,結(jié)果不僅比非并行版本花費的時間更長,而且也會耗盡內(nèi)存(在某些機器上)。當(dāng)然,當(dāng)你可以使用 range() 時,你不會使用 iterate() ,但如果你生成的東西不是簡單的序列,你必須使用 iterate() 。應(yīng)用 parallel() 是一個合理的嘗試,但會產(chǎn)生令人驚訝的結(jié)果。我們將在后面的部分中探討內(nèi)存限制的原因,但我們可以對流并行算法進行初步觀察:

  • 流并行性將輸入數(shù)據(jù)分成多個部分,因此算法可以應(yīng)用于那些單獨的部分。
  • 數(shù)組分割成本低,分割均勻且對分割的大小有著完美的掌控。
  • 鏈表沒有這些屬性;“拆分”一個鏈表僅僅意味著把它分成“第一元素”和“其余元素”,這相對無用。
  • 無狀態(tài)生成器的行為類似于數(shù)組;上面使用的 range() 就是無狀態(tài)的。
  • 迭代生成器的行為類似于鏈表; iterate() 是一個迭代生成器。

現(xiàn)在讓我們嘗試通過在數(shù)組中填充值并對數(shù)組求和來解決問題。因為數(shù)組只分配了一次,所以我們不太可能遇到垃圾收集時序問題。

首先我們將嘗試一個充滿原始 long 的數(shù)組:

// concurrent/Summing2.java
// {ExcludeFromTravisCI}import java.util.*;
public class Summing2 {
    static long basicSum(long[] ia) {
        long sum = 0;
        int size = ia.length;
        for(int i = 0; i < size; i++)
            sum += ia[i];return sum;
    }
    // Approximate largest value of SZ before
    // running out of memory on mymachine:
    public static final int SZ = 20_000_000;
    public static final long CHECK = (long)SZ * ((long)SZ + 1)/2;
    public static void main(String[] args) {
        System.out.println(CHECK);
        long[] la = newlong[SZ+1];
        Arrays.parallelSetAll(la, i -> i);
        Summing.timeTest("Array Stream Sum", CHECK, () ->
        Arrays.stream(la).sum());
        Summing.timeTest("Parallel", CHECK, () ->
        Arrays.stream(la).parallel().sum());
        Summing.timeTest("Basic Sum", CHECK, () ->
        basicSum(la));// Destructive summation:
        Summing.timeTest("parallelPrefix", CHECK, () -> {
            Arrays.parallelPrefix(la, Long::sum);
        return la[la.length - 1];
        });
    }
}

輸出結(jié)果:

200000010000000
Array Stream
Sum: 104ms
Parallel: 81ms
Basic Sum: 106ms
parallelPrefix: 265ms

第一個限制是內(nèi)存大??;因為數(shù)組是預(yù)先分配的,所以我們不能創(chuàng)建幾乎與以前版本一樣大的任何東西。并行化可以加快速度,甚至比使用 basicSum() 循環(huán)更快。有趣的是, Arrays.parallelPrefix() 似乎實際上減慢了速度。但是,這些技術(shù)中的任何一種在其他條件下都可能更有用 - 這就是為什么你不能做出任何確定性的聲明,除了“你必須嘗試一下”。

最后,考慮使用包裝類 Long 的效果:

// concurrent/Summing3.java
// {ExcludeFromTravisCI}
import java.util.*;
public class Summing3 {
    static long basicSum(Long[] ia) {
        long sum = 0;
        int size = ia.length;
        for(int i = 0; i < size; i++)
            sum += ia[i];
            return sum;
    }
    // Approximate largest value of SZ before
    // running out of memory on my machine:
    public static final int SZ = 10_000_000;
    public static final long CHECK = (long)SZ * ((long)SZ + 1)/2;
    public static void main(String[] args) {
        System.out.println(CHECK);
        Long[] aL = newLong[SZ+1];
        Arrays.parallelSetAll(aL, i -> (long)i);
        Summing.timeTest("Long Array Stream Reduce", CHECK, () ->
        Arrays.stream(aL).reduce(0L, Long::sum));
        Summing.timeTest("Long Basic Sum", CHECK, () ->
        basicSum(aL));
        // Destructive summation:
        Summing.timeTest("Long parallelPrefix",CHECK, ()-> {
            Arrays.parallelPrefix(aL, Long::sum);
            return aL[aL.length - 1];
            });
    }
}

輸出結(jié)果:

50000005000000
Long Array
Stream Reduce: 1038ms
Long Basic
Sum: 21ms
Long parallelPrefix: 3616ms

現(xiàn)在可用的內(nèi)存量大約減半,并且所有情況下所需的時間都會很長,除了 basicSum() ,它只是循環(huán)遍歷數(shù)組。令人驚訝的是, Arrays.parallelPrefix() 比任何其他方法都要花費更長的時間。

我將 parallel() 版本分開了,因為在上面的程序中運行它導(dǎo)致了一個冗長的垃圾收集,扭曲了結(jié)果:

// concurrent/Summing4.java
// {ExcludeFromTravisCI}
import java.util.*;
public class Summing4 {
    public static void main(String[] args) {
        System.out.println(Summing3.CHECK);
        Long[] aL = newLong[Summing3.SZ+1];
        Arrays.parallelSetAll(aL, i -> (long)i);
        Summing.timeTest("Long Parallel",
        Summing3.CHECK, () ->
        Arrays.stream(aL)
        .parallel()
        .reduce(0L,Long::sum));
    }
}

輸出結(jié)果:

50000005000000
Long Parallel: 1014ms

它比非 parallel() 版本略快,但并不顯著。

導(dǎo)致時間增加的一個重要原因是處理器內(nèi)存緩存。使用 Summing2.java 中的原始 long ,數(shù)組 la 是連續(xù)的內(nèi)存。處理器可以更容易地預(yù)測該陣列的使用,并使緩存充滿下一個需要的陣列元素。訪問緩存比訪問主內(nèi)存快得多。似乎 Long parallelPrefix 計算受到影響,因為它為每個計算讀取兩個數(shù)組元素,并將結(jié)果寫回到數(shù)組中,并且每個都為 Long 生成一個超出緩存的引用。

使用 Summing3.javaSumming4.java ,aL 是一個 Long 數(shù)組,它不是一個連續(xù)的數(shù)據(jù)數(shù)組,而是一個連續(xù)的 Long 對象引用數(shù)組。盡管該數(shù)組可能會在緩存中出現(xiàn),但指向的對象幾乎總是不在緩存中。

這些示例使用不同的 SZ 值來顯示內(nèi)存限制。

為了進行時間比較,以下是 SZ 設(shè)置為最小值 1000 萬的結(jié)果:

Sum Stream: 69msSum
Stream Parallel: 18msSum
Iterated: 277ms
Array Stream Sum: 57ms
Parallel: 14ms
Basic Sum: 16ms
parallelPrefix: 28ms
Long Array Stream Reduce: 1046ms
Long Basic Sum: 21ms
Long parallelPrefix: 3287ms
Long Parallel: 1008ms

雖然 Java 8 的各種內(nèi)置“并行”工具非常棒,但我認(rèn)為它們被視為神奇的靈丹妙藥:“只需添加 parallel() 并且它會更快!” 我希望我已經(jīng)開始表明情況并非所有都是如此,并且盲目地應(yīng)用內(nèi)置的“并行”操作有時甚至?xí)惯\行速度明顯變慢。

  • parallel()/limit() 交點

使用 parallel() 時會有更復(fù)雜的問題。從其他語言中吸取的流機制被設(shè)計為大約是一個無限的流模型。如果你擁有有限數(shù)量的元素,則可以使用集合以及為有限大小的集合設(shè)計的關(guān)聯(lián)算法。如果你使用無限流,則使用針對流優(yōu)化的算法。

Java 8 將兩者合并起來。例如,Collections 沒有內(nèi)置的 map() 操作。在 CollectionMap 中唯一類似流的批處理操作是 forEach() 。如果要執(zhí)行 map()reduce() 等操作,必須首先將 Collection 轉(zhuǎn)換為存在這些操作的 Stream :

// concurrent/CollectionIntoStream.java
import onjava.*;
import java.util.*;
import java.util.stream.*;
public class CollectionIntoStream {
    public static void main(String[] args) {
    List<String> strings = Stream.generate(new Rand.String(5))
    .limit(10)
    .collect(Collectors.toList());
    strings.forEach(System.out::println);
    // Convert to a Stream for many more options:
    String result = strings.stream()
    .map(String::toUpperCase)
    .map(s -> s.substring(2))
    .reduce(":", (s1, s2) -> s1 + s2);
    System.out.println(result);
    }
}

輸出結(jié)果:

btpen
pccux
szgvg
meinn
eeloz
tdvew
cippc
ygpoa
lkljl
bynxt
:PENCUXGVGINNLOZVEWPPCPOALJLNXT

Collection 確實有一些批處理操作,如 removeAll() ,removeIf()retainAll() ,但這些都是破壞性的操作。ConcurrentHashMapforEachreduce 操作有特別廣泛的支持。

在許多情況下,只在集合上調(diào)用 stream() 或者 parallelStream() 沒有問題。但是,有時將 StreamCollection 混合會產(chǎn)生意想不到的結(jié)果。這是一個有趣的難題:

// concurrent/ParallelStreamPuzzle.java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
public class ParallelStreamPuzzle {
    static class IntGenerator
    implements Supplier<Integer> {
        private int current = 0;
        @Override
        public Integer get() {
            return current++;
        }
    }
    public static void main(String[] args) {
        List<Integer> x = Stream.generate(new IntGenerator())
        .limit(10)
        .parallel()  // [1]
        .collect(Collectors.toList());
        System.out.println(x);
    }
}
/* Output:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
*/

如果[1] 注釋運行它,它會產(chǎn)生預(yù)期的:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
每次。但是包含了 parallel(),它看起來像一個隨機數(shù)生成器,帶有輸出(從一次運行到下一次運行不同),如:
[0, 3, 6, 8, 11, 14, 17, 20, 23, 26]
這樣一個簡單的程序怎么會如此糟糕呢?讓我們考慮一下我們在這里要實現(xiàn)的目標(biāo):“并行生成?!蹦且馕吨裁矗恳欢丫€程都在從一個生成器取值,然后以某種方式選擇有限的結(jié)果集?代碼看起來很簡單,但它變成了一個特別棘手的問題。

為了看到它,我們將添加一些儀器。由于我們正在處理線程,因此我們必須將任何跟蹤信息捕獲到并發(fā)數(shù)據(jù)結(jié)構(gòu)中。在這里我使用 ConcurrentLinkedDeque

// concurrent/ParallelStreamPuzzle2.java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.nio.file.*;
public class ParallelStreamPuzzle2 {
    public static final Deque<String> TRACE =
    new ConcurrentLinkedDeque<>();
    static class
    IntGenerator implements Supplier<Integer> {
        private AtomicInteger current =
        new AtomicInteger();
        @Override
        public Integer get() {
            TRACE.add(current.get() + ": " +Thread.currentThread().getName());
            return current.getAndIncrement();
        }
    }
    public static void main(String[] args) throws Exception {
    List<Integer> x = Stream.generate(newIntGenerator())
    .limit(10)
    .parallel()
    .collect(Collectors.toList());
    System.out.println(x);
    Files.write(Paths.get("PSP2.txt"), TRACE);
    }
}

輸出結(jié)果:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

current 是使用線程安全的 AtomicInteger 類定義的,可以防止競爭條件;parallel() 允許多個線程調(diào)用 get() 。

在查看 PSP2.txt .IntGenerator.get() 被調(diào)用 1024 次時,你可能會感到驚訝。

0: main
1: ForkJoinPool.commonPool-worker-1
2: ForkJoinPool.commonPool-worker-2
3: ForkJoinPool.commonPool-worker-2
4: ForkJoinPool.commonPool-worker-1
5: ForkJoinPool.commonPool-worker-1
6: ForkJoinPool.commonPool-worker-1
7: ForkJoinPool.commonPool-worker-1
8: ForkJoinPool.commonPool-worker-4
9: ForkJoinPool.commonPool-worker-4
10: ForkJoinPool.commonPool-worker-4
11: main
12: main
13: main
14: main
15: main...10
17: ForkJoinPool.commonPool-worker-110
18: ForkJoinPool.commonPool-worker-610
19: ForkJoinPool.commonPool-worker-610
20: ForkJoinPool.commonPool-worker-110
21: ForkJoinPool.commonPool-worker-110
22: ForkJoinPool.commonPool-worker-110
23: ForkJoinPool.commonPool-worker-1

這個塊大小似乎是內(nèi)部實現(xiàn)的一部分(嘗試使用limit() 的不同參數(shù)來查看不同的塊大小)。將parallel()limit()結(jié)合使用可以預(yù)取一串值,作為流輸出。

試著想象一下這里發(fā)生了什么:一個流抽象出無限序列,按需生成。當(dāng)你要求它并行產(chǎn)生流時,你要求所有這些線程盡可能地調(diào)用get()。添加limit(),你說“只需要這些?!被旧?,當(dāng)你為了隨機輸出而選擇將parallel()limit()結(jié)合使用時,這種方法可能對你正在解決的問題有效。但是當(dāng)你這樣做時,你必須明白。這是一個僅限專家的功能,而不是要爭辯說“Java 弄錯了”。

什么是更合理的方法來解決問題?好吧,如果你想生成一個 int 流,你可以使用 IntStream.range(),如下所示:

// concurrent/ParallelStreamPuzzle3.java
// {VisuallyInspectOutput}
import java.util.*;
import java.util.stream.*;
public class ParallelStreamPuzzle3 {
    public static void main(String[] args) {
    List<Integer> x = IntStream.range(0, 30)
        .peek(e -> System.out.println(e + ": " +Thread.currentThread()
        .getName()))
        .limit(10)
        .parallel()
        .boxed()
        .collect(Collectors.toList());
        System.out.println(x);
    }
}

輸出結(jié)果:

8: main
6: ForkJoinPool.commonPool-worker-5
3: ForkJoinPool.commonPool-worker-7
5: ForkJoinPool.commonPool-worker-5
1: ForkJoinPool.commonPool-worker-3
2: ForkJoinPool.commonPool-worker-6
4: ForkJoinPool.commonPool-worker-1
0: ForkJoinPool.commonPool-worker-4
7: ForkJoinPool.commonPool-worker-1
9: ForkJoinPool.commonPool-worker-2
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

為了表明 parallel() 確實有效,我添加了一個對 peek() 的調(diào)用,這是一個主要用于調(diào)試的流函數(shù):它從流中提取一個值并執(zhí)行某些操作但不影響從流向下傳遞的元素。注意這會干擾線程行為,但我只是嘗試在這里做一些事情,而不是實際調(diào)試任何東西。

你還可以看到 boxed() 的添加,它接受 int 流并將其轉(zhuǎn)換為 Integer 流。

現(xiàn)在我們得到多個線程產(chǎn)生不同的值,但它只產(chǎn)生 10 個請求的值,而不是 1024 個產(chǎn)生 10 個值。

它更快嗎?一個更好的問題是:什么時候開始有意義?當(dāng)然不是這么小的一套;上下文切換的代價遠(yuǎn)遠(yuǎn)超過并行性的任何加速。很難想象什么時候用并行生成一個簡單的數(shù)字序列會有意義。如果你要生成的東西需要很高的成本,它可能有意義 - 但這都是猜測。只有通過測試我們才能知道用并行是否有效。記住這句格言:“首先使它工作,然后使它更快地工作 - 只有當(dāng)你必須這樣做時?!睂?parallel()limit() 結(jié)合使用僅供專家操作(把話說在前面,我不認(rèn)為自己是這里的專家)。

  • 并行流只看起來很容易

實際上,在許多情況下,并行流確實可以毫不費力地更快地產(chǎn)生結(jié)果。但正如你所見,僅僅將 parallel() 加到你的 Stream 操作上并不一定是安全的事情。在使用 parallel() 之前,你必須了解并行性如何幫助或損害你的操作。一個基本認(rèn)知錯誤就是認(rèn)為使用并行性總是一個好主意。事實上并不是。Stream 意味著你不需要重寫所有代碼便可以并行運行它。但是流的出現(xiàn)并不意味著你可以不用理解并行的原理以及不用考慮并行是否真的有助于實現(xiàn)你的目標(biāo)。

創(chuàng)建和運行任務(wù)

如果無法通過并行流實現(xiàn)并發(fā),則必須創(chuàng)建并運行自己的任務(wù)。稍后你將看到運行任務(wù)的理想 Java 8 方法是 CompletableFuture,但我們將使用更基本的工具介紹概念。

Java 并發(fā)的歷史始于非常原始和有問題的機制,并且充滿了各種嘗試的改進。這些主要歸入附錄:低級并發(fā) (Appendix: Low-Level Concurrency)。在這里,我們將展示一個規(guī)范形式,表示創(chuàng)建和運行任務(wù)的最簡單,最好的方法。與并發(fā)中的所有內(nèi)容一樣,存在各種變體,但這些變體要么降級到該附錄,要么超出本書的范圍。

  • Tasks and Executors

在 Java 的早期版本中,你通過直接創(chuàng)建自己的 Thread 對象來使用線程,甚至將它們子類化以創(chuàng)建你自己的特定“任務(wù)線程”對象。你手動調(diào)用了構(gòu)造函數(shù)并自己啟動了線程。

創(chuàng)建所有這些線程的開銷變得非常重要,現(xiàn)在不鼓勵采用手動操作方法。在 Java 5 中,添加了類來為你處理線程池。你可以將任務(wù)創(chuàng)建為單獨的類型,然后將其交給 ExecutorService 以運行該任務(wù),而不是為每種不同類型的任務(wù)創(chuàng)建新的 Thread 子類型。ExecutorService 為你管理線程,并且在運行任務(wù)后重新循環(huán)線程而不是丟棄線程。

首先,我們將創(chuàng)建一個幾乎不執(zhí)行任務(wù)的任務(wù)。它“sleep”(暫停執(zhí)行)100 毫秒,顯示其標(biāo)識符和正在執(zhí)行任務(wù)的線程的名稱,然后完成:

// concurrent/NapTask.java
import onjava.Nap;
public class NapTask implements Runnable {
    final int id;
    public NapTask(int id) {
        this.id = id;
        }
    @Override
    public void run() {
        new Nap(0.1);// Seconds
        System.out.println(this + " "+
            Thread.currentThread().getName());
        }
    @Override
    public String toString() {
        return"NapTask[" + id + "]";
    }
}

這只是一個 Runnable :一個包含 run() 方法的類。它沒有包含實際運行任務(wù)的機制。我們使用 Nap 類中的“sleep”:

// onjava/Nap.java
package onjava;
import java.util.concurrent.*;
public class Nap {
    public Nap(double t) { // Seconds
        try {
            TimeUnit.MILLISECONDS.sleep((int)(1000 * t));
        } catch(InterruptedException e){
            throw new RuntimeException(e);
        }
    }
    public Nap(double t, String msg) {
        this(t);
        System.out.println(msg);
    }
}

為了消除異常處理的視覺干擾,這被定義為實用程序。第二個構(gòu)造函數(shù)在超時時顯示一條消息

TimeUnit.MILLISECONDS.sleep() 的調(diào)用獲取“當(dāng)前線程”并在參數(shù)中將其置于休眠狀態(tài),這意味著該線程被掛起。這并不意味著底層處理器停止。操作系統(tǒng)將其切換到其他任務(wù),例如在你的計算機上運行另一個窗口。OS 任務(wù)管理器定期檢查 sleep() 是否超時。當(dāng)它執(zhí)行時,線程被“喚醒”并給予更多處理時間。

你可以看到 sleep() 拋出一個受檢的 InterruptedException ;這是原始 Java 設(shè)計中的一個工件,它通過突然斷開它們來終止任務(wù)。因為它往往會產(chǎn)生不穩(wěn)定的狀態(tài),所以后來不鼓勵終止。但是,我們必須在需要或仍然發(fā)生終止的情況下捕獲異常。

要執(zhí)行任務(wù),我們將從最簡單的方法--SingleThreadExecutor 開始:

//concurrent/SingleThreadExecutor.java
import java.util.concurrent.*;
import java.util.stream.*;
import onjava.*;
public class SingleThreadExecutor {
    public static void main(String[] args) {
        ExecutorService exec =
            Executors.newSingleThreadExecutor();
        IntStream.range(0, 10)
            .mapToObj(NapTask::new)
            .forEach(exec::execute);
        System.out.println("All tasks submitted");
        exec.shutdown();
        while(!exec.isTerminated()) {
            System.out.println(
            Thread.currentThread().getName()+
            " awaiting termination");
            new Nap(0.1);
        }
    }
}

輸出結(jié)果:

All tasks submitted
main awaiting termination
main awaiting termination
NapTask[0] pool-1-thread-1
main awaiting termination
NapTask[1] pool-1-thread-1
main awaiting termination
NapTask[2] pool-1-thread-1
main awaiting termination
NapTask[3] pool-1-thread-1
main awaiting termination
NapTask[4] pool-1-thread-1
main awaiting termination
NapTask[5] pool-1-thread-1
main awaiting termination
NapTask[6] pool-1-thread-1
main awaiting termination
NapTask[7] pool-1-thread-1
main awaiting termination
NapTask[8] pool-1-thread-1
main awaiting termination
NapTask[9] pool-1-thread-1

首先請注意,沒有 SingleThreadExecutor 類。newSingleThreadExecutor()Executors 中的一個工廠方法,它創(chuàng)建特定類型的 ExecutorService [^4]

我創(chuàng)建了十個 NapTasks 并將它們提交給 ExecutorService,這意味著它們開始自己運行。然而,在此期間,main() 繼續(xù)做事。當(dāng)我運行 callexec.shutdown() 時,它告訴 ExecutorService 完成已經(jīng)提交的任務(wù),但不接受任何新任務(wù)。此時,這些任務(wù)仍然在運行,因此我們必須等到它們在退出 main() 之前完成。這是通過檢查 exec.isTerminated() 來實現(xiàn)的,這在所有任務(wù)完成后變?yōu)?true。

請注意,main() 中線程的名稱是 main,并且只有一個其他線程 pool-1-thread-1。此外,交錯輸出顯示兩個線程確實同時運行。

如果你只是調(diào)用 exec.shutdown(),程序?qū)⑼瓿伤腥蝿?wù)。也就是說,不需要 while(!exec.isTerminated())

// concurrent/SingleThreadExecutor2.java
import java.util.concurrent.*;
import java.util.stream.*;
public class SingleThreadExecutor2 {
    public static void main(String[] args)throws InterruptedException {
        ExecutorService exec
        =Executors.newSingleThreadExecutor();
        IntStream.range(0, 10)
            .mapToObj(NapTask::new)
            .forEach(exec::execute);
        exec.shutdown();
    }
}

輸出結(jié)果:

NapTask[0] pool-1-thread-1
NapTask[1] pool-1-thread-1
NapTask[2] pool-1-thread-1
NapTask[3] pool-1-thread-1
NapTask[4] pool-1-thread-1
NapTask[5] pool-1-thread-1
NapTask[6] pool-1-thread-1
NapTask[7] pool-1-thread-1
NapTask[8] pool-1-thread-1
NapTask[9] pool-1-thread-1

一旦你調(diào)用了 exec.shutdown(),嘗試提交新任務(wù)將拋出 RejectedExecutionException。

// concurrent/MoreTasksAfterShutdown.java
import java.util.concurrent.*;
public class MoreTasksAfterShutdown {
    public static void main(String[] args) {
        ExecutorService exec
        =Executors.newSingleThreadExecutor();
        exec.execute(newNapTask(1));
        exec.shutdown();
        try {
            exec.execute(newNapTask(99));
        } catch(RejectedExecutionException e) {
            System.out.println(e);
        }
    }
}

輸出結(jié)果:

java.util.concurrent.RejectedExecutionException: TaskNapTask[99] rejected from java.util.concurrent.ThreadPoolExecutor@4e25154f[Shutting down, pool size = 1,active threads = 1, queued tasks = 0, completed tasks =0]NapTask[1] pool-1-thread-1

exec.shutdown() 的替代方法是 exec.shutdownNow() ,它除了不接受新任務(wù)外,還會嘗試通過中斷任務(wù)來停止任何當(dāng)前正在運行的任務(wù)。同樣,中斷是錯誤的,容易出錯并且不鼓勵。

  • 使用更多線程

使用線程的重點是(幾乎總是)更快地完成任務(wù),那么我們?yōu)槭裁匆拗谱约菏褂?SingleThreadExecutor 呢?查看執(zhí)行 Executors 的 Javadoc,你將看到更多選項。例如 CachedThreadPool:

// concurrent/CachedThreadPool.java
import java.util.concurrent.*;
import java.util.stream.*;
public class CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService exec
        =Executors.newCachedThreadPool();
        IntStream.range(0, 10)
        .mapToObj(NapTask::new)
        .forEach(exec::execute);
        exec.shutdown();
    }
}

輸出結(jié)果:

NapTask[7] pool-1-thread-8
NapTask[4] pool-1-thread-5
NapTask[1] pool-1-thread-2
NapTask[3] pool-1-thread-4
NapTask[0] pool-1-thread-1
NapTask[8] pool-1-thread-9
NapTask[2] pool-1-thread-3
NapTask[9] pool-1-thread-10
NapTask[6] pool-1-thread-7
NapTask[5] pool-1-thread-6

當(dāng)你運行這個程序時,你會發(fā)現(xiàn)它完成得更快。這是有道理的,每個任務(wù)都有自己的線程,所以它們都并行運行,而不是使用相同的線程來順序運行每個任務(wù)。這似乎沒毛病,很難理解為什么有人會使用 SingleThreadExecutor。

要理解這個問題,我們需要一個更復(fù)雜的任務(wù):

// concurrent/InterferingTask.java
public class InterferingTask implements Runnable {
    final int id;
    private static Integer val = 0;
    public InterferingTask(int id) {
        this.id = id;
    }
    @Override
    public void run() {
        for(int i = 0; i < 100; i++)
        val++;
    System.out.println(id + " "+
        Thread.currentThread().getName() + " " + val);
    }
}

每個任務(wù)增加 val 一百次。這似乎很簡單。讓我們用 CachedThreadPool 嘗試一下:

// concurrent/CachedThreadPool2.java
import java.util.concurrent.*;
import java.util.stream.*;
public class CachedThreadPool2 {
    public static void main(String[] args) {
    ExecutorService exec
    =Executors.newCachedThreadPool();
    IntStream.range(0, 10)
    .mapToObj(InterferingTask::new)
    .forEach(exec::execute);
    exec.shutdown();
    }
}

輸出結(jié)果:

0 pool-1-thread-1 200
1 pool-1-thread-2 200
4 pool-1-thread-5 300
5 pool-1-thread-6 400
8 pool-1-thread-9 500
9 pool-1-thread-10 600
2 pool-1-thread-3 700
7 pool-1-thread-8 800
3 pool-1-thread-4 900
6 pool-1-thread-7 1000

輸出不是我們所期望的,并且從一次運行到下一次運行會有所不同。問題是所有的任務(wù)都試圖寫入 val 的單個實例,并且他們正在踩著彼此的腳趾。我們稱這樣的類是線程不安全的。讓我們看看 SingleThreadExecutor 會發(fā)生什么:

// concurrent/SingleThreadExecutor3.java
import java.util.concurrent.*;
import java.util.stream.*;
public class SingleThreadExecutor3 {
    public static void main(String[] args)throws InterruptedException {
        ExecutorService exec
        =Executors.newSingleThreadExecutor();
        IntStream.range(0, 10)
        .mapToObj(InterferingTask::new)
        .forEach(exec::execute);
        exec.shutdown();
    }
}

輸出結(jié)果:

0 pool-1-thread-1 100
1 pool-1-thread-1 200
2 pool-1-thread-1 300
3 pool-1-thread-1 400
4 pool-1-thread-1 500
5 pool-1-thread-1 600
6 pool-1-thread-1 700
7 pool-1-thread-1 800
8 pool-1-thread-1 900
9 pool-1-thread-1 1000

現(xiàn)在我們每次都得到一致的結(jié)果,盡管 InterferingTask 缺乏線程安全性。這是 SingleThreadExecutor 的主要好處 - 因為它一次運行一個任務(wù),這些任務(wù)不會相互干擾,因此強加了線程安全性。這種現(xiàn)象稱為線程封閉,因為在單線程上運行任務(wù)限制了它們的影響。線程封閉限制了加速,但可以節(jié)省很多困難的調(diào)試和重寫。

  • 產(chǎn)生結(jié)果

因為 InterferingTask 是一個 Runnable ,它沒有返回值,因此只能使用副作用產(chǎn)生結(jié)果 - 操縱緩沖值而不是返回結(jié)果。副作用是并發(fā)編程中的主要問題之一,因為我們看到了 CachedThreadPool2.javaInterferingTask 中的 val 被稱為可變共享狀態(tài),這就是問題所在:多個任務(wù)同時修改同一個變量會產(chǎn)生競爭。結(jié)果取決于首先在終點線上執(zhí)行哪個任務(wù),并修改變量(以及其他可能性的各種變化)。

避免競爭條件的最好方法是避免可變的共享狀態(tài)。我們可以稱之為自私的孩子原則:什么都不分享。

使用 InterferingTask ,最好刪除副作用并返回任務(wù)結(jié)果。為此,我們創(chuàng)建 Callable 而不是 Runnable

// concurrent/CountingTask.java
import java.util.concurrent.*;
public class CountingTask implements Callable<Integer> {
    final int id;
    public CountingTask(int id) { this.id = id; }
    @Override
    public Integer call() {
    Integer val = 0;
    for(int i = 0; i < 100; i++)
        val++;
    System.out.println(id + " " +
        Thread.currentThread().getName() + " " + val);
    return val;
    }
}

call() 完全獨立于所有其他 CountingTasks 生成其結(jié)果,這意味著沒有可變的共享狀態(tài)

ExecutorService 允許你使用 invokeAll() 啟動集合中的每個 Callable:

// concurrent/CachedThreadPool3.java
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class CachedThreadPool3 {
    public static Integer extractResult(Future<Integer> f) {
        try {
            return f.get();
        } catch(Exception e) {
            throw new RuntimeException(e);
        }
    }
    public static void main(String[] args)throws InterruptedException {
    ExecutorService exec =
    Executors.newCachedThreadPool();
    List<CountingTask> tasks =
        IntStream.range(0, 10)
            .mapToObj(CountingTask::new)
            .collect(Collectors.toList());
        List<Future<Integer>> futures =
            exec.invokeAll(tasks);
        Integer sum = futures.stream()
            .map(CachedThreadPool3::extractResult)
            .reduce(0, Integer::sum);
        System.out.println("sum = " + sum);
        exec.shutdown();
    }
}

輸出結(jié)果:

1 pool-1-thread-2 100
0 pool-1-thread-1 100
4 pool-1-thread-5 100
5 pool-1-thread-6 100
8 pool-1-thread-9 100
9 pool-1-thread-10 100
2 pool-1-thread-3 100
3 pool-1-thread-4 100
6 pool-1-thread-7 100
7 pool-1-thread-8 100
sum = 1000

只有在所有任務(wù)完成后,invokeAll() 才會返回一個 Future 列表,每個任務(wù)一個 Future 。Future 是 Java 5 中引入的機制,允許你提交任務(wù)而無需等待它完成。在這里,我們使用 ExecutorService.submit() :

// concurrent/Futures.java
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class Futures {
    public static void main(String[] args)throws InterruptedException, ExecutionException {
    ExecutorService exec
        =Executors.newSingleThreadExecutor();
    Future<Integer> f =
        exec.submit(newCountingTask(99));
    System.out.println(f.get()); // [1]
    exec.shutdown();
    }
}

輸出結(jié)果:

99 pool-1-thread-1 100
100
  • [1] 當(dāng)你的任務(wù)在尚未完成的 Future 上調(diào)用 get() 時,調(diào)用會阻塞(等待)直到結(jié)果可用。

但這意味著,在 CachedThreadPool3.java 中,Future 似乎是多余的,因為 invokeAll() 甚至在所有任務(wù)完成之前都不會返回。但是,這里的 Future 并不用于延遲結(jié)果,而是用于捕獲任何可能發(fā)生的異常。

還要注意在 CachedThreadPool3.java.get() 中拋出異常,因此 extractResult() 在 Stream 中執(zhí)行此提取。

因為當(dāng)你調(diào)用 get() 時,Future 會阻塞,所以它只能解決等待任務(wù)完成才暴露問題。最終,Futures 被認(rèn)為是一種無效的解決方案,現(xiàn)在不鼓勵,我們推薦 Java 8 的 CompletableFuture ,這將在本章后面探討。當(dāng)然,你仍會在遺留庫中遇到 Futures。

我們可以使用并行 Stream 以更簡單,更優(yōu)雅的方式解決這個問題:

// concurrent/CountingStream.java
// {VisuallyInspectOutput}
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
public class CountingStream {
    public static void main(String[] args) {
        System.out.println(
            IntStream.range(0, 10)
                .parallel()
                .mapToObj(CountingTask::new)
                .map(ct -> ct.call())
                .reduce(0, Integer::sum));
    }
}

輸出結(jié)果:

1 ForkJoinPool.commonPool-worker-3 100
8 ForkJoinPool.commonPool-worker-2 100
0 ForkJoinPool.commonPool-worker-6 100
2 ForkJoinPool.commonPool-worker-1 100
4 ForkJoinPool.commonPool-worker-5 100
9 ForkJoinPool.commonPool-worker-7 100
6 main 100
7 ForkJoinPool.commonPool-worker-4 100
5 ForkJoinPool.commonPool-worker-2 100
3 ForkJoinPool.commonPool-worker-3 100
1000

這不僅更容易理解,而且我們需要做的就是將 parallel() 插入到其他順序操作中,然后一切都在同時運行。

  • Lambda 和方法引用作為任務(wù)

java8 有了 lambdas 和方法引用,你不需要受限于只能使用 RunnableCallable 。因為 java8 的 lambdas 和方法引用可以通過匹配方法簽名來使用(即,它支持結(jié)構(gòu)一致性),所以我們可以將非 RunnableCallable 的參數(shù)傳遞給 ExecutorService :

// concurrent/LambdasAndMethodReferences.java
import java.util.concurrent.*;
class NotRunnable {
    public void go() {
        System.out.println("NotRunnable");
    }
}
class NotCallable {
    public Integer get() {
        System.out.println("NotCallable");
        return 1;
    }
}
public class LambdasAndMethodReferences {
    public static void main(String[] args)throws InterruptedException {
    ExecutorService exec =
        Executors.newCachedThreadPool();
    exec.submit(() -> System.out.println("Lambda1"));
    exec.submit(new NotRunnable()::go);
    exec.submit(() -> {
        System.out.println("Lambda2");
        return 1;
    });
    exec.submit(new NotCallable()::get);
    exec.shutdown();
    }
}

輸出結(jié)果:

Lambda1
NotCallable
NotRunnable
Lambda2

這里,前兩個 submit() 調(diào)用可以改為調(diào)用 execute() 。所有 submit() 調(diào)用都返回 Futures ,你可以在后兩次調(diào)用的情況下提取結(jié)果。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容