1、AQS簡介
AQS(java.util.concurrent.locks.AbstractQueuedSynchronizer)是用來構(gòu)建鎖或者其他同步組件(信號量、事件等)的基礎(chǔ)框架類。JDK中許多并發(fā)工具類的內(nèi)部實(shí)現(xiàn)都依賴于AQS,如ReentrantLock, Semaphore, CountDownLatch等等。
AQS的主要使用方式是繼承它作為一個內(nèi)部輔助類實(shí)現(xiàn)同步原語,它可以簡化你的并發(fā)工具的內(nèi)部實(shí)現(xiàn),屏蔽同步狀態(tài)管理、線程的排隊(duì)、等待與喚醒等底層操作。
在基于AQS構(gòu)建的同步器類中,最基本的操作包括各種形式的獲取操作和釋放操作。獲取操作是一種依賴狀態(tài)的操作,并且通常會阻塞。當(dāng)使用鎖或信號量時,“獲取”操作的含義就很直觀,即獲取的是鎖或者許可,并且調(diào)用者可能會一直等待直到同步器類處于可被獲取的狀態(tài)。如果一個類想成為狀態(tài)依賴的類,那么它必須擁有一些狀態(tài)。AQS負(fù)責(zé)管理同步器類中的狀態(tài),它管理了一個整數(shù)狀態(tài)信息,可以通過getstate,setState以及compareAndSetState等 protected類型方法來進(jìn)行操作。這個整數(shù)可以用于表示任意狀態(tài)。
AQS主要做三件事:
- 同步狀態(tài)的管理
- 線程的阻塞和喚醒
- 同步隊(duì)列的維護(hù)
狀態(tài)管理API:
- int getState(): 獲取同步狀態(tài)
- void setState(): 設(shè)置同步狀態(tài)
- boolean compareAndSetState(int expect, int update):基于CAS,原子設(shè)置狀態(tài)
AQS模板方法:
| 方法 | 描述 |
|---|---|
| void acquire(int arg) | 獨(dú)占式獲取同步狀態(tài),如果當(dāng)前線程獲取同步狀態(tài)成功,則由該方法返回,否則,將會進(jìn)入同步隊(duì)列等待,該方法將會調(diào)用重寫的tryAcquire(intarg)方法 |
| void acquireInterruptibly(int arg) | 與acquire(int arg)相同,但是該方法響應(yīng)中斷,當(dāng)前線程未獲取到同步狀態(tài)而進(jìn)入同步隊(duì)列中,如果當(dāng)前線程被中斷,則該方法會拋出InterruptedException并返回 |
| boolean tryAcquireNanos(int arg,long nanos) | 在acquireInterruptibly(int arg)基礎(chǔ)上增加了超時限制,如果當(dāng)前線程在超時時間內(nèi)沒有獲取到同步狀態(tài),那么就會返回false,如果獲取到就返回true |
| void acquireShared(int arg) | 共享式的獲取同步狀態(tài),如果當(dāng)前線程未獲取到同步狀態(tài),將會進(jìn)入同步隊(duì)列等待,與獨(dú)占式獲取的主要區(qū)別是在同一時刻可以有多個線程獲取到同步狀態(tài)。 |
| void acquireSharedInterruptibly(int arg) | 與acquireShared(int arg)相同,該方法響應(yīng)中斷。 |
| boolean tryAcquireSharedNanos(int arg,long nanos) | 在acquireSharedInterruptibly(int arg)基礎(chǔ)上增加了超時限制。 |
| boolean release(int arg) | 獨(dú)占式的釋放同步狀態(tài),該方法會在釋放同步狀態(tài)之后,將同步隊(duì)列中第一個節(jié)點(diǎn)包含的線程喚醒。 |
| boolean releaseShared(int arg) | 共享式的釋放同步狀態(tài) |
| Collection getQueuedThreads() | 獲取等待在同步隊(duì)列上的線程集合 |
同步器可重寫的方法:
| 方法 | 描述 |
|---|---|
| boolean tryAcquire(int arg) | 獨(dú)占獲取同步狀態(tài),實(shí)現(xiàn)該方法需要查詢當(dāng)前狀態(tài),并判斷同步狀態(tài)是否符合預(yù)期狀態(tài),然后再進(jìn)行CAS設(shè)置同步狀態(tài)。 |
| boolean tryRelease(int arg) | 獨(dú)占式釋放同步狀態(tài),等待獲取同步狀態(tài)的線程將有機(jī)會獲取同步狀態(tài) |
| int tryAcquireShared(int arg) | 共享式獲取同步狀態(tài),返回大于等于0的值,表示獲取成功,反之失敗 |
| boolean tryReleaseShared(int arg) | 共享式釋放同步狀態(tài) |
| boolean isHeldExclusively() | 當(dāng)前同步器是否在獨(dú)占模式下被線程占用,一般該方法表示是否被當(dāng)前線程所獨(dú)占 |
AQS對外暴露的方法:

