并發(fā)輔助類CountDownLatch的使用和源碼

CountDownLatch類位于java.util.concurrent包下,利用它可以實現(xiàn)類似計數(shù)器的功能。比如有一個任務A,它要等待其他10個線程的任務執(zhí)行完畢之后才能執(zhí)行,此時就可以利用CountDownLatch來實現(xiàn)這種功能了。

CountDownLatch是通過一個計數(shù)器來實現(xiàn)的,計數(shù)器的初始值為那10個線程的數(shù)量也就是10。每當一個線程完成了自己的任務后,計數(shù)器的值就會減1。當計數(shù)器值到達0時,它表示所有的線程已經(jīng)完成了任務,然后在閉鎖上await()等待的線程就可以恢復執(zhí)行任務。

下面我們來詳細分析

構造函數(shù):

public CountDownLatch(int count) {  };  //參數(shù)count為計數(shù)值,也就是需要等幾個線程結(jié)束的個數(shù)

它有三個主要方法:

public void await() throws InterruptedException { };   
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  
public void countDown() { };  

await():調(diào)用await()方法的線程會被掛起,它會等待直到count值為0才繼續(xù)執(zhí)行
await(long timeout, TimeUnit unit):和await()類似,只不過等待一定的時間后count值還沒變?yōu)?的話就會繼續(xù)執(zhí)行
countDown() :將count值減1,通常在等待的線程完成時調(diào)用,當10個線程都執(zhí)行完,都減1后,count值為0,被掛起的線程就可以啟動了。

實例:使用await在主線程阻塞,當每個子線程執(zhí)行完了,就調(diào)用latch.countDown()一次,知道最后count的值為0,才解開主線程的等待;

public static void main(String[] args) {

        final CountDownLatch latch = new CountDownLatch(2);

        new Thread() {

            public void run() {

                System.out.println("子線程" + Thread.currentThread().getName() + "正在執(zhí)行");

                System.out.println("子線程" + Thread.currentThread().getName() + "執(zhí)行完畢");

                latch.countDown();

            };

        }.start();

        new Thread() {

            public void run() {

                System.out.println("子線程" + Thread.currentThread().getName() + "正在執(zhí)行");

                System.out.println("子線程" + Thread.currentThread().getName() + "執(zhí)行完畢");

                latch.countDown();

            };

        }.start();

        try {

            System.out.println("等待2個子線程執(zhí)行完畢...");

            latch.await();

            System.out.println("2個子線程已經(jīng)執(zhí)行完畢");

            System.out.println("繼續(xù)執(zhí)行主線程");

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

    }

輸出結(jié)果:

子線程Thread-0正在執(zhí)行
子線程Thread-0執(zhí)行完畢
等待2個子線程執(zhí)行完畢...
子線程Thread-1正在執(zhí)行
子線程Thread-1執(zhí)行完畢
2個子線程已經(jīng)執(zhí)行完畢
繼續(xù)執(zhí)行主線程

從輸出上我們可以知道這個CountDownLatch的使用方法和執(zhí)行過程了,接下來我們通過對它的主要方法的分析來看一下實現(xiàn)原理。

源碼解析:

1.CountDownLatch(int count)

這個是CountDownLatch的構造函數(shù),我們跟進看一下

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

先判斷是否count的值是否正常,如果小于0,直接拋出異常,否則創(chuàng)建一個Sync對象

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

我們看到這里使用了AQS的Sync,唯一與lock不同的是把state設置為了count,然后獲取鎖的邏輯tryAcquireShared方法也做了對應的調(diào)整,這里獲取鎖的話判斷state是否為0。

2.await()

await方法用于阻塞線程,也就是令當前線程阻塞直到拿到鎖(state==0也就是count==0)

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

先判斷是否中斷,如果中斷的話響應中斷并拋出異常,結(jié)束阻塞,然后通過tryAcquireShared獲取鎖,我們來看tryAcquireShared方法

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

拿到鎖的唯一條件就是state==0,也就是子線程通過countDown()方法把count變?yōu)?才可以。我們接下來先看doAcquireSharedInterruptibly()上鎖的過程。

3.doAcquireSharedInterruptibly()
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這里的邏輯和lock方法的邏輯基本一致,只是稍作修改,主線程通過parkAndCheckInterrupt方法進行了阻塞。

4.countDown()
public void countDown() {
        sync.releaseShared(1);
    }
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

先調(diào)用了tryReleaseShared來解鎖,我們看一下這個過程

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

拿到state,然后通過CAS把state-1,最后返回Boolean值,如果state==0,說明鎖釋放返回true,如果state>0,返回false,這里也就證明了我們的猜想,countdown方法就是來把state每次減一的,直到所有子線程執(zhí)行完,減為0,鎖釋放。我們繼續(xù)看鎖釋放后的執(zhí)行過程。

5.doReleaseShared()
private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

這里的步驟也和lock的釋放過程類似,最后通過waitStatus的判斷來執(zhí)行unparkSuccessor()喚醒阻塞的線程。

6.setHeadAndPropagate()

當喚醒await的線程后,會執(zhí)行第3步doAcquireSharedInterruptibly()里的setHeadAndPropagate()

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

可以看到,把喚醒的node設置為了head節(jié)點,也就是node拿到鎖可以繼續(xù)執(zhí)行了,那么如果有其它的await也在等待呢?此時count為0,其它的肯定也要向下執(zhí)行,是怎么連續(xù)喚醒的呢,我們看本方法里的Node s = node.next;這里判斷后續(xù)阻塞節(jié)點,如果存在,就執(zhí)行 doReleaseShared();持續(xù)喚醒,doReleaseShared()在也就是第5步,他會解除head節(jié)點的next的阻塞,然后再執(zhí)行本步驟,設置為head,循環(huán)喚醒。

最后:

這里循環(huán)喚醒也是共享鎖的實現(xiàn)方式。在這里我們也再次印證了AQS是java.util.concurrent包下幾乎所有類的實現(xiàn)核心,像CountDownLatch、CyclicBarrier和Semaphore三大輔助類,lock等都是基于AQS來實現(xiàn)自己的控制邏輯的。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容