1. 概述
在jdk1.4以前,java的內(nèi)置鎖(使用synchronized的方法或代碼塊)性能問題一直都在被人們關(guān)注.jdk1.5中加入了java.concurrent.util包, 來實現(xiàn)高性能的同步器(如ReentrantLock, CountDownLatch等).
顯然, java.util.concurrent包的目的有兩個:1.提升性能(相比于synchronized), 2.實現(xiàn)同步器的基本功能(acquire和release).
那么, java.util.concurrent包是如何達(dá)到以上兩個目的的呢? 主要從兩方面著手:
1.jdk1.5中提供了硬件級別的同步原語(compareAndSwap)來提高同步效率;
2.在java層面提供了一個設(shè)計良好的抽象同步器AbstractQueuedSync. java.concurrent.util包粽所有的同步器(ReentrantLock,CountDownLatch等)都基于這個抽象類.
本文將首先介紹compareAndSwap相比于synchronized的優(yōu)勢, 接著介紹一下框架核心AbstractQueuedSync的結(jié)構(gòu), 最后簡述一下java.util.concurrent包中的若干個同步器.
2. java.concurrent.util包實現(xiàn)原理
2.1 同步原語(synchronized and CAS)
jdk1.4以前, java用內(nèi)置鎖(synchronized)來實現(xiàn)同步. 這種方式是非常低效的, 為什么? 來看一下原因.
我們可以通過反編譯原來來看一下synchronized在class文件中的指令. 先編寫一個類.
package chyun.concurrent;
public class SynchronizedDemo {
public void method() {
synchronized (this) {
System.out.println("Method 1 start");
}
}
}
反編譯結(jié)果:

可以看到synchronized是通過monitorenter和monitorexit兩條指令來實現(xiàn)的,
關(guān)于這兩條指令的作用,我們直接參考JVM規(guī)范中描述.
monitorenter:
Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref, as follows:
? If the entry count of the monitor associated with objectref is zero, the thread enters the monitor and sets its entry count to one. The thread is then the owner of the monitor.
? If the thread already owns the monitor associated with objectref, it reenters the monitor, incrementing its entry count.
? If another thread already owns the monitor associated with objectref, the thread blocks until the monitor's entry count is zero, then tries again to gain ownership.
這段話的大概意思為:
每個對象有一個監(jiān)視器鎖(monitor).當(dāng)monitor被占用時就會處于鎖定狀態(tài),線程執(zhí)行monitorenter指令時嘗試獲取monitor的所有權(quán),過程如下:
- 如果monitor的進(jìn)入數(shù)為0,則該線程進(jìn)入monitor,然后將進(jìn)入數(shù)設(shè)置為1,該線程即為monitor的所有者.
- 如果線程已經(jīng)占有該monitor,只是重新進(jìn)入,則進(jìn)入monitor的進(jìn)入數(shù)加1.
- 如果其他線程已經(jīng)占用了monitor,則該線程進(jìn)入阻塞狀態(tài),直到monitor的進(jìn)入數(shù)為0,再重新嘗試獲取monitor的所有權(quán).
monitorexit
The thread that executes monitorexit must be the owner of the monitor associated with the instance referenced by objectref.
The thread decrements the entry count of the monitor associated with objectref. If as a result the value of the entry count is zero, the thread exits the monitor and is no longer its owner. Other threads that are blocking to enter the monitor are allowed to attempt to do so.
這段話的大概意思為:
執(zhí)行monitorexit的線程必須是objectref所對應(yīng)的monitor的所有者.
指令執(zhí)行時,monitor的進(jìn)入數(shù)減1,如果減1后進(jìn)入數(shù)為0,那線程退出monitor,不再是這個monitor的所有者.其他被這個monitor阻塞的線程可以嘗試去獲取這個 monitor 的所有權(quán).
通過這兩段描述,我們應(yīng)該能很清楚的看出Synchronized的實現(xiàn)原理,Synchronized的語義底層是通過一個monitor的對象來完成,其實wait/notify等方法也依賴于monitor對象,這就是為什么只有在同步的塊或者方法中才能調(diào)用wait/notify等方法,否則會拋出java.lang.IllegalMonitorStateException的異常的原因.
我們可以用同樣的方法查看同步方法(方法前用Synchronized修飾)的原理此處不在論述.
大致上我們可以認(rèn)為, Synchronized提供的鎖事悲觀鎖, 并且是在java指令層面實現(xiàn)的.
下面看java.util.concurrent的實現(xiàn).
以AtomicInteger為例, 其incrementAndGet的源碼如下:
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* Atomically update Java variable to <tt>x</tt> if it is currently
* holding <tt>expected</tt>.
* @return <tt>true</tt> if successful
*/
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
跟蹤代碼發(fā)現(xiàn)其底層實現(xiàn)為unsafe.compareAndSwapInt(), 其底層的原子性由cpu提供.
與synchronized不同, java.util.concurent包中的同步器一般使用樂觀鎖(采用CAS的方式), 并且提供硬件級別的同步原語來提升性能.
2.2 AbstractQueuedSync
java.util.concurrent包為所有應(yīng)用層的同步器提供了一個抽象類AbstractQueuedSync, 其余同步器只需繼承該類即可實現(xiàn)一個高性能的同步器.
同步器背后的基本思想非常簡單, 只需兩個操作acquire(獲取資源)和release(釋放資源).
其偽代碼如下:
acquire:
while (synchronization state does not allow acquire) {
enqueue current thread if not already queued;
possibly block current thread;
}
dequeue current thread if it was queued;
release:
update synchronization state;
if (state may permit a blocked thread to acquire)
unblock one or more queued threads;
為了實現(xiàn)上述操作,需要下面三個基本組件的相互協(xié)作:
-同步狀態(tài)的原子性管理;
-線程的阻塞與解除阻塞;
-隊列的管理;
2.2.1 同步狀態(tài)
AQS類使用單個int(32位)來保存同步狀態(tài),并暴露出getState、setState以及compareAndSet操作來讀取和更新這個狀態(tài).這些方法都依賴于j.u.c.atomic包的支持,這個包提供了兼容JSR133中volatile在讀和寫上的語義,并且通過使用本地的compare-and-swap或load-linked/store-conditional指令來實現(xiàn)compareAndSetState,使得僅當(dāng)同步狀態(tài)擁有一個期望值的時候,才會被原子地設(shè)置成新值.
基于AQS的具體實現(xiàn)類必須根據(jù)暴露出的狀態(tài)相關(guān)的方法定義tryAcquire和tryRelease方法,以控制acquire和release操作.當(dāng)同步狀態(tài)滿足時,tryAcquire方法必須返回true,而當(dāng)新的同步狀態(tài)允許后續(xù)acquire時,tryRelease方法也必須返回true.這些方法都接受一個int類型的參數(shù)用于傳遞想要的狀態(tài).例如:可重入鎖中,當(dāng)某個線程從條件等待中返回,然后重新獲取鎖時,為了重新建立循環(huán)計數(shù)的場景.很多同步器并不需要這樣一個參數(shù),因此忽略它即可.
2.2.2 阻塞
在JSR166之前,阻塞線程和解除線程阻塞都是基于Java內(nèi)置管程,沒有其它非基于Java內(nèi)置管程的API可以用來創(chuàng)建同步器.唯一可以選擇的是Thread.suspend和Thread.resume,但是它們都有無法解決的競態(tài)問題,所以也沒法用:當(dāng)一個非阻塞的線程在一個正準(zhǔn)備阻塞的線程調(diào)用suspend前調(diào)用了resume,這個resume操作將不會有什么效果.
j.u.c包有一個LockSuport類,這個類中包含了解決這個問題的方法.方法LockSupport.park阻塞當(dāng)前線程除非/直到有個LockSupport.unpark方法被調(diào)用(unpark方法被提前調(diào)用也是可以的).unpark的調(diào)用是沒有被計數(shù)的,因此在一個park調(diào)用前多次調(diào)用unpark方法只會解除一個park操作.另外,它們作用于每個線程而不是每個同步器.一個線程在一個新的同步器上調(diào)用park操作可能會立即返回,因為在此之前可能有“剩余的”unpark操作.但是,在缺少一個unpark操作時,下一次調(diào)用park就會阻塞.雖然可以顯式地消除這個狀態(tài),但并不值得這樣做.在需要的時候多次調(diào)用park會更高效.
這個簡單的機(jī)制與有些用法在某種程度上是相似的,例如Solaris-9的線程庫,WIN32中的“可消費事件”,以及Linux中的NPTL線程庫.因此最常見的運行Java的平臺上都有相對應(yīng)的有效實現(xiàn).(但目前Solaris和Linux上的Sun Hotspot JVM參考實現(xiàn)實際上是使用一個pthread的condvar來適應(yīng)目前的運行時設(shè)計的).park方法同樣支持可選的相對或絕對的超時設(shè)置,以及與JVM的Thread.interrupt結(jié)合--可通過中斷來unpark一個線程.
2.2.3 隊列
整個框架的關(guān)鍵就是如何管理被阻塞的線程的隊列,該隊列是嚴(yán)格的FIFO隊列,因此,框架不支持基于優(yōu)先級的同步.
AQS維護(hù)一個CLH鏈表隊列, 通過兩個字段head和tail來存取, 這兩個字段是可原子更新的, 兩者在初始化時都指向了一個空節(jié)點.

