AQS

paper傳送門

/**

* Provides a framework for implementing blocking locks and related

* synchronizers (semaphores, events, etc) that rely on

* first-in-first-out (FIFO) wait queues.  This class is designed to

* be a useful basis for most kinds of synchronizers that rely on a

* single atomic int value to represent state. Subclasses

* must define the protected methods that change this state, and which

* define what that state means in terms of this object being acquired

* or released.  Given these, the other methods in this class carry

* out all queuing and blocking mechanics. Subclasses can maintain

* other state fields, but only the atomically updated int

* value manipulated using methods {@link #getState}, {@link

* #setState} and {@link #compareAndSetState} is tracked with respect

* to synchronization.

*/

*      +------+  prev +-----+       +-----+

* head |      | <---- |     | <---- |     |  tail

*      +------+       +-----+       +-----+

在AQS中維護(hù)著一個FIFO的同步隊列,當(dāng)線程獲取同步狀態(tài)失敗后,則會加入到這個CLH同步隊列的對尾并一直保持著自旋。在CLH同步隊列中的線程在自旋時會判斷其前驅(qū)節(jié)點是否為首節(jié)點,如果為首節(jié)點則不斷嘗試獲取同步狀態(tài),獲取成功則退出CLH同步隊列。當(dāng)線程執(zhí)行完邏輯后,會釋放同步狀態(tài),釋放后會喚醒其后繼節(jié)點。

它維護(hù)了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進(jìn)入此隊列)。這里volatile是核心關(guān)鍵詞,具體volatile的語義,在此不述。state的訪問方式有三種:

getState()

setState()

compareAndSetState()

AQS定義兩種資源共享方式:

  • Exclusive(獨占,只有一個線程能執(zhí)行,如ReentrantLock)
  • Share(共享,多個線程可同時執(zhí)行,如Semaphore/CountDownLatch)

ReentrantLock的公平和非公平如何實現(xiàn)的呢?它內(nèi)部有幾個類

abstract static class Sync extends AbstractQueuedSynchronizer

static final class NonfairSync extends Sync

static final class FairSync extends Sync

對于State都是tryAcquire方法

公平鎖

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}

非公平鎖

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
/**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

可以發(fā)現(xiàn)區(qū)別在于公平鎖多了一層hasQueuedPredecessors()判斷

/**
     * Queries whether any threads have been waiting to acquire longer
     * than the current thread.
     *
     * <p>An invocation of this method is equivalent to (but may be
     * more efficient than):
     *  <pre> {@code
     * getFirstQueuedThread() != Thread.currentThread() &&
     * hasQueuedThreads()}</pre>
     *
     * <p>Note that because cancellations due to interrupts and
     * timeouts may occur at any time, a {@code true} return does not
     * guarantee that some other thread will acquire before the current
     * thread.  Likewise, it is possible for another thread to win a
     * race to enqueue after this method has returned {@code false},
     * due to the queue being empty.
     *
     * <p>This method is designed to be used by a fair synchronizer to
     * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
     * Such a synchronizer's {@link #tryAcquire} method should return
     * {@code false}, and its {@link #tryAcquireShared} method should
     * return a negative value, if this method returns {@code true}
     * (unless this is a reentrant acquire).  For example, the {@code
     * tryAcquire} method for a fair, reentrant, exclusive mode
     * synchronizer might look like this:
     *
     *  <pre> {@code
     * protected boolean tryAcquire(int arg) {
     *   if (isHeldExclusively()) {
     *     // A reentrant acquire; increment hold count
     *     return true;
     *   } else if (hasQueuedPredecessors()) {
     *     return false;
     *   } else {
     *     // try to acquire normally
     *   }
     * }}</pre>
     *
     * @return {@code true} if there is a queued thread preceding the
     *         current thread, and {@code false} if the current thread
     *         is at the head of the queue or the queue is empty
     * @since 1.7
     */
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

doc 和 代碼都很清楚, 此方法判斷當(dāng)前線程之前是否還有其他線程在(非頭結(jié)點)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容