1、AQS介紹
AQS全稱AbstractQueuedSynchronizer,是一個同步器,用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架。內(nèi)部主要使用一個volatile修飾的state變量和一個FIFO雙向隊列來實現(xiàn)的。
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
ReentrantLock、Semaphore、CountDownLatch等都是基于AQS實現(xiàn)的。
同步器的主要使用方式是繼承,子類通過繼承同步器并實現(xiàn)它的抽象方法來管理同步狀 態(tài),在抽象方法的實現(xiàn)過程中免不了要對同步狀態(tài)進行更改,這時就需要使用同步器提供的3 個方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))來進行操 作,因為它們能夠保證狀態(tài)的改變是安全的。

子類推薦被定義為自定義同步組件的靜態(tài)內(nèi)部類,同步器自身沒有實現(xiàn)任何同步接口,它僅僅是定義了若干同步狀態(tài)獲取和釋放的方法來 供自定義同步組件使用,同步器既可以支持獨占式地獲取同步狀態(tài),也可以支持共享式地獲 取同步狀態(tài),這樣就可以方便實現(xiàn)不同類型的同步組件(ReentrantLock、 ReentrantReadWriteLock和CountDownLatch等)
public class Main {
static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
return super.tryAcquire(arg);
}
@Override
protected boolean tryRelease(int arg) {
return super.tryRelease(arg);
}
@Override
protected int tryAcquireShared(int arg) {
return super.tryAcquireShared(arg);
}
@Override
protected boolean tryReleaseShared(int arg) {
return super.tryReleaseShared(arg);
}
@Override
protected boolean isHeldExclusively() {
return super.isHeldExclusively();
}
}
}
同步器是實現(xiàn)鎖(也可以是任意同步組件)的關(guān)鍵,在鎖的實現(xiàn)中聚合同步器,利用同步器實現(xiàn)鎖的語義??梢赃@樣理解二者之間的關(guān)系:鎖是面向使用者的,它定義了使用者與鎖交 互的接口(比如可以允許兩個線程并行訪問),隱藏了實現(xiàn)細節(jié);同步器面向的是鎖的實現(xiàn)者,它簡化了鎖的實現(xiàn)方式,屏蔽了同步狀態(tài)管理、線程的排隊、等待與喚醒等底層操作。鎖和同步器很好地隔離了使用者和實現(xiàn)者所需關(guān)注的領(lǐng)域。
2、同步器的接口與示例
同步器的設(shè)計是基于模板方法模式的,也就是說,使用者需要繼承同步器并重寫指定的方法,隨后將同步器組合在自定義同步組件的實現(xiàn)中,并調(diào)用同步器提供的模板方法,而這些模板方法將會調(diào)用使用者重寫的方法。 這句話聽起來很繞,慢慢往后看就懂了。
我們在繼承AQS并重寫那5個方法的時候,需要調(diào)用AQS提供的幾個方法去訪問或修改stats變量的狀態(tài)。
// 獲取當(dāng)前同步狀態(tài)
protected final int getState() {
return state;
}
// 設(shè)置當(dāng)前同步狀態(tài)
protected final void setState(int newState) {
state = newState;
}
// 使用CAS設(shè)置當(dāng)前狀態(tài),CAS能保證操作的原子性
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
我們在繼承同步器時可重寫的方法如下

在實現(xiàn)自定義同步組件時,也會調(diào)用同步器提供的模板方法,部分模板方法如下

