AbstractQueuedSynchronizer 詳細(xì)解讀

概述

AQS( AbstractQueuedSynchronizer ) 是一個(gè)用于構(gòu)建鎖和同步器的框架,許多同步器都可以通過(guò)AQS很容易并且高效地構(gòu)造出來(lái)。如: ReentrantLock 和 Semaphore都是基于AQS構(gòu)建的,還包括CountDownLatch、ReentrantReadWriteLock、SynchronousQueue和FutureTask。

AQS解決了在實(shí)現(xiàn)同步器時(shí)涉及的大量細(xì)節(jié)問(wèn)題,例如等待線程采用FIFO隊(duì)列操作順序。在不同的同步器中還可以定義一些靈活的標(biāo)準(zhǔn)來(lái)判斷某個(gè)線程是應(yīng)該通過(guò)還是需要等待。本文通過(guò)介紹AQS相關(guān)API以及juc包下相關(guān)實(shí)現(xiàn)類對(duì)其進(jìn)行介紹

原理

使用volatile的整形變量state用來(lái)表示當(dāng)前是否可以獲取鎖,如在某些非共享鎖里,state=1 則代表當(dāng)前鎖已經(jīng)被占有,此時(shí)如果有線程來(lái)請(qǐng)求鎖時(shí)且當(dāng)前獨(dú)占模式的鎖已經(jīng)被其他線程占有,則會(huì)進(jìn)入AQS里維持的CLH隊(duì)列(FIFO)里排隊(duì),HEAD 位置的線程為正在占用鎖的線程,當(dāng)它釋放鎖時(shí),會(huì)喚醒Head的next 節(jié)點(diǎn)的線程,next節(jié)點(diǎn)使用自旋的方式不斷的嘗試獲取鎖。 自定義的同步器在實(shí)現(xiàn)時(shí),只需要定義state變量獲?。╰ryAcquire)與釋放(tryRelease)的規(guī)則,其他細(xì)節(jié)在AQS里已經(jīng)實(shí)現(xiàn)。

獲取 state 變量.png

獨(dú)占模式

    /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

根據(jù) java doc的注釋:以獨(dú)占模式獲取,忽略中斷,通過(guò)最少調(diào)用一次 tryAcquire 來(lái)返回成功,否則將該線程放入隊(duì)列,可能一直阻塞或非阻塞,直到調(diào)用 tryAcquire 返回成功,該方法可以用來(lái)實(shí)現(xiàn)鎖。
可以看出 AbstractQueuedSynchronizer 中的 acquire 是獨(dú)占模式需要調(diào)用的方法,該方法又依次調(diào)用了 3個(gè)方法:
tryAcquire(arg)

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

默認(rèn)實(shí)現(xiàn)拋出一個(gè) UnsupportedOperationException 異常 ,將該方法的具體實(shí)現(xiàn)交給子類,那么這里的入?yún)?int arg 便是與 AbstractQueuedSynchronizer 中的 state 變量來(lái)比較確認(rèn)當(dāng)前線程是否可以獲取鎖,一般情況下,如果是獨(dú)占鎖,獲取鎖時(shí)通過(guò)CAS 將 state 置為 1 ,釋放鎖時(shí) 通過(guò) CAS 將 state 置為 0

addWaiter(Node.EXCLUSIVE)

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

