layout: post
title: 《Java并發(fā)編程的藝術(shù)》筆記
categories: Java
excerpt: The Art of Java Concurrency Programming.
<img src="http://upload-images.jianshu.io/upload_images/658453-a94405da52987372.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240" width="70%">
好記性不如爛筆頭。多讀多思考。
基本概念 & Java 并發(fā)機(jī)制的底層實現(xiàn)原理
上下文切換:CPU在任務(wù)切換前會保存前一個任務(wù)的狀態(tài),以便下次切換回這個任務(wù)時,可以再加載這個任務(wù)的狀態(tài)。所以任務(wù)從保存到再加載的過程就是一次任務(wù)切換。
內(nèi)存屏障:一組處理器指令,用于實現(xiàn)對內(nèi)存操作的順序限制。
鎖的升級
現(xiàn)在我們應(yīng)該知道,Synchronized 是通過對象內(nèi)部的一個叫做監(jiān)視器鎖(monitor)來實現(xiàn)的。但是監(jiān)視器鎖本質(zhì)又是依賴于底層的操作系統(tǒng)的 Mutex Lock 來實現(xiàn)的。而操作系統(tǒng)實現(xiàn)線程之間的切換這就需要從用戶態(tài)轉(zhuǎn)換到核心態(tài),這個成本非常高,狀態(tài)之間的轉(zhuǎn)換需要相對比較長的時間,這就是為什么 Synchronized 效率低的原因。因此,這種依賴于操作系統(tǒng) Mutex Lock 所實現(xiàn)的鎖我們稱之為“重量級鎖”。JDK 中對 Synchronized 做的種種優(yōu)化,其核心都是為了減少這種重量級鎖的使用。JDK1.6 以后,為了減少獲得鎖和釋放鎖所帶來的性能消耗,提高性能,引入了“輕量級鎖”和“偏向鎖”。
每一個線程在準(zhǔn)備獲取共享資源時:
已經(jīng)獲取偏向鎖的線程為線程1, 新線程為:線程2
第一步,線程2檢查MarkWord里面是不是放的自己的ThreadId ,如果是,表示當(dāng)前線程是處于 “偏向鎖” ,就可以直接執(zhí)行方法體了。
第二步,如果MarkWord不是自己的ThreadId, 用CAS來執(zhí)行切換,如果不成功,線程2根據(jù)MarkWord里現(xiàn)有的ThreadId,通知之前線程暫停,之前線程將Markword的內(nèi)容置為空。 (線程1的同步體執(zhí)行完后 會根據(jù)線程2的請求,暫停線程,置空markword里面的線程ID)
第三步,這樣線程2就以輕量級的鎖機(jī)制工作,如果這時線程3進(jìn)入,就會進(jìn)入自旋模式等待鎖
第四步,自旋的線程3在自旋過程中,成功獲得資源(即之前獲的資源的線程執(zhí)行完成并釋放了共享資源),則整個狀態(tài)依然處于 輕量級鎖的狀態(tài),如果自旋失敗 ,即自旋時間結(jié)束,仍然沒有獲取輕量級鎖,進(jìn)入重量級鎖。
第五步,線程3進(jìn)入重量級鎖,將對象的markword修改為指向重量級鎖的指針,線程2執(zhí)行為同步體,修改Markword時,會失敗,這樣線程2就會意識到進(jìn)入重量級鎖了,
第六步,線程2釋放鎖,通知重量級鎖喚醒阻塞隊列。
輕量級鎖是為了在線程交替執(zhí)行同步塊時提高性能,而偏向鎖則是在只有一個線程執(zhí)行同步塊時進(jìn)一步提高性能。
處理器實現(xiàn)原子操作的方式:總線鎖(鎖住整個內(nèi)存);緩存鎖(在處理器內(nèi)部緩存中實現(xiàn)原子操作,使其他處理器不能緩存 i 的緩存行)。
Java 實現(xiàn)原子操作的方式:鎖和循環(huán) CAS(Compare and Swap 比較并交換);CAS 利用了處理器的 CMPXCHG 指令(該指令是原子的)。
除了偏向鎖,JVM 實現(xiàn)鎖的方式都用了循環(huán) CAS,即當(dāng)一個線程想進(jìn)入同步塊的時候使用循環(huán) CAS 的方式來獲取鎖,當(dāng)它退出同步塊的時候使用循環(huán) CAS 釋放鎖。
// 循環(huán)CAS
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
Java內(nèi)存模型
3個同步原語:synchronized,volatile,final;
并發(fā)編程的兩個關(guān)鍵問題:線程間通信和線程間同步;
在共享內(nèi)存的并發(fā)模型中,線程之間共享內(nèi)存的公共狀態(tài),通過讀-寫內(nèi)存的公共狀態(tài)進(jìn)行隱式通信。在消息傳遞的并發(fā)模型中,線程之間沒有公共狀態(tài),必須通過發(fā)送消息來顯式進(jìn)行通信。
同步是指用于控制不同線程間操作發(fā)生相對順序的機(jī)制。在共享內(nèi)存并發(fā)模型里,同步是顯式進(jìn)行的——程序員需要顯式指定某個方法或某段代碼需要在線程間互斥執(zhí)行。在消息傳遞的并發(fā)模型里,由于消息的發(fā)送必須在消息的接收之前,因此同步是隱式進(jìn)行的。
Java的并發(fā)采用的是共享內(nèi)存模型,所以Java線程之間的通信總是隱式進(jìn)行。
Java內(nèi)存模型(JMM):

(本地內(nèi)存是JMM的一個抽象概念,并不真實存在。它涵蓋了緩存、寫緩沖區(qū)、寄存器以及其他硬件和編譯器優(yōu)化。(不完全是內(nèi)存,也不完全是Cache))
從上圖來看,線程A與線程B之間如要通信的話,必須要經(jīng)歷下面2個步驟:
- 首先,線程A把本地內(nèi)存A中更新過的共享變量刷新到主內(nèi)存中去。
- 然后,線程B到主內(nèi)存中去讀取線程A之前已更新過的共享變量。