2、AQS數(shù)據(jù)結(jié)構(gòu)
因?yàn)楂@取鎖是有條件的,沒有獲取鎖的線程就要阻塞等待,那么就要存儲這些等待的線程。在AQS中使用CLH隊(duì)列儲存這些等待的線程,但它并不是直接儲存線程,而是儲存擁有線程的node節(jié)點(diǎn)。
2.1、Node數(shù)據(jù)結(jié)構(gòu):
static final class Node {
//共享模式的標(biāo)記,標(biāo)識一個節(jié)點(diǎn)在共享模式下等待
static final Node SHARED = new Node();
//獨(dú)占模式的標(biāo)記,標(biāo)識一個節(jié)點(diǎn)在獨(dú)占模式下等待
static final Node EXCLUSIVE = null;
// waitStatus變量的值,標(biāo)志著線程被取消,后續(xù)將不會獲取到鎖
static final int CANCELLED = 1;
// waitStatus變量的值,標(biāo)志著后繼線程(即隊(duì)列中此節(jié)點(diǎn)之后的節(jié)點(diǎn))需要被阻塞.(用于獨(dú)占鎖)
static final int SIGNAL = -1;
// waitStatus變量的值,標(biāo)志著線程在Condition條件上等待阻塞.(用于Condition的await等待)
static final int CONDITION = -2;
// waitStatus變量的值,標(biāo)志著下一個acquireShared方法線程應(yīng)該被無條件傳播。(用于共享鎖)
static final int PROPAGATE = -3;
// 標(biāo)記著當(dāng)前節(jié)點(diǎn)的狀態(tài),默認(rèn)狀態(tài)是0, 小于0的狀態(tài)都是有特殊作用,大于0的狀態(tài)表示已取消
//SIGNAL:此節(jié)點(diǎn)的繼任節(jié)點(diǎn)被阻塞,故當(dāng)此節(jié)點(diǎn)釋放鎖或中斷時需要換繼任節(jié)點(diǎn)的線程。
//CANCELLED:當(dāng)獲取鎖超時或被中斷時節(jié)點(diǎn)狀態(tài)會被設(shè)置成此狀態(tài),此狀態(tài)的節(jié)點(diǎn)不會被再次阻塞;
//CONDITION:標(biāo)識此節(jié)點(diǎn)在條件隊(duì)列中,在同步隊(duì)列的節(jié)點(diǎn)不會出現(xiàn)此狀態(tài),
//當(dāng)節(jié)點(diǎn)從條件隊(duì)列移到同步隊(duì)列時此狀態(tài)會被設(shè)置為0;
//PROPAGATE:一個releaseShared應(yīng)該被傳播到其他節(jié)點(diǎn),此狀態(tài)在doReleaseShared()中調(diào)用,
//以確保傳播傳播在其他插入時保持繼續(xù)。
//總結(jié):狀態(tài)小于0表示節(jié)點(diǎn)無需被通知喚醒;狀態(tài)為0表示普通同步節(jié)點(diǎn);CONDITION表示節(jié)點(diǎn)在
//等待隊(duì)列中,狀態(tài)通過CAS進(jìn)行原子更新。
volatile int waitStatus;
/**
* 前驅(qū)節(jié)點(diǎn),在enqueue時設(shè)置,在dequeue或前驅(qū)節(jié)點(diǎn)取消時清除;
*/
volatile Node prev;
/**
* 后繼節(jié)點(diǎn),在enqueue時設(shè)置,在dequeue或前驅(qū)節(jié)點(diǎn)取消時清除;
* 入隊(duì)操作不會設(shè)置前驅(qū)節(jié)點(diǎn)的后繼節(jié)點(diǎn),直到節(jié)點(diǎn)連接到隊(duì)列;
* 故next節(jié)點(diǎn)為null不一定表示此節(jié)點(diǎn)為隊(duì)列尾部,當(dāng)next節(jié)點(diǎn)為null時,
* 可遍歷prev節(jié)點(diǎn)進(jìn)行雙重檢查;已經(jīng)取消的節(jié)點(diǎn)的next指向自己而不是null
*/
volatile Node next;
//該節(jié)點(diǎn)擁有的線程
volatile Thread thread;
/**
* 1、值為null或非SHARED;為null時表示獨(dú)占模式;非SHARED時表示在Condition中等待的隊(duì)列;
* 2、值為SHARED,表示共享模式;
*/
Node nextWaiter;
//是否為共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
//當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),如果前驅(qū)節(jié)點(diǎn)為null,則拋出NPE異常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
//用于在addWaiter()中使用,創(chuàng)建同步隊(duì)列中的節(jié)點(diǎn)
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
//在Condition中使用,創(chuàng)建等待隊(duì)列的節(jié)點(diǎn)
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
說明:
- waitStatus:表示當(dāng)前節(jié)點(diǎn)的狀態(tài),會有如下五中狀態(tài):
CANCELLED(1):當(dāng)獲取鎖超時或被中斷時節(jié)點(diǎn)狀態(tài)會被設(shè)置成此狀態(tài),此狀態(tài)的 節(jié)點(diǎn)會被unpark,不會參與鎖的獲取,不會被再次阻塞;
0:表示普通節(jié)點(diǎn),當(dāng)節(jié)點(diǎn)初始插入到同步隊(duì)列時的狀態(tài);
SIGNAL(-1):此節(jié)點(diǎn)的繼任節(jié)點(diǎn)被阻塞,故當(dāng)此節(jié)點(diǎn)釋放鎖或中斷時需要換繼任節(jié)點(diǎn)的線程。
CONDITION(-2):標(biāo)識此節(jié)點(diǎn)在條件隊(duì)列中,在同步隊(duì)列的節(jié)點(diǎn)不會出現(xiàn)此狀態(tài),當(dāng)節(jié)點(diǎn)從條件隊(duì)列移到同步隊(duì)列時此狀態(tài)會被設(shè)置為0;
PROPAGATE(-3):一個releaseShared應(yīng)該被傳播到其他節(jié)點(diǎn),此狀態(tài)在doReleaseShared()中調(diào)用,以確保傳播傳播在其他插入時保持繼續(xù)。
總結(jié):狀態(tài)小于0表示節(jié)點(diǎn)無需被通知喚醒;狀態(tài)為0表示普通同步節(jié)點(diǎn);CONDITION表示節(jié)點(diǎn)在等待隊(duì)列中,狀態(tài)通過CAS進(jìn)行原子更新。
- prev:前驅(qū)節(jié)點(diǎn),在enqueue時設(shè)置,在dequeue或前驅(qū)節(jié)點(diǎn)取消時清除;
- next:后繼節(jié)點(diǎn),在enqueue時設(shè)置,在dequeue或前驅(qū)節(jié)點(diǎn)取消時清除; 入隊(duì)操作不會設(shè)置前驅(qū)節(jié)點(diǎn)的后繼節(jié)點(diǎn),直到節(jié)點(diǎn)連接到隊(duì)列; 故next節(jié)點(diǎn)為null不一定表示此節(jié)點(diǎn)為隊(duì)列尾部,當(dāng)next節(jié)點(diǎn)為null時, 可遍歷prev節(jié)點(diǎn)進(jìn)行雙重檢查;已經(jīng)取消的節(jié)點(diǎn)的next指向自己而不是null
- **nextWaiter: **值為SHARED,表示共享模式;值為null或非SHARED,為null時表示獨(dú)占模式,非SHARED時表示在Condition隊(duì)列中等待的節(jié)點(diǎn);
waitStatus狀態(tài)狀態(tài):

2.2、AQS數(shù)據(jù)結(jié)構(gòu)
/**
* 等待隊(duì)列的頭節(jié)點(diǎn),通過setHead方法進(jìn)行更新;頭結(jié)點(diǎn)的狀態(tài)不可能是CANCELLED
*/
private transient volatile Node head;
/**
* 等待隊(duì)列的尾節(jié)點(diǎn)
*/
private transient volatile Node tail;
/**
* 同步狀態(tài)
*/
private volatile int state;
3、CLH隊(duì)列相關(guān)操作
3.1、相關(guān)屬性的CAS操作
/**
* 通過CAS設(shè)置AQS的head值
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* 通過CAS設(shè)置AQS的tail值
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* 通過CAS設(shè)置Node節(jié)點(diǎn)的waitStatue值
*/
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
/**
* 通過CAS設(shè)置Node節(jié)點(diǎn)的next值
*/
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
3.2、將節(jié)點(diǎn)添加到CLH隊(duì)尾
//通過空轉(zhuǎn)及CAS方式,將一個node插入同步隊(duì)列的隊(duì)尾
private Node enq(final Node node) {
for (; ; ) {
Node t = tail;
//尾節(jié)點(diǎn)為空,則直接CAS初始化頭結(jié)點(diǎn),并將尾節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//尾節(jié)點(diǎn)不為空,則CAS設(shè)置當(dāng)前節(jié)點(diǎn)為尾節(jié)點(diǎn)
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
3.3、將當(dāng)前線程添加到CLH隊(duì)尾
//創(chuàng)建一個給定模式的節(jié)點(diǎn),此節(jié)點(diǎn)線程為當(dāng)前線程,并將節(jié)點(diǎn)加入隊(duì)尾
private Node addWaiter(Node mode) {
//創(chuàng)建線程
Node node = new Node(Thread.currentThread(), mode);
//快速檢測尾節(jié)點(diǎn)不為空,則CAS將當(dāng)前節(jié)點(diǎn)替換為尾節(jié)點(diǎn)
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//快速替換失?。縿t通過enq()的空轉(zhuǎn)方式將當(dāng)前節(jié)點(diǎn)插入隊(duì)尾
enq(node);
return node;
}
4、獨(dú)占鎖
獨(dú)占鎖主要包括兩方面功能:
- 獲取鎖的功能:既當(dāng)多個線程一起獲取鎖的時候,只有一個線程能獲取到鎖,其他線程必須在當(dāng)前位置阻塞等待。
- 釋放鎖的功能:獲取鎖的線程釋放鎖資源,而且還必須能喚醒正在等待鎖資源的一個線程。
4.1、獨(dú)占鎖獲取流程

4.2、獲取獨(dú)占鎖相關(guān)方法
//獲取獨(dú)占鎖,忽略中斷;直到成功獲取鎖,此方法經(jīng)常被lock.lock調(diào)用
public final void acquire(int arg) {
//tryAcquire:先CAS嘗試獲取鎖,當(dāng)返回true表示獲取成功,此方法由子類實(shí)現(xiàn);
//當(dāng)返回false,則調(diào)用
//acquireQueued進(jìn)行獲取及入隊(duì)阻塞處理,當(dāng)有其他線程釋放鎖會喚醒改線程。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//調(diào)用tryAcquire嘗試獲取鎖,當(dāng)獲取失敗就將線程節(jié)點(diǎn)入隊(duì)并阻塞節(jié)點(diǎn)線程;
//直到線程被中斷或被喚醒,會再次嘗試獲取鎖;
final boolean acquireQueued(final Node node, int arg) {
//標(biāo)識獲取鎖失敗
boolean failed = true;
try {
//標(biāo)識鎖被中斷
boolean interrupted = false;
for (; ; ) {
//獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),若前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn),則嘗試獲取鎖;
//若獲取成功則將當(dāng)前節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn),否則嘗試阻塞節(jié)點(diǎn)線程
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//將當(dāng)前節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//判斷當(dāng)前是否滿足阻塞條件,滿足則阻塞當(dāng)前線程;
//并等待被中斷或被喚醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//獲取鎖失敗,則進(jìn)行取消獲取操作
if (failed)
cancelAcquire(node);
}
}
//判斷當(dāng)前節(jié)點(diǎn)線程是否需要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驅(qū)節(jié)點(diǎn)的狀態(tài)
int ws = pred.waitStatus;
//前驅(qū)節(jié)點(diǎn)狀態(tài)為SIGNAL,表示前驅(qū)節(jié)點(diǎn)在等待獲取鎖的信號
//故本節(jié)點(diǎn)可以安全的阻塞
if (ws == Node.SIGNAL)
return true;
//前驅(qū)節(jié)點(diǎn)waitStatus>0,即為waitStatus=CANCELLED;
//表示前驅(qū)節(jié)點(diǎn)已經(jīng)被取消,需需要前向遍歷前驅(qū)節(jié)點(diǎn),直到狀態(tài)
//不為CANCELLED的節(jié)點(diǎn),并將此節(jié)點(diǎn)設(shè)為node節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn);
//返回false,讓上層調(diào)用繼續(xù)嘗試獲取鎖
if (ws > 0) {
//循環(huán)遍歷前驅(qū)節(jié)點(diǎn),尋找狀態(tài)不為CANCELLED的節(jié)點(diǎn),并設(shè)為當(dāng)前節(jié)點(diǎn)的
//前驅(qū)節(jié)點(diǎn)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//當(dāng)前驅(qū)節(jié)點(diǎn)狀態(tài)為0或PROPAGATE時,通過CAS設(shè)置前驅(qū)節(jié)點(diǎn)狀態(tài)為SIGNAL
//并返回fase,等待下個循環(huán)阻塞當(dāng)前節(jié)點(diǎn)線程;
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//通過LockSupport.park()阻塞當(dāng)前線程,直到線程被unpark或被中斷
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
4.3、釋放獨(dú)占鎖相關(guān)方法
//釋放獨(dú)占鎖
public final boolean release(int arg) {
//調(diào)用tryRelease方式通過CAS嘗試釋放鎖,tryRelease由子類實(shí)現(xiàn)
if (tryRelease(arg)) {
Node h = head;
//頭結(jié)點(diǎn)不為空且頭節(jié)點(diǎn)狀態(tài)不為0,應(yīng)該為SIGNAL
//表示隊(duì)列中有需要喚醒的節(jié)點(diǎn),調(diào)用unparkSuccessor進(jìn)行頭節(jié)點(diǎn)線程
//喚醒操作
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//喚醒節(jié)點(diǎn)線程處理
private void unparkSuccessor(Node node) {
//獲取當(dāng)前節(jié)點(diǎn)狀態(tài)
int ws = node.waitStatus;
//狀態(tài)小于零,則將狀態(tài)重置為0,表示節(jié)點(diǎn)處理已經(jīng)完成
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//獲取后繼節(jié)點(diǎn),當(dāng)后繼節(jié)點(diǎn)為空或后繼節(jié)點(diǎn)狀態(tài)為CANCELLED時;
//由tail前向遍歷隊(duì)列,找到當(dāng)前節(jié)點(diǎn)的下個有效節(jié)點(diǎn),即waitStatus <= 0
//的節(jié)點(diǎn)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//下個節(jié)點(diǎn)非空,表示為等待信號的節(jié)點(diǎn),執(zhí)行unpark喚醒節(jié)點(diǎn)線程
if (s != null)
LockSupport.unpark(s.thread);
}
5、共享鎖
5.1、共享鎖獲取流程

5.2、共享鎖獲取相關(guān)方法
//獲取共享鎖,忽略中斷;
public final void acquireShared(int arg) {
//子類實(shí)現(xiàn),CAS方式獲取共享鎖,若獲取失敗,調(diào)用doAcquireShared繼續(xù)獲取共享鎖
if (tryAcquireShared(arg) < 0)
//嘗試獲取共享鎖,獲取失敗則將當(dāng)前線程節(jié)點(diǎn)入隊(duì),直到被通知或被中斷
doAcquireShared(arg);
}
//獲取共享鎖,若CAS獲取失敗,則將當(dāng)前節(jié)點(diǎn)入隊(duì)并阻塞當(dāng)前線程,直到獲取鎖
private void doAcquireShared(int arg) {
//將當(dāng)前節(jié)點(diǎn)插入隊(duì)尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
//獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),若前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn),則嘗試獲取鎖;
//若獲取失敗,則檢查節(jié)點(diǎn)狀態(tài);當(dāng)節(jié)點(diǎn)狀態(tài)為SIGNAL時將節(jié)點(diǎn)線程阻塞
final Node p = node.predecessor();
if (p == head) {
//CAS獲取鎖
int r = tryAcquireShared(arg);
if (r >= 0) {
//成功則設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn)并將其他節(jié)點(diǎn)狀態(tài)設(shè)為PROPAGAE
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//檢查當(dāng)前節(jié)點(diǎn)是否應(yīng)該阻塞,是則進(jìn)行阻塞處理,直到被中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//若獲取鎖失敗,則進(jìn)行取消獲取處理
if (failed)
cancelAcquire(node);
}
}
//設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn)并喚醒共享模式下的線程
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
//設(shè)為頭節(jié)點(diǎn)
setHead(node);
//若propagate > 0或頭結(jié)點(diǎn)為空且頭節(jié)點(diǎn)狀態(tài)為 < 0
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//若頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)為共享模式,則獲取頭結(jié)點(diǎn)
if (s == null || s.isShared())
doReleaseShared();
}
}
5.3、共享鎖釋放相關(guān)方法
//釋放共享鎖
public final boolean releaseShared(int arg) {
//cas方式釋放鎖,若失敗則doReleaseShared釋放鎖
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (; ; ) {
//獲取頭結(jié)點(diǎn),頭結(jié)點(diǎn)不為空且狀態(tài)為SIGNAL時,CAS設(shè)置狀態(tài)為0并喚醒線程
//否則將頭結(jié)點(diǎn)狀態(tài)設(shè)置為PROPAGATE,然后循環(huán)檢查頭結(jié)點(diǎn)狀態(tài)并試圖喚醒
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
6、condition條件
6.1、condition的實(shí)現(xiàn)
首先內(nèi)部存在一個Condition隊(duì)列,存儲著所有在此Condition條件等待的線程。
await系列方法:讓當(dāng)前持有鎖的線程釋放鎖,并喚醒一個在CLH隊(duì)列上等待鎖的線程,再為當(dāng)前線程創(chuàng)建一個node節(jié)點(diǎn),插入到Condition隊(duì)列(注意不是插入到CLH隊(duì)列中)
signal系列方法:其實(shí)這里沒有喚醒任何線程,而是將Condition隊(duì)列上的等待節(jié)點(diǎn)插入到CLH隊(duì)列中,所以當(dāng)持有鎖的線程執(zhí)行完畢釋放鎖時,就會喚醒CLH隊(duì)列中的一個線程,這個時候才會喚醒線程。
6.2、await及signal處理流程


6.3、await相關(guān)方法
//讓當(dāng)前持有鎖的線程阻塞等待,并釋放鎖。如果有中斷請求,則拋出InterruptedException異常
public final void await() throws InterruptedException {
//若當(dāng)前線程已被中斷,則拋出中斷異常
if (Thread.interrupted())
throw new InterruptedException();
// 為當(dāng)前線程創(chuàng)建新的Node節(jié)點(diǎn),并且將這個節(jié)點(diǎn)插入到Condition隊(duì)列中
Node node = addConditionWaiter();
//釋放當(dāng)前線程持有的鎖,并喚醒同步隊(duì)列中的頭結(jié)點(diǎn)
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果當(dāng)前節(jié)點(diǎn)補(bǔ)足同步隊(duì)列中;
//阻塞當(dāng)前線程,當(dāng)前當(dāng)前線程被signal信號喚醒后,將當(dāng)前節(jié)點(diǎn)加入同步隊(duì)列中;
//等待獲取獲取鎖
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//檢查是否被中斷并入隊(duì)等待鎖
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 如果節(jié)點(diǎn)node已經(jīng)在同步隊(duì)列中了,獲取同步鎖,只有得到鎖才能繼續(xù)執(zhí)行,否則線程繼續(xù)阻塞等待
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除Condition隊(duì)列中狀態(tài)不是Node.CONDITION的節(jié)點(diǎn)
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 是否要拋出異常,或者發(fā)出中斷請求
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//為當(dāng)前線程創(chuàng)建新的Node節(jié)點(diǎn),并且將這個節(jié)點(diǎn)插入到Condition隊(duì)列中
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果等待隊(duì)列尾節(jié)點(diǎn)狀態(tài)不是CONDITION,則進(jìn)行清除操作;
// 清除隊(duì)列中狀態(tài)不是CONDITION的節(jié)點(diǎn)
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//為當(dāng)前線程創(chuàng)建一個狀態(tài)為CONDITION的節(jié)點(diǎn),并將節(jié)點(diǎn)插入隊(duì)尾
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
//從頭到尾部遍歷等待隊(duì)列,去除狀態(tài)不是CONDITION的節(jié)點(diǎn)
private void unlinkCancelledWaiters() {
//記錄下一個待處理的節(jié)點(diǎn)
Node t = firstWaiter;
//記錄上一個狀態(tài)為CONDITION的節(jié)點(diǎn)
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
} else
trail = t;
t = next;
}
}
//釋放當(dāng)前線程占有的鎖,并喚醒同步隊(duì)列一個等待線程
//如果失敗就拋出異常,設(shè)置node節(jié)點(diǎn)的狀態(tài)是Node.CANCELLED
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
//釋放當(dāng)前線程占有的鎖
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
//判斷節(jié)點(diǎn)釋放在同步隊(duì)列中
final boolean isOnSyncQueue(Node node) {
// 如果node的狀態(tài)是Node.CONDITION,或者node沒有前一個節(jié)點(diǎn)prev,
// 那么返回false,節(jié)點(diǎn)node不在同步隊(duì)列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果node有下個節(jié)點(diǎn),則其一定在同步隊(duì)列中
if (node.next != null) // If has successor, it must be on queue
return true;
//從同步隊(duì)列中查找node節(jié)點(diǎn)
return findNodeFromTail(node);
}
//根據(jù)當(dāng)前的模式,判斷是否拋出異常或重新中斷等
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
6.4、signal相關(guān)方法
//如果等待隊(duì)列不為空,則將隊(duì)列頭節(jié)點(diǎn)插入同步隊(duì)列中
public final void signal() {
//如果當(dāng)前線程不是獨(dú)占鎖,則拋出異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//將等待隊(duì)列中的頭結(jié)點(diǎn)插入同步隊(duì)列中
if (first != null)
doSignal(first);
}
//將等待隊(duì)列總的頭結(jié)點(diǎn)插入同步隊(duì)列中
private void doSignal(Node first) {
do {
// 原先的Condition隊(duì)列頭節(jié)點(diǎn)取消,所以重新賦值Condition隊(duì)列頭節(jié)點(diǎn)
// 如果新的Condition隊(duì)列頭節(jié)點(diǎn)為null,表示Condition隊(duì)列為空了
// ,所以也要設(shè)置Condition隊(duì)列尾lastWaiter為null
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 返回true表示節(jié)點(diǎn)node插入到同步隊(duì)列中,返回false表示節(jié)點(diǎn)node沒有插入到同步隊(duì)列中
final boolean transferForSignal(Node node) {
//如果無法將節(jié)點(diǎn)狀態(tài)由CONDITION修改為0,表示節(jié)點(diǎn)已在同步隊(duì)列中,直接返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//將節(jié)點(diǎn)node插入到同步隊(duì)列中,p是原先同步隊(duì)列尾節(jié)點(diǎn),也是node節(jié)點(diǎn)的前一個節(jié)點(diǎn)
Node p = enq(node);
int ws = p.waitStatus;
// 如果前一個節(jié)點(diǎn)是已取消狀態(tài),或者不能將它設(shè)置成Node.SIGNAL狀態(tài)。
// 就說明節(jié)點(diǎn)p之后也不會發(fā)起喚醒下一個node節(jié)點(diǎn)線程的操作,
// 所以這里直接調(diào)用 LockSupport.unpark(node.thread)方法,喚醒節(jié)點(diǎn)node所在線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
//喚醒所有節(jié)點(diǎn)
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
//循環(huán)喚醒所有等待中的節(jié)點(diǎn)
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}