隊(duì)列同步器AbstractQueuedSynchronizer(以下簡稱同步器),是用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,它使用了一個int成員變量表示同步狀態(tài),通過內(nèi)置的FIFO隊(duì)列來完成資源獲取線程的排隊(duì)工作,并發(fā)包的作者(Doug Lea)期望它能夠成為實(shí)現(xiàn)大部分同步需求的基礎(chǔ)。
顧名思義,AQS不是一個實(shí)際的類,它是一個抽象類,需要繼承該類并且實(shí)現(xiàn)抽象方法來管理同步狀態(tài)。而管理同步狀態(tài)時不免要對同步狀態(tài)進(jìn)行更改,這就需要使用到以下三個方法:
-
getState()獲取當(dāng)前同步狀態(tài)。 -
setState(int newState)設(shè)置當(dāng)前同步狀態(tài)。 -
compareAndSetState(int expect,int update)使用CAS設(shè)置當(dāng)前狀態(tài),該方法能夠保證狀態(tài) 設(shè)置的原子性。
子類推薦被定義為自定義同步組件的靜態(tài)內(nèi)部 類,同步器自身沒有實(shí)現(xiàn)任何同步接口,它僅僅是定義了若干同步狀態(tài)獲取和釋放的方法來 供自定義同步組件使用,同步器既可以支持獨(dú)占式地獲取同步狀態(tài),也可以支持共享式地獲 取同步狀態(tài),這樣就可以方便實(shí)現(xiàn)不同類型的同步組件(ReentrantLock、 ReentrantReadWriteLock和CountDownLatch等)
同步器是實(shí)現(xiàn)鎖(也可以是任意同步組件)的關(guān)鍵,在鎖的實(shí)現(xiàn)中聚合同步器,利用同步 器實(shí)現(xiàn)鎖的語義。可以這樣理解二者之間的關(guān)系:
- 鎖是面向使用者的,它定義了使用者與鎖交 互的接口(比如可以允許兩個線程并行訪問),隱藏了實(shí)現(xiàn)細(xì)節(jié);
- 同步器面向的是鎖的實(shí)現(xiàn)者, 它簡化了鎖的實(shí)現(xiàn)方式,屏蔽了同步狀態(tài)管理、線程的排隊(duì)、等待與喚醒等底層操作。
鎖和同 步器很好地隔離了使用者和實(shí)現(xiàn)者所需關(guān)注的領(lǐng)域。
同步器接口實(shí)例:
同步器可重寫的方法:
| 方法名稱 | 方法描述 |
|---|---|
boolean tryAcquire(int arg) |
獨(dú)占式獲取同步狀態(tài),實(shí)現(xiàn)該方法需要查詢當(dāng)前 狀態(tài)并判斷同步狀態(tài)是否符合預(yù)期,然后再進(jìn)行 CAS設(shè)置同步狀態(tài) |
boolean tryRelease(int arg) |
獨(dú)占式釋放同步狀態(tài),等待獲取同步狀態(tài)的 線程將有機(jī)會獲取同步狀態(tài) |
int tryAcquireShared(int arg) |
共享式獲取同步狀態(tài),返回大于等于0的值, 表示獲取成功,反之,獲取失敗 |
boolean tryReleaseShared(int arg) |
共享式釋放同步狀態(tài) |
boolean isHeldExclusively() |
當(dāng)前同步器是否在獨(dú)占模式下被線程占用, 一般該方法表示是否被當(dāng)前線程所獨(dú)占 |
實(shí)現(xiàn)自定義同步組件時,將會調(diào)用同步器提供的模板方法,這些(部分)模板方法與描述如下表所示
| 方法名稱 | 方法描述 |
|---|---|
void acquire(int arg) |
獨(dú)占式獲取同步狀態(tài),如果當(dāng)前線程獲取同 步狀態(tài)成功,則由該方法返回 否則,將會進(jìn) 入同步隊(duì)列等待,該方法將會調(diào)用重寫的 tryAcquire(int arg)方法 |
void acquireInterruptibly(int arg) |
與 acquire(int arg)相同,但是該方法響應(yīng)中斷 ,當(dāng)前線程未獲取到 同步狀態(tài)而進(jìn)入同步 隊(duì)列中,如果當(dāng)前線程被中斷,則該方法會 拋出Interruptedexception并返回 |
tryAcquireNanos(int arg, long nanosTimeout) |
在 acquireinterruptibly(int arg)基礎(chǔ)上增加 了超時限制,如果當(dāng)前線程在超時時間內(nèi)沒 有獲取到同步狀態(tài),那么將會返回 false, 如果獲取到了返回true |
void acquireShared(int arg) |
共享式的獲取同步狀態(tài),如果當(dāng)前線程未獲 取到同步狀態(tài),將會進(jìn)入同步隊(duì)列等待,與 獨(dú)占式獲取的主要區(qū)別是在同一時刻可以 有多個線程獲取到同步狀態(tài) |
void acquireSharedInterruptibly(int arg) |
與 acquireShared(int arg)相同,該方法響應(yīng)中斷 |
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) |
在 acquireSharedinterruptibly(int arg) 基礎(chǔ)上增加了超時限制 |
boolean release(int arg) |
獨(dú)占式的釋放同步狀態(tài),該方法會在釋放同 步狀態(tài)之后,將同步隊(duì)列中第一個節(jié)點(diǎn)包含的線程喚醒 |
boolean releaseShared(int arg) |
共享式的釋放同步狀態(tài) |
Collection<Thread> getQueuedThreads() |
獲取等待在同步隊(duì)列上的線程集合 |
同步器提供的模板方法基本上分為3類:
- 獨(dú)占式獲取與釋放同步狀態(tài)
- 共享式獲取與釋放
- 同步狀態(tài)和查詢同步隊(duì)列中的等待線程情況
通過上述模板方法,我們可以將鎖大致分為獨(dú)占式鎖與共享式鎖。
獨(dú)占式鎖
顧名思義,獨(dú)占鎖就是在同一時刻只能有一個線程獲取到鎖,而其他獲取鎖的線程只能 處于同步隊(duì)列中等待,只有獲取鎖的線程釋放了鎖,后繼的線程才能夠獲取鎖。下面我們通過一段代碼來演示一下:
public class Mutex implements Lock {
// 靜態(tài)內(nèi)部類,自定義同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否處于占用狀態(tài)
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 當(dāng)狀態(tài)為0的時候獲取鎖
@Override
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 釋放鎖,將狀態(tài)設(shè)置為0
@Override
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 返回一個Condition,每個condition都包含了一個condition隊(duì)列
Condition newCondition() {
return new ConditionObject();
}
}
// 僅需要將操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
上述示例中,獨(dú)占鎖Mutex是一個自定義同步組件,它在同一時刻只允許一個線程占有鎖。Mutex中定義了一個靜態(tài)內(nèi)部類,該內(nèi)部類繼承了同步器并實(shí)現(xiàn)了獨(dú)占式獲取和釋放同步狀態(tài)。在tryAcquire(int acquires)方法中,如果經(jīng)過CAS設(shè)置成功(同步狀態(tài)設(shè)置為1),則代表獲取了同步狀態(tài),而在tryRelease(int releases)方法中只是將同步狀態(tài)重置為0。用戶使用Mutex時并不會直接和內(nèi)部同步器的實(shí)現(xiàn)打交道,而是調(diào)用Mutex提供的方法,在Mutex的實(shí)現(xiàn)中,以獲取鎖的lock()方法為例,只需要在方法實(shí)現(xiàn)中調(diào)用同步器的模板方法acquire(int args)即可,當(dāng)前線程調(diào)用該方法獲取同步狀態(tài)失敗后會被加入到同步隊(duì)列中等待,這樣就大大降低了實(shí)現(xiàn)一個可靠自定義同步組件的門檻。
共享式鎖
共享式獲取與獨(dú)占式獲取最主要的區(qū)別在于同一時刻能否有多個線程同時獲取到同步狀態(tài)。以文件的讀寫為例,如果一個程序在對文件進(jìn)行讀操作,那么這一時刻對于該文件的寫操作均被阻塞,而讀操作能夠同時進(jìn)行。寫操作要求對資源的獨(dú)占式訪問,而讀操作可以是共享式訪問,兩種不同的訪問模式在同一時刻對文件或資源的訪問情況如下圖所示:

上圖中左半部分,共享式訪問資源時,其他共享式的訪問均被允許,而獨(dú)占式訪問被阻塞,右半部分是獨(dú)占式訪問資源時,同一時刻其他訪問均被阻塞。
下面我們通過一段代碼來實(shí)際演示共享式同步鎖的使用:
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
public int tryAcquireShared(int reduceCount) {
for (; ; ) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
public boolean tryReleaseShared(int returnCount) {
for (; ; ) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
// 其他接口方法略
}
在上述示例中,TwinsLock實(shí)現(xiàn)了Lock接口,提供了面向使用者的接口,使用者調(diào)用lock()方法獲取鎖,隨后調(diào)用unlock()方法釋放鎖,而同一時刻只能有兩個線程同時獲取到鎖。TwinsLock同時包含了一個自定義同步器Sync,而該同步器面向線程訪問和同步狀態(tài)控制。以共享式獲取同步狀態(tài)為例:同步器會先計算出獲取后的同步狀態(tài),然后通過CAS確保狀態(tài)的正確設(shè)置,當(dāng)tryAcquireShared(int reduceCount)方法返回值大于等于0時,當(dāng)前線程才獲取同步狀態(tài),對于上層的TwinsLock而言,則表示當(dāng)前線程獲得了鎖。
下面編寫一個測試來驗(yàn)證TwinsLock是否能按照預(yù)期工作。在測試用例中,定義了工作者線程Worker,該線程在執(zhí)行過程中獲取鎖,當(dāng)獲取鎖之后使當(dāng)前線程睡眠0.5秒(并不釋放鎖),隨后打印當(dāng)前線程名稱,最后再次睡眠1秒并釋放鎖,測試用例如下:
@Test
public void twinsLockTest() throws InterruptedException {
final Lock lock = new TwinsLock();
class Worker extends Thread {
public void run() {
while (true) {
// System.out.println("before - " + Thread.currentThread().getName());
lock.lock();
try {
Thread.sleep(1000);
System.err.println("got - " + Thread.currentThread().getName());
Thread.sleep(1000);
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
// System.out.println("after - " + Thread.currentThread().getName());
}
}
}
// 啟動10個線程
for (int i = 0; i < 10; i++) {
Worker w = new Worker();
w.start();
}
// 每隔1秒換行
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
System.out.println();
}
}
運(yùn)行該測試用例,可以看到線程名稱成對輸出,也就是在同一時刻只有兩個線程能夠獲 取到鎖,這表明TwinsLock可以按照預(yù)期正確工作。
啟動運(yùn)行之后會看到基本獲取到共享資源的線程都是固定的那兩個,這是因?yàn)樵跊]獲取到共享資源時該線程會被加入AQS的等待隊(duì)列,在釋放資源之后再被喚醒重新競爭資源,而由于之前等待的線程需要被喚醒才能重新競爭共享資源,而釋放資源的線程由于不需要喚醒,所以大概率會比其他線程優(yōu)先再次獲取到鎖,可以將before和after打印語句取消注釋在運(yùn)行,這樣可以解決這個問題。
對于AQS隊(duì)列同步器的實(shí)現(xiàn)分析我們放到下一節(jié)去分析,本節(jié)主要簡單的講解和舉例AQS對于共享式和獨(dú)占式鎖的實(shí)現(xiàn)