[懷舊并發(fā)07]分析CountDownLatch的實(shí)現(xiàn)原理

Java并發(fā)編程源碼分析系列:

上一篇通過(guò)研究ReentrantLock分析了AQS的獨(dú)占功能,本文將通過(guò)同樣是AQS子類的CountDownLatch分析AQS的共享功能。有了前文研究獨(dú)占功能的基礎(chǔ),再研究共享鎖就簡(jiǎn)單多了。

CountDownLatch的使用

CountDownLatch是同步工具類之一,可以指定一個(gè)計(jì)數(shù)值,在并發(fā)環(huán)境下由線程進(jìn)行減1操作,當(dāng)計(jì)數(shù)值變?yōu)?之后,被await方法阻塞的線程將會(huì)喚醒,實(shí)現(xiàn)線程間的同步。

public void startTestCountDownLatch() {
   int threadNum = 10;
   final CountDownLatch countDownLatch = new CountDownLatch(threadNum);

   for (int i = 0; i < threadNum; i++) {
       final int finalI = i + 1;
       new Thread(() -> {
           System.out.println("thread " + finalI + " start");
           Random random = new Random();
           try {
               Thread.sleep(random.nextInt(10000) + 1000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           System.out.println("thread " + finalI + " finish");

           countDownLatch.countDown();
       }).start();
   }

   try {
       countDownLatch.await();
   } catch (InterruptedException e) {
       e.printStackTrace();
   }
   System.out.println(threadNum + " thread finish");
}

主線程啟動(dòng)10個(gè)子線程后阻塞在await方法,需要等子線程都執(zhí)行完畢,主線程才能喚醒繼續(xù)執(zhí)行。

構(gòu)造器

CountDownLatch和ReentrantLock一樣,內(nèi)部使用Sync繼承AQS。構(gòu)造函數(shù)很簡(jiǎn)單地傳遞計(jì)數(shù)值給Sync,并且設(shè)置了state。

Sync(int count) {
    setState(count);
}

上文已經(jīng)介紹過(guò)AQS的state,這是一個(gè)由子類決定含義的“狀態(tài)”。對(duì)于ReentrantLock來(lái)說(shuō),state是線程獲取鎖的次數(shù);對(duì)于CountDownLatch來(lái)說(shuō),則表示計(jì)數(shù)值的大小。

阻塞線程

接著來(lái)看await方法,直接調(diào)用了AQS的acquireSharedInterruptibly。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

首先嘗試獲取共享鎖,實(shí)現(xiàn)方式和獨(dú)占鎖類似,由CountDownLatch實(shí)現(xiàn)判斷邏輯。

protected int tryAcquireShared(int acquires) {
   return (getState() == 0) ? 1 : -1;
}

返回1代表獲取成功,返回-1代表獲取失敗。如果獲取失敗,需要調(diào)用doAcquireSharedInterruptibly:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

doAcquireSharedInterruptibly的邏輯和獨(dú)占功能的acquireQueued基本相同,阻塞線程的過(guò)程是一樣的。不同之處:

  1. 創(chuàng)建的Node是定義成共享的(Node.SHARED);
  2. 被喚醒后重新嘗試獲取鎖,不只設(shè)置自己為head,還需要通知其他等待的線程。(重點(diǎn)看后文釋放操作里的setHeadAndPropagate)

釋放操作

public void countDown() {
    sync.releaseShared(1);
}

countDown操作實(shí)際就是釋放鎖的操作,每調(diào)用一次,計(jì)數(shù)值減少1:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

同樣是首先嘗試釋放鎖,具體實(shí)現(xiàn)在CountDownLatch中:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

死循環(huán)加上cas的方式保證state的減1操作,當(dāng)計(jì)數(shù)值等于0,代表所有子線程都執(zhí)行完畢,被await阻塞的線程可以喚醒了,下一步調(diào)用doReleaseShared:

private void doReleaseShared() {
   for (;;) {
       Node h = head;
       if (h != null && h != tail) {
           int ws = h.waitStatus;
           if (ws == Node.SIGNAL) {
             //1
               if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                   continue;            // loop to recheck cases
               unparkSuccessor(h);
           }
           //2
           else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
               continue;                // loop on failed CAS
       }
       if (h == head)                   // loop if head changed
           break;
   }
}

標(biāo)記1里,頭節(jié)點(diǎn)狀態(tài)如果SIGNAL,則狀態(tài)重置為0,并調(diào)用unparkSuccessor喚醒下個(gè)節(jié)點(diǎn)。

標(biāo)記2里,被喚醒的節(jié)點(diǎn)狀態(tài)會(huì)重置成0,在下一次循環(huán)中被設(shè)置成PROPAGATE狀態(tài),代表狀態(tài)要向后傳播。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

在喚醒線程的操作里,分成三步:

  • 處理當(dāng)前節(jié)點(diǎn):非CANCELLED狀態(tài)重置為0;
  • 尋找下個(gè)節(jié)點(diǎn):如果是CANCELLED狀態(tài),說(shuō)明節(jié)點(diǎn)中途溜了,從隊(duì)列尾開(kāi)始尋找排在最前還在等著的節(jié)點(diǎn)
  • 喚醒:利用LockSupport.unpark喚醒下個(gè)節(jié)點(diǎn)里的線程。

線程是在doAcquireSharedInterruptibly里被阻塞的,喚醒后調(diào)用到setHeadAndPropagate。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);
    
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

setHead設(shè)置頭節(jié)點(diǎn)后,再判斷一堆條件,取出下一個(gè)節(jié)點(diǎn),如果也是共享類型,進(jìn)行doReleaseShared釋放操作。下個(gè)節(jié)點(diǎn)被喚醒后,重復(fù)上面的步驟,達(dá)到共享狀態(tài)向后傳播。

要注意,await操作看著好像是獨(dú)占操作,但它可以在多個(gè)線程中調(diào)用。當(dāng)計(jì)數(shù)值等于0的時(shí)候,調(diào)用await的線程都需要知道,所以使用共享鎖。

限定時(shí)間的await

CountDownLatch的await方法還有個(gè)限定阻塞時(shí)間的版本.

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

跟蹤代碼,最后來(lái)看doAcquireSharedNanos方法,和上文介紹的doAcquireShared邏輯基本一樣,不同之處是加了time字眼的處理。

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

進(jìn)入方法時(shí),算出能夠執(zhí)行多久的deadline,然后在循環(huán)中判斷時(shí)間。注意到代碼中間有句:

nanosTimeout > spinForTimeoutThreshold
static final long spinForTimeoutThreshold = 1000L;

spinForTimeoutThreshold寫(xiě)死了1000ns,這就是所謂的自旋操作。當(dāng)超時(shí)在1000ns內(nèi),讓線程在循環(huán)中自旋,否則阻塞線程。

總結(jié)

兩篇文章分別以ReentrantLock和CountDownLatch為例研究了AQS的獨(dú)占功能和共享功能。AQS里的主要方法研究得七七八八了,趁熱打鐵下一篇將會(huì)研究其他同步工具類的實(shí)現(xiàn)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容