從一個簡單的Java單例示例剖析并發(fā)體系設(shè)計

一個簡單的單例示例

單例模式可能是大家經(jīng)常接觸和使用的一個設(shè)計模式,你可能會這么寫

publicclassUnsafeLazyInitiallization{

privatestaticUnsafeLazyInitiallizationinstance;

privateUnsafeLazyInitiallization(){

}

publicstaticUnsafeLazyInitiallizationgetInstance(){

if(instance==null){//1:A線程執(zhí)行

instance=newUnsafeLazyInitiallization();//2:B線程執(zhí)行

}

returninstance;

}

}

上面代碼大家應(yīng)該都知道,所謂的線程不安全的懶漢單例寫法。在UnsafeLazyInitiallization類中,假設(shè)A線程執(zhí)行代碼1的同時,B線程執(zhí)行代碼2,此時,線程A可能看到instance引用的對象還沒有初始化。

你可能會說,線程不安全,我可以對getInstance()方法做同步處理保證安全啊,比如下面這樣的寫法

publicclassSafeLazyInitiallization{

privatestaticSafeLazyInitiallizationinstance;

privateSafeLazyInitiallization(){

}

publicsynchronizedstaticSafeLazyInitiallizationgetInstance(){

if(instance==null){

instance=newSafeLazyInitiallization();

}

returninstance;

}

}

這樣的寫法是保證了線程安全,但是由于getInstance()方法做了同步處理,synchronized將導(dǎo)致性能開銷。如getInstance()方法被多個線程頻繁調(diào)用,將會導(dǎo)致程序執(zhí)行性能的下降。反之,如果getInstance()方法不會被多個線程頻繁的調(diào)用,那么這個方案將能夠提供令人滿意的性能。

那么,有沒有更優(yōu)雅的方案呢?前人的智慧是偉大的,在早期的JVM中,synchronized存在巨大的性能開銷,因此,人們想出了一個“聰明”的技巧——雙重檢查鎖定。人們通過雙重檢查鎖定來降低同步的開銷。下面來讓我們看看

publicclassDoubleCheckedLocking{//1

privatestaticDoubleCheckedLockinginstance;//2

privateDoubleCheckedLocking(){

}

publicstaticDoubleCheckedLockinggetInstance(){//3

if(instance==null){//4:第一次檢查

synchronized(DoubleCheckedLocking.class){//5:加鎖

if(instance==null)//6:第二次檢查

instance=newDoubleCheckedLocking();//7:問題的根源出在這里

}//8

}//9

returninstance;//10

}//11

}

如上面代碼所示,如果第一次檢查instance不為null,那么就不需要執(zhí)行下面的加鎖和初始化操作。因此,可以大幅降低synchronized帶來的性能開銷。雙重檢查鎖定看起來似乎很完美,但這是一個錯誤的優(yōu)化!為什么呢?在線程執(zhí)行到第4行,代碼讀取到instance不為null時,instance引用的對象有可能還沒有完成初始化。在第7行創(chuàng)建了一個對象,這行代碼可以分解為如下的3行偽代碼

memory=allocate();//1:分配對象的內(nèi)存空間

ctorInstance(memory);//2:初始化對象

instance=memory;//3:設(shè)置instance指向剛分配的內(nèi)存地址

上面3行代碼中的2和3之間,可能會被重排序(在一些JIT編譯器上,這種重排序是真實發(fā)生的,如果不了解重排序,后文JMM會詳細(xì)解釋)。2和3之間重排序之后的執(zhí)行時序如下

memory=allocate();//1:分配對象的內(nèi)存空間

instance=memory;//3:設(shè)置instance指向剛分配的內(nèi)存地址,注意此時對象還沒有被初始化

ctorInstance(memory);//2:初始化對象

回到示例代碼第7行,如果發(fā)生重排序,另一個并發(fā)執(zhí)行的線程B就有可能在第4行判斷instance不為null。線程B接下來將訪問instance所引用的對象,但此時這個對象可能還沒有被A線程初始化。在知曉問題發(fā)生的根源之后,我們可以想出兩個辦法解決

不允許2和3重排序

允許2和3重排序,但不允許其他線程“看到”這個重排序

下面就介紹這兩個解決方案的具體實現(xiàn)

基于volatile的解決方案

對于前面的基于雙重檢查鎖定的方案,只需要做一點小的修改,就可以實現(xiàn)線程安全的延遲初始化。請看下面的示例代碼

publicclassSafeDoubleCheckedLocking{

privatevolatilestaticSafeDoubleCheckedLockinginstance;

privateSafeDoubleCheckedLocking(){

}

publicstaticSafeDoubleCheckedLockinggetInstance(){

if(instance==null){

synchronized(SafeDoubleCheckedLocking.class){

if(instance==null)

instance=newSafeDoubleCheckedLocking();//instance為volatile,現(xiàn)在沒問題了

}

}

returninstance;

}

}

當(dāng)聲明對象的引用為volatile后,前面?zhèn)未a談到的2和3之間的重排序,在多線程環(huán)境中將會被禁止。

基于類初始化的解決方案

JVM在類的初始化階段(即在Class被加載后,且被線程使用之前),會執(zhí)行類的初始化。在執(zhí)行類的初始化期間,JVM會去獲取多個線程對同一個類的初始化?;谶@個特性,實現(xiàn)的示例代碼如下

publicclassInstanceFactory{

privateInstanceFactory(){

}

privatestaticclassInstanceHolder{

publicstaticInstanceFactoryinstance=newInstanceFactory();

}

publicstaticInstanceFactorygetInstance(){

returnInstanceHolder.instance;//這里將導(dǎo)致InstanceHolder類被初始化

}

}

這個方案的本質(zhì)是允許前面?zhèn)未a談到的2和3重排序,但不允許其他線程“看到”這個重排序。在InstanceFactory示例代碼中,首次執(zhí)行g(shù)etInstance()方法的線程將導(dǎo)致InstanceHolder類被初始化。由于Java語言是多線程的,多個線程可能在同一時間嘗試去初始化同一個類或接口(比如這里多個線程可能會在同一時刻調(diào)用getInstance()方法來初始化IInstanceHolder類)。Java語言規(guī)定,對于每一個類和接口C,都有一個唯一的初始化鎖LC與之對應(yīng)。從C到LC的映射,由JVM的具體實現(xiàn)去自由實現(xiàn)。JVM在類初始化期間會獲取這個初始化鎖,并且每個線程至少獲取一次鎖來確保這個類已經(jīng)被初始化過了。

JMM

也許你還存在疑問,前面談的重排序是什么鬼?為什么volatile在某方面就能禁止重排序?現(xiàn)在引出本文的另一個話題JMM(Java Memory Model——Java內(nèi)存模型)。什么是JMM呢?JMM是一個抽象概念,它并不存在。Java虛擬機規(guī)范中試圖定義一種Java內(nèi)存模型(JMM)來屏蔽掉各種硬件和操作系統(tǒng)的內(nèi)存訪問差異,以實現(xiàn)讓Java程序在各種平臺下都能達(dá)到一致的內(nèi)存訪問效果。在此之前,主流程序語言(如C/C++等)直接使用物理硬件和操作系統(tǒng)的內(nèi)存模型,因此,會由于不同平臺的內(nèi)存模型的差異,有可能導(dǎo)致程序在一套平臺上并發(fā)完全正常,而在另一套平臺上并發(fā)訪問卻經(jīng)常出錯,因此在某些場景就必須針對不同的平臺來編寫程序。

Java線程之間的通信由JMM來控制,JMM決定一個線程共享變量的寫入何時對另一個線程可見。JMM保證如果程序是正確同步的,那么程序的執(zhí)行將具有順序一致性。從抽象的角度看,JMM定義了線程和主內(nèi)存之間的抽象關(guān)系:線程之間的共享變量(實例域、靜態(tài)域和數(shù)據(jù)元素)存儲在主內(nèi)存(Main Memory)中,每個線程都有一個私有的本地內(nèi)存(Local Memory),本地內(nèi)存中存儲了該線程以讀/寫共享變量的副本(局部變量、方法定義參數(shù)和異常處理參數(shù)是不會在線程之間共享,它們存儲在線程的本地內(nèi)存中)。從物理角度上看,主內(nèi)存僅僅是虛擬機內(nèi)存的一部分,與物理硬件的主內(nèi)存名字一樣,兩者可以互相類比;而本地內(nèi)存,可與處理器高速緩存類比。Java內(nèi)存模型的抽象示意圖如圖所示


這里先介紹幾個基礎(chǔ)概念:8種操作指令、內(nèi)存屏障、順序一致性模型、as-if-serial、happens-before 、數(shù)據(jù)依賴性、 重排序。

8種操作指令

關(guān)于主內(nèi)存與本地內(nèi)存之間具體的交互協(xié)議,即一個變量如何從主內(nèi)存拷貝到本地內(nèi)存、如何從本地內(nèi)存同步回主內(nèi)存之類的實現(xiàn)細(xì)節(jié),JMM中定義了以下8種操作來完成,虛擬機實現(xiàn)時必須保證下面提及的每種操作都是原子的、不可再分的(對于double和long類型的遍歷來說,load、store、read和write操作在某些平臺上允許有例外):

lock(鎖定):作用于主內(nèi)存的變量,它把一個變量標(biāo)識為一條線程獨立的狀態(tài)。

unlock(解鎖):作用于主內(nèi)存的變量,它把一個處于鎖定狀態(tài)的變量釋放出來,釋放后的變量才可以被其他線程鎖定。

read(讀取):作用于主內(nèi)存的變量,它把一個變量的值從主內(nèi)存?zhèn)鬏數(shù)骄€程的本地內(nèi)存中,以便隨后的load動作使用。

load(載入):作用于本地內(nèi)存的變量,它把read操作從主內(nèi)存中得到變量值放入本地內(nèi)存的變量副本中。

use(使用):作用于本地內(nèi)存的變量,它把本地內(nèi)存中一個變量的值傳遞給執(zhí)行引擎,每當(dāng)虛擬機遇到一個需要使用到變量的值的字節(jié)碼指令時將會執(zhí)行這個操作。

assign(賦值):作用于本地內(nèi)存的變量,它把一個從執(zhí)行引擎接收到的值賦給本地內(nèi)存的變量,每當(dāng)虛擬機遇到一個給變量賦值的字節(jié)碼指令時執(zhí)行這個操作。

store(存儲):作用于本地內(nèi)存的變量,它把本地內(nèi)存中的一個變量的值傳送到主內(nèi)存中,以便隨后的write操作使用。

write(寫入):作用于主內(nèi)存的變量,它把store操作從本地內(nèi)存中提到的變量的值放入到主內(nèi)存的變量中。

如果要把一個變量從主內(nèi)存模型復(fù)制到本地內(nèi)存,那就要順序的執(zhí)行read和load操作,如果要把變量從本地內(nèi)存同步回主內(nèi)存,就要順序的執(zhí)行store和write操作。注意,Java內(nèi)存模型只要求上述兩個操作必須按順序執(zhí)行,而沒有保證是連續(xù)執(zhí)行。也就是說read與load之間、store與write之間是可插入其他指令的,如對主內(nèi)存中的變量a、b進(jìn)行訪問時,一種可能出現(xiàn)的順序是read a read b、load b、load a。

內(nèi)存屏障

內(nèi)存屏障是一組處理器指令(前面的8個操作指令),用于實現(xiàn)對內(nèi)存操作的順序限制。包括LoadLoad, LoadStore, StoreLoad, StoreStore共4種內(nèi)存屏障。內(nèi)存屏障存在的意義是什么呢?它是在Java編譯器生成指令序列的適當(dāng)位置插入內(nèi)存屏障指令來禁止特定類型的處理器重排序,從而讓程序按我們預(yù)想的流程去執(zhí)行,內(nèi)存屏障是與相應(yīng)的內(nèi)存重排序相對應(yīng)的。JMM把內(nèi)存屏障指令分為4類


StoreLoad Barriers是一個“全能型 ”的屏障,它同時具有其他3個屏障的效果?,F(xiàn)在的多數(shù)處理器大多支持該屏障(其他類型的屏障不一定被所有處理器支持)。執(zhí)行該屏障開銷會很昂貴,因為當(dāng)前處理器通常要把寫緩沖區(qū)中的數(shù)據(jù)全部刷新到內(nèi)存中。

數(shù)據(jù)依賴性

如果兩個操作訪問同一個變量,且這兩個操作中有一個為寫操作,此時這兩個操作之間就存在數(shù)據(jù)依賴性。數(shù)據(jù)依賴性分3種類型:寫后讀、寫后寫、讀后寫。這3種情況,只要重排序兩個操作的執(zhí)行順序,程序的執(zhí)行結(jié)果就會被改變。編譯器和處理器可能對操作進(jìn)行重排序。而它們進(jìn)行重排序時,會遵守數(shù)據(jù)依賴性,不會改變數(shù)據(jù)依賴關(guān)系的兩個操作的執(zhí)行順序。

這里所說的數(shù)據(jù)依賴性僅針對單個處理器中執(zhí)行的指令序列和單個線程中執(zhí)行的操作,不同處理器之間和不同線程之間的數(shù)據(jù)依賴性不被編譯器和處理器考慮。

順序一致性內(nèi)存模型

順序一致性內(nèi)存模型是一個理論參考模型,在設(shè)計的時候,處理器的內(nèi)存模型和編程語言的內(nèi)存模型都會以順序一致性內(nèi)存模型作為參照。它有兩個特性:

一個線程中的所有操作必須按照程序的順序來執(zhí)行

(不管程序是否同步)所有線程都只能看到一個單一的操作執(zhí)行順序。在順序一致性的內(nèi)存模型中,每個操作必須原子執(zhí)行并且立刻對所有線程可見。

從順序一致性模型中,我們可以知道程序所有操作完全按照程序的順序串行執(zhí)行。而在JMM中,臨界區(qū)內(nèi)的代碼可以重排序(但JMM不允許臨界區(qū)內(nèi)的代碼“逸出”到臨界區(qū)外,那樣就破壞監(jiān)視器的語義)。JMM會在退出臨界區(qū)和進(jìn)入臨界區(qū)這兩個關(guān)鍵時間點做一些特別處理,使得線程在這兩個時間點具有與順序一致性模型相同的內(nèi)存視圖。雖然線程A在臨界區(qū)內(nèi)做了重排序,但由于監(jiān)視器互斥執(zhí)行的特性,這里的線程B根本無法“觀察”到線程A在臨界區(qū)內(nèi)的重排序。這種重排序既提高了執(zhí)行效率,又沒有改變程序的執(zhí)行結(jié)果。像前面單例示例的類初始化解決方案就是采用了這個思想。

