有沒有想過ReentrantLock,Semaphores等是怎么實現(xiàn)同步的?這一切歸功于幕后功臣AQS,全名AbstractQueuedSynchronizer--抽象的隊列同步器
jdk文檔上的說明:
提供一個依賴于先進先出(FIFO)等待隊列,實現(xiàn)阻塞鎖和相關(guān)同步器(信號量、事件等)的框架
也就是說,AbstractQueuedSynchronizer是一個用隊列來實現(xiàn)的同步框架
首先瞄一瞄它的子類

再勾畫出它的繼承圖

可以看到,ReentrantLock,Semaphores,ThreadPoolExecutor,CountDownLatch,ReentrantReadWriteLock的同步機制的實現(xiàn),都依賴于AbstractQueuedSynchronizer
那么它們是怎么通過AQS實現(xiàn)同步的呢?
//ReentrantLock類的公平鎖實現(xiàn)
final void lock() {
acquire(1);
}
//ReentrantLock類的非公平鎖實現(xiàn)
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//ReentrantReadWriteLock類lock方法實現(xiàn)
public void lock() {
sync.acquire(1);
}
可以看到這幾種鎖的實現(xiàn)都依賴AQS類的一個很重要的方法--acquire,我們一起來揭開acquire神秘的內(nèi)衣,不,面紗
/**
* 獨占模式獲取
* AQS類的子類需實現(xiàn)本方法里的tryAcquire方法
* 否則,線程將排隊,可能會重復(fù)阻塞和解除阻塞
* 直到調(diào)用tryAcquire成功為止.
* 該方法可用于實現(xiàn)鎖方法
*
* @param arg
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
此方法是使當前線程獲取獨占模式,說人話就是獲取線程的運行權(quán),使線程進入RUNNABLE狀態(tài).
在acquire里,首先調(diào)用tryAcquire方法,如果tryAcquire返回true,線程直接變?yōu)檫\行狀態(tài),若不成功則調(diào)用acquireQueued和addWaiter方法,使線程進入CLH隊列,這個后面會說到.
但我們看到tryAcquire直接拋異常了,什么鬼!我來翻譯一下jdk文檔的注釋
/**
* 試圖以獨占模式獲取.
* 此方法應(yīng)該查詢狀態(tài)是否允許存在獨占模式,允許才進行獲取
*
* @param arg 此參數(shù)一般傳1,不過如你有其他需要傳多少都行
* @return {@code true}
* @throws IllegalMonitorStateException 如果AQS的狀態(tài)是非法狀態(tài)就會拋出次異常
* @throws UnsupportedOperationException 如果實現(xiàn)類不支持獨占模式就會拋出此異常
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
原來,AQS類的子類需覆蓋tryAcquire方法,直接拋異常是考慮到,子類若不支持獨占鎖模式,就無須覆蓋此方法,調(diào)用的時候自然就拋出UnsupportedOperationException.還有arg參數(shù)一般傳1,這是因為可重入鎖每次重入,狀態(tài)state+1.
至于各種Lock是怎么覆蓋tryAcquire方法,以及重入狀態(tài)為什么是+1,請參考線程八鎖
我們繼續(xù)來扒一扒acquireQueued方法,在說這個方法前,先說說AQS的底層數(shù)據(jù)結(jié)構(gòu)--CLH隊列--的變體--不知道什么鬼隊列.反正它叫隊列同步器,顧名思義,需要一個隊列來控制同步,而隊列的節(jié)點依賴于內(nèi)部類Node
/**
* 等待隊列節(jié)點類
* 此等待隊列是CLH鎖隊列的一種變體
* 每個節(jié)點都有一個"status"字段來表示該節(jié)點中的線程是否應(yīng)該被阻塞
*/
static final class Node {
/**
* 共享節(jié)點
*/
static final Node SHARED = new Node();
/**
* 獨占節(jié)點
*/
static final Node EXCLUSIVE = null;
/**
* 表示線程已被取消
*/
static final int CANCELLED = 1;
/**
* 表示當前節(jié)點的后繼線程需要阻塞
*/
static final int SIGNAL = -1;
/**
* 表示線程在等待條件
*/
static final int CONDITION = -2;
/**
* 表示下一次acquireShared應(yīng)該無條件傳播
*/
static final int PROPAGATE = -3;
/**
* SIGNAL: 此節(jié)點的后繼線程是阻塞的或即將阻塞,所以此節(jié)點的線程釋放或被取消時,必須喚醒它的后
* 繼.為了避免產(chǎn)生競爭,獲取方法必須表明它們需要一個信號,然后重新原子獲取,若失敗則阻塞
* CANCELLED: 由于超時或中斷,此節(jié)點被取消.節(jié)點永遠不會離開此狀態(tài).特別是,
* 節(jié)點的線程處于取消狀態(tài)再也不會阻塞
* CONDITION: 此節(jié)點當前位于條件隊列中.它不會用作同步隊列節(jié)點,直至狀態(tài)改變?yōu)橹?當時的狀態(tài)
* 將被設(shè)置為0
* PROPAGATE: 已發(fā)布的節(jié)點應(yīng)該傳播到其他節(jié)點。這在doReleaseShared中設(shè)置(僅針對head節(jié)
* 點),以確保傳播繼續(xù)進行,即使其他操作已經(jīng)介入。
* 0: 以上都不是
*/
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
//將線程構(gòu)造成一個Node,添加到等待隊列
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
//這個方法會在Condition隊列使用
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
從源碼可以看出,一個Node對象里面包含前繼節(jié)點,后繼節(jié)點和一個線程對象,所以Node對象組成的隊列是基于雙向鏈表的FIFO(先進先出)隊列.大概長這樣(懶的畫,偷了張圖回來,既然上面有水印我就不注明出處了)

先看看addWaiter源碼
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {//若尾節(jié)點非空
node.prev = pred;//將尾節(jié)點作為新節(jié)點的前繼
if (compareAndSetTail(pred, node)) {//CAS把新節(jié)點設(shè)置為尾節(jié)點
pred.next = node;//把原尾節(jié)點的后繼設(shè)置為新節(jié)點
return node;
}
}
enq(node);
return node;
}
/**
* 通過自旋操作把當前節(jié)點加入到隊列尾部
*/
private Node enq(final Node node) {
for (; ; ) {
Node t = tail; //如果是第一次添加到隊列,那么tail=null
if (t == null) { // Must initialize
//CAS的方式創(chuàng)建一個空的Node作為頭結(jié)點
if (compareAndSetHead(new Node()))
//此時隊列中只一個頭結(jié)點,所以tail也指向它
tail = head;
} else {
//進行第二次循環(huán)時,tail不為null,進入else區(qū)域。
//將當前線程的Node結(jié)點的prev指向tail,然后使用CAS將tail指向Node
node.prev = t;
if (compareAndSetTail(t, node)) {
//t此時指向tail,所以可以CAS成功,將tail重新指向Node。
//此時t為更新前的tail的值,即指向空的頭結(jié)點,t.next=node,就將頭結(jié)點的后續(xù)結(jié)點指向Node,返回頭結(jié)點
t.next = node;
return t;
}
}
}
}
addWaiter方法大致流程是:
- 創(chuàng)建當前線程節(jié)點
- 若尾節(jié)點不為空,則把新節(jié)點添加到尾部,成為新的尾節(jié)點
- 若尾節(jié)點為空,則調(diào)用enq方法,首先初始化隊列,第二次循環(huán)時,再把新節(jié)點添加到尾部,成為新的尾節(jié)點

