原理剖析(第 007 篇)CountDownLatch工作原理分析

原理剖析(第 007 篇)CountDownLatch工作原理分析

一、大致介紹

1、在前面章節(jié)了解了CAS、AQS后,想必大家已經(jīng)對(duì)這塊知識(shí)有了深刻的了解了;
2、而JDK中有一個(gè)關(guān)于計(jì)數(shù)同步器的工具類(lèi),它也是基于AQS實(shí)現(xiàn)的;
3、那么本章節(jié)就和大家分享分析一下JDK1.8的CountDownLatch的工作原理; 

二、簡(jiǎn)單認(rèn)識(shí)CountDownLatch

2.1 何為CountDownLatch?

1、CountDownLatch從英文字面上理解,count計(jì)數(shù)做down的減法動(dòng)作,而Latch又是門(mén)閂的意思;

2、CountDownLatch是一種同步幫助,允許一個(gè)或多個(gè)線(xiàn)程等待,直到在其他線(xiàn)程中執(zhí)行的一組操作完成。;

3、CountDownLatch內(nèi)部沒(méi)有所謂的公平鎖\非公平鎖的靜態(tài)內(nèi)部類(lèi),只有一個(gè)Sync靜態(tài)內(nèi)部類(lèi),CountDownLatch內(nèi)部基本上也是通過(guò)sync.xxx之類(lèi)的這種調(diào)用方式的;

4、CountDownLatch內(nèi)部維護(hù)了一個(gè)虛擬的資源池,如果許可數(shù)不為為0一直線(xiàn)程阻塞等待,直到許可數(shù)為0時(shí)才釋放繼續(xù)往下執(zhí)行;

2.2 CountDownLatch的state關(guān)鍵詞

1、其實(shí)CountDownLatch的實(shí)現(xiàn)也恰恰很好利用了其父類(lèi)AQS的state變量值;

2、初始化一個(gè)數(shù)量值作為計(jì)數(shù)器的默認(rèn)值,假設(shè)為N,那么當(dāng)任何線(xiàn)程調(diào)用一次countDown則計(jì)數(shù)值減1,直到許可為0時(shí)才釋放等待;

3、CountDownLatch,簡(jiǎn)單大致意思為:A組線(xiàn)程等待另外B組線(xiàn)程,B組線(xiàn)程執(zhí)行完了,A組線(xiàn)程才可以執(zhí)行;

2.3 常用重要的方法

1、public CountDownLatch(int count)
   // 創(chuàng)建一個(gè)給定許計(jì)數(shù)值的計(jì)數(shù)同步器對(duì)象

2、public void await()
   // 入隊(duì)等待,直到計(jì)數(shù)器值為0則釋放等待

3、public void countDown()
   // 釋放許可,計(jì)數(shù)器值減1,若計(jì)數(shù)器值為0則觸發(fā)釋放無(wú)用結(jié)點(diǎn)
   
4、public long getCount() 
   // 獲取目前最新的共享資源計(jì)數(shù)器值

2.4 設(shè)計(jì)與實(shí)現(xiàn)偽代碼

1、獲取共享鎖:
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    await{
        如果檢測(cè)中斷狀態(tài)發(fā)現(xiàn)被中斷過(guò)的話(huà),那么則拋出InterruptedException異常
        如果嘗試獲取共享鎖失敗的話(huà)( 嘗試獲取共享鎖的各種方式由AQS的子類(lèi)實(shí)現(xiàn) ),
        那么就新增共享鎖結(jié)點(diǎn)通過(guò)自旋操作加入到隊(duì)列中,然后通過(guò)調(diào)用LockSupport.park進(jìn)入阻塞等待,直到計(jì)數(shù)器值為零才釋放等待
    }
    
    
2、釋放共享鎖:
    public void countDown() {
        sync.releaseShared(1);
    }
    
    release{
        如果嘗試釋放共享鎖失敗的話(huà)( 嘗試釋放共享鎖的各種方式由AQS的子類(lèi)實(shí)現(xiàn) ),
        那么通過(guò)自旋操作完成阻塞線(xiàn)程的喚起操作
    }

2.5、CountDownLatch生活細(xì)節(jié)化理解

比如百米賽跑,我就以賽跑為例生活化闡述該CountDownLatch原理:

1、場(chǎng)景:百米賽跑十人參賽,終點(diǎn)處有一個(gè)裁判計(jì)數(shù);

