圖解java.util.concurrent源碼(五) CountDownLatch

引言


今天分享一個比較簡短一些的源碼,那就是concurrent包中我們經(jīng)常使用的CountDownLatch同步器,"latch"在英文中也是鎖的意思,翻譯成中文就是“倒數(shù)鎖”,當你調(diào)用了這個類型對象中的await方法后,必須要等待這個鎖倒數(shù)到0,才能繼續(xù)運行。

這個類的源碼非常短,因為其實它就是對AQS共享模式的一個簡單實現(xiàn)而已,如果你還不理解AQS的話,可以去看看我這個系列的第一篇文章第三篇文章,他們詳細介紹了AQS的使用與原理,以及AQS經(jīng)典的兩個實現(xiàn)類ReentrantLock與Semophore。

Demo


雖然這篇文章重點講的是實現(xiàn),但是出于完整性,這里也給個demo示意其使用,如果你已經(jīng)知道這個類的使用方法的話,直接跳過這一部分即可。

假設現(xiàn)在有一個大小為15的int數(shù)組(全部為0),我想用三個線程將數(shù)組中的內(nèi)容全部置位1(每個線程只需要將5個數(shù)組元素置位1即可),使用CountDownLatch即可實現(xiàn)這個過程的同步:

package org.du.test.blogdemo;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {

    private static class Fill extends Thread{

        private int start;

        private int end;

        private int[] array;

        private CountDownLatch countDownLatch;

        public Fill(int start, int end, int[] array, CountDownLatch countDownLatch) {
            this.start = start;
            this.end = end;
            this.array = array;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            for ( int i = start; i < end; i++ ){
                array[i] = 1;
            }
            //該部分任務完成,倒數(shù)一個
            countDownLatch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //任務只有3個部分,所以倒數(shù)三下即可,構(gòu)造函數(shù)中的3代表需要倒數(shù)的次數(shù)
        CountDownLatch countDownLatch = new CountDownLatch(3);

        int[] array = new int[15];

        new Fill(0, 5, array, countDownLatch).start();
        new Fill(5, 10, array, countDownLatch).start();
        new Fill(10, 15, array, countDownLatch).start();

        //等待倒數(shù)完成
        countDownLatch.await();

        System.out.printf(Arrays.toString(array));
    }

}

程序最后輸出:

所有的元素都被置1了,說明線程同步是正確的。

需要注意的一點是CountDownLatch的計數(shù)是無法被重置的,如果你的場景總是需要重置計數(shù)的話,最好考慮使用CyclicBarrier(循環(huán)柵欄),我的下一篇文章將會詳細分析CyclicBarrier。

實現(xiàn)分析


從上面的demo可以看出,CountDownLatch的核心方法其實就只有兩個:

  • countDown: 用于計數(shù)器倒數(shù)一次
  • await: 等待計數(shù)器倒數(shù)完成

點開這兩個方法看一下:

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public void countDown() {
        sync.releaseShared(1);
    }

發(fā)現(xiàn)有點類似與之前講過的Semaphore,因為他們都是AQS在共享模式下的實現(xiàn),套路是一樣的。

Sync內(nèi)部類實現(xiàn)了AQS接口:

    private static final class Sync extends AbstractQueuedSynchronizer {
                                              ...

而當你調(diào)用CountDownLatch(int count)構(gòu)造方法時會實例化一個sync實例:

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        //實例化sync同步器
        this.sync = new Sync(count);
    }

繼續(xù)查看Sync的構(gòu)造器的代碼:

        Sync(int count) {
            //這里AQS的state代表計數(shù)器的值
            setState(count);
        }

回憶一下AQS的相關知識,AQS允許子類通過state變量進行同步,在ReentrantLock的實現(xiàn)中,state同步變量被用作記錄線程的沖入次數(shù),而在Semaphore的實現(xiàn)中,state同步變量被用作記錄剩余許可數(shù),這里可以看出:

  • CountDownLatch中的AQS同步狀態(tài)被用作記錄剩余倒數(shù)次數(shù)

countDown的實現(xiàn)

從上面可以看到countDown方法本質(zhì)上調(diào)用的是AQS的releaseShared,回憶一下AQS的知識,releaseShared本質(zhì)上是調(diào)用子類實現(xiàn)的tryReleaseShared類嘗試釋放鎖的,Sync的實現(xiàn)如下:

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            //使用CAS循環(huán)更新  計數(shù)器 減一
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

可以看出就是一個CAS循環(huán),不把同步狀態(tài)state減1誓不擺休。其中compareAndSetState是由父類AQS提供的用于操作同步狀態(tài)state的方法。

await的實現(xiàn)

await調(diào)用的其實就是AQS的acquireSharedInterruptibly方法,而acquireSharedInterruptibly本質(zhì)上是調(diào)用子類實現(xiàn)的tryAcquireShared方法類不斷嘗試獲得鎖的,Sync中的實現(xiàn)如下:

        protected int tryAcquireShared(int acquires) {
            //返回1會不停地傳播下去
            return (getState() == 0) ? 1 : -1;
        }

代碼只有一行,但是要看懂的話需要了解一些AQS共享模式的知識,如果子類實現(xiàn)的tryAcquireShared返回一個大于等于0的數(shù)字的話,線程獲得鎖,除了會將自己的節(jié)點設置為頭結(jié)點外,還會繼續(xù)喚醒后繼的一個節(jié)點(代碼中稱之為“傳播”, PROPAGATE);如果返回0的話,則線程獲得鎖,然后就像獨占模式中一樣將自己的節(jié)點設置為頭節(jié)點;返回一個小于0的數(shù)字的話,則線程無法獲得鎖,等待它的就將是被阻塞。

這里畫了張表格總結(jié)了一下:

tryAcquireShared返回值 導致的行為
>0 線程獲得鎖,將自己的節(jié)點設置為頭結(jié)點,繼續(xù)喚醒后繼的一個節(jié)點
=0 線程獲得鎖,將自己的節(jié)點設置為頭結(jié)點
<0 線程無法獲得鎖

這樣我們就能看懂這個返回值了,如果getState發(fā)現(xiàn)同步狀態(tài)已經(jīng)倒數(shù)到0了,則一直返回1,這樣就能夠從頭節(jié)點不停地傳播下去,直到喚醒所有正在await的線程。

如果發(fā)現(xiàn)還沒有倒數(shù)到0,則始終返回-1,這樣所有正在await的線程就會一直阻塞下去。

End


這一篇分享比較短,有劃水的嫌疑,其實我是在為下一篇介紹CyclicBarrier的文章作鋪墊。

滑稽.jpg
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容