Java基礎-AbstractQueuedSynchronizer類

????AbstractQueuedSynchronizer類(簡稱AQS)依賴于內(nèi)部的FIFO隊列,提供了一個可以實現(xiàn)阻塞鎖和同步機制的框架。依賴于AQS實現(xiàn)的具有同步機制和鎖功能的類,需要實現(xiàn)它定義的protect方法(tryAcquire,tryRelease等方法),通過改變state字段的狀態(tài)來保證同步機制。

AQS的使用場景

1、ReentrantLock類提供的同步鎖的功能也是基于AQS類來實現(xiàn)的,ReentrantLock提供的公平鎖(sync)和非公平鎖(NofairSync)兩種鎖機制,都是通過重寫tryAcquire和tryRelease方法來實現(xiàn)的。
ReentrantLock重寫的tryAcquire方法:

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
        // 非公平的獲取鎖方法
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 提供了鎖的可重入功能,當已經(jīng)獲取鎖的線程再次請求獲取鎖時
            // 將state的值加一。
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

ReentrantLock重寫的tryRelease方法

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            // 判斷當前請求釋放鎖的線程與當前持有鎖的線程是不是同一個線程
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 因為提供了鎖重入的功能,這里會判斷當前線程是否已經(jīng)釋放完所有持有的鎖
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

Reentrant基于AQS加鎖的執(zhí)行過程


image.png

2、CyclicBarrier類
應用場景:創(chuàng)建一組任務,它們并行地執(zhí)行工作,然后在進行下一個步驟之前等待,直至所有任務都完成,它使得所有并行任務都將在柵欄處列隊,因此可以一致的向前移動。
CyclicBarrier類提供的功能也是基于AQS類來實現(xiàn)的,主要是利用AQS類中ConditionObject類提供的功能。

        // 喚醒所有等待的線程
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
        // 喚醒所有線程的方法,循環(huán)隊列中等待的線程,調(diào)用transferForSignal方法喚醒在等待中的線程。
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

transferForSignal方法,會對當前節(jié)點狀態(tài)校驗,然后調(diào)用LockSupport.unpark方法喚醒指定的線程。

    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

await方法:當CyclicBarrier類提供的計數(shù)器沒有減到等于0時,提前進入的線程會調(diào)用AQS中ConditionObject類提供的await方法,將當前線程加入到等待隊列中。

        public final void await() throws InterruptedException {
            // 如果當前線程被中斷,會拋出異常
            if (Thread.interrupted())
                throw new InterruptedException();
            // 將新節(jié)點添加到等待隊列中去,節(jié)點的狀態(tài)為 CONDITON。
            Node node = addConditionWaiter();
            // 釋放當前線程持有的鎖,如果失敗則將當前節(jié)點的狀態(tài)改為cancelled(表示當前節(jié)點失效)。
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 判斷當前節(jié)點是否在CLH隊列中,CONDITION狀態(tài)的節(jié)點,只能在CONDITION隊列中。
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 出了while循環(huán),說明線程被喚醒,并且已經(jīng)將該node節(jié)點從CONDITION隊列transfer到了CLH隊列中,或者發(fā)生了中斷。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 處理異?;蛘咧袛嗟墓?jié)點
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

下面來看一下CyclicBarrier類的執(zhí)行過程


image.png
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容