2、開(kāi)跑一聲槍響,十個(gè)人爭(zhēng)先恐后的向終點(diǎn)跑去,真的是振奮多秒,令人振奮;

3、當(dāng)一個(gè)人到達(dá)終點(diǎn),這個(gè)人就完成了他的賽跑事情了,就沒(méi)事一邊玩去了,那么裁判則減去一個(gè)人;

4、隨著人員陸陸續(xù)續(xù)的都跑到了終點(diǎn),最后裁判計(jì)數(shù)顯示還有0個(gè)人未到達(dá),意思就是人員都達(dá)到了;

5、然后裁判就拿著登記的成績(jī)屁顛屁顛去輸入電腦登記了;

8、到此打止,這一系列的動(dòng)作認(rèn)為是A組線(xiàn)程等待另外其他組線(xiàn)程的操作,直到計(jì)數(shù)器為零,那么A則再干其他事情;

三、源碼分析CountDownLatch

3.1、CountDownLatch構(gòu)造器

1、構(gòu)造器源碼:
    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    
2、創(chuàng)建一個(gè)給定許計(jì)數(shù)值的計(jì)數(shù)同步器對(duì)象,計(jì)數(shù)器值必須大于零,count值最后賦值給了state這個(gè)共享資源值;

3.2、Sync同步器

1、AQS --> Sync
                  
2、CountDownLatch內(nèi)的同步器都是通過(guò)Sync抽象接口來(lái)操作調(diào)用關(guān)系的,細(xì)看會(huì)發(fā)現(xiàn)基本上都是通過(guò)sync.xxx之類(lèi)的這種調(diào)用方式的;

3.3、await()

1、源碼:
    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * // 導(dǎo)致當(dāng)前線(xiàn)程等待,直到計(jì)數(shù)器值減為零則釋放等待,或者由于線(xiàn)程被中斷也可導(dǎo)致釋放等待;
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
2、await此方法被調(diào)用后,則一直會(huì)處于等待狀態(tài),其核心還是由于調(diào)用了LockSupport.park進(jìn)入阻塞等待;
   當(dāng)計(jì)數(shù)器值state=0時(shí)可以打破等待現(xiàn)狀,當(dāng)然還有線(xiàn)程被中斷后也可以打破線(xiàn)程等待現(xiàn)狀;

3.4、acquireSharedInterruptibly(int)

1、源碼:
    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) // 調(diào)用之前先檢測(cè)該線(xiàn)程中斷標(biāo)志位,檢測(cè)該線(xiàn)程在之前是否被中斷過(guò)
            throw new InterruptedException(); // 若被中斷過(guò)的話(huà),則拋出中斷異常
        if (tryAcquireShared(arg) < 0) // 嘗試獲取共享資源鎖,小于0則獲取失敗,此方法由AQS的具體子類(lèi)實(shí)現(xiàn)
            doAcquireSharedInterruptibly(arg); // 將嘗試獲取鎖資源的線(xiàn)程進(jìn)行入隊(duì)操作
    }
    
2、由于是實(shí)現(xiàn)同步計(jì)數(shù)器功能,所以tryAcquireShared首次調(diào)用必定小于0,則就順利了進(jìn)入了doAcquireSharedInterruptibly線(xiàn)程等待;
   至于首次調(diào)用為什么會(huì)小于0,請(qǐng)看子類(lèi)的實(shí)現(xiàn),子類(lèi)的實(shí)現(xiàn)判斷為 "(getState() == 0) ? 1 : -1" ;

3.5、tryAcquireShared(int)

1、源碼:
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1; // 計(jì)數(shù)器值與零比較判斷,小于零則獲取鎖失敗,大于零則獲取鎖成功
    }
    
2、嘗試獲取共享鎖資源,但是在計(jì)數(shù)器CountDownLatch這個(gè)功能中,小于零則需要入隊(duì),進(jìn)入阻塞隊(duì)列進(jìn)行等待;大于零則喚醒等待隊(duì)列,釋放await方法的阻塞等待;

3.6、doAcquireSharedInterruptibly(int)

