8.Countdownlatch

Countdownlatch

此小節(jié)介紹幾個(gè)與鎖有關(guān)的有用工具。

閉鎖(Latch)

閉鎖(Latch):一種同步方法,可以延遲線程的進(jìn)度直到線程到達(dá)某個(gè)終點(diǎn)狀態(tài)。通俗的講就是,一個(gè)閉鎖相當(dāng)于一扇大門,在大門打開(kāi)之前所有線程都被阻斷,一旦大門打開(kāi)所有線程都將通過(guò),但是一旦大門打開(kāi),所有線程都通過(guò)了,那么這個(gè)閉鎖的狀態(tài)就失效了,門的狀態(tài)也就不能變了,只能是打開(kāi)狀態(tài)。也就是說(shuō)閉鎖的狀態(tài)是一次性的,它確保在閉鎖打開(kāi)之前所有特定的活動(dòng)都需要在閉鎖打開(kāi)之后才能完成。

CountDownLatch是JDK 5+里面閉鎖的一個(gè)實(shí)現(xiàn),允許一個(gè)或者多個(gè)線程等待某個(gè)事件的發(fā)生。CountDownLatch有一個(gè)正數(shù)計(jì)數(shù)器,countDown方法對(duì)計(jì)數(shù)器做減操作,await方法等待計(jì)數(shù)器達(dá)到0。所有await的線程都會(huì)阻塞直到計(jì)數(shù)器為0或者等待線程中斷或者超時(shí)。

CountDownLatch的API如下。

  • public void await() throws InterruptedException
  • public boolean await(long timeout, TimeUnit unit) throws InterruptedException
  • public void countDown()
  • public long getCount()

其中getCount()描述的是當(dāng)前計(jì)數(shù),通常用于調(diào)試目的。

下面的例子中描述了閉鎖的兩種常見(jiàn)的用法。

package xylz.study.concurrency.lock;

import java.util.concurrent.CountDownLatch;

public class PerformanceTestTool {

? public long timecost(final int times, final Runnable task) throws InterruptedException {
? if (times <= 0) throw new IllegalArgumentException();
? final CountDownLatch startLatch = new CountDownLatch(1);
? final CountDownLatch overLatch = new CountDownLatch(times);
? for (int i = 0; i < times; i++) {
? new Thread(new Runnable() {
? public void run() {
? try {
? startLatch.await();
? //
? task.run();
? } catch (InterruptedException ex) {
? Thread.currentThread().interrupt();
? } finally {
? overLatch.countDown();
? }
? }
? }).start();
? }
? //
? long start = System.nanoTime();
? startLatch.countDown();
? overLatch.await();
? return System.nanoTime() - start;
? }

}

在上面的例子中使用了兩個(gè)閉鎖,第一個(gè)閉鎖確保在所有線程開(kāi)始執(zhí)行任務(wù)前,所有準(zhǔn)備工作都已經(jīng)完成,一旦準(zhǔn)備工作完成了就調(diào)用startLatch.countDown()打開(kāi)閉鎖,所有線程開(kāi)始執(zhí)行。第二個(gè)閉鎖在于確保所有任務(wù)執(zhí)行完成后主線程才能繼續(xù)進(jìn)行,這樣保證了主線程等待所有任務(wù)線程執(zhí)行完成后才能得到需要的結(jié)果。在第二個(gè)閉鎖當(dāng)中,初始化了一個(gè)N次的計(jì)數(shù)器,每個(gè)任務(wù)執(zhí)行完成后都會(huì)將計(jì)數(shù)器減一,所有任務(wù)完成后計(jì)數(shù)器就變?yōu)榱?,這樣主線程閉鎖overLatch拿到此信號(hào)后就可以繼續(xù)往下執(zhí)行了。

根據(jù)前面的happend-before法則可以知道閉鎖有以下特性:

內(nèi)存一致性效果:線程中調(diào)用 countDown() 之前的操作 happen-before****** 緊跟在從另一個(gè)線程中對(duì)應(yīng) await() 成功返回的操作。

在上面的例子中第二個(gè)閉鎖相當(dāng)于把一個(gè)任務(wù)拆分成N份,每一份獨(dú)立完成任務(wù),主線程等待所有任務(wù)完成后才能繼續(xù)執(zhí)行。這個(gè)特性在后面的線程池框架中會(huì)用到,其實(shí)FutureTask就可以看成一個(gè)閉鎖。后面的章節(jié)還會(huì)具體分析FutureTask的。

同樣基于探索精神,仍然需要“窺探”下CountDownLatch里面到底是如何實(shí)現(xiàn)await*countDown的。

首先,研究下await()方法。內(nèi)部直接調(diào)用了AQSacquireSharedInterruptibly(1)。

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
? if (Thread.interrupted())
? throw new InterruptedException();
? if (tryAcquireShared(arg) < 0)
? doAcquireSharedInterruptibly(arg);
}

前面一直提到的都是獨(dú)占鎖(排它鎖、互斥鎖),現(xiàn)在就用到了另外一種鎖,共享鎖。

所謂共享鎖是說(shuō)所有共享鎖的線程共享同一個(gè)資源,一旦任意一個(gè)線程拿到共享資源,那么所有線程就都擁有的同一份資源。也就是通常情況下共享鎖只是一個(gè)標(biāo)志,所有線程都等待這個(gè)標(biāo)識(shí)是否滿足,一旦滿足所有線程都被激活(相當(dāng)于所有線程都拿到鎖一樣)。這里的閉鎖CountDownLatch就是基于共享鎖的實(shí)現(xiàn)。