一個新的節(jié)點,node,通過一個原子操作入隊:
do {
pred = tail;
} while(!tail.compareAndSet(pred, node));
每一個節(jié)點的“釋放”狀態(tài)都保存在其前驅(qū)節(jié)點中. 因此AQS作為自旋鎖的"自旋"操作就如下:
while (pred.status != RELEASED); // spin
自旋后的出隊操作只需將head字段指向剛剛得到鎖的節(jié)點:
head = node;
CLH鎖的優(yōu)點在于其入隊和出隊操作是快速、無鎖的,以及無障礙的(即使在競爭下,某個線程總會贏得一次插入機(jī)會而能繼續(xù)執(zhí)行);且探測是否有線程正在等待也很快(只要測試一下head是否與tail相等);同時,“釋放”狀態(tài)是分散的,避免了一些不必要的內(nèi)存競爭.
在原始版本的CLH鎖中,節(jié)點間甚至都沒有互相鏈接.自旋鎖中,pred變量可以是一個局部變量.然而,Scott和Scherer證明了通過在節(jié)點中顯式地維護(hù)前驅(qū)節(jié)點,CLH鎖就可以處理“超時”和各種形式的“取消”:如果一個節(jié)點的前驅(qū)節(jié)點取消了,這個節(jié)點就可以滑動去使用前面一個節(jié)點的狀態(tài)字段.
為了將CLH隊列用于阻塞式同步器,需要做些額外的修改以提供一種高效的方式定位某個節(jié)點的后繼節(jié)點.在自旋鎖中,一個節(jié)點只需要改變其狀態(tài),下一次自旋中其后繼節(jié)點就能注意到這個改變,所以節(jié)點間的鏈接并不是必須的.但在阻塞式同步器中,一個節(jié)點需要顯式地喚醒(unpark)其后繼節(jié)點.
AQS隊列的節(jié)點包含一個next鏈接到它的后繼節(jié)點.但是,由于沒有針對雙向鏈表節(jié)點的類似compareAndSet的原子性無鎖插入指令,因此這個next鏈接的設(shè)置并非作為原子性插入操作的一部分,而僅是在節(jié)點被插入后簡單地賦值:
pred.next = node;
next鏈接僅是一種優(yōu)化.如果通過某個節(jié)點的next字段發(fā)現(xiàn)其后繼結(jié)點不存在(或看似被取消了),總是可以使用pred字段從尾部開始向前遍歷來檢查是否真的有后續(xù)節(jié)點.
第二個對CLH隊列主要的修改是將每個節(jié)點都有的狀態(tài)字段用于控制阻塞而非自旋.在同步器框架中,僅在線程調(diào)用具體子類中的tryAcquire方法返回true時,隊列中的線程才能從acquire操作中返回;而單個“released”位是不夠的.但仍然需要做些控制以確保當(dāng)一個活動的線程位于隊列頭部時,僅允許其調(diào)用tryAcquire;這時的acquire可能會失敗,然后(重新)阻塞.這種情況不需要讀取狀態(tài)標(biāo)識,因為可以通過檢查當(dāng)前節(jié)點的前驅(qū)是否為head來確定權(quán)限.與自旋鎖不同,讀取head以保證復(fù)制時不會有太多的內(nèi)存競爭( there is not enough memory contention reading head to warrant replication.).然而,“取消”狀態(tài)必須存在于狀態(tài)字段中.
隊列節(jié)點的狀態(tài)字段也用于避免沒有必要的park和unpark調(diào)用.雖然這些方法跟阻塞原語一樣快,但在跨越Java和JVM runtime以及操作系統(tǒng)邊界時仍有可避免的開銷.在調(diào)用park前,線程設(shè)置一個“喚醒(signal me)”位,然后再一次檢查同步和節(jié)點狀態(tài).一個釋放的線程會清空其自身狀態(tài).這樣線程就不必頻繁地嘗試阻塞,特別是在鎖相關(guān)的類中,這樣會浪費時間等待下一個符合條件的線程去申請鎖,從而加劇其它競爭的影響.除非后繼節(jié)點設(shè)置了“喚醒”位,否則這也可避免正在release的線程去判斷其后繼節(jié)點.這反過來也消除了這些情形:除非“喚醒”與“取消”同時發(fā)生,否則必須遍歷多個節(jié)點來處理一個似乎為null的next字段.
同步框架中使用的CLH鎖的變體與其他語言中的相比,主要區(qū)別可能是同步框架中使用的CLH鎖需要依賴?yán)厥展芾砉?jié)點的內(nèi)存,這就避免了一些復(fù)雜性和開銷.但是,即使依賴GC也仍然需要在確定鏈接字段不再需要時將其置為null.這往往可以與出隊操作一起完成.否則,無用的節(jié)點仍然可觸及,它們就沒法被回收.
其它一些更深入的微調(diào),包括CLH隊列首次遇到競爭時才需要的初始空節(jié)點的延遲初始化等,都可以在J2SE1.5的版本的源代碼文檔中找到相應(yīng)的描述.
拋開這些細(xì)節(jié),基本的acquire操作的最終實現(xiàn)的一般形式如下(互斥,非中斷,無超時):
if(!tryAcquire(arg)) {
node = create and enqueue new node;
pred = node's effective predecessor;
while (pred is not head node || !tryAcquire(arg)) {
if (pred's signal bit is set)
pard()
else
compareAndSet pred's signal bit to true;
pred = node's effective predecessor;
}
head = node;
}
release操作:
if(tryRelease(arg) && head node's signal bit is set) {
compareAndSet head's bit to false;
unpark head's successor, if one exist
}
acquire操作的主循環(huán)次數(shù)依賴于具體實現(xiàn)類中tryAcquire的實現(xiàn)方式.另一方面,在沒有“取消”操作的情況下,每一個組件的acquire和release都是一個O(1)的操作,忽略park中發(fā)生的所有操作系統(tǒng)線程調(diào)度.
支持“取消”操作主要是要在acquire循環(huán)里的park返回時檢查中斷或超時.由超時或中斷而被取消等待的線程會設(shè)置其節(jié)點狀態(tài),然后unpark其后繼節(jié)點.在有“取消”的情況下,判斷其前驅(qū)節(jié)點和后繼節(jié)點以及重置狀態(tài)可能需要O(n)的遍歷(n是隊列的長度).由于“取消”操作,該線程再也不會被阻塞,節(jié)點的鏈接和狀態(tài)字段可以被快速重建.
2.2.4 條件隊列
AQS框架提供了一個ConditionObject類,給維護(hù)獨占同步的類以及實現(xiàn)Lock接口的類使用.一個鎖對象可以關(guān)聯(lián)任意數(shù)目的條件對象,可以提供典型的管程風(fēng)格的await、signal和signalAll操作,包括帶有超時的,以及一些檢測、監(jiān)控的方法.
通過修正一些設(shè)計決策,ConditionObject類有效地將條件(conditions)與其它同步操作結(jié)合到了一起.該類只支持Java風(fēng)格的管程訪問規(guī)則,這些規(guī)則中,僅當(dāng)當(dāng)前線程持有鎖且要操作的條件(condition)屬于該鎖時,條件操作才是合法的.這樣,一個ConditionObject關(guān)聯(lián)到一個 ReentrantLock 上就表現(xiàn)的跟內(nèi)置的管程(通過Object.wait等)一樣了.兩者的不同僅僅在于方法的名稱、額外的功能以及用戶可以為每個鎖聲明多個條件.
ConditionObject使用了與同步器一樣的內(nèi)部隊列節(jié)點.但是,是在一個單獨的條件隊列中維護(hù)這些節(jié)點的.signal操作是通過將節(jié)點從條件隊列轉(zhuǎn)移到鎖隊列中來實現(xiàn)的,而沒有必要在需要喚醒的線程重新獲取到鎖之前將其喚醒.
基本的await操作如下:
create and add new node to conditon queue;
release lock;
block until node is on lock queue;
re-acquire lock;
signal操作如下:
transfer the first node from condition queue to lock queue;
因為只有在持有鎖的時候才能執(zhí)行這些操作,因此他們可以使用順序鏈表隊列操作來維護(hù)條件隊列(在節(jié)點中用一個nextWaiter字段).轉(zhuǎn)移操作僅僅把第一個節(jié)點從條件隊列中的鏈接解除,然后通過CLH插入操作將其插入到鎖隊列上.
為了維護(hù)適當(dāng)?shù)捻樞?,隊列?jié)點狀態(tài)變量中的一個位記錄了該節(jié)點是否已經(jīng)(或正在)被轉(zhuǎn)移.“喚醒”和“取消”相關(guān)的代碼都會嘗試用compareAndSet修改這個狀態(tài).如果某次signal操作修改失敗,就會轉(zhuǎn)移隊列中的下一個節(jié)點(如果存在的話).如果某次“取消”操作修改失敗,就必須中止此次轉(zhuǎn)移,然后等待重新獲得鎖.后面的情況采用了一個潛在的無限的自旋等待.在節(jié)點成功的被插到鎖隊列之前,被“取消”的等待不能重新獲得鎖,所以必須自旋等待CLH隊列插入(即compareAndSet操作)被“喚醒”線程成功執(zhí)行.這里極少需要自旋,且自旋里使用Thread.yield來提示應(yīng)該調(diào)度某一其它線程,理想情況下就是執(zhí)行signal的那個線程.雖然有可能在這里為“取消”實現(xiàn)一個幫助策略以幫助插入節(jié)點,但這種情況實在太少,找不到合適的理由來增加這些開銷.在其它所有的情況下,這個基本的機(jī)制都不需要自旋或yield,因此在單處理器上保持著合理的性能.
3. 利用AQS實現(xiàn)同步器
AQS已經(jīng)定義了同步器的基本功能, 要實現(xiàn)一個應(yīng)用層面的同步器, 只需要繼承該類并實現(xiàn)tryAcquire和tryRelease方法即可.
下面是一個最簡單的Mutex類的實現(xiàn),它使用同步狀態(tài)0表示解鎖,1表示鎖定.這個類并不需要同步方法中的參數(shù),因此這里在調(diào)用的時候使用0作為實參,方法實現(xiàn)里將其忽略.
class Mutex {
class Sync extends AbstractQueuedSynchronizer {
public boolean tryAcquire(int ignore) {
return compareAndSetState(0, 1);
}
public boolean tryRelease(int ignore) {
setState(0); return true;
}
}
private final Sync sync = new Sync();
public void lock() { sync.acquire(0); }
public void unlock() { sync.release(0); }
}
下面是java.util.concurrent包中同步器定義方式的概述:
ReentrantLock類使用AQS同步狀態(tài)來保存鎖(重復(fù))持有的次數(shù).當(dāng)鎖被一個線程獲取時,ReentrantLock也會記錄下當(dāng)前獲得鎖的線程標(biāo)識,以便檢查是否是重復(fù)獲取,以及當(dāng)錯誤的線程(譯者注:如果線程不是鎖的持有者,在此線程中執(zhí)行該鎖的unlock操作就是非法的)試圖進(jìn)行解鎖操作時檢測是否存在非法狀態(tài)異常.ReentrantLock也使用了AQS提供的ConditionObject,還向外暴露了其它監(jiān)控和監(jiān)測相關(guān)的方法.ReentrantLock通過在內(nèi)部聲明兩個不同的AbstractQueuedSynchronizer實現(xiàn)類(提供公平模式的那個禁用了闖入策略)來實現(xiàn)可選的公平模式,在創(chuàng)建ReentrantLock實例的時候根據(jù)設(shè)置(譯者注:即ReentrantLock構(gòu)造方法中的fair參數(shù))使用相應(yīng)的AbstractQueuedSynchronizer實現(xiàn)類.
ReentrantReadWriteLock類使用AQS同步狀態(tài)中的16位來保存寫鎖持有的次數(shù),剩下的16位用來保存讀鎖的持有次數(shù).WriteLock的構(gòu)建方式同ReentrantLock.ReadLock則通過使用acquireShared方法來支持同時允許多個讀線程.
Semaphore類(計數(shù)信號量)使用AQS同步狀態(tài)來保存信號量的當(dāng)前計數(shù).它里面定義的acquireShared方法會減少計數(shù),或當(dāng)計數(shù)為非正值時阻塞線程;tryRelease方法會增加計數(shù),可能在計數(shù)為正值時還要解除線程的阻塞.
CountDownLatch類使用AQS同步狀態(tài)來表示計數(shù).當(dāng)該計數(shù)為0時,所有的acquire操作(譯者注:acquire操作是從aqs的角度說的,對應(yīng)到CountDownLatch中就是await方法)才能通過.
FutureTask類使用AQS同步狀態(tài)來表示某個異步計算任務(wù)的運行狀態(tài)(初始化、運行中、被取消和完成).設(shè)置(譯者注:FutureTask的set方法)或取消(譯者注:FutureTask的cancel方法)一個FutureTask時會調(diào)用AQS的release操作,等待計算結(jié)果的線程的阻塞解除是通過AQS的acquire操作實現(xiàn)的.
SynchronousQueues類(一種CSP(Communicating Sequential Processes)形式的傳遞)使用了內(nèi)部的等待節(jié)點,這些節(jié)點可以用于協(xié)調(diào)生產(chǎn)者和消費者.同時,它使用AQS同步狀態(tài)來控制當(dāng)某個消費者消費當(dāng)前一項時,允許一個生產(chǎn)者繼續(xù)生產(chǎn),反之亦然.
4. 總結(jié)
總而言是, java.util.concurrent包中的同步框架使用硬件級別的同步原語和CAS的方式提升了java內(nèi)置同步工具9synchronized)的性能, 另外還提供了一個AQS框架, 是的實現(xiàn)一個高性能的同步器變得非常方便, 只需繼承AQS, 并實現(xiàn)tryAcquire和tryRelease方法即可.