重要概念:重排序,編譯器重排序和處理器重排序,為了提高并行度。
數(shù)據(jù)依賴:寫后讀,寫后寫,讀后寫;這3種情況,只要重排序兩個操作的執(zhí)行順序,程序的執(zhí)行結(jié)果就會改變;所以重排序時會遵守數(shù)據(jù)依賴性,不會改變存在數(shù)據(jù)依賴關(guān)系的兩個操作的執(zhí)行順序。
控制依賴:由于處理器會采用分支預(yù)測技術(shù)來提高并行度,i = a * a可能會被重排序到if (flag)之前執(zhí)行——這在單線程中是沒問題的,但在多線程環(huán)境下就可能改變程序的執(zhí)行結(jié)果。
if (flag) {
i = a * a;
}
as-if-serial語義:不管怎么重排序,單線程程序的執(zhí)行結(jié)果不能被改變。
happens-before
JSR-133使用happens-before的概念來闡述操作之間的內(nèi)存可見性。在JMM中,如果一個操作執(zhí)行的結(jié)果需要對另一個操作可見,那么這兩個操作之間必須要存在happens-before關(guān)系。這里提到的兩個操作既可以是在一個線程之內(nèi),也可以是在不同線程之間。
happen-before的定義如下:
- 如果一個操作happens-before另一個操作,那么第一個操作的執(zhí)行結(jié)果將對第二個操作可見,而且第一個操作的執(zhí)行順序排在第二個操作之前
- 兩個操作之間存在happens-before關(guān)系,并不意味著Java平臺的具體實現(xiàn)必須要按照happens-before關(guān)系指定的順序來執(zhí)行。如果重排序之后的執(zhí)行結(jié)果與按照原來那種happens-before關(guān)系執(zhí)行的結(jié)果一致,那么JMM允許編譯器和處理器進(jìn)行這種重排序
as-if-serial語義保證單線程內(nèi)的程序執(zhí)行結(jié)果不會改變,happens-before保證正確同步的多線程程序的執(zhí)行結(jié)果不會被改變。
總共有六條規(guī)則:
- 程序順序規(guī)則:一個線程中的每個操作,happens-before于隨后該線程中的任意后續(xù)操作
- 監(jiān)視器鎖規(guī)則:對一個鎖的解鎖,happens-before于隨后對這個鎖的獲取
- volatile變量規(guī)則:對一個volatile域的寫,happens-before于對這個變量的讀
- 傳遞性:如果A happens-before B,B happens-before C,那么A happens-before C
- start規(guī)則:如果線程A執(zhí)行線程B的start方法,那么線程A的ThreadB.start()happens-before于線程B的任意操作
- join規(guī)則:如果線程A執(zhí)行線程B的join方法,那么線程B的任意操作happens-before于線程A從TreadB.join()方法成功返回。
順序一致性內(nèi)存模型:順序一致性內(nèi)存模型是一個被計算機(jī)科學(xué)家理想化了的理論參考模型,它為程序員提供了極強(qiáng)的內(nèi)存可見性保證。順序一致性內(nèi)存模型有兩大特性:
- 一個線程中的所有操作必須按照程序的順序來執(zhí)行。
- (不管程序是否同步)所有線程都只能看到一個單一的操作執(zhí)行順序。在順序一致性內(nèi)存模型中,每個操作都必須原子執(zhí)行且立刻對所有線程可見。
順序一致性內(nèi)存模型的視圖:

在JMM中,臨界區(qū)內(nèi)的代碼可以重排序,但不允許臨界區(qū)內(nèi)的代碼“溢出”到臨界區(qū)之外,那樣會破壞監(jiān)視器的內(nèi)存語義。
JMM保證:單線程程序和正確同步的多線程程序的執(zhí)行結(jié)果與在順序一致性內(nèi)存模型中的執(zhí)行結(jié)果相同。
volatile
對volatile變量的單個讀寫,可以看成是使用了同一個鎖對這些單個讀寫作了同步。(這樣,即使是64位的long/double型變量,只要用volatile修飾,對該變量的讀寫就具有了原子性。注意,++這種復(fù)合操作依舊不具有原子性。)
volatile變量自身的特性:
- 可見性。對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入。
- 原子性:對任意單個volatile變量的讀/寫具有原子性,但類似于volatile++這種復(fù)合操作不具有原子性。
volatile的內(nèi)存語義(對內(nèi)存可見性的影響)
- 當(dāng)寫一個volatile變量時,JMM會把該線程對應(yīng)的本地內(nèi)存中的共享變量刷新到主內(nèi)存。
- 當(dāng)讀一個volatile變量時,JMM會把該線程對應(yīng)的本地內(nèi)存置為無效。線程接下來將從主內(nèi)存中讀取共享變量。
當(dāng)?shù)诙€操作是volatile寫時,不管第一個操作是什么,都不能重排序。這個規(guī)則確保volatile寫之前的操作不會被編譯器重排序到volatile寫之后。
當(dāng)?shù)谝粋€操作是volatile讀時,不管第二個操作是什么,都不能重排序。這個規(guī)則確保volatile讀之后的操作不會被編譯器重排序到volatile讀之前。
當(dāng)?shù)谝粋€操作是volatile寫,第二個操作是volatile讀時,不能重排序。
鎖的內(nèi)存語義
眾所周知,鎖可以讓臨界區(qū)互斥執(zhí)行;但鎖有一個同樣重要,但常常被忽視的功能:鎖的內(nèi)存語義。
- 當(dāng)線程釋放鎖時,JMM會把該線程對應(yīng)的本地內(nèi)存中的共享變量刷新到主內(nèi)存中
- 當(dāng)線程獲取鎖時,JMM會把該線程對應(yīng)的本地內(nèi)存置為無效。從而使得被監(jiān)視器保護(hù)的臨界區(qū)代碼必須要從主內(nèi)存中去讀取共享變量
對比鎖釋放-獲取的內(nèi)存語義與volatile寫-讀的內(nèi)存語義,可以看出:鎖釋放與volatile寫有相同的內(nèi)存語義;鎖獲取與volatile讀有相同的內(nèi)存語義。
final的內(nèi)存語義
- JMM禁止編譯器把final域的寫重排序到構(gòu)造函數(shù)之外(對普通域的寫可能被重排序到構(gòu)造函數(shù)之外!)
- 在一個線程中,初次讀對象引用與初次讀該對象包含的final域,JMM禁止處理器重排序這兩個操作(這兩個操作之間存在間接依賴,大多數(shù)處理器會遵守間接依賴,不會重排序這兩個操作,但有少數(shù)處理器不遵守間接依賴關(guān)系,這個規(guī)則就是專門用來針對這種處理器的)
如果final域是引用類型:
public class FinalReferenceExample {
final int[] intArray; //final是引用類型
static FinalReferenceExample obj;
public FinalReferenceExample () { //構(gòu)造函數(shù)
intArray = new int[1]; //1
intArray[0] = 1; //2
}
public static void writerOne () { //寫線程A執(zhí)行
obj = new FinalReferenceExample (); //3
}
...
}
這里final域為一個引用類型,它引用一個int型的數(shù)組對象。對于引用類型,寫final域的重排序規(guī)則對編譯器和處理器增加了如下約束:
在構(gòu)造函數(shù)內(nèi)對一個final引用的對象的成員域的寫入,與隨后在構(gòu)造函數(shù)外把這個被構(gòu)造對象的引用賦值給一個引用變量,這兩個操作之間不能重排序。
在上圖中,1是對final域的寫入,2是對這個final域引用的對象的成員域的寫入,3是把被構(gòu)造的對象的引用賦值給某個引用變量。這里除了前面提到的1不能和3重排序外,2和3也不能重排序。
為什么final引用不能從構(gòu)造函數(shù)內(nèi)“逸出”
前面我們提到過,寫final域的重排序規(guī)則可以確保:在引用變量為任意線程可見之前,該引用變量指向的對象的final域已經(jīng)在構(gòu)造函數(shù)中被正確初始化過了(構(gòu)造函數(shù)完成,對象引用才會產(chǎn)生)。其實要得到這個效果,還需要一個保證:在構(gòu)造函數(shù)內(nèi)部,不能讓這個被構(gòu)造對象的引用為其他線程可見,也就是對象引用不能在構(gòu)造函數(shù)中“逸出”。為了說明問題,讓我們來看下面示例代碼:
public class FinalReferenceEscapeExample {
final int i;
static FinalReferenceEscapeExample obj;
public FinalReferenceEscapeExample () {
i = 1; //1 寫final域
obj = this; //2 this引用在此“逸出”
}
public static void writer() {
new FinalReferenceEscapeExample ();
}
public static void reader {
if (obj != null) { //3
int temp = obj.i; //4
}
}
}
這里1和2可能會發(fā)生重排序,導(dǎo)致final域在被正確初始化之前對象引用就暴露了,從而在線程B的reader中訪問到未初始化的final域。
JSR-133為什么要增強(qiáng)final的語義
在舊的Java內(nèi)存模型中 ,最嚴(yán)重的一個缺陷就是線程可能看到final域的值會改變。比如,一個線程當(dāng)前看到一個整形final域的值為0(還未初始化之前的默認(rèn)值),過一段時間之后這個線程再去讀這個final域的值時,卻發(fā)現(xiàn)值變?yōu)榱?(被某個線程初始化之后的值)。最常見的例子就是在舊的Java內(nèi)存模型中,String的值可能會改變。
為了修補(bǔ)這個漏洞,JSR-133專家組增強(qiáng)了final的語義。通過為final域增加寫和讀重排序規(guī)則,可以為java程序員提供初始化安全保證:只要對象是正確構(gòu)造的(被構(gòu)造對象的引用在構(gòu)造函數(shù)中沒有“逸出”),那么不需要使用同步(指lock和volatile的使用),就可以保證任意線程都能看到這個final域在構(gòu)造函數(shù)中被初始化之后的值。
雙重檢查鎖定與延遲初始化
延遲初始化:推遲一些高開銷的對象初始化操作,并且只有在使用這些對象時才進(jìn)行初始化。
private static Instance instance;
public synchronized static Instance getInstance() {
if (instance == null) {
instance = new Instance();
}
return instance;
}
上面的方法雖然線程安全,但用synchronized將導(dǎo)致性能開銷。
一個“聰明”的技巧:雙重檢查鎖定:
public class DoubleCheckLocking {
private static Instance instance;
public static Instance getInstance() {
if (instance == null) {
synchronized(DoubleCheckLocking.class) {
if (instance == null) {
instance = new Instance(); // 問題的根源出在這里
}
}
}
return instance;
}
}
創(chuàng)建對象的過程instance = new Instance()可以分解為以下三步:
- memory = allocate(); // 分配對象的內(nèi)存空間
- ctorInstance(memory); // 初始化對象
- instance = memory; // 返回對象地址
- 初次訪問對象
其中,2和3可能會被重排序!重排序之后變成了:分配對象內(nèi)存空間,返回對象地址,初始化對象;(在單線程內(nèi),只要保證2排在4的前面執(zhí)行,單線程內(nèi)的執(zhí)行結(jié)果就不會被改變,這個重排序就是被允許的)
在多線程環(huán)境下,假設(shè)2和3發(fā)生重排序,那么一個未初始化的對象引用將從同步塊中“溢出”,另一個線程可能會通過instance訪問到這個未初始化的對象!
解決方案:
1,利用volatile的內(nèi)存語義來禁止重排序
private volatile static Instance instance;
根據(jù)volatile寫的內(nèi)存語義:volatile寫之前的操作禁止被重排序到volatile寫之后。這樣上面2和3之間的重排序?qū)唤?,問題根源得到解決。
2,利用類初始化的原子性
在執(zhí)行類的初始化期間,JVM會去獲取一個鎖。這個鎖可以同步多個線程對同一個類的初始化。
public class InstanceFactory {
private static class InstanceHolder {
public static Instance instance = new Instance();
}
public static Instance getInstance() {
return InstanceHolder.instance ; // 這里將導(dǎo)致 InstanceHolder 類被初始化
}
}