閉鎖中關(guān)于AQStryAcquireShared的實(shí)現(xiàn)是如下代碼(java.util.concurrent.CountDownLatch.Sync.tryAcquireShared):

public int tryAcquireShared(int acquires) {
? return getState() == 0? 1 : -1;
}

在這份邏輯中,對(duì)于閉鎖而言第一次await時(shí)tryAcquireShared應(yīng)該總是-1,因?yàn)閷?duì)于閉鎖CountDownLatch而言state的值就是初始化的count值。這也就解釋了為什么在countDown調(diào)用之前閉鎖的count總是>0。

private void doAcquireSharedInterruptibly(int arg)
? throws InterruptedException {
? final Node node = addWaiter(Node.SHARED);
? try {
? for (;;) {
? final Node p = node.predecessor();
? if (p == head) {
? int r = tryAcquireShared(arg);
? if (r >= 0) {
? setHeadAndPropagate(node, r);
? p.next = null; // help GC
? return;
? }
? }
? if (shouldParkAfterFailedAcquire(p, node) &&
? parkAndCheckInterrupt())
? break;
? }
? } catch (RuntimeException ex) {
? cancelAcquire(node);
? throw ex;
? }
? // Arrive here only if interrupted
? cancelAcquire(node);
? throw new InterruptedException();
}

上面的邏輯展示了如何通過(guò)await將所有線程串聯(lián)并掛起,直到被喚醒或者條件滿足或者被中斷。整個(gè)過(guò)程是這樣的:

    1. 將當(dāng)前線程節(jié)點(diǎn)以共享模式加入AQSCLH隊(duì)列中(相關(guān)概念參考這里這里)。進(jìn)行2。
    2. 檢查當(dāng)前節(jié)點(diǎn)的前任節(jié)點(diǎn),如果是頭結(jié)點(diǎn)并且當(dāng)前閉鎖計(jì)數(shù)為0就將當(dāng)前節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn),喚醒繼任節(jié)點(diǎn),返回(結(jié)束線程阻塞)。否則進(jìn)行3。
    3. 檢查線程是否該阻塞,如果應(yīng)該就阻塞(park),直到被喚醒(unpark)。重復(fù)2。
    4. 如果2、3有異常就拋出異常(結(jié)束線程阻塞)。

這里有一點(diǎn)值得說(shuō)明下,設(shè)置頭結(jié)點(diǎn)并喚醒繼任節(jié)點(diǎn)setHeadAndPropagate。由于前面tryAcquireShared總是返回1或者-1,而進(jìn)入setHeadAndPropagate時(shí)總是propagate>=0,所以這里propagate==1。后面喚醒繼任節(jié)點(diǎn)操作就非常熟悉了。

private void setHeadAndPropagate(Node node, int propagate) {
? setHead(node);
? if (propagate > 0 && node.waitStatus != 0) {
? Node s = node.next;
? if (s == null || s.isShared())
? unparkSuccessor(node);
? }
}

從上面的所有邏輯可以看出countDown應(yīng)該就是在條件滿足(計(jì)數(shù)為0)時(shí)喚醒頭結(jié)點(diǎn)(時(shí)間最長(zhǎng)的一個(gè)節(jié)點(diǎn)),然后頭結(jié)點(diǎn)就會(huì)根據(jù)FIFO隊(duì)列喚醒整個(gè)節(jié)點(diǎn)列表(如果有的話)。

CountDownLatchcountDown代碼中看到,直接調(diào)用的是AQSreleaseShared(1),參考前面的知識(shí),這就印證了上面的說(shuō)法。

*tryReleaseShared*中正是采用CAS操作減少計(jì)數(shù)(每次減-1)。

public boolean tryReleaseShared(int releases) {
? for (;;) {
? int c = getState();
? if (c == 0)
? return false;
? int nextc = c-1;
? if (compareAndSetState(c, nextc))
? return nextc == 0;
? }
}

整個(gè)CountDownLatch就是這個(gè)樣子的。其實(shí)有了前面原子操作和AQS的原理及實(shí)現(xiàn),分析CountDownLatch還是比較容易的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 同步工具類 同步工具類可以是任何一個(gè)對(duì)象,只要他根據(jù)自身的狀態(tài)來(lái)協(xié)調(diào)線程的控制流.阻塞隊(duì)列可以作為同步工具類,其他...
    Showdy閱讀 681評(píng)論 0 0
  • 此篇博客所有源碼均來(lái)自JDK 1.8 在上篇博客中介紹了Java四大并發(fā)工具之一的CyclicBarrier,今天...
    chenssy閱讀 2,967評(píng)論 2 27
  • 一.線程安全性 線程安全是建立在對(duì)于對(duì)象狀態(tài)訪問(wèn)操作進(jìn)行管理,特別是對(duì)共享的與可變的狀態(tài)的訪問(wèn) 解釋下上面的話: ...
    黃大大吃不胖閱讀 963評(píng)論 0 3
  • 1、線程安全與鎖 線程安全的本質(zhì),在于 存在了共享的可變狀態(tài) status, 在多線程共同操作狀態(tài)變量時(shí),當(dāng)計(jì)算的...
    軒居晨風(fēng)閱讀 414評(píng)論 1 1
  • 正常情況下,每個(gè)子線程完成各自的任務(wù)就可以結(jié)束了。不過(guò)有的時(shí)候,我們希望多個(gè)線程協(xié)同工作來(lái)完成某個(gè)任務(wù),這時(shí)就涉及...
    野夢(mèng)M閱讀 556評(píng)論 0 2

友情鏈接更多精彩內(nèi)容