此篇博客所有源碼均來自JDK 1.8
在前面提到過,AQS是構(gòu)建Java同步組件的基礎(chǔ),我們期待它能夠成為實現(xiàn)大部分同步需求的基礎(chǔ)。AQS的設(shè)計模式采用的模板方法模式,子類通過繼承的方式,實現(xiàn)它的抽象方法來管理同步狀態(tài),對于子類而言它并沒有太多的活要做,AQS提供了大量的模板方法來實現(xiàn)同步,主要是分為三類:獨占式獲取和釋放同步狀態(tài)、共享式獲取和釋放同步狀態(tài)、查詢同步隊列中的等待線程情況。自定義子類使用AQS提供的模板方法就可以實現(xiàn)自己的同步語義。
獨占式
獨占式,同一時刻僅有一個線程持有同步狀態(tài)。
獨占式同步狀態(tài)獲取
acquire(int arg)方法為AQS提供的模板方法,該方法為獨占式獲取同步狀態(tài),但是該方法對中斷不敏感,也就是說由于線程獲取同步狀態(tài)失敗加入到CLH同步隊列中,后續(xù)對線程進(jìn)行中斷操作時,線程不會從同步隊列中移除。代碼如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
各個方法定義如下:
- tryAcquire:去嘗試獲取鎖,獲取成功則設(shè)置鎖狀態(tài)并返回true,否則返回false。該方法自定義同步組件自己實現(xiàn),該方法必須要保證線程安全的獲取同步狀態(tài)。
- addWaiter:如果tryAcquire返回FALSE(獲取同步狀態(tài)失敗),則調(diào)用該方法將當(dāng)前線程加入到CLH同步隊列尾部。
- acquireQueued:當(dāng)前線程會根據(jù)公平性原則來進(jìn)行阻塞等待(自旋),直到獲取鎖為止;并且返回當(dāng)前線程在等待過程中有沒有中斷過。
- selfInterrupt:產(chǎn)生一個中斷。
acquireQueued方法為一個自旋的過程,也就是說當(dāng)前線程(Node)進(jìn)入同步隊列后,就會進(jìn)入一個自旋的過程,每個節(jié)點都會自省地觀察,當(dāng)條件滿足,獲取到同步狀態(tài)后,就可以從這個自旋過程中退出,否則會一直執(zhí)行下去。如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//中斷標(biāo)志
boolean interrupted = false;
/*
* 自旋過程,其實就是一個死循環(huán)而已
*/
for (;;) {
//當(dāng)前線程的前驅(qū)節(jié)點
final Node p = node.predecessor();
//當(dāng)前線程的前驅(qū)節(jié)點是頭結(jié)點,且同步狀態(tài)成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//獲取失敗,線程等待--具體后面介紹
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
從上面代碼中可以看到,當(dāng)前線程會一直嘗試獲取同步狀態(tài),當(dāng)然前提是只有其前驅(qū)節(jié)點為頭結(jié)點才能夠嘗試獲取同步狀態(tài),理由:
- 保持FIFO同步隊列原則。
- 頭節(jié)點釋放同步狀態(tài)后,將會喚醒其后繼節(jié)點,后繼節(jié)點被喚醒后需要檢查自己是否為頭節(jié)點。
acquire(int arg)方法流程圖如下:

獨占式獲取響應(yīng)中斷
AQS提供了acquire(int arg)方法以供獨占式獲取同步狀態(tài),但是該方法對中斷不響應(yīng),對線程進(jìn)行中斷操作后,該線程會依然位于CLH同步隊列中等待著獲取同步狀態(tài)。為了響應(yīng)中斷,AQS提供了acquireInterruptibly(int arg)方法,該方法在等待獲取同步狀態(tài)時,如果當(dāng)前線程被中斷了,會立刻響應(yīng)中斷拋出異常InterruptedException。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
首先校驗該線程是否已經(jīng)中斷了,如果是則拋出InterruptedException,否則執(zhí)行tryAcquire(int arg)方法獲取同步狀態(tài),如果獲取成功,則直接返回,否則執(zhí)行doAcquireInterruptibly(int arg)。doAcquireInterruptibly(int arg)定義如下:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
doAcquireInterruptibly(int arg)方法與acquire(int arg)方法僅有兩個差別。1.方法聲明拋出InterruptedException異常,2.在中斷方法處不再是使用interrupted標(biāo)志,而是直接拋出InterruptedException異常。
獨占式超時獲取
AQS除了提供上面兩個方法外,還提供了一個增強版的方法:tryAcquireNanos(int arg,long nanos)。該方法為acquireInterruptibly方法的進(jìn)一步增強,它除了響應(yīng)中斷外,還有超時控制。即如果當(dāng)前線程沒有在指定時間內(nèi)獲取同步狀態(tài),則會返回false,否則返回true。如下:
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
tryAcquireNanos(int arg, long nanosTimeout)方法超時獲取最終是在doAcquireNanos(int arg, long nanosTimeout)中實現(xiàn)的,如下:
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//nanosTimeout <= 0
if (nanosTimeout <= 0L)
return false;
//超時時間
final long deadline = System.nanoTime() + nanosTimeout;
//新增Node節(jié)點
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
//自旋
for (;;) {
final Node p = node.predecessor();
//獲取同步狀態(tài)成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
/*
* 獲取失敗,做超時、中斷判斷
*/
//重新計算需要休眠的時間
nanosTimeout = deadline - System.nanoTime();
//已經(jīng)超時,返回false
if (nanosTimeout <= 0L)
return false;
//如果沒有超時,則等待nanosTimeout納秒
//注:該線程會直接從LockSupport.parkNanos中返回,
//LockSupport為JUC提供的一個阻塞和喚醒的工具類,后面做詳細(xì)介紹
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
//線程是否已經(jīng)中斷了
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
針對超時控制,程序首先記錄喚醒時間deadline ,deadline = System.nanoTime() + nanosTimeout(時間間隔)。如果獲取同步狀態(tài)失敗,則需要計算出需要休眠的時間間隔nanosTimeout(= deadline - System.nanoTime()),如果nanosTimeout <= 0 表示已經(jīng)超時了,返回false,如果大于spinForTimeoutThreshold(1000L)則需要休眠nanosTimeout ,如果nanosTimeout <= spinForTimeoutThreshold ,就不需要休眠了,直接進(jìn)入快速自旋的過程。原因在于 spinForTimeoutThreshold 已經(jīng)非常小了,非常短的時間等待無法做到十分精確,如果這時再次進(jìn)行超時等待,相反會讓nanosTimeout 的超時從整體上面表現(xiàn)得不是那么精確,所以在超時非常短的場景中,AQS會進(jìn)行無條件的快速自旋。
整個流程如下:

獨占式同步狀態(tài)釋放
當(dāng)線程獲取同步狀態(tài)后,執(zhí)行完相應(yīng)邏輯后就需要釋放同步狀態(tài)。AQS提供了release(int arg)方法釋放同步狀態(tài):
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
該方法同樣是先調(diào)用自定義同步器自定義的tryRelease(int arg)方法來釋放同步狀態(tài),釋放成功后,會調(diào)用unparkSuccessor(Node node)方法喚醒后繼節(jié)點(如何喚醒LZ后面介紹)。
這里稍微總結(jié)下:
在AQS中維護(hù)著一個FIFO的同步隊列,當(dāng)線程獲取同步狀態(tài)失敗后,則會加入到這個CLH同步隊列的對尾并一直保持著自旋。在CLH同步隊列中的線程在自旋時會判斷其前驅(qū)節(jié)點是否為首節(jié)點,如果為首節(jié)點則不斷嘗試獲取同步狀態(tài),獲取成功則退出CLH同步隊列。當(dāng)線程執(zhí)行完邏輯后,會釋放同步狀態(tài),釋放后會喚醒其后繼節(jié)點。
共享式
共享式與獨占式的最主要區(qū)別在于同一時刻獨占式只能有一個線程獲取同步狀態(tài),而共享式在同一時刻可以有多個線程獲取同步狀態(tài)。例如讀操作可以有多個線程同時進(jìn)行,而寫操作同一時刻只能有一個線程進(jìn)行寫操作,其他操作都會被阻塞。
共享式同步狀態(tài)獲取
AQS提供acquireShared(int arg)方法共享式獲取同步狀態(tài):
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
//獲取失敗,自旋獲取同步狀態(tài)
doAcquireShared(arg);
}
從上面程序可以看出,方法首先是調(diào)用tryAcquireShared(int arg)方法嘗試獲取同步狀態(tài),如果獲取失敗則調(diào)用doAcquireShared(int arg)自旋方式獲取同步狀態(tài),共享式獲取同步狀態(tài)的標(biāo)志是返回 >= 0 的值表示獲取成功。自選式獲取同步狀態(tài)如下:
private void doAcquireShared(int arg) {
/共享式節(jié)點
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//前驅(qū)節(jié)點
final Node p = node.predecessor();
//如果其前驅(qū)節(jié)點,獲取同步狀態(tài)
if (p == head) {
//嘗試獲取同步
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquireShared(int arg)方法嘗試獲取同步狀態(tài),返回值為int,當(dāng)其 >= 0 時,表示能夠獲取到同步狀態(tài),這個時候就可以從自旋過程中退出。
acquireShared(int arg)方法不響應(yīng)中斷,與獨占式相似,AQS也提供了響應(yīng)中斷、超時的方法,分別是:acquireSharedInterruptibly(int arg)、tryAcquireSharedNanos(int arg,long nanos),這里就不做解釋了。
共享式同步狀態(tài)釋放
獲取同步狀態(tài)后,需要調(diào)用release(int arg)方法釋放同步狀態(tài),方法如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
因為可能會存在多個線程同時進(jìn)行釋放同步狀態(tài)資源,所以需要確保同步狀態(tài)安全地成功釋放,一般都是通過CAS和循環(huán)來完成的。
參考資料
Doug Lea:《Java并發(fā)編程實戰(zhàn)》
方騰飛:《Java并發(fā)編程的藝術(shù)》