as-if-serial

as-if-serial的意思是不管怎么重排序,(單線程)程序的執(zhí)行結(jié)果不能改變。編譯器、runtime和處理器都必須遵守as-if-serial語義。為了遵守as-if-serial語義,編譯器和處理器不會對存在數(shù)據(jù)依賴關(guān)系的操作做重排序。

as-if-serial語義把單線程程序保護了起來,遵守as-if-serial語義的編譯器、runtime和處理器共同為編寫單線程程序的程序員創(chuàng)建了一個幻覺:單線程程序是按程序的順序來執(zhí)行的。as-if-serial語義使單線程程序員無需擔(dān)心重排序會干擾他們,也無需擔(dān)心內(nèi)存可見性問題。

happens-before

happens-before是JMM最核心的概念。從JDK5開始,Java使用新的JSR-133內(nèi)存模型,JSR-133?使用happens-before的概念闡述操作之間的內(nèi)存可見性,如果一個操作執(zhí)行的結(jié)果需要對另一個操作可見,那么這兩個操作之間必須存在happens-before關(guān)系。

happens-before規(guī)則如下:

程序次序法則:線程中的每個動作 A 都 happens-before 于該線程中的每一個動作 B,其中,在程序中,所有的動作 B 都出現(xiàn)在動作 A 之后。(注:此法則只是要求遵循 as-if-serial語義)

監(jiān)視器鎖法則:對一個監(jiān)視器鎖的解鎖 happens-before 于每一個后續(xù)對同一監(jiān)視器鎖的加鎖。(顯式鎖的加鎖和解鎖有著與內(nèi)置鎖,即監(jiān)視器鎖相同的存儲語意。)

volatile變量法則:對 volatile 域的寫入操作 happens-before 于每一個后續(xù)對同一域的讀操作。(原子變量的讀寫操作有著與 volatile 變量相同的語意。)(volatile變量具有可見性和讀寫原子性。)

線程啟動法則:在一個線程里,對 Thread.start 的調(diào)用會 happens-before 于每一個啟動線程中的動作。

線程終止法則:線程中的任何動作都 happens-before 于其他線程檢測到這個線程已終結(jié),或者從 Thread.join 方法調(diào)用中成功返回,或者 Thread.isAlive 方法返回false。

中斷法則法則:一個線程調(diào)用另一個線程的 interrupt 方法 happens-before 于被中斷線程發(fā)現(xiàn)中斷(通過拋出InterruptedException, 或者調(diào)用 isInterrupted 方法和 interrupted 方法)。

終結(jié)法則:一個對象的構(gòu)造函數(shù)的結(jié)束 happens-before 于這個對象 finalizer 開始。

傳遞性:如果 A happens-before 于 B,且 B happens-before 于 C,則 A happens-before 于 C。

happens-before與JMM的關(guān)系如下圖所示


as-if-serial語義和happens-before本質(zhì)上一樣,參考順序一致性內(nèi)存模型的理論,在不改變程序執(zhí)行結(jié)果的前提下,給編譯器和處理器以最大的自由度,提高并行度。

重排序

終于談到我們反復(fù)提及的重排序了,重排序是指編譯器和處理器為了優(yōu)化程序性能而對指令序列進(jìn)行重新排序的一種手段。重排序分3種類型。

編譯器優(yōu)化的重排序。編譯器在不改變單線程程序語義(as-if-serial )的前提下,可以重新安排語句的執(zhí)行順序。

指令級并行的重排序?,F(xiàn)代處理器采用了指令級并行技術(shù)(Instruction Level Parallelism,ILP)來將多條指令重疊執(zhí)行。如果不存在數(shù)據(jù)依賴性,處理器可以改變語句對機器指令的執(zhí)行順序。

內(nèi)存系統(tǒng)的重排序。由于處理器使用緩存和讀/寫緩沖區(qū),這使得加載和存儲操作看上去可能是在亂序執(zhí)行。

從Java源代碼到最終實際執(zhí)行的指令序列,會分別經(jīng)歷下面3種重排序


上述的1屬于編譯器重排序,2和3屬于處理器重排序。這些重排序可能會導(dǎo)致多線程程序出現(xiàn)內(nèi)存可見性問題。對于編譯器,JMM的編譯器重排序規(guī)則會禁止特定類型的編譯器重排序(不是所有的編譯器重排序都要禁止)。對于處理器重排序,JMM的處理器重排序規(guī)則會要求Java編譯器在生成指令序列時,插入特定類型的內(nèi)存屏障指令,通過內(nèi)存屏障指令來禁止特定類型的處理器重排序。

JMM屬于語言級的內(nèi)存模型,它確保在不同的編譯器和不同的處理器平臺之上,通過禁止特定類型的編譯器重排序和處理器重排序,為程序員提供一致的內(nèi)存可見性保證。

從JMM設(shè)計者的角度來說,在設(shè)計JMM時,需要考慮兩個關(guān)鍵因素:

程序員對內(nèi)存模型的使用。程序員希望內(nèi)存模型易于理解,易于編程。程序員希望基于一個強內(nèi)存模型(程序盡可能的順序執(zhí)行)來編寫代碼。

編譯器和處理器對內(nèi)存模型的實現(xiàn)。編譯器和處理器希望內(nèi)存模型對它們的束縛越少越好,這樣它們就可以做盡可能多的優(yōu)化(對程序重排序,做盡可能多的并發(fā))來提高性能。編譯器和處理器希望實現(xiàn)一個弱內(nèi)存模型。

JMM設(shè)計就需要在這兩者之間作出協(xié)調(diào)。JMM對程序采取了不同的策略:

對于會改變程序執(zhí)行結(jié)果的重排序,JMM要求編譯器和處理器必須禁止這種重排序。

對于不會改變程序執(zhí)行結(jié)果的重排序,JMM對編譯器和處理器不作要求(JMM允許這種重排序)。

介紹完了這幾個基本概念,我們不難推斷出JMM是圍繞著在并發(fā)過程中如何處理原子性、可見性和有序性這三個特征來建立的。

原子性:由Java內(nèi)存模型來直接保證的原子性操作就是我們前面介紹的8個原子操作指令,其中l(wèi)ock(lock指令實際在處理器上原子操作體現(xiàn)對總線加鎖或?qū)彺婕渔i)和unlock指令操作JVM并未直接開放給用戶使用,但是卻提供了更高層次的字節(jié)碼指令monitorenter和monitorexit來隱式使用這兩個操作,這兩個字節(jié)碼指令反映到Java代碼中就是同步塊——synchronize關(guān)鍵字,因此在synchronized塊之間的操作也具備原子性。除了synchronize,在Java中另一個實現(xiàn)原子操作的重要方式是自旋CAS,它是利用處理器提供的cmpxchg指令實現(xiàn)的。至于自旋CAS后面J.U.C中會詳細(xì)介紹,它和volatile是整個J.U.C底層實現(xiàn)的核心。

可見性:可見性是指一個線程修改了共享變量的值,其他線程能夠立即得知這個修改。而我們上文談的happens-before原則禁止某些處理器和編譯器的重排序,來保證了JMM的可見性。而體現(xiàn)在程序上,實現(xiàn)可見性的關(guān)鍵字包含了volatile、synchronize和final。

