概述
AQS( AbstractQueuedSynchronizer ) 是一個(gè)用于構(gòu)建鎖和同步器的框架,許多同步器都可以通過(guò)AQS很容易并且高效地構(gòu)造出來(lái)。如: ReentrantLock 和 Semaphore都是基于AQS構(gòu)建的,還包括CountDownLatch、ReentrantReadWriteLock、SynchronousQueue和FutureTask。
AQS解決了在實(shí)現(xiàn)同步器時(shí)涉及的大量細(xì)節(jié)問(wèn)題,例如等待線程采用FIFO隊(duì)列操作順序。在不同的同步器中還可以定義一些靈活的標(biāo)準(zhǔn)來(lái)判斷某個(gè)線程是應(yīng)該通過(guò)還是需要等待。本文通過(guò)介紹AQS相關(guān)API以及juc包下相關(guān)實(shí)現(xiàn)類對(duì)其進(jìn)行介紹
原理
使用volatile的整形變量state用來(lái)表示當(dāng)前是否可以獲取鎖,如在某些非共享鎖里,state=1 則代表當(dāng)前鎖已經(jīng)被占有,此時(shí)如果有線程來(lái)請(qǐng)求鎖時(shí)且當(dāng)前獨(dú)占模式的鎖已經(jīng)被其他線程占有,則會(huì)進(jìn)入AQS里維持的CLH隊(duì)列(FIFO)里排隊(duì),HEAD 位置的線程為正在占用鎖的線程,當(dāng)它釋放鎖時(shí),會(huì)喚醒Head的next 節(jié)點(diǎn)的線程,next節(jié)點(diǎn)使用自旋的方式不斷的嘗試獲取鎖。 自定義的同步器在實(shí)現(xiàn)時(shí),只需要定義state變量獲?。╰ryAcquire)與釋放(tryRelease)的規(guī)則,其他細(xì)節(jié)在AQS里已經(jīng)實(shí)現(xiàn)。