該方法用于將當(dāng)前線程的狀態(tài)封裝到 Node 節(jié)點(diǎn)內(nèi),然后通過(guò)尾插法將 node 節(jié)點(diǎn)放入到鏈表末尾

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

   final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //判斷當(dāng)前節(jié)點(diǎn)是否被通知中斷
            boolean interrupted = false;
            //自旋方式,不斷嘗試獲取資源
            for (;;) {
                final Node p = node.predecessor();
                //如果上一個(gè)節(jié)點(diǎn)是Head節(jié)點(diǎn),則嘗試獲取資源
                if (p == head && tryAcquire(arg)) {
                    //獲取資源成功,則將當(dāng)前節(jié)點(diǎn)設(shè)為Head節(jié)點(diǎn)
                    setHead(node);
                    //當(dāng)前節(jié)點(diǎn)獲取資源成功,那么之前的Head節(jié)點(diǎn)需要釋放,因此將之前Head相關(guān)引用設(shè)為NULL,幫助GC回收
                    p.next = null; // help GC
                    failed = false;
                    //返回中斷過(guò)程是否中斷過(guò)
                    return interrupted;
                }
                //遞歸判斷當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)的waitStatus是否有效
                //park 當(dāng)前Node節(jié)點(diǎn)的線程(wait當(dāng)前線程),檢查是否中斷
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

自旋一次,嘗試獲取鎖??梢钥吹?node.predecessor(); 返回的是當(dāng)前node的父節(jié)點(diǎn),如果父節(jié)點(diǎn)是Head 節(jié)點(diǎn),那么當(dāng)前 node 節(jié)點(diǎn)就會(huì)再次執(zhí)行 tryAcquire(arg) 方法,若返回 success 則獲取鎖,失敗則 park 當(dāng)前線程進(jìn)入等待喚醒的狀態(tài),同時(shí)檢查當(dāng)前線程是否中斷,如果被設(shè)置了中斷的標(biāo)記,則 將interrupted 設(shè)置為true,在獲取資源成功的時(shí)候,去響應(yīng)中斷

shouldParkAfterFailedAcquire(Node pred, Node node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //判斷前一個(gè)節(jié)點(diǎn)的waitStatus 是否小于0,如果大于0,則該節(jié)點(diǎn)已經(jīng)被取消
        int ws = pred.waitStatus;
        //如果為等待通知,則表示等待unpark(等待喚醒)
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        //如果上一個(gè)節(jié)點(diǎn)被取消掉(大于0)則跳過(guò),循環(huán)的找上一節(jié)點(diǎn)為未取消的(小于0)
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //如果當(dāng)前節(jié)點(diǎn)的waitStatus 小于0,但不等于SIGNAL,則設(shè)置為SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

shouldParkAfterFailedAcquire(Node pred, Node node) 此方法主要用于檢查waitStatus的狀態(tài),看看自己是否真的可以去休息了(進(jìn)入waiting狀態(tài)),萬(wàn)一隊(duì)列前邊的線程都放棄了,則丟棄引用,讓GC回收。

Node節(jié)點(diǎn)中維持著waitStatus這個(gè)字段代表的含義如下:

/** waitStatus value to indicate thread has cancelled */
    //值為1,在同步隊(duì)列中等待的線程等待超時(shí)或被中斷,需要從同步隊(duì)列中取消該Node的結(jié)點(diǎn),其結(jié)點(diǎn)的waitStatus為CANCELLED,即結(jié)束狀態(tài),進(jìn)入該狀態(tài)后的結(jié)點(diǎn)將不會(huì)再變化。
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    //值為-1,被標(biāo)識(shí)為該等待喚醒狀態(tài)的后繼結(jié)點(diǎn),當(dāng)其前繼結(jié)點(diǎn)的線程釋放了同步鎖或被取消,將會(huì)通知該后繼結(jié)點(diǎn)的線程執(zhí)行。說(shuō)白了,
    //就是處于喚醒狀態(tài),只要前繼結(jié)點(diǎn)釋放鎖,就會(huì)通知標(biāo)識(shí)為SIGNAL狀態(tài)的后繼結(jié)點(diǎn)的線程執(zhí)行。
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    //值為-2,與Condition相關(guān),該標(biāo)識(shí)的結(jié)點(diǎn)處于等待隊(duì)列中,結(jié)點(diǎn)的線程等待在Condition上,當(dāng)其他線程調(diào)用了Condition的signal()
    // 方法后,CONDITION狀態(tài)的結(jié)點(diǎn)將從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中,等待獲取同步鎖。
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    //值為-3,與共享模式相關(guān),在共享模式中,該狀態(tài)標(biāo)識(shí)結(jié)點(diǎn)的線程處于可運(yùn)行狀態(tài)。
    static final int PROPAGATE = -3;
    //0  代表為初始狀態(tài)

parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
        //調(diào)用park()使線程進(jìn)入waiting狀態(tài)
        LockSupport.park(this);
        //如果被喚醒,查看自己是不是被中斷的
        return Thread.interrupted();
    }

parkAndCheckInterrupt() 讓線程去休息,真正進(jìn)入等待狀態(tài)。park()會(huì)讓當(dāng)前線程進(jìn)入waiting狀態(tài)。在此狀態(tài)下,有兩種途徑可以喚醒該線程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()會(huì)清除當(dāng)前線程的中斷標(biāo)記位。

獨(dú)占模式的大致流程知道了后,用流程圖來(lái)總結(jié)下(貼上源碼)

    public final void acquire(int arg) {
         if (!tryAcquire(arg) &&
             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
             selfInterrupt();
     }

  1. 調(diào)用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
    沒(méi)成功,則addWaiter()將該線程加入等待隊(duì)列的尾部,并標(biāo)記為獨(dú)占模式;
  2. acquireQueued()使線程在等待隊(duì)列中休息,有機(jī)會(huì)時(shí)(輪到自己,會(huì)被
    unpark())會(huì)去嘗試獲取資源。獲取到資源后才返回。如果在整個(gè)等待過(guò)程中被中斷過(guò),則返回true,否則返回false。
  3. 如果線程在等待過(guò)程中被中斷過(guò),它是不響應(yīng)的。只是獲取資源后才再進(jìn)行自我中斷selfInterrupt(),將中斷補(bǔ)上。
獨(dú)占流程.png

釋放鎖

release(int arg)

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

release(int arg) 獨(dú)占模式下線程釋放共享資源的頂層入口。它會(huì)釋放指定量的資源,如果徹底釋放了(即state=0),它會(huì)喚醒等待隊(duì)列里的其他線程來(lái)獲取資源。這也正是unlock()的語(yǔ)義,當(dāng)然不僅僅只限于unlock()。

tryRelease

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

tryRelease(int arg) 需要子類實(shí)現(xiàn)的獲取資源成功的方法,用作父類根據(jù)子類釋放是否成功作為判斷依據(jù)

unparkSuccessor(Node node)

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0) //設(shè)置當(dāng)前線程的waitStatus為初始狀態(tài) 0
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next; //找到head節(jié)點(diǎn)的下一個(gè)需要喚醒的節(jié)點(diǎn),如果下一個(gè)節(jié)點(diǎn)被取消了(>0),則從tail往前找
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //unpart 喚醒線程
        if (s != null)
            LockSupport.unpark(s.thread);
    }