有序性:談到有序性就涉及到前面說的重排序和順序一致性內(nèi)存模型。我們也都知道了as-if-serial是針對單線程程序有序的,即使存在重排序,但是最終程序結(jié)果還是不變的,而多線程程序的有序性則體現(xiàn)在JMM通過插入內(nèi)存屏障指令,禁止了特定類型處理器的重排序。通過前面8個操作指令和happens-before原則介紹,也不難推斷出,volatile和synchronized兩個關(guān)鍵字來保證線程之間的有序性,volatile本身就包含了禁止指令重排序的語義,而synchronized則是由監(jiān)視器法則獲得。

J.U.C

談完了JMM,那么Java相關(guān)類庫是如何實現(xiàn)的呢?這里就談?wù)凧.U.C( java.util.concurrent),先來張J.U.C的思維導(dǎo)圖


不難看出,J.U.C由atomic、locks、tools、collections、executor這五部分組成。它們的實現(xiàn)基于volatile的讀寫和CAS所具有的volatile讀和寫。AQS(AbstractQueuedSynchronizer,隊列同步器)、非阻塞數(shù)據(jù)結(jié)構(gòu)和原子變量類,這些J.U.C中的基礎(chǔ)類都是使用了這種模式實現(xiàn)的,而J.U.C中的高層類又依賴于這些基礎(chǔ)類來實現(xiàn)的。從整體上看,J.U.C的實現(xiàn)示意圖如下


也許你對volatile和CAS的底層實現(xiàn)原理不是很了解,這里先這里先簡單介紹下它們的底層實現(xiàn)

volatile

Java語言規(guī)范第三版對volatile的定義為:Java編程語言允許線程訪問共享變量,為了確保共享變量能被準(zhǔn)確和一致性的更新,線程應(yīng)該確保通過排他鎖單獨獲得這個變量。如果一個字段被聲明為volatile,Java內(nèi)存模型確保這個所有線程看到這個值的變量是一致的。而volatile是如何來保證可見性的呢?如果對聲明了volatile的變量進(jìn)行寫操作,JVM就會向處理器發(fā)送一條Lock前綴的指令,將這個變量所在緩存行的數(shù)據(jù)寫回到系統(tǒng)內(nèi)存(Lock指令會在聲言該信號期間鎖總線/緩存,這樣就獨占了系統(tǒng)內(nèi)存)。但是,就算是寫回到內(nèi)存,如果其他處理器緩存的值還是舊的,再執(zhí)行計算操作就會有問題。所以,在多處理器下,為了保證各個處理器的緩存是一致的,就會實現(xiàn)緩存一致性協(xié)議,每個處理器通過嗅探在總線(注意處理器不直接跟系統(tǒng)內(nèi)存交互,而是通過總線)上傳播的數(shù)據(jù)來檢查自己緩存的值是不是過期了,當(dāng)處理器發(fā)現(xiàn)直接緩存行對應(yīng)的內(nèi)存地址被修改,就會將當(dāng)前處理器的緩存行設(shè)置成無效狀態(tài),當(dāng)處理器對這個數(shù)據(jù)進(jìn)行修改操作的時候,會重新從系統(tǒng)內(nèi)存中把數(shù)據(jù)讀到處理器緩存里。

CAS

CAS其實應(yīng)用挺廣泛的,我們常常聽到的悲觀鎖樂觀鎖的概念,樂觀鎖(無鎖)指的就是CAS。這里只是簡單說下在并發(fā)的應(yīng)用,所謂的樂觀并發(fā)策略,通俗的說,就是先進(jìn)性操作,如果沒有其他線程爭用共享數(shù)據(jù),那操作就成功了,如果共享數(shù)據(jù)有爭用,產(chǎn)生了沖突,那就采取其他的補償措施(最常見的補償措施就是不斷重試,治到成功為止,這里其實也就是自旋CAS的概念),這種樂觀的并發(fā)策略的許多實現(xiàn)都不需要把線程掛起,因此這種操作也被稱為非阻塞同步。而CAS這種樂觀并發(fā)策略操作和沖突檢測這兩個步驟具備的原子性,是靠什么保證的呢?硬件,硬件保證了一個從語義上看起來需要多次操作的行為只通過一條處理器指令就能完成。

也許你會存在疑問,為什么這種無鎖的方案一般會比直接加鎖效率更高呢?這里其實涉及到線程的實現(xiàn)和線程的狀態(tài)轉(zhuǎn)換。實現(xiàn)線程主要有三種方式:使用內(nèi)核線程實現(xiàn)、使用用戶線程實現(xiàn)和使用用戶線程加輕量級進(jìn)程混合實現(xiàn)。而Java的線程實現(xiàn)則依賴于平臺使用的線程模型。至于狀態(tài)轉(zhuǎn)換,Java定義了6種線程狀態(tài),在任意一個時間點,一個線程只能有且只有其中的一種狀態(tài),這6種狀態(tài)分別是:新建、運行、無限期等待、限期等待、阻塞、結(jié)束。 Java的線程是映射到操作系統(tǒng)的原生線程之上的,如果要阻塞或喚醒一個線程,都需要操作系統(tǒng)來幫忙完成,這就需要從用戶態(tài)轉(zhuǎn)換到核心態(tài)中,因此狀態(tài)轉(zhuǎn)換需要耗費很多的處理器時間。對于簡單的同步塊(被synchronized修飾的方法),狀態(tài)轉(zhuǎn)換消耗的時間可能比用戶代碼執(zhí)行的時間還要長。所以出現(xiàn)了這種優(yōu)化方案,在操作系統(tǒng)阻塞線程之間引入一段自旋過程或一直自旋直到成功為止。避免頻繁的切入到核心態(tài)之中。

但是這種方案其實也并不完美,在這里就說下CAS實現(xiàn)原子操作的三大問題

ABA問題。因為CAS需要在操作值的時候,檢查值有沒有變化,如果沒有發(fā)生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那么使用CAS進(jìn)行檢查時會發(fā)現(xiàn)它的值沒有變化,但是實際上發(fā)生變化了。ABA解決的思路是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加1。JDK的Atomic包里提供了一個類AtomicStampedReference來解決ABA問題。不過目前來說這個類比較“雞肋”,大部分情況下ABA問題不會影響程序并發(fā)的正確性,如果需要解決ABA問題,改用原來的互斥同步可能會比原子類更高效。

循環(huán)時間長開銷大。自旋CAS如果長時間不成功,會給CPU帶來非常大的執(zhí)行開銷。所以說如果是長時間占用鎖執(zhí)行的程序,這種方案并不適用于此。

只能保證一個共享變量的原子操作。當(dāng)對一個共享變量執(zhí)行操作時,我們可以使用自旋CAS來保證原子性,但是對多個共享變量的操作時,自旋CAS就無法保證操作的原子性,這個時候可以用鎖。、

談完了這兩個概念,下面我們就來逐個分析這五部分的具體源碼實現(xiàn)

atomic

atomic包的原子操作類提供了一種簡單、性能高效、線程安全操作一個變量的方式。atomic包里一共13個類,屬于4種類型的原子更新方式,分別是原子更新基本類型、原子更新數(shù)組、原子更新引用、原子更新屬性。atomic包里的類基本使用Unsafe實現(xiàn)的包裝類。

下面通過一個簡單的CAS方式實現(xiàn)計數(shù)器(一個線程安全的計數(shù)器方法safeCount和一個非線程安全的計數(shù)器方法count)的示例來說下

