AbstractQueuedSynchronizer原理剖析

隊(duì)列同步器AbstractQueuedSynchronizer(簡稱同步器),主要是用于構(gòu)建鎖或其他同步組件(例如Semaphore)的基礎(chǔ)框架,它使用了一個(gè)int成員變量表示同步狀態(tài),通過內(nèi)置的FIFO隊(duì)列來完成資源獲取線程的排隊(duì)工作,成為實(shí)現(xiàn)大部分同步需求的基礎(chǔ)?!禞ava并發(fā)編程的藝術(shù)》

上一篇介紹ReentrantLock可重入鎖時(shí)提到其底層實(shí)現(xiàn)為同步器,其內(nèi)部定義一個(gè)靜態(tài)內(nèi)部類繼承AbstractQueuedSynchronizer,通過實(shí)現(xiàn)相關(guān)模版方法來完成線程同步鎖的功能。

基礎(chǔ)知識

同步器提供了3個(gè)方法(getState()、setState(int newState)、compareAndSetState(int expect, int update))來實(shí)現(xiàn)狀態(tài)的獲取的設(shè)置,上述方法通過硬件層面的原子操作保證其安全性。同時(shí),同步器還提供了一系列的模版方法(例如:tryAcquire(int arg)、tryRelease(int arg)),其本身不具體實(shí)現(xiàn)。同步組件(鎖、semaphore等)內(nèi)部通過靜態(tài)內(nèi)部類繼承同步器,實(shí)現(xiàn)相關(guān)的同步接口,完成線程同步功能。鎖是面向用戶的,其定義一系列接口,底層實(shí)現(xiàn)交給同步器完成。同步器所提供的模板方法可分為3類:獨(dú)占式獲取和釋放同步狀態(tài)、共享式獲取和釋放同步狀態(tài)、查詢同步隊(duì)列的等待情況。

實(shí)現(xiàn)分析

1.同步隊(duì)列

同步器內(nèi)部通過一個(gè)同步隊(duì)列(FIFO雙向隊(duì)列)來完成同步狀態(tài)的管理,當(dāng)線程獲取同步狀態(tài)失敗時(shí),會將線程和等待狀態(tài)信息構(gòu)造成一個(gè)Node對象,并加入到同步隊(duì)列,同時(shí)會將當(dāng)前線程阻塞。當(dāng)同步狀態(tài)釋放時(shí),會將頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程喚醒,并修改頭節(jié)點(diǎn)引用。


同步隊(duì)列的基本結(jié)構(gòu)

上圖中同步器包含兩個(gè)節(jié)點(diǎn)類型的引用,一個(gè)指向頭節(jié)點(diǎn),一個(gè)指向尾節(jié)點(diǎn)。頭節(jié)點(diǎn)即為當(dāng)前獲取同步狀態(tài)成功的節(jié)點(diǎn),頭節(jié)點(diǎn)線程釋放同步狀態(tài)后,會喚醒后繼節(jié)點(diǎn)線程,當(dāng)后繼節(jié)點(diǎn)線程獲取同步狀態(tài)成功后會將當(dāng)前線程對應(yīng)的節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)。因此設(shè)置頭節(jié)點(diǎn)的操作只存在與當(dāng)前獲取同步狀態(tài)成功的線程,不存在并發(fā)操作,不需要使用CAS操作來保證其安全性。不同的是,尾節(jié)點(diǎn)的設(shè)置需要使用CAS操作,原因就是多線程并發(fā)獲取同步狀態(tài),要保證等待線程加入隊(duì)列的安全性,只有
compareAndSetTail方法設(shè)置成功后,節(jié)點(diǎn)才回進(jìn)入隊(duì)列。

2.獨(dú)占式同步狀態(tài)獲取與釋放
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

調(diào)用同步器的acquire(int arg)方法可以獲取同步狀態(tài),該方法不處理線程中斷操作,當(dāng)線程獲取同步狀態(tài)失敗后加入同步隊(duì)列后,對線程進(jìn)行中斷無法從隊(duì)列中移除,只有通過前繼節(jié)點(diǎn)的喚醒操作,直到獲取同步狀態(tài)。線程獲取同步狀態(tài)失敗后,調(diào)用addWaiter(Node mode),將線程和等待信息構(gòu)造的節(jié)點(diǎn)加入到同步隊(duì)列中,enq(final Node node)方法是一個(gè)死循環(huán),直到節(jié)點(diǎn)節(jié)點(diǎn)成功加入同步隊(duì)列才會退出。當(dāng)節(jié)點(diǎn)加入同步隊(duì)列后調(diào)用acquireQueued(final Node node, int arg) 方法,該方法內(nèi)部為一個(gè)自旋的死循環(huán),會阻塞該線程,直到其前驅(qū)節(jié)點(diǎn)釋放同步狀態(tài),喚醒該線程,且成功獲取同步狀態(tài)后才會返回。


獨(dú)占式同步狀態(tài)獲取流程
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

線程獲取到同步狀態(tài)并執(zhí)行完成后,通過release方法釋放同步狀態(tài),釋放成功后將喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程。

3.共享式同步狀態(tài)獲取與釋放

共享式與同步式最大的區(qū)別在于同一時(shí)刻能否允許多個(gè)線程同時(shí)獲取同步狀態(tài)。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

共享式同步狀態(tài)獲取通過acquireShared方法,doAcquireShared方法實(shí)現(xiàn)為自旋操作,直到tryAcquireShared方法返回值大于0,表示成功獲取到同步狀態(tài)并退出自旋。共享式同步狀態(tài)釋放由于可能存在多個(gè)線程同時(shí)擁有同步狀態(tài),因此其在釋放時(shí)必須保證線程安全,因此在doReleaseShared方法中會同過自旋和CAS操作實(shí)現(xiàn)釋放。

4.獨(dú)占式超時(shí)獲取同步狀態(tài)

通過同步器的doAcquireNanos(int arg, long nanosTimeout)方法可以實(shí)現(xiàn)超時(shí)獲取同步狀態(tài),該方法與獨(dú)占式最大的區(qū)別在于獲取同步狀態(tài)失敗后通過一個(gè)超時(shí)時(shí)間進(jìn)行自旋,在設(shè)定的超時(shí)時(shí)間內(nèi),若成功獲取同步狀態(tài)則成功返回,即使其前面有其他等待的線程,否則失敗。除此之外,其流程基本上與獨(dú)占式獲取基本相同。

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}
獨(dú)占式超時(shí)獲取同步狀態(tài)

結(jié)束語

本文介紹AbstractQueuedSynchronizer隊(duì)列同步器的相關(guān)知識,包括使用和實(shí)現(xiàn)原理。通過了解同步器的實(shí)現(xiàn),基本上就掌握了鎖的實(shí)現(xiàn)原理。因此,了解同步器的的實(shí)現(xiàn)對于理解鎖的實(shí)現(xiàn)機(jī)制非常重要。本篇文章內(nèi)容大部分來自《Java并發(fā)編程藝術(shù)》,有興趣的可以讀下該書,好好看看源碼。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容