第三章 Java內(nèi)存模型
3.1 Java內(nèi)存模型的基礎(chǔ)
- 通信
在共享內(nèi)存的模型里,通過寫-讀內(nèi)存中的公共狀態(tài)進(jìn)行隱式通信;在消息傳遞的并發(fā)模型里,線程之間必須通過發(fā)送消息來進(jìn)行顯示的通信。 - 同步
在共享內(nèi)存并發(fā)模型里,同步是顯示進(jìn)行的,程序員必須顯示指定某個(gè)方法或者某段代碼需要在線程之間互斥執(zhí)行;在消息傳遞的并發(fā)模型里,由于消息的發(fā)送必須在接收之前,因此同步是隱式進(jìn)行的。
在Java中,所有實(shí)例域、靜態(tài)域和數(shù)組元素都存儲在堆內(nèi)存中,堆內(nèi)存在線程之間共享;局部變量、方法定義參數(shù)和異常處理器參數(shù)不會在線程之間共享。
從抽象角度來看,JMM定義了線程和主內(nèi)存之間的抽象關(guān)系:線程之間的共享變量存儲在主內(nèi)存中,每個(gè)線程都有一個(gè)私有的本地內(nèi)存,本地內(nèi)存涵蓋了緩存、寫緩沖區(qū)、寄存器以及其它的硬件和編譯器優(yōu)化。
JMM通過控制主內(nèi)存與每個(gè)線程的本地內(nèi)存之間的交互,來為Java程序員提供內(nèi)存可見性保證。
重排序
指編譯器和處理器為了優(yōu)化程序性能而對指令序列進(jìn)行重新排序的一種手段:
- 編譯器優(yōu)化的重排序:編譯器在不改變單線程程序語義的前提下,重新安排語句的執(zhí)行順序。
- 處理器的指令級并行的重排序:如果不存在數(shù)據(jù)依賴性,處理器可以改變語句對應(yīng)機(jī)器指令的執(zhí)行順序。
- 內(nèi)存系統(tǒng)的重排序:由于處理器使用緩存和讀/寫緩沖區(qū),這使得加載和存儲操作看上去可能是在亂序執(zhí)行。
JMM的編譯器重新排序規(guī)則會禁止特定類型的編譯器重排序,對于處理器重排序,JMM的處理器重排序規(guī)則會要求Java編譯器在生成指令時(shí),插入特定類型的內(nèi)存屏障。
現(xiàn)代的處理器使用寫緩沖區(qū)臨時(shí)保存向內(nèi)存寫入的數(shù)據(jù),但每個(gè)處理器上的寫緩沖區(qū),僅僅對它所在的處理器可見。
由于寫緩沖區(qū)僅對自己的處理器可見,它會導(dǎo)致處理器執(zhí)行內(nèi)存操作的順序可能會與內(nèi)存實(shí)際的操作執(zhí)行順序不一致,由于現(xiàn)代的處理器都會使用寫緩沖區(qū),因此現(xiàn)代的處理器都會允許對寫-讀操作進(jìn)行重排序,但不允許對存在數(shù)據(jù)依賴的操作做重排序。
happens-before簡介
用來闡述操作之間的內(nèi)存可見性,如果一個(gè)操作執(zhí)行的結(jié)果需要對另一個(gè)操作可見,那么這兩個(gè)操作必須要存在happens-before關(guān)系,這兩個(gè)操作既可以在一個(gè)線程之內(nèi),也可以在不同線程之間,但并不等于前一個(gè)操作必須要在后一個(gè)操作之前執(zhí)行。
數(shù)據(jù)依賴性
編譯器和處理器不會改變存在數(shù)據(jù)依賴關(guān)系的兩個(gè)操作的執(zhí)行順序,但是僅針對單個(gè)處理器中執(zhí)行的指令序列和單個(gè)線程中執(zhí)行的操作。
as-if-serial
無論怎么重排序,單線程程序的執(zhí)行結(jié)果不能改變。
在單線程中,對存在控制依賴的操作重排序,不會改變執(zhí)行結(jié)果;但在多線程程序中,對存在控制依賴的操作重排序,可能會改變程序的執(zhí)行結(jié)果。
順序一致性
順序一致性是一個(gè)理論參考模型,在設(shè)計(jì)的時(shí)候,處理器的內(nèi)存模型和編程語言的內(nèi)存模型都會以順序一致性內(nèi)存作為參照。
如果程序是正確同步的,程序的執(zhí)行將具有順序一致性:即程序的執(zhí)行結(jié)果與該程序在順序一致性內(nèi)存模型中的執(zhí)行結(jié)果相同。
如果程序是正確同步的,程序的執(zhí)行將具有順序一致性:即程序的執(zhí)行結(jié)果與該程序在順序一致性內(nèi)存模型中的執(zhí)行結(jié)果相同。
順序一致模型有兩大特性:
- 一個(gè)線程中的所有操作必須按照程序的順序來執(zhí)行。
- 所有線程都只能看到一個(gè)單一的操作執(zhí)行順序,在順序一致性內(nèi)存模型中,每個(gè)操作都必須原子執(zhí)行且立刻對所有線程可見。
對于未同步或未正確同步的多線程程序,JMM只提供最小安全性:線程執(zhí)行時(shí)讀取到的值,要么是之前某個(gè)線程寫入的值,要么是默認(rèn)值。
JMM不保證未同步程序的執(zhí)行結(jié)果與該程序在順序一致性模型中的執(zhí)行結(jié)果一致。
未同步程序在兩個(gè)模型中的執(zhí)行特征有如下差異:
- 順序一致性模型保證單線程內(nèi)的操作會按程序的順序執(zhí)行,而
JMM不保證單線程內(nèi)的操作會按程序的順序執(zhí)行。 - 順序一致性模型保證所有線程只能看到一致的操作執(zhí)行順序,而
JMM不保證所有線程能看到一致的操作執(zhí)行順序。 -
JMM不保證對64位的long/double型變量的寫操作具有原子性,而順序一致性模型保證對所有內(nèi)存讀/寫操作都具有原子性。
第四章 Java并發(fā)編程基礎(chǔ)
- 現(xiàn)代操作系統(tǒng)調(diào)度的最小單元是線程,也叫輕量級進(jìn)程,在一個(gè)進(jìn)程里可以創(chuàng)建多個(gè)線程,這些線程都擁有各自的計(jì)數(shù)器、堆棧和局部變量等特性,并且能夠訪問共享的內(nèi)存變量。
- 設(shè)置線程優(yōu)先級時(shí),針對頻繁阻塞(休眠或者
I/O操作)的線程需要設(shè)置較高優(yōu)先級,而偏重計(jì)算(需要較多CPU時(shí)間或者偏運(yùn)算)的線程則設(shè)置較低的優(yōu)先級,確保處理器不會被獨(dú)占。 - 線程在運(yùn)行的生命周期中可能處于以下6種不同的狀態(tài):
-
New:初始狀態(tài),線程被創(chuàng)建,但是沒有調(diào)用start()方法。 -
Runnable:運(yùn)行狀態(tài),Java線程將操作系統(tǒng)中的就緒和運(yùn)行兩種狀態(tài)統(tǒng)稱為“運(yùn)行中”。 -
Blocked:阻塞狀態(tài),表示線程阻塞于鎖。 -
Waiting:等待狀態(tài),表示線程進(jìn)入等待狀態(tài),進(jìn)入該狀態(tài)表示當(dāng)前線程需要等待其它線程做出一些指定動作(通知或中斷)。 -
Time_Waiting:超時(shí)等待狀態(tài),可以在指定的時(shí)間自行返回。 -
Terminated:終止?fàn)顟B(tài),表示當(dāng)前線程已經(jīng)執(zhí)行完畢。 - 中斷可以理解為線程的一個(gè)標(biāo)識位屬性,它標(biāo)識一個(gè)運(yùn)行中的線程是否被其它線程進(jìn)行了中斷操作。中斷好比其他線程對該線程打了一個(gè)招呼,其他線程通過調(diào)用該線程的
interrupt()方法對其進(jìn)行中斷操作。 - 線程通過檢查自身是否被中斷來進(jìn)行響應(yīng),線程通過方法
isInterrupt來進(jìn)行判斷是否被中斷,也可以調(diào)用靜態(tài)方法Thread.interrupt對當(dāng)前線程的中斷標(biāo)識位進(jìn)行復(fù)位,如果該線程已經(jīng)處于終止?fàn)顟B(tài),即使該線程被中斷過,在調(diào)用該線程對象的isInterrupt時(shí)依舊返回false。 - 在拋出
InterruptedException異常之前,Java虛擬機(jī)會先將該線程的中斷標(biāo)識位清除。 - 中斷狀態(tài)是線程的一個(gè)標(biāo)識位,而中斷操作是一種簡便的線程間交互方式,而這種交互方式最適合用來取消或停止任務(wù),除了中斷之外,還可以利用一個(gè)
boolean變量來控制是否需要停止任務(wù)并終止該線程。 -
Java支持多個(gè)線程同時(shí)訪問一個(gè)對象或者對象的成員變量,由于每個(gè)線程可以擁有這個(gè)變量的拷貝,所以在程序的執(zhí)行過程中,一個(gè)線程看到的變量并不一定是最新的。 -
volatile可以用來修飾字段,就是告知程序任何對該變量的訪問需要從共享內(nèi)存中獲取,而對它的改變必須同步刷新回共享內(nèi)存,它能保證所有線程對變量訪問的可見性。 -
synchronized可以修飾方法或者以同步塊的形式來進(jìn)行使用,它主要確保多個(gè)線程在同一時(shí)刻,只能有一個(gè)線程處于方法或者同步塊中,它保證了線程對變量訪問的可見性和排他性。 - 任意線程對
Object(Object由synchronized保護(hù))的訪問,首先要獲得Object的監(jiān)視器,如果獲取失敗,線程進(jìn)入同步隊(duì)列,線程狀態(tài)變?yōu)?code>Blocked,當(dāng)訪問Object的前驅(qū)(獲得了鎖的線程)釋放了鎖,則該釋放操作喚醒阻塞在同步隊(duì)列中的線程,使其重新嘗試對監(jiān)視器的獲取。 - 等待/通知的相關(guān)方法:
-
notify():通知一個(gè)在對象上等待的線程,使其從wait()方法返回,而返回的前提是該線程獲取到了對象上的鎖。 -
notifyAll():通知所有等待在該對象上的鎖。 -
wait():調(diào)用該方法的線程進(jìn)入Waiting狀態(tài),只有等待另外線程的通知或被中斷才會返回,調(diào)用wait()方法后,會釋放對象的鎖。 -
wait(long):超時(shí)等待一段時(shí)間,如果沒有通知就返回。 -
wait(long, int):對于超時(shí)時(shí)間更精細(xì)粒度的控制,可以達(dá)到納秒。 - 兩個(gè)線程通過對象來完成交互,而對象上的
wait和notify/notifyAll()的關(guān)系就如同開關(guān)信號一樣,用來完成等待方和通知方之間的交互工作。 - 等待/通知的經(jīng)典范式:
- 等待方
(1) 獲取對象的鎖。
(2) 如果條件不滿足,那么調(diào)用對象的wait()方法,被通知后仍要檢查條件。
(3) 條件滿足則執(zhí)行對應(yīng)的邏輯。
synchronized(對象) {
while(條件不滿足) {
對象.wait();
}
對應(yīng)的處理邏輯;
}
- 通知方
(1) 獲得對象的鎖
(2) 改變條件
(3) 通知所有等待在該對象上的線程。
synchronized(對象) {
改變條件;
對象.notifyAll();
}
- 管道輸入/輸出流用于線程之間的數(shù)據(jù)傳輸,而傳輸?shù)拿浇闉閮?nèi)存,主要包括了以下4種實(shí)現(xiàn):
PipedOutputStream、PipeInputStream、PipedReader、PipedWriter,前兩種面向字節(jié),后兩種面向字符。 - 如果一個(gè)線程
A執(zhí)行了Thread.join(),其含義是:當(dāng)前線程A等待Thread線程終止后,才從Thread.join返回,線程Thread除了提供join()方法外,還提供了join(long millis)和join(long millis, int nanos)兩個(gè)具備超時(shí)特性的方法,如果在給定的超時(shí)時(shí)間內(nèi)沒有終止,那么將會從超時(shí)方法中返回。 -
ThreadLocal,即線程變量,是一個(gè)以ThreadLocal對象為鍵、任意對象為值的存儲結(jié)構(gòu),這個(gè)結(jié)構(gòu)被附帶在線程上,也就是說一個(gè)線程可以根據(jù)一個(gè)ThreadLocal對象查詢到綁定在這個(gè)線程上的一個(gè)值,可以通過set(T)方法來設(shè)置一個(gè)值,在當(dāng)前線程下再通過get()方法獲取到原先設(shè)置的值。
第五章 Java中的鎖
5.1 Lock接口
鎖是用來控制多個(gè)線程訪問共享資源的方式,雖然它缺少了隱式獲取釋放鎖的便捷性,但是卻擁有了鎖獲取與釋放的可操作性、可中斷地獲取鎖以及超時(shí)獲取鎖等多種
synchronized關(guān)鍵字不具備的同步特性。在
finally塊中釋放鎖,目的是保證在獲取到鎖之后,最終能夠被釋放。Lock接口提供的synchronized關(guān)鍵字不具備的主要特性嘗試非阻塞地獲取鎖:當(dāng)前線程嘗試獲取鎖,如果這一時(shí)刻沒有被其它線程獲取到,則成功獲取并持有鎖。
能被中斷地獲取鎖:與
synchronized不同,獲取到鎖的線程能夠響應(yīng)中斷,當(dāng)獲取到鎖的線程被中斷時(shí),中斷異常將會被拋出,同時(shí)鎖會被釋放。在指定的截止時(shí)間之前獲取鎖:如果截止時(shí)間到了仍舊無法獲取鎖,則返回。
Lock的APIvoid lock():獲取鎖,調(diào)用該方法當(dāng)前線程將會獲取鎖,當(dāng)鎖獲得后,從該方法返回。void lockInterruptibly():可中斷地獲取鎖,該方法會響應(yīng)中斷,即在鎖的獲取中可以中斷當(dāng)前線程。boolean tryLock():嘗試非阻塞地獲取鎖,調(diào)用該方法后立刻返回,如果能夠獲取則返回true,否則返回false。boolean tryLock(long time, TimeUnit unit) throws InterruptedException:當(dāng)前線程在超時(shí)時(shí)間內(nèi)獲得了鎖;當(dāng)前線程在超時(shí)時(shí)間內(nèi)被中斷;超時(shí)時(shí)間結(jié)束,返回false。void unlock():釋放鎖。Condition newCondition():獲取等待/通知組件,該組件和當(dāng)前的鎖綁定,當(dāng)前線程只有獲得了鎖,才能調(diào)用該組件的wait()方法,而調(diào)用后,當(dāng)前線程將釋放鎖。
5.2 隊(duì)列同步器
5.2.1 隊(duì)列同步器接口
- 隊(duì)列同步器
AbstractQueuedSynchronizer,是用來構(gòu)建鎖或者其它同步組件的基礎(chǔ)框架,它使用了一個(gè)int成員變量表示同步狀態(tài),通過內(nèi)置的FIFO隊(duì)列來完成資源獲取線程的排隊(duì)工作。 - 同步器是實(shí)現(xiàn)鎖的關(guān)鍵,在鎖的實(shí)現(xiàn)中聚合同步器,利用同步器實(shí)現(xiàn)鎖的語義。可以理解二者之間的關(guān)系:鎖是面向使用者的,它定義了使用者與鎖交互的接口,隱藏了實(shí)現(xiàn)細(xì)節(jié);同步器面向鎖的實(shí)現(xiàn)者,它簡化了鎖的實(shí)現(xiàn)方式,屏蔽了同步狀態(tài)管理、線程的排隊(duì)、等待與喚醒等底層操作。鎖和同步器很好地隔離了使用者和實(shí)現(xiàn)者所需關(guān)注地領(lǐng)域。
- 同步器的設(shè)計(jì)是基于模板方法模式,使用者需要繼承同步器并重寫指定的方法,隨后將同步器組合在自定義同步組件的實(shí)現(xiàn)中,并調(diào)用同步器的模板方法,而這些模板方法將會調(diào)用使用者重載的方法。
- 重寫同步器指定的方法時(shí),需要使用同步器提供的3個(gè)方法來訪問或者修改同步狀態(tài):
-
getState():獲取當(dāng)前同步狀態(tài)。 -
setState(int newState):設(shè)置當(dāng)前同步狀態(tài)。 -
compareAndSetState(int except, int update):使用CAS設(shè)置當(dāng)前狀態(tài),該方法能夠保證狀態(tài)設(shè)置的原始性。 - 同步器提供的模板方法基本上分為以下3類:
- 獨(dú)占式獲取與釋放同步狀態(tài)
- 共享式獲取與釋放同步狀態(tài)
- 查詢同步隊(duì)列中的等待線程情況。
5.2.2 隊(duì)列同步器的實(shí)現(xiàn)分析
5.2.2.1 同步隊(duì)列
- 同步器依賴內(nèi)部的同步隊(duì)列來完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗時(shí),同步器會將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)造稱為一個(gè)節(jié)點(diǎn),并將其加入同步隊(duì)列,同時(shí)會阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時(shí),會把首節(jié)點(diǎn)中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。
- 同步器中包含了兩個(gè)節(jié)點(diǎn)類型的引用,一個(gè)指向頭節(jié)點(diǎn),而另一個(gè)指向尾節(jié)點(diǎn)。
- 當(dāng)一個(gè)線程成功地獲取了同步狀態(tài),其他線程將無法獲取到同步狀態(tài),轉(zhuǎn)而被構(gòu)造成為節(jié)點(diǎn)并加入到同步隊(duì)列當(dāng)中,而這個(gè)加入到隊(duì)列地過程必須要保證線程安全,因此同步器提供了一個(gè)基于
CAS的設(shè)置尾節(jié)點(diǎn)的方法。 - 同步隊(duì)列遵循
FIFO,首節(jié)點(diǎn)是獲取同步狀態(tài)成功的節(jié)點(diǎn),首節(jié)點(diǎn)的線程在釋放同步狀態(tài)時(shí),將會喚醒后繼節(jié)點(diǎn),而后繼節(jié)點(diǎn)將會在獲取同步狀態(tài)成功時(shí)將自己設(shè)置為首節(jié)點(diǎn)。
5.2.2.2 獨(dú)占式同步狀態(tài)獲取與釋放
- 通過調(diào)用同步器的
acquire(int arg)方法可以獲取同步狀態(tài),該方法對中斷不敏感,即由于線程獲取同步狀態(tài)失敗而進(jìn)入同步隊(duì)列后,后續(xù)對線程進(jìn)行中斷操作時(shí),線程不會從同步隊(duì)列中移除。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
它的主要邏輯是:
- (1)調(diào)用自定義同步器實(shí)現(xiàn)的
tryAcquire方法,該方法保證線程安全的獲取同步狀態(tài),這個(gè)方法需要隊(duì)列同步器的實(shí)現(xiàn)者來重寫。 - (2)如果同步狀態(tài)獲取失敗,則構(gòu)造同步節(jié)點(diǎn)(獨(dú)占式
Node.EXCLUSIVE)并通過addWaiter(Node node)方法將該節(jié)點(diǎn)加入到同步隊(duì)列的尾部。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//1.確保節(jié)點(diǎn)能夠線程安全地被添加
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//2.通過死循環(huán)來確保節(jié)點(diǎn)的正確添加,在"死循環(huán)"中只有通過`CAS`將節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)之后,當(dāng)前線程才能從該方法返回,否則當(dāng)前線程不斷地進(jìn)行嘗試。
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- (3)最后調(diào)用
acquireQueued(Node node, int arg)方法,使得該節(jié)點(diǎn)以死循環(huán)的方式獲取同步狀態(tài)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//1.得到當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
//2.如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),只有在這種情況下獲取同步狀態(tài)成功
if (p == head && tryAcquire(arg)) {
//3.將當(dāng)前節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
-
可以看到,當(dāng)前線程在“死循環(huán)”中嘗試獲取同步狀態(tài),而只有前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)才能夠嘗試獲取同步狀態(tài),這是由于:
- 頭節(jié)點(diǎn)是成功獲取到同步狀態(tài)的節(jié)點(diǎn),而頭節(jié)點(diǎn)的線程釋放了同步狀態(tài)后,將會喚醒其后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)的線程被喚醒后需要檢查自己的前驅(qū)節(jié)點(diǎn)是否是頭節(jié)點(diǎn)。
- 維護(hù)同步隊(duì)列的
FIFO原則,通過簡單地判斷自己的前驅(qū)是否為頭節(jié)點(diǎn),這樣就使得節(jié)點(diǎn)的釋放規(guī)則符合FIFO,并且也便于對過早通知的處理(過早通知是指前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn)的線程由于中斷而被喚醒)
當(dāng)同步狀態(tài)獲取成功之后,當(dāng)前線程從
acquire(int arg)方法返回,如果對于鎖這種并發(fā)組件而言,代表著當(dāng)前線程獲取了鎖。通過調(diào)用同步器的
release(int arg)方法可以釋放同步狀態(tài),該方法執(zhí)行時(shí),會喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程,unparkSuccessor(Node node)方法使用LockSupport來喚醒處于等待狀態(tài)的線程。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- (4)如果獲取不到,則阻塞節(jié)點(diǎn)中的線程,而被阻塞線程的喚醒主要依靠前驅(qū)節(jié)點(diǎn)的出隊(duì)或阻塞線程被中斷來實(shí)現(xiàn)。
總結(jié):
1.在獲取同步狀態(tài)時(shí),同步器維護(hù)一個(gè)同步隊(duì)列,獲取狀態(tài)失敗的線程都會被加入到隊(duì)列中進(jìn)行自旋;
2.移出隊(duì)列(或停止自旋)的條件是前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)且成功獲取了同步狀態(tài)。
3.在釋放同步狀態(tài)時(shí),同步器調(diào)用tryRelease(int arg)方法來釋放同步狀態(tài),然后喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)。
5.2.2.3 共享式同步狀態(tài)獲取與釋放
- 共享式獲取和獨(dú)占式獲取最主要的區(qū)別在于同一時(shí)刻能夠有多個(gè)線程同時(shí)獲取到同步狀態(tài)。
- 通過調(diào)用同步器的
acquireShared(int arg)方法可以共享式地獲取同步狀態(tài):
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquireShared返回int類型,如果同步狀態(tài)獲取成功,那么返回值大于等于0,否則進(jìn)入自旋狀態(tài);成功獲取到同步狀態(tài)并退出自旋狀態(tài)的條件是當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn),并且返回值大于等于0.
- 共享式獲取,通過調(diào)用
releaseShared(int arg)方法釋放同步狀態(tài),tryReleaseShared必須要確保同步狀態(tài)線程安全釋放,一般是通過循環(huán)或CAS來保證的,因?yàn)獒尫磐綘顟B(tài)的操作會同時(shí)來自多個(gè)線程。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
5.2.2.4 獨(dú)占式超時(shí)獲取同步狀態(tài)
- 通過調(diào)用同步器的
doAcquireNanos(int arg, long nanosTimeout)方法可以超時(shí)獲取同步狀態(tài),即在指定的時(shí)間段內(nèi)獲取同步狀態(tài)。 - 在此之前,一個(gè)線程如果獲取不到鎖而被阻塞在
synchronized之外,對該線程進(jìn)行中斷操作,此時(shí)線程中斷的標(biāo)志位會被修改,但線程依舊會阻塞在synchronized上;如果通過acquireInterruptibly(int arg)方法獲取,如果在等待過程中被中斷,會立刻返回,并拋出InterruptedException異常。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
//1.計(jì)算出截止時(shí)間.
final long deadline = System.nanoTime() + nanosTimeout;
//2.加入節(jié)點(diǎn)
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//3.取出前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
//4.如果獲取成功則直接返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
//5.如果到了超時(shí)時(shí)間,則直接返回
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//6.如果在自旋過程中被中斷,那么拋出異常返回
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
通過上面的代碼可以知道,它和獨(dú)占式獲取的區(qū)別在于未獲取到同步狀態(tài)時(shí)的處理邏輯:獨(dú)占式獲取在獲取不到是會一直自旋等待;而超時(shí)獲取則會使當(dāng)前線程等待nanosTimeout納秒,如果當(dāng)前線程在這個(gè)時(shí)間內(nèi)沒有獲取到同步狀態(tài),將會從等待邏輯中自動返回。
5.2.2.5 自定義同步組件 - TwinsLock
TwinsLock只允許至多兩個(gè)線程同時(shí)訪問,超過兩個(gè)線程的訪問將會被阻塞。
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
//初始值為2.
setState(count);
}
@Override
protected int tryAcquireShared(int arg) {
for(;;) {
//1.獲得當(dāng)前的狀態(tài).
int current = getState();
//2.newCount表示剩余可獲取同步狀態(tài)的線程數(shù)
int newCount = current - arg;
//3.如果小于0,那么返回獲取同步狀態(tài)失敗;否則通過CAS確保設(shè)置的正確性.
if (newCount < 0 || compareAndSetState(current, newCount)) {
//4.當(dāng)返回值大于等于0表示獲取同步狀態(tài)成功.
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
for (;;) {
int current = getState();
//將可獲取同步狀態(tài)的線程數(shù)加1.
int newCount = current + current;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, @NonNull TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@NonNull
@Override
public Condition newCondition() {
return null;
}
}
測試用例:
public static void createTwinsLock() {
final Lock lock = new TwinsLock();
class TwinsLockThread extends Thread {
@Override
public void run() {
Log.d(TAG, "TwinsLockThread, run=" + Thread.currentThread().getName());
while (true) {
lock.lock();
try {
Thread.sleep(1000);
Log.d(TAG, "TwinsLockThread, name=" + Thread.currentThread().getName());
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} finally {
Log.d(TAG, "TwinsLockThread, unlock=" + Thread.currentThread().getName());
lock.unlock();
}
}
}
}
for (int i = 0; i < 10; i++) {
Thread thread = new TwinsLockThread();
thread.start();
}
}
5.3 重入鎖
- 重入鎖
ReentrantLock表示該鎖能夠支持一個(gè)線程對資源的重復(fù)加鎖。 - 如果在絕對時(shí)間上,先對鎖獲取的請求一定先被滿足,那么這個(gè)鎖是公平的,公平地獲取鎖,也就是等待時(shí)間最長的線程最優(yōu)先地獲取鎖。
5.3.1 實(shí)現(xiàn)重進(jìn)入
重進(jìn)入需要解決兩個(gè)問題:
- 線程再次獲取鎖,鎖需要去識別獲取鎖地線程是否為當(dāng)前占據(jù)鎖的線程,如果是,則再次獲取成功。
- 鎖的最終釋放,線程重復(fù)
n次獲取了鎖,隨后在第n次釋放該鎖后,其它線程能夠獲取到該鎖。
5.3.2 公平與非公平鎖的區(qū)別
- 公平與否是針對獲取鎖而言的,如果一個(gè)鎖是公平的,那么鎖的獲取順序就應(yīng)該符合請求的絕對時(shí)間順序,即
FIFO。 - 公平鎖的區(qū)別在于加入了同步隊(duì)列中當(dāng)前節(jié)點(diǎn)是否有前驅(qū)節(jié)點(diǎn)的判斷,如果該方法返回
true,表示有線程比當(dāng)前線程更早地請求獲取鎖,因此需要等待前驅(qū)線程獲取并釋放鎖之后才能繼續(xù)獲取鎖;而對于非公平鎖,只要CAS設(shè)置同步狀態(tài)成功即可。 - 因此,公平鎖每次都是從同步隊(duì)列中的第一個(gè)節(jié)點(diǎn)獲取到鎖,而非公平鎖出現(xiàn)了一個(gè)線程連續(xù)獲取鎖的情況。
- 非公平鎖可能使線程饑餓,但其極少的線程切換,保證了更大的吞吐量。
5.4 讀寫鎖
- 之前提到的鎖都是排它鎖,這些鎖在同一時(shí)刻只允許一個(gè)線程進(jìn)行訪問,而讀寫鎖在同一時(shí)刻可以允許多個(gè)讀線程訪問,但是在寫線程訪問時(shí),所有的讀線程和其他寫線程均被阻塞。讀寫鎖維護(hù)了一對鎖,一個(gè)讀鎖和一個(gè)寫鎖,通過分離讀鎖和寫鎖,使得并發(fā)性有很大提升。
- 并發(fā)包提供的讀寫鎖的實(shí)現(xiàn)是
ReentrantReadWrireLock,它支持公平性選擇、重進(jìn)入、鎖降級(寫鎖能夠降級為讀鎖)。
ReadWriteLock僅定義了獲取讀鎖和寫鎖的兩個(gè)方法,即readLock和writeLock,而其實(shí)現(xiàn)ReentrantReadWriteLock:
-
getReadLockCount:返回當(dāng)前讀鎖被獲取的次數(shù)。 -
getReadHoldCount:返回當(dāng)前線程獲取讀鎖的次數(shù)。 -
isWriteLocked:判斷寫鎖是否被獲取。 -
getWriteHoldCount:返回當(dāng)前線程獲取寫鎖的次數(shù)。
下面是一個(gè)讀寫鎖的簡單用例:
public class ReadWriteCache {
static Map<String, Object> map = new HashMap<>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
public static Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
public static Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
public static void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}
5.4.2 讀寫鎖的實(shí)現(xiàn)分析
- 讀寫狀態(tài)的設(shè)計(jì)
讀寫鎖需要在同步狀態(tài)(一個(gè)整形變量,高16表示讀,低16表示寫)上維護(hù)多個(gè)讀線程和一個(gè)寫線程的狀態(tài),使得該狀態(tài)的設(shè)計(jì)成為讀寫鎖實(shí)現(xiàn)的關(guān)鍵。 - 寫鎖的獲取與釋放
寫鎖是一個(gè)支持重進(jìn)入的排它鎖,如果當(dāng)前線程已經(jīng)獲取了寫鎖,則增加寫狀態(tài)。如果當(dāng)前線程在獲取寫鎖時(shí),讀鎖已經(jīng)被獲取,則當(dāng)前線程進(jìn)入等待狀態(tài)。
原因在于:讀寫鎖要確保寫鎖的操作對讀鎖可見,如果允許讀鎖在已經(jīng)被獲取的情況下對寫鎖的獲取,那么正在運(yùn)行的其它讀線程就無法感知到當(dāng)前寫線程的操作。 - 讀鎖的獲取與釋放
讀鎖是一個(gè)支持重進(jìn)入的共享鎖,它能被多個(gè)線程同時(shí)獲取,在沒有其它寫線程訪問(或者寫狀態(tài)為0)時(shí),讀鎖總是被成功地獲取,而所做的也只是(線程安全)增加讀狀態(tài)。 - 鎖降級
鎖降級是指把持住(當(dāng)前擁有的)寫鎖,再獲取到讀鎖,隨后釋放(先前擁有的)寫鎖的過程。
5.6 Condition接口
Condition定義了等待/通知兩種類型的方法,當(dāng)前線程調(diào)用這些方法時(shí),需要提前獲取到Condition對象關(guān)聯(lián)的鎖,Condition是依賴Lock對象的。
當(dāng)調(diào)用await()方法后,當(dāng)前線程會釋放鎖并在此等待,而其他線程調(diào)用Condition對象的signal方法,通知當(dāng)前線程后,當(dāng)前線程才從await方法返回,并且在返回前已經(jīng)獲取了鎖。
獲取一個(gè)Condition必須通過Lock的newCondition方法,下面是一個(gè)有界隊(duì)列的示例:
public class BoundedQueue<T> {
private Object[] items;
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) { //如果當(dāng)前隊(duì)列內(nèi)的個(gè)數(shù)等于最大長度,那么釋放鎖.
notFull.await();
}
if (++addIndex == items.length) { //如果已經(jīng)到了尾部,那么從頭開始.
addIndex = 0;
}
++count;
notEmpty.signal(); //通知阻塞在"空"條件上的線程.
} finally {
lock.unlock();
}
}
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); //如果當(dāng)前隊(duì)列的個(gè)數(shù)等于0,那么釋放鎖.
}
Object x = items[removeIndex];
if (++removeIndex == items.length) {
removeIndex = 0;
}
--count;
notFull.signal(); //通知阻塞在"滿"條件上的線程.
return (T) x;
} finally {
lock.unlock();
}
}
}
Condition的方法:
await():當(dāng)前線程進(jìn)入等待狀態(tài)直到被通知signal或中斷,當(dāng)前線程進(jìn)入運(yùn)行狀態(tài)且從await返回的情況:其他線程調(diào)用該
Condition的signal或signalAll方法。其它線程中斷當(dāng)前線程(
interrupt)。如果當(dāng)前等待線程從
await方法返回,那么表明當(dāng)前線程已經(jīng)獲取了Condition對象所對應(yīng)的鎖。awaitUninerruptibly:對中斷不敏感long await Nanos(long):加入了超時(shí)的判斷,返回值是(nanosTimeout- 實(shí)際耗時(shí)),如果返回值是0或者負(fù)數(shù),那么可以認(rèn)定為超時(shí)。boolean awaitUntil(Data):直到某個(gè)固定時(shí)間。signal:喚醒一個(gè)等待在Condition上的線程。signalAll:喚醒所有等待在Condition上的線程。
5.6.2 Condition的實(shí)現(xiàn)
ConditionObject是AbstractQueuedSynchronizer的內(nèi)部類,每個(gè)Condition對象都包含著一個(gè)隊(duì)列。
1.等待隊(duì)列
在隊(duì)列中的每個(gè)節(jié)點(diǎn)都包含了一個(gè)線程的引用,該線程就是在Condition對象上等待的線程,同步隊(duì)列和等待隊(duì)列中節(jié)點(diǎn)的類型都是同步器的靜態(tài)內(nèi)部類AbstractQueuedSynchronizer.Node。
由于Condition的實(shí)現(xiàn)是同步器的內(nèi)部類,因此每個(gè)Condition實(shí)例都能夠訪問同步器提供的方法,相當(dāng)于每個(gè)Condition都擁有所屬同步器的引用。
當(dāng)調(diào)用await方法時(shí),將會以當(dāng)前線程構(gòu)造節(jié)點(diǎn),并將節(jié)點(diǎn)從尾部加入到等待隊(duì)列,也就是將同步隊(duì)列移動到Condition隊(duì)列當(dāng)中。
2.等待
調(diào)用該方法的前提是當(dāng)前線程必須獲取了鎖,也就是同步隊(duì)列中的首節(jié)點(diǎn),它不是直接加入到等待隊(duì)列當(dāng)中,而是通過addConditionWaiter()方法把當(dāng)前線程構(gòu)造成一個(gè)新的節(jié)點(diǎn)并將其加入到等待隊(duì)列當(dāng)中。
3.通知
調(diào)用該方法的前提是當(dāng)前線程必須獲取了鎖,接著獲取等待隊(duì)列的首節(jié)點(diǎn),將其移動到同步隊(duì)列并使用LockSupport喚醒節(jié)點(diǎn)中的線程。
被喚醒的線程,將從await方法中的while中返回,進(jìn)而調(diào)用同步器的acquireQueued方法加入到獲取同步狀態(tài)的競爭中。
Condition的signalAll方法,相當(dāng)于對等待隊(duì)列中的每個(gè)節(jié)點(diǎn)均執(zhí)行一次signal方法,效果就是將等待隊(duì)列中所有節(jié)點(diǎn)全部移動到同步隊(duì)列中,并喚醒每個(gè)節(jié)點(diǎn)。
六、Java并發(fā)容器和框架
6.1 ConcurrentHashMap
ConcurrentHashMap是線程安全并且高效的HashMap,其它的類似容器有以下缺點(diǎn):
-
HashMap在并發(fā)執(zhí)行put操作時(shí),會導(dǎo)致Entry鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu),就會產(chǎn)生死循環(huán)獲取Entry。 -
HashTable使用synchronized來保證線程安全,但在線程競爭激烈的情況下HashTable的效率非常低下。
ConcurrentHashMap高效的原因在于它采用鎖分段技術(shù),首先將數(shù)據(jù)分成一段一段地存儲,然后給每段數(shù)據(jù)配一把鎖,當(dāng)一個(gè)線程占用鎖并且訪問一段數(shù)據(jù)的時(shí)候,其他段的數(shù)據(jù)也能被其他線程訪問。
6.1.2 ConcurrentHashMap的結(jié)構(gòu)
ConcurrentHashMap是由Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成:
-
Segment是一種可重入鎖,在ConcurrentHashMap里面扮演鎖的角色; -
HashEntry則用于存儲鍵值對數(shù)據(jù)。
一個(gè)ConcurrentHashMap里包含一個(gè)Segment數(shù)組,它的結(jié)構(gòu)和HashMap類似,是一種數(shù)組和鏈表結(jié)構(gòu)。
一個(gè)Segment里包含一個(gè)HashEntry數(shù)組,每個(gè)HashEntry是一個(gè)鏈表結(jié)構(gòu)的元素,每個(gè)Segment守護(hù)著一個(gè)HashEntry里的元素,當(dāng)對HashEntry數(shù)組的數(shù)據(jù)進(jìn)行修改時(shí),必須首先獲得與它對應(yīng)的Segment鎖。
6.1.5 ConcurrentHashMap的操作
get
get的高效在于整個(gè)get過程中不需要加鎖,除非讀到的值是空才會加鎖重讀。原因是它的get方法將要使用的共享變量都設(shè)為volatile,能夠在線程間保持可見性,能夠被多線程同時(shí)讀,并且不會讀到過期的值,例如用于統(tǒng)計(jì)當(dāng)前Segment大小的count字段和用于存儲值的HashEntry的value。
put
put方法里需要對共享變量進(jìn)行寫入操作,所以為了線程安全,在操作共享變量之前必須加鎖,put首先定位到Segment,然后在Segment里進(jìn)行插入操作。
size
先嘗試2次通過不鎖住Segment的方式來統(tǒng)計(jì)各個(gè)Segment的大小,如果統(tǒng)計(jì)的過程中,容器的count發(fā)生了變化,則再用加鎖的方式來統(tǒng)計(jì)所有Segment的大小。
6.2 ConcurrentLinkedQueue
ConcurrentLinkedQueue是一個(gè)基于鏈接節(jié)點(diǎn)的無界線程安全隊(duì)列,它采用先進(jìn)先出的規(guī)則對節(jié)點(diǎn)進(jìn)行排序,它采用CAS算法來實(shí)現(xiàn)。
6.2.1 入隊(duì)列
入隊(duì)主要做兩件事情:
- 將入隊(duì)節(jié)點(diǎn)設(shè)置成當(dāng)前隊(duì)列尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)。
- 更新
tail節(jié)點(diǎn),如果tail節(jié)點(diǎn)的next節(jié)點(diǎn)不為空,則將入隊(duì)節(jié)點(diǎn)設(shè)置成tail節(jié)點(diǎn);如果tail節(jié)點(diǎn)的next節(jié)點(diǎn)為空,則將入隊(duì)節(jié)點(diǎn)設(shè)置成tail的next節(jié)點(diǎn)。
在多線程情況下,如果有一個(gè)線程正在入隊(duì),那么它必須先獲取尾節(jié)點(diǎn),然后設(shè)置尾節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)為入隊(duì)節(jié)點(diǎn),但這時(shí)可能有另外一個(gè)線程插隊(duì)了,那么隊(duì)列的尾節(jié)點(diǎn)就會發(fā)生變化,這時(shí)第一個(gè)線程要暫停入隊(duì)操作,然后重新獲取尾節(jié)點(diǎn)。
整個(gè)入隊(duì)操作主要做兩件事:
- 定位出尾節(jié)點(diǎn)。
- 使用
CAS算法將入隊(duì)節(jié)點(diǎn)設(shè)置成尾節(jié)點(diǎn)的next節(jié)點(diǎn),如不成功則重試。
6.3 阻塞隊(duì)列
6.3.1 阻塞隊(duì)列
阻塞隊(duì)列是一個(gè)支持兩個(gè)附加操作的隊(duì)列,這兩個(gè)附加的操作支持阻塞的插入和移除方法:
- 當(dāng)隊(duì)列滿時(shí),隊(duì)列會阻塞插入元素的線程,直到隊(duì)列不滿。
- 當(dāng)隊(duì)列空時(shí),獲取元素的線程會等待隊(duì)列為空。
在阻塞隊(duì)列不可用時(shí),附加操作提供了4種處理方式:拋出異常、返回特殊值、一直阻塞、超時(shí)退出。每種方式通過調(diào)用不同的方法來實(shí)現(xiàn)。
Java里面提供了7種阻塞隊(duì)列。
6.4 Fork/Join框架
用于并行執(zhí)行任務(wù)的框架,是把一個(gè)大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大人物結(jié)果的框架。
Fork/Join使用兩個(gè)類來完成事情:
-
ForkJoinTask:它提供了fork()和join()操作的機(jī)制,通常情況下,我們繼承它的子類:有返回結(jié)果的RecursiveTask和沒有返回結(jié)果的RecursiveAction。 -
ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來添加。
ForkJoinTask在執(zhí)行的時(shí)候可能會拋出異常,但是我們沒有辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務(wù)是否已經(jīng)拋出異?;蛞呀?jīng)取消了。
ForkJoinPool由ForkJoinTask數(shù)組和ForkJoinWorkerThread數(shù)組組成,ForkJoinTask數(shù)組負(fù)責(zé)將存放程序提交給ForkJoinPool的任務(wù),而ForkJoinWorkerThread數(shù)組負(fù)責(zé)執(zhí)行這些任務(wù)。
七、Java中的13個(gè)原子操作類
Atomic包里提供了:原子更新基本類型、原子更新數(shù)組、原子更新引用和原子更新屬性。
7.1 原子更新基本類型:
AtomicBooleanAtomicIntegerAtomicLong
基本方法:
-
int addAndGet(int delta):以原子方式將輸入的值與當(dāng)前的值相加,并返回結(jié)果。 -
boolean compareAndSet(int expect, int update):如果當(dāng)前的數(shù)值等于預(yù)期值,則以原子方式將該值設(shè)置為輸入的值。 -
int getAndIncrement():以原子方式加1,并返回自增前的值。 -
void lazySet(int newValue):最終會設(shè)置成newValue,可能會導(dǎo)致其他線程在之后的一小段時(shí)間內(nèi)還是讀到舊值。 -
int getAndSet(int newValue):以原子方式設(shè)置為newValue的值,并返回舊值。
7.2 原子更新引用類型
AtomicIntegerArrayAtomicLongArrayAtomicReferenceArray
基本方法:
-
int addAndGet(int i, int delta):以原子方式將輸入值和索引i的元素相加。 -
boolean compareAndSet(int i, int expect, int update):如果當(dāng)前值等于預(yù)期值,則以原子方式將數(shù)組位置i的元素設(shè)置成update值。
7.3 原子更新引用類型
用于原子更新多個(gè)變量,提供了3種類型:
-
AtomicReference:原子更新引用類型。 -
AtomicReferenceFieldUpdater:原子更新引用類型里的字段。 -
AtomicMarkableReference:原子更新帶有標(biāo)記位的引用類型。
7.4 原子更新字段類
-
AtomicIntegerFieldUpdater:原子更新整形的字段的更新器。 -
AtomicLongFieldUpdater:原子更新長整形字段的更新器。 -
AtomicStampedReference:原子更新帶有版本號的引用類型。
原子地更新字段需要兩步:
- 因?yàn)樵痈伦侄晤惗际浅橄箢悾看问褂玫臅r(shí)候必須使用靜態(tài)方法
newUpdater創(chuàng)建一個(gè)更新器,并且需要設(shè)置想要更新的類和屬性。 - 更新類的字段必須使用
public volatile來修飾。
八、Java中的并發(fā)工具類
九、Java中的線程池
線程池的優(yōu)點(diǎn):降低資源消耗,提高響應(yīng)速度,提高線程的可管理性。
9.1 線程池的實(shí)現(xiàn)原理
線程池的處理流程如下:
- 判斷核心線程池是否已滿,如果不是,則創(chuàng)建一個(gè)新的工作線程來執(zhí)行任務(wù);如果已滿,則進(jìn)入下個(gè)流程。
- 判斷工作隊(duì)列是否已滿,如果不是,則將提交的任務(wù)存儲在工作隊(duì)列里;如果已滿,則進(jìn)入下個(gè)流程。
- 判斷線程池的線程是否都處于工作狀態(tài),如果沒有,則創(chuàng)建一個(gè)新的工作線程;如果已滿,則交給飽和策略來處理。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //1.添加進(jìn)入核心線程.
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //2.添加進(jìn)入隊(duì)列.
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //3.添加進(jìn)入非核心線程.
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
在以上的三步中,除了加入隊(duì)列不用獲取全局鎖以外,其它兩種情況都需要獲取,為了盡可能地避免獲取全局鎖,在ThreadPoolExecutor完成預(yù)熱之后(當(dāng)前運(yùn)行的線程數(shù)大于corePoolSize),幾乎所有的execute方法調(diào)用都是加入到隊(duì)列當(dāng)中。
9.2 線程池的使用
9.2.1 線程池的創(chuàng)建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
corePoolSize:當(dāng)提交一個(gè)任務(wù)到線程池時(shí),線程池會創(chuàng)建一個(gè)線程來執(zhí)行任務(wù),即使其它空閑的基本線程能夠執(zhí)行新任務(wù)也會創(chuàng)建。 -
runnableTaskQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列,可以選擇: -
ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列。 -
LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,吞吐量高于前者。 -
SynchronousQueue:不存儲元素的阻塞隊(duì)列,每個(gè)插入操作必須等待另一個(gè)線程調(diào)用了移除操作,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個(gè)隊(duì)列。 -
PriorityBlockingQueue:一個(gè)具有優(yōu)先級的無限阻塞隊(duì)列。 -
maxPoolSize:允許創(chuàng)建的最大線程數(shù)。 -
ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠。 -
RejectExecutionHandler:飽和策略。 -
keepAliveTime:線程池的工作線程空閑后,保持存活的時(shí)間。 -
TimeUnit:線程保持活動的單位。
9.2.2 向線程池提交任務(wù)
-
execute(Runnable runnable):提交不需要返回值的任務(wù)。 -
Future<Object> future = executor.submit(haveReturnValuetask):用于提交需要返回值的任務(wù),線程池會返回一個(gè)future類型任務(wù),可以用它來判斷任務(wù)是否執(zhí)行成功,并且可以通過get方法來獲取返回值,get方法會阻塞當(dāng)前線程直到任務(wù)完成。
9.2.3 關(guān)閉線程池
-
shutdownNow:首先將線程池的狀態(tài)設(shè)為STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表。 -
shutdown:將線程池的狀態(tài)置為SHUTDOWN,然后中斷所有沒有正在執(zhí)行任務(wù)的線程。
十、Executor框架
(1)在上層,Java多線程程序通常把應(yīng)用分解為若干個(gè)任務(wù),然后使用用戶級的調(diào)度器(Executor框架)將這些任務(wù)映射為固定數(shù)量的線程。
(2)在HotSpot VM的線程模型中,Java線程再被一對一映射為本地操作系統(tǒng)線程,Java線程啟動時(shí)會創(chuàng)建一個(gè)本地操作系統(tǒng)線程,當(dāng)該線程終止時(shí),這個(gè)操作系統(tǒng)線程也會被回收。
(3)操作系統(tǒng)會調(diào)度所有線程并將它們分配給可用的CPU。
Executor框架
由三個(gè)部分組成:
- 任務(wù),即
Runnable接口或Callable接口。 - 任務(wù)的執(zhí)行,包括核心接口
Executor,以及繼承自Executor的ExecutorService,還有它的兩個(gè)關(guān)鍵類ThreadPoolExecutor(用來執(zhí)行任務(wù))和ScheduledThreadPoolExecutor(可以在給定的延遲后運(yùn)行命令,或者定期執(zhí)行命令)。 - 異步計(jì)算的結(jié)果,包括接口
Future和實(shí)現(xiàn)類FutureTask。
10.2 ThreadPoolExecutor詳解
通過工具類Executors,可以創(chuàng)建以下三種類型的ThreadPoolExecutor,調(diào)用靜態(tài)創(chuàng)建方法之后,會返回ExecutorService
-
FixedThreadPool
可重用固定線程數(shù)的線程池;如果當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù);如果等于corePoolSize,將任務(wù)加入到無界隊(duì)列LinkedBlockingQueue當(dāng)中;多余的空閑線程將會被立即終止。 -
SingleThreadPool
單個(gè)woker線程的executor;corePoolSize和maximumPoolSize為1;采用無界隊(duì)列作為工作隊(duì)列。 -
CacheThreadPool
采用沒有容量的SynchronousQueue作為線程池的工作隊(duì)列,其corePoolSize為0,maximumPool是無界的;其中的空閑線程最多等待60s。
如果主線程提交任務(wù)的速度高于maximumPool中線程處理任務(wù)的速度時(shí),CacheThreadPool會不斷創(chuàng)建新線程,極端情況下,CacheThreadPool會因?yàn)閯?chuàng)建過多線程而耗盡CPU資源。
10.3 ScheduledThreadPoolExecutor詳解
用來在給定的延遲之后執(zhí)行任務(wù),或者定期執(zhí)行任務(wù),并且可以在指定的構(gòu)造函數(shù)中指定多個(gè)對應(yīng)的后臺線程數(shù)。
它采用DelayQueue這個(gè)無界隊(duì)列作為工作隊(duì)列,其執(zhí)行分為兩個(gè)部分:
- 當(dāng)調(diào)用
ScheduledThreadPoolExecutor的scheduleAtFixedRate()或者scheduleWithFIxedDelay,它會向DelayQueue中添加ScheduledFutureTask。 - 線程池中的線程從
DelayQueue中獲取ScheduledFutureTask。