publicclassCASTest{

publicstaticvoidmain(String[]args){

finalCountercas=newCounter();

Listts=newArrayList(600);

longstart=System.currentTimeMillis();

for(intj=0;j<100;j++){

Threadt=newThread(newRunnable(){

@Override

publicvoidrun(){

for(inti=0;i<10000;i++){

cas.count();

cas.safeCount();

}

}

});

ts.add(t);

}

for(Threadt:ts){

t.start();

}

for(Threadt:ts){

try{

t.join();

}catch(InterruptedExceptione){

e.printStackTrace();

}

}

System.out.println(cas.i);

System.out.println(cas.atomicI.get());

System.out.println(System.currentTimeMillis()-start);

}

}

publicclassCounter{

publicAtomicIntegeratomicI=newAtomicInteger(0);

publicinti=0;

/**

* 使用CAS實現(xiàn)線程安全計數(shù)器

*/

publicvoidsafeCount(){

for(;;){

inti=atomicI.get();

booleansuc=atomicI.compareAndSet(i,++i);

if(suc){

break;

}

}

}

/**

* 非線程安全計數(shù)器

*/

publicvoidcount(){

i++;

}

}

safeCount()方法的代碼塊其實是getandIncrement()方法的實現(xiàn),源碼for循環(huán)體第一步優(yōu)先取得atomicI里存儲的數(shù)值,第二步對atomicI的當(dāng)前數(shù)值進(jìn)行加1操作,關(guān)鍵的第三步調(diào)用compareAndSet()方法來進(jìn)行原子更新操作,該方法先檢查當(dāng)前數(shù)值是否等于current,等于意味著atomicI的值沒有被其他線程修改過,則將atomicI的當(dāng)前數(shù)值更新成next的值,如果不等compareAndSet()方法會返回false,程序則進(jìn)入for循環(huán)重新進(jìn)行compareAndSet()方法操作進(jìn)行不斷嘗試直到成功為止。在這里我們跟蹤下compareAndSet()方法如下

publicfinalbooleancompareAndSet(intexpect,intupdate){

returnunsafe.compareAndSwapInt(this,valueOffset,expect,update);

}

從上面源碼我們發(fā)現(xiàn)是使用Unsafe實現(xiàn)的,其實atomic里的類基本都是使用Unsafe實現(xiàn)的。我們再回到這個本地方法調(diào)用,這個本地方法在openjdk中依次調(diào)用c++代碼為unsafe.cpp、atomic.app和atomic_windows_x86.inline.hpp。關(guān)于本地方法實現(xiàn)的源代碼這里就不貼出來了,其實大體上是程序會根據(jù)當(dāng)前處理器的類型來決定是否為cmpxchg指令添加lock前綴。如果程序是在多處理器上運行,就為cmpxchg指令加上lock前綴(Lock Cmpxchg)。反之,如果程序是在單處理器上運行,就省略lock前綴(單處理器自身就會維護單處理器內(nèi)的順序一致性,不需要lock前綴提供的內(nèi)存屏障效果)。

locks

鎖是用來控制多個線程訪問共享資源的形式,Java SE 5之后,J.U.C中新增了locks來實現(xiàn)鎖功能,它提供了與synchronized關(guān)鍵字類似的同步功能。只是在使用時需要顯示的獲取和釋放鎖。雖然它缺少了隱式獲取和釋放鎖的便捷性,但是卻擁有了鎖獲取和釋放的可操作性、可中斷的獲取鎖及超時獲取鎖等多種synchronized關(guān)鍵字不具備的同步特性。

locks在這我們只介紹下核心的AQS(AbstractQueuedSynchronizer,隊列同步器),AQS是用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,它使用一個用volatile修飾的int成員變量表示同步狀態(tài)。通過內(nèi)置的FIFO隊列來完成資源獲取線程的排隊工作。同步器的主要使用方式是繼承,子類通過繼承同步器并實現(xiàn)它的抽象方法來管理同步狀態(tài),在抽象方法的實現(xiàn)過程免不了要對同步狀態(tài)進(jìn)行更改,這時候就會使用到AQS提供的3個方法:getState()、setState()和compareAndSetState()來進(jìn)行操作,這是因為它們能夠保證狀態(tài)的改變是原子性的。為什么這么設(shè)計呢?因為鎖是面向使用者的,它定義了使用者與鎖交互的接口,隱藏了實現(xiàn)細(xì)節(jié),而AQS面向的是鎖的實現(xiàn)者,它簡化了鎖的實現(xiàn)方式,屏蔽了同步狀態(tài)管理、線程的排隊、等待與喚醒等底層操作。鎖和AQS很好的隔離了使用者和實現(xiàn)者鎖關(guān)注的領(lǐng)域。

現(xiàn)在我們就自定義一個獨占鎖來詳細(xì)解釋下AQS的實現(xiàn)機制

publicclassMuteximplementsLock{

privatestaticclassSyncextendsAbstractQueuedSynchronizer{

privatestaticfinallongserialVersionUID=-4387327721959839431L;

protectedbooleanisHeldExclusively(){

returngetState()==1;

}

publicbooleantryAcquire(intacquires){

assertacquires==1;// Otherwise unused

if(compareAndSetState(0,1)){

setExclusiveOwnerThread(Thread.currentThread());

returntrue;

}

returnfalse;

}

protectedbooleantryRelease(intreleases){

assertreleases==1;// Otherwise unused

if(getState()==0)

thrownewIllegalMonitorStateException();

setExclusiveOwnerThread(null);

setState(0);

returntrue;

}

ConditionnewCondition(){

returnnewConditionObject();

}

}

privatefinalSyncsync=newSync();

publicvoidlock(){

sync.acquire(1);

}

publicbooleantryLock(){

returnsync.tryAcquire(1);

}

publicvoidunlock(){

sync.release(1);

}

publicConditionnewCondition(){

returnsync.newCondition();

}

publicbooleanisLocked(){

returnsync.isHeldExclusively();

}

publicbooleanhasQueuedThreads(){

returnsync.hasQueuedThreads();

}

publicvoidlockInterruptibly()throwsInterruptedException{

sync.acquireInterruptibly(1);

}

publicbooleantryLock(longtimeout,TimeUnitunit)throwsInterruptedException{

returnsync.tryAcquireNanos(1,unit.toNanos(timeout));

}

}

實現(xiàn)自定義組件的時候,我們可以看到,AQS可重寫的方法是tryAcquire()——獨占式獲取同步狀態(tài)、tryRelease()——獨占式釋放同步狀態(tài)、tryAcquireShared()——共享式獲取同步狀態(tài)、tryReleaseShared ()——共享式釋放同步狀態(tài)、isHeldExclusively()——是否被當(dāng)前線程所獨占。這個示例中,獨占鎖Mutex是一個自定義同步組件,它在同一時刻只允許一個線程占有鎖。Mutex中定義了一個靜態(tài)內(nèi)部類,該內(nèi)部類繼承了同步器并實現(xiàn)了獨占式獲取和釋放同步狀態(tài)。在tryAcquire()中,如果經(jīng)過CAS設(shè)置成功(同步狀態(tài)設(shè)置為1),則表示獲取了同步狀態(tài),而在tryRelease()中,只是將同步狀態(tài)重置為0。接著我們對比一下重入鎖(ReentrantLock)的源碼實現(xiàn)

