并發(fā)工具類(lèi)—— CountDownLatch

概述

前段時(shí)間在解決請(qǐng)求風(fēng)控服務(wù)器超時(shí)的問(wèn)題時(shí),涉及到到一個(gè)CountDownLunch的并發(fā)工具類(lèi),非常實(shí)用,順記自然就去研究了一下相關(guān)的并發(fā)工具類(lèi)。
在JDK的并發(fā)包里(java.util.concurrent)提供了這樣幾個(gè)非常有用的并發(fā)工具類(lèi)來(lái)解決并發(fā)編程的流程控制。分別是CountDownLatch、CyclicBarrier和Semaphore。

1. CountDownLatch

1.1 CountDownLatch是什么?

CountDownLatch打多是被用在等待多線程完成,具體來(lái)說(shuō)就是允許一個(gè)或多個(gè)線程等待其他線程完成操作。

1.2 CountDownLatch原理?

API 文檔有這樣一盒解釋

A CountDownLatch is a versatile synchronization tool and can be used for a number of purposes. A CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown(). A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.

構(gòu)造函數(shù)

//Constructs a CountDownLatch initialized with the given count.
public void CountDownLatch(int count) {...}

在 CountDownLunch啟動(dòng)的時(shí)候。主線程必須在啟動(dòng)其他線程后立即調(diào)用CountDownLatch.await()方法。這樣主線程的操作就會(huì)在這個(gè)方法上阻塞,直到其他線程完成各自的任務(wù)。

public void countDown()

在每次任務(wù)執(zhí)行完直接調(diào)用,計(jì)數(shù)器就會(huì)減一操作。

public boolean await(long timeout,TimeUnit unit) throws InterruptedException
public void await() throws InterruptedException

這個(gè)方法就是用來(lái)堵塞主線程的,前者是有等待時(shí)間的,可以自定義,后者是無(wú)限等待,知道其他count 計(jì)數(shù)器為0為止。

看圖可能一下子明白

詳細(xì)的 demo 就不在這里粘貼了
如有需要傳送門(mén)

1.3 使用場(chǎng)景

超時(shí)機(jī)制

主線程里面設(shè)置好等待時(shí)間,如果發(fā)現(xiàn)在規(guī)定時(shí)間內(nèi)還是沒(méi)有返回結(jié)果,那就喚醒主線程,拋棄。

開(kāi)始執(zhí)行前等待n個(gè)線程完成各自任務(wù)

例如應(yīng)用程序啟動(dòng)類(lèi)要確保在處理用戶請(qǐng)求前,所有N個(gè)外部系統(tǒng)已經(jīng)啟動(dòng)和運(yùn)行了。

死鎖檢測(cè)

一個(gè)非常方便的使用場(chǎng)景是,你可以使用n個(gè)線程訪問(wèn)共享資源,在每次測(cè)試階段的線程數(shù)目是不同的,并嘗試產(chǎn)生死鎖。
若有不正之處請(qǐng)多多諒解,并歡迎各位大牛批評(píng)指正。

1.4 深入源碼

這里面我簡(jiǎn)單的研究了一下CountDownLunch 源碼。
底層是由AbstractQueuedSynchronizer提供支持(后面就簡(jiǎn)稱(chēng) AQS),所以其數(shù)據(jù)結(jié)構(gòu)就是AQS的數(shù)據(jù)結(jié)構(gòu),而AQS的核心就是兩個(gè)虛擬隊(duì)列:同步隊(duì)列syncQueue 和條件隊(duì)列conditionQueue(前者數(shù)據(jù)結(jié)構(gòu)是雙向鏈表,后者是單向鏈表)不同的條件會(huì)有不同的條件隊(duì)列。
本省CountDownLunch繼承的是 Object,比較簡(jiǎn)單,但是存在內(nèi)部類(lèi),Sync,繼承自AbstractQueuedSynchronizer,我簡(jiǎn)單理解一下

private static final class Sync extends AbstractQueuedSynchronizer {
        // 版本號(hào)
        private static final long serialVersionUID = 4982264981922014374L;
        
        // 構(gòu)造器
        Sync(int count) {
            setState(count);
        }
   
        // 返回當(dāng)前計(jì)數(shù)
        int getCount() {
            return getState();
        }

        // 試圖在共享模式下獲取對(duì)象狀態(tài)
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        // 試圖設(shè)置狀態(tài)來(lái)反映共享模式下的一個(gè)釋放
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            // 無(wú)限循環(huán)
            for (;;) {
                // 獲取狀態(tài)
                int c = getState();
                if (c == 0) // 沒(méi)有被線程占有
                    return false;
                // 下一個(gè)狀態(tài)
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) // 比較并且設(shè)置成功
                    return nextc == 0;
            }
        }
    }

1.4.1核心函數(shù)分析

  • await函數(shù)
    此函數(shù)將會(huì)使當(dāng)前線程在鎖存器倒計(jì)數(shù)至零之前一直等待,除非線程被中斷。其源碼如下
public void await() throws InterruptedException{
      // 轉(zhuǎn)發(fā)到sync對(duì)象上 
      sync.acquireSharedInterruptibly(1);
}

源碼可知,對(duì)CountDownLatch對(duì)象的await的調(diào)用會(huì)轉(zhuǎn)發(fā)為對(duì)Sync的acquireSharedInterruptibly(從AQS繼承的方法)方法的調(diào)用。

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

這里先檢測(cè)了線程中斷狀態(tài),中斷了則拋出異常,接下來(lái)調(diào)用tryAcquireShared,tryAcquireShared是Syn的實(shí)現(xiàn)的

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

其實(shí)就是簡(jiǎn)單的獲取了同步器的state,判斷是否為0.
接下來(lái)是

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

關(guān)鍵點(diǎn) 我看到的是
parkAndCheckInterrupt()

  private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
 }

執(zhí)行到此處時(shí),線程會(huì)阻塞,知道有其他線程喚醒此線程,執(zhí)行await之后,上文中的主線程阻塞在這。


整個(gè)調(diào)用鏈路如下
  • countDown函數(shù)
    此函數(shù)將遞減鎖存器的計(jì)數(shù),如果計(jì)數(shù)到達(dá)零,則釋放所有等待的線程
void countDown() { 
    sync.releaseShared(1);
}

可以看出 對(duì)countDown的調(diào)用轉(zhuǎn)換為對(duì)Sync對(duì)象的releaseShared(從AQS繼承而來(lái))方法的調(diào)用。
這里面的具體原理能力有限,有點(diǎn)看不懂,CAS相關(guān)的東西。

1.5 小結(jié)

不得不說(shuō)countdownlatch是一個(gè)很高的線程控制工具,極大的方便了我們開(kāi)發(fā)。由于知識(shí)能力有限,上面是自己的一點(diǎn)見(jiàn)識(shí),有什么錯(cuò)誤還望提出,便于我及時(shí)改進(jìn)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容