Java并發(fā)編程基礎(chǔ)
設(shè)置線程優(yōu)先級時,針對頻繁阻塞(休眠或IO操作)的線程需要設(shè)置較高的優(yōu)先級,而偏重計算的線程則設(shè)置較低的優(yōu)先級,確保處理器不會被獨占。
線程狀態(tài)變遷

可參考 鏈接
疑惑:貌似可以從等待態(tài)直接回到就緒/運(yùn)行態(tài),WHY / HOW?
另,書上一句話:
阻塞狀態(tài)是線程阻塞在進(jìn)入 synchronized 同步代碼塊或方法(獲取鎖)時的狀態(tài),但是阻塞在 java.concurrent 包中 Lock 接口的線程狀態(tài)卻是等待狀態(tài),因為 java.concurrent 包中的 Lock 接口對于阻塞的實現(xiàn)均使用了 LockSupport 類中的相關(guān)方法。
中斷
中斷可以理解為線程的一個標(biāo)識位屬性,它表示一個線程是否被其他線程進(jìn)行了中斷操作。
調(diào)用一個線程對象的interrupt()方法,只是將該線程的中斷標(biāo)識位設(shè)為true,并不是真的“中斷“了該線程。這個地方很容易迷惑人。
一個被中斷的線程(被調(diào)用了interrupt()方法)如何響應(yīng)中斷完全取決于該線程本身。
線程有兩種方法來判斷自己是否被中斷:
- 實例方法isInterrupted(),返回true/false,不對中斷標(biāo)識位復(fù)位;
- 靜態(tài)方法Thread.interrupted(),返回true/false,同時對中斷標(biāo)識位進(jìn)行復(fù)位;
Object.wait(),Thread.sleep(),Thread.join()等方法均聲明拋出InterruptedException異常,說明這些方法是可中斷的——這些方法在執(zhí)行時會不斷輪詢監(jiān)聽中斷標(biāo)識位,當(dāng)發(fā)現(xiàn)其為true時,會恢復(fù)中斷標(biāo)識位(即設(shè)為false),并拋出InterruptedException異常。
進(jìn)入synchronized塊和Lock.lock()等操作是不可被中斷的(不拋出中斷異常)。
安全地終止線程
輪詢中斷標(biāo)識位,或另設(shè)一個標(biāo)志:
public class Runner implements Runnable {
private volatile boolean on = true;
private long i;
@Override
public void run() {
while (on && !Thread.currentThread().isInterrupted()) {
i++;
}
System.out.println("Count i = " + i);
}
public void cancel() {
on = false;
}
}
Runner one = new Runner();
Thread t1 = new Thread(one);
t1.start();
...
t1.interrupt();
Runner two = new Runner();
new Thread(two).start();
...
two.cancel();
等待/通知機(jī)制
等待/通知的經(jīng)典范式:
synchronized(obj) {
while(條件不滿足) {
obj.wait();
}
處理邏輯;
}
synchronized(obj) {
改變條件;
obj.notifyAll();
}
在while循環(huán)中判斷條件并調(diào)用wait()是使用wait()的唯一正確方式——這樣能保證線程在睡眠前后都會檢查條件。
wait()返回的前提是當(dāng)前線程獲得鎖;返回后從wait()處繼續(xù)執(zhí)行。
注意一點:wait()會使當(dāng)前對象釋放鎖,notify() 和 notifyAll() 不會!
synchronized(obj) {
if (條件不滿足) {
obj.wait();
}
處理邏輯;
}
用 if 為什么錯了呢?
wait()的線程被其他線程用notify()或notifyAll()喚醒后,是需要先獲得鎖的(畢竟你是在synchronized塊里);如果在被喚醒到獲得鎖的這段時間內(nèi),條件又被另一個線程改變了,而你獲得鎖并從wait()方法返回后,直接跳出了 if 的條件判斷——這時條件是不滿足的,于是產(chǎn)生了邏輯錯誤。所以,線程在睡眠前后都需要檢查條件。
狀態(tài)轉(zhuǎn)換圖

