隊(duì)列同步器AbstractQueuedSynchronizer(簡稱同步器),主要是用于構(gòu)建鎖或其他同步組件(例如Semaphore)的基礎(chǔ)框架,它使用了一個(gè)int成員變量表示同步狀態(tài),通過內(nèi)置的FIFO隊(duì)列來完成資源獲取線程的排隊(duì)工作,成為實(shí)現(xiàn)大部分同步需求的基礎(chǔ)?!禞ava并發(fā)編程的藝術(shù)》
上一篇介紹ReentrantLock可重入鎖時(shí)提到其底層實(shí)現(xiàn)為同步器,其內(nèi)部定義一個(gè)靜態(tài)內(nèi)部類繼承AbstractQueuedSynchronizer,通過實(shí)現(xiàn)相關(guān)模版方法來完成線程同步鎖的功能。
基礎(chǔ)知識
同步器提供了3個(gè)方法(getState()、setState(int newState)、compareAndSetState(int expect, int update))來實(shí)現(xiàn)狀態(tài)的獲取的設(shè)置,上述方法通過硬件層面的原子操作保證其安全性。同時(shí),同步器還提供了一系列的模版方法(例如:tryAcquire(int arg)、tryRelease(int arg)),其本身不具體實(shí)現(xiàn)。同步組件(鎖、semaphore等)內(nèi)部通過靜態(tài)內(nèi)部類繼承同步器,實(shí)現(xiàn)相關(guān)的同步接口,完成線程同步功能。鎖是面向用戶的,其定義一系列接口,底層實(shí)現(xiàn)交給同步器完成。同步器所提供的模板方法可分為3類:獨(dú)占式獲取和釋放同步狀態(tài)、共享式獲取和釋放同步狀態(tài)、查詢同步隊(duì)列的等待情況。
實(shí)現(xiàn)分析
1.同步隊(duì)列
同步器內(nèi)部通過一個(gè)同步隊(duì)列(FIFO雙向隊(duì)列)來完成同步狀態(tài)的管理,當(dāng)線程獲取同步狀態(tài)失敗時(shí),會將線程和等待狀態(tài)信息構(gòu)造成一個(gè)Node對象,并加入到同步隊(duì)列,同時(shí)會將當(dāng)前線程阻塞。當(dāng)同步狀態(tài)釋放時(shí),會將頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程喚醒,并修改頭節(jié)點(diǎn)引用。

上圖中同步器包含兩個(gè)節(jié)點(diǎn)類型的引用,一個(gè)指向頭節(jié)點(diǎn),一個(gè)指向尾節(jié)點(diǎn)。頭節(jié)點(diǎn)即為當(dāng)前獲取同步狀態(tài)成功的節(jié)點(diǎn),頭節(jié)點(diǎn)線程釋放同步狀態(tài)后,會喚醒后繼節(jié)點(diǎn)線程,當(dāng)后繼節(jié)點(diǎn)線程獲取同步狀態(tài)成功后會將當(dāng)前線程對應(yīng)的節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)。因此設(shè)置頭節(jié)點(diǎn)的操作只存在與當(dāng)前獲取同步狀態(tài)成功的線程,不存在并發(fā)操作,不需要使用CAS操作來保證其安全性。不同的是,尾節(jié)點(diǎn)的設(shè)置需要使用CAS操作,原因就是多線程并發(fā)獲取同步狀態(tài),要保證等待線程加入隊(duì)列的安全性,只有
compareAndSetTail方法設(shè)置成功后,節(jié)點(diǎn)才回進(jìn)入隊(duì)列。
2.獨(dú)占式同步狀態(tài)獲取與釋放
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
調(diào)用同步器的acquire(int arg)方法可以獲取同步狀態(tài),該方法不處理線程中斷操作,當(dāng)線程獲取同步狀態(tài)失敗后加入同步隊(duì)列后,對線程進(jìn)行中斷無法從隊(duì)列中移除,只有通過前繼節(jié)點(diǎn)的喚醒操作,直到獲取同步狀態(tài)。線程獲取同步狀態(tài)失敗后,調(diào)用addWaiter(Node mode),將線程和等待信息構(gòu)造的節(jié)點(diǎn)加入到同步隊(duì)列中,enq(final Node node)方法是一個(gè)死循環(huán),直到節(jié)點(diǎn)節(jié)點(diǎn)成功加入同步隊(duì)列才會退出。當(dāng)節(jié)點(diǎn)加入同步隊(duì)列后調(diào)用acquireQueued(final Node node, int arg) 方法,該方法內(nèi)部為一個(gè)自旋的死循環(huán),會阻塞該線程,直到其前驅(qū)節(jié)點(diǎn)釋放同步狀態(tài),喚醒該線程,且成功獲取同步狀態(tài)后才會返回。

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
線程獲取到同步狀態(tài)并執(zhí)行完成后,通過release方法釋放同步狀態(tài),釋放成功后將喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程。
3.共享式同步狀態(tài)獲取與釋放
共享式與同步式最大的區(qū)別在于同一時(shí)刻能否允許多個(gè)線程同時(shí)獲取同步狀態(tài)。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
共享式同步狀態(tài)獲取通過acquireShared方法,doAcquireShared方法實(shí)現(xiàn)為自旋操作,直到tryAcquireShared方法返回值大于0,表示成功獲取到同步狀態(tài)并退出自旋。共享式同步狀態(tài)釋放由于可能存在多個(gè)線程同時(shí)擁有同步狀態(tài),因此其在釋放時(shí)必須保證線程安全,因此在doReleaseShared方法中會同過自旋和CAS操作實(shí)現(xiàn)釋放。
4.獨(dú)占式超時(shí)獲取同步狀態(tài)
通過同步器的doAcquireNanos(int arg, long nanosTimeout)方法可以實(shí)現(xiàn)超時(shí)獲取同步狀態(tài),該方法與獨(dú)占式最大的區(qū)別在于獲取同步狀態(tài)失敗后通過一個(gè)超時(shí)時(shí)間進(jìn)行自旋,在設(shè)定的超時(shí)時(shí)間內(nèi),若成功獲取同步狀態(tài)則成功返回,即使其前面有其他等待的線程,否則失敗。除此之外,其流程基本上與獨(dú)占式獲取基本相同。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

結(jié)束語
本文介紹AbstractQueuedSynchronizer隊(duì)列同步器的相關(guān)知識,包括使用和實(shí)現(xiàn)原理。通過了解同步器的實(shí)現(xiàn),基本上就掌握了鎖的實(shí)現(xiàn)原理。因此,了解同步器的的實(shí)現(xiàn)對于理解鎖的實(shí)現(xiàn)機(jī)制非常重要。本篇文章內(nèi)容大部分來自《Java并發(fā)編程藝術(shù)》,有興趣的可以讀下該書,好好看看源碼。