publicclassReentrantLockimplementsLock,java.io.Serializable{

privatestaticfinallongserialVersionUID=7373984872572414699L;

/** Synchronizer providing all implementation mechanics */

privatefinalSyncsync;

/**

* Base of synchronization control for this lock. Subclassed

* into fair and nonfair versions below. Uses AQS state to

* represent the number of holds on the lock.

*/

abstractstaticclassSyncextendsAbstractQueuedSynchronizer{

privatestaticfinallongserialVersionUID=-5179523762034025860L;

/**

* Performs {@link Lock#lock}. The main reason for subclassing

* is to allow fast path for nonfair version.

*/

abstractvoidlock();

/**

* Performs non-fair tryLock.? tryAcquire is

* implemented in subclasses, but both need nonfair

* try for trylock method.

*/

finalbooleannonfairTryAcquire(intacquires){

finalThreadcurrent=Thread.currentThread();

intc=getState();

if(c==0){

if(compareAndSetState(0,acquires)){

setExclusiveOwnerThread(current);

returntrue;

}

}

elseif(current==getExclusiveOwnerThread()){

intnextc=c+acquires;

if(nextc<0)// overflow

thrownewError("Maximum lock count exceeded");

setState(nextc);

returntrue;

}

returnfalse;

}

protectedfinalbooleantryRelease(intreleases){

intc=getState()-releases;

if(Thread.currentThread()!=getExclusiveOwnerThread())

thrownewIllegalMonitorStateException();

booleanfree=false;

if(c==0){

free=true;

setExclusiveOwnerThread(null);

}

setState(c);

returnfree;

}

protectedfinalbooleanisHeldExclusively(){

// While we must in general read state before owner,

// we don't need to do so to check if current thread is owner

returngetExclusiveOwnerThread()==Thread.currentThread();

}

finalConditionObjectnewCondition(){

returnnewConditionObject();

}

// Methods relayed from outer class

finalThreadgetOwner(){

returngetState()==0?null:getExclusiveOwnerThread();

}

finalintgetHoldCount(){

returnisHeldExclusively()?getState():0;

}

finalbooleanisLocked(){

returngetState()!=0;

}

/**

* Reconstitutes this lock instance from a stream.

* @param s the stream

*/

privatevoidreadObject(java.io.ObjectInputStreams)

throwsjava.io.IOException,ClassNotFoundException{

s.defaultReadObject();

setState(0);// reset to unlocked state

}

}

/**

* Sync object for non-fair locks

*/

staticfinalclassNonfairSyncextendsSync{

//todo sth...

}

//todo sth...

}

重入鎖分公平鎖和不公平鎖,默認(rèn)使用的是不公平鎖,在這我們看到實現(xiàn)重入鎖大體上跟我們剛才自定義的獨占鎖差不多,但是有什么區(qū)別呢?我們看看重入鎖nonfairTryAcquire()方法實現(xiàn):首先獲取同步狀態(tài)(默認(rèn)是0),如果是0的話,CAS設(shè)置同步狀態(tài),非0的話則判斷當(dāng)前線程是否已占有鎖,如果是的話,則偏向更新同步狀態(tài)。從這里我們不難推斷出重入鎖的概念,同一個線程可以多次獲得同一把鎖,在釋放的時候也必須釋放相同次數(shù)的鎖。通過對比相信大家對自定義一個鎖有了一個初步的概念,也許你存在疑問我們重寫的這幾個方法在AQS哪地方用呢?現(xiàn)在我們來繼續(xù)往下跟蹤,我們深入跟蹤下剛才自定義獨占鎖lock()方法里面acquire()的實現(xiàn)

publicfinalvoidacquire(intarg){

if(!tryAcquire(arg)&&

acquireQueued(addWaiter(Node.EXCLUSIVE),arg))

selfInterrupt();

}

這個方法在AQS類里面,看到里面的tryAcquire(arg)大家也就明白了,tryAcquire(arg)方法獲取同步狀態(tài),后面acquireQueued(addWaiter(Node.EXCLUSIVE),?arg)方法就是說的節(jié)點構(gòu)造、加入同步隊列及在同步隊列中自旋等待的AQS沒暴露給我們的相關(guān)操作。大體的流程就是首先調(diào)用自定義同步器實現(xiàn)的tryAcquire()方法,該方法保證線程安全的獲取同步狀態(tài),如果獲取同步狀態(tài)失敗,則構(gòu)造同步節(jié)點(獨占式Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取同步狀態(tài))并通過addWaiter()方法將該節(jié)點加入到同步隊列的尾部,最后調(diào)用acquireQueued()方法,使得該節(jié)點以“死循環(huán)”的方式獲取同步狀態(tài)。如果獲取不到則阻塞節(jié)點中的線程,而被阻塞線程的喚醒主要靠前驅(qū)節(jié)點的出隊或阻塞線程被中斷來實現(xiàn)。也許你還是不明白剛才所說的,那么我們繼續(xù)跟蹤下addWaiter()方法的實現(xiàn)

privateNodeaddWaiter(Nodemode){

Nodenode=newNode(Thread.currentThread(),mode);

// Try the fast path of enq; backup to full enq on failure

Nodepred=tail;

if(pred!=null){

node.prev=pred;

if(compareAndSetTail(pred,node)){

pred.next=node;

returnnode;

}

}

enq(node);

returnnode;

}

privateNodeenq(finalNodenode){

for(;;){

Nodet=tail;

if(t==null){// Must initialize

if(compareAndSetHead(newNode()))

tail=head;

}else{

node.prev=t;

if(compareAndSetTail(t,node)){

t.next=node;

returnt;

}

}

}

}

上面的代碼通過使用compareAndSetTail()方法來確保節(jié)點能夠被線程安全添加。在enq()方法中,同步器通過“死循環(huán)”來確保節(jié)點的正確添加,在”死循環(huán)“中只有通過CAS將節(jié)點設(shè)置成為尾節(jié)點之后,當(dāng)前線程才能夠從該方法返回,否則,當(dāng)前線程不斷地嘗試重試設(shè)置。

在節(jié)點進(jìn)入同步隊列之后,發(fā)生了什么呢?現(xiàn)在我們繼續(xù)跟蹤下acquireQueued()方法

finalbooleanacquireQueued(finalNodenode,intarg){

booleanfailed=true;

try{

booleaninterrupted=false;

for(;;){

finalNodep=node.predecessor();

if(p==head&&tryAcquire(arg)){

setHead(node);

p.next=null;// help GC

failed=false;

returninterrupted;

}

if(shouldParkAfterFailedAcquire(p,node)&&

parkAndCheckInterrupt())

interrupted=true;

}

}finally{

if(failed)

cancelAcquire(node);

}

}

從上面的代碼我們不難看出,節(jié)點進(jìn)入同步隊列之后,就進(jìn)入了一個自旋的過程,每個節(jié)點(或者說每個線程)都在自省的觀察,當(dāng)條件滿足時(自己的前驅(qū)節(jié)點是頭節(jié)點就進(jìn)行CAS設(shè)置同步狀態(tài))就獲得同步狀態(tài),然后就可以從自旋的過程中退出,否則依舊在這個自旋的過程中。

collections

從前面的思維導(dǎo)圖我們可以看到并發(fā)容器包括鏈表、隊列、HashMap等.它們都是線程安全的.

ConcurrentHashMap : 一個高效的線程安全的HashMap。

CopyOnWriteArrayList : 在讀多寫少的場景中,性能非常好,遠(yuǎn)遠(yuǎn)高于vector。