1、源碼:
    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 按照給定的mode模式創(chuàng)建新的結(jié)點(diǎn),模式有兩種:Node.EXCLUSIVE獨(dú)占模式、Node.SHARED共享模式;
        final Node node = addWaiter(Node.SHARED); // 創(chuàng)建共享模式的結(jié)點(diǎn)
        boolean failed = true;
        try {
            for (;;) { // 自旋的死循環(huán)操作方式
                final Node p = node.predecessor(); // 獲取結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)
                if (p == head) { // 若前驅(qū)結(jié)點(diǎn)為head的話(huà),那么說(shuō)明當(dāng)前結(jié)點(diǎn)自然不用說(shuō)了,僅次于老大之后的便是老二了咯
                    int r = tryAcquireShared(arg); // 而且老二也希望嘗試去獲取一下鎖,萬(wàn)一頭結(jié)點(diǎn)恰巧剛剛釋放呢?希望還是要有的,萬(wàn)一實(shí)現(xiàn)了呢。。。
                    if (r >= 0) { // 若r>=0,說(shuō)明已經(jīng)成功的獲取到了共享鎖資源
                        setHeadAndPropagate(node, r); // 把當(dāng)前node結(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn),并且調(diào)用doReleaseShared釋放一下無(wú)用的結(jié)點(diǎn)
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    
                    // 但是在await方法首次被調(diào)用會(huì)流轉(zhuǎn)到此,這個(gè)時(shí)候獲取鎖資源會(huì)失敗,即r<0,所以會(huì)進(jìn)入是否需要休眠的判斷
                    // 但是第一次進(jìn)入休眠方法,因?yàn)楸粍?chuàng)建的結(jié)點(diǎn)waitStatus=0,所以會(huì)被修改一次為SIGNAL狀態(tài),再次循環(huán)一次
                    // 而第二次循環(huán)進(jìn)入shouldParkAfterFailedAcquire方法時(shí),返回true就是需要休眠,則順利調(diào)用park方式阻塞等待
                }
                if (shouldParkAfterFailedAcquire(p, node) && // 根據(jù)前驅(qū)結(jié)點(diǎn)看看是否需要休息一會(huì)兒
                    parkAndCheckInterrupt()) // 阻塞操作,正常情況下,獲取不到共享鎖,代碼就在該方法停止了,直到被喚醒
                    // 被喚醒后,發(fā)現(xiàn)parkAndCheckInterrupt()里面檢測(cè)了被中斷了的話(huà),則補(bǔ)上中斷異常,因此拋了個(gè)異常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
2、doAcquireSharedInterruptibly在實(shí)現(xiàn)計(jì)數(shù)器原理的時(shí)候,主要的干的事情就是等待再等待,等到計(jì)數(shù)器值為零時(shí)才蘇醒;

3.7、countDown()

1、源碼:
    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1); // 釋放一個(gè)許可資源 
    }
    
2、釋放許可資源,也就是計(jì)數(shù)器值不斷的做減1操作,當(dāng)計(jì)數(shù)器值為零的時(shí)候,該方法將會(huì)釋放所有正在等待的線(xiàn)程隊(duì)列;
   至于為什么還會(huì)釋放所有,請(qǐng)看后續(xù)的releaseShared(int arg)講解;

3.8、releaseShared(int)

1、源碼:
    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 嘗試釋放共享鎖資源,此方法由AQS的具體子類(lèi)實(shí)現(xiàn)
            doReleaseShared(); // 自旋操作,喚醒后繼結(jié)點(diǎn)
            return true; // 返回true表明所有線(xiàn)程已釋放
        }
        return false; // 返回false表明目前還沒(méi)釋放完全,只要計(jì)數(shù)器值不為零的話(huà),那么都會(huì)返回false
    }
    
2、releaseShared方法首先就判斷了tryReleaseShared(arg)的返回值,但是計(jì)數(shù)器值只要不為零,都會(huì)返回false,因此releaseShared該方法就立馬返回false了;

3、所以當(dāng)計(jì)數(shù)器值慢慢減至零時(shí),則立馬返回true,那么也就立馬會(huì)調(diào)用doReleaseShared釋放所有等待的線(xiàn)程隊(duì)列;

3.9、tryReleaseShared(int)

1、源碼:
    // CountDownLatch 的靜態(tài)內(nèi)部類(lèi) Sync 類(lèi)的 tryReleaseShared 方法    
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) { // 自旋的死循環(huán)操作方式
            int c = getState(); // 獲取最新的計(jì)數(shù)器值
            if (c == 0) // 若計(jì)數(shù)器值為零,說(shuō)明已經(jīng)通過(guò)CAS操作減至零了,所以在并發(fā)中讀取到零時(shí)并不需要做什么操作,因此返回false
                return false;
            int nextc = c-1; // 計(jì)數(shù)器值減1操作
            if (compareAndSetState(c, nextc)) // 通過(guò)CAS比較,順利情況下設(shè)置成功返回true
                return nextc == 0; // 當(dāng)通過(guò)計(jì)算操作得到的nextc為零時(shí)通過(guò)CAS修改成功,那么表明所有事情都已經(jīng)做完,需要釋放所有等待的線(xiàn)程隊(duì)列
                
            // 若CAS失敗,想都不用想肯定是由于并發(fā)操作,導(dǎo)致CAS失敗,那么唯一可做的就是下一次循環(huán)查看是否已經(jīng)被其他線(xiàn)程處理了
        }
    }
    
2、CountDownLatch的靜態(tài)內(nèi)部類(lèi)實(shí)現(xiàn)父類(lèi)AQS的方法,用來(lái)處理如何釋放鎖,籠統(tǒng)的講,若返回負(fù)數(shù)則需要進(jìn)入阻塞隊(duì)列,否則需要釋放所有等待隊(duì)列;

3.10、doReleaseShared()

1、源碼:
    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) { // 自旋的死循環(huán)操作方式
            Node h = head; // 每次都是取出隊(duì)列的頭結(jié)點(diǎn)
            if (h != null && h != tail) { // 若頭結(jié)點(diǎn)不為空且也不是隊(duì)尾結(jié)點(diǎn)
                int ws = h.waitStatus; // 那么則獲取頭結(jié)點(diǎn)的waitStatus狀態(tài)值
                if (ws == Node.SIGNAL) { // 若頭結(jié)點(diǎn)是SIGNAL狀態(tài)則意味著頭結(jié)點(diǎn)的后繼結(jié)點(diǎn)需要被喚醒了
                    // 通過(guò)CAS嘗試設(shè)置頭結(jié)點(diǎn)的狀態(tài)為空狀態(tài),失敗的話(huà),則繼續(xù)循環(huán),因?yàn)椴l(fā)有可能其它地方也在進(jìn)行釋放操作
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); // 喚醒頭結(jié)點(diǎn)的后繼結(jié)點(diǎn)
                }
                // 如頭結(jié)點(diǎn)為空狀態(tài),則把其改為PROPAGATE狀態(tài),失敗的則可能是因?yàn)椴l(fā)而被改動(dòng)過(guò),則再次循環(huán)處理
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 若頭結(jié)點(diǎn)沒(méi)有發(fā)生什么變化,則說(shuō)明上述設(shè)置已經(jīng)完成,大功告成,功成身退
            // 若發(fā)生了變化,可能是操作過(guò)程中頭結(jié)點(diǎn)有了新增或者啥的,那么則必須進(jìn)行重試,以保證喚醒動(dòng)作可以延續(xù)傳遞
            if (h == head)                   // loop if head changed
                break;
        }
    }
    
2、主要目的是釋放線(xiàn)程中所有等待的隊(duì)列,當(dāng)計(jì)數(shù)器值為零時(shí),此方法馬上會(huì)被調(diào)用,通過(guò)自旋方式輪詢(xún)干掉所有等待的隊(duì)列;

四、總結(jié)

1、有了分析AQS的基礎(chǔ)后,再來(lái)分析CountDownLatch便快了很多;

2、在這里我簡(jiǎn)要總結(jié)一下CountDownLatch的流程的一些特性:
    ? 管理一個(gè)大于零的計(jì)數(shù)器值;
    ? 每countDown一次則state就減1一次,直到許可證數(shù)量等于0則釋放隊(duì)列中所有的等待線(xiàn)程;
    ? 也可以通過(guò)countDown/await組合一起使用,來(lái)實(shí)現(xiàn)CyclicBarrier的功能;

五、下載地址

https://gitee.com/ylimhhmily/SpringCloudTutorial.git

SpringCloudTutorial交流QQ群: 235322432

SpringCloudTutorial交流微信群: 微信溝通群二維碼圖片鏈接

歡迎關(guān)注,您的肯定是對(duì)我最大的支持!!!

<上一篇????????首頁(yè)????????下一篇>

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容