線程調(diào)用wait()方法釋放鎖,進(jìn)入等待隊列,等待狀態(tài)(WAITING);被notify()/notifyAll()喚醒后,進(jìn)入同步隊列,變?yōu)樽枞麪顟B(tài)(BLOCKING);隨后可再次獲得鎖并從wait()返回繼續(xù)執(zhí)行。
管道輸入/輸出流
4種實現(xiàn):PipedOutputStream, PipedInputStream, PipedReader, PipedWriter
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
out.connect(in); // 將輸入流和輸出流進(jìn)行連接,否則在使用時會拋出IOException;
ThreadLocal
在main線程中定義一個ThreadLocal對象,在各個線程中訪問時,訪問到的是各個線程獨立的版本——并且是獨立初始化的ThreadLocal對象。
默認(rèn)情況下 initValue() 返回 null 。線程在沒有調(diào)用 set 之前,第一次調(diào)用 get 的時候, get 方法會默認(rèn)去調(diào)用 initValue 這個方法。所以如果沒有覆寫這個方法,可能導(dǎo)致 get 返回的是 null 。當(dāng)然如果調(diào)用過 set 就不會有這種情況了。但是往往在多線程情況下我們不能保證每個線程的在調(diào)用 get 之前都調(diào)用了 set ,所以最好對 initValue 進(jìn)行覆寫,以免導(dǎo)致空指針異常。
public class ConcurrentProgramming {
public static ThreadLocal<Integer> threadLocalInt = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};
// public static ThreadLocal<Integer> threadLocalInt = new ThreadLocal<>();
public static void main(String[] args) throws InterruptedException {
// threadLocalInt.set(0);
// System.out.println(threadLocalInt.get()); // 這里可以正常輸出,因為在當(dāng)前main線程中是先set,再get;
for (int i = 0; i < 2; i++) {
new Thread(new Worker()).start();
}
}
}
class Worker implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
// 但在這里就報空指針錯了———— 所以,并不是共享的同一個ThreadLocal對象,而是每個線程new一個,對嗎?
ConcurrentProgramming.threadLocalInt.set(ConcurrentProgramming.threadLocalInt.get() + 1);
System.out.println(Thread.currentThread().getName() + ": " + ConcurrentProgramming.threadLocalInt.get());
}
}
}
output:
Thread-0: 1
Thread-1: 1
Thread-1: 2
Thread-1: 3
Thread-1: 4
Thread-0: 2
Thread-0: 3
Thread-0: 4
Thread-0: 5
Thread-1: 5
注意代碼中的注釋部分。沒有重寫initialValue()時,在main中set(0)然后get,沒有問題;但在另外兩個線程中的get卻報空指針異?!f明在main中set的值只在main線程中可見。
This class provides thread-local variables. These variables differ from their normal counterparts in that each thread that accesses one (via its get or set method) has its own, independently initialized copy of the variable.
—— 每個線程有自己的、獨立初始化的變量拷貝。
所以,每個線程會獨自new一個Threadlocal對象,只是共用了同一個變量名,或你寫的ThreadLocal匿名內(nèi)部類。
等待超時模式
開發(fā)人員經(jīng)常會遇到這樣的方法調(diào)用場景:調(diào)用一個方法時等待一段時間,如果該方法在給定的時間段內(nèi)能夠得到結(jié)果,那么將立刻返回;反之,超時返回默認(rèn)結(jié)果。
實現(xiàn)方式:在經(jīng)典的等待/通知模型的加鎖、條件循環(huán)、邏輯處理的基礎(chǔ)上作出非常小的改動:
public synchronized Object get(long mills) throws InterruptedException {
long future = System.currentTimeMillis() + mills;
long remaining = mills;
while ((result == null) && remaining > 0) {
wait(remaining);
remaining = future - System.currentTimeMillis();
}
return result;
}
(數(shù)據(jù)庫連接池示例、線程池示例 未)
Java中的鎖
Lock接口
- void lock() 獲取鎖,調(diào)用該方法當(dāng)前線程將會獲取鎖,當(dāng)鎖獲取后,該方法將返回。
- void lockInterruptibly() throws InterruptedException 可中斷獲取鎖,與lock()方法不同之處在于該方法會響應(yīng)中斷,即在鎖的獲取過程中可以中斷當(dāng)前線程
- boolean tryLock() 嘗試非阻塞的獲取鎖,調(diào)用該方法立即返回,true表示獲取到鎖
- boolean tryLock(long time,TimeUnit unit) throws InterruptedException 超時獲取鎖,以下情況會返回:時間內(nèi)獲取到了鎖,時間內(nèi)被中斷,時間到了沒有獲取到鎖。
- void unlock() 釋放鎖
- Condition newCondition() 獲取等待通知組件
隊列同步器
隊列同步器AbstractQueuedSynchronizer(AQS)是用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,它使用了一個int成員變量表示同步狀態(tài),通過內(nèi)置的FIFO隊列來完成資源獲取線程的排隊工作。下圖顯示了java.concurrent包的實現(xiàn)示意圖:

隊列同步器的實現(xiàn)依賴內(nèi)部的同步隊列來完成同步狀態(tài)的管理。它是一個FIFO的雙向隊列,當(dāng)線程獲取同步狀態(tài)失敗時,同步器會將當(dāng)前線程和等待狀態(tài)等信息包裝成一個節(jié)點并將其加入同步隊列,同時會阻塞當(dāng)前線程。當(dāng)同步狀態(tài)釋放時,會把首節(jié)點中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。

共享式同步狀態(tài)獲取與釋放
共享式獲取與獨占式獲取最主要的區(qū)別在于同一時刻能否有多個線程同時獲取到同步狀態(tài)。以文件的讀寫為例,如果一個程序在對文件進(jìn)行讀操作,那么這一時刻對于該文件的寫操作均被阻塞,而讀操作能夠同時進(jìn)行。寫操作要求對資源的獨占式訪問,而讀操作可以是共享式訪問。