ConcurrentLinkedQueue : 高效并發(fā)隊列,使用鏈表實現(xiàn),可以看成線程安全的LinkedList。

BlockingQueue : 一個接口,JDK內(nèi)部通過鏈表,數(shù)組等方式實現(xiàn)了這個接口,表示阻塞隊列,非常適合用作數(shù)據(jù)共享 。

ConcurrentSkipListMap : 跳表的實現(xiàn),這是一個Map,使用跳表數(shù)據(jù)結(jié)構(gòu)進(jìn)行快速查找 。

另外Collections工具類可以幫助我們將任意集合包裝成線程安全的集合。在這里重點說下ConcurrentHashMap和BlockingQueue這兩個并發(fā)容器。

我們都知道HashMap線程不安全的,而我們可以通過Collections.synchronizedMap(new HashMap<>())來包裝一個線程安全的HashMap或者使用線程安全的HashTable,但是它們的效率都不是很好,這時候我們就有了ConcurrentHashMap。為什么ConcurrentHashMap高效且線程安全呢?其實它使用了鎖分段技術(shù)來提高了并發(fā)的訪問率。假如容器里有多把鎖,每一把鎖用于鎖容器的一部分?jǐn)?shù)據(jù),那么當(dāng)多線程訪問容器里不同數(shù)據(jù)段的數(shù)據(jù)時,線程間就不會存在鎖競爭,從而可以有效地提高并發(fā)訪問效率,這就是鎖分段技術(shù)。首先將數(shù)據(jù)分成一段段的存儲,然后給每段數(shù)據(jù)配一把鎖,當(dāng)一個線程占用鎖訪問其中一個段數(shù)據(jù)的時候,其他段的數(shù)據(jù)也能被其他線程訪問。而既然數(shù)據(jù)被分成了多個段,線程如何定位要訪問的段的數(shù)據(jù)呢?這里其實是通過散列算法來定位的。

現(xiàn)在來談?wù)勛枞犃?,阻塞隊列其實跟后面要談的線程池息息相關(guān)的,JDK7提供了7個阻塞隊列,分別是

ArrayBlockingQueue :一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列。

LinkedBlockingQueue :一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列。

PriorityBlockingQueue :一個支持優(yōu)先級排序的無界阻塞隊列。

DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列。

SynchronousQueue:一個不存儲元素的阻塞隊列。

LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列。

LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。

如果隊列是空的,消費者會一直等待,當(dāng)生產(chǎn)者添加元素時候,消費者是如何知道當(dāng)前隊列有元素的呢?如果讓你來設(shè)計阻塞隊列你會如何設(shè)計,讓生產(chǎn)者和消費者能夠高效率的進(jìn)行通訊呢?讓我們先來看看JDK是如何實現(xiàn)的。

使用通知模式實現(xiàn)。所謂通知模式,就是當(dāng)生產(chǎn)者往滿的隊列里添加元素時會阻塞住生產(chǎn)者,當(dāng)消費者消費了一個隊列中的元素后,會通知生產(chǎn)者當(dāng)前隊列可用。通過查看JDK源碼發(fā)現(xiàn)ArrayBlockingQueue使用了Condition來實現(xiàn),代碼如下:

privatefinalConditionnotFull;

privatefinalConditionnotEmpty;

publicArrayBlockingQueue(intcapacity,booleanfair){

//省略其他代碼

notEmpty=lock.newCondition();

notFull=lock.newCondition();

}

publicvoidput(E e)throwsInterruptedException{

checkNotNull(e);

finalReentrantLocklock=this.lock;

lock.lockInterruptibly();

try{

while(count==items.length)

notFull.await();

insert(e);

}finally{

lock.unlock();

}

}

publicE take()throwsInterruptedException{

finalReentrantLocklock=this.lock;

lock.lockInterruptibly();

try{

while(count==0)

notEmpty.await();

returnextract();

}finally{

lock.unlock();

}

}

privatevoidinsert(E x){

items[putIndex]=x;

putIndex=inc(putIndex);

++count;

notEmpty.signal();

}

當(dāng)我們往隊列里插入一個元素時,如果隊列不可用,阻塞生產(chǎn)者主要通過LockSupport.park(this)來實現(xiàn)

publicfinalvoidawait()throwsInterruptedException{

if(Thread.interrupted())

thrownewInterruptedException();

Nodenode=addConditionWaiter();

intsavedState=fullyRelease(node);

intinterruptMode=0;

while(!isOnSyncQueue(node)){

LockSupport.park(this);

if((interruptMode=checkInterruptWhileWaiting(node))!=0)

break;

}

if(acquireQueued(node,savedState)&&interruptMode!=THROW_IE)

interruptMode=REINTERRUPT;

if(node.nextWaiter!=null)// clean up if cancelled

unlinkCancelledWaiters();

if(interruptMode!=0)

reportInterruptAfterWait(interruptMode);

}

繼續(xù)進(jìn)入源碼,發(fā)現(xiàn)調(diào)用setBlocker先保存下將要阻塞的線程,然后調(diào)用unsafe.park阻塞當(dāng)前線程。

publicstaticvoidpark(Objectblocker){

Threadt=Thread.currentThread();

setBlocker(t,blocker);

unsafe.park(false,0L);

setBlocker(t,null);

}

unsafe.park是個native方法,代碼如下:

publicnativevoidpark(booleanisAbsolute,longtime);

park這個方法會阻塞當(dāng)前線程,只有以下四種情況中的一種發(fā)生時,該方法才會返回。

與park對應(yīng)的unpark執(zhí)行或已經(jīng)執(zhí)行時。注意:已經(jīng)執(zhí)行是指unpark先執(zhí)行,然后再執(zhí)行的park。

線程被中斷時。

如果參數(shù)中的time不是零,等待了指定的毫秒數(shù)時。

發(fā)生異?,F(xiàn)象時。這些異常事先無法確定。

我們繼續(xù)看一下JVM是如何實現(xiàn)park方法的,park在不同的操作系統(tǒng)使用不同的方式實現(xiàn),在linux下是使用的是系統(tǒng)方法pthread_cond_wait實現(xiàn)。實現(xiàn)代碼在JVM源碼路徑src/os/linux/vm/os_linux.cpp里的 os::PlatformEvent::park方法,代碼如下:

voidos::PlatformEvent::park(){

intv;

for(;;){

v=_Event;

if(Atomic::cmpxchg(v-1,&_Event,v)==v)break;

}

guarantee(v>=0,"invariant");

if(v==0){

// Do this the hard way by blocking ...

intstatus=pthread_mutex_lock(_mutex);

assert_status(status==0,status,"mutex_lock");

guarantee(_nParked==0,"invariant");

++_nParked;

while(_Event<0){

status=pthread_cond_wait(_cond,_mutex);

// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...

// Treat this the same as if the wait was interrupted

if(status==ETIME){status=EINTR;}

assert_status(status==0||status==EINTR,status,"cond_wait");

}

--_nParked;

// In theory we could move the ST of 0 into _Event past the unlock(),

// but then we'd need a MEMBAR after the ST.

_Event=0;

status=pthread_mutex_unlock(_mutex);

assert_status(status==0,status,"mutex_unlock");

}

guarantee(_Event>=0,"invariant");

}

}

pthread_cond_wait是一個多線程的條件變量函數(shù),cond是condition的縮寫,字面意思可以理解為線程在等待一個條件發(fā)生,這個條件是一個全局變量。這個方法接收兩個參數(shù),一個共享變量_cond,一個互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal實現(xiàn)的。park 在windows下則是使用WaitForSingleObject實現(xiàn)的。

