更多 Java 并發(fā)編程方面的文章,請(qǐng)參見文集《Java 并發(fā)編程》
AbstractQueuedSynchronizer (AQS)
AQS 的功能可以分為兩類,獨(dú)占功能 和 共享功能。
它的所有子類中:
- 要么實(shí)現(xiàn)并使用了它獨(dú)占功能的 API,例如
ReentrantLockRenntrantReadWriteLock - 要么使用了共享鎖的功能,例如
CountDownLatch - 而不會(huì)同時(shí)使用兩套 API,即便是它最有名的子類 ReentrantReadWriteLock,也是通過兩個(gè)內(nèi)部類:讀鎖和寫鎖,分別實(shí)現(xiàn)的兩套API來實(shí)現(xiàn)的
AQS 的基本思想
AQS 使用標(biāo)志位 + 隊(duì)列的方式,記錄獲取鎖、競(jìng)爭(zhēng)鎖、釋放鎖等一系列鎖的狀態(tài)。
-
獲取鎖:判斷當(dāng)前狀態(tài)
waitStatus是否允許獲取鎖- 如果是就獲取鎖,修改當(dāng)前狀態(tài),并且如果進(jìn)了隊(duì)列就從隊(duì)列中移除
- 否則就阻塞操作或者獲取失敗,也就是說如果是獨(dú)占鎖就可能阻塞,如果是共享鎖就可能失敗
- 另外如果是阻塞線程,那么線程就需要進(jìn)入阻塞隊(duì)列
- 釋放鎖:修改狀態(tài)位,如果有線程因?yàn)闋顟B(tài)位阻塞的話就喚醒隊(duì)列中的一個(gè)或者更多線程。
要支持上面兩個(gè)操作就必須有下面的條件:
-
原子性操作同步器的狀態(tài)位:
volatile int waitStatus;- 它是一個(gè) volatile 變量,確保了可見性
- 使用 CAS 操作來更新 waitStatus,確保了原子性
- waitStatus 可能的狀態(tài)包括:
- CANCELLED = 1: 節(jié)點(diǎn)操作因?yàn)槌瑫r(shí)或者對(duì)應(yīng)的線程被 interrupt。節(jié)點(diǎn)不應(yīng)該留在此狀態(tài),一旦達(dá)到此狀態(tài)將從CHL隊(duì)列中踢出。
-
SIGNAL = -1: 節(jié)點(diǎn)的繼任節(jié)點(diǎn)是(或者將要成為)BLOCKED狀態(tài)(例如通過 LockSupport.park() 操作),因此一個(gè)節(jié)點(diǎn)一旦被釋放(解鎖)或者取消就需要喚醒(LockSupport.unpack())它的繼任節(jié)點(diǎn)。
只有當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)為 SIGNAL 時(shí),才能當(dāng)前節(jié)點(diǎn)才能被掛起。 - CONDITION = -2:表明節(jié)點(diǎn)對(duì)應(yīng)的線程因?yàn)椴粷M足一個(gè)條件(Condition)而被阻塞。
- 0: 正常狀態(tài),新生的非CONDITION節(jié)點(diǎn)都是此狀態(tài)。
- 非負(fù)值標(biāo)識(shí)節(jié)點(diǎn)不需要被通知(喚醒)。
阻塞和喚醒線程:在 JDK 5.0 以后利用 JNI 在 LockSupport 類中實(shí)現(xiàn)了此特性。
LockSupport.park()
LockSupport.park(Object)
LockSupport.parkNanos(Object, long)
LockSupport.parkNanos(long)
LockSupport.parkUntil(Object, long)
LockSupport.parkUntil(long)
LockSupport.unpark(Thread)
上面的API中 park() 是在當(dāng)前線程中調(diào)用,導(dǎo)致線程阻塞,帶參數(shù)的 Object 是掛起的對(duì)象,這樣監(jiān)視的時(shí)候就能夠知道此線程是因?yàn)槭裁促Y源而阻塞的。-
一個(gè)有序的隊(duì)列:采用 CHL 列表來解決有序的 FIFO 隊(duì)列的問題
- 對(duì)于入隊(duì)列(enqueue):采用 CAS 操作,每次比較尾結(jié)點(diǎn)是否一致,然后插入的到尾結(jié)點(diǎn)中。
- 對(duì)于出隊(duì)列(dequeue):由于每一個(gè)節(jié)點(diǎn)也緩存了一個(gè)狀態(tài),決定是否出隊(duì)列,因此當(dāng)不滿足條件時(shí)就需要自旋等待,一旦滿足條件就將頭結(jié)點(diǎn)設(shè)置為下一個(gè)節(jié)點(diǎn)。
AQS 核心字段
-
private volatile int state;:描述的有多少個(gè)線程取得了鎖,對(duì)于互斥鎖來說state<=1。 -
private transient volatile Node head;:等待隊(duì)列的頭 -
private transient volatile Node tail;:等待隊(duì)列的尾,head 與 tail 構(gòu)成了一個(gè) FIFO 隊(duì)列
Node 節(jié)點(diǎn)的屬性包括:
-
volatile Node prev;:此節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)。節(jié)點(diǎn)的 waitStatus 依賴于前一個(gè)節(jié)點(diǎn)的狀態(tài)。 -
volatile Node next;:此節(jié)點(diǎn)的后一個(gè)節(jié)點(diǎn)。后一個(gè)節(jié)點(diǎn)是否被喚醒依賴于當(dāng)前節(jié)點(diǎn)是否被釋放。 -
volatile Thread thread;:節(jié)點(diǎn)綁定的線程。 -
volatile int waitStatus;:描述節(jié)點(diǎn)的狀態(tài) -
Node nextWaiter;:下一個(gè)等待條件(Condition)的節(jié)點(diǎn),由于 Condition 是獨(dú)占模式,因此這里有一個(gè)簡(jiǎn)單的隊(duì)列來描述 Condition 上的線程節(jié)點(diǎn)。
AQS 獨(dú)占鎖
有且只有一個(gè)線程獲取到鎖,其余線程全部掛起,直到該擁有鎖的線程釋放鎖,被掛起的線程被喚醒重新開始競(jìng)爭(zhēng)鎖。
例如 ReentrantLock RenntrantReadWriteLock就是獨(dú)占鎖。
比如 ReentrantLock 中鎖的實(shí)現(xiàn) Sync 繼承了 AbstractQueuedSynchronizer,同時(shí)包括了公平鎖 FairSync 和非公平鎖NonfairSync :
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
......
static final class NonfairSync extends Sync {
......
static final class FairSync extends Sync {
......
AQS 共享鎖
例如 CountDownLatch 就是共享鎖。
在 Java CyclicBarrier VS CountDownLatch 中關(guān)于 CountDownLatch 的示例中,使用了 private static CountDownLatch countDownLatch = new CountDownLatch(5); 就是表示該鎖可能被 5 個(gè)線程共享。
和 ReentrantLock 類似,CountDownLatch 內(nèi)部也有一個(gè)叫做 Sync 的內(nèi)部類,同樣也是用它繼承了AbstractQueuedSynchronizer。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
- 其中的構(gòu)造方法
setState(count)就是設(shè)置private volatile int state;,描述的有多少個(gè)線程取得了鎖` - 其中的
tryAcquireShared()方法判斷了state是否為 0,即計(jì)數(shù)器是否為 0。
引用:
深度解析Java 8:JDK1.8 AbstractQueuedSynchronizer的實(shí)現(xiàn)分析(上)
深度解析Java 8:AbstractQueuedSynchronizer的實(shí)現(xiàn)分析(下)
深入淺出 Java Concurrency (7): 鎖機(jī)制 part 2 AQS