Java并發(fā)編程核心在于java.concurrent.util包,而juc當(dāng)中的大多數(shù)同步器實(shí)現(xiàn)都是圍繞著共同的基礎(chǔ)行為,比如等待隊(duì)列、條件隊(duì)列、獨(dú)占獲取、共享獲取等,而這個(gè)行為的抽象就是基于AbstractQueuedSynchronizer,簡稱AQS。
AQS具備特性
?阻塞等待隊(duì)列
?共享/獨(dú)占
?公平/非公平
?可重入
?允許中斷
可以說,AQS貫穿了整個(gè)并發(fā)包設(shè)計(jì),是juc的核心,對于并發(fā)編程實(shí)現(xiàn)的理解至關(guān)重要。
AQS是什么
JDK源碼對AQS有十分具體的解釋,下面這段英文摘自java.util.concurrent.locks包下的AbstractQueuedSynchronizer.java源碼文件。
*Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state. Subclasses
* must define the protected methods that change this state, and which
* define what that state means in terms of this object being acquired
* or released. Given these, the other methods in this class carry
* out all queuing and blocking mechanics. Subclasses can maintain
* other state fields, but only the atomically updated {@code int}
* value manipulated using methods {@link #getState}, {@link
* #setState} and {@link #compareAndSetState} is tracked with respect
* to synchronization.
*
* <p>Subclasses should be defined as non-public internal helper
* classes that are used to implement the synchronization properties
* of their enclosing class. Class
* {@code AbstractQueuedSynchronizer} does not implement any
* synchronization interface. Instead it defines methods such as
* {@link #acquireInterruptibly} that can be invoked as
* appropriate by concrete locks and related synchronizers to
* implement their public methods.
*
* <p>This class supports either or both a default <em>exclusive</em>
* mode and a <em>shared</em> mode. When acquired in exclusive mode,
* attempted acquires by other threads cannot succeed. Shared mode
* acquires by multiple threads may (but need not) succeed. This class
* does not "understand" these differences except in the
* mechanical sense that when a shared mode acquire succeeds, the next
* waiting thread (if one exists) must also determine whether it can
* acquire as well. Threads waiting in the different modes share the
* same FIFO queue. Usually, implementation subclasses support only
* one of these modes, but both can come into play for example in a
* {@link ReadWriteLock}. Subclasses that support only exclusive or
* only shared modes need not define the methods supporting the unused mode.
*
* <p>This class defines a nested {@link ConditionObject} class that
* can be used as a {@link Condition} implementation by subclasses
* supporting exclusive mode for which method {@link
* #isHeldExclusively} reports whether synchronization is exclusively
* held with respect to the current thread, method {@link #release}
* invoked with the current {@link #getState} value fully releases
* this object, and {@link #acquire}, given this saved state value,
* eventually restores this object to its previous acquired state. No
* {@code AbstractQueuedSynchronizer} method otherwise creates such a
* condition, so if this constraint cannot be met, do not use it. The
* behavior of {@link ConditionObject} depends of course on the
* semantics of its synchronizer implementation.
*
* <p>This class provides inspection, instrumentation, and monitoring
* methods for the internal queue, as well as similar methods for
* condition objects. These can be exported as desired into classes
* using an {@code AbstractQueuedSynchronizer} for their
* synchronization mechanics.
英語好的同學(xué)可以直接閱讀上述源碼作者Doug Lea給出的注解,這里為了讓更多人更好的理解,給出英文的大致含義。
AQS同步器是用來構(gòu)建鎖和其他同步組件的基礎(chǔ)框架,它的實(shí)現(xiàn)主要依賴一個(gè)int成員變量來表示同步狀態(tài)以及通過一個(gè)FIFO隊(duì)列構(gòu)成等待隊(duì)列。它的子類必須重寫AQS的幾個(gè)protected修飾的用來改變同步狀態(tài)的方法,其他方法主要是實(shí)現(xiàn)了排隊(duì)和阻塞機(jī)制。狀態(tài)的更新使用getState,setState以及compareAndSetState這三個(gè)方法。
子類被推薦定義為自定義同步組件的非public靜態(tài)內(nèi)部類,同步器自身沒有實(shí)現(xiàn)任何同步接口,它僅僅是定義了若干同步狀態(tài)的獲取和釋放方法來供自定義同步組件的使用。
同步器既支持獨(dú)占式獲取同步狀態(tài),也可以支持共享式獲取同步狀態(tài),這樣就可以方便的實(shí)現(xiàn)不同類型的同步組件。
同步器定義了一個(gè)嵌套的條件對象類COnditionObject,可以被子類使用作為Contidition用于支持獨(dú)占模式。
同步器是實(shí)現(xiàn)鎖(也可以是任意同步組件)的關(guān)鍵,在鎖的實(shí)現(xiàn)中聚合同步器,利用同步器實(shí)現(xiàn)鎖的語義??梢赃@樣理解二者的關(guān)系:鎖是面向使用者,它定義了使用者與鎖交互的接口,隱藏了實(shí)現(xiàn)細(xì)節(jié);同步器是面向鎖的實(shí)現(xiàn)者,它簡化了鎖的實(shí)現(xiàn)方式,屏蔽了同步狀態(tài)的管理,線程的排隊(duì),等待和喚醒等底層操作。鎖和同步器很好的隔離了使用者和實(shí)現(xiàn)者所需關(guān)注的領(lǐng)域。
AQS的模板方法設(shè)計(jì)模式
AQS的設(shè)計(jì)采用了模板方法的設(shè)計(jì)模式,它將一些方法開放給子類進(jìn)行重寫,根據(jù)java多態(tài)性,同步器給同步組件所提供模板方法在調(diào)用這些被重寫的方法時(shí),實(shí)際調(diào)用的是子類的實(shí)現(xiàn)。

以ReentrantLock為例舉個(gè)例子,AQS中需要重寫的方法tryAcquire:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
ReentrantLock中NonfairSync(繼承AQS)會(huì)重寫該方法為:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
而AQS中的模板方法acquire():
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
會(huì)調(diào)用tryAcquire方法,而此時(shí)當(dāng)繼承AQS的NonfairSync調(diào)用模板方法acquire時(shí)就會(huì)調(diào)用已經(jīng)被NonfairSync重寫的tryAcquire方法。這就是使用AQS的方式,在弄懂這點(diǎn)后會(huì)lock的實(shí)現(xiàn)理解有很大的提升??梢詺w納總結(jié)為這么幾點(diǎn):
- 同步組件(這里不僅僅指鎖,還包括CountDownLatch等)的實(shí)現(xiàn)依賴于同步器AQS,在同步組件實(shí)現(xiàn)中,使用AQS的方式被推薦為定義繼承AQS的靜態(tài)內(nèi)部類;
- AQS采用模板方法進(jìn)行設(shè)計(jì),AQS的protected修飾的方法需要由繼承AQS的子類進(jìn)行重寫實(shí)現(xiàn),當(dāng)調(diào)用AQS的子類的方法時(shí)就會(huì)調(diào)用被重寫的方法;
- AQS負(fù)責(zé)同步狀態(tài)的管理,線程的排隊(duì),等待和喚醒這些底層操作,而Lock等同步組件主要專注于實(shí)現(xiàn)同步語義;
- 在重寫AQS的方式時(shí),使用AQS提供的
getState(),setState(),compareAndSetState()方法進(jìn)行修改同步狀態(tài)
AQS可重寫的方法如下圖(摘自《java并發(fā)編程的藝術(shù)》一書):

在實(shí)現(xiàn)同步組件時(shí)AQS提供的模板方法如下圖:

AQS提供的模板方法可以分為3類:
- 獨(dú)占式獲取與釋放同步狀態(tài);
- 共享式獲取與釋放同步狀態(tài);
- 查詢同步隊(duì)列中等待線程情況;
同步組件通過AQS提供的模板方法實(shí)現(xiàn)自己的同步語義。
一個(gè)例子
下面使用一個(gè)例子來進(jìn)一步理解下AQS的使用。這個(gè)例子也是來源于AQS源碼中的example。
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
// 繼承AQS的靜態(tài)內(nèi)存類
// 重寫方法
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
//判斷同步器是否處于鎖定狀態(tài)
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquires the lock if state is zero
//只有當(dāng)state值為0,也就是沒有線程獲取著該鎖,才能拿到鎖的使用權(quán)
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() {
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
//使用同步器的模板方法實(shí)現(xiàn)自己的同步語義
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
MutexDemo:
public class MutextDemo {
private static Mutex mutex = new Mutex();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
mutex.lock();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.unlock();
}
});
thread.start();
}
}
}
執(zhí)行情況:

上面的這個(gè)例子實(shí)現(xiàn)了獨(dú)占鎖的語義,在同一個(gè)時(shí)刻只允許一個(gè)線程占有鎖。MutexDemo新建了10個(gè)線程,分別睡眠3s。從執(zhí)行情況也可以看出來當(dāng)前Thread-6正在執(zhí)行占有鎖而其他Thread-7,Thread-8等線程處于WAIT狀態(tài)。按照推薦的方式,Mutex定義了一個(gè)繼承AQS的靜態(tài)內(nèi)部類Sync,并且重寫了AQS的tryAcquire等等方法,而對state的更新也是利用了setState(),getState(),compareAndSetState()這三個(gè)方法。在實(shí)現(xiàn)lock接口中的方法也只是調(diào)用了AQS提供的模板方法(因?yàn)镾ync繼承AQS)。
從這個(gè)例子就可以很清楚的看出來,在同步組件的實(shí)現(xiàn)上主要是利用了AQS,而AQS“屏蔽”了同步狀態(tài)的修改,線程排隊(duì)等底層實(shí)現(xiàn),通過AQS的模板方法可以很方便的給同步組件的實(shí)現(xiàn)者進(jìn)行調(diào)用。而針對用戶來說,只需要調(diào)用同步組件提供的方法來實(shí)現(xiàn)并發(fā)編程即可。同時(shí)在新建一個(gè)同步組件時(shí)需要把握的兩個(gè)關(guān)鍵點(diǎn)是:
- 實(shí)現(xiàn)同步組件時(shí)推薦定義繼承AQS的靜態(tài)內(nèi)存類,并重寫需要的protected修飾的方法;
- 同步組件語義的實(shí)現(xiàn)依賴于AQS的模板方法,而AQS模板方法又依賴于被AQS的子類所重寫的方法。
通俗點(diǎn)說,因?yàn)锳QS整體設(shè)計(jì)思路采用模板方法設(shè)計(jì)模式,同步組件以及AQS的功能實(shí)際上別切分成各自的兩部分:
同步組件實(shí)現(xiàn)者的角度:
通過可重寫的方法:獨(dú)占式: tryAcquire()(獨(dú)占式獲取同步狀態(tài)),tryRelease()(獨(dú)占式釋放同步狀態(tài));共享式 :tryAcquireShared()(共享式獲取同步狀態(tài)),tryReleaseShared()(共享式釋放同步狀態(tài));告訴AQS怎樣判斷當(dāng)前同步狀態(tài)是否成功獲取或者是否成功釋放。同步組件專注于對當(dāng)前同步狀態(tài)的邏輯判斷,從而實(shí)現(xiàn)自己的同步語義。這句話比較抽象,舉例來說,上面的Mutex例子中通過tryAcquire方法實(shí)現(xiàn)自己的同步語義,在該方法中如果當(dāng)前同步狀態(tài)為0(即該同步組件沒被任何線程獲?。?,當(dāng)前線程可以獲取同時(shí)將狀態(tài)更改為1返回true,否則,該組件已經(jīng)被線程占用返回false。很顯然,該同步組件只能在同一時(shí)刻被線程占用,Mutex專注于獲取釋放的邏輯來實(shí)現(xiàn)自己想要表達(dá)的同步語義。
AQS的角度
而對AQS來說,只需要同步組件返回的true和false即可,因?yàn)锳QS會(huì)對true和false會(huì)有不同的操作,true會(huì)認(rèn)為當(dāng)前線程獲取同步組件成功直接返回,而false的話就AQS也會(huì)將當(dāng)前線程插入同步隊(duì)列等一系列的方法。
總的來說,同步組件通過重寫AQS的方法實(shí)現(xiàn)自己想要表達(dá)的同步語義,而AQS只需要同步組件表達(dá)的true和false即可,AQS會(huì)針對true和false不同的情況做不同的處理,至于底層實(shí)現(xiàn),可以看這篇文章。