左半部分,共享式訪問資源時,其他共享式的訪問均被允許,而獨占式訪問被阻塞;右半部分是獨占式訪問資源時,同一時刻其他訪問均被阻塞。
重入鎖 ReentrantLock
重入鎖 ReentrantLock,顧名思義,就是支持重進(jìn)入的鎖,它表示該鎖能夠支持一個線程對資源的重復(fù)加鎖。除此之外,該鎖的還支持獲取鎖時的公平和非公平性選擇。
對于獨占鎖(Mutex),考慮如下場景:當(dāng)一個線程調(diào)用Mutex的lock()方法獲取鎖之后,如果再次調(diào)用lock()方法,則該線程將會被自己所阻塞,原因是Mutex在實現(xiàn)tryAcquire(int acquires)方法時沒有考慮占有鎖的線程再次獲取鎖的場景,而在調(diào)用tryAcquire(int acquires)方法時返回了false,導(dǎo)致該線程被阻塞。簡單地說,Mutex是一個不支持重進(jìn)入的鎖。
synchronized關(guān)鍵字隱式的支持重進(jìn)入,比如一個synchronized修飾的遞歸方法,在方法執(zhí)行時,執(zhí)行線程在獲取了鎖之后仍能連續(xù)多次地獲得該鎖,而不像Mutex由于獲取了鎖,而在下一次獲取鎖時出現(xiàn)阻塞自己的情況。
ReentrantLock雖然沒能像synchronized關(guān)鍵字一樣支持隱式的重進(jìn)入,但是在調(diào)用lock()方法時,已經(jīng)獲取到鎖的線程,能夠再次調(diào)用lock()方法獲取鎖而不被阻塞。
鎖獲取的公平性問題
公平性與否是針對獲取鎖而言的,如果一個鎖是公平的,那么鎖的獲取順序就應(yīng)該和鎖的請求順序一致,也就是FIFO。
非公平性鎖可能使線程“饑餓”,當(dāng)一個線程請求鎖時,只要獲取了同步狀態(tài)即成功獲取鎖。在這個前提下,剛釋放鎖的線程再次獲取同步狀態(tài)的幾率會非常大,使得其他線程只能在同步隊列中等待。
非公平鎖可能使線程“饑餓”,為什么它又被設(shè)定成默認(rèn)的實現(xiàn)呢?非公平性鎖模式下線程上下文切換的次數(shù)少,因此其性能開銷更小。公平性鎖保證了鎖的獲取按照FIFO原則,而代價是進(jìn)行大量的線程切換。非公平性鎖雖然可能造成線程“饑餓”,但極少的線程切換,保證了其更大的吞吐量。
讀寫鎖
在Java并發(fā)包中常用的鎖(如ReentrantLock),基本上都是排他鎖,這些鎖在同一時刻只允許一個線程進(jìn)行訪問,而讀寫鎖在同一時刻可以允許多個讀線程訪問,但是在寫線程訪問時,所有的讀線程和其他寫線程均被阻塞。讀寫鎖維護(hù)了一對鎖,一個讀鎖和一個寫鎖,通過分離讀鎖和寫鎖,使得并發(fā)性相比一般的排他鎖有了很大提升。
除了保證寫操作對讀操作的可見性以及并發(fā)性的提升之外,讀寫鎖能夠簡化讀寫交互場景的編程方式。假設(shè)在程序中定義一個共享的數(shù)據(jù)結(jié)構(gòu)用作緩存,它大部分時間提供讀服務(wù)(例如:查詢和搜索),而寫操作占有的時間很少,但是寫操作完成之后的更新需要對后續(xù)的讀服務(wù)可見。
在沒有讀寫鎖支持的(Java 5 之前)時候,如果需要完成上述工作就要使用Java的等待通知機(jī)制,就是當(dāng)寫操作開始時,所有晚于寫操作的讀操作均會進(jìn)入等待狀態(tài),只有寫操作完成并進(jìn)行通知之后,所有等待的讀操作才能繼續(xù)執(zhí)行(寫操作之間依靠synchronized關(guān)鍵字進(jìn)行同步),這樣做的目的是使讀操作都能讀取到正確的數(shù)據(jù),而不會出現(xiàn)臟讀。
改用讀寫鎖實現(xiàn)上述功能,只需要在讀操作時獲取讀鎖,而寫操作時獲取寫鎖即可,當(dāng)寫鎖被獲取到時,后續(xù)(非當(dāng)前寫操作線程)的讀寫操作都會被阻塞,寫鎖釋放之后,所有操作繼續(xù)執(zhí)行,編程方式相對于使用等待通知機(jī)制的實現(xiàn)方式而言,變得簡單明了。
一般情況下,讀寫鎖的性能都會比排它鎖要好,因為大多數(shù)場景讀是多于寫的。在讀多于寫的情況下,讀寫鎖能夠提供比排它鎖更好的并發(fā)性和吞吐量。Java并發(fā)包提供讀寫鎖的實現(xiàn)是ReentrantReadWriteLock。
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
Lock r = rwl.readLock();
Lock w = rwl.writeLock();
Condition接口
任何一個Java對象,都擁有一組監(jiān)視器方法,主要包括wait()、notify()、notifyAll()方法,這些方法與synchronized關(guān)鍵字配合使用可以實現(xiàn)等待/通知模式。Condition接口也提供類似的Object的監(jiān)視器的方法,主要包括await()、signal()、signalAll()方法,這些方法與Lock鎖配合使用也可以實現(xiàn)等待/通知模式。
相比Object實現(xiàn)的監(jiān)視器方法,Condition接口的監(jiān)視器方法具有一些Object所沒有的特性:
- Condition接口可以支持多個等待隊列:一個Lock實例可以綁定多個Condition。
- Condition接口支持在等待時不響應(yīng)中斷:wait()是會響應(yīng)中斷的;
- Condition接口支持等待到將來的某個時間點返回(和awaitNanos(long)/wait(long)不同!):awaitUntil(Date deadline);
class BoundedBuffer {
final Lock lock = new ReentrantLock();// 鎖對象
final Condition notFull = lock.newCondition(); //寫線程條件
final Condition notEmpty = lock.newCondition();//讀線程條件
final Object[] items = new Object[100];// 初始化一個長度為100的隊列
int putptr/* 寫索引 */, takeptr/* 讀索引 */, count/* 隊列中存在的數(shù)據(jù)個數(shù) */;
public void put(Object x) throws InterruptedException {
lock.lock(); //獲取鎖
try {
while (count == items.length)
notFull.await();// 當(dāng)計數(shù)器count等于隊列的長度時,不能再插入,因此等待。阻塞寫線程。
items[putptr] = x;//賦值
putptr++;
if (putptr == items.length)
putptr = 0;// 若寫索引寫到隊列的最后一個位置了,將putptr置為0。
count++; // 每放入一個對象就將計數(shù)器加1。
notEmpty.signal(); // 一旦插入就喚醒取數(shù)據(jù)線程。
} finally {
lock.unlock(); // 最后釋放鎖
}
}
public Object take() throws InterruptedException {
lock.lock(); // 獲取鎖
try {
while (count == 0)
notEmpty.await(); // 如果計數(shù)器等于0則等待,即阻塞讀線程。
Object x = items[takeptr]; // 取值
takeptr++;
if (takeptr == items.length)
takeptr = 0; //若讀鎖應(yīng)讀到了隊列的最后一個位置了,則讀鎖應(yīng)置為0;即當(dāng)takeptr達(dá)到隊列長度時,從零開始取
count++; // 每取一個將計數(shù)器減1。
notFull.signal(); //枚取走一個就喚醒存線程。
return x;
} finally {
lock.unlock();// 釋放鎖
}
}
}
上面用了兩個Condition。(是不是很熟悉?王道,信號量,線程間同步)
等待隊列與同步隊列

