AQS原理(二):基于CountDownLatch

上一篇主要結(jié)束AQS的獨(dú)占功能,這次我們通過(guò)對(duì)CountDownLathc的介紹來(lái)解讀AQS的另一個(gè)功能:共享功能。

AQS共享功能的實(shí)現(xiàn)

我們將用CountDownLatch來(lái)闡述AQS共享功能,關(guān)于CountDownLatch的使用,可以參見(jiàn)以前寫(xiě)的博客。在多線程環(huán)境下,需要等待的線程調(diào)用CountDownLatch的await()方法會(huì)被阻塞,等到其他線程調(diào)用countDown()方法后,將計(jì)數(shù)器減為0時(shí),被阻塞的線程才會(huì)喚醒。具體是一個(gè)什么樣的過(guò)程,可以用下面這個(gè)簡(jiǎn)單程序debug一下,然后可以參考這篇多線程程序調(diào)試指南

package com.test;

import java.util.concurrent.CountDownLatch;

public class Test {

    public static void main(String[] args) {
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        
        new Thread(new Runnable() {
            
            @Override
            public void run() {
                countDownLatch.countDown();
            }
        }).start();
        
        new Thread(new Runnable() {
            
            @Override
            public void run() {
                countDownLatch.countDown();
            }
        }).start();
        
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

關(guān)于CountDownLatch的實(shí)現(xiàn),我們首先可以看它的構(gòu)造方法:

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

和ReentrantLock類(lèi)似,CountDownLatch內(nèi)部也有一個(gè)Sync的內(nèi)部類(lèi),同樣也是繼承了AQS。

再來(lái)看一下Sync:

        Sync(int count) {
            setState(count);
        }

上一篇講到setState()方法是AQS的一個(gè)“狀態(tài)位”,在ReentrantLock中,表示加鎖的次數(shù),而在CountDownLatch中,表示計(jì)數(shù)器的初始大小。我們點(diǎn)擊進(jìn)入setState()方法,進(jìn)入到AQS:

    /**
     * The synchronization state.
     */
    //這個(gè)state是volatile變量,在多個(gè)線程中共享
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

await()

設(shè)置萬(wàn)計(jì)數(shù)器大小后,CountDownLatch的構(gòu)造方法返回,我們接著看CountDownLatch 的await()方法:

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

調(diào)用了Sync的acquireSharedInterruptibly()方法,實(shí)際上是調(diào)用AQS的acquireSharedInterruptibly()方法:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

從方法名上看,這個(gè)方法的調(diào)用是響應(yīng)線程的打斷的,所以在前兩行會(huì)檢查下線程是否被打斷。接著,嘗試著獲取共享鎖,小于 0,表示獲取失敗,通過(guò)本系列的上半部分的解讀, 我們知道 AQS 在獲取鎖的思路是,先嘗試直接獲取鎖,如果失敗會(huì)將當(dāng)前線程放在隊(duì)列中,按照 FIFO 的原則等待鎖。而對(duì)于共享鎖也是這個(gè)思路,如果和獨(dú)占鎖一致,這里的 tryAcquireShared 應(yīng)該是個(gè)空方法,留給子類(lèi)去判斷:

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

接著看CountDownLatch的tryAcquireShared():

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

如果state變成0,則返回1,表示獲取成功,否則返回-1,表示獲取失敗。

看到這里,我們可能會(huì)異或,await()方法的獲取方法更像在獲取獨(dú)占鎖,為什么還用tryAcquireShared()呢?

回想一下CountDownLatch的await()方法是不是智能在主線程中調(diào)用?并不是,CountDownLatch的await()方法可以在多個(gè)線程中調(diào)用,當(dāng)CountDownLatch的計(jì)數(shù)器為0后,調(diào)用 await 的方法都會(huì)依次返回。 也就是說(shuō)可以多個(gè)線程同時(shí)在等待 await 方法返回,所以它被設(shè)計(jì)成了實(shí)現(xiàn) tryAcquireShared 方法,獲取的是一個(gè)共享鎖,鎖在所有調(diào)用 await 方法的線程間共享,所以叫共享鎖。

回到 acquireSharedInterruptibly 方法:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

如果獲取共享鎖失?。ǚ祷亓?1,說(shuō)明state不為0,即CountDownLatch的計(jì)數(shù)器還不為0),進(jìn)入調(diào)用 doAcquireSharedInterruptibly() 方法,將當(dāng)前線程放入到隊(duì)列中。

AQS的結(jié)構(gòu),上一篇已經(jīng)詳細(xì)闡述了,本質(zhì)是一個(gè)雙向鏈表,可以參考上一篇文章。

繼續(xù)進(jìn)入到doAcquireSharedInterruptibly()方法:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); 
// 將當(dāng)前線程包裝為類(lèi)型為 Node.SHARED 的節(jié)點(diǎn),標(biāo)示這是一個(gè)共享節(jié)點(diǎn)。
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
// 如果新建節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn),就是 Head,說(shuō)明當(dāng)前節(jié)點(diǎn)是 AQS 隊(duì)列中等待獲取鎖的第一個(gè)節(jié)點(diǎn),
// 按照 FIFO 的原則,可以直接嘗試獲取鎖。
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
// 獲取成功,需要將當(dāng)前節(jié)點(diǎn)設(shè)置為 AQS 隊(duì)列中的第一個(gè)節(jié)點(diǎn),這是 AQS 的規(guī)則 // 隊(duì)列的頭節(jié)點(diǎn)表示正在獲取鎖的節(jié)點(diǎn) 
                        setHeadAndPropagate(node, r); 
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && // 檢查下是否需要將當(dāng)前節(jié)點(diǎn)掛起 
                    parkAndCheckInterrupt()) 
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