AQS提供的模板方法主要分為三類:
(1)獨占式獲取和釋放同步狀態(tài)。
(2)共享式獲取和釋放同步狀態(tài)。
(3)查詢同步隊列中等待的線程情況。
3、實現(xiàn)一個獨占鎖
獨占鎖,就是同一時刻只能一個線程擁有鎖?;贏QS,我們可以很方便的實現(xiàn)。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 基于AQS實現(xiàn)獨占鎖
*/
public class ExclusiveLock implements Lock {
/**
* 實現(xiàn)自定義的同步器
*/
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 獲取鎖
* @param arg
* @return
*/
@Override
protected boolean tryAcquire(int arg) {
// 設(shè)置同步變量state為1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 是否處于占用狀態(tài)
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 每個condition都包含一個condition隊列
Condition newCondition() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
一般在實現(xiàn)自定義同步器時,都會把它作為靜態(tài)內(nèi)部類去實現(xiàn)。上面實現(xiàn)的ExclusiveLock類中就是定義了一個靜態(tài)內(nèi)部類Sync去繼承AQS實現(xiàn)獨占鎖邏輯的。通過CAS設(shè)置同步變量state值為1表示獲取鎖成功,釋放鎖時把state設(shè)置為0即可。
4、AQS的實現(xiàn)分析
4.1 同步隊列
AQS內(nèi)部依賴同步隊列(雙向FIFO隊列)來進行線程的管理,當(dāng)線程獲取鎖失敗時,會將線程以及等待狀態(tài)信息構(gòu)造為一個節(jié)點Node并將其放入同步隊列隊尾,然后阻塞該線程。當(dāng)鎖釋放的時候,會把隊首節(jié)點中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。
Node是AQS中的一個靜態(tài)內(nèi)部類,用來保存獲取同步狀態(tài)失敗的線程引用、等待狀態(tài)以及前驅(qū)和后繼節(jié)點,主要字段如下
static final class Node {
// 表示節(jié)點是共享模式
static final Node SHARED = new Node();
// 表示節(jié)點是獨占模式
static final Node EXCLUSIVE = null;
// 由于在同步隊列中等待的線程等待超時或者被中斷,需要從同步隊列中取消等待,節(jié)點進入該狀態(tài)將不會改變。
static final int CANCELLED = 1;
// 后繼節(jié)點的線程處于等待狀態(tài),而當(dāng)前節(jié)點的線程如果釋放了同步狀態(tài)或者被取消,將會通知后繼節(jié)點,使后繼節(jié)點的線程得以運行。
static final int SIGNAL = -1;
// 節(jié)點在等待隊列中,節(jié)點線程等待在Condition上,當(dāng)其他線程對Condition調(diào)用了signal()方法后,該節(jié)點將會從等待隊列轉(zhuǎn)移到同步隊列中,加入到對同步狀態(tài)的獲取中。
static final int CONDITION = -2;
// 表示下一次共享式同步狀態(tài)獲取將會無條件的傳播下去。
static final int PROPAGATE = -3;
/**
* 等待狀態(tài),取值如下
* SIGNAL 值為-1,
*
* CANCELLED 值為1,
*
* CONDITION 值為-2,
*
* PROPAGATE 值為-3,
*
* INITIAL 值為0,表示初始狀態(tài)。
*/
volatile int waitStatus;
// 前驅(qū)節(jié)點,當(dāng)節(jié)點加入同步隊列時被設(shè)置(尾部添加)
volatile Node prev;
// 后繼節(jié)點
volatile Node next;
// 獲取同步狀態(tài)的線程
volatile Thread thread;
// 等待隊列中的后繼節(jié)點。如果當(dāng)前節(jié)點是共享的,那么這個字段將是一個SHARED常量,也就是說節(jié)點類型(獨占和共享)和等待隊列中的后繼節(jié)點共用同一個字段
Node nextWaiter;
}
沒有成功獲取同步狀態(tài)的線程將會加入同步隊列的尾部,這個加入的過程也必須要保證線程安全,因此AQS提供了compareAndSetTail(Node expect, Node update)方法。同步隊列的結(jié)構(gòu)大致如下

同步隊列設(shè)置尾結(jié)點的過程大致如下

同步隊列的首節(jié)點是獲取鎖成功的線程,首節(jié)點的線程在釋放鎖后,將會喚醒后繼節(jié)點,后繼節(jié)點如果獲取鎖成功的同時會把自己設(shè)置為頭結(jié)點

設(shè)置頭結(jié)點是通過獲取同步狀態(tài)成功的線程來完成的,由于只有一個線程能成功獲取到同步狀態(tài),因此設(shè)置頭結(jié)點的方法并不需要CAS來保證,它只需要將首節(jié)點設(shè)置為頭結(jié)點的后繼節(jié)點然后斷開首節(jié)點即可。
4.2 獨占式同步狀態(tài)獲取和釋放
我們在實現(xiàn)自定義的獨占式同步器時,主要重寫了AQS的tryAcquire和tryRelease方法,通過操作同步變量state完成同步狀態(tài)的獲取和釋放。
我們可以調(diào)用AQS對外提供的acquire獲取同步狀態(tài),該方法會調(diào)用我們重新的tryAcquire方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果調(diào)用tryAcquire方法返回為false,則通過addWaiter把線程加入同步隊列隊尾,并標(biāo)志位獨占Node.EXCLUSIVE。通過CAS確保節(jié)點能安全的添加到隊列尾。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
加入隊列尾后,通過CAS不斷的嘗試獲取同步狀態(tài)。只有在前驅(qū)節(jié)點是頭結(jié)點的情況下,才有可能獲取到同步狀態(tài)。如果獲取不到則阻塞節(jié)點中的線程,而被阻塞線程的喚醒主要依靠前驅(qū)節(jié)點的出隊或阻塞線程被中斷來實現(xiàn)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在acquireQueued(final Node node,int arg)方法中,當(dāng)前線程在“死循環(huán)”中嘗試獲取同步狀態(tài),而只有前驅(qū)節(jié)點是頭節(jié)點才能夠嘗試獲取同步狀態(tài),這是為什么?原因有兩個,如下。
(1)頭節(jié)點是成功獲取到同步狀態(tài)的節(jié)點,而頭節(jié)點的線程釋放了同步狀態(tài)之后,將會喚醒其后繼節(jié)點,后繼節(jié)點的線程被喚醒后需要檢查自己的前驅(qū)節(jié)點是否是頭節(jié)點。
(2)維護同步隊列的FIFO原則。

獨占式同步狀態(tài)獲取流程,也就是acquire(int arg)方法調(diào)用流程大致如下

上圖中,當(dāng)同步狀態(tài)獲取成功之后,當(dāng)前線程從acquire(int arg)方法返回,如果對于鎖這種并發(fā)組件而言,代表著當(dāng)前線程獲取了鎖。
當(dāng)線程獲取同步狀態(tài)并執(zhí)行完相應(yīng)的邏輯后,就需要釋放同步狀態(tài),通過調(diào)用AQS提供的release方法。該方法在釋放了同步狀態(tài)之后,會喚醒其后繼節(jié)點(進而使后繼節(jié)點重新嘗試獲取同步狀態(tài))。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
該方法執(zhí)行時,會喚醒頭節(jié)點的后繼節(jié)點線程,unparkSuccessor(Node node)方法使用LockSupport來喚醒處于等待狀態(tài)的線程。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
分析了獨占式同步狀態(tài)獲取和釋放過程后,適當(dāng)做個總結(jié):在獲取同步狀態(tài)時,同步器維護一個同步隊列,獲取狀態(tài)失敗的線程都會被加入到隊列中并在隊列中進行自旋;移出隊列(或停止自旋)的條件是前驅(qū)節(jié)點為頭節(jié)點且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時,同步器調(diào)用tryRelease(int arg)方法釋放同步狀態(tài),然后喚醒頭節(jié)點的后繼節(jié)點。
4.3 共享式同步狀態(tài)獲取和釋放
共享式獲取與獨占式獲取最主要的區(qū)別在于同一時刻能否有多個線程同時獲取到同步狀態(tài)。以文件的讀寫為例,如果一個程序在對文件進行讀操作,那么這一時刻對于該文件的寫操作均被阻塞,而讀操作能夠同時進行。寫操作要求對資源的獨占式訪問,而讀操作可以是共享式訪問。
通過調(diào)用同步器的acquireShared(int arg)方法可以共享式地獲取同步狀態(tài)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在acquireShared方法中,會調(diào)用我們重寫的tryAcquireShared方法嘗試獲取同步狀態(tài),該方法返回值為int,返回值大于等于0時,表示獲取成功。因此,在共享式獲取的自旋過程中,成功獲取到同步狀態(tài)并退出自旋的條件就是tryAcquireShared(int arg)方法返回值大于等于0??梢钥吹?,在doAcquireShared(int arg)方法的自旋過程中,如果當(dāng)前節(jié)點的前驅(qū)為頭節(jié)點時,嘗試獲取同步狀態(tài),如果返回值大于等于0,表示該次獲取同步狀態(tài)成功并從自旋過程中退出。
與獨占式一樣,共享式獲取也需要釋放同步狀態(tài),通過調(diào)用releaseShared(int arg)方法可以釋放同步狀態(tài)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
該方法在釋放同步狀態(tài)之后,將會喚醒后續(xù)處于等待狀態(tài)的節(jié)點。對于能夠支持多個線程同時訪問的并發(fā)組件(比如Semaphore),它和獨占式主要區(qū)別在于tryReleaseShared(int arg)方法必須確保同步狀態(tài)(或者資源數(shù))線程安全釋放,一般是通過循環(huán)和CAS來保證的,因為釋放同步狀態(tài)的操作會同時來自多個線程。
4.4 超時獲取同步狀態(tài)
通過調(diào)用同步器的doAcquireNanos(int arg,long nanosTimeout)和doAcquireSharedNanos(int arg, long nanosTimeout)方法可以超時獲取同步狀態(tài),即在指定的時間段內(nèi)獲取同步狀態(tài),如果獲取到同步狀態(tài)則返回true,否則,返回false。該方法提供了傳統(tǒng)Java同步操作(比如synchronized關(guān)鍵字)所不具備的特性。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 從現(xiàn)在起經(jīng)過nanosTimeout后超時
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 調(diào)用tryAcquire方法獨占式獲取同步狀態(tài)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 更新超時時間
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
大致流程如下
