引言
在上篇文章深入剖析并發(fā)之AQS獨(dú)占鎖&重入鎖(ReetrantLock)及Condition實(shí)現(xiàn)原理中我們?cè)赗eetrantLock鎖分析了AQS獨(dú)占模式的實(shí)現(xiàn)原理,本章則準(zhǔn)備從Semaphore信號(hào)量的角度出發(fā)一探AQS共享模式的具體實(shí)現(xiàn)。共享模式與獨(dú)占模式區(qū)別在于:共享模式下允許多條線程同時(shí)獲取鎖資源,而在之前分析的獨(dú)占模式中,在同一時(shí)刻只允許一條線程持有鎖資源。
一、快速認(rèn)識(shí)Semaphore信號(hào)量及實(shí)戰(zhàn)
Semaphore信號(hào)量是java.util.concurrent(JUC)包下的一個(gè)并發(fā)工具類,可以用來控制同一時(shí)刻訪問臨界資源(共享資源)的線程數(shù),以確保訪問臨界資源的線程能夠正確、合理的使用公共資源。而其內(nèi)部則于ReetrantLock一樣,都是通過直接或間接的調(diào)用AQS框架的方法實(shí)現(xiàn)。在Semaphore中存在一個(gè)“許可”的概念:
在初始化Semaphore信號(hào)量需要為這個(gè)許可傳入一個(gè)數(shù)值,該數(shù)值表示表示同一時(shí)刻可以訪問臨界資源的最大線程數(shù),也被稱為許可集。一條線程想要訪問臨界資源則需要先執(zhí)行
acquire()獲取一個(gè)許可,如果線程在獲取時(shí)許可集已經(jīng)被分配完了,那么該線程則會(huì)進(jìn)入阻塞等待狀態(tài),直至有其他持有許可的線程釋放后才有可能獲取到許可。當(dāng)線程訪問完成臨界資源后則需要執(zhí)行release()方法釋放已獲取的許可。
其實(shí)通過如上這段描述,我們不難發(fā)現(xiàn),Semaphore信號(hào)量里面的“許可”概念與前面我們文章中,分析的互斥鎖中的“同步狀態(tài)標(biāo)識(shí)”有著異曲同工之妙,其實(shí)也就是我們所談的“鎖資源”。下面我們可以簡(jiǎn)單看看Semaphore類中所提供的方法:
// 調(diào)用該方法后線程會(huì)從許可集中嘗試獲取一個(gè)許可
public void acquire()
// 線程調(diào)用該方法時(shí)會(huì)釋放已獲取的許可
public void release()
// Semaphore構(gòu)造方法:permits→許可集數(shù)量
Semaphore(int permits)
// Semaphore構(gòu)造方法:permits→許可集數(shù)量,fair→公平與非公平
Semaphore(int permits, boolean fair)
// 從信號(hào)量中獲取許可,該方法不響應(yīng)中斷
void acquireUninterruptibly()
// 返回當(dāng)前信號(hào)量中未被獲取的許可數(shù)
int availablePermits()
// 獲取并返回當(dāng)前信號(hào)量中立即未被獲取的所有許可
int drainPermits()
// 返回等待獲取許可的所有線程Collection集合
protected Collection<Thread> getQueuedThreads();
// 返回等待獲取許可的線程估計(jì)數(shù)量
int getQueueLength()
// 查詢是否有線程正在等待獲取當(dāng)前信號(hào)量中的許可
boolean hasQueuedThreads()
// 返回當(dāng)前信號(hào)量的公平類型,如為公平鎖返回true,非公平鎖為false
boolean isFair()
// 獲取當(dāng)前信號(hào)量中一個(gè)許可,當(dāng)沒有許可可用時(shí)直接返回false不阻塞線程
boolean tryAcquire()
// 在給定時(shí)間內(nèi)獲取當(dāng)前信號(hào)量中一個(gè)許可,超時(shí)還未獲取成功則返回false
boolean tryAcquire(long timeout, TimeUnit unit)
如上便是Semaphore信號(hào)量提供的一些主要方法,下面我們可以上個(gè)簡(jiǎn)單小案例演示,需求如下:
現(xiàn)在項(xiàng)目中有個(gè)需求,每晚需要長(zhǎng)時(shí)間處理大量的Excel表數(shù)據(jù)與數(shù)據(jù)庫中數(shù)據(jù)對(duì)賬請(qǐng)求,由于文件讀取屬于IO密集型任務(wù),我們可以使用多線程的方式優(yōu)化,加速處理速度。但是在該項(xiàng)目中因?yàn)檫€有其他業(yè)務(wù)要處理,為了保證整體性能,所以對(duì)于該業(yè)務(wù)的實(shí)現(xiàn)最多只能使用三個(gè)數(shù)據(jù)庫連接對(duì)象。因?yàn)槿绻?dāng)前業(yè)務(wù)線程同一時(shí)刻獲取的數(shù)據(jù)庫連接數(shù)量過多,會(huì)導(dǎo)致其他業(yè)務(wù)線程需要操作數(shù)據(jù)庫時(shí)獲取不到連接對(duì)象阻塞(因?yàn)閿?shù)據(jù)庫連接對(duì)象與線程對(duì)象一樣數(shù)據(jù)珍惜資源/資源有限),從而引發(fā)整體程序堆積大量客戶端請(qǐng)求導(dǎo)致系統(tǒng)整體癱瘓。這時(shí)我們就需要控制同一時(shí)刻最多只有三條線程拿到數(shù)據(jù)庫連接進(jìn)行操作,此時(shí)就可以使用Semaphore做流量控制。
public class SemaphoreDemo {
public static void main(String[] args) {
// 自定義線程池(后續(xù)文章會(huì)詳細(xì)分析到)
// 環(huán)境:四核四線程CPU 任務(wù)阻塞系數(shù)0.9
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
4*2, 40,
30, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1024*10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
// 設(shè)置信號(hào)量同一時(shí)刻最大線程數(shù)為3
final Semaphore semaphore = new Semaphore(3);
// 模擬100個(gè)對(duì)賬請(qǐng)求
for (int index = 0; index < 100; index++) {
final int serial = index;
threadPool.execute(()->{
try {
// 使用acquire()獲取許可
semaphore.acquire();
System.out.println(Thread.currentThread().getName() +
"線程成功獲取許可!請(qǐng)求序號(hào): " + serial);
// 模擬數(shù)據(jù)庫IO
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 臨界資源訪問結(jié)束后釋放許可
semaphore.release();
}
});
}
// 關(guān)閉線程池資源
threadPool.shutdown();
}
}
上述代碼中,在創(chuàng)建Semaphore信號(hào)量對(duì)象時(shí)為該對(duì)象初始化了三個(gè)許可,也就意味著在同一時(shí)刻允許三條線程同時(shí)訪問臨界資源。線程在訪問臨界資源之前,需要使用acquire()先成功獲取一個(gè)許可,才能訪問臨界資源。如果一條線程獲取許可,該信號(hào)量對(duì)象的許可集已經(jīng)被分配完時(shí),新來的線程需進(jìn)入等待狀態(tài)。之前獲取許可成功的線程在操作完成之后需執(zhí)行release()方法釋放已獲取的許可。我們執(zhí)行如上案例則可看到執(zhí)行結(jié)果幾乎每隔一千毫秒會(huì)出現(xiàn)三條線程同時(shí)訪問,如下:
/*
第一秒:程序運(yùn)行初
pool-1-thread-1線程成功獲取許可!請(qǐng)求序號(hào): 0
pool-1-thread-2線程成功獲取許可!請(qǐng)求序號(hào): 1
pool-1-thread-3線程成功獲取許可!請(qǐng)求序號(hào): 2
第二秒:程序運(yùn)行1000ms后
pool-1-thread-4線程成功獲取許可!請(qǐng)求序號(hào): 3
pool-1-thread-5線程成功獲取許可!請(qǐng)求序號(hào): 4
pool-1-thread-6線程成功獲取許可!請(qǐng)求序號(hào): 5
第三秒:程序運(yùn)行2000ms后
pool-1-thread-7線程成功獲取許可!請(qǐng)求序號(hào): 6
pool-1-thread-8線程成功獲取許可!請(qǐng)求序號(hào): 7
pool-1-thread-2線程成功獲取許可!請(qǐng)求序號(hào): 8
第四秒:程序運(yùn)行3000ms后
........
*/
如上便是一個(gè)簡(jiǎn)單使用Demo,總體看來關(guān)于Semaphore信號(hào)量的用法還是比較簡(jiǎn)單的。不過我們也在前面提到過這么一句話:
Semaphore信號(hào)量里的“許可”概念與前面我們文章中分析的互斥鎖的“同步狀態(tài)標(biāo)識(shí)”有著異曲同工之妙。
那我們能否使用Semaphore信號(hào)量實(shí)現(xiàn)一把獨(dú)占鎖呢?答案也是肯定的,可以。我們只需要在創(chuàng)建信號(hào)量對(duì)象時(shí),只給許可集分配一個(gè)數(shù)量即可,如下:
final Semaphore semaphore = new Semaphore(1);
二、Semaphore信號(hào)量中AQS的共享模式實(shí)現(xiàn)
Semaphore信號(hào)量其實(shí)與我們上篇文章所分析的ReetrantLock類結(jié)構(gòu)大致相同,其內(nèi)部存在繼承自AbstractQueuedSynchronizer內(nèi)部Sync類以及它的兩個(gè)子類:FairSync公平鎖類和NofairSync非公平鎖類,從這我們也可以看出,Semaphore的內(nèi)部實(shí)現(xiàn)其實(shí)與ReetrantLock一樣都是基于AQS組件實(shí)現(xiàn)的。在上一篇文章中我們也曾提到,AQS設(shè)計(jì)的初衷并不打算直接作為調(diào)用類對(duì)外暴露服務(wù),而只是作為并發(fā)包基礎(chǔ)組件,為其他并發(fā)工具類提供基礎(chǔ)設(shè)施,如維護(hù)同步隊(duì)列、控制/修改同步狀態(tài)等。具體的獲取鎖和釋放鎖的邏輯則交給子類自己去實(shí)現(xiàn),從而也能最大程度的保留框架的靈活性。因此無論是Semaphore還是ReetrantLock都需要獨(dú)自實(shí)現(xiàn)tryAcquireShared(int arg)獲取鎖方法以及tryReleaseShared(int arg)釋放鎖方法。AQS總體類圖關(guān)系如下:

如上圖,Semaphore與ReetrantLock的結(jié)構(gòu)大致相同,而實(shí)現(xiàn)思路也大致相同,獲取鎖(許可)的方法tryAcquireShared(arg)分別由兩個(gè)子類FairSync和NofairSync實(shí)現(xiàn),因?yàn)楣芥i和非公平鎖的加鎖方式畢竟存在些許不同,而釋放鎖tryReleaseShared(arg)的邏輯則交由Sync實(shí)現(xiàn),因?yàn)獒尫挪僮鞫际窍嗤?,因此放在父類Sync中實(shí)現(xiàn)自然是最好的方式。下面我們就從Semaphore源碼的角度分析AQS共享模式的具體實(shí)現(xiàn)原理,我們先從非公平鎖的獲取鎖實(shí)現(xiàn)開始。
2.1、AQS共享模式之Semaphore的NofairSync非公平鎖實(shí)現(xiàn)
我們?cè)趧?chuàng)建Semaphore對(duì)象時(shí),也和ReetrantLock一樣手動(dòng)選擇公平鎖和非公平鎖:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
通過Semaphore的構(gòu)造函數(shù)我們不難發(fā)現(xiàn),如果在我們創(chuàng)建時(shí)不選擇聲明公平類型,Semaphore默認(rèn)創(chuàng)建的是非公平鎖類型,NonfairSync構(gòu)造如下:
static final class NonfairSync extends Sync {
// 構(gòu)造函數(shù):將給定的許可數(shù)permits傳給父類同步狀態(tài)標(biāo)識(shí)state
NonfairSync(int permits) {
super(permits);
}
// 釋放鎖的方法實(shí)現(xiàn)則是直接調(diào)用父類Sync的釋放鎖方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
從如上源碼中,我們可以得知Semaphore中的非公平鎖NonfairSync類的構(gòu)造函數(shù)是基于調(diào)用父類Sync構(gòu)造函數(shù)完成的,而在創(chuàng)建Semaphore對(duì)象時(shí)傳入的許可數(shù)permits最終則會(huì)傳遞給AQS同步器的同步狀態(tài)標(biāo)識(shí)state,如下:
// 父類 - Sync類構(gòu)造函數(shù)
Sync(int permits) {
setState(permits); // 調(diào)用AQS內(nèi)部的set方法
}
// AQS(AbstractQueuedSynchronizer)同步器
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer {
// 同步狀態(tài)標(biāo)識(shí)
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
// 對(duì)state變量進(jìn)行CAS操作
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
從上述分析中可知,Semaphore對(duì)象創(chuàng)建時(shí)傳入的許可數(shù)permits,實(shí)則其實(shí)最終是在對(duì)AQS內(nèi)部的state進(jìn)行初始化。初始化完成后,state代表著當(dāng)前信號(hào)量對(duì)象的可用許可數(shù)。
2.1.1、信號(hào)量中非公平鎖NonfairSync獲取許可/鎖實(shí)現(xiàn)
我們?cè)谑褂肧emaphore時(shí)獲取鎖是調(diào)用Semaphore.acquire()方法,,調(diào)用該方法的線程會(huì)開始獲取鎖/許可,嘗試對(duì)permits/state進(jìn)行CAS加一,CAS成功則代表獲取成功。下面我們來分析一下Semaphore獲取許可的方法acquire()的具體實(shí)現(xiàn),源碼如下:
// Semaphore類 → acquire()方法
public void acquire() throws InterruptedException {
// Sync類繼承AQS,此處直接調(diào)用AQS內(nèi)部的acquireSharedInterruptibly()方法
sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer類 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 判斷是否出現(xiàn)線程中斷信號(hào)(標(biāo)志)
if (Thread.interrupted())
throw new InterruptedException();
// 如果tryAcquireShared(arg)執(zhí)行結(jié)果不小于0,則線程獲取同步狀態(tài)成功
if (tryAcquireShared(arg) < 0)
// 未獲取成功加入同步隊(duì)列阻塞等待
doAcquireSharedInterruptibly(arg);
}
信號(hào)量獲取許可的方法acquire()最終是通過Sync對(duì)象調(diào)用AQS內(nèi)部的acquireSharedInterruptibly()方法完成的,而acquireSharedInterruptibly()在獲取同步狀態(tài)標(biāo)識(shí)的過程中是可以響應(yīng)線程中斷操作的,如果該操作沒有沒中斷,則首先調(diào)用tryAcquireShared(arg)嘗試獲取一個(gè)許可數(shù),獲取成功則返回執(zhí)行業(yè)務(wù),方法結(jié)束。如果獲取失敗,則調(diào)用doAcquireSharedInterruptibly(arg)將當(dāng)前線程加入同步隊(duì)列阻塞等待。不過值得我們注意的是:tryAcquireShared(arg)方法是AQS提供的模板方法,并沒有提供具體實(shí)現(xiàn),而是把具體實(shí)現(xiàn)的邏輯交由子類完成,我們先看看信號(hào)量中非公平鎖NonfairSync類的實(shí)現(xiàn):
// Semaphore類 → NofairSync內(nèi)部類 → tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
// 調(diào)用了父類Sync中的實(shí)現(xiàn)方法
return nonfairTryAcquireShared(acquires);
}
// Syn類 → nonfairTryAcquireShared()方法
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
// 開啟自旋死循環(huán)
for (;;) {
int available = getState();
int remaining = available - acquires;
// 判斷信號(hào)量中可用許可數(shù)是否已<0或者CAS執(zhí)行是否成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
nonfairTryAcquireShared(acquires)方法首先獲取到state值后,減去一得到remaining值,如果不小于0則代表著當(dāng)前信號(hào)量中還存在可用許可,當(dāng)前線程開始嘗試cas更新state值,cas成功則代表獲取同步狀態(tài)成功,返回remaining值。反之,如果remaining值小于0則代表著信號(hào)量中的許可數(shù)已被其他線程獲取,目前不存在可用許可數(shù),直接返回小于0的remaining值,nonfairTryAcquireShared(acquires)方法執(zhí)行結(jié)束,回到AQS的acquireSharedInterruptibly()方法。當(dāng)返回的remaining值小于0時(shí),if(tryAcquireShared(arg)<0)條件成立,進(jìn)入if執(zhí)行doAcquireSharedInterruptibly(arg)方法將當(dāng)前線程加入同步隊(duì)列阻塞,等待其他線程釋放同步狀態(tài)。線程入列方法如下:
// AbstractQueuedSynchronizer類 → doAcquireSharedInterruptibly()方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 創(chuàng)建節(jié)點(diǎn)狀態(tài)為Node.SHARED共享模式的節(jié)點(diǎn)并將其加入同步隊(duì)列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 開啟自旋操作
for (;;) {
final Node p = node.predecessor();
// 判斷前驅(qū)節(jié)點(diǎn)是否為head
if (p == head) {
// 嘗試獲取同步狀態(tài)state
int r = tryAcquireShared(arg);
// 如果r不小于0說明獲取同步狀態(tài)成功
if (r >= 0) {
// 將當(dāng)前線程結(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)并喚醒后繼節(jié)點(diǎn)線程
setHeadAndPropagate(node, r);
p.next = null; // 置空方便GC
failed = false;
return;
}
}
// 調(diào)整同步隊(duì)列中node節(jié)點(diǎn)的狀態(tài)并判斷是否應(yīng)該被掛起
// 并判斷是否存在中斷信號(hào),如果需要中斷直接拋出異常結(jié)束執(zhí)行
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 結(jié)束該節(jié)點(diǎn)線程的請(qǐng)求
cancelAcquire(node);
}
}
// AbstractQueuedSynchronizer類 → setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 獲取同步隊(duì)列中原本的head頭節(jié)點(diǎn)
setHead(node); // 將傳入的node節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
/*
* propagate=剩余可用許可數(shù),h=舊的head節(jié)點(diǎn)
* h==null,(h=head)==null:
* 非空判斷的標(biāo)準(zhǔn)寫法,避免原本head以及新的頭節(jié)點(diǎn)node為空
* 如果當(dāng)前信號(hào)量對(duì)象中剩余可用許可數(shù)大于0或者
* 原本頭節(jié)點(diǎn)h或者新的頭節(jié)點(diǎn)node不是結(jié)束狀態(tài)則喚醒后繼節(jié)點(diǎn)線程
*
* 寫兩個(gè)if的原因在于避免造成不必要的喚醒,因?yàn)楹苡锌赡軉拘蚜撕罄m(xù)
* 節(jié)點(diǎn)的線程之后,還沒有線程釋放許可/鎖,從而導(dǎo)致再次陷入阻塞
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 避免傳入的node為同步隊(duì)列的唯一節(jié)點(diǎn),
// 因?yàn)殛?duì)列中如果只存在node一個(gè)節(jié)點(diǎn),那么后驅(qū)節(jié)點(diǎn)s必然為空
if (s == null || s.isShared())
doReleaseShared(); // 喚醒后繼節(jié)點(diǎn)
}
}
在doAcquireSharedInterruptibly(arg)方法中總共做了三件事:
- 一、創(chuàng)建一個(gè)狀態(tài)為Node.SHARED共享模式的節(jié)點(diǎn),并通過addWaiter()加入隊(duì)列
- 二、加入成功后開啟自旋,判斷前驅(qū)節(jié)點(diǎn)是否為head,是則嘗試獲取同步狀態(tài)標(biāo)識(shí),獲取成功后,將自己設(shè)置為head節(jié)點(diǎn),如果可用許可數(shù)大于0則喚醒后繼節(jié)點(diǎn)的線程
- 三、如果前驅(qū)節(jié)點(diǎn)不為head的節(jié)點(diǎn)以及前驅(qū)節(jié)點(diǎn)為head節(jié)點(diǎn)但獲取同步狀態(tài)失敗的節(jié)點(diǎn),則調(diào)用
shouldParkAfterFailedAcquire(p,node)判斷前驅(qū)節(jié)點(diǎn)的狀態(tài)是否為SIGNAL狀態(tài)(一般shouldParkAfterFailedAcquire(p,node)中的for循環(huán)至少需要執(zhí)行兩次以上才會(huì)返回ture,第一次把前驅(qū)節(jié)點(diǎn)設(shè)置為SIGNAL狀態(tài),第二次檢測(cè)到SIGNAL狀態(tài)),如果是則調(diào)用parkAndCheckInterrupt()掛起當(dāng)前線程并返回線程中斷狀態(tài)
如上便是doAcquireSharedInterruptibly(arg)方法的大概工作,接下來我們可以看看shouldParkAfterFailedAcquire()以及parkAndCheckInterrupt()方法:
// AbstractQueuedSynchronizer類 → shouldParkAfterFailedAcquire()方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取當(dāng)前節(jié)點(diǎn)的等待狀態(tài)
int ws = pred.waitStatus;
// 如果為等待喚醒(SIGNAL)狀態(tài)則返回true
if (ws == Node.SIGNAL)
return true;
// 如果當(dāng)前節(jié)點(diǎn)等待狀態(tài)大于0則說明是結(jié)束狀態(tài),
// 遍歷前驅(qū)節(jié)點(diǎn)直到找到?jīng)]有結(jié)束狀態(tài)的節(jié)點(diǎn)
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果當(dāng)前節(jié)點(diǎn)等待狀態(tài)小于0又不是SIGNAL狀態(tài),
// 則將其設(shè)置為SIGNAL狀態(tài),代表該節(jié)點(diǎn)的線程正在等待喚醒
// 也就是代表節(jié)點(diǎn)是剛從Condition的條件等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列,
// 節(jié)點(diǎn)狀態(tài)為CONDITION狀態(tài)(Semaphore中不存在condition的概念,
// 所以同步隊(duì)列不會(huì)出現(xiàn)這個(gè)狀態(tài)的節(jié)點(diǎn),此處代碼不會(huì)執(zhí)行)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// AbstractQueuedSynchronizer類 → parkAndCheckInterrupt()方法
private final boolean parkAndCheckInterrupt() {
// 將當(dāng)前線程掛起
LockSupport.park(this);
// 獲取線程中斷狀態(tài),interrupted()是判斷當(dāng)前中斷狀態(tài),
// 而并不是中斷線程,線程需要中斷返回true,反之false
return Thread.interrupted();
}
LockSupport → park()方法
public static void park(Object blocker) {
Thread t = Thread.currentThread();
// 設(shè)置當(dāng)前線程的監(jiān)視器blocker
setBlocker(t, blocker);
// 調(diào)用了native方法到JVM級(jí)別的阻塞機(jī)制阻塞當(dāng)前線程
UNSAFE.park(false, 0L);
// 阻塞結(jié)束后把blocker置空
setBlocker(t, null);
}
shouldParkAfterFailedAcquire()方法的作用是判斷節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是否為等待喚醒狀態(tài)(SIGNAL狀態(tài)),如果是則返回true。如果前驅(qū)節(jié)點(diǎn)的waitStatus大于0(只有CANCELLED結(jié)束狀態(tài)=1>0),既代表該前驅(qū)節(jié)點(diǎn)已沒有用了,應(yīng)該從同步隊(duì)列移除,執(zhí)行do/while循環(huán)遍歷所有前前驅(qū)節(jié)點(diǎn),直到尋找到非CANCELLED結(jié)束狀態(tài)的節(jié)點(diǎn)。如果節(jié)點(diǎn)狀態(tài)為SIGNAL等待喚醒狀態(tài)則直接調(diào)用parkAndCheckInterrupt()掛起當(dāng)前線程。至此整個(gè)Semaphore.acquire()獲取許可的方法流程結(jié)束。如下圖:

如上圖,在AQS同步器中存在一個(gè)變量state,Semaphore信號(hào)量對(duì)象在初始化時(shí)傳遞的permits許可數(shù)會(huì)間接的賦值給AQS中的state同步標(biāo)識(shí),而
permits/state則代表著同一時(shí)刻可同時(shí)訪問臨界/共享資源的最大線程數(shù)。當(dāng)一條線程調(diào)用Semaphore.acquire()獲取許可時(shí),會(huì)首先判斷state是否大于0,如果大于則代表還有可用許可數(shù),state減1,線程獲取成功并返回執(zhí)行。直到state為零時(shí),代表著當(dāng)前信號(hào)量已經(jīng)不存在可用許可數(shù)了,后續(xù)請(qǐng)求的線程則需要封裝成Node節(jié)點(diǎn)并將其加入同步隊(duì)列開啟自旋操作直至有線程釋放許可(state加一)。
至此,AQS共享模式中非公平鎖的獲取鎖原理分析完畢。但是我們?nèi)缟戏治龅氖强身憫?yīng)線程中斷請(qǐng)求的獲取許可方式,而Semaphore中也實(shí)現(xiàn)了一套不可中斷式的獲取方法,如下:
// Semaphore類 → acquireUninterruptibly()方法
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// AbstractQueuedSynchronizer類 → acquireShared()方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// AbstractQueuedSynchronizer類 → doAcquireShared()方法
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 在前面的可中斷式獲取鎖方法中此處是直接拋出異常強(qiáng)制中斷線程的
// 而在不可中斷式的獲取方法中,這里是沒有拋出異常中斷線程的
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
觀察如上源碼不難發(fā)現(xiàn),可響應(yīng)線程中斷的方法與不可響應(yīng)線程中斷的方法區(qū)別在于:
可響應(yīng)線程中斷的方法在每次操作之前會(huì)先檢測(cè)線程中斷信號(hào),如果線程需要中斷操作,則直接拋出異常強(qiáng)制中斷線程的執(zhí)行。反之,不可響應(yīng)線程中斷的方法不會(huì)檢測(cè)線程中斷信號(hào),而且不會(huì)拋出異常強(qiáng)制中斷。
2.1.2、信號(hào)量中非公平鎖NonfairSync釋放許可/鎖實(shí)現(xiàn)
使用Semaphore時(shí)釋放鎖則調(diào)用的是Semaphore.release()方法,調(diào)用該方法之后線程持有的許可會(huì)被釋放,同時(shí)permits/state加一,接下來Semaphore獲取許可的方法release()的具體實(shí)現(xiàn),源碼如下:
// Semaphore類 → release()方法
public void release() {
sync.releaseShared(1);
}
// AbstractQueuedSynchronizer類 → releaseShared(arg)方法
public final boolean releaseShared(int arg) {
// 調(diào)用子類Semaphore中tryReleaseShared()方法實(shí)現(xiàn)
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
與之前獲取許可的方法一樣,Semaphore釋放許可的方法release()也是通過間接調(diào)用AQS內(nèi)部的releaseShared(arg)完成。因?yàn)锳QS的releaseShared(arg)是魔法方法,所以最終的邏輯實(shí)現(xiàn)由Semaphore的子類Sync完成,如下:
// Semaphore類 → Sync子類 → tryReleaseShared()方法
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 獲取AQS中當(dāng)前同步狀態(tài)state值
int current = getState();
// 對(duì)當(dāng)前的state值進(jìn)行增加操作
int next = current + releases;
// 不可能出現(xiàn),除非傳入的releases為負(fù)數(shù)
if (next < current)
throw new Error("Maximum permit count exceeded");
// CAS更新state值為增加之后的next值
if (compareAndSetState(current, next))
return true;
}
}
釋放鎖/許可的方法邏輯相對(duì)來說比較簡(jiǎn)單,對(duì)AQS中的state加一釋放獲取的同步狀態(tài)。不過值得注意的是:在我們上篇文章分享的AQS獨(dú)占模式實(shí)現(xiàn)中,釋放鎖的邏輯中是沒有保證線程安全的,因?yàn)楠?dú)占模式的釋放鎖邏輯永遠(yuǎn)只會(huì)存在一條線程同時(shí)操作。而在共享模式中,可能會(huì)存在多條線程同時(shí)釋放許可/鎖資源,所以在此處使用了CAS+自旋的方式保證線程安全問題。
如果此處tryReleaseShared(releases)CAS更新成功,那么則會(huì)進(jìn)入if(tryReleaseShared(arg))中執(zhí)行doReleaseShared();喚醒后繼節(jié)點(diǎn)線程。
// AbstractQueuedSynchronizer類 → doReleaseShared()方法
private void doReleaseShared() {
/*
* 為了防止釋放過程中有其他線程進(jìn)入隊(duì)列,這里必須開啟自旋
* 如果頭節(jié)點(diǎn)設(shè)置失敗則重新檢測(cè)繼續(xù)循環(huán)
*/
for (;;) {
// 獲取隊(duì)列head頭節(jié)點(diǎn)
Node h = head;
// 如果頭節(jié)點(diǎn)不為空并且隊(duì)列中還存在其他節(jié)點(diǎn)
if (h != null && h != tail) {
// 獲取頭節(jié)點(diǎn)的節(jié)點(diǎn)狀態(tài)
int ws = h.waitStatus;
// 如果節(jié)點(diǎn)狀態(tài)為SIGNAL等待喚醒狀態(tài)則代表
if (ws == Node.SIGNAL) {
// 嘗試cas修改節(jié)點(diǎn)狀態(tài)值為0
// 失敗則繼續(xù)下次循環(huán)
// 成功則喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h); // 喚醒后繼節(jié)點(diǎn)線程
}
// 節(jié)點(diǎn)狀態(tài)為0時(shí)嘗試將節(jié)點(diǎn)狀態(tài)修改為PROPAGATE傳播狀態(tài)
// 失敗則跳出循環(huán)繼續(xù)下次循環(huán)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 如果當(dāng)前隊(duì)列頭節(jié)點(diǎn)發(fā)生變化繼續(xù)循環(huán),反之則終止自旋
if (h == head)
break;
}
}
// AbstractQueuedSynchronizer類 → unparkSuccessor()方法
// 參數(shù):傳入需要喚醒后繼節(jié)點(diǎn)的節(jié)點(diǎn)
private void unparkSuccessor(Node node) {
// 獲取node節(jié)點(diǎn)的線程狀態(tài)
int ws = node.waitStatus;
if (ws < 0)
// 設(shè)置head節(jié)點(diǎn)為0
compareAndSetWaitStatus(node, ws, 0);
// 獲取后繼節(jié)點(diǎn)
Node s = node.next;
// 如果后繼節(jié)點(diǎn)為空或線程狀態(tài)已經(jīng)結(jié)束
if (s == null || s.waitStatus > 0) {
s = null;
// 遍歷整個(gè)隊(duì)列拿到可喚醒的節(jié)點(diǎn)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒后繼節(jié)點(diǎn)線程
LockSupport.unpark(s.thread);
}
在doReleaseShared()方法直接在執(zhí)行中獲取head節(jié)點(diǎn),通過調(diào)用unparkSuccessor()方法喚醒head后繼節(jié)點(diǎn)中的線程。而因?yàn)樵摲椒w的邏輯是一個(gè)for(;;){}死循環(huán),退出的條件為:只當(dāng)隊(duì)頭不發(fā)生改變時(shí)才退出,如果發(fā)生改變了則代表著一定有其他線程在當(dāng)前線程釋放鎖/許可的過程中獲取到了鎖,所以當(dāng)前循環(huán)會(huì)繼續(xù),在第二次循環(huán)過程中,在滿足條件h.waitStauts==0的情況下,這里會(huì)把head節(jié)點(diǎn)的waitStauts設(shè)置為Node.PROPAGATE傳播狀態(tài)是為了保證喚醒傳遞。因?yàn)锳QS共享模式下是會(huì)出現(xiàn)多個(gè)線程同時(shí)對(duì)同步狀態(tài)標(biāo)識(shí)state進(jìn)行操作,如線程T1在執(zhí)行release()→doReleaseShared()釋放許可操作,剛喚醒后繼線程準(zhǔn)備替換為head頭節(jié)點(diǎn)(準(zhǔn)備替換但是還沒替換),此時(shí)另外一條線程T2正好在同一時(shí)刻執(zhí)行acquire()→doAcquireShared()→setHeadAndPropagate()獲取鎖操作,假設(shè)T2線程獲取的是最后一個(gè)可用許可,在執(zhí)行到setHeadAndPropagate()方法(這個(gè)方法中存在一個(gè)超長(zhǎng)判斷),傳入的propagate=0:
// AbstractQueuedSynchronizer類 → setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 獲取同步隊(duì)列中原本的head頭節(jié)點(diǎn)
setHead(node); // 將傳入的node節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
/*
* propagate=剩余可用許可數(shù),h=舊的head節(jié)點(diǎn)
* h==null,(h=head)==null:
* 非空判斷的標(biāo)準(zhǔn)寫法,避免原本head以及新的頭節(jié)點(diǎn)node為空
* 如果當(dāng)前信號(hào)量對(duì)象中剩余可用許可數(shù)大于0或者
* 原本頭節(jié)點(diǎn)h或者新的頭節(jié)點(diǎn)node不是結(jié)束狀態(tài)則喚醒后繼節(jié)點(diǎn)線程
*
* 寫兩個(gè)if的原因在于避免造成不必要的喚醒,因?yàn)楹苡锌赡軉拘蚜撕罄m(xù)
* 節(jié)點(diǎn)的線程之后,還沒有線程釋放許可/鎖,從而導(dǎo)致再次陷入阻塞
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 避免傳入的node為同步隊(duì)列的唯一節(jié)點(diǎn),
// 因?yàn)殛?duì)列中如果只存在node一個(gè)節(jié)點(diǎn),那么后驅(qū)節(jié)點(diǎn)s必然為空
if (s == null || s.isShared())
doReleaseShared(); // 喚醒后繼節(jié)點(diǎn)
}
}
// 超長(zhǎng)判斷:該判斷的作用在于面對(duì)各種特殊情況能夠時(shí)保證及時(shí)獲取鎖
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 避免傳入的node為同步隊(duì)列的唯一節(jié)點(diǎn),
// 因?yàn)殛?duì)列中如果只存在node一個(gè)節(jié)點(diǎn),那么后驅(qū)節(jié)點(diǎn)s必然為空
if (s == null || s.isShared())
doReleaseShared(); // 喚醒后繼節(jié)點(diǎn)
}
根據(jù)這個(gè)超長(zhǎng)判斷的邏輯,因?yàn)閭魅氲?code>propagate=0代表著當(dāng)前已經(jīng)沒有可用許可數(shù)了,不滿足超長(zhǎng)判斷中的第一個(gè)條件propagate>0,所以T2線程獲取鎖之后理論上是不需要再喚醒其他線程獲取鎖/許可,但是因?yàn)門1線程已經(jīng)訪問完成臨界資源了正在釋放持有的許可,那么就會(huì)造成一種情況:隊(duì)列中head節(jié)點(diǎn)的后繼節(jié)點(diǎn)如果此時(shí)嘗試獲取鎖/許可,那么有很大幾率獲取到T1線程釋放的許可。所以在釋放鎖時(shí),將head節(jié)點(diǎn)的waitStauts設(shè)置為Node.PROPAGATE傳播狀態(tài)值為-3,滿足這個(gè)超長(zhǎng)判斷中的第三個(gè)條件h.waitStatus < 0,所以此時(shí)T2也會(huì)喚醒head后繼節(jié)點(diǎn)中等待獲取鎖/許可資源的線程。這樣去實(shí)現(xiàn)的好處在于:能夠充分照顧到head的后繼節(jié)點(diǎn)同時(shí)也能保證喚醒的傳遞。
至于這里為什么獲取到鎖/許可的線程需要繼續(xù)喚醒后繼節(jié)點(diǎn)線程?因?yàn)檫@里是共享鎖,而不是獨(dú)占鎖。一個(gè)線程剛獲得了共享鎖/許可,那么很有可能還有剩余的共享鎖可供排隊(duì)在后面的線程獲得,所以需要喚醒后面的線程。
至此,釋放許可邏輯結(jié)束,對(duì)比獲取許可的邏輯相對(duì)來說要簡(jiǎn)單許多,只需要更新state值后調(diào)用doReleaseShared()方法喚醒后繼節(jié)點(diǎn)線程即可。但是在理解doReleaseShared()方法時(shí)需要額外注意:調(diào)用doReleaseShared()方法的線程會(huì)存在兩種:
- 一是釋放共享鎖/許可數(shù)的線程。調(diào)用release()方法釋放許可時(shí)必然調(diào)用它喚醒后繼線程
- 二是剛獲取到共享鎖/許可數(shù)的線程。一定情況下,在滿足“超長(zhǎng)判斷”的任意條件時(shí)也會(huì)調(diào)用它喚醒后繼線程
2.2、AQS共享模式之Semaphore的FairSync公平鎖實(shí)現(xiàn)
AQS共享模式中的公平鎖實(shí)現(xiàn)除開在獲取鎖的邏輯上與非公平鎖的有些許不同外,其他的實(shí)現(xiàn)大致相同。
2.2.1、信號(hào)量中公平鎖FairSync獲取許可/鎖實(shí)現(xiàn)
公平鎖的概念是指先請(qǐng)求鎖的線程一定比后請(qǐng)求鎖的線程要先執(zhí)行,先獲取到鎖資源,從時(shí)間上需要保證執(zhí)行的先后順序。
公平鎖獲取許可執(zhí)行邏輯:Semaphore.acquire()獲取許可方法 → AQS.acquireSharedInterruptibly()方法 → AQS.tryAcquireShared()獲取共享鎖模板方法 → FairSync.tryAcquireShared()方法
// Semaphore類 → 構(gòu)造方法
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// Semaphore類 → acquire()獲取鎖方法
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer類 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 調(diào)用AQS定義的獲取共享鎖的模板方法
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// AbstractQueuedSynchronizer類 → tryAcquireShared()模板方法
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// Semaphore類 → FairSync公平鎖類
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
// Semaphore類 → FairSync內(nèi)部類 → tryAcquireShared()子類實(shí)現(xiàn)
protected int tryAcquireShared(int acquires) {
for (;;) {
// 不同點(diǎn):先判斷隊(duì)列中是否存在節(jié)點(diǎn)后再執(zhí)行獲取鎖操作
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
從獲取鎖/許可的代碼中可以非常明顯的看出,公平鎖的實(shí)現(xiàn)相對(duì)來說,與非公平鎖的唯一不同點(diǎn)在于:公平鎖的模式下獲取鎖,會(huì)先調(diào)用hasQueuedPredecessors()方法判斷同步隊(duì)列中是否存在節(jié)點(diǎn),如果存在則直接返回-1回到acquireSharedInterruptibly()方法if(tryAcquireShared(arg)<0)判斷,調(diào)用doAcquireSharedInterruptibly(arg)方法將當(dāng)前線程封裝成Node.SHARED共享節(jié)點(diǎn)加入同步隊(duì)列等待。反之,如果隊(duì)列中不存在節(jié)點(diǎn)則嘗試直接獲取鎖/許可。
2.2.2、信號(hào)量中公平鎖FairSync釋放許可/鎖實(shí)現(xiàn)
公平鎖釋放許可的邏輯與非公平鎖的實(shí)現(xiàn)是一致的,因?yàn)槎际荢ync類的子類,而釋放鎖的邏輯都是對(duì)state減一更新后,喚醒后繼節(jié)點(diǎn)的線程。所以關(guān)于釋放鎖的具體實(shí)現(xiàn)則是交由Sync類實(shí)現(xiàn),這里不再重復(fù)贅述。
三、ReetrantLock與Semaphore的區(qū)別
| 對(duì)比項(xiàng) | ReetrantLock | Semaphore |
|---|---|---|
| 實(shí)現(xiàn)模式 | 獨(dú)占模式 | 共享模式 |
| 獲取鎖方法 | tryAcquire() | tryAcquireShared() |
| 釋放鎖方法 | tryRelease() | tryAcquireShared() |
| 是否支持重入 | 支持 | 不支持 |
| 線程中斷 | 支持 | 支持 |
| Condition | 支持 | 不支持 |
| 隊(duì)列數(shù)量 | 一個(gè)同步+多個(gè)等待 | 單個(gè)同步 |
| 節(jié)點(diǎn)類型 | Node.EXCLUSIVE | Node.SHARED |
四、共享模式的其他實(shí)現(xiàn)者
除開Semaphore信號(hào)量的實(shí)現(xiàn)是基于AQS的共享模式之外,在JUC并發(fā)包中CountDownLatch、ReetrantReadWriteLock讀寫鎖的Read讀鎖等都是基于AQS的共享模式實(shí)現(xiàn)的,下面我們也可以簡(jiǎn)單的看看關(guān)于CountDownLatch的用法。
4.1、CountDownLatch應(yīng)用場(chǎng)景實(shí)戰(zhàn)
在CountDownLatch初始化時(shí)和Semaphore一樣,我們需要傳入一個(gè)數(shù)字count作為最大線程數(shù)
CountDownLatch countDownLatch = new CountDownLatch(3);
這個(gè)參數(shù)同樣會(huì)間接的賦值給AQS內(nèi)部的state同步狀態(tài)標(biāo)識(shí)。一般我們會(huì)調(diào)用它的兩個(gè)方法:await()與countDown():
- await():調(diào)用await()方法的線程會(huì)被封裝成共享節(jié)點(diǎn)加入同步隊(duì)列阻塞等待,直至state=0時(shí)才會(huì)喚醒同步隊(duì)列中所有的線程
- countDown():調(diào)用countDown()方法的線程會(huì)對(duì)state減一
而關(guān)于CountDownLatch有兩種用法:
- 一、多等一:初始化count=1,多條線程await()阻塞,一條線程調(diào)用countDown()喚醒所有阻塞線程
- 二、一等多:初始化count=x,多線程countDown()對(duì)count進(jìn)行減一,一條線程await()阻塞,當(dāng)count=0時(shí)阻塞的線程開始執(zhí)行
如上兩種用法在我們的項(xiàng)目中也可以有很多的應(yīng)用場(chǎng)景,多等一的用法我們可以用來在一定程度上模擬并發(fā)測(cè)試接口并發(fā)安全問題、死鎖問題等,如:
final CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 1; i <= 3; i++) {
new Thread(() -> {
try {
System.out.println("線程:" + Thread.currentThread().getName()
+ "....阻塞等待!");
countDownLatch.await();
// 可以在此處調(diào)用需要并發(fā)測(cè)試的方法或接口
System.out.println("線程:" + Thread.currentThread().getName()
+ "....開始執(zhí)行!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T" + i).start();
}
Thread.sleep(1000);
countDownLatch.countDown();
/*
程序開始運(yùn)行:
線程:T2....阻塞等待!
線程:T1....阻塞等待!
線程:T3....阻塞等待!
程序運(yùn)行一秒后(三條線程幾乎同時(shí)執(zhí)行):
線程:T2....開始執(zhí)行!
線程:T1....開始執(zhí)行!
線程:T3....開始執(zhí)行!
*/
如上案例中,創(chuàng)建了一個(gè)CountDownLatch對(duì)象,初始化時(shí)傳遞count=1,循環(huán)創(chuàng)建三條線程T1,T2,T3阻塞等待,主線程在一秒后調(diào)用countDown()喚醒了同步隊(duì)列中的三條線程繼續(xù)執(zhí)行,原理如下:

我們?cè)趯?shí)際開發(fā)過程中,往往很多并發(fā)任務(wù)都存在前后依賴關(guān)系,如詳情頁需要調(diào)用多個(gè)接口完成數(shù)據(jù)聚合、并行執(zhí)行獲取到數(shù)據(jù)后需要進(jìn)行結(jié)果合并、多個(gè)操作完成后需要進(jìn)行數(shù)據(jù)檢查等等,而這些場(chǎng)景下我們可以使用一等多的用法:
final CountDownLatch countDownLatch = new CountDownLatch(3);
Map data = new HashMap();
for (int i = 1; i <= 3; i++) {
final int page = i;
new Thread(() -> {
System.out.println("線程:" + Thread.currentThread().getName() +
"....讀取分段數(shù)據(jù):"+(page-1)*200+"-"+page*200+"行");
// 數(shù)據(jù)加入結(jié)果集:data.put();
countDownLatch.countDown();
}, "T" + i).start();
}
countDownLatch.await();
System.out.println("線程:" + Thread.currentThread().getName()
+ "....對(duì)數(shù)據(jù)集:data進(jìn)行處理");
/*
運(yùn)行結(jié)果:
線程:T1....讀取分段數(shù)據(jù):0-200行
線程:T2....讀取分段數(shù)據(jù):200-400行
線程:T3....讀取分段數(shù)據(jù):400-600行
線程main....對(duì)數(shù)據(jù)集:data進(jìn)行處理
*/
如上一等多的案例中,for循環(huán)開啟三個(gè)線程T1,T2,T3并行執(zhí)行同時(shí)讀取數(shù)據(jù)增快處理效率,讀取完成之后將數(shù)據(jù)加入data結(jié)果集中匯總,主線程等待三條線程讀取完成后對(duì)數(shù)據(jù)集data進(jìn)行處理,如下:

4.2、CountDownLatch實(shí)現(xiàn)原理
前面我們?cè)岬竭^,CountDownLatch也是基于AQS共享模式實(shí)現(xiàn)的,與Semaphore一樣,會(huì)將傳入的count間接的賦值給AQS內(nèi)部的state同步狀態(tài)標(biāo)識(shí)。
private final Sync sync;
// CountDownLatch構(gòu)造方法
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 對(duì)其內(nèi)部Sync對(duì)象進(jìn)行初始化
this.sync = new Sync(count);
}
// CountDownLatch類 → Sync內(nèi)部類
private static final class Sync extends AbstractQueuedSynchronizer {
// Sync構(gòu)造函數(shù):對(duì)AQS內(nèi)部的state進(jìn)行賦值
Sync(int count) {setState(count);}
// 調(diào)用await()方法最終會(huì)調(diào)用到這里
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
}
// CountDownLatch類 → await()方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
在CountDownLatch中,存在一個(gè)Sync內(nèi)部類,當(dāng)我們創(chuàng)建一個(gè)CountDownLatch對(duì)象時(shí),實(shí)則其內(nèi)部的構(gòu)造函數(shù)是在對(duì)其sync對(duì)象進(jìn)行初始化,與我們前面所說的一樣
CountDownLatch countDownLatch = new CountDownLatch(count);
初始化時(shí)傳遞的count數(shù)字最終會(huì)通過調(diào)用setState(state)方法賦值給AQS內(nèi)部的同步狀態(tài)標(biāo)識(shí)state變量,而當(dāng)線程調(diào)用await()方法時(shí),會(huì)調(diào)用AQS的acquireSharedInterruptibly()方法:
// AbstractQueuedSynchronizer類 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 最終調(diào)用到CountDownLatch內(nèi)部Sync類的tryAcquireShared()方法
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// CountDownLatch類 → Sync內(nèi)部類 → tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
因?yàn)锳QS中tryAcquireShared(arg)方法僅是模板方法的原因,所以線程在執(zhí)行acquireSharedInterruptibly()方法時(shí)最終會(huì)調(diào)用到CountDownLatch內(nèi)部Sync類的tryAcquireShared()方法。當(dāng)count/state為0時(shí)返回true,反之,count不為0時(shí)返回false,最終回到if(tryAcquireShared(arg)<0)執(zhí)行時(shí),如果count不為0則執(zhí)行doAcquireSharedInterruptibly(arg)方法將當(dāng)前線程信息封裝成Node.SHARED共享節(jié)點(diǎn)加入同步隊(duì)列阻塞等待。原理如下:

如上便是
CountDownLatch.await()的實(shí)現(xiàn)原理,大體來說還是比較簡(jiǎn)單。下面我們接著分析一下CountDownLatch.countDown()方法的實(shí)現(xiàn)。
// CountDownLatch類 → countDown()方法
public void countDown() {
sync.releaseShared(1);
}
// AbstractQueuedSynchronizer類 → releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// AbstractQueuedSynchronizer類 → 模板方法:tryReleaseShared()
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// CountDownLatch類 → Sync內(nèi)部類 → tryReleaseShared()方法
// 調(diào)用countDown()方法最終會(huì)調(diào)用到這里
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
調(diào)用CountDownLatch.countDown()方法后會(huì)調(diào)用AQS的tryReleaseShared()模板方法,最終調(diào)用CountDownLatch中Sync內(nèi)部類的tryReleaseShared()方法。在該方法中首先會(huì)先對(duì)于state/count進(jìn)行一次為0判斷,如果不為0則對(duì)count/state減一,然后再次對(duì)更新之后的state/count進(jìn)行為0判斷,如果減一后state等于0返回true,回到releaseShared()的if(tryReleaseShared(arg))執(zhí)行doReleaseShared()喚醒同步隊(duì)列中的阻塞線程。反之,如果減一后不為0,當(dāng)前線程則直接返回,方法結(jié)束。如下:

4.3、CountDownLatch與CyclicBarrier的區(qū)別
在JUC包中還存在另一個(gè)和CountDownLatch作用相同的工具類:CyclicBarrier。與CountDownLatch不同的是:CyclicBarrier是基于AQS的獨(dú)占模式實(shí)現(xiàn)的,其內(nèi)部通過ReetrantLock與Condition實(shí)現(xiàn)線程阻塞與喚醒。對(duì)比如下:
| 對(duì)比項(xiàng) | CountDownLatch | CyclicBarrier |
|---|---|---|
| 實(shí)現(xiàn)模式 | 共享模式 | 獨(dú)占模式 |
| 計(jì)數(shù)方式 | 減法 | 減法 |
| 復(fù)用支持 | 不可復(fù)用 | 計(jì)數(shù)可置0 |
| 重置支持 | 不可重置 | 可重置 |
| 設(shè)計(jì)重點(diǎn) | 一等多 | 多等多 |
五、總結(jié)
通過Semaphore與CountDownLatch原理進(jìn)行分析后,不難得知,在初始化時(shí)傳遞的許可數(shù)/計(jì)數(shù)器最終都會(huì)間接的傳遞給AQS的同步狀態(tài)標(biāo)識(shí)state。當(dāng)一條線程嘗試獲取共享鎖時(shí),會(huì)對(duì)state減一,當(dāng)state為0時(shí)代表沒有可用共享鎖了,其他后續(xù)請(qǐng)求的線程會(huì)被封裝成共享節(jié)點(diǎn)加入同步隊(duì)列等待,直至其他持有共享鎖的線程釋放(state加一)。不過與獨(dú)占模式不同的是:共享模式中,除開釋放鎖時(shí)會(huì)喚醒后繼節(jié)點(diǎn)的線程外,獲取共享鎖成功的線程也會(huì)在滿足一定條件下喚醒后繼節(jié)點(diǎn)。至于共享模式中的公平鎖與非公平鎖則與之前的獨(dú)占模式的公平鎖與非公平鎖相同,公平鎖情況下,先判斷隊(duì)列是否存在Node再獲取鎖,從而保證每條線程獲取共享鎖時(shí)都是先到先得的順序執(zhí)行的。而非公平鎖情況下,通過線程競(jìng)爭(zhēng)的方式獲取,不管隊(duì)列中是否已經(jīng)存在Node節(jié)點(diǎn),請(qǐng)求的線程都會(huì)先執(zhí)行一遍獲取鎖的邏輯,只要執(zhí)行成功就能獲取到共享鎖,獲得線程執(zhí)行權(quán)。