ReentrantLock源碼剖析三(Condition)

一、簡(jiǎn)介

? ? ? ?JUC中的ReentrantLock給我們提供了方便的加鎖解鎖操作,但是我們有時(shí)候會(huì)需要有條件的對(duì)線程進(jìn)行掛起和喚醒,此時(shí)另一個(gè)工具就排上了用場(chǎng)。下面是Condition變量的常用用法。

public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        Thread thread1 = new Thread(() -> {
            try {
                lock.lock();
                System.out.println("我要等一個(gè)新信號(hào)");
                condition.await();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("拿到一個(gè)信號(hào)??!");
            lock.unlock();
        }, "waitThread");

        Thread thread2 = new Thread(() -> {
            lock.lock();
            System.out.println("我拿到鎖了");
            try {
                Thread.sleep(3000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            condition.signal();
            System.out.println("我發(fā)了一個(gè)信號(hào)!!");
            lock.unlock();
        }, "signalThread");

        thread1.start();
        thread2.start();
    }

運(yùn)行完之后的結(jié)果:

我要等一個(gè)新信號(hào)
我拿到鎖了
我發(fā)了一個(gè)信號(hào)??!
拿到一個(gè)信號(hào)!!

? ? ? ?線程1獲取到鎖之后調(diào)用Condition的await()方法,該方法會(huì)釋放鎖,并將當(dāng)前線程掛起。隨后線程2會(huì)拿到鎖,并執(zhí)行signal()方法,該方法會(huì)喚起線程1,并釋放鎖,然后線程1拿到鎖,執(zhí)行后續(xù)的流程。
? ? ? ?所以說(shuō)Condition是一個(gè)多線程間協(xié)調(diào)通信的工具,使得某個(gè),或者某些線程一起等待某個(gè)條件(Condition),只有當(dāng)該條件具備( signal 或者 signalAll方法被調(diào)用)時(shí) ,這些等待線程才會(huì)被喚醒,從而重新爭(zhēng)奪鎖。
? ? ? ?那么,這些邏輯具體是怎么實(shí)現(xiàn)的呢?
? ? ? ?首先,必須要明確,在線程調(diào)用await()或者signal()/signalAll()方法時(shí),必須首先獲取鎖,否則會(huì)出現(xiàn)java.lang.IllegalMonitorStateException異常。其次,Condition(其實(shí)是ConditionObject,Condition接口的實(shí)現(xiàn))維護(hù)了一個(gè)所有等待Condition條件變量的線程的隊(duì)列,每個(gè)線程構(gòu)成一個(gè)Node結(jié)點(diǎn),也就是AQS中的Node結(jié)點(diǎn):

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

二、await()

? ? ? ?以Condition condition = lock.newCondition();為例,newCondition()會(huì)調(diào)用Sync的newCondition()方法:

public Condition newCondition() {
        return sync.newCondition();
}

Sync的newCondition()方法:

final ConditionObject newCondition() {
            return new ConditionObject();
}
//ConditionObject是AQS中的內(nèi)部類,實(shí)例屬性只有上面的firstWaiter和lastWaiter
public ConditionObject() { }

? ? ? ?對(duì)于await()方法,可以拋出中斷異常,

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //將當(dāng)前線程封裝成Node結(jié)點(diǎn),添加到Condition的等待隊(duì)列中
            Node node = addConditionWaiter();  
            int savedState = fullyRelease(node);   //釋放所占的鎖,因?yàn)樵谡{(diào)用await()方法時(shí),是占有鎖的
            int interruptMode = 0;
            //釋放了鎖之后,判斷當(dāng)前線程的node結(jié)點(diǎn)是不是在syncQueue中,
            //什么是syncQueue呢?就是獲取鎖不成功而被掛起的線程所在的那個(gè)隊(duì)列
            //如果不在syncQueue中,說(shuō)明當(dāng)前線程還不具備獲取鎖的資格,就將當(dāng)前線程掛起,直到被添加到阻塞隊(duì)列中,
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);  //線程被掛起,等待被signal喚醒,此時(shí)可以直接跳到下面的signal方法解析
                //掛起的過(guò)程中,如果被中斷了,線程被喚醒,跳出while循環(huán)
                //如果沒(méi)被中斷,則此時(shí)node已經(jīng)在阻塞隊(duì)列中了,也會(huì)跳出循環(huán)
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //此時(shí)線程已經(jīng)被喚醒,node結(jié)點(diǎn)已經(jīng)被添加到阻塞隊(duì)列中準(zhǔn)備獲取鎖,被誰(shuí)添加到阻塞隊(duì)列了呢,是signal
            //acquireQueued嘗試獲取鎖,被中斷返回true,否則返回false
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
}

