概述
前段時(shí)間在解決請(qǐng)求風(fēng)控服務(wù)器超時(shí)的問(wèn)題時(shí),涉及到到一個(gè)CountDownLunch的并發(fā)工具類(lèi),非常實(shí)用,順記自然就去研究了一下相關(guān)的并發(fā)工具類(lèi)。
在JDK的并發(fā)包里(java.util.concurrent)提供了這樣幾個(gè)非常有用的并發(fā)工具類(lèi)來(lái)解決并發(fā)編程的流程控制。分別是CountDownLatch、CyclicBarrier和Semaphore。
1. CountDownLatch
1.1 CountDownLatch是什么?
CountDownLatch打多是被用在等待多線程完成,具體來(lái)說(shuō)就是允許一個(gè)或多個(gè)線程等待其他線程完成操作。
1.2 CountDownLatch原理?
API 文檔有這樣一盒解釋
A CountDownLatch is a versatile synchronization tool and can be used for a number of purposes. A CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown(). A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.
構(gòu)造函數(shù)
//Constructs a CountDownLatch initialized with the given count.
public void CountDownLatch(int count) {...}
在 CountDownLunch啟動(dòng)的時(shí)候。主線程必須在啟動(dòng)其他線程后立即調(diào)用CountDownLatch.await()方法。這樣主線程的操作就會(huì)在這個(gè)方法上阻塞,直到其他線程完成各自的任務(wù)。
public void countDown()
在每次任務(wù)執(zhí)行完直接調(diào)用,計(jì)數(shù)器就會(huì)減一操作。
public boolean await(long timeout,TimeUnit unit) throws InterruptedException
public void await() throws InterruptedException
這個(gè)方法就是用來(lái)堵塞主線程的,前者是有等待時(shí)間的,可以自定義,后者是無(wú)限等待,知道其他count 計(jì)數(shù)器為0為止。

詳細(xì)的 demo 就不在這里粘貼了
如有需要傳送門(mén)
1.3 使用場(chǎng)景
超時(shí)機(jī)制
主線程里面設(shè)置好等待時(shí)間,如果發(fā)現(xiàn)在規(guī)定時(shí)間內(nèi)還是沒(méi)有返回結(jié)果,那就喚醒主線程,拋棄。
開(kāi)始執(zhí)行前等待n個(gè)線程完成各自任務(wù)
例如應(yīng)用程序啟動(dòng)類(lèi)要確保在處理用戶請(qǐng)求前,所有N個(gè)外部系統(tǒng)已經(jīng)啟動(dòng)和運(yùn)行了。
死鎖檢測(cè)
一個(gè)非常方便的使用場(chǎng)景是,你可以使用n個(gè)線程訪問(wèn)共享資源,在每次測(cè)試階段的線程數(shù)目是不同的,并嘗試產(chǎn)生死鎖。
若有不正之處請(qǐng)多多諒解,并歡迎各位大牛批評(píng)指正。
1.4 深入源碼
這里面我簡(jiǎn)單的研究了一下CountDownLunch 源碼。
底層是由AbstractQueuedSynchronizer提供支持(后面就簡(jiǎn)稱(chēng) AQS),所以其數(shù)據(jù)結(jié)構(gòu)就是AQS的數(shù)據(jù)結(jié)構(gòu),而AQS的核心就是兩個(gè)虛擬隊(duì)列:同步隊(duì)列syncQueue 和條件隊(duì)列conditionQueue(前者數(shù)據(jù)結(jié)構(gòu)是雙向鏈表,后者是單向鏈表)不同的條件會(huì)有不同的條件隊(duì)列。
本省CountDownLunch繼承的是 Object,比較簡(jiǎn)單,但是存在內(nèi)部類(lèi),Sync,繼承自AbstractQueuedSynchronizer,我簡(jiǎn)單理解一下
private static final class Sync extends AbstractQueuedSynchronizer {
// 版本號(hào)
private static final long serialVersionUID = 4982264981922014374L;
// 構(gòu)造器
Sync(int count) {
setState(count);
}
// 返回當(dāng)前計(jì)數(shù)
int getCount() {
return getState();
}
// 試圖在共享模式下獲取對(duì)象狀態(tài)
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 試圖設(shè)置狀態(tài)來(lái)反映共享模式下的一個(gè)釋放
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
// 無(wú)限循環(huán)
for (;;) {
// 獲取狀態(tài)
int c = getState();
if (c == 0) // 沒(méi)有被線程占有
return false;
// 下一個(gè)狀態(tài)
int nextc = c-1;
if (compareAndSetState(c, nextc)) // 比較并且設(shè)置成功
return nextc == 0;
}
}
}
1.4.1核心函數(shù)分析
- await函數(shù)
此函數(shù)將會(huì)使當(dāng)前線程在鎖存器倒計(jì)數(shù)至零之前一直等待,除非線程被中斷。其源碼如下
public void await() throws InterruptedException{
// 轉(zhuǎn)發(fā)到sync對(duì)象上
sync.acquireSharedInterruptibly(1);
}
源碼可知,對(duì)CountDownLatch對(duì)象的await的調(diào)用會(huì)轉(zhuǎn)發(fā)為對(duì)Sync的acquireSharedInterruptibly(從AQS繼承的方法)方法的調(diào)用。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
這里先檢測(cè)了線程中斷狀態(tài),中斷了則拋出異常,接下來(lái)調(diào)用tryAcquireShared,tryAcquireShared是Syn的實(shí)現(xiàn)的
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
其實(shí)就是簡(jiǎn)單的獲取了同步器的state,判斷是否為0.
接下來(lái)是
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
關(guān)鍵點(diǎn) 我看到的是
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
執(zhí)行到此處時(shí),線程會(huì)阻塞,知道有其他線程喚醒此線程,執(zhí)行await之后,上文中的主線程阻塞在這。

- countDown函數(shù)
此函數(shù)將遞減鎖存器的計(jì)數(shù),如果計(jì)數(shù)到達(dá)零,則釋放所有等待的線程
void countDown() {
sync.releaseShared(1);
}
可以看出 對(duì)countDown的調(diào)用轉(zhuǎn)換為對(duì)Sync對(duì)象的releaseShared(從AQS繼承而來(lái))方法的調(diào)用。
這里面的具體原理能力有限,有點(diǎn)看不懂,CAS相關(guān)的東西。
1.5 小結(jié)
不得不說(shuō)countdownlatch是一個(gè)很高的線程控制工具,極大的方便了我們開(kāi)發(fā)。由于知識(shí)能力有限,上面是自己的一點(diǎn)見(jiàn)識(shí),有什么錯(cuò)誤還望提出,便于我及時(shí)改進(jìn)。