unparkSuccessor(Node node) 方法用來(lái)將 Head(當(dāng)前持有鎖的 Node)節(jié)點(diǎn)的 waitStatus 的值重置為 0,然后找到 Head 的下一個(gè)未取消 (cancel)的節(jié)點(diǎn)找出來(lái)并喚醒(因此喚醒的節(jié)點(diǎn)便可以繼續(xù)在 acquireQueued 中自旋獲取鎖)

疑問(wèn)

在 unparkSuccessor(Node node) 中

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }

如果發(fā)現(xiàn) s 為空,則從 tail 尾向 prev 前遍歷整個(gè)鏈表,找到未取消(waitStatus < 0)的節(jié)點(diǎn),那么為什么這里不直接從s 的 next 節(jié)點(diǎn)往后找呢?
個(gè)人猜測(cè)這里可能是因?yàn)橛?jì)算機(jī)的局部性原理,如果head 的 next節(jié)點(diǎn)取消了,那么next 節(jié)點(diǎn)的 next 節(jié)點(diǎn)等也又可能取消了,因此從最新的尾部向前找似乎更合理。

但是這里我又有一個(gè)新的問(wèn)題!

在入隊(duì) 調(diào)用 addWaiter(Node mode)方法的時(shí)候,因?yàn)楸旧硎俏膊宸?,此時(shí)便檢查 prev.waitStatus < 0 == true,如果為 false 則往 prev 前繼續(xù)找有效的尾部節(jié)點(diǎn),而不是在釋放鎖的地方去判斷?

感謝閱讀,您的點(diǎn)贊加關(guān)注是對(duì)我最大的支持

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容