JUC之AbstractQueuedSynchronizer

1. 前言

AbstractQueuedSynchronizer(AQS)基于FIFO等待隊(duì)列以及CAS操作實(shí)現(xiàn)了基礎(chǔ)了同步框架,JUC中包括ReentrantLock,ReentrantReadWriteLock,CountDownLatch,Condition都是基于AbstractQueuedSynchronizer提供的基礎(chǔ)同步操作來(lái)實(shí)現(xiàn)的。

2. AbstractQueuedSynchronizer實(shí)現(xiàn)原理

2. 1 CLH隊(duì)列

CLH(代表Craig, Landin, Hagersten三人)隊(duì)列是一個(gè)FIFO的隊(duì)列,這個(gè)隊(duì)列用來(lái)對(duì)資源競(jìng)爭(zhēng)者(不同的線程就是一個(gè)競(jìng)爭(zhēng)者)進(jìn)行排隊(duì)。

2.1.1 隊(duì)列節(jié)點(diǎn)Node

AQS實(shí)現(xiàn)中隊(duì)列中一個(gè)節(jié)點(diǎn)即一個(gè)競(jìng)爭(zhēng)者,用類Node,Node類有如下主要成員:

成員 含義 取值
waitStatus 當(dāng)前節(jié)點(diǎn)狀態(tài) SIGNAL=-1 表示當(dāng)前節(jié)點(diǎn)釋放資源后需要喚醒他的后繼阻塞中的節(jié)點(diǎn); CANCELLED=1當(dāng)前節(jié)點(diǎn)已經(jīng)取消對(duì)資源的競(jìng)爭(zhēng); PROPAGATE=-3 用于競(jìng)爭(zhēng)共享資源競(jìng)爭(zhēng),表示當(dāng)前節(jié)點(diǎn)獲取資源后,應(yīng)該喚醒其他排隊(duì)阻塞中獲取該共享資源的競(jìng)爭(zhēng)者; CONDITION=-2專門用于Condition的實(shí)現(xiàn)
Node prev,next 當(dāng)前節(jié)點(diǎn)的前驅(qū)和后繼節(jié)點(diǎn) --
Thread thread 當(dāng)前排隊(duì)的線程,也就是競(jìng)爭(zhēng)資源的線程 --
Node nextWaiter 如果waitStatus=CONDITION則nextWaiter連接到下一個(gè)等待在同一個(gè)condition上的node;nextWaiter也可以是class NODE的靜態(tài)成員SHARED表示當(dāng)前節(jié)點(diǎn)競(jìng)爭(zhēng)共享資源;等于null表示競(jìng)爭(zhēng)排他性資源

1. 節(jié)點(diǎn)種類
根據(jù)上表中nextWaiter可以看出,實(shí)際上node的種類有三種:

  1. exclusive node(表示當(dāng)前節(jié)點(diǎn)申請(qǐng)排他新資源)
  2. shared node(表示當(dāng)前節(jié)點(diǎn)申請(qǐng)的是共享資源)
  3. condition, 當(dāng)前節(jié)點(diǎn)等待條件變量

不同的node種類獲取資源(對(duì)應(yīng)入隊(duì)操作)和釋放資源(對(duì)應(yīng)出隊(duì)操作)都有所不同

2.2.2 入隊(duì)操作

也就是去競(jìng)爭(zhēng)資源。
這里只介紹一下exclusive和shared node的入隊(duì)操作

2.2.2.1 exclusive node入隊(duì)操作

使用AQS競(jìng)爭(zhēng)排他性資源時(shí),需要調(diào)用如下兩個(gè)接口之一:

/*
調(diào)用acquire獲取資源,這是一個(gè)阻塞操作直到獲取到資源,且不可被中斷。
*/
public final void acquire(long arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

/**
和acquire不同的是競(jìng)爭(zhēng)者等待期間是可以被中斷
*/
 public final void acquireInterruptibly(long arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

兩者都調(diào)用了tryAcquire(), 這個(gè)方法需要使用AQS的用戶繼承并實(shí)現(xiàn)它,并且這個(gè)調(diào)用不應(yīng)該阻塞,之所以提供tryAcquire操作是因?yàn)楫?dāng)前競(jìng)爭(zhēng)者可能已經(jīng)獲取了資源,那么在tryAcquire可以做一下判斷,避免已經(jīng)獲取了資源后還去盲目的進(jìn)去CLH隊(duì)列排隊(duì)。
典型的就是重進(jìn)入鎖ReentrantLock的實(shí)現(xiàn),獲取鎖的線程可以再次獲取鎖,那么就可以在tryAcquire中做一下判斷。

關(guān)于參數(shù)arg,可以理解成嘗試獲取資源數(shù)量吧,實(shí)際上這個(gè)跟AQS的一個(gè)重要的成員state有關(guān),當(dāng)用戶使用AQS時(shí),需要設(shè)置state這個(gè)整數(shù)值,關(guān)于這個(gè)state,應(yīng)該可以當(dāng)成是對(duì)資源狀態(tài)的描述吧。比如ReenctrantLock中可以根據(jù)state是否等于0判斷是否加鎖了,以及state的值大小判斷重進(jìn)入鎖加了幾次鎖。

上面代碼addWaiter()其實(shí)就是不停的使用CAS操作將當(dāng)前節(jié)點(diǎn)加入CLH隊(duì)列尾部。

2. acquireQueued()

下面是acquireQueued的代碼實(shí)現(xiàn):

