在java.util.concurrent包中,大部分的同步器都是基于AbstractQueuedSynchronizer(AQS)這個(gè)框架實(shí)現(xiàn)的。這個(gè)框架為同步狀態(tài)提供原子性管理、線程的阻塞和解除阻塞以及排隊(duì)提供了一種通用機(jī)制。
同步器一般包含2種方法,一種是acquire,另一種是release。acquire操作阻塞線程,獲取鎖。release通過(guò)某種方式改變讓被acquire阻塞的線程繼續(xù)執(zhí)行,釋放鎖。為了實(shí)現(xiàn)這2種操作,需要以下3個(gè)基本組件的相互協(xié)作:
- 同步狀態(tài)的原子性管理
- 線程的阻塞和解除阻塞
- 隊(duì)列管理
同步狀態(tài)
/**
* The synchronization state.
*/
private volatile int state;
AQS使用一個(gè)int變量來(lái)保存同步狀態(tài),并暴露出getState、setState以及compareAndSet來(lái)讀取或更新這個(gè)狀態(tài)。并且用了volatile來(lái)修飾,保證了在多線程環(huán)境下的可見性。通過(guò)使用compare-and-swap(CAS)指令來(lái)實(shí)現(xiàn)compareAndSetState。
這里的同步狀態(tài)用int而非long,主要是因?yàn)?4位long字段的原子性操作在很多平臺(tái)上是使用內(nèi)部鎖的方式來(lái)模擬實(shí)現(xiàn)的,這會(huì)使得同步器的會(huì)有性能問(wèn)題。絕對(duì)多數(shù)int型的state足夠我們使用,但JDK也提供了long型state的實(shí)現(xiàn):java.util.concurrent.locks.AbstractQueuedLongSynchronizer。
阻塞
JDK1.5之前,阻塞線程和解除線程阻塞都是基于Java自身的監(jiān)控器。在AQS中實(shí)現(xiàn)阻塞是用java.util.concurrent包的LockSuport類。方法LockSupport.park阻塞當(dāng)前線程,直到有個(gè)LockSupport.unpark方法被調(diào)用。
隊(duì)列管理
AQS框架關(guān)鍵就在于如何管理被阻塞的線程隊(duì)列。提供了2個(gè)隊(duì)列,分別是線程安全Sync Queue(CLH Queue)、普通的Condition Queue。
Sync Queue
Sync Queue是基于FIFO的隊(duì)列,用于構(gòu)建鎖或者其他相關(guān)同步裝置。CLH鎖可以更容易地去實(shí)現(xiàn)取消(cancellation)和超時(shí)功能,因此我們選擇了CLH鎖作為實(shí)現(xiàn)的基礎(chǔ)。
隊(duì)列中的元素Node是保存線程的引用和線程狀態(tài)。Node是AQS的一個(gè)靜態(tài)內(nèi)部類:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
Node類的成員變量如上所示,主要負(fù)責(zé)保存線程引用、隊(duì)列的前繼和后繼節(jié)點(diǎn),以及同步狀態(tài):
| 成員 | 描述 |
|---|---|
| waitStatus | 用來(lái)標(biāo)記Node的狀態(tài): CANCELLED:1, 表示當(dāng)前線程已經(jīng)被取消 SIGNAL:-1,表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)等待運(yùn)行 CONDITION:-2, 表示當(dāng)前節(jié)點(diǎn)已被加入Condition Queue PROPAGATE:-3, 共享鎖的最終狀態(tài)是PROPAGATE |
| thread | 當(dāng)前獲取lock的線程 |
| SHARED | 表示節(jié)點(diǎn)是共享模式 |
| EXCLUSIVE | 表示節(jié)點(diǎn)是獨(dú)占模式 |
| prev | 前繼節(jié)點(diǎn) |
| next | 后繼節(jié)點(diǎn) |
| nextWaiter | 存儲(chǔ)Condition Queue中的后繼節(jié)點(diǎn) |
Node元素是Sync Queue構(gòu)建的基礎(chǔ)。當(dāng)獲取鎖的時(shí)候,請(qǐng)求形成節(jié)點(diǎn)掛載在尾部。而鎖資源的釋放再獲取的過(guò)程是從開始向后進(jìn)行的。
acquire 獲取鎖
在AQS自身僅定義了類似acquire方法。在實(shí)現(xiàn)鎖的時(shí)候,一般會(huì)實(shí)現(xiàn)一個(gè)繼承AQS的內(nèi)部類Sync。而在Sync類中,我們根據(jù)需求來(lái)實(shí)現(xiàn)重寫tryAcquire方法和tryRelease方法。獨(dú)占鎖acquire方法如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 通過(guò)
tryAcquire(由不同的實(shí)現(xiàn)類實(shí)現(xiàn))嘗試獲取鎖,如果可以獲取鎖直接返回。獲取不到鎖,則調(diào)用addWaiter方法;
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
-
addWaiter方法作用是把當(dāng)前線程封裝成Node節(jié)點(diǎn),通過(guò)CAS操作快速嘗試掛載至隊(duì)列尾部。- 如果
tail節(jié)點(diǎn)t已經(jīng)有了:將t節(jié)點(diǎn)更新為當(dāng)前節(jié)點(diǎn)node的前繼節(jié)點(diǎn)node.prev,將t.next更新為當(dāng)前節(jié)點(diǎn)node; - 如果
tail節(jié)點(diǎn)添加失敗:- 如果
tail節(jié)點(diǎn)為空,那么原子化的分配一個(gè)頭節(jié)點(diǎn),并將尾節(jié)點(diǎn)指向頭節(jié)點(diǎn),這一步是初始化; - 如果
tail節(jié)點(diǎn)不為空,循環(huán)重復(fù)addWaiter方法的工作直至當(dāng)前節(jié)點(diǎn)入隊(duì)為止。
- 如果
- 如果
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 節(jié)點(diǎn)加入
Sync Queue之后,接下來(lái)就是要進(jìn)行鎖的獲取,或者說(shuō)是訪問(wèn)控制了,只有一個(gè)線程能夠在同一時(shí)刻繼續(xù)的運(yùn)行,而其他的進(jìn)入等待狀態(tài)。- 獲取當(dāng)前節(jié)點(diǎn)的前繼節(jié)點(diǎn)
- 當(dāng)前繼節(jié)點(diǎn)是頭結(jié)點(diǎn)并且能夠獲取狀態(tài),代表該當(dāng)前節(jié)點(diǎn)占有鎖;如果滿足上述條件,那么代表能夠占有鎖,根據(jù)節(jié)點(diǎn)對(duì)鎖占有的含義,設(shè)置頭結(jié)點(diǎn)為當(dāng)前節(jié)點(diǎn)。
- 否則進(jìn)入等待狀態(tài)。
至此,可以總結(jié)一次acquire的過(guò)程大致為:
release 釋放鎖
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 首先通過(guò)
CAS操作變更同步狀態(tài)state。 - 釋放成功后,通過(guò)
LockSupport.unpark方法來(lái)喚醒后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)繼續(xù)獲取鎖。
Condition Queue
AQS框架提供了一個(gè)ConditionObject內(nèi)部類,給維護(hù)獨(dú)占同步的類以及實(shí)現(xiàn)Lock接口的類使用。一個(gè)鎖對(duì)象可以關(guān)聯(lián)任意數(shù)目的條件對(duì)象,可以提供典型的Java監(jiān)視器風(fēng)格的await、signal和signalAll操作,包括帶有超時(shí)的,以及一些檢測(cè)、監(jiān)控的方法。Condition Queue是普通的隊(duì)列并不要求是線程安全,原因是在線程在操作Condition時(shí),要求線程必須獨(dú)占鎖,不需要考慮并發(fā)的問(wèn)題。
Condition Queue也是以Node為基礎(chǔ)的隊(duì)列。
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
await操作
-
Condition在執(zhí)行await操作時(shí),首先會(huì)調(diào)用addConditionWaiter()方法將當(dāng)前線程封裝的Node節(jié)點(diǎn)加入到wait queue。
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
上述addConditionWaiter的邏輯是:
- 首先清除
Condition Queue隊(duì)列中cancelled狀態(tài)的尾節(jié)點(diǎn); - 如
Condition Queue隊(duì)列為空,封裝當(dāng)前線程的node節(jié)點(diǎn)為Condition Queue的firstWaiter。如Condition Queue隊(duì)列不為空,則把該節(jié)點(diǎn)加至隊(duì)列尾部。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
- 加入
Condition queue之后,要釋放當(dāng)前線程獲取的所有的鎖; - 如果線程沒有在
Sync Queue中,將調(diào)用LockSupport.park阻塞當(dāng)前線程,直到signalled或者interrupted喚醒去獲取鎖。
single 操作
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
- 首先檢查線程是否獨(dú)占鎖;
- 獲取
Codition Queue的firstWaiter,將節(jié)點(diǎn)轉(zhuǎn)移至Sync Queue中去。
singleAll 操作
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
signalAll喚醒Condition Queue的所有等待線程,將所有的Condition Queue中的node元素轉(zhuǎn)移至Sync Queue中去。
其他API
這里只介紹了獨(dú)占鎖模式下,普通acquire、release方法的原理,AQS還提供了很多可以供我們選擇的API:
- 如優(yōu)先考慮中斷、超時(shí)的:
acquireInterruptibly、tryAcquireNanos; - 如共享鎖模式下
acquireShared、releaseShared;
等等...
這里暫時(shí)不詳細(xì)分析,后面有時(shí)間的話可以再做了解。
參考: