Condition的await、signal源碼分析

本文從源碼角度,以AbstractQueuedSynchronizer.ConditionObject的await、signal、signalAll為入口,對基于ReentrantLock的線程間通信的原理進行介紹,其中涉及到的與ReentrantLock相關(guān)的方法,在ReentrantLock一文中已經(jīng)介紹過,這里不再說明。
源碼注釋中同步隊列指ReentrantLock維護的隊列,條件隊列指ConditionObject維護的隊列,對于內(nèi)部數(shù)據(jù)結(jié)構(gòu)的組織形式,這里不進行介紹。
由于是多線程并發(fā),所以會有多種不同的情況,源碼中針對每個條件,僅給出一兩種情形,留作參考。

類名簡寫說明:

  • CO:ConditionObject
  • AQS:AbstractQueuedSynchronizer

1. AQS.CO.await()

(1)AQS.CO.await()

    public final void await() throws InterruptedException {
        // 先判斷當前線程是否被中斷,被中斷則拋出異常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 將當前線程對應(yīng)的節(jié)點加入條件隊列
        Node node = addConditionWaiter();
        // 完全釋放鎖,因為可重入,所以saveState可能大于1
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 若不在同步隊列中,被喚醒后會再次進入while中被park
        while (!isOnSyncQueue(node)) {
            // 掛起當前線程
            LockSupport.park(this);
            // checkInterruptWhileWaiting中會判斷當前線程是否是被中斷喚醒的
            // 返回值非0表示是被中斷喚醒的,會通過break跳出while
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 條件1:
        //   為true的情形:
        //     當前線程在上面while中park時被中斷,在acquireQueued中park時再次被中斷時條件1為true
        // 條件2:
        //   條件2為true,即interruptMode是0或REINTERRUPT,為
        //   REINTERRUPT的情況詳見下面(5)處,為0的情況尚不明確
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            // 從上面舉的情形中來看,到這里時interruptMode就是REINTERRUPT,
            // 這里難道是Doug Lea老先生的一個編碼缺陷?
            interruptMode = REINTERRUPT;
        // 該條件成立的情形:
        //   假設(shè)有三個線程thread0、thread1、thread2,三個線程依次調(diào)用AQS.CO.await(),之后三者均
        //   在上面while中被park,此時,外部線程中斷thread0,thread0會執(zhí)行到這里,此時條件為true
        if (node.nextWaiter != null) 
            // 斷開條件隊列中所有不是Node.CONDITION狀態(tài)的節(jié)點(代碼不再貼出)
            unlinkCancelledWaiters();
        // 該條件成立時,說明當前線程被中斷,interruptMode為THROW_IE或REINTERRUPT
        if (interruptMode != 0)
            // 方法中會根據(jù)interruptMode的值做相應(yīng)處理(代碼不再貼出)
            reportInterruptAfterWait(interruptMode);
    }

(2)AQS.CO.addConditionWaiter

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // 條件2:
        //   為true時的情形:
        //     情形1:假設(shè)有兩個線程thread0、thread1,初始時,thread0在未持有鎖的情況下調(diào)用
        //            AQS.CO.await(),當thread0執(zhí)行到AQS.fullyRelease中時會將其對應(yīng)節(jié)點
        //            的waitStatus字段設(shè)置為取消狀態(tài)(見下面(4)處),之后持有鎖的thread1調(diào)用
        //            AQS.CO.await(),會執(zhí)行到這里,這里t.waitStatus為1
        //     情形2:假設(shè)有兩個線程thread0、thread1,初始時,thread0持有鎖,之后調(diào)用AQS.CO.await()
        //            釋放鎖并阻塞在LockSupport.park(this)處,之后外部線程中斷thread0,thread0被喚
        //            醒后會執(zhí)行到AQS.transferAfterCancelledWait里的if處將t.waitStatus設(shè)置為0,之
        //            后thread1獲取到鎖,執(zhí)行到這里時,t.waitStatus為0,條件2為true
        if (t != null && t.waitStatus != Node.CONDITION) {
            // 斷開條件隊列中所有不是Node.CONDITION狀態(tài)的節(jié)點(代碼不再貼出)
            unlinkCancelledWaiters();
            // 更新t。經(jīng)過unlinkCancelledWaiters,lastWaiter可能已經(jīng)改變
            t = lastWaiter;
        }
        // 創(chuàng)建新的狀態(tài)為Node.CONDITION的節(jié)點,并連接到同步隊列中
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

(3)AQS.fullyRelease

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // 獲取AQS.state
            // 因為鎖可重入,所以saveState可能大于1
            int savedState = getState();
            // 傳入saveState而不是1,因此該方法名叫"fully"Release
            // 該方法在ReentrantLock的1.2(2)處已經(jīng)說明,這里不再介紹
            if (release(savedState)) {
                failed = false;
                // 返回saveState,再次喚醒該線程時會用到
                return savedState;
            } else { // 可以說不會執(zhí)行到該分支
                throw new IllegalMonitorStateException();
            }
        } finally {
            // 當未持有鎖的線程調(diào)用await方法時,release(savedState)中會拋異常,此時failed為true
            if (failed)
                // 設(shè)置為取消狀態(tài)
                node.waitStatus = Node.CANCELLED;
        }
    }

