AQS是一個(gè)用來(lái)構(gòu)建鎖和同步器的框架,使用AQS能簡(jiǎn)單且高效地構(gòu)造出應(yīng)用廣泛的大量的同步器,比如ReentrantLock,Semaphore,其他的諸如ReentrantReadWriteLock,SynchronousQueue,F(xiàn)utureTask等等皆是基于AQS的。
1. AQS核心思想
AQS核心思想是,如果被請(qǐng)求的共享資源空閑,則將當(dāng)前請(qǐng)求資源的線程設(shè)置為有效的工作線程,并且將共享資源設(shè)置為鎖定狀態(tài)。如果被請(qǐng)求的共享資源被占用,那么就需要一套線程阻塞等待以及被喚醒時(shí)鎖分配的機(jī)制,這個(gè)機(jī)制AQS是用CLH隊(duì)列鎖實(shí)現(xiàn)的,即將暫時(shí)獲取不到鎖的線程加入到隊(duì)列中。
CLH(Craig,Landin,and Hagersten)隊(duì)列是一個(gè)虛擬的雙向隊(duì)列(虛擬的雙向隊(duì)列即不存在隊(duì)列實(shí)例,僅存在結(jié)點(diǎn)之間的關(guān)聯(lián)關(guān)系)。AQS是將每條請(qǐng)求共享資源的線程封裝成一個(gè)CLH鎖隊(duì)列的一個(gè)結(jié)點(diǎn)(Node)來(lái)實(shí)現(xiàn)鎖的分配。其中Sync queue,即同步隊(duì)列,是雙向鏈表,包括head結(jié)點(diǎn)和tail結(jié)點(diǎn),head結(jié)點(diǎn)主要用作后續(xù)的調(diào)度。而Condition queue不是必須的,其是一個(gè)單向鏈表,只有當(dāng)使用Condition時(shí),才會(huì)存在此單向鏈表。并且可能會(huì)有多個(gè)Condition queue。
AQS使用一個(gè)int成員變量來(lái)表示同步狀態(tài),通過(guò)內(nèi)置的FIFO隊(duì)列來(lái)完成獲取資源線程的排隊(duì)工作。AQS使用CAS對(duì)該同步狀態(tài)進(jìn)行原子操作實(shí)現(xiàn)對(duì)其值的修改。
private volatile int state; // 共享變量,使用volatile修飾保證線程可見(jiàn)性
狀態(tài)信息通過(guò)procted類型的getState,setState,compareAndSetState進(jìn)行操作
// 返回同步狀態(tài)的當(dāng)前值
protected final int getState() {
return state;
}
// 設(shè)置同步狀態(tài)的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)將同步狀態(tài)值設(shè)置為給定值update如果當(dāng)前同步狀態(tài)的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2. AQS對(duì)資源共享的方式
AQS定義兩種資源共享方式
- Exclusive(獨(dú)占):只有一個(gè)線程能執(zhí)行,如ReentrantLock。又可分為公平鎖和非公平鎖:
- 公平鎖:按照線程在隊(duì)列中的排隊(duì)順序,先到者先拿到鎖
- 非公平鎖:當(dāng)線程要獲取鎖時(shí),無(wú)視隊(duì)列順序直接去搶鎖,誰(shuí)搶到就是誰(shuí)的
- Share(共享):多個(gè)線程可同時(shí)執(zhí)行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 。
ReentrantReadWriteLock 可以看成是組合式,因?yàn)镽eentrantReadWriteLock也就是讀寫(xiě)鎖允許多個(gè)線程同時(shí)對(duì)某一資源進(jìn)行讀。
不同的自定義同步器爭(zhēng)用共享資源的方式也不同。自定義同步器在實(shí)現(xiàn)時(shí)只需要實(shí)現(xiàn)共享資源 state 的獲取與釋放方式即可,至于具體線程等待隊(duì)列的維護(hù)(如獲取資源失敗入隊(duì)/喚醒出隊(duì)等),AQS已經(jīng)在上層已經(jīng)實(shí)現(xiàn)好了。
3. AQS使用的設(shè)計(jì)模式
同步器的設(shè)計(jì)是基于模板方法模式的,使用者繼承AbstractQueuedSynchronizer并重寫(xiě)指定的方法。(這些重寫(xiě)方法很簡(jiǎn)單,無(wú)非是對(duì)于共享資源state的獲取和釋放) 將AQS組合在自定義同步組件的實(shí)現(xiàn)中,并調(diào)用其模板方法,而這些模板方法會(huì)調(diào)用使用者重寫(xiě)的方法。以下是AQS提供的可重寫(xiě)的方法:
protected boolean isHeldExclusively();//該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。
protected boolean tryAcquire(int);//獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
protected boolean tryRelease(int);//獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
protected int tryAcquireShared(int);//共享方式。嘗試獲取資源。負(fù)數(shù)表示失??;0表示成功,但沒(méi)有剩余可用資源;正數(shù)表示成功,且有剩余資源。
protected boolean tryReleaseShared(int);//共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。
4. Node類
該類為AbstractQueuedSynchronizer的一個(gè)靜態(tài)內(nèi)部類,每個(gè)被阻塞的線程都會(huì)被封裝成一個(gè)Node結(jié)點(diǎn),放入隊(duì)列。
static final class Node {
// 模式,分為共享與獨(dú)占
// 共享模式
static final Node SHARED = new Node();
// 獨(dú)占模式
static final Node EXCLUSIVE = null;
// 結(jié)點(diǎn)狀態(tài)常量
static final int CANCELLED = 1; // 表示當(dāng)前的線程被取消,線程被中斷的情況
static final int SIGNAL = -1; //表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線程需要運(yùn)行,也就是需要去unpark后面的節(jié)點(diǎn)
static final int CONDITION = -2; //表示當(dāng)前節(jié)點(diǎn)在等待condition,也就是在condition隊(duì)列中
static final int PROPAGATE = -3; //表示當(dāng)前場(chǎng)景下后續(xù)的acquireShared能夠得以執(zhí)行
// 結(jié)點(diǎn)狀態(tài)
volatile int waitStatus; // 值為0,表示當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中,等待著獲取鎖,后面已經(jīng)沒(méi)有其他節(jié)點(diǎn)了,就不用去unPark()
// 前驅(qū)結(jié)點(diǎn)
volatile Node prev;
// 后繼結(jié)點(diǎn)
volatile Node next;
// 結(jié)點(diǎn)所對(duì)應(yīng)的線程
volatile Thread thread;
// 下一個(gè)等待者
Node nextWaiter;
// 結(jié)點(diǎn)是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 獲取前驅(qū)結(jié)點(diǎn),若前驅(qū)結(jié)點(diǎn)為空,拋出異常
final Node predecessor() throws NullPointerException {
// 保存前驅(qū)結(jié)點(diǎn)
Node p = prev;
if (p == null) // 前驅(qū)結(jié)點(diǎn)為空,拋出異常
throw new NullPointerException();
else // 前驅(qū)結(jié)點(diǎn)不為空,返回
return p;
}
// 無(wú)參構(gòu)造方法
Node() { // 由addWaiter使用用于建立初始標(biāo)頭或SHARED標(biāo)記
}
// 構(gòu)造方法
Node(Thread thread, Node mode) { // 由addWaiter使用
this.nextWaiter = mode;
this.thread = thread;
}
// 構(gòu)造方法
Node(Thread thread, int waitStatus) { // 由Condition使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
5. AbstractQueuedSynchronizer類屬性
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
// 版本號(hào)
private static final long serialVersionUID = 7373984972572414691L;
// 頭節(jié)點(diǎn)
private transient volatile Node head;
// 尾結(jié)點(diǎn)
private transient volatile Node tail;
// 狀態(tài)
private volatile int state;
// 自旋時(shí)間
static final long spinForTimeoutThreshold = 1000L;
// Unsafe類實(shí)例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// state內(nèi)存偏移地址
private static final long stateOffset;
// head內(nèi)存偏移地址
private static final long headOffset;
// state內(nèi)存偏移地址
private static final long tailOffset;
// tail內(nèi)存偏移地址
private static final long waitStatusOffset;
// next內(nèi)存偏移地址
private static final long nextOffset;
// 靜態(tài)初始化塊
static {
try {
//unsafe.objectFieldOffset();用于獲取某個(gè)字段相對(duì)Java對(duì)象的“起始地址”的偏移量
stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
}
6. 核心方法
6.1 acquire方法(加鎖)
該方法以獨(dú)占模式獲取(資源),忽略中斷,即線程在acquire過(guò)程中,中斷此線程是無(wú)效的。源碼如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
調(diào)用流程如下:

- 首先調(diào)用tryAcquire方法,調(diào)用此方法的線程會(huì)試圖在獨(dú)占模式下加鎖,如果加鎖成功什么都不做,如果加鎖失敗,那么!tryAcquire(arg)=true,說(shuō)明此時(shí)的鎖被其他線程所占有。在AbstractQueuedSynchronizer源碼中默認(rèn)會(huì)拋出一個(gè)異常,即需要子類去重寫(xiě)此方法完成自己的邏輯。
- 若tryAcquire失敗,則調(diào)用addWaiter方法,addWaiter方法完成的功能是將調(diào)用此方法的線程封裝成為一個(gè)結(jié)點(diǎn)并放入同步隊(duì)列。
- 調(diào)用acquireQueued方法,此方法完成的功能是同步隊(duì)列中的結(jié)點(diǎn)不斷嘗試獲取鎖,若成功,則返回true,否則,返回false。
- 如果if條件滿足就會(huì)被中斷。
addWaiter方法
// 添加等待者
private Node addWaiter(Node mode) {
// 新生成一個(gè)結(jié)點(diǎn),默認(rèn)為獨(dú)占模式
Node node = new Node(Thread.currentThread(), mode);
// 獲取尾節(jié)點(diǎn)
Node pred = tail;
if (pred != null) { // 尾結(jié)點(diǎn)不為空,即已經(jīng)被初始化,隊(duì)列中是有元素的
// 將新結(jié)點(diǎn)的prev連接到尾結(jié)點(diǎn),也就是將新的節(jié)點(diǎn)加入到隊(duì)列的后面
node.prev = pred;
if (compareAndSetTail(pred, node)) { //為了解決有多個(gè)線程同時(shí)進(jìn)入這個(gè)判斷邏輯生成了多個(gè)節(jié)點(diǎn),保證只能有一個(gè)節(jié)點(diǎn)成為隊(duì)列的尾部。
//
pred.next = node;
return node; // 返回新生成的結(jié)點(diǎn)
}
}
enq(node);
// 如果尾結(jié)點(diǎn)為空(隊(duì)列中沒(méi)有元素),或者是compareAndSetTail操作失敗,則入隊(duì)列
// 對(duì)于公平鎖而言,沒(méi)有競(jìng)爭(zhēng)到隊(duì)列的尾部,那么就一直去競(jìng)爭(zhēng)直到競(jìng)爭(zhēng)到。
return node;
}
enq(node)
假設(shè)此時(shí)有兩個(gè)線程:線程1、線程2進(jìn)來(lái)。兩個(gè)線程都在上面代碼中判斷if (pred != null)都為空則進(jìn)入到enq方法。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 兩個(gè)線程進(jìn)來(lái)發(fā)現(xiàn)尾節(jié)點(diǎn)為空,那么就生成一個(gè)空的node節(jié)點(diǎn),再次循環(huán)線程1和線程2進(jìn)來(lái)判斷不為空走else邏輯
if (compareAndSetHead(new Node()))
tail = head; //頭節(jié)點(diǎn)與尾結(jié)點(diǎn)都指向同一個(gè)新生結(jié)點(diǎn)(空節(jié)點(diǎn))
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { //兩個(gè)線程再進(jìn)來(lái)競(jìng)爭(zhēng)尾節(jié)點(diǎn),將屬性指向空的node節(jié)點(diǎn)。
t.next = node;
return t; //假設(shè)線程2競(jìng)爭(zhēng)到尾節(jié)點(diǎn)
}
}
}
}
acquireQueued方法
此時(shí)線程2進(jìn)到這個(gè)方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//獲取當(dāng)前節(jié)點(diǎn)的prev前一個(gè)節(jié)點(diǎn)
if (p == head && tryAcquire(arg)) {//如果前一個(gè)節(jié)點(diǎn)等于head節(jié)點(diǎn),那么說(shuō)明它就是第一個(gè)排隊(duì)的線程
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//parkAndCheckInterrupt :阻塞
// shouldParkAfterFailedAcquire: 修改上一個(gè)node節(jié)點(diǎn)的waitStatus為-1
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquire方法
以ReentrantLock公平鎖為例子,下面是ReentrantLock內(nèi)部lock()方法邏輯:
final void lock() {
acquire(1);
}
//嘗試獲取鎖
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();//可重入條件
if (c == 0) { //表示第一次獲取到鎖
//compareAndSetState(0, acquires)
//利用 cas 將state狀態(tài)改為 1
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current); // 設(shè)置鎖被當(dāng)前線程獨(dú)占,也就是真正的加鎖操作
return true;
}
}
//重入邏輯
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