在Object的監(jiān)視器模型上,一個對象擁有一個同步隊列和一個等待隊列,而并發(fā)包中的Lock(更確切的說是同步器)可以擁有一個同步隊列和多個等待多列。
Java并發(fā)容器和框架
ConcurrentHashMap
在并發(fā)環(huán)境下,HashMap的put操作會引起死循環(huán)。因為多線程會導(dǎo)致HashMap的Entry鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu),使得Entry的next節(jié)點永遠(yuǎn)不為空。
HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的情況下HashTable的效率非常低下。因為當(dāng)一個線程訪問HashTable的同步方法時,其他線程訪問HashTable的同步方法時,可能會進(jìn)入阻塞或輪詢狀態(tài)。如線程1使用put進(jìn)行添加元素,線程2不但不能使用put方法添加元素,并且也不能使用get方法來獲取元素,所以競爭越激烈效率越低。
ConcurrentHashMap的鎖分段技術(shù)
HashTable容器在競爭激烈的并發(fā)環(huán)境下表現(xiàn)出效率低下的原因,是因為所有訪問HashTable的線程都必須競爭同一把鎖,那假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分?jǐn)?shù)據(jù),那么當(dāng)多線程訪問容器里不同數(shù)據(jù)段的數(shù)據(jù)時,線程間就不會存在鎖競爭,從而可以有效的提高并發(fā)訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術(shù),首先將數(shù)據(jù)分成一段一段的存儲,然后給每一段數(shù)據(jù)配一把鎖,當(dāng)一個線程占用鎖訪問其中一個段數(shù)據(jù)的時候,其他段的數(shù)據(jù)也能被其他線程訪問。

ConcurrentHashMap是由Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成。Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap里扮演鎖的角色,HashEntry則用于存儲鍵值對數(shù)據(jù)。
ConcurrentHashMap的get操作
Segment的get操作實現(xiàn)非常簡單和高效。先經(jīng)過一次再哈希,然后使用這個哈希值通過哈希運(yùn)算定位到segment,再通過哈希算法定位到元素,代碼如下:(兩次哈希)
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}
ConcurrentHashMap的Put操作
由于put方法里需要對共享變量進(jìn)行寫入操作,所以為了線程安全,在操作共享變量時必須得加鎖。Put方法首先定位到Segment,然后在Segment里進(jìn)行插入操作。插入操作需要經(jīng)歷兩個步驟,第一步判斷是否需要對Segment里的HashEntry數(shù)組進(jìn)行擴(kuò)容,第二步定位添加元素的位置然后放在HashEntry數(shù)組里。(擴(kuò)容的時候首先會創(chuàng)建一個兩倍于原容量的數(shù)組,然后將原數(shù)組里的元素進(jìn)行再hash后插入到新的數(shù)組里。為了高效ConcurrentHashMap不會對整個容器進(jìn)行擴(kuò)容,而只對某個segment進(jìn)行擴(kuò)容)
ConcurrentHashMap的size操作
如果我們要統(tǒng)計整個ConcurrentHashMap里元素的大小,就必須統(tǒng)計所有Segment里元素的大小后求和。Segment里的全局變量count是一個volatile變量,那么在多線程場景下,我們是不是直接把所有Segment的count相加就可以得到整個ConcurrentHashMap大小了呢?不是的,雖然相加時可以獲取每個Segment的count的最新值,但是拿到之后可能累加前使用的count發(fā)生了變化,那么統(tǒng)計結(jié)果就不準(zhǔn)了。所以最安全的做法,是在統(tǒng)計size的時候把所有Segment的put,remove和clean方法全部鎖住,但是這種做法顯然非常低效。
因為在累加count操作過程中,之前累加過的count發(fā)生變化的幾率非常小,所以ConcurrentHashMap的做法是先嘗試2次通過不鎖住Segment的方式來統(tǒng)計各個Segment大小,如果統(tǒng)計的過程中,容器的count發(fā)生了變化,則再采用加鎖的方式來統(tǒng)計所有Segment的大小。
并發(fā)隊列:ConcurrentLinkedQueue
用非阻塞的循環(huán)CAS方式實現(xiàn)。
Java中的阻塞隊列
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强?。?dāng)隊列滿時,存儲元素的線程會等待隊列可用。
插入和移除操作的四種處理方式

- 拋出異常:是指當(dāng)阻塞隊列滿時候,再往隊列里插入元素,會拋出IllegalStateException(“Queue full”)異常。當(dāng)隊列為空時,從隊列里獲取元素時會拋出NoSuchElementException異常 。
- 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回null
- 一直阻塞:當(dāng)阻塞隊列滿時,如果生產(chǎn)者線程往隊列里put元素,隊列會一直阻塞生產(chǎn)者線程,直到拿到數(shù)據(jù),或者響應(yīng)中斷退出。當(dāng)隊列空時,消費(fèi)者線程試圖從隊列里take元素,隊列也會阻塞消費(fèi)者線程,直到隊列可用。
- 超時退出:當(dāng)阻塞隊列滿時,隊列會阻塞生產(chǎn)者線程一段時間,如果超過一定的時間,生產(chǎn)者線程就會退出。
Java里的阻塞隊列
- ArrayBlockingQueue :一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列。
- LinkedBlockingQueue :一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列。
- PriorityBlockingQueue :一個支持優(yōu)先級排序的無界阻塞隊列。
- DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列;支持延時獲取元素——在創(chuàng)建元素時可以指定多久才能從隊列中取出當(dāng)前元素;
- SynchronousQueue:一個不存儲元素的阻塞隊列——每一個put操作必須等待一個take操作;
- LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列。
- LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。
// 大小1000的、線程公平的阻塞隊列;
// 傳入了大小參數(shù),這就叫有界;
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000, true);
阻塞隊列的實現(xiàn)原理,見前面BoundedBuffer的代碼。(一個隊列,一個鎖,兩個Condition:notFull,notEmpty,等待通知模型)
Fork/Join框架

與MapReduce一致的思想。
ForkJoinTask(抽象類):我們要使用ForkJoin框架,必須首先創(chuàng)建一個ForkJoin任務(wù)。它提供在任務(wù)中執(zhí)行fork()和join()操作的機(jī)制。Fork/Join框架提供了以下兩個子類:
- RecursiveAction:用于沒有返回結(jié)果的任務(wù)。
- RecursiveTask :用于有返回結(jié)果的任務(wù)。
package com.xiao;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2; // 閾值
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任務(wù)足夠小就計算任務(wù)
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
//如果任務(wù)大于閥值,就分裂成兩個子任務(wù)計算
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//執(zhí)行子任務(wù)
leftTask.fork();
rightTask.fork();
//等待子任務(wù)執(zhí)行完,并得到其結(jié)果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
//合并子任務(wù)
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
//生成一個計算任務(wù),負(fù)責(zé)計算1+2+3+4
CountTask task = new CountTask(1, 4);
//執(zhí)行一個任務(wù)
Future result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException | ExecutionException e) {
}
}
}
Fork/Join框架的實現(xiàn)原理
ForkJoinPool由ForkJoinTask數(shù)組和ForkJoinWorkerThread數(shù)組組成,F(xiàn)orkJoinTask數(shù)組負(fù)責(zé)存放程序提交給ForkJoinPool的任務(wù),而ForkJoinWorkerThread數(shù)組負(fù)責(zé)執(zhí)行這些任務(wù)。(類似于線程池的實現(xiàn))
Java中的13個原子操作類
原子更新方式
- 原子更新基本類型
- 原子更新數(shù)組
- 原子更新引用
- 原子更新屬性(字段)
1,原子更新基本類型
- AtomicBoolean :原子更新布爾類型
- AtomicInteger: 原子更新整型
- AtomicLong: 原子更新長整型
2,原子更新數(shù)組
- AtomicIntegerArray :原子更新整型數(shù)組里的元素
- AtomicLongArray :原子更新長整型數(shù)組里的元素
- AtomicReferenceArray : 原子更新引用類型數(shù)組的元素
3,原子更新引用類型
- AtomicReference :原子更新引用類型
- AtomicReferenceFieldUpdater :原子更新引用類型里的字段
- AtomicMarkableReference:原子更新帶有標(biāo)記位的引用類型。可以原子更新一個布爾類型的標(biāo)記位和應(yīng)用類型
4,原子更新字段類
- AtomicIntegerFieldUpdater:原子更新整型的字段的更新器
- AtomicLongFieldUpdater:原子更新長整型字段的更新器
- AtomicStampedReference:原子更新帶有版本號的引用類型。該類將整型數(shù)值與引用關(guān)聯(lián)起來,可用于原子的更新數(shù)據(jù)和數(shù)據(jù)的版本號,可以解決使用CAS進(jìn)行原子更新時可能出現(xiàn)的ABA問題。
(恩,是個坑,需要踩)
Java中的并發(fā)工具類
CountDownLatch
(Latch:門閂)
用于等待其他線程完成操作。一個功能更強(qiáng)大的 join().
CountDownLatch c = new CountDownLatch(2); // 等待兩個[點]完成;
...
c.countDown(); // 第一個等待的操作完成;
...
c.countDown(); // 第二個等待的操作完成;
...
c.await(); // 等待兩個操作完成;
...
CountDownLatch(N)等待N個點完成;這里說的N個點,可以是N個線程,也可以是一個線程里的N個執(zhí)行步驟。
同步屏障:CyclicBarrier
讓一組線程到達(dá)一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達(dá)屏障時,屏障才會打開,所有被屏障攔截的線程才會繼續(xù)運(yùn)行。
CyclicBarrier c = new CyclicBarrier(2); // 屏障會攔截/等待兩個線程;
// 在第一個線程中;
c.await(); // 當(dāng)前線程(執(zhí)行了某些操作后)到達(dá)屏障;
// 在第二個線程中;
c.await(); // 當(dāng)前線程(執(zhí)行了某些操作后)到達(dá)屏障;
CyclicBarrier和CountDownLatch的區(qū)別
CountDownLatch的計數(shù)器只能用一次,而CyclicBarrier的計數(shù)器可以使用reset()方法重置。所以CyclicBarrier可以處理更復(fù)雜的業(yè)務(wù)場景。例如,如果計算發(fā)生錯誤,可以重置計數(shù)器,并讓線程重新執(zhí)行一次。
控制并發(fā)線程數(shù)的Semaphore
信號量,用來控制同時訪問特定資源的線程數(shù)量。
Semaphore s = new Semaphore(10);
Executor threadPool = Executors.newFixedThreadPool(30);
for (int i = 0; i < 30; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("Save Date");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
在代碼中,雖然有30個線程在執(zhí)行,但只允許10個并發(fā)執(zhí)行。
線程間交換數(shù)據(jù)的Exchanger
Exchanger用于進(jìn)行線程間的數(shù)據(jù)交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)。如果第一個線程先執(zhí)行exchange()方法,它會一直等待第二個線程也執(zhí)行exchange()方法,然后兩個線程交換數(shù)據(jù)。
Exchanger<String> exchanger = new Exchanger<>();
// 在線程A中;
try {
String B = exchanger.exchange("A's data");
} catch (InterruptedException e) {
e.printStackTrace();
}
// 在線程B中;
try {
String A = exchanger.exchange("B's data");
} catch (InterruptedException e) {
e.printStackTrace();
}
Java中的線程池
corePool
首先理解一個[corePool 核心池]的概念:核心池是一個線程池的基本/平均能力保障。在線程池的使用初期,隨著任務(wù)的提交,線程池會先盡快填滿核心池——提交一個任務(wù)就創(chuàng)建一個線程,即使核心池中有空閑的線程。如果線程池有溫度的話,核心池就是線程池的“常溫”。


線程池的創(chuàng)建
我們可以通過ThreadPoolExecutor來創(chuàng)建一個線程池。
new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, milliseconds,runnableTaskQueue, threadFactory,handler);
- corePoolSize(線程池的基本大?。寒?dāng)提交一個任務(wù)到線程池時,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù),即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads方法,線程池會提前創(chuàng)建并啟動所有基本線程。
- runnableTaskQueue(任務(wù)隊列):用于保存等待執(zhí)行的任務(wù)的阻塞隊列??梢赃x擇以下幾個阻塞隊列。
- ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,此隊列按 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。
- LinkedBlockingQueue:一個基于鏈表結(jié)構(gòu)的阻塞隊列,此隊列按FIFO (先進(jìn)先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作(offer())必須等到另一個線程調(diào)用移除操作(poll()),否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊列。
- PriorityBlockingQueue:一個具有優(yōu)先級得無限阻塞隊列。
maximumPoolSize(線程池最大大小):線程池允許創(chuàng)建的最大線程數(shù)。如果隊列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是如果使用了無界的任務(wù)隊列這個參數(shù)就沒什么效果。
- ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè)置更有意義的名字,Debug和定位問題時非常又幫助。
- RejectedExecutionHandler(飽和策略):當(dāng)隊列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。這個策略默認(rèn)情況下是AbortPolicy,表示無法處理新任務(wù)時拋出異常。以下是JDK1.5提供的四種策略。
- AbortPolicy:直接拋出異常。
- CallerRunsPolicy:只用調(diào)用者所在線程來運(yùn)行任務(wù)。
- DiscardOldestPolicy:丟棄隊列里最近的一個任務(wù),并執(zhí)行當(dāng)前任務(wù)。
- DiscardPolicy:不處理,丟棄掉。
當(dāng)然也可以根據(jù)應(yīng)用場景需要來實現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務(wù)。
- keepAliveTime(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間。所以如果任務(wù)很多,并且每個任務(wù)執(zhí)行的時間比較短,可以調(diào)大這個時間,提高線程的利用率。
- TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
提交任務(wù)
void execute(Runnable command) // 沒有返回值;
<T> Future<T> submit(Callable<T> task) // 有返回值的任務(wù);
關(guān)閉線程池
我們可以通過調(diào)用線程池的shutdown或shutdownNow方法來關(guān)閉線程池,但是它們的實現(xiàn)原理不同,shutdown的原理是只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。shutdownNow的原理是遍歷線程池中的工作線程,然后逐個調(diào)用線程的interrupt方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無法終止。shutdownNow會首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表。
只要調(diào)用了這兩個關(guān)閉方法的其中一個,isShutdown方法就會返回true。當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時調(diào)用isTerminaed方法會返回true。至于我們應(yīng)該調(diào)用哪一種方法來關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用shutdown來關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow。
合理的配置線程池
要想合理的配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個角度來進(jìn)行分析:
- 任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)。
- 任務(wù)的優(yōu)先級:高,中和低。
- 任務(wù)的執(zhí)行時間:長,中和短。
- 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。
任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。CPU密集型任務(wù)配置盡可能少的線程數(shù)量,如配置Ncpu+1個線程的線程池。IO密集型任務(wù)則由于需要等待IO操作,線程并不是一直在執(zhí)行任務(wù),則配置盡可能多的線程,如2*Ncpu?;旌闲偷娜蝿?wù),如果可以拆分,則將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率,如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進(jìn)行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個數(shù)。
優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高的任務(wù)先得到執(zhí)行,需要注意的是如果一直有優(yōu)先級高的任務(wù)提交到隊列里,那么優(yōu)先級低的任務(wù)可能永遠(yuǎn)不能執(zhí)行。
執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者也可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務(wù)先執(zhí)行。
依賴數(shù)據(jù)庫連接池的任務(wù),因為線程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,如果等待的時間越長CPU空閑時間就越長,那么線程數(shù)應(yīng)該設(shè)置越大,這樣才能更好的利用CPU。
建議使用有界隊列,有界隊列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點,比如幾千。有一次我們組使用的后臺任務(wù)線程池的隊列和線程池全滿了,不斷的拋出拋棄任務(wù)的異常,通過排查發(fā)現(xiàn)是數(shù)據(jù)庫出現(xiàn)了問題,導(dǎo)致執(zhí)行SQL變得非常緩慢,因為后臺任務(wù)線程池里的任務(wù)全是需要向數(shù)據(jù)庫查詢和插入數(shù)據(jù)的,所以導(dǎo)致線程池里的工作線程全部阻塞住,任務(wù)積壓在線程池里。如果當(dāng)時我們設(shè)置成無界隊列,線程池的隊列就會越來越多,有可能會撐滿內(nèi)存,導(dǎo)致整個系統(tǒng)不可用,而不只是后臺任務(wù)出現(xiàn)問題。當(dāng)然我們的系統(tǒng)所有的任務(wù)是用的單獨的服務(wù)器部署的,而我們使用不同規(guī)模的線程池跑不同類型的任務(wù),但是出現(xiàn)這樣問題時也會影響到其他任務(wù)。
線程池的監(jiān)控
通過線程池提供的參數(shù)進(jìn)行監(jiān)控。線程池里有一些屬性在監(jiān)控線程池的時候可以使用
- taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量。
- completedTaskCount:線程池在運(yùn)行過程中已完成的任務(wù)數(shù)量。小于或等于taskCount。
- largestPoolSize:線程池曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個數(shù)據(jù)可以知道線程池是否滿過。如等于線程池的最大大小,則表示線程池曾經(jīng)滿了。
- getPoolSize:線程池的線程數(shù)量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不減。
- getActiveCount:獲取活動的線程數(shù)。
通過擴(kuò)展線程池進(jìn)行監(jiān)控。通過繼承線程池并重寫線程池的beforeExecute,afterExecute和terminated方法,我們可以在任務(wù)執(zhí)行前,執(zhí)行后和線程池關(guān)閉前干一些事情。如監(jiān)控任務(wù)的平均執(zhí)行時間,最大執(zhí)行時間和最小執(zhí)行時間等。這幾個方法在線程池里是空方法。
Executor框架
Executor框架的結(jié)構(gòu)和成員