(4)AQS.isOnSyncQueue

    final boolean isOnSyncQueue(Node node) {
        // 一般情況下,初次進入該方法,條件1為true,該方法返回false,
        // 下面對初次進入該方法時的一些特殊情形進行介紹:
        // a.情形:
        //   假設(shè)有兩個線程thread0、thread1,初始時,thread0持有鎖,調(diào)用了AQS.CO.await(),從
        //   while (!isOnSyncQueue(node))處到這里(還未執(zhí)行),thread1獲取到鎖,調(diào)用了AQS.CO.signal,
        //   一直調(diào)到AQS.transferForSignal,在方法里的第一個if執(zhí)行后,會將node.waitStatus設(shè)置為0,
        //   使得這里的條件1為false,因為thread1還未執(zhí)行AQS.transferForSignal的Node p=enq(node),
        //   因此這里node.prev為null,條件2為true。
        // b.接著上面的情形繼續(xù)延伸:
        //   若thread0仍未執(zhí)行該if語句,thread1執(zhí)行完AQS.transferForSignal
        //   的Node p=enq(node),則條件2變?yōu)閒alse
        // c.繼續(xù)延伸:
        //   若thread0仍未執(zhí)行該if語句,線程thread2與來競爭鎖,因為thread0此時仍持有鎖,
        //   因此,thread2會被加入到同步隊列中,經(jīng)過一些列步驟后,會被park,期間會將
        //   thread1對應(yīng)節(jié)點的waitStatus設(shè)置為-1,條件1仍為false
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false; // 返回false,表示node不在同步隊列中
        // 條件為false的情形:
        //   上面a、b情形不變,將c處的情形改為:thread2執(zhí)行到到AQS.addWaiter的pred.next = node;
        //   處時(該語句還未執(zhí)行),thread0執(zhí)行到這里,此時該條件為false
        // 條件為true的情形:
        //   thread0執(zhí)行到這里時,thread2已經(jīng)執(zhí)行完AQS.addWaiter的pred.next = node;處,此時該條件成立
        if (node.next != null) // If has successor, it must be on queue
            return true;
        
        // 到這里說明node.waitStatus不為Node.CONDITION且node.prev不為null
        
        // findNodeFromTail中會從tail向前遍歷同步隊列,尋找node是否已在隊列中,存在則返回true,否則返回false。
        // 情形:
        //   上面a情形不變,將b處的情形改為:thread1執(zhí)行到AQS.enq的if (compareAndSetTail(t, node))
        //   (該語句還未執(zhí)行),或執(zhí)行了但由于其他線程的競爭導(dǎo)致執(zhí)行失敗了,findNodeFromTail
        //   中會返回false;若執(zhí)行了且成功了,則findNodeFromTail會返回true
        //   (多數(shù)情況下node都在同步隊列的尾部附近)
        return findNodeFromTail(node);
    }

(5)AQS.CO.checkInterruptWhileWaiting

    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            // 被中斷
            // 返回THROW_IE表示當前線程在其他線程調(diào)用signal前被中斷
            // 返回REINTERRUPT表示當前線程在其他線程調(diào)用signal后被中斷
            // 具體的分界點就是node.waitStatus的值,若其值為Node.CONDITION,
            // 則是signal前被中斷,否則在signal后被中斷
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }   
(6)AQS.transferAfterCancelledWait 
    final boolean transferAfterCancelledWait(Node node) {
        // 該條件為true的情形:
        //   假設(shè)僅有thread0,thread0持有鎖后調(diào)用AQS.CO.await()被park,
        //   之后被外部線程中斷,會執(zhí)行到這里,此時條件為true
        // 該條件為false的情形:
        //   假設(shè)有兩個線程thread0、thread1,thread0,thread0持有鎖后調(diào)用AQS.CO.await()被park,thread1
        //   獲取到鎖后調(diào)用AQS.CO.signal,之后會執(zhí)行到AQS.transferForSignal的第一個if處,該if語句執(zhí)行
        //   完后,這里node.waitStatus被修改為0,所以thread0執(zhí)行這里的if語句時會失敗
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node); // 入隊
            return true;
        }
        // 到這里說明其他線程調(diào)用了AQS.CO.signal,將當前線程對應(yīng)的節(jié)點加入同步隊列,這里自選等待入隊完成
        while (!isOnSyncQueue(node))
            Thread.yield(); // 主動讓出當前線程的CPU時間片
        return false;
    }   

