不就是CountDownLatch

介紹CountDownLatch之前,我相信很多人在學(xué)習(xí)的時候是不清楚這個CountDownLatch的使用場景是啥。為了回答這個問題,簡單說個小段子。
老李家有兩個熊孩子小A和小B,小A和小B每天放學(xué)后自己回家,到家后都需要老李來開門,不要問我為啥不給小A和小B一把鑰匙。由于不是一個年級的,放學(xué)的時間不同,每天都需要老李開兩次門,有一天老李怒了,告訴兩個熊孩子,以后到家了必須敲下門,在門口喊一聲,老李聽到兩個孩子的敲門聲再去敲門,不要問我小A和小B是親生的不。
其實,上面這個例子就是CountDownLatch的使用場景,小A和小B到家時間不同相當(dāng)于兩個線程的執(zhí)行時間不同,小A和小B每次回家必須喊一次相當(dāng)于線程間的通信,老李只有聽到兩個孩子的敲門聲才會去敲門相當(dāng)于主線程不再阻塞,向下進(jìn)行。

再舉個最近項目中的使用場景。
最近在做圖像識別的一個項目,需要上傳圖片到華為云的modelart服務(wù)來獲取圖片的識別信息,然后對返回信息進(jìn)行處理,分析出想要的信息。

由于有些產(chǎn)品是需要同時上傳兩張圖片,然后再根據(jù)返回的信息進(jìn)行處理。上傳一張圖片等待返回信息這個過程的時間大概是3-5秒,上傳兩張圖片,需要訪問兩次華為云modelart服務(wù),如果使用串行方式的話,那么需要花費(fèi)10s左右的時間,這里就想到了可以使用CountDownLatch,等待這兩個上傳操作的線程結(jié)束拿到返回信息后,再調(diào)用后面的接口來分析這兩個圖片的信息。

這里,就簡單介紹完了CountDownLatch的使用場景,下面簡單說下CountDownLatch的使用,直接給出CountDownLatch源碼中的例子。

* class Driver2 { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *     Executor e = ...
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       e.execute(new WorkerRunnable(doneSignal, i));
 *
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class WorkerRunnable implements Runnable {
 *   private final CountDownLatch doneSignal;
 *   private final int i;
 *   WorkerRunnable(CountDownLatch doneSignal, int i) {
 *     this.doneSignal = doneSignal;
 *     this.i = i;
 *   }
 *   public void run() {
 *     try {
 *       doWork(i);
 *       doneSignal.countDown();
 *     } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }}

首先看main方法,一開始根據(jù)需要等待的線程數(shù),初始化CountDownLatch,然后啟動線程,線程結(jié)束后調(diào)用CountDownLatch的countDown方法,當(dāng)調(diào)用countDownLatch的counDown次數(shù)和初始化CountDownLatch的線程數(shù)相同時,主線程中的CountDownLatch的await方法不再阻塞,往下進(jìn)行。

使用很簡單,主要看源碼實現(xiàn)。
CountDownLatch的底層實現(xiàn)是使用AQS隊列實現(xiàn),對AQS的不熟悉的同學(xué)可以看下方騰飛的《java并發(fā)編程的藝術(shù)》這本書或者看下這個AQS。
首先看下await方法。

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

sync這個實例是什么類型的呢


public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

}

Sync類繼承了AbstractQueuedSynchronizer(AQS), 通過state值的大小來控制鎖的獲取。下面根據(jù)CountDownLatch的使用來分析下源碼。
(1)創(chuàng)建CountDownLatch實例時。

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

這里就可以很清楚的看到,這里會初始化AQS隊列的state值的大小,state值其實就是需要等待線程數(shù)的大小。

(2)主線程調(diào)用CountDownLatch的await方法,阻塞主線程,等待其他線程執(zhí)行結(jié)束。

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

首先調(diào)用tryAcquireShared獲取當(dāng)前state的值,如果值為0返回1,說明其他線程執(zhí)行結(jié)束,不再阻塞。如果值不為0,則返回-1,說明其他線程還未執(zhí)行結(jié)束,需要調(diào)用doAcquireSharedInterruptibly方法阻塞等待。
下面看下這個方法的實現(xiàn)。

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);  ##隊列中插入node節(jié)點(diǎn),保存線程信息
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();    ##獲取node節(jié)點(diǎn)的前一個節(jié)點(diǎn)
                if (p == head) {      ## 判斷p節(jié)點(diǎn)是否是頭結(jié)點(diǎn)
                    int r = tryAcquireShared(arg);  ##獲取state值得大小
                    if (r >= 0) {                      ## r>=0 說明state值為0
                        setHeadAndPropagate(node, r);  ##設(shè)置頭結(jié)點(diǎn)并且觸發(fā)隊列中頭結(jié)點(diǎn)的下一個節(jié)點(diǎn)是否是共享節(jié)點(diǎn),如果是的話,下個節(jié)點(diǎn)對應(yīng)的線程也不再阻塞,具有傳播特性。
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())  ## 阻塞調(diào)用此方法的線程
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

上面的注釋已經(jīng)說明上面方法中整個的處理過程,其中setHeadAndPropagate和shouldParkAfterFailedAcquire還需要詳細(xì)分析一下,首先看下setHeadAndPropagate方法。

  private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

執(zhí)行此方法的前提是node的前一個節(jié)點(diǎn)是head節(jié)點(diǎn),并且state值為0。在這個方法里,首先將當(dāng)前的node節(jié)點(diǎn)設(shè)置為head節(jié)點(diǎn),然后根據(jù)propagate這個值的大小,判斷是否獲取node節(jié)點(diǎn)的下一個節(jié)點(diǎn),然后根據(jù)下一個節(jié)點(diǎn)是否是共享式類型的節(jié)點(diǎn),來釋放下個節(jié)點(diǎn)對應(yīng)的線程,使下個節(jié)點(diǎn)的線程也不再阻塞,propagate使線程的釋放具有了傳播性,從隊列的頭結(jié)點(diǎn)開始,只要頭結(jié)點(diǎn)不再阻塞,也可以使隊列中的其他共享節(jié)點(diǎn)也不再阻塞,具有了傳播性。
然后看下shouldParkAfterFailedAcquire方法的實現(xiàn)。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

這個方法的目的主要是獲取state值不為0時,是否阻塞此線程。如果此方法返回true則會調(diào)用parkAndCheckInterrupt這個方法,在這個方法里調(diào)用LockSupport的park方法阻塞此線程。那么阻塞后,什么時候喚醒這個線程呢,想要解決這個疑問就需要看下CountDownLatch的countDown方法的處理邏輯了。
(3) 線程執(zhí)行完,調(diào)用CountDownLatch的countDown方法。

    public void countDown() {
        sync.releaseShared(1);
    }

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

首先,在tryReleaseShared方法中將state值的大小減一,然后執(zhí)行doReleaseShared方法,

    private void doReleaseShared() {
      
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

在doReleaseShared方法中通過unparkSuccessor獲取head節(jié)點(diǎn)的下一個節(jié)點(diǎn)的thread信息,然后執(zhí)行LockSupport的unpark方法,這樣的話之前await方法中阻塞的線程就不再阻塞,繼續(xù)往下執(zhí)行。

通過研究CountDownLatch的這三個方法,基本理解了底層實現(xiàn),另外,如果能看懂這幾個方法的源碼,其實對AQS的源碼也已經(jīng)了解的差不多了,后面可以去看下Lock的源碼,也是基于AQS實現(xiàn)的。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容