final boolean acquireQueued(final Node node, long arg) {
        boolean failed = true;
        try {
            // 線程中斷標(biāo)記
            boolean interrupted = false;
            for (;;) {
                // 獲得當(dāng)前節(jié)點(diǎn)的前驅(qū)
                final Node p = node.predecessor();
                // 如果前驅(qū)節(jié)點(diǎn)已經(jīng)是隊(duì)列頭,那就意味著看起來(lái)自己最優(yōu)先獲取到資源,然后就調(diào)用tryAcquire嘗試獲取資源。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 當(dāng)前節(jié)點(diǎn)不是頭節(jié)點(diǎn)則排隊(duì)阻塞。
                // shouldParkAfterFailedAcquire會(huì)將前驅(qū)節(jié)點(diǎn)的waitStatus標(biāo)記成SIGNAL表示告訴前驅(qū)節(jié)點(diǎn):你釋放資源后需要喚醒我,我可能阻塞了
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 如果排隊(duì)過程中被中斷,記下來(lái),不甩異常
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

以上可以看出acquireQueued其實(shí)就是在不停的做如下嘗試:

  1. 判斷當(dāng)前節(jié)點(diǎn)是不是最前面了,是則調(diào)用tryAcquire嘗試獲取資源,獲取成功就返回,獲取失敗或者不是最前面節(jié)點(diǎn)則阻塞等待,然后到2
  2. 無(wú)限等待然后被喚醒,繼續(xù)1.

2. doAcquireInterruptibly()
和acquireQueued的過程是一樣的,就是等待的過程中被中斷了立即甩異常

2.2.2.2 shared node入隊(duì)操作

和exelusive模式一樣都有可中斷和不可中斷兩個(gè),如下:

 public final void acquireShared(long arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
 }

public final void acquireSharedInterruptibly(long arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
}

tryAcquireShared()需要使用AQS使用者繼承實(shí)現(xiàn),不過和tryAcquire不一樣的是它返回一個(gè)整數(shù),小于0表示嘗試失敗了。

1. doAcquireShared

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                       // 和exclusive模式下acquireQueued整體流程是一樣的,
                       // 不同的是在這里,成功獲取到競(jìng)爭(zhēng)資源后,調(diào)用的這個(gè)方法。
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
 }

關(guān)于上面setHeadAndPropagate做了什么,先通過下圖描述一下為什么會(huì)有這個(gè)方法的調(diào)用吧.

假設(shè)現(xiàn)在排隊(duì)隊(duì)列如下:


-----     ------    -----     -----
| E1 |  ->| S1 | -> | S2 | ->| S3 |
-----     ------    -----     ----

節(jié)點(diǎn)E1是頭節(jié)點(diǎn),exclusively持有資源, S1, S2, S3都是在申請(qǐng)共享訪問資源,然后因?yàn)镋1已經(jīng)持有了,所以都阻塞了。

然后E1釋放資源了,所以他會(huì)喚醒阻塞中的S1,S1拿到了資源,但是由于S1是共享模式去拿資源,S2,S3也都是去申請(qǐng)共享資源,所以S1應(yīng)該去同樣喚醒S2,S3一起去分享資源。 這也就是setHeadAndPropagate干的事情--喚醒S2, S2醒了喚醒S3。

2.2.3 出隊(duì)操作

也就是釋放資源.

2.2.3.1 exclusive node釋放資源

釋放資源通過如下方法實(shí)現(xiàn):

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
 }

tryRelease和tryAcquire一樣,是需要使用者繼承并實(shí)現(xiàn)的,返回true表示資源完全被釋放(也就是需要喚醒后面等待的node),tryRelease同樣需要結(jié)合資源狀態(tài)state去判斷,且應(yīng)該是一個(gè)非阻塞的調(diào)用。

下面unparkSuccessor喚醒后繼等待者的實(shí)現(xiàn):

private void unparkSuccessor(Node node) {
       
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        // 后繼節(jié)點(diǎn)等于null并不能代表沒有后繼節(jié)點(diǎn),原因跟入隊(duì)addWaiter的實(shí)現(xiàn)有關(guān).
        // 調(diào)用addWaiter入隊(duì)newNode時(shí)先用CAS將tail設(shè)置成newNode,然后再將之前的tail.next設(shè)置成newNode,
        // 所以這中間的時(shí)間間隔中有tail但是之前的tail還沒有完成和newNode連接,也就是之前的tail的后繼是null
        // 而s.waitStatus >0 則表示這個(gè)節(jié)點(diǎn)狀態(tài)為CACELLED取消競(jìng)爭(zhēng)資源,應(yīng)該再找它后面的節(jié)點(diǎn)
        // for循環(huán)里從tail往前遍歷找到最前面的沒有CANCELLED的節(jié)點(diǎn),然后喚醒它。
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 喚醒找到的節(jié)點(diǎn)對(duì)應(yīng)的線程。
            LockSupport.unpark(s.thread);
    }

2.2.3.2 shared node釋放資源

調(diào)用如下方法:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
 }

tryReleaseShared需要使用AQS的繼承并實(shí)現(xiàn)它,返回true表示成功釋放,意味著需要喚醒后續(xù)等待者。

關(guān)于doReleaseShared就是喚醒后繼所有等待的節(jié)點(diǎn),被喚醒的節(jié)點(diǎn)又會(huì)在前面介紹的各種doAcquireXXX中醒來(lái)去嘗試獲取資源,獲取不到的又會(huì)等待。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容