獨(dú)占模式
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
根據(jù) java doc的注釋:以獨(dú)占模式獲取,忽略中斷,通過(guò)最少調(diào)用一次 tryAcquire 來(lái)返回成功,否則將該線程放入隊(duì)列,可能一直阻塞或非阻塞,直到調(diào)用 tryAcquire 返回成功,該方法可以用來(lái)實(shí)現(xiàn)鎖。
可以看出 AbstractQueuedSynchronizer 中的 acquire 是獨(dú)占模式需要調(diào)用的方法,該方法又依次調(diào)用了 3個(gè)方法:
tryAcquire(arg)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
默認(rèn)實(shí)現(xiàn)拋出一個(gè) UnsupportedOperationException 異常 ,將該方法的具體實(shí)現(xiàn)交給子類,那么這里的入?yún)?int arg 便是與 AbstractQueuedSynchronizer 中的 state 變量來(lái)比較確認(rèn)當(dāng)前線程是否可以獲取鎖,一般情況下,如果是獨(dú)占鎖,獲取鎖時(shí)通過(guò)CAS 將 state 置為 1 ,釋放鎖時(shí) 通過(guò) CAS 將 state 置為 0
addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
該方法用于將當(dāng)前線程的狀態(tài)封裝到 Node 節(jié)點(diǎn)內(nèi),然后通過(guò)尾插法將 node 節(jié)點(diǎn)放入到鏈表末尾
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//判斷當(dāng)前節(jié)點(diǎn)是否被通知中斷
boolean interrupted = false;
//自旋方式,不斷嘗試獲取資源
for (;;) {
final Node p = node.predecessor();
//如果上一個(gè)節(jié)點(diǎn)是Head節(jié)點(diǎn),則嘗試獲取資源
if (p == head && tryAcquire(arg)) {
//獲取資源成功,則將當(dāng)前節(jié)點(diǎn)設(shè)為Head節(jié)點(diǎn)
setHead(node);
//當(dāng)前節(jié)點(diǎn)獲取資源成功,那么之前的Head節(jié)點(diǎn)需要釋放,因此將之前Head相關(guān)引用設(shè)為NULL,幫助GC回收
p.next = null; // help GC
failed = false;
//返回中斷過(guò)程是否中斷過(guò)
return interrupted;
}
//遞歸判斷當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)的waitStatus是否有效
//park 當(dāng)前Node節(jié)點(diǎn)的線程(wait當(dāng)前線程),檢查是否中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
自旋一次,嘗試獲取鎖??梢钥吹?node.predecessor(); 返回的是當(dāng)前node的父節(jié)點(diǎn),如果父節(jié)點(diǎn)是Head 節(jié)點(diǎn),那么當(dāng)前 node 節(jié)點(diǎn)就會(huì)再次執(zhí)行 tryAcquire(arg) 方法,若返回 success 則獲取鎖,失敗則 park 當(dāng)前線程進(jìn)入等待喚醒的狀態(tài),同時(shí)檢查當(dāng)前線程是否中斷,如果被設(shè)置了中斷的標(biāo)記,則 將interrupted 設(shè)置為true,在獲取資源成功的時(shí)候,去響應(yīng)中斷
shouldParkAfterFailedAcquire(Node pred, Node node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//判斷前一個(gè)節(jié)點(diǎn)的waitStatus 是否小于0,如果大于0,則該節(jié)點(diǎn)已經(jīng)被取消
int ws = pred.waitStatus;
//如果為等待通知,則表示等待unpark(等待喚醒)
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//如果上一個(gè)節(jié)點(diǎn)被取消掉(大于0)則跳過(guò),循環(huán)的找上一節(jié)點(diǎn)為未取消的(小于0)
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//如果當(dāng)前節(jié)點(diǎn)的waitStatus 小于0,但不等于SIGNAL,則設(shè)置為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire(Node pred, Node node) 此方法主要用于檢查waitStatus的狀態(tài),看看自己是否真的可以去休息了(進(jìn)入waiting狀態(tài)),萬(wàn)一隊(duì)列前邊的線程都放棄了,則丟棄引用,讓GC回收。
Node節(jié)點(diǎn)中維持著waitStatus這個(gè)字段代表的含義如下:
/** waitStatus value to indicate thread has cancelled */
//值為1,在同步隊(duì)列中等待的線程等待超時(shí)或被中斷,需要從同步隊(duì)列中取消該Node的結(jié)點(diǎn),其結(jié)點(diǎn)的waitStatus為CANCELLED,即結(jié)束狀態(tài),進(jìn)入該狀態(tài)后的結(jié)點(diǎn)將不會(huì)再變化。
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
//值為-1,被標(biāo)識(shí)為該等待喚醒狀態(tài)的后繼結(jié)點(diǎn),當(dāng)其前繼結(jié)點(diǎn)的線程釋放了同步鎖或被取消,將會(huì)通知該后繼結(jié)點(diǎn)的線程執(zhí)行。說(shuō)白了,
//就是處于喚醒狀態(tài),只要前繼結(jié)點(diǎn)釋放鎖,就會(huì)通知標(biāo)識(shí)為SIGNAL狀態(tài)的后繼結(jié)點(diǎn)的線程執(zhí)行。
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
//值為-2,與Condition相關(guān),該標(biāo)識(shí)的結(jié)點(diǎn)處于等待隊(duì)列中,結(jié)點(diǎn)的線程等待在Condition上,當(dāng)其他線程調(diào)用了Condition的signal()
// 方法后,CONDITION狀態(tài)的結(jié)點(diǎn)將從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中,等待獲取同步鎖。
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
//值為-3,與共享模式相關(guān),在共享模式中,該狀態(tài)標(biāo)識(shí)結(jié)點(diǎn)的線程處于可運(yùn)行狀態(tài)。
static final int PROPAGATE = -3;
//0 代表為初始狀態(tài)
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
//調(diào)用park()使線程進(jìn)入waiting狀態(tài)
LockSupport.park(this);
//如果被喚醒,查看自己是不是被中斷的
return Thread.interrupted();
}
parkAndCheckInterrupt() 讓線程去休息,真正進(jìn)入等待狀態(tài)。park()會(huì)讓當(dāng)前線程進(jìn)入waiting狀態(tài)。在此狀態(tài)下,有兩種途徑可以喚醒該線程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()會(huì)清除當(dāng)前線程的中斷標(biāo)記位。
獨(dú)占模式的大致流程知道了后,用流程圖來(lái)總結(jié)下(貼上源碼)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 調(diào)用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
沒(méi)成功,則addWaiter()將該線程加入等待隊(duì)列的尾部,并標(biāo)記為獨(dú)占模式; - acquireQueued()使線程在等待隊(duì)列中休息,有機(jī)會(huì)時(shí)(輪到自己,會(huì)被
unpark())會(huì)去嘗試獲取資源。獲取到資源后才返回。如果在整個(gè)等待過(guò)程中被中斷過(guò),則返回true,否則返回false。 - 如果線程在等待過(guò)程中被中斷過(guò),它是不響應(yīng)的。只是獲取資源后才再進(jìn)行自我中斷selfInterrupt(),將中斷補(bǔ)上。

釋放鎖
release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release(int arg) 獨(dú)占模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果徹底釋放了(即state=0),它會(huì)喚醒等待隊(duì)列里的其他線程來(lái)獲取資源。這也正是unlock()的語(yǔ)義,當(dāng)然不僅僅只限于unlock()。
tryRelease
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
tryRelease(int arg) 需要子類實(shí)現(xiàn)的獲取資源成功的方法,用作父類根據(jù)子類釋放是否成功作為判斷依據(jù)
unparkSuccessor(Node node)
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) //設(shè)置當(dāng)前線程的waitStatus為初始狀態(tài) 0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; //找到head節(jié)點(diǎn)的下一個(gè)需要喚醒的節(jié)點(diǎn),如果下一個(gè)節(jié)點(diǎn)被取消了(>0),則從tail往前找
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//unpart 喚醒線程
if (s != null)
LockSupport.unpark(s.thread);
}
unparkSuccessor(Node node) 方法用來(lái)將 Head(當(dāng)前持有鎖的 Node)節(jié)點(diǎn)的 waitStatus 的值重置為 0,然后找到 Head 的下一個(gè)未取消 (cancel)的節(jié)點(diǎn)找出來(lái)并喚醒(因此喚醒的節(jié)點(diǎn)便可以繼續(xù)在 acquireQueued 中自旋獲取鎖)
疑問(wèn)
在 unparkSuccessor(Node node) 中
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
如果發(fā)現(xiàn) s 為空,則從 tail 尾向 prev 前遍歷整個(gè)鏈表,找到未取消(waitStatus < 0)的節(jié)點(diǎn),那么為什么這里不直接從s 的 next 節(jié)點(diǎn)往后找呢?
個(gè)人猜測(cè)這里可能是因?yàn)橛?jì)算機(jī)的局部性原理,如果head 的 next節(jié)點(diǎn)取消了,那么next 節(jié)點(diǎn)的 next 節(jié)點(diǎn)等也又可能取消了,因此從最新的尾部向前找似乎更合理。
但是這里我又有一個(gè)新的問(wèn)題!
在入隊(duì) 調(diào)用 addWaiter(Node mode)方法的時(shí)候,因?yàn)楸旧硎俏膊宸?,此時(shí)便檢查 prev.waitStatus < 0 == true,如果為 false 則往 prev 前繼續(xù)找有效的尾部節(jié)點(diǎn),而不是在釋放鎖的地方去判斷?
感謝閱讀,您的點(diǎn)贊加關(guān)注是對(duì)我最大的支持