再看看acquireQueued的源碼
/**
* 獲取已處于獨占不可中斷模式的線程隊列
* @return 是否在等待時被打斷
*/
final boolean acquireQueued(/**此node為addWaiter所返回,即已成為尾節(jié)點的當前線程節(jié)點**/final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//自旋
final Node p = node.predecessor();//獲取前繼節(jié)點
//若前繼是head節(jié)點才有資格獲取鎖
if (p == head &&
//在這里還是會調(diào)用tryAcquire
tryAcquire(arg)) {
//若獲取鎖成功,設(shè)置當前節(jié)點為頭節(jié)點
setHead(node);
//凡是head節(jié)點,head.thread與head.prev永遠為null, 但是head.next不為null
p.next = null; // help GC--有助于GC回收
failed = false;
return interrupted;
}
//如果獲取鎖失敗或前繼節(jié)點不是頭節(jié)點,則根據(jù)節(jié)點的waitStatus決定是否需要掛起線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())// 若前面為true,則執(zhí)行掛起,待下次喚醒的時候檢測中斷的標志
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);// 如果拋出異常則取消鎖的獲取,進行出隊(sync queue)操作
}
}
小結(jié)acquireQueued的大致流程:
- 判斷當前節(jié)點的前驅(qū)節(jié)點是否頭節(jié)點
- 若是,則嘗試獲取鎖,如果獲取鎖成功,則設(shè)置當前為頭節(jié)點
- 若當前節(jié)點的前驅(qū)節(jié)點不是頭節(jié)點或嘗試獲取鎖失敗,則根據(jù)節(jié)點的waitStatus決定是否需要掛起線程
- 若上述過程出現(xiàn)異常(因為上述操作在自旋中,不出現(xiàn)異常不會跳到finally代碼塊中),則判斷頭節(jié)點是否設(shè)置失敗,若失敗,則取消正在進行的獲取鎖的嘗試
釋放鎖移除節(jié)點
head節(jié)點表示獲取鎖成功的節(jié)點,當頭結(jié)點在釋放同步狀態(tài)時,會喚醒后繼節(jié)點,如果后繼節(jié)點獲得鎖成功,會把自己設(shè)置為頭結(jié)點,節(jié)點的變化過程如下

- 修改head節(jié)點指向下一個獲得鎖的節(jié)點
- 新的獲得鎖的節(jié)點,將prev的指針指向null
這里有一個小的變化,就是設(shè)置head節(jié)點不需要用CAS,原因是設(shè)置head節(jié)點是由獲得鎖的線程來完成的,而同步鎖只能由一個線程獲得,所以不需要CAS保證,只需要把head節(jié)點設(shè)置為原首節(jié)點的后繼節(jié)點,并且斷開原h(huán)ead節(jié)點的next引用即可
對應(yīng)源碼
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(long arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
參考文獻
深入分析AQS實現(xiàn)原理