? ? ? ?其他版本的await()方法和這個(gè)類似,讀者可以自行嘗試分析。

三.signal()

//ConditionObject
public final void signal() {
            //如果在沒(méi)有獲取鎖的情況下調(diào)用signal,會(huì)拋出IllegalMonitorStateException異常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;      //拿到隊(duì)列中的第一個(gè)node,此隊(duì)列是Condition隊(duì)列
            if (first != null)
                doSignal(first);
}

private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;   //將第一個(gè)隊(duì)列移出隊(duì)列
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
}

//該方法是將結(jié)點(diǎn)移到阻塞隊(duì)列中,使得當(dāng)前node節(jié)點(diǎn)的線程可以有資格獲取鎖
final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        //在加入阻塞隊(duì)列之前,將node的waitStatus設(shè)置為0,如果失敗,說(shuō)明該節(jié)點(diǎn)已經(jīng)被取消,
        //返回false,此時(shí)上面的doSignal方法會(huì)繼續(xù)遍歷Condition隊(duì)列,
        //找到第一個(gè)還在等待Condition變量的結(jié)點(diǎn)
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        //將結(jié)點(diǎn)加入到阻塞隊(duì)列,返回的p結(jié)點(diǎn)是node的前置節(jié)點(diǎn)
        Node p = enq(node);
        int ws = p.waitStatus;
        //如果前一個(gè)節(jié)點(diǎn)已經(jīng)被取消等待(ws>0),或者修改waitStatus失敗,則直接喚醒。
        //正常情況 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)這個(gè)判斷
        //是不會(huì)為true的,所以,不會(huì)在這個(gè)時(shí)候喚醒該線程。
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;   //成功轉(zhuǎn)移到阻塞隊(duì)列,返回true
}

? ? ? ?那么什么時(shí)候才會(huì)喚醒呢?當(dāng)前線程調(diào)用完 signal()之后,執(zhí)行lock.unlock()方法之后,釋放鎖的時(shí)候,會(huì)喚醒其他線程,這個(gè)就是unlock()的邏輯了。
? ? ? ?signalAll()方法的邏輯與signal()類似,讀者可以自己分析一遍。

四.總結(jié)

? ? ? ?現(xiàn)在我們?cè)賮?lái)理一遍:
? ? ? ?一個(gè)線程Alock.lock()獲取鎖成功之后,調(diào)用condition.await()方法,該方法會(huì)首先將線程封裝成node加入Condition隊(duì)列,然后釋放鎖,將線程移出阻塞隊(duì)列,然后掛起;另一個(gè)線程B因?yàn)橹熬€程釋放鎖,獲取鎖成功,調(diào)用signal()方法,將Condition隊(duì)列的第一個(gè)node轉(zhuǎn)移到阻塞隊(duì)列,這個(gè)時(shí)候還沒(méi)完,執(zhí)行完signal之后會(huì)釋放鎖,此時(shí)會(huì)喚醒后面的線程,此時(shí)線程A有機(jī)會(huì)競(jìng)爭(zhēng)到鎖。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容