Executor框架主要由3大部分組成如下:
- 任務(wù):Runnable接口和Callable接口;
- 任務(wù)的執(zhí)行:Executor接口,繼承Executor的ExecutorService接口,以及ExecutorService接口的兩個實現(xiàn)類ThreadPoolExecutor和ScheduledThreadPoolExecutor;以及一個工具類:Executors;
- 異步計算的結(jié)果:Future接口和Future接口的實現(xiàn)類FutureTask;
ThreadPoolExecutor
ThreadPoolExecutor通常由工廠類Executors來創(chuàng)建。Executors可以創(chuàng)建3種類型的ThreadPoolExecutor:SingleThreadExecutor,F(xiàn)ixedThreadPool,CachedThreadPool;
FixedThreadPool是使用固定線程數(shù)的線程池,Executors提供的API有如下兩個:
public static ExecutorService newFixedThreadPool(int nThreads);public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory theadFactory);
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// corePoolSize和maximumPoolSize都設(shè)為nThreads;
// 空閑線程的存活時間為0,意味著多余的空閑線程會立即死亡;
// 使用無界的LinkedBlockingQueue,不會拒絕任務(wù);
FixedThreadPool滿足了資源管理的需求,可以限制當(dāng)前線程數(shù)量。適用于負(fù)載較重的服務(wù)器環(huán)境。
SingleThreadExecutor使用單線程執(zhí)行任務(wù),Executors提供的API有如下兩個:
public static ExecutorService newSingleThreadExecutor();public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory);
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// corePoolSize和maximumPoolSize均為1;
// 多余的空閑線程立即死亡;
// 不拒絕任務(wù);
SingleThreadExecutor保證了任務(wù)執(zhí)行的順序,不會存在多線程活動。
CachedThreadPool是無界線程池,Executors提供的API有如下兩個:
public static ExecutorService newCachedThreadPool();public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 線程池大小不限;
// 多余的空閑線程存活60s;
// 使用不存儲元素的SynchronousQueue作為線程池的任務(wù)隊列,一個offer操作必須等待另一個線程的poll操作;如果主線程提交任務(wù)的速度高于線程池中處理任務(wù)的速度,CachedThreadPool會不斷創(chuàng)建新線程;極端情況下,可能會因為創(chuàng)建過多的線程而耗盡CPU和內(nèi)存資源;
CachedThreadPool適用于執(zhí)行很多短期異步任務(wù)的小程序,適用于負(fù)載較輕的服務(wù)器。
ScheduledThreadPoolExecutor
它是ThreadPoolExecutor的子類且實現(xiàn)了ScheduledExecutorService接口,它可以在給定的延遲時間后執(zhí)行命令,或者定期執(zhí)行命令,它比Timer更強(qiáng)大更靈活。
Executors可以創(chuàng)建的ScheduledThreadPoolExecutor的類型有ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor等
ScheduledThreadPoolExecutor具有固定線程個數(shù),適用于需要多個后臺線程執(zhí)行周期任務(wù),并且為了滿足資源管理需求而限制后臺線程數(shù)量的場景,Executors中提供的API有如下兩個:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory);
SingleThreadScheduledExecutor具有單個線程,Executors提供的創(chuàng)建API有如下兩個:
public static ScheduledExecutorService newSingleThreadScheduledExecutor();public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory);
它適用于單個后臺線程執(zhí)行周期任務(wù),并且保證順序一致執(zhí)行的場景。
ScheduledThreadPoolExecutor
在給定延遲之后執(zhí)行任務(wù),或者定期執(zhí)行任務(wù)。ScheduledThreadPoolExecutor的功能與Timer類似,但更強(qiáng)大、更靈活。Timer對應(yīng)的是單個后臺線程,而ScheduledThreadPoolExecutor可以在構(gòu)造函數(shù)中指定多個對應(yīng)的后臺線程數(shù)。

ScheduledThreadPoolExecutor中線程執(zhí)行某個周期任務(wù)的4個步驟:
步驟1:線程1從工作隊列DelayQueue中獲取已到期的task;
步驟2:線程1執(zhí)行該task;
步驟3:線程1修改ScheduledFutureTask的time變量為下次被執(zhí)行的時間;
步驟4:線程1將修改后的task重新放回DelayQueue中。
FutureTask類
Runnable接口:
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Callable接口(可以有返回值,可以拋出異常):
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Future接口:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
FutureTask類的構(gòu)造方法:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
ExecutorService的3個submit()方法都返回Future:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result); // 執(zhí)行成功返回指定的值result;
Future<?> submit(Runnable task); // 線程執(zhí)行成功返回null;
Callable和Future的普通用法:
Callable<Integer> callable = new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt(100);
}
};
FutureTask<Integer> future = new FutureTask<Integer>(callable);
new Thread(future).start();
int result = future.get();
Executors