2. AQS.CO.signal

(1)AQS.CO.signal

    public final void signal() {
        // 持有鎖的線程不是當前線程(即外部某個線程調(diào)用了該方法)則拋異常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // 獲取條件隊列中的第一個節(jié)點
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }

(2)AQS.CO.doSignal

    private void doSignal(Node first) {
        // 遍歷條件隊列,僅喚醒第一個符合條件的線程
        do {
            // 更新firstWaiter
            // firstWaiter為null,說明已到隊列尾部
            if ( (firstWaiter = first.nextWaiter) == null)
                // 將lastWaiter設(shè)置為null
                lastWaiter = null;
            // 將first與其后繼節(jié)點斷開   
            first.nextWaiter = null;
            // 條件1:只要一個線程入隊成功,transferForSignal返回true,就會終止該循環(huán)
            // 條件2:用于判斷是否到隊列尾部
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

(3)AQS.transferForSignal

    final boolean transferForSignal(Node node) {

        // 條件為true的情形(注意前面的"!"): 
        //   情形1:假設(shè)有兩個線程thread0、thread1,初始時,thread0在未持有鎖的情況下調(diào)用
        //          AQS.CO.await(),當thread0執(zhí)行到AQS.fullyRelease中時會將其對應(yīng)節(jié)點的
        //          waitStatus字段設(shè)置為取消狀態(tài)(見上面1.(4)處),之后持有鎖的thread1調(diào)用
        //          AQS.CO.signal就會執(zhí)行到這里,此時條件為true
        //   情形2:假設(shè)有兩個線程thread0、thread1,初始時,持有鎖的thread0調(diào)用AQS.CO.await(),
        //          執(zhí)行到該方法的while里被park,之后外部線程中斷thread0,thread0被喚醒后會執(zhí)
        //          行到AQS.transferAfterCancelledWait的第一個if處將node.waitStatus設(shè)置為0,
        //          之后thread1獲取到鎖,調(diào)用AQS.CO.signal執(zhí)行到這里,條件就為true
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // 入隊
        // 注意p是node的前驅(qū)節(jié)點
        Node p = enq(node);
        int ws = p.waitStatus;
        // 條件1:
        //   為true時的情形:
        //     假設(shè)有三個線程thread0、thread1、thread2,初始時,持有鎖的thread0調(diào)用AQS.CO.await(),執(zhí)行到該方法
        //     的while里被park,之后thread1獲取到鎖,調(diào)用AQS.CO.signal執(zhí)行到上面Node p = enq(node);處(該行還未執(zhí)行),
        //     之后thread2通過ReentrantLock.lockInterruptibly獲取鎖,因為thread1仍持有鎖,因此thread2最終會在
        //     AQS.parkAndCheckInterrupt中被park,之后外部線程中斷thread2,當thread2執(zhí)行完AQS.cancelAcquire的
        //     node.waitStatus = Node.CANCELLED;后,thread1從上面Node p = enq(node);處執(zhí)行到這里,此時ws為1
        // 條件2:
        //   為true時的情形(注意前面的"!"):
        //     對于條件1為true時的情形,在thread2被park后,若thread1先執(zhí)行完ws > 0(條件2還未執(zhí)行)(此時ws為0,
        //     條件1不成立),此時thread2,被中斷后在AQS.cancelAcquire的node.waitStatus = Node.CANCELLED;處
        //     將p.waitStatus設(shè)置為1,此時條件2就為true
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // node.thread會從AQS.CO.await里的park處被喚醒,因為已經(jīng)在同步隊列中了,因此會跳出while,進
            // 入AQS.acquireQueued中,若此時p前面仍有節(jié)點等待獲取鎖或p前面沒有節(jié)點了但調(diào)用AQS.CO.signal
            // 的線程仍未釋放鎖,node.thread在嘗試幾次后最終仍會在AQS.parkAndCheckInterrupt中被park
            LockSupport.unpark(node.thread);
        return true;
    }

3. AQS.CO.signalAll

AQS.CO.signalAll與AQS.CO.signal類似,區(qū)別是signalAll會將所有節(jié)點加入同步隊列,下面僅對AQS.CO.doSignalAll方法進行介紹:

    private void doSignalAll(Node first) {
        // 將條件隊列的頭節(jié)點和尾節(jié)點都置為null
        lastWaiter = firstWaiter = null;
        // 遍歷條件隊列,依次喚醒所有線程
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容