Java并發(fā)編程——CountDownLatch

1. 簡(jiǎn)介

在上篇中我們介紹了SyclicBarrier類的使用,通過(guò)SyclicBarrier我們可以完成一些分批執(zhí)行匯總的任務(wù),而此次介紹的CountDownLatch則是實(shí)現(xiàn)類似“倒計(jì)時(shí)”的功能。

2. Api分析

CountDownLatch源碼很簡(jiǎn)潔,它提供兩個(gè)典型的方法來(lái)實(shí)現(xiàn)“倒計(jì)時(shí)功能”。CountDownLatch在構(gòu)造方法中指定門(mén)閂鎖latch的個(gè)數(shù),當(dāng)latch的個(gè)數(shù)為0的時(shí)候則喚醒所有等待狀態(tài)下的線程。

  • countDown():通過(guò)調(diào)用countDown方法實(shí)現(xiàn)門(mén)閂鎖-1的效果。
  • await():讓當(dāng)前線程進(jìn)入等待。
  • getCount():獲取門(mén)閂鎖的個(gè)數(shù)。

下面我們通過(guò)一個(gè)示例代碼來(lái)看一下:

public class MyTest {
    static CountDownLatch countDownLatch;
    public static void main(String[]args){
        countDownLatch = new CountDownLatch(3);
        for(int i=0;i<3;i++){
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "--到達(dá)車門(mén)");
                    countDownLatch.countDown();
                    System.out.println(Thread.currentThread().getName() + "--已登車");
                }
            }).start();
        }
        
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "--登車完畢");
    }
}

在示例代碼中,通過(guò)構(gòu)造函數(shù)創(chuàng)建一個(gè)指定latch個(gè)數(shù)為3的CountDownLatch的方法,同時(shí)創(chuàng)建了三個(gè)線程,在每個(gè)線程的內(nèi)部分別調(diào)用一次countDown()方法,最后在主線程中調(diào)用await方法進(jìn)行等待阻塞,最后完成輸出。
執(zhí)行結(jié)果:

Thread-2--到達(dá)車門(mén)
Thread-2--已登車
Thread-0--到達(dá)車門(mén)
Thread-0--已登車
Thread-1--到達(dá)車門(mén)
main--登車完畢
Thread-1--已登車

從上面可以得知CountDownLatch以下特點(diǎn):

  1. 線程中執(zhí)行countDown方法后能繼續(xù)執(zhí)行后續(xù)代碼,線程不會(huì)阻塞等待;
  2. latch個(gè)數(shù)為0的時(shí)候會(huì)立即喚醒a(bǔ)wait等待的線程;

3. CountDownLatch源碼解析

結(jié)合上面對(duì)CountDownLatch功能的描述,假如讓我們實(shí)現(xiàn)一個(gè)這樣的功能,我們首先想到的思路應(yīng)該是在工具類中維持一個(gè)Count計(jì)數(shù)變量,然后維持對(duì)該變量的判斷。那么我們來(lái)看看CountDownLatch是怎么實(shí)現(xiàn)的。

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * 讓當(dāng)前線程進(jìn)行等待,直到鎖個(gè)數(shù)為0或者當(dāng)前線程interrupt
     *  如果當(dāng)前鎖latch個(gè)數(shù)為0,則立即返回
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * 減少latch鎖,當(dāng)latch鎖個(gè)數(shù)為0的時(shí)候釋放所有等待線程。
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

首先從構(gòu)造方法來(lái)看,CountDownLatch提供了有參構(gòu)造函數(shù):

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

這里的Sync類是CountDownLatch內(nèi)部集成AQS實(shí)現(xiàn)的一個(gè)內(nèi)部類。

Sync(int count) {
    setState(count);
}

在Sync的構(gòu)造函數(shù)中,調(diào)用的是AQS的setState方法修改state值,將state值修改為count值。

通過(guò)上面的介紹,對(duì)于CountDownLatch類來(lái)說(shuō),使用最多的就是countDown和await方法。

countDown()方法

public void countDown() {
    sync.releaseShared(1);
}

/**AQS類中定義的該方法*/
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

在countDown方法內(nèi)部是調(diào)用Sync的releaseShared方法,嘗試釋放公共鎖狀態(tài)。

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

采用自旋的方式,獲取當(dāng)前state值,如果state值為0就返回false,即釋放失敗。反之則state-1,通過(guò)CAS操作成功,返回nextc == 0;當(dāng)最后通過(guò)咨詢的方式返回true的時(shí)候(state ==0),則執(zhí)行releaseShared方法中的doReleaseShared()方法,在doReleaseShared()方法內(nèi)部通過(guò)unparkSuccessor()方法喚醒阻塞的線程開(kāi)始執(zhí)行。

await()方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

/***AQS中的方法*/
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

這里調(diào)用acquireSharedInterruptibly()方法申請(qǐng)鎖,如果鎖被占有state!=0,則通過(guò)doAcquireSharedInterruptibly()進(jìn)行鎖申請(qǐng)。

4. CountDownLatch的使用場(chǎng)景

  • 確保某個(gè)計(jì)算在其需要的所有資源都被初始化之后才繼續(xù)執(zhí)行。
  • 確保某個(gè)服務(wù)在其依賴的所有其他服務(wù)都已啟動(dòng)后才啟動(dòng)。
  • 等待知道某個(gè)操作的所有者都就緒在繼續(xù)執(zhí)行。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容