隊列同步器AbstractQueuedSynchronizer(以下簡稱同步器),是用來構(gòu)建鎖和其他同步組件的基礎(chǔ)框架。它使用了一個int成員變量來表示同步狀態(tài),通過內(nèi)置的FIFO隊列來完成資源獲取線程的排隊工作,并發(fā)包的作者(Doug Lea)期望它能夠成為實現(xiàn)大部分同步需求的基礎(chǔ)。
同步器的主要使用方式是繼承,子類通過繼承同步器并實現(xiàn)它的抽象方法管理同步狀態(tài),在抽象方法的實現(xiàn)過程中免不了要對同步狀態(tài)進行更改,這時就需要使用同步器提供的3個方法(getState(),setState(int),compareAndSetState(int,int))來進行操作,因為它們能夠保證狀態(tài)的改變是安全的。子類推薦被定義為自定義同步組件的靜態(tài)內(nèi)部類,同步器本身沒有實現(xiàn)任何同步接口,它僅僅定義了若干同步狀態(tài)獲取和釋放的方法供自定義同步組件使用,同步器既支持獨占式獲取同步狀態(tài),也支持共享式獲取同步狀態(tài)。當(dāng)以獨占式模式獲取時,其他線程嘗試獲取會失敗,而以共享式模式獲取時,多個線程獲取則可能會成功。
同步器是實現(xiàn)鎖(也可以是任何同步組件)的關(guān)鍵,在鎖的實現(xiàn)中聚合同步器,利用同步器實現(xiàn)鎖的語義。可以這樣理解二者之間的關(guān)系:鎖是面向使用者的,它定義了使用者與鎖交互的接口,隱藏了實現(xiàn)細節(jié);同步器是面向鎖的實現(xiàn)者的,它簡化了鎖的實現(xiàn)方式,屏蔽了同步狀態(tài)管理、線程的排隊、等待與喚醒等底層操作。
使用
為了使用此類作為同步組件的基礎(chǔ),需要重新定義以下方法,通過使用getState(),setState(int),compareAndSetState(int,int)方法來檢查和修改同步狀態(tài):
| 方法名稱 | 描述 |
|---|---|
| tryAcquire(int) | 獨占式獲取同步狀態(tài),實現(xiàn)該方法需要查詢當(dāng)前狀態(tài)把那個且判斷同步狀態(tài)是否符合預(yù)期,然后再進行CAS設(shè)置同步狀態(tài) |
| tryRelease(int) | 獨占式釋放同步狀態(tài),等待獲取同步狀態(tài)的線程將有機會獲取同步狀態(tài) |
| tryAcquireShared(int) | 共享式獲取同步狀態(tài),返回大于等于0的值,表示獲取成功,反之,獲取失敗 |
| tryReleaseShared(int) | 共享式釋放同步狀態(tài) |
| isHeldExclusively() | 同步器是否被在獨占模式下被線程占用 |
每個方法的默認實現(xiàn)是拋出UnsupportedOperationException異常。這個方法的實現(xiàn)必須是線程安全的,簡短的并且非阻塞的。定義這些方法是此類唯一支持的操作,其他方法被聲明為final因為它們不能被獨立改變。
> line: 1115
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
你可能也發(fā)現(xiàn)了從AbstractOwnableSynchronizer類中繼承的方法,這些方法對于追蹤線程獲取獨占式同步器很有用。你應(yīng)該去使用它們,這確保了監(jiān)視器和錯誤處理組件能幫助用戶確定哪個線程持有了鎖。
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/** Use serial ID even though all fields transient. */
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
/**
* 獨占模式同步器的持有線程
*/
private transient Thread exclusiveOwnerThread;
/**
* 設(shè)置同步器的擁有者。如果傳入的參數(shù)為null,表示當(dāng)前沒有線程占有同步器
* 這個方法沒有強加任何同步措施,例如synchronized或者volatile訪問
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
/**
* 返回最后一次使用setExclusiveOwnerThread設(shè)置的線程,如果沒有設(shè)置過,
* 則返回null。這個方法沒有強加任何同步措施,例如synchronized或者volatile訪問
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
這個類基于內(nèi)部的FIFO隊列,獨占式同步器的核心如下:
Acquire:
while (!tryAcquire(arg)) {
如果還未入隊,那么將此線程入隊;
可能會將當(dāng)前線程阻塞;
}
Release:
if (tryRelease(arg))
喚醒第一個等待線程;
共享模式與此相似。
示例
這里是一個不可重入的獨占鎖類,使用0代表解鎖狀態(tài),1代表加鎖狀態(tài)。雖然一個不可重入鎖不強制要求記錄當(dāng)前持有同步器的線程,但是這個類這樣做的原因是使得使用者更容易監(jiān)視。
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Reports whether in locked state
public boolean isLocked() {
return getState() != 0;
}
public boolean isHeldExclusively() {
// a data race, but safe due to out-of-thin-air guarantees
return getExclusiveOwnerThread() == Thread.currentThread();
}
// Provides a Condition
public Condition newCondition() {
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isLocked();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
這里是一個類似java.util.concurrent.CountDownLatch的柵欄類,除了它只需要釋放一個信號。因為柵欄是非獨占式的,它將使用共享式獲取和釋放方法。
class BooleanLatch {
private static class Sync extends AbstractQueuedSynchronizer {
boolean isSignalled() {
return getState() != 0;
}
protected int tryAcquireShared(int ignore) {
return isSignalled() ? 1 : -1;
}
protected boolean tryReleaseShared(int ignore) {
setState(1);
return true;
}
}
private final Sync sync = new Sync();
public boolean isSignalled() {
return sync.isSignalled();
}
public void signal() {
sync.releaseShared(1);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}
從上面的實現(xiàn)類中可以看出使用AQS框架能夠大大降低一個可靠的自定義同步器的實現(xiàn)門檻。
AQS實現(xiàn)分析
接下來從實現(xiàn)角度分析同步器是如何完成線程同步狀態(tài)的管理,主要包括:同步隊列、獨占式同步狀態(tài)獲取與釋放、共享式同步狀態(tài)獲取與釋放以及超時獲取同步狀態(tài)等同步器的核心數(shù)據(jù)結(jié)構(gòu)和模板方法。
同步隊列
同步器依賴內(nèi)部的同步隊列來完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗時,同步器會將當(dāng)前線程以及等待信息等狀態(tài)狗造成一個節(jié)點并將其增加到同步隊列,同時會阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時,會把首節(jié)點中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。
同步隊列中的節(jié)點用來保存獲取同步狀態(tài)失敗的線程引用、等待狀態(tài)以及前驅(qū)后繼結(jié)點,節(jié)點的屬性如下所示:
static final class Node {
/** 標(biāo)記節(jié)點在共享式模式下等待 */
static final Node SHARED = new Node();
/** 標(biāo)記節(jié)點在獨占式模式下等待 */
static final Node EXCLUSIVE = null;
/** waitStatus value 表示線程已經(jīng)被取消獲取共享狀態(tài) */
static final int CANCELLED = 1;
/** waitStatus value 表示后繼節(jié)點需要被喚醒 */
static final int SIGNAL = -1;
/** waitStatus value 表示節(jié)點在condition上等待 */
static final int CONDITION = -2;
/**
* waitStatus value 表示下一個共享式節(jié)點需要被無條件傳遞
*/
static final int PROPAGATE = -3;
/**
* 狀態(tài)域,只能有以下幾種值:
* SIGNAL: 這個節(jié)點的后續(xù)節(jié)點(或者即將成為后繼節(jié)點)被阻塞,
* 所以當(dāng)前節(jié)點必須在它釋放共享狀態(tài)或取消獲取共享狀態(tài)時喚醒它的后繼節(jié)點。
* 為了避免競爭,acquire方法必須首先表示它們需要一個信號,然后重新進行原子獲取,
* 如果失敗,則被阻塞
* CANCELLED: 這個節(jié)點由于超時或者中斷而被取消。節(jié)點不會離開這個狀態(tài)。
* 特別的,有cancelled節(jié)點的線程將不會再次被阻塞。
* CONDITION: 這個節(jié)點現(xiàn)在處于condition等待隊列中。
* 它不會作為同步隊列節(jié)點使用直到它的狀態(tài)被設(shè)置為0并且被轉(zhuǎn)移到同步隊列中
* (使用0這個值與這個域的其他使用沒有任何關(guān)系,簡化了機制)。
* PROPAGATE: 一次共享式同步狀態(tài)釋放應(yīng)當(dāng)被傳遞給其他節(jié)點。
* 這只會在doReleaseShared()方法中被設(shè)置來確保傳遞進行。
* 0: 初始值
*
* 這個值被安排為數(shù)字主要是簡化使用。肺腑值意味著節(jié)點不需要被通知。
* 因此,大多數(shù)代碼不需要只為了通知而去檢查特定的值。
*
* 此成員變量對于普通同步節(jié)點初始化為0,對于condition節(jié)點初始化為CONDITION。
* 它使用CAS操作進行修改,或者如果可能的話,使用volatile寫。
*/
volatile int waitStatus;
/**
* 前驅(qū)結(jié)點。當(dāng)節(jié)點入隊時被設(shè)置,出隊時被設(shè)為null以幫助 gc。
* 當(dāng)然,當(dāng)前驅(qū)結(jié)點被取消,我們快速循環(huán)搜索整個隊列來尋找
* 一個未被取消的節(jié)點,它總是會存在因為頭節(jié)點不會被取消:
* 一個節(jié)點成為頭節(jié)點只會因為成功獲取到了同步狀態(tài)。一個被取消
* 的線程將不會再繼續(xù)及進行獲取同步狀態(tài),并且它只會取消自己的節(jié)點
*/
volatile Node prev;
/**
* 后繼節(jié)點。當(dāng)節(jié)點入隊時被設(shè)置,刪除被取消的節(jié)點時被修改,出隊
* 時被設(shè)置為null以幫助 gc。入隊操作不會設(shè)置前驅(qū)結(jié)點的next域直到
* 入隊成功,所以發(fā)現(xiàn)next域的值為null不一定代表此節(jié)點為最后一個
* 節(jié)點。被取消的節(jié)點的next域被設(shè)置為指向它自己。
*/
volatile Node next;
/**
* 插入此節(jié)點的線程。在構(gòu)造函數(shù)中初始化,使用完后設(shè)置為null。
*/
volatile Thread thread;
/**
* 等待隊列中的后繼節(jié)點。如果當(dāng)前節(jié)點是共享的,那么這個字段
* 將是一個SHARED常量。因為等待隊列只有當(dāng)處于獨占模式時才能
* 訪問,所以我們只需要一個簡單的鏈接隊列來保存節(jié)點,當(dāng)它們在
* condition上等待時。它們再次嘗試獲取同步狀態(tài)會被轉(zhuǎn)移到同步隊列。
* 因為condition只可以是獨占式的,所以我們使用一個特殊值來表示
* 共享模式。
*/
Node nextWaiter;
/**
* 如果這個節(jié)點在共享模式等待,則返回true。
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回前驅(qū)結(jié)點,如果為null則拋出 NullPointerException 異常。
* 當(dāng)前驅(qū)結(jié)點不為null時使用。null檢查可以去掉,但是存在可以幫助VM。
* @return the predecessor of this node
*/
final Node predecessor() {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/** Establishes initial head or SHARED marker. */
Node() {}
/** Constructor used by addWaiter. */
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());
}
/** Constructor used by addConditionWaiter. */
Node(int waitStatus) {
WAITSTATUS.set(this, waitStatus);
THREAD.set(this, Thread.currentThread());
}
/** CASes waitStatus field. */
final boolean compareAndSetWaitStatus(int expect, int update) {
return WAITSTATUS.compareAndSet(this, expect, update);
}
/** CASes next field. */
final boolean compareAndSetNext(Node expect, Node update) {
return NEXT.compareAndSet(this, expect, update);
}
final void setPrevRelaxed(Node p) {
PREV.set(this, p);
}
// VarHandle 機制。將上面字段使用VarHandle封裝,只是為了提高代碼性能。
// 這是jdk提供的一種底層操作機制,java8前為Unsfe類。
private static final VarHandle NEXT;
private static final VarHandle PREV;
private static final VarHandle THREAD;
private static final VarHandle WAITSTATUS;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
NEXT = l.findVarHandle(Node.class, "next", Node.class);
PREV = l.findVarHandle(Node.class, "prev", Node.class);
THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
節(jié)點是構(gòu)成同步隊列的基礎(chǔ),同步器擁有首節(jié)點head和尾節(jié)點tail,沒有成功獲取同步狀態(tài)的線程將會成為節(jié)點加入該隊列的尾部,同步隊列的基本結(jié)構(gòu)如下:

> line: 563 處于源代碼中的位置
/**
* 同步隊列的頭節(jié)點,延遲初始化。除了初始化以外,只能通過setHead方法
* 來修改。注意:如果頭節(jié)點存在,它的waitStatus被保證不會是CANCELLED
*/
private transient volatile Node head;
/**
* 同步隊列的尾節(jié)點,延遲初始化。只會通過enq方法增加新節(jié)點才會被改變。
*/
private transient volatile Node tail;
/**
* 同步狀態(tài)
*/
private volatile int state;
> line: 2293
// VarHandle 機制。將上面三個字段使用VarHandle封裝,只是為了提高代碼性能。
private static final VarHandle STATE;
private static final VarHandle HEAD;
private static final VarHandle TAIL;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);
TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
// Reduce the risk of rare disastrous classloading in first call to
// LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}
試想一下,當(dāng)一個線程成功獲取了同步狀態(tài),其他線程將無法獲取到同步狀態(tài),轉(zhuǎn)而被構(gòu)造成為節(jié)點并加入到同步隊列中,而這個加入隊列的過程必須要保證是線程安全的,因為同步器提供了一個基于CAS的設(shè)置尾節(jié)點的方法:compareAndSetTail(Node expect, Node update),它需要傳入一個當(dāng)前線程“認為”的尾節(jié)點和當(dāng)前節(jié)點,然后進行CAS更新。
> line: 2325
/**
* CASes tail field.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return TAIL.compareAndSet(this, expect, update);
}
同步器將節(jié)點加入到同步隊列的過程如下:

同步隊列遵循FIFO,首節(jié)點是獲取同步狀態(tài)的節(jié)點,首節(jié)點的線程在釋放同步狀態(tài)時,會喚醒后繼節(jié)點,而后繼節(jié)點會在成功獲取同步狀態(tài)時將自己設(shè)為頭節(jié)點:

設(shè)置頭節(jié)點是通過獲取同步狀態(tài)成功的節(jié)點來完成的,由于只有一個線程能夠獲取到同步狀態(tài),因此設(shè)置頭節(jié)點的方法并不需要使用CAS來保證。
> line: 674
/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
獨占式同步狀態(tài)的獲取與釋放
通過調(diào)用同步器的acquire(int)方法可以獲取同步狀態(tài),該方法對中斷不敏感,也就是由于線程獲取同步狀態(tài)失敗后進入同步隊列中,后續(xù)如果對線程進行中斷操作,線程不會從同步隊列中移除。
> line: 1236
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果獲取同步狀態(tài)的過程中被中斷了,那么acquireQueued返回后就將自己中斷
selfInterrupt();
}
> line: 873
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
上述代碼主要完成了同步狀態(tài)獲取、節(jié)點構(gòu)造、加入同步隊列以及在同步隊列中自旋等待的相關(guān)工作,其主要邏輯是:首先調(diào)用tryAcquire(int)嘗試獲得鎖,如果獲取鎖失敗,構(gòu)造一個獨占型結(jié)點并增加到隊尾。
然后循環(huán)CAS獲取鎖,如果前驅(qū)結(jié)點為head并且此時tryAcquire(int)成功,獲取到鎖,設(shè)置頭節(jié)點為此節(jié)點。如果失敗,調(diào)用shouldParkAfterFailedAcquire()方法判斷,如果返回true則阻塞當(dāng)前線程,否則循環(huán)再次獲取。
note: 前驅(qū)結(jié)點waitStatus為SIGNAL返回true,為其他則CAS設(shè)置為SIGNAL并返回false,即第二次調(diào)用一般返回true,即每個線程被構(gòu)造結(jié)點后有兩次機會嘗試獲取鎖,失敗則被阻塞。如果發(fā)生了異常,則取消此節(jié)點。
- 首先調(diào)用
tryAcquire(int)嘗試獲得鎖,如果獲取鎖失敗,構(gòu)造一個獨占型結(jié)點并增加到隊尾
> line: 650
private Node addWaiter(Node mode) {
//以獨占模式構(gòu)造一個節(jié)點
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
//獲取尾節(jié)點,如果為null則初始化同步隊列(對應(yīng)前面注釋中的延遲初始化)
if (oldTail != null) {
//設(shè)置prev結(jié)點
node.setPrevRelaxed(oldTail);
//CAS設(shè)置尾節(jié)點
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
> line: 2316
private final void initializeSyncQueue() {
Node h;
//使用CAS設(shè)置頭節(jié)點,此時head和tail指向同一節(jié)點
if (HEAD.compareAndSet(this, null, (h = new Node())))
tail = h;
}
> line: 561 Node#setPrevRelaxed(Node p)
final void setPrevRelaxed(Node p) {
PREV.set(this, p);
}
上面使用compareAndSetTail(oldTail, node)方法來去報節(jié)點能夠被線程安全添加。試想一下:如果使用一個普通的LinkedList來維護節(jié)點之間的關(guān)系,那么當(dāng)一個線程獲取了同步狀態(tài),而其他多個線程調(diào)用tryAcquire(int)獲取同步狀態(tài)失敗而被并發(fā)添加到LinkedList中時,LinkedList將難以保證Node的正確添加,最終的結(jié)果可能時節(jié)點數(shù)量有偏差,而且順序也是混亂的。
- 然后循環(huán)CAS獲取鎖,如果前驅(qū)結(jié)點為head并且此時
tryAcquire(int)成功,獲取到鎖,設(shè)置頭節(jié)點為此節(jié)點。如果失敗,調(diào)用shouldParkAfterFailedAcquire()方法判斷,如果返回true則阻塞當(dāng)前線程,否則循環(huán)再次獲取。
> line: 904
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
// 獲取前驅(qū)結(jié)點
final Node p = node.predecessor();
// 如果前驅(qū)結(jié)點為頭節(jié)點并且tryAcquire(int)成功
if (p == head && tryAcquire(arg)) {
// 將自己設(shè)置為頭節(jié)點
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
//如果在獲取同步狀態(tài)時出現(xiàn)異常,則取消此節(jié)點
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
> line: 842
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驅(qū)結(jié)點已經(jīng)被設(shè)置為SIGNAL,要求線程釋放同步狀態(tài)時通知此節(jié)點,
* 所以它可以被安全的阻塞了。
* 如果線程第一次獲取同步狀態(tài)失敗,它前驅(qū)節(jié)點的waitStatus就會被設(shè)置為SIGNAL,
* 如果第二次再次失敗,它將會被阻塞。
*/
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 新構(gòu)造的結(jié)點的waitStatus默認為0,將前驅(qū)節(jié)點的waitStatus CAS設(shè)置為SIGNAL。
*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
> line: 882
private final boolean parkAndCheckInterrupt() {
// 阻塞當(dāng)前線程
LockSupport.park(this);
// 返回線程的中斷標(biāo)志位
return Thread.interrupted();
}
> line: 789
private void cancelAcquire(Node node) {
// 如果此節(jié)點不存在則忽略
if (node == null)
return;
node.thread = null;
// 跳過前面被取消的結(jié)點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext是前面第一個不需要刪除的結(jié)點。下面的CAS如果失敗了,
// 也就是我們和其他的cancel或signal操作競爭時失敗了,那么更后面
// 的操作就沒必要執(zhí)行了。
Node predNext = pred.next;
// 將waitStatus設(shè)置為CANCELLED
// 可以使用volatile寫代替CAS。此步完成后,其他結(jié)點能夠跳過此節(jié)點。
// 在此之前不受其他線程的干擾。
node.waitStatus = Node.CANCELLED;
// 如果是尾節(jié)點,則移除自己
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);
} else {
// 如果它的后繼節(jié)點需要喚醒,嘗試將其設(shè)置為前驅(qū)節(jié)點的next,
// 如此便可以喚醒它。否則,我們就直接喚醒它。
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
> 看完cancelAcquire的源碼后,我們回頭再看shouldParkAfterFailedAcquire剩余部分
> line: 842
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
// 如果前驅(qū)結(jié)點的waitStatus為CANCELLED,那么跳過前面連續(xù)的CANCELLED結(jié)點,
// 尋找到一個waitStatus<=0的結(jié)點,并將其next指向此節(jié)點。
// 因為cancelAcquire正常執(zhí)行完CANCELLED結(jié)點的next會指向自己,當(dāng)pred
// 的next不再指向它時,將不再會有對象引用它,下一此gc時這些CACCELLED節(jié)點就會被回收
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
在acquireQueued(final Node node, int arg)方法中,當(dāng)前線程在循環(huán)中嘗試獲取同步狀態(tài),而只有前驅(qū)結(jié)點時頭節(jié)點時才能嘗試獲取同步狀態(tài),這是為什么?
- 頭節(jié)點是成功獲取到同步狀態(tài)的節(jié)點,而頭節(jié)點的線程釋放了同步狀態(tài)后,將會喚醒其后繼節(jié)點,后繼節(jié)點的線程被喚醒后需要檢查自己的前驅(qū)結(jié)點是否為頭節(jié)點。
- 維護同步隊列的FIFO原則。該方法中,節(jié)點自旋獲取同步狀態(tài)的行為如下:

由于非頭節(jié)點線程的前驅(qū)節(jié)點出隊或者被中斷線程會從等待狀態(tài)返回,隨后檢查自己的前驅(qū)節(jié)點是否為頭節(jié)點,如果是則嘗試獲取同步狀態(tài)??梢钥吹焦?jié)點與節(jié)點之間在循環(huán)檢查的過程中基本不相互通信,而是簡單的判斷自己的前驅(qū)是否為頭節(jié)點,這就使得節(jié)點的釋放規(guī)則符合FIFO,并且也便于對過早通知的處理(過早通知是指前驅(qū)節(jié)點不是頭節(jié)點的線程被意外喚醒或者因為被中斷而喚醒)。
acquire(int)方法調(diào)用流程如下所示:

可中斷獲取
> line: 1256
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 如果在嘗試獲取同步狀態(tài)前便被中斷了,那么立刻拋出InterruptedException
// note:其他線程對其調(diào)用Thread#interrupt()方法,此線程還能繼續(xù)運行,
// 只是中斷標(biāo)志位被設(shè)置為true。
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
> line: 929
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//構(gòu)造節(jié)點并增加到同步隊列中
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
// 如果在獲取同步狀態(tài)的過程中被中斷了,那么拋出InterruptedException
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
可以看出此方法和acquire(int)方法幾乎一樣,除了線程如果被中斷會立刻拋出InterruptedException異常,而不是只被記錄下來。
超時獲取
> line: 1281
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
> line: 957
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 如果超時時間<=0,立刻返回
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return true;
}
// 如果超時時間結(jié)束,那么就取消此節(jié)點獲取同步狀態(tài)并返回false
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
// 如果剩余時間大于1000ns,則線程睡眠
// 否則,線程進入快速的自旋中,嘗試獲取同步狀態(tài),
// 提高獲取同步狀態(tài)的可能。同時,1000ns以下的睡眠
// 無法保證精準(zhǔn)的執(zhí)行。
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, nanos);
setBlocker(t, null);
}
}
可以看出此版本與acquire(int)也沒有太大區(qū)別,只是增加了中斷響應(yīng)和超時控制。
釋放同步狀態(tài)
當(dāng)前線程獲取同步狀態(tài)并執(zhí)行了相應(yīng)邏輯后,就需要釋放同步狀態(tài),使得后續(xù)節(jié)點能夠繼續(xù)獲取同步狀態(tài)。通過調(diào)用relaase(int)方法可以釋放同步狀態(tài),該方法釋放了同步狀態(tài)后,會喚醒其后繼節(jié)點。
>line: 1299
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 喚醒后繼節(jié)點
unparkSuccessor(h);
return true;
}
return false;
}
> line: 685
private void unparkSuccessor(Node node) {
/*
* 如果waitStatus為負值(可能需要喚醒后繼節(jié)點),嘗試清除它。
* 如果這個CAS失敗了或者waitStatus被等待線程改變也可以接受。
*/
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
/*
* 需要被喚醒的線程存儲在后繼節(jié)點中,一般都是下一個節(jié)點。
* 但是如果下一個節(jié)點被取消了或者是null,那么從隊列尾部開始
* 遍歷尋找沒有被取消的后繼節(jié)點。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 作用是解除對下一個被取消節(jié)點的引用,使其能夠被gc
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}
分析完獨占式同步狀態(tài)獲取和釋放過程后,做一個總結(jié):在獲取同步狀態(tài)時,同步器維護一個同步隊列,獲取狀態(tài)失敗的線程都會被增加到同步隊列中并在隊列中自旋(只有兩次嘗試的機會,都失敗就會被阻塞,防止浪費CPU資源);移除隊列的條件是前驅(qū)結(jié)點為頭節(jié)點并且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時,同步器調(diào)用tryRelease(int)方法釋放同步狀態(tài),然后喚醒頭節(jié)點的后繼節(jié)點。
共享式同步狀態(tài)獲取與釋放
共享式與獨占式獲取最主要的區(qū)別在于同一時刻能否有多個線程同時獲取到同步狀態(tài),例如文件讀寫,可以有多個線程同時讀文件,但是只能有一個線程寫文件。
通過同步器的acquireShared(int)方法可以共享式獲取同步狀態(tài):
> line: 1320
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
> line: 992
private void doAcquireShared(int arg) {
// 增加一個SHARED節(jié)點
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
// 前驅(qū)結(jié)點為頭節(jié)點時嘗試獲取同步狀態(tài)
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 設(shè)置頭節(jié)點并嘗試喚醒后繼節(jié)點
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}
> line: 755
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 如果滿足以下條件之一,嘗試喚醒下一個節(jié)點:
* 1. propagate>0,表示還有同步狀態(tài)可供獲取
* 2. waitStatus<0(在setHead()方法調(diào)用前后被其他操作修改),
* 表示喚醒后繼節(jié)點。注意:PROPAGATE在shouldParkAfterFailedAcquire
* 中可能會被更改為SIGNAL
* 并且下一個節(jié)點在共享模式下等待,或者為null 。
*
* 這些檢查可能會導(dǎo)致不必要的線程喚醒,當(dāng)時只有在多個線程正在
* 競爭acquires/releases時會發(fā)生,所以大部分情況都需要立刻喚醒
* 后面的共享節(jié)點。
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
> line: 717
private void doReleaseShared() {
/*
* 確保釋放同步狀態(tài)傳遞,即使有其他正在進行中的acquires/releases。
* 如果waitStatus為SIGNAL使用unparkSuccessor(head)喚醒后繼節(jié)點。
* 但是如果不是,那么設(shè)置waitStatus為PROPAGATE來確保釋放傳遞
* 繼續(xù)下去。另外,當(dāng)我們這樣做時必須循環(huán)進行防止一個新節(jié)點被增加。
* 同時,不像unparkSuccessor在其他地方的使用,我們需要直到CAS是否
* 失敗,如果是那么進入下一個循環(huán)重新進行。
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 當(dāng)前只有一個線程獲取了同步狀態(tài),沒有其他線程嘗試獲取,
// 所以waitStatus為0,CAS修改為PROPAGATE確保傳遞繼續(xù),因為
// 后續(xù)節(jié)點獲取同步狀態(tài)失敗時會調(diào)用shouldParkAfterFailedAcquire
// 方法將其設(shè)置為SIGNAL。
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
在acquireShared(int)方法中,同步器調(diào)用tryAcquireShared(arg)方法嘗試獲取同步狀態(tài),tryAcquireShared(arg)方法返回值為int,當(dāng)返回值大于等于0時,表示能夠獲取到同步狀態(tài)。因此,在共享式獲取的自旋過程中,成功獲取同步狀態(tài)并退出自旋的條件就是tryAcquireShared(arg)方法返回值大于等于0。
可中斷獲取
> line: 1338
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
> line: 1022
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
與獨占式獲取一樣,可中斷獲取版本能夠響應(yīng)中斷,當(dāng)線程被中斷后,立刻拋出InterruptedException異常。
超時獲取
> line: 1362
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
> line: 1053
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
超時獲取和獨占式超時獲取也無差別,不再詳細贅述。
超時獲取同步狀態(tài)的流程如下:

共享同步狀態(tài)釋放
> line: 1379
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
doReleaseShared(int)方法前面已經(jīng)分析過。該方法釋放同步狀態(tài)后,將會喚醒處于等待狀態(tài)的節(jié)點。
剩余的一些監(jiān)視方法以及輔助方法不屬于核心機制部分,不對其進行分析,讀者有興趣可自行查看源碼,相信看懂上述部分后理解這些方法也不困難。同時,關(guān)于Condition的部分將在后面部分講解,下面先編寫一個自定義同步組件鞏固上面的分析。
自定義組件 —— TwinsLock
在前面幾節(jié)中,已經(jīng)對同步器AQS進行了主要功能實現(xiàn)的分析,本節(jié)通過編寫一個自定義同步組件來加深對同步器的理解。
設(shè)計一個同步工具:該工具在同一時刻,只允許最多兩個線程同時訪問,超時兩個線程的訪問將被阻塞。
首先,確定訪問模式。TwinsLock能夠同一時刻支持多個線程訪問,顯然是一個共享式訪問,因此需要AQS提供的acquiredShared(int)方法等和shared相關(guān)的方法,這就要求TwinsLock必須重新tryAcquiredShared(int)和tryReleaseShared(int)方法,這樣才能保證同步器的共享式同步狀態(tài)的獲取和釋放方法得以執(zhí)行。
其次,定義資源數(shù)。TwinsLock同一時刻允許最多兩個線程訪問,表明同步資源數(shù)為2,這樣可以設(shè)置初始狀態(tài)為2,當(dāng)一個線程進行獲取時,status減1,線程釋放,status加1,狀態(tài)的合法范圍是0、1、2。其中0表示當(dāng)前已經(jīng)有兩個線程獲取了同步資源,此時如果還有其他線程嘗試獲取同步狀態(tài),只能被阻塞。在同步狀態(tài)改變時,需要使用compareAndSet(int, int)方法做原子性保障。
最后,組合自定義同步器。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
private static class Sync extends AbstractQueuedSynchronizer {
private volatile int count;
Sync(int count) {
if(count <= 0) {
throw new IllegalArgumentException("count must large than 0");
}
this.count = count;
setState(count);
}
@Override
protected int tryAcquireShared(int arg) {
for(;;) {
int current = getState();
int newCount = current - arg;
if(newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
for(;;) {
int current = getState();
int newCount = current + arg;
if(newCount > count) {
throw new IllegalMonitorStateException("Unexpected unlock");
}
if(compareAndSetState(current, newCount)) {
return true;
}
}
}
Condition newCondition() {
return new ConditionObject();
}
}
}
在上述示例中,TwinsLock實現(xiàn)了Lock接口,提供了面向使用者的接口,使用者調(diào)用lock()方法獲取鎖,隨后調(diào)用unlock()方法釋放鎖。TwinsLock同時包含了一個自定義同步器Sync,該同步器面向線程訪問和同步狀態(tài)控制。
同步器作為一個橋梁,連接線程訪問以及同步狀態(tài)控制等底層技術(shù)與不同并發(fā)組件(如Lock、CountDownLatch等)的接口語義。
下面編寫一個測試驗證TwinsLock能否正常工作。
public class TwinsLockTest {
@Test
public void test() {
final Lock lock = new TwinsLock();
class Worker implements Runnable {
@Override
public void run() {
lock.lock();
try {
SleepUtils.sleep(1);
System.out.println(Thread.currentThread().getName());
SleepUtils.sleep(1);
} finally {
lock.unlock();
}
}
}
for (int i = 0; i < 10; ++i) {
Thread worker = new Thread(new Worker(), "Worker-" + i);
worker.setDaemon(true);
worker.start();
}
for (int i = 0; i < 10; ++i) {
SleepUtils.sleep(1);
System.out.println();
}
}
}
輸出如下:
Worker-2
Worker-0
Worker-1
Worker-5
Worker-3
Worker-4
Worker-8
Worker-9
Worker-6
Worker-7
可以看出線程名稱成對輸出,也就是同一時刻最多只有兩個線程能夠獲取到鎖,TwinsLock可以按預(yù)期正常工作。
Condition 接口
任意一個Java對象,都擁有一組監(jiān)視器方法(定義在java.lang.Object)上,主要包括wait(),wait(long),notify(),notifyAll()方法,這些方法與synchronized關(guān)鍵字配合,能夠?qū)崿F(xiàn)等待/通知機制。Condition接口也提供了類似Object的監(jiān)視器方法,與Lock配合也可以實現(xiàn)等待/通知機制,但是這兩者在使用方式以及功能特性上有所區(qū)別。
通過對比Object的監(jiān)視器方法和Condition接口,可以更詳細地了解Condition的特性。
| 對比項 | Object Monitor Methods | Conditon |
|---|---|---|
| 前置條件 | 獲取對象的鎖 | 調(diào)用Lock.lock()獲取鎖 調(diào)用Lock.newCondtion()獲取Condition對象 |
| 調(diào)用方式 | 直接調(diào)用,如object.wait() | 直接調(diào)用,如condition.await() |
| 等待隊列個數(shù) | 一個 | 多個 |
| 當(dāng)前線程釋放鎖并進入等待狀態(tài) | 支持 | 支持 |
| 當(dāng)前線程釋放鎖并進入等待狀態(tài) 后響應(yīng)中斷 |
不支持 | 支持 |
| 當(dāng)前線程釋放鎖并進入超時等待狀態(tài) | 支持 | 支持 |
| 當(dāng)前線程釋放鎖并進入等待狀態(tài)到將來某個時間 | 不支持 | 支持 |
| 喚醒等待隊列的一個線程 | 支持 | 支持 |
| 喚醒等待隊列的全部線程 | 支持 | 支持 |