JUC學(xué)習(xí)筆記之CountDownLatch源碼初步理解

注:文中代碼的解釋基本上都以注釋的形式和代碼寫在一起

CountDownLatch是并發(fā)環(huán)境中常用的計(jì)數(shù)組件,也是基于AQS實(shí)現(xiàn)的。主要的方法有兩個(gè),countDown和await,實(shí)現(xiàn)了AQS模板方法的tryReleaseShared方法來完成countDown計(jì)數(shù)減的過程,實(shí)現(xiàn)了AQS模板方法的tryAcquireShared方法來實(shí)現(xiàn)await阻塞等待功能。

countDown方法

countDown方法源碼如下,直接調(diào)用了內(nèi)部類sync的releaseShared方法來實(shí)現(xiàn),這里的Sync和ReentrantLock的內(nèi)部類Sync一樣,是繼承了AQS的內(nèi)部類,releaseShared方法正是AQS提供的共享模式的模板方法。

public void countDown() {
   //直接調(diào)用了AQS的releaseShared 
   sync.releaseShared(1); 
}

AQS的releaseShared方法源碼如下

public final boolean releaseShared(int arg) {
        //tryReleaseShared方法AQS留給子類自己實(shí)現(xiàn)
        //嘗試將status減去arg,如果返回為true,執(zhí)行doRelease方法,否則返回
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

releaseShared方法中調(diào)用了CountDownLatch中實(shí)現(xiàn)的tryReleaseShared方法,源碼如下

protected boolean tryReleaseShared(int releases) {
            //通過源碼我們發(fā)現(xiàn)傳入的參數(shù)releases并沒有什么用,每次計(jì)數(shù)固定減一
            // 無限循環(huán)直到值減一成功或者status變成0
            for (;;) {
                //獲取status的值,CountDownlatch中status的值代表要等待的總計(jì)數(shù)
                int c = getState();
                //如果已經(jīng)是0了說明已經(jīng)不能再減計(jì)數(shù)了,返回false
                if (c == 0)
                    return false;
                int nextc = c-1;
                //CAS的方式將status減一
                if (compareAndSetState(c, nextc))
                    如果當(dāng)前減完之后,status是0,也就意味著計(jì)數(shù)結(jié)束了,返回true
                    return nextc == 0;
            }
        }

tryReleaseShared方法返回為true,也就是計(jì)數(shù)結(jié)束時(shí),會(huì)接著執(zhí)行doReleaseShared方法。doReleaseShared方法在CountDownLatch中沒有重寫,直接調(diào)用的是AQS的doReleaseShared方法,源碼如下,其中unparkSuccessor源碼解析見另一篇博客《AQS源碼解析》

private void doReleaseShared() {
    //無限循環(huán)
    for (;;) {
        //獲取頭節(jié)點(diǎn)
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            //如果頭節(jié)點(diǎn)設(shè)置的是要喚醒下一個(gè)節(jié)點(diǎn)的等待狀態(tài)
            if (ws == Node.SIGNAL) {
                //將節(jié)點(diǎn)的waitStatus設(shè)置成0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                   continue;            // loop to recheck cases
                   //如果設(shè)置成功,喚醒后面的等待節(jié)點(diǎn)
                   unparkSuccessor(h);
            }
            //ws==0說明第一步設(shè)置成功或者原先就是0
            //將其狀態(tài)設(shè)置為PROPAGATE
            //失敗(PROPAGATE狀態(tài)表示同步狀態(tài)將會(huì)無條件傳播,意思就是節(jié)點(diǎn)可運(yùn)行)
            else if (ws == 0 &&
                 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //如果h還是頭節(jié)點(diǎn),就結(jié)束循環(huán)
            if (h == head)                   // loop if head changed
                break;
     }
 }

await方法

await方法直接調(diào)用了內(nèi)部類Sync的acquireSharedInterruptibly方法,阻塞線程直到count為0,當(dāng)前線程才能拿到鎖(或者拋出異常也有可能結(jié)束阻塞)。源碼如下:

public void await() throws InterruptedException {
        //直接調(diào)用了內(nèi)部類Sync的方法(其實(shí)是AQS的方法)
        sync.acquireSharedInterruptibly(1);
    }

acquireSharedInterruptibly方法,如果線程被中斷過就拋出異常結(jié)束阻塞,不然就判斷計(jì)數(shù)的值,為0就返回,等于當(dāng)前阻塞的線程獲得了鎖,如果計(jì)數(shù)不為0,進(jìn)入doAcquireSharedInterruptibly方法進(jìn)行排隊(duì)等待。源碼如下:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        //如果線程被中斷過,拋出異常,線程不再等待 
        if (Thread.interrupted())
            throw new InterruptedException();
        //tryAcquireShared嘗試獲取共享狀態(tài)
        //tryAcquireShared源碼見下方,實(shí)際就是獲取計(jì)數(shù)
        //計(jì)數(shù)不為0則執(zhí)行doAcquireSharedInterruptibly方法
        //計(jì)數(shù)為0則返回值大于0,方法直接返回,線程阻塞結(jié)束。等于線程獲得了鎖
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

doAcquireSharedInterruptibly方法只有在線程阻塞時(shí)會(huì)被調(diào)用,也就是計(jì)數(shù)不為0時(shí)被調(diào)用。方法將當(dāng)前線程構(gòu)造為一個(gè)共享模式的等待節(jié)點(diǎn)加入等待隊(duì)列中,然后開啟無限自循環(huán),直到計(jì)數(shù)等于0獲取到鎖,或者拋出異常為止。源碼如下:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //新建一個(gè)共享模式節(jié)點(diǎn)加入等待隊(duì)列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //獲取新建節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
                final Node p = node.predecessor();
                //如果前一個(gè)就是頭節(jié)點(diǎn)
                if (p == head) {
                    //說明當(dāng)前線程是第一個(gè)等待的,嘗試獲取鎖
                    //也就是獲取計(jì)數(shù)
                    int r = tryAcquireShared(arg);
                    //r不小于0說明計(jì)數(shù)已經(jīng)是0了,等于當(dāng)前線程已經(jīng)獲得了鎖
                    if (r >= 0) {
                        //將當(dāng)前線程的節(jié)點(diǎn)設(shè)置成頭節(jié)點(diǎn)
                        setHeadAndPropagate(node, r);
                        //原先的頭節(jié)點(diǎn)p從隊(duì)列中解除,便于垃圾回收
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //shouldParkAfterFailedAcquire檢查如果獲取鎖失敗當(dāng)前節(jié)點(diǎn)是否需要掛起
                //只有當(dāng)前一個(gè)節(jié)點(diǎn)的waitStatus是SIGNAL也就是說前一個(gè)節(jié)點(diǎn)
                //獲得鎖以后會(huì)把自己喚醒,當(dāng)前節(jié)點(diǎn)才能放心掛起
                //parkAndCheckInterrupt判斷節(jié)點(diǎn)是否被中斷過
                //這里意思是如果當(dāng)前節(jié)點(diǎn)是在被掛起狀態(tài)而且被中斷過就拋出異常
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            //如果failed為true說明線程被中斷,沒有獲得鎖
            //所以取消獲取鎖的動(dòng)作
            if (failed)
                cancelAcquire(node);
        }
    }

總結(jié)來說就是await方法會(huì)讓當(dāng)前線程阻塞,進(jìn)入方法后先判斷一次計(jì)數(shù)是否為0,如果是0則直接返回,線程獲得鎖,阻塞結(jié)束。如果不為0則進(jìn)入doAcquireSharedInterruptibly方法,將當(dāng)前線程構(gòu)造成了一個(gè)等待節(jié)點(diǎn),開啟無限循環(huán),線程被阻塞。無限循環(huán)直到當(dāng)前線程的節(jié)點(diǎn)排隊(duì)排到了頭節(jié)點(diǎn)的后面,就可以嘗試獲得鎖了,如果成功了就可以返回,阻塞結(jié)束。在排隊(duì)的過程中如果線程被中斷那么就拋出異常。簡而言之阻塞是由無限的for循環(huán)造成的,所以結(jié)束循環(huán)就是線程結(jié)束阻塞的關(guān)鍵了。

提問:await方法支持多個(gè)線程一起等待嗎
回答:支持的。從源碼角度看,await方法并沒有做任何的同步控制,多個(gè)線程等待和一個(gè)線程等待,結(jié)果沒有什么不同,所有等待的線程都會(huì)阻塞到status為0,阻塞過程中這些線程就乖乖的在隊(duì)列里等待。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容