這里的代碼說(shuō)明:
1.setHeadAndPropagete()方法:

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

首先,使用CAS更好頭結(jié)點(diǎn),然后將當(dāng)前結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn)取出來(lái),如果同樣是“shared”類(lèi)型的,在做一次“releaseShared”操作。

再看一下doReleaseShared()方法:

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
              // 如果當(dāng)前節(jié)點(diǎn)是 SIGNAL 意味著,它正在等待一個(gè)信號(hào),  
              // 或者說(shuō),它在等待被喚醒,因此做兩件事,1 是重置 waitStatus 標(biāo)志位,2 是重置成功后, 喚醒下一個(gè)節(jié)點(diǎn)。
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                // 如果本身頭節(jié)點(diǎn)的 waitStatus 是出于重置狀態(tài)(waitStatus==0)的,    將其設(shè)置為“傳播”狀態(tài)。
              // 意味著需要將狀態(tài)向后一個(gè)節(jié)點(diǎn)傳播。
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

這樣做的原因是:共享功能和獨(dú)占功能不一樣的地方,對(duì)于獨(dú)占功能,有且只有一個(gè)線程(通常只對(duì)應(yīng)一個(gè)結(jié)點(diǎn),拿ReentrantLock來(lái)說(shuō),如果當(dāng)前持有鎖的線程重復(fù)調(diào)用lock()方法,會(huì)被包裝成多個(gè)節(jié)點(diǎn)在 AQS 的隊(duì)列中,所以用一個(gè)線程來(lái)描述更準(zhǔn)確),能夠獲取鎖。

但是對(duì)于共享功能來(lái)說(shuō),共享的狀態(tài)是可以共享的,也就是意味著其他AQS隊(duì)列中的其他結(jié)點(diǎn)也能第一時(shí)間知道狀態(tài)的變化。因此,一個(gè)節(jié)點(diǎn)獲取到共享狀態(tài)流程圖是這樣的:

比如現(xiàn)在有如下隊(duì)列:
當(dāng) Node1 調(diào)用 tryAcquireShared 成功后,更換了頭節(jié)點(diǎn):


Node1 變成了頭節(jié)點(diǎn)然后調(diào)用 unparkSuccessor() 方法喚醒了 Node2、Node2 中持有的線程 A 出于上面流程圖的 park node 的位置,線程 A 被喚醒后,重復(fù)黃色線條的流程,重新檢查調(diào)用 tryAcquireShared 方法,看能否成功,如果成功,則又更改頭節(jié)點(diǎn),重復(fù)以上步驟,以實(shí)現(xiàn)節(jié)點(diǎn)自身獲取共享鎖成功后,喚醒下一個(gè)共享類(lèi)型節(jié)點(diǎn)的操作,實(shí)現(xiàn)共享狀態(tài)的向后傳遞。

2.對(duì)于doAcquireShared()方法,AQS還提供了類(lèi)似的實(shí)現(xiàn):


分別對(duì)應(yīng)了:
1)帶參數(shù)請(qǐng)求共享鎖。 (忽略中斷)
2)帶參數(shù)請(qǐng)求共享鎖,且響應(yīng)中斷。(每次循環(huán)時(shí),會(huì)檢查當(dāng)前線程的中斷狀態(tài),以實(shí)現(xiàn)對(duì)線程中斷的響應(yīng))
3)帶參數(shù)請(qǐng)求共享鎖但是限制等待時(shí)間。(第二個(gè)參數(shù)設(shè)置超時(shí)時(shí)間,超出時(shí)間后,方法返回。)