當(dāng)隊列滿時,生產(chǎn)者往阻塞隊列里插入一個元素,生產(chǎn)者線程會進(jìn)入WAITING (parking)狀態(tài)。

executor

Executor框架提供了各種類型的線程池,不同的線程池應(yīng)用了前面介紹的不同的堵塞隊列


Executor框架最核心的類是ThreadPoolExecutor,它是線程池的實現(xiàn)類。 對于核心的幾個線程池,無論是newFixedThreadPool()、newSingleThreadExecutor()還是newCacheThreadPool()方法,雖然看起來創(chuàng)建的線程具有完全不同的功能特點,但其內(nèi)部均使用了ThreadPoolExecutor實現(xiàn)

publicstaticExecutorServicenewFixedThreadPool(intnThreads){

returnnewThreadPoolExecutor(nThreads,nThreads,

0L,TimeUnit.MILLISECONDS,

newLinkedBlockingQueue());

}

publicstaticExecutorServicenewSingleThreadExecutor(){

returnnewFinalizableDelegatedExecutorService

(newThreadPoolExecutor(1,1,

0L,TimeUnit.MILLISECONDS,

newLinkedBlockingQueue()));

}

publicstaticExecutorServicenewCachedThreadPool(){

returnnewThreadPoolExecutor(0,Integer.MAX_VALUE,

60L,TimeUnit.SECONDS,

newSynchronousQueue());

}

newFixedThreadPool()方法的實現(xiàn),它返回一個corePoolSize和maximumPoolSize一樣的,并使用了LinkedBlockingQueue任務(wù)隊列(無界隊列)的線程池。當(dāng)任務(wù)提交非常頻繁時,該隊列可能迅速膨脹,從而系統(tǒng)資源耗盡。

newSingleThreadExecutor()返回單線程線程池,是newFixedThreadPool()方法的退化,只是簡單的將線程池數(shù)量設(shè)置為1。

newCachedThreadPool()方法返回corePoolSize為0而maximumPoolSize無窮大的線程池,這意味著沒有任務(wù)的時候線程池內(nèi)沒有現(xiàn)場,而當(dāng)任務(wù)提交時,該線程池使用空閑線程執(zhí)行任務(wù),若無空閑則將任務(wù)加入SynchronousQueue隊列,而SynchronousQueue隊列是直接提交隊列,它總是破事線程池增加新的線程來執(zhí)行任務(wù)。當(dāng)任務(wù)執(zhí)行完后由于corePoolSize為0,因此空閑線程在指定時間內(nèi)(60s)被回收。對于newCachedThreadPool(),如果有大量任務(wù)提交,而任務(wù)又不那么快執(zhí)行時,那么系統(tǒng)變回開啟等量的線程處理,這樣做法可能會很快耗盡系統(tǒng)的資源,因為它會增加無窮大數(shù)量的線程。

由以上線程池的實現(xiàn)可以看到,它們都只是ThreadPoolExecutor類的封裝。我們看下ThreadPoolExecutor最重要的構(gòu)造函數(shù):

publicThreadPoolExecutor(

//核心線程池,指定了線程池中的線程數(shù)量

intcorePoolSize,

//基本線程池,指定了線程池中的最大線程數(shù)量

intmaximumPoolSize,

//當(dāng)前線程池數(shù)量超過corePoolSize時,多余的空閑線程的存活時間,即多次時間內(nèi)會被銷毀。

longkeepAliveTime,

//keepAliveTime的單位

TimeUnitunit,

//任務(wù)隊列,被提交但尚未被執(zhí)行的任務(wù)。

BlockingQueueworkQueue,

//線程工廠,用于創(chuàng)建線程,一般用默認(rèn)的即可

ThreadFactorythreadFactory,

//拒絕策略,當(dāng)任務(wù)太多來不及處理,如何拒絕任務(wù)。

RejectedExecutionHandlerhandler)

ThreadPoolExecutor的任務(wù)調(diào)度邏輯如下


從上圖我們可以看出,當(dāng)提交一個新任務(wù)到線程池時,線程池的處理流程如下:

首先線程池判斷基本線程池是否已滿,如果沒滿,創(chuàng)建一個工作線程來執(zhí)行任務(wù)。滿了,則進(jìn)入下個流程。

其次線程池判斷工作隊列是否已滿,如果沒滿,則將新提交的任務(wù)存儲在工作隊列里。滿了,則進(jìn)入下個流程。

最后線程池判斷整個線程池是否已滿,如果沒滿,則創(chuàng)建一個新的工作線程來執(zhí)行任務(wù),滿了,則交給飽和策略來處理這個任務(wù)。

下面我們來看看ThreadPoolExecutor核心調(diào)度代碼

publicvoidexecute(Runnablecommand){

if(command==null)

thrownewNullPointerException();

intc=ctl.get();

/**

* workerCountOf(c)獲取當(dāng)前線程池線程總數(shù)

* 當(dāng)前線程數(shù)小于corePoolSize核心線程數(shù)時,會將任務(wù)通過addWorker(command, true)方法直接調(diào)度執(zhí)行。

* 否則進(jìn)入下個if,將任務(wù)加入等待隊列

**/

if(workerCountOf(c)

if(addWorker(command,true))

return;

c=ctl.get();

}

/**

* workQueue.offer(command) 將任務(wù)加入等待隊列。

* 如果加入失?。ū热缬薪珀犃羞_(dá)到上限或者使用了synchronousQueue)則會執(zhí)行else。

*

**/

if(isRunning(c)&&workQueue.offer(command)){

intrecheck=ctl.get();

if(!isRunning(recheck)&&remove(command))

reject(command);

elseif(workerCountOf(recheck)==0)

addWorker(null,false);

}

/**

* addWorker(command, false)直接交給線程池,

* 如果當(dāng)前線程已達(dá)到maximumPoolSize,則提交失敗執(zhí)行reject()拒絕策略。

**/

elseif(!addWorker(command,false))

reject(command);

}

從上面的源碼我們可以知道execute的執(zhí)行步驟:

如果當(dāng)前運行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。

如果運行的線程等于或多于corePoolSize,則將任務(wù)加入到BlockingQueue。

如果無法將任務(wù)假如BlockingQueue(隊列已滿),則創(chuàng)建新的線程來處理任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。

如果創(chuàng)建新線程將使當(dāng)前運行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步驟的總體設(shè)計思路,是為了在執(zhí)行execute()方法時,盡可能的避免獲取全局鎖(那將會是一個嚴(yán)重的 可伸縮瓶頸)。在ThreadPoolExecutor完成預(yù)熱之后(當(dāng)前運行的線程數(shù)大于等于corePoolSize),幾乎所有的execute()方法調(diào)用都是執(zhí)行步驟2,而步驟2不需要獲取全局鎖。

參考閱讀

本文部分內(nèi)容參考自《Java并發(fā)編程的藝術(shù)》、《深入理解java虛擬機(第2版)》、《實戰(zhàn)Java高并發(fā)程序設(shè)計》、《深入Java內(nèi)存模型》、《Java并發(fā)編程實踐》,感興趣的可自行查閱。

打開手機看微信,關(guān)注微信訂閱號“Martin說”,每天收獲更多Martin的閑扯。

原文出處:HugNew ? 從一個簡單的Java單例示例談?wù)劜l(fā)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容