countDown()

看完await()方法,接著看countDown()方法:

    public void countDown() {
        sync.releaseShared(1);
    }

調(diào)用了AQS的releaseShared方法,并傳入了參數(shù)1:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

先嘗試釋放鎖,tryReleaseShared 同樣為空方法,留給子類(lèi)自己去實(shí)現(xiàn),以下是 CountDownLatch 的內(nèi)部類(lèi) Sync 的實(shí)現(xiàn):

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

死循環(huán)更新 state 的值,實(shí)現(xiàn) state 的減 1 操作,之所以用死循環(huán)是為了確保 state 值的更新成功。

從上文的分析中可知,如果 state 的值為 0,在 CountDownLatch 中意味:所有的子線程已經(jīng)執(zhí)行完畢,這個(gè)時(shí)候可以喚醒調(diào)用 await() 方法的線程了,而這些線程正在 AQS 的隊(duì)列中,并被掛起的,

所以下一步應(yīng)該去喚醒 AQS 隊(duì)列中的頭節(jié)點(diǎn)了(AQS 的隊(duì)列為 FIFO 隊(duì)列),然后由頭節(jié)點(diǎn)去依次喚醒 AQS 隊(duì)列中的其他共享節(jié)點(diǎn)。

如果 tryReleaseShared 返回 true, 進(jìn)入 doReleaseShared() 方法:

private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { 
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
// 如果當(dāng)前節(jié)點(diǎn)是 SIGNAL 意味著,它正在等待一個(gè)信號(hào),
 // 或者說(shuō),它在等待被喚醒,因此做兩件事,1 是重置 waitStatus 標(biāo)志位,2 是重置成功后, 喚醒下一個(gè)節(jié)點(diǎn)。
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))  
// 如果本身頭節(jié)點(diǎn)的 waitStatus 是出于重置狀態(tài)(waitStatus==0)的,將其設(shè)置為“傳播”狀態(tài)。
// 意味著需要將狀態(tài)向后一個(gè)節(jié)點(diǎn)傳播。
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
  }

當(dāng)線程被喚醒后,會(huì)重新嘗試獲取共享鎖,而對(duì)于 CountDownLatch 線程獲取共享鎖判斷依據(jù)是 state 是否為 0,而這個(gè)時(shí)候顯然 state 已經(jīng)變成了 0,因此可以順利獲取共享鎖并且依次喚醒 AQS 隊(duì)里中后面的節(jié)點(diǎn)及對(duì)應(yīng)的線程。

總結(jié)

本文從 CountDownLatch 入手,深入分析了 AQS 關(guān)于共享鎖方面的實(shí)現(xiàn)方式:

如果獲取共享鎖失敗后,將請(qǐng)求共享鎖的線程封裝成 Node 對(duì)象放入 AQS 的隊(duì)列中,并掛起 Node 對(duì)象對(duì)應(yīng)的線程,實(shí)現(xiàn)請(qǐng)求鎖線程的等待操作。待共享鎖可以被獲取后,從頭節(jié)點(diǎn)開(kāi)始,依次喚醒頭節(jié)點(diǎn)及其以后的所有共享類(lèi)型的節(jié)點(diǎn)。實(shí)現(xiàn)共享狀態(tài)的傳播。

這里有幾點(diǎn)值得注意:

1.與 AQS 的獨(dú)占功能一樣,共享鎖是否可以被獲取的判斷為空方法,交由子類(lèi)去實(shí)現(xiàn)。
2.與 AQS 的獨(dú)占功能不同,當(dāng)鎖被頭節(jié)點(diǎn)獲取后,獨(dú)占功能是只有頭節(jié)點(diǎn)獲取鎖,其余節(jié)點(diǎn)的線程繼續(xù)沉睡,等待鎖被釋放后,才會(huì)喚醒下一個(gè)節(jié)點(diǎn)的線程,而共享功能是只要頭節(jié)點(diǎn)獲取鎖成功,就在喚醒自身節(jié)點(diǎn)對(duì)應(yīng)的線程的同時(shí),繼續(xù)喚醒 AQS 隊(duì)列中的下一個(gè)節(jié)點(diǎn)的線程,每個(gè)節(jié)點(diǎn)在喚醒自身的同時(shí)還會(huì)喚醒下一個(gè)節(jié)點(diǎn)對(duì)應(yīng)的線程,以實(shí)現(xiàn)共享狀態(tài)的“向后傳播”,從而實(shí)現(xiàn)共享功能。
以上的分析都是從 AQS 子類(lèi)的角度去看待 AQS 的部分功能的,而如果直接看待 AQS,或許可以這么去解讀:

首先,AQS 并不關(guān)心“是什么鎖”,對(duì)于 AQS 來(lái)說(shuō)它只是實(shí)現(xiàn)了一系列的用于判斷“資源”是否可以訪問(wèn)的 API, 并且封裝了在“訪問(wèn)資源”受限時(shí)將請(qǐng)求訪問(wèn)的線程的加入隊(duì)列、掛起、喚醒等操作, AQS 只關(guān)心“資源不可以訪問(wèn)時(shí),怎么處理?”、“資源是可以被同時(shí)訪問(wèn),還是在同一時(shí)間只能被一個(gè)線程訪問(wèn)?”、“如果有線程等不及資源了,怎么從 AQS 的隊(duì)列中退出?”等一系列圍繞資源訪問(wèn)的問(wèn)題,而至于“資源是否可以被訪問(wèn)?”這個(gè)問(wèn)題則交給 AQS 的子類(lèi)去實(shí)現(xiàn)。

當(dāng) AQS 的子類(lèi)是實(shí)現(xiàn)獨(dú)占功能時(shí),例如 ReentrantLock,“資源是否可以被訪問(wèn)”被定義為只要 AQS 的 state 變量不為 0,并且持有鎖的線程不是當(dāng)前線程,則代表資源不能訪問(wèn)。

當(dāng) AQS 的子類(lèi)是實(shí)現(xiàn)共享功能時(shí),例如:CountDownLatch,“資源是否可以被訪問(wèn)”被定義為只要 AQS 的 state 變量不為 0,說(shuō)明資源不能訪問(wèn)。

這是典型的將規(guī)則和操作分開(kāi)的設(shè)計(jì)思路:規(guī)則子類(lèi)定義,操作邏輯因?yàn)榫哂泄眯裕旁诟割?lèi)中去封裝。

當(dāng)然,正式因?yàn)?AQS 只是關(guān)心“資源在什么條件下可被訪問(wèn)”,所以子類(lèi)還可以同時(shí)使用 AQS 的共享功能和獨(dú)占功能的 API 以實(shí)現(xiàn)更為復(fù)雜的功能。

比如:ReentrantReadWriteLock,我們知道 ReentrantReadWriteLock 的中也有一個(gè)叫 Sync 的內(nèi)部類(lèi)繼承了 AQS,而 AQS 的隊(duì)列可以同時(shí)存放共享鎖和獨(dú)占鎖,對(duì)于 ReentrantReadWriteLock 來(lái)說(shuō)分別代表讀鎖和寫(xiě)鎖,當(dāng)隊(duì)列中的頭節(jié)點(diǎn)為讀鎖時(shí),代表讀操作可以執(zhí)行,而寫(xiě)操作不能執(zhí)行,因此請(qǐng)求寫(xiě)操作的線程會(huì)被掛起,當(dāng)讀操作依次推出后,寫(xiě)鎖成為頭節(jié)點(diǎn),請(qǐng)求寫(xiě)操作的線程被喚醒,可以執(zhí)行寫(xiě)操作,而此時(shí)的讀請(qǐng)求將被封裝成 Node 放入 AQS 的隊(duì)列中。如此往復(fù),實(shí)現(xiàn)讀寫(xiě)鎖的讀寫(xiě)交替進(jìn)行。

而本系列文章上半部分提到的 FutureTask,其實(shí)思路也是:封裝一個(gè)存放線程執(zhí)行結(jié)果的變量 A, 使用 AQS 的獨(dú)占 API 實(shí)現(xiàn)線程對(duì)變量 A 的獨(dú)占訪問(wèn),判斷規(guī)則是,線程沒(méi)有執(zhí)行完畢:call() 方法沒(méi)有返回前,不能訪問(wèn)變量 A,或者是超時(shí)時(shí)間沒(méi)到前不能訪問(wèn)變量 A(這就是 FutureTask 的 get 方法可以實(shí)現(xiàn)獲取線程執(zhí)行結(jié)果時(shí),設(shè)置超時(shí)時(shí)間的原因)。

參考資料

https://www.infoq.cn/article/java8-abstractqueuedsynchronizer

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容