Java多線程(二十三)---Condition接口

移步java多線程系列文章

  • 任意一個(gè)Java對(duì)象,都擁有一組監(jiān)視器方法(定義在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,這些方法與synchronized同步關(guān)鍵字配合,可以實(shí)現(xiàn)等待/通知模式。
  • Condition接口也提供了類似Object的監(jiān)視器方法,與Lock配合可以實(shí)現(xiàn)等待/通知模式

1.Condition接口

Condition接口意義在于提供一種針對(duì)使用條件狀態(tài)(隊(duì)列或變量)的線程進(jìn)行阻塞和喚醒的機(jī)制 :
阻塞: 阻塞線程進(jìn)入等待狀態(tài)同時(shí)釋放關(guān)聯(lián)鎖,直到被其他線程將條件狀態(tài)設(shè)置為true為止

喚醒: 喚醒等待在條件隊(duì)列中的線程,同時(shí)提供對(duì)喚醒全部的支持

鎖關(guān)聯(lián): 值得注意的是,由于多線程下該狀態(tài)是共享的,因此通常會(huì)通過于鎖關(guān)聯(lián)保證其原子性

解除等待: 當(dāng)線程解除等待狀態(tài)(被喚醒、中斷、超時(shí)等)后仍需重新競爭鎖,獲取鎖后才能夠從暫停位置開始繼續(xù)往后執(zhí)行

/*
 * Conditions (also known as `condition queues` or `condition variables`) 
 * provide a means for one thread to suspend execution to wait
 * until notified by another thread that some state condition may now be true.
 * 
 * Conditions(如條件隊(duì)列或條件變量)的作用如下:
 *  暫停線程并進(jìn)入等待狀態(tài),直到被其他線程將條件狀態(tài)設(shè)置為true為止
 *  值得注意的是:由于多線程下該狀態(tài)是共享的,因此通常會(huì)通過于鎖關(guān)聯(lián)保證其原子性
 *  如Lock#newCondition newCondition()方法
 */
public interface Condition {
    /**
     * Causes the current thread to wait until it is signalled or {Thread#interrupt interrupted}
     * 線程等待直到被喚醒或被中斷
     * 阻塞方法的實(shí)現(xiàn)有幾個(gè)準(zhǔn)則:
     *  1.當(dāng)前線程持有的鎖(關(guān)聯(lián)該條件變量)將被釋放
     *  2.當(dāng)線程解除等待狀態(tài)后仍需重新競爭鎖,獲取鎖后才能夠從當(dāng)前位置開始繼續(xù)往后執(zhí)行
     */
    void await() throws InterruptedException;
    /**
     * Causes the current thread to wait until it is signalled.
     * 線程等待直到被喚醒
     */
    void awaitUninterruptibly();
    /**
     * Causes the current thread to wait until it is signalled or interrupted,
     * or the specified waiting time elapses.
     * 線程等待直到被喚醒或被中斷或超時(shí)
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    /**
     * Causes the current thread to wait until it is signalled or interrupted,
     * or the specified waiting time elapses. This method is behaviorally
     * equivalent to: {@code awaitNanos(unit.toNanos(time)) > 0}
     * 線程等待直到被喚醒或被中斷或超時(shí)
     * 該方法等同于 awaitNanos(unit.toNanos(time)) > 0
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    /**
     * Causes the current thread to wait until it is signalled or interrupted,
     * or the specified deadline elapses. 
     * 線程等待直到被喚醒或被中斷或超時(shí)
     */
    boolean awaitUntil(Date deadline) throws InterruptedException;
    /**
     * Wakes up one waiting thread.
     * 喚醒在條件隊(duì)列中等待的一個(gè)線程
     * 注意:線程被喚醒后仍需重新競爭鎖,獲取鎖后才能夠從當(dāng)前位置開始繼續(xù)往后執(zhí)行
     */
    void signal();
    /**
     * Wakes up all waiting threads.
     * 喚醒在條件隊(duì)列中等待的所有線程
     */
    void signalAll();
}

2.ConditionObject綜述

ConditionObject包括如下方面:
定位: ConditionObject是AQS的內(nèi)部類,其通過實(shí)現(xiàn)Condition接口提供對(duì)管程形式的條件的支持

作用: ConditionObject在AQS中作為對(duì)Lock的實(shí)現(xiàn)支持

使用: 當(dāng)條件不滿足時(shí)線程入隊(duì),當(dāng)條件滿足時(shí)出隊(duì)并重新嘗試獲取鎖,獲取成功后從暫停位置開始繼續(xù)往后執(zhí)行

3.ConditionObject實(shí)現(xiàn)原理

ConditionObject的實(shí)現(xiàn)有幾個(gè)注意事項(xiàng):
1.內(nèi)部會(huì)維護(hù)一個(gè)條件等待同步隊(duì)列,根據(jù)FIFO原則執(zhí)行入隊(duì)出隊(duì)操作

2.通過firstWaiter維護(hù)頭節(jié)點(diǎn),lastWaiter維護(hù)尾節(jié)點(diǎn)

3.節(jié)點(diǎn)通過nextWaiter記錄后繼條件節(jié)點(diǎn)形成鏈表結(jié)構(gòu),遍歷時(shí)從頭節(jié)點(diǎn)開始沿著nextWaiter順序遍歷

4.條件隊(duì)列中節(jié)點(diǎn)狀態(tài)waitStatus只能為0或Node.CONDITION

5.使用條件隊(duì)列的前提是線程需要持有同步鎖,且只支持獨(dú)占模式

6.當(dāng)節(jié)點(diǎn)在條件隊(duì)列被喚醒(因signal|timeout|interrupt)后,需要進(jìn)行節(jié)點(diǎn)轉(zhuǎn)移,即由條件節(jié)點(diǎn)轉(zhuǎn)變?yōu)橥焦?jié)點(diǎn)

4.ConditionObject組成

4.1 類定義

public class ConditionObject implements Condition, java.io.Serializable

4.2 構(gòu)造器

public ConditionObject() { }

4.3 重要變量

/** 條件等待隊(duì)列首節(jié)點(diǎn) */
private transient Node firstWaiter;
/** 條件等待隊(duì)列尾節(jié)點(diǎn) */
private transient Node lastWaiter;
/** 退出時(shí)重新中斷*/
private static final int REINTERRUPT =  1;
/** 退出時(shí)直接拋出異常 */
private static final int THROW_IE    = -1;
/** 獨(dú)占模式時(shí)才能使用條件隊(duì)列 ,用于鏈接下一個(gè)等待節(jié)點(diǎn) */
Node nextWaiter;

5.條件節(jié)點(diǎn)阻塞

5.1 不響應(yīng)中斷阻塞

條件節(jié)點(diǎn)阻塞的一般流程:
1.入隊(duì): 根據(jù)FIFO原則,新節(jié)點(diǎn)會(huì)被封裝成Node并被加入到條件隊(duì)列隊(duì)尾

2.釋放: 為不影響其他線程,當(dāng)前已持有鎖的線程需要先釋放所有鎖,讓出鎖資源

3.阻塞: 在條件隊(duì)列中的節(jié)點(diǎn)線程需要被阻塞,直到條件滿足后重試獲取同步鎖

4.重試: 當(dāng)條件滿足后需要重試獲取同步鎖,獲取成功后才能繼續(xù)從暫停位置繼續(xù)向后執(zhí)行

補(bǔ)充:條件滿足指的是線程被其他線程喚醒、或超時(shí)、或中斷后且位于同步隊(duì)列中

public final void awaitUninterruptibly() {
    //1.新節(jié)點(diǎn)入條件隊(duì)列
    Node node = addConditionWaiter();
    //2.當(dāng)前線程已持有鎖,但由于要被阻塞,為不影響其他線程,需要先釋放鎖
    int savedState = fullyRelease(node);
    //3.阻塞在條件隊(duì)列中的不滿足條件的節(jié)點(diǎn)線程
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //由于不響應(yīng)中斷,因此只是記錄是否中斷,此時(shí)會(huì)清除中斷標(biāo)識(shí)
        if (Thread.interrupted())
            interrupted = true;
    }
    /**
     * 4.被喚醒需要重新嘗試獲取鎖,獲取鎖成功后才能繼續(xù)往后執(zhí)行
     * 若期間又被中斷后,需要再次設(shè)置已被清除的中斷標(biāo)識(shí)
     */
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

小問:為神馬需要循環(huán)判斷isOnSyncQueue? 即while (!isOnSyncQueue(node)) ?
友情小提示:讀者可以回顧一下同步隊(duì)列獲取鎖的過程

小答:循環(huán)只有一個(gè)目的,就是確保節(jié)點(diǎn)已經(jīng)脫離條件隊(duì)列且進(jìn)入同步隊(duì)列,這樣才有資格重新獲取同步鎖

5.2 響應(yīng)中斷阻塞

public final void await() throws InterruptedException {
    //響應(yīng)中斷,直接拋異常,沒必要往下走了
    if (Thread.interrupted())
        throw new InterruptedException();
    //新節(jié)點(diǎn)入條件隊(duì)列   
    Node node = addConditionWaiter();
    //當(dāng)前線程已持有鎖,但由于要被阻塞,為不影響其他線程,需要先釋放鎖
    int savedState = fullyRelease(node);
    //只阻塞在條件隊(duì)列中的節(jié)點(diǎn)線程,同步節(jié)點(diǎn)才能競爭同步鎖
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        /**
         * 檢測中斷,一旦發(fā)生中斷
         *  1.將條件隊(duì)列中因中斷而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是中斷)
         *  2.退出循環(huán) -> 接下來會(huì)在循環(huán)外進(jìn)行中斷處理
         *  注意:之所以安心退出是因?yàn)闀?huì)通過執(zhí)行transferAfterCancelledWait進(jìn)行節(jié)點(diǎn)轉(zhuǎn)移
         *        這樣隨后就能安心執(zhí)行acquireQueued同步隊(duì)列入隊(duì)操作了
         */    
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    /**
     * 重新嘗試獲取同步鎖,獲取成功后且被中斷,當(dāng)中斷模式為拋出異常時(shí),需要設(shè)置為重新中斷
     * 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    /**
     * 若當(dāng)前節(jié)點(diǎn)存在后繼節(jié)點(diǎn)時(shí),需要執(zhí)行出隊(duì)操作
     * 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
     */
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    //interruptMode != 0 說明是需要進(jìn)行中斷處理的
    if (interruptMode != 0)
        //執(zhí)行中斷處理
        reportInterruptAfterWait(interruptMode);
}
/**
 * 中斷處理:
 *  1.中斷模式為THROW_IE:向上拋出異常
 *  2.中斷模式為REINTERRUPT:重新設(shè)置中斷狀態(tài)
 */
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

5.3 響應(yīng)中斷和超時(shí)阻塞-納秒可選

/**
 * Implements timed condition wait.
 * 響應(yīng)中斷和超時(shí)阻塞
 * @return long 剩余超時(shí)時(shí)間 <0 說明已超時(shí)
 */
public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    //響應(yīng)中斷,直接拋異常,沒必要往下走了
    if (Thread.interrupted())
        throw new InterruptedException();
    //新節(jié)點(diǎn)入條件隊(duì)列     
    Node node = addConditionWaiter();
    //當(dāng)前線程已持有鎖,但由于要被阻塞,為不影響其他線程,需要先釋放同步鎖
    int savedState = fullyRelease(node);
    //截止時(shí)長
    final long deadline = System.nanoTime() + nanosTimeout;
    //只阻塞在條件隊(duì)列中的節(jié)點(diǎn)線程,同步節(jié)點(diǎn)才能競爭同步鎖
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        /**
         * 將條件隊(duì)列中因超時(shí)而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是超時(shí))
         * 值得注意的是:剩余超時(shí)時(shí)間允許為0和負(fù)數(shù)
         */
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        //為了提高效率,超過閾值才執(zhí)行阻塞,否則接著自旋
        if (nanosTimeout >= spinForTimeoutThreshold)
            //若超過nanosTimeout時(shí)長,會(huì)自動(dòng)解除阻塞喚醒線程
            LockSupport.parkNanos(this, nanosTimeout);
        /**
         * 檢測中斷,一旦發(fā)生中斷
         *  1.將條件隊(duì)列中因中斷而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是中斷)
         *  2.退出循環(huán) -> 接下來會(huì)在循環(huán)外進(jìn)行中斷處理
         *  注意:之所以安心退出是因?yàn)闀?huì)通過執(zhí)行transferAfterCancelledWait進(jìn)行節(jié)點(diǎn)轉(zhuǎn)移
         *        這樣隨后就能安心執(zhí)行acquireQueued同步隊(duì)列入隊(duì)操作了
         */    
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        //剩余超時(shí)時(shí)間    
        nanosTimeout = deadline - System.nanoTime();
    }
    /**
     * 重新嘗試獲取同步鎖,獲取成功后且被中斷,當(dāng)中斷模式為拋出異常時(shí),需要設(shè)置為重新中斷
     * 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    /**
     * 若當(dāng)前節(jié)點(diǎn)存在后繼節(jié)點(diǎn)時(shí),需要執(zhí)行出隊(duì)操作
     * 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
     */
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    //interruptMode != 0 說明是需要進(jìn)行中斷處理的
    if (interruptMode != 0)
        //執(zhí)行中斷處理
        reportInterruptAfterWait(interruptMode);
    //返回已花費(fèi)時(shí)長    
    return deadline - System.nanoTime();
}

5.4 響應(yīng)中斷和超時(shí)阻塞-納秒日期單位可選

/**
 * Implements timed condition wait.
 * 響應(yīng)中斷和超時(shí)阻塞-納秒日期單位可選
 * @return boolean 超時(shí)是否發(fā)生在被喚醒之前
 */
public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
    //注意是納秒        
    long nanosTimeout = unit.toNanos(time);
    //響應(yīng)中斷,直接拋異常,沒必要往下走了
    if (Thread.interrupted())
        throw new InterruptedException();
    //新節(jié)點(diǎn)入條件隊(duì)列 
    Node node = addConditionWaiter();
    //當(dāng)前線程已持有鎖,但由于要被阻塞,為不影響其他線程,需要先釋放同步鎖
    int savedState = fullyRelease(node);
    //截止時(shí)長
    final long deadline = System.nanoTime() + nanosTimeout;
    //只阻塞在條件隊(duì)列中的節(jié)點(diǎn)線程,同步節(jié)點(diǎn)才能競爭同步鎖
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        /**
         * 將條件隊(duì)列中因超時(shí)而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是超時(shí))
         * 值得注意的是:剩余超時(shí)時(shí)間允許為0和負(fù)數(shù)
         */
        if (nanosTimeout <= 0L) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        //為了提高效率,超過閾值才執(zhí)行阻塞,否則接著自旋
        if (nanosTimeout >= spinForTimeoutThreshold)
            //若超過nanosTimeout時(shí)長,會(huì)自動(dòng)解除阻塞喚醒線程
            LockSupport.parkNanos(this, nanosTimeout);
        /**
         * 檢測中斷,一旦發(fā)生中斷
         *  1.將條件隊(duì)列中因中斷而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是中斷)
         *  2.退出循環(huán) -> 接下來會(huì)在循環(huán)外進(jìn)行中斷處理
         *  注意:之所以安心退出是因?yàn)闀?huì)通過執(zhí)行transferAfterCancelledWait進(jìn)行節(jié)點(diǎn)轉(zhuǎn)移
         *        這樣隨后就能安心執(zhí)行acquireQueued同步隊(duì)列入隊(duì)操作了
         */   
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        //剩余超時(shí)時(shí)間 
        nanosTimeout = deadline - System.nanoTime();
    }
    /**
     * 重新嘗試獲取同步鎖,獲取成功后且被中斷,當(dāng)中斷模式為拋出異常時(shí),需要設(shè)置為重新中斷
     * 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    /**
     * 若當(dāng)前節(jié)點(diǎn)存在后繼節(jié)點(diǎn)時(shí),需要執(zhí)行出隊(duì)操作
     * 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
     */
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    //interruptMode != 0 說明是需要進(jìn)行中斷處理的
    if (interruptMode != 0)
        //執(zhí)行中斷處理
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

5.5 響應(yīng)中斷和超時(shí)阻塞-毫秒日期可選

/**
 * Implements absolute timed condition wait.
 * 響應(yīng)中斷和超時(shí)阻塞-毫秒日期可選
 * @return boolean 超時(shí)是否發(fā)生在被喚醒之前
 */
public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
    //注意:與awaitNano區(qū)別的是這里用的是毫秒
    long abstime = deadline.getTime();
    //響應(yīng)中斷,直接拋異常,沒必要往下走了
    if (Thread.interrupted())
        throw new InterruptedException();
    //新節(jié)點(diǎn)入條件隊(duì)列 
    Node node = addConditionWaiter();
    //當(dāng)前線程已持有鎖,但由于要被阻塞,為不影響其他線程,需要先釋放同步鎖
    int savedState = fullyRelease(node);
    //記錄節(jié)點(diǎn)是否超時(shí)
    boolean timedout = false;
    //只阻塞在條件隊(duì)列中的節(jié)點(diǎn)線程,同步節(jié)點(diǎn)才能競爭同步鎖
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        //毫秒超時(shí),將條件隊(duì)列中因超時(shí)而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是超時(shí))
        if (System.currentTimeMillis() > abstime) {
            /**
             * 記錄節(jié)點(diǎn)是否超時(shí)
             * 注意:當(dāng)取消發(fā)生在節(jié)點(diǎn)被喚醒之前才返回true
             */
            timedout = transferAfterCancelledWait(node);
            //轉(zhuǎn)換一定會(huì)成功,因此安心退出即可
            break;
        }
        //若超過abstime時(shí)長,會(huì)自動(dòng)解除阻塞喚醒線程
        LockSupport.parkUntil(this, abstime);
        /**
         * 檢測中斷,一旦發(fā)生中斷
         *  1.將條件隊(duì)列中因中斷而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移(注意此處是中斷)
         *  2.退出循環(huán) -> 接下來會(huì)在循環(huán)外進(jìn)行中斷處理
         *  注意:之所以安心退出是因?yàn)闀?huì)通過執(zhí)行transferAfterCancelledWait進(jìn)行節(jié)點(diǎn)轉(zhuǎn)移
         *        這樣隨后就能安心執(zhí)行acquireQueued同步隊(duì)列入隊(duì)操作了
         */    
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
    }
    /**
     * 重新嘗試獲取同步鎖,獲取成功后且被中斷,當(dāng)中斷模式為拋出異常時(shí),需要設(shè)置為重新中斷
     * 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    /**
     * 若當(dāng)前節(jié)點(diǎn)存在后繼節(jié)點(diǎn)時(shí),需要執(zhí)行出隊(duì)操作
     * 補(bǔ)充:acquireQueued會(huì)返回獲取鎖過程中線程是否有過中斷,true則說明發(fā)生過中斷
     */
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    //interruptMode != 0 說明是需要進(jìn)行中斷處理的    
    if (interruptMode != 0)
         //執(zhí)行中斷處理
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

6.條件節(jié)點(diǎn)喚醒

喚醒操作主要干了三個(gè)事情:
1.清除節(jié)點(diǎn): 從條件隊(duì)列中移除該節(jié)點(diǎn)

2.節(jié)點(diǎn)轉(zhuǎn)移: 將條件節(jié)點(diǎn)轉(zhuǎn)換為同步節(jié)點(diǎn),即從條件隊(duì)列轉(zhuǎn)移到同步隊(duì)列

3.喚醒節(jié)點(diǎn): 將轉(zhuǎn)移成功的節(jié)點(diǎn)重新喚醒

6.1 喚醒單個(gè)

喚醒單個(gè)的實(shí)質(zhì):根據(jù)FIFO原則喚醒first,即條件隊(duì)列頭節(jié)點(diǎn)

/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * 從條件隊(duì)列中喚醒一個(gè)節(jié)點(diǎn)
 * 原則:將條件隊(duì)列中已存在的等待時(shí)間最長的線程轉(zhuǎn)移到等待隊(duì)列中,即頭節(jié)點(diǎn)
 */
public final void signal() {
    //條件隊(duì)列只適用于獨(dú)占模式且只能由持有鎖的線程執(zhí)行喚醒操作
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //先進(jìn)先出原則    
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
/**
 * Removes and transfers nodes until hit non-cancelled one or null. 
 * @param first (non-null) the first node on condition queue 條件隊(duì)列非空頭節(jié)點(diǎn)
 */
private void doSignal(Node first) {
    do {
        //清空隊(duì)列
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        /**
         * 將節(jié)點(diǎn)從條件隊(duì)列轉(zhuǎn)換到同步隊(duì)列中并喚醒線程
         */
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

6.2 喚醒全部

喚醒全部的實(shí)質(zhì):沿著nextWaiter順序遍歷依次喚醒

/**
 * Moves all threads from the wait queue for this condition to
 * the wait queue for the owning lock.
 *
 * 喚醒條件隊(duì)列中的全部節(jié)點(diǎn)
 */
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //跟signal的區(qū)別就是多了個(gè)All...
        doSignalAll(first);
}
/**
 * Removes and transfers all nodes.
 * 
 * 全部喚醒的實(shí)質(zhì):
 *     沿著nextWaiter順序遍歷依次轉(zhuǎn)移并喚醒
 */
private void doSignalAll(Node first) {
    //1.注意要清空條件隊(duì)列
    lastWaiter = firstWaiter = null;
    //2.沿著nextWaiter順序遍歷依次喚醒
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        //3.其實(shí)質(zhì)就是沿著nextWaiter順序遍歷
        first = next;
    } while (first != null);
}

7.條件節(jié)點(diǎn)轉(zhuǎn)移

轉(zhuǎn)移操作指的是節(jié)點(diǎn)由條件節(jié)點(diǎn)轉(zhuǎn)換為同步節(jié)點(diǎn),主要進(jìn)行了如下兩步進(jìn)行轉(zhuǎn)移:
1.更新節(jié)點(diǎn)狀態(tài): 根據(jù)同步隊(duì)列入隊(duì)原則,新節(jié)點(diǎn)初始狀態(tài)必須為0
2.同步隊(duì)列入隊(duì): 當(dāng)前節(jié)點(diǎn)肯定能入隊(duì)成功,同時(shí)返回前驅(qū)節(jié)點(diǎn)

7.1 因正常喚醒轉(zhuǎn)移

/**
 * Transfers a node from a condition queue onto sync queue.
 * Returns true if successful.
 *
 * 將節(jié)點(diǎn)從條件隊(duì)列轉(zhuǎn)換到同步隊(duì)列中并喚醒線程
 *  1.在正常調(diào)用signalXXX()方法時(shí)才會(huì)調(diào)用該方法
 *  2.若因?yàn)槌瑫r(shí)或中斷進(jìn)行轉(zhuǎn)移,不會(huì)調(diào)用該方法,但這兩種情況的轉(zhuǎn)移都會(huì)置為0
 *  3.注意:在調(diào)用該方法之前都會(huì)執(zhí)行first.nextWaiter = null,即從等待隊(duì)列中移除當(dāng)前頭節(jié)點(diǎn)
 * 
 * @param node the node 根據(jù)FIFO原則,通常為first節(jié)點(diǎn)
 * @return true if successfully transferred (else the node was
 * cancelled before signal) 轉(zhuǎn)換成功或節(jié)點(diǎn)在喚醒前被取消 才返回true
 */
final boolean transferForSignal(Node node) {
    /**
     * 1.CAS更新節(jié)點(diǎn)狀態(tài)為0,一旦失敗立即返回false
     * 注意:必須將節(jié)點(diǎn)狀態(tài)更新為0,因?yàn)橥疥?duì)列入隊(duì)時(shí)新節(jié)點(diǎn)必須為0,
     *       否則就不符合同步隊(duì)列入隊(duì)原則,因此一旦失敗立即返回
     * 補(bǔ)充:CAS失敗的原因在于其他線程已經(jīng)執(zhí)行喚醒操作將該節(jié)點(diǎn)變更為0,因此其實(shí)無需再次enq了
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //2.進(jìn)入同步隊(duì)列,注意enq方法返回的是前驅(qū)節(jié)點(diǎn)
    Node p = enq(node);
    /**
     * 3.在因前驅(qū)節(jié)點(diǎn)被取消或狀態(tài)突變發(fā)生后需要喚醒節(jié)點(diǎn)線程 
     *  ws > 0:
     *     說明前驅(qū)節(jié)點(diǎn)被取消(CANCELLED),因此需要喚醒當(dāng)前節(jié)點(diǎn)線程
     *  !compareAndSetWaitStatus(p, ws, Node.SIGNAL):
     *     此時(shí)發(fā)生狀態(tài)突變,比如ws剛好變成CANCELLED
     * 補(bǔ)充:因?yàn)閑nq保證一定入隊(duì)成功,因此實(shí)質(zhì)是喚醒在同步隊(duì)列中的節(jié)點(diǎn)
     *       結(jié)合awaitXXX(),線程會(huì)從park之后繼續(xù)往后執(zhí)行
     * 
     * !!特別注意!!:正常情況下,根據(jù)流程可知
     *  ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
     *  這種判斷是不會(huì)返回true的,此時(shí)是不會(huì)喚線程;
     *  因此真正喚醒線程的地方在于調(diào)用signal()方法的線程發(fā)送完signal
     *  信號(hào)后再調(diào)用release(1)方法(比如調(diào)用ReentrantLock的unlock()),
     *  因?yàn)槠湟讶腙?duì),因此可以被喚醒
     */
    int ws = p.waitStatus.
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

小問:讀者還記得enq方法都做了什么嗎?
小答:針對(duì)enq方法,筆者簡單回顧一下作用,幫助讀者理解一下:
1.初始化時(shí)對(duì)head和tail進(jìn)行賦值

2.當(dāng)前節(jié)點(diǎn)進(jìn)入同步對(duì)了并作為新的tail

3.入隊(duì)一定可以成功因?yàn)樽孕?/p>

4.注意enq返回的是前驅(qū)節(jié)點(diǎn)

7.2 因中斷或超時(shí)喚醒轉(zhuǎn)移

/**
 * Transfers node, if necessary, to sync queue after a cancelled wait.
 * Returns true if thread was cancelled before being signalled.
 *
 * 將條件隊(duì)列中因中斷或超時(shí)而喚醒的節(jié)點(diǎn)進(jìn)行轉(zhuǎn)移
 * 取消發(fā)生在被喚醒之前返回true
 * 兩種情況需要調(diào)用該方法:
 *  1.中斷:checkInterruptWhileWaiting() -> 該方法會(huì)被awaitXXX()調(diào)用
 *  2.超時(shí):帶超時(shí)的awaitXXX()方法
 */
final boolean transferAfterCancelledWait(Node node) {
    //Node.CONDITION狀態(tài)節(jié)點(diǎn)只用于條件隊(duì)列,因此需要設(shè)置為0才能進(jìn)入同步隊(duì)列
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        //當(dāng)CAS變更waitStatus為0時(shí),說明節(jié)點(diǎn)已跟條件隊(duì)列無關(guān),隨后進(jìn)入同步隊(duì)列即可
        enq(node);
        //由于enq方法能夠保證進(jìn)入同步隊(duì)列成功,因此當(dāng)enq執(zhí)行完畢,可以放心的返回true
        return true;
    }
    //若CAS失敗且節(jié)點(diǎn)還不在同步隊(duì)列中,線程需要先釋放資源,提高效率
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

小問:為什么需要執(zhí)行Thread.yield()?
小答:對(duì)于這個(gè)問題,筆者的考慮如下:
1.若CAS變更為0失敗,說明已被其他線程執(zhí)行喚醒轉(zhuǎn)換并變更為0
2.但由于enq操作可能還在進(jìn)行中,因此此時(shí)該節(jié)點(diǎn)可能還沒真正入隊(duì),因此需要循環(huán)檢測是否已入隊(duì)
3.由于入隊(duì)操作已經(jīng)在執(zhí)行中,因此無需重復(fù)執(zhí)行enq操作,可以先釋放資源,并通過isOnSyncQueue退出循環(huán)即可,優(yōu)化效率

8.條件節(jié)點(diǎn)隊(duì)列操作

8.1 條件節(jié)點(diǎn)入隊(duì)

新節(jié)點(diǎn)入隊(duì)遵循FIFO原則,主要會(huì)做如下操作:

1.清洗: 若尾節(jié)點(diǎn)非CONDITION狀態(tài),需要清除所有非CONDITION狀態(tài)節(jié)點(diǎn)并重設(shè)頭尾節(jié)點(diǎn)

2.封裝: 封裝當(dāng)前線程為Node,同時(shí)初始化節(jié)點(diǎn)狀態(tài)為CONDITION

3.入隊(duì): 隊(duì)列為空時(shí),頭尾節(jié)點(diǎn)指向同一個(gè)元素;非空時(shí)通過nextWaiter形成鏈表

/**
 * Adds a new waiter to wait queue.
 *
 * 新增條件等待隊(duì)列隊(duì)尾節(jié)點(diǎn) - FIFO
 * 補(bǔ)充:由于條件隊(duì)列只適用于獨(dú)占模式,因此該方法不會(huì)有并發(fā)問題
 * @return its new wait node 返回新節(jié)點(diǎn)
 */
private Node addConditionWaiter() {
    //FIFO原則:新入隊(duì)元素需隊(duì)尾插入
    Node t = lastWaiter;
    //1.若尾節(jié)點(diǎn)非Condition狀態(tài),需要清除所有非CONDITION狀態(tài)節(jié)點(diǎn)并重設(shè)頭尾節(jié)點(diǎn)
    if (t != null && t.waitStatus != Node.CONDITION) {
        /**
         * 刪除條件等待隊(duì)列中的所有非CONDITION狀態(tài)節(jié)點(diǎn)
         * 補(bǔ)充:由于在條件隊(duì)列中節(jié)點(diǎn)狀態(tài)只能是CONDITION或0(signal|timeout|interrupt)
         *      因此該方法實(shí)質(zhì)就清除空節(jié)點(diǎn)或狀態(tài)為0節(jié)點(diǎn)
         */
        unlinkCancelledWaiters();
        //上個(gè)方法執(zhí)行后會(huì)使得lastWaiter肯定為非取消狀態(tài)節(jié)點(diǎn)(只可能為空或CONDITION狀態(tài))
        t = lastWaiter;
    }
    //2.封裝當(dāng)前線程為Node,同時(shí)初始化節(jié)點(diǎn)狀態(tài)為CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //3.隊(duì)尾元素為空,正說明條件隊(duì)列為空
    if (t == null)
        firstWaiter = node;
    else
        //4.新元素入隊(duì)需隊(duì)尾插入
        t.nextWaiter = node;
    //5.這里需要注意的是:隊(duì)列為空時(shí),頭尾節(jié)點(diǎn)是指向同一個(gè)元素的;非空時(shí)就會(huì)形成鏈表
    lastWaiter = node;
    return node;
}

小問:何時(shí)出現(xiàn)t != null && t.waitStatus != Node.CONDITION?
小答:尾節(jié)點(diǎn)的狀態(tài)不為Node.CONDITION,那只可能為0,這意味著節(jié)點(diǎn)需要轉(zhuǎn)移,那么節(jié)點(diǎn)狀態(tài)變更時(shí)機(jī)都有哪些呢?
1.當(dāng)因中斷或超時(shí)被喚醒后會(huì)通過調(diào)用transferAfterCancelledWait將節(jié)點(diǎn)狀態(tài)CAS為0

2.當(dāng)被正常喚醒后會(huì)通過調(diào)用transferForSignal將節(jié)點(diǎn)狀態(tài)CAS為0

分析:但問題是正常喚醒后會(huì)先執(zhí)行first.nextWaiter = null,因此此時(shí)尾節(jié)點(diǎn)應(yīng)為空

結(jié)論:排除這種情況后可知,只有第一種情況,即因中斷或超時(shí)被喚醒后才會(huì)出現(xiàn)這種情況

8.2 條件節(jié)點(diǎn)出隊(duì)

條件節(jié)點(diǎn)的出隊(duì)時(shí)機(jī):
1.新節(jié)點(diǎn)入隊(duì): 通過判斷t.waitStatus != Node.CONDITION為true時(shí)執(zhí)行unlinkCancelledWaiters()

2.節(jié)點(diǎn)重新嘗試獲取同步鎖后: 通過判斷node.nextWaiter != null為true時(shí)執(zhí)行unlinkCancelledWaiters()

3.節(jié)點(diǎn)被正常喚醒后: 通過調(diào)用signal()方法并執(zhí)行first.nextWaiter = null

補(bǔ)充:3與1,2的區(qū)別在于前者只是清除頭節(jié)點(diǎn),后者是遍歷清除所有非CONDITION狀態(tài)節(jié)點(diǎn)
注意:無論入隊(duì)還是出隊(duì),前提都是線程已經(jīng)持有同步鎖

/**
 * Unlinks cancelled waiter nodes from condition queue.
 *
 * 移除條件隊(duì)列中的所有非CONDITION狀態(tài)節(jié)點(diǎn)
 * 兩種情況會(huì)觸發(fā)該方法:
 *  1.新節(jié)點(diǎn)加入條件隊(duì)列時(shí) -> addConditionWaiter()
 *  2.節(jié)點(diǎn)被喚醒之后 -> awaitXXX()
 *    補(bǔ)充一點(diǎn):注意不是在signal()方法中執(zhí)行的(因?yàn)楹笳咧挥修D(zhuǎn)移和喚醒操作,前者才有獲取鎖操作)
 * 注意:條件隊(duì)列中的節(jié)點(diǎn)只有CONDITION和0兩種狀態(tài)
 */
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    //臨時(shí)節(jié)點(diǎn),主要用于在遍歷時(shí)記錄上一個(gè)非CONDITION節(jié)點(diǎn)(因?yàn)橐^所有非CONDITION節(jié)點(diǎn))
    Node trail = null;
    //從前往后順序遍歷條件隊(duì)列,剔除全部非CONDITION狀態(tài)節(jié)點(diǎn)
    while (t != null) {
        //下一個(gè)節(jié)點(diǎn),注意next可能為空
        Node next = t.nextWaiter;
        /**
         * 非CONDITION狀態(tài)(即0),需要執(zhí)行移除操作
         * 強(qiáng)調(diào):在條件隊(duì)列中節(jié)點(diǎn)的waitStatus,只可能是CONDITION或是0(signal|timeout|interrupt)
         */
        if (t.waitStatus != Node.CONDITION) {
            /**
             * 移除后繼節(jié)點(diǎn) 等同于 將當(dāng)前節(jié)點(diǎn)從隊(duì)列中移除
             * 同時(shí)由于不再有引用,會(huì)help GC
             * 注意:GC同時(shí)會(huì)考慮firstWaiter和lastWaiter的引用情況
             *      即若當(dāng)前線程無用,上述變量最終也會(huì)移除對(duì)該節(jié)點(diǎn)的引用
             */
            t.nextWaiter = null;
            /**
             * 若之前沒有非CONDITION狀態(tài)節(jié)點(diǎn),就先讓next當(dāng)一下頭頭
             * 注意:該方法執(zhí)行完畢后,firstWaiter只能為空或CONDITION
             */
            if (trail == null)
                firstWaiter = next;
            else
                /**
                 * 一旦當(dāng)前節(jié)點(diǎn)非CONDITION狀態(tài),那么需要先將它的后繼節(jié)點(diǎn)與上一個(gè)非取消節(jié)點(diǎn)建立起聯(lián)系
                 * 即使后繼節(jié)點(diǎn)是非CONDITION狀態(tài)也沒事,因?yàn)樵谙麓伪闅v時(shí)會(huì)重新建立聯(lián)系的
                 * 說白了,其本質(zhì)就是刪除t
                 */
                trail.nextWaiter = next;
            /**
             * 最后一個(gè)非CONDITION狀態(tài)節(jié)點(diǎn)即是條件隊(duì)列的尾節(jié)點(diǎn)
             * 注意:該方法執(zhí)行完畢后,lastWaiter最終只能為空或CONDITION
             */
            if (next == null)
                lastWaiter = trail;
        }
        else
            //CONDITION狀態(tài),就記錄一下以作為下次遍歷時(shí)的上一個(gè)非CONDITION節(jié)點(diǎn)
            trail = t;
        //開啟下一次循環(huán)
        t = next;
    }
}

小問:為神馬調(diào)用unlinkCancelledWaiters后firstWaiter只能為空或CONDITION?
小答:頭節(jié)點(diǎn)非空時(shí),有且只有條件隊(duì)列中全部都是非CONDITION狀態(tài)的節(jié)點(diǎn)時(shí),新的頭節(jié)點(diǎn)才能為空,原因在于firstWaiter = next;的前提是無可用的CONDITION狀態(tài)節(jié)點(diǎn),而trail只有在有CONDITION狀態(tài)節(jié)點(diǎn)時(shí)才會(huì)被賦值更新

9.重要方法

9.1 釋放同步鎖

/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 *
 * 釋放同步狀態(tài)并返回原狀態(tài)
 * 當(dāng)釋放失敗時(shí)節(jié)點(diǎn)會(huì)被取消同時(shí)拋出IllegalMonitorStateException異常
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //獲取當(dāng)前線程持有鎖的同步狀態(tài) -- 支持可重入  
        int savedState = getState();
        /**
         * 由于該方法只用于獨(dú)占模式,因此使用的是獨(dú)占獨(dú)有的release方法
         * 關(guān)于release方法請(qǐng)參見筆者的并發(fā)番@AbstractQueuedSynchronizer一文通
         * 作用是更新state狀態(tài)(為0)同時(shí)喚醒后繼節(jié)點(diǎn)(如果存在的話)
         * 釋放鎖的目的是為了讓其他線程能夠獲取鎖去執(zhí)行任務(wù),
         * 并等到其他線程調(diào)用signal()和release()后能夠喚醒該線程
         */
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            //一旦釋放失敗,直接拋出異常 -- 干脆的很
            throw new IllegalMonitorStateException();
        }
    } finally {
        /**
         * 釋放失敗的補(bǔ)救措施:
         *   由于實(shí)際上節(jié)點(diǎn)已完成使命,節(jié)點(diǎn)狀態(tài)需要變成取消狀態(tài)以用于跳過和回收
         * 注意:此時(shí)節(jié)點(diǎn)還是同步節(jié)點(diǎn),因此需要設(shè)置為CANCELLED
         */
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

9.2 檢測中斷

/**
 * Checks for interrupt, returning THROW_IE if interrupted
 * before signalled, REINTERRUPT if after signalled, or
 * 0 if not interrupted.
 *
 * 檢測線程的中斷情況,通過返回狀態(tài)碼告知線程下一步應(yīng)如何處理中斷,情況有如下三種:
 *  1.中斷發(fā)生在被喚醒之前返回THROW_IE(需要拋出異常)
 *  2.中斷發(fā)生在被喚醒之后返回REINTERRUPT(需要重新中斷)
 *  3.無中斷返回0
 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ? (transferAfterCancelledWait(node) ? 
        THROW_IE : REINTERRUPT) : 0
}

小問:為神馬中斷發(fā)生在被喚醒之前返回THROW_IE?之后返回REINTERRUPT?
小答:喚醒前拋出異常主要是為了快速失敗,提高效率,而重新中斷是要處理被喚醒后一系列
喚醒前: 若被喚醒之前線程被中斷,說明線程此時(shí)沒有獲取到資源,盡快拋出異常就可以結(jié)束等待并解放生產(chǎn)力;

喚醒后: 若被喚醒后線程被中斷,說明線程基本已經(jīng)獲取鎖,這時(shí)可能要多執(zhí)行一些操作,如釋放鎖等

9.3 判斷節(jié)點(diǎn)是否位于同步隊(duì)列

/**
 * Returns true if a node, always one that was initially placed on
 * a condition queue, is now waiting to reacquire on sync queue.
 *
 * 判斷節(jié)點(diǎn)是否位于同步隊(duì)列
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    //CONDITION只用于條件隊(duì)列 || prev為空說明其一定不在同步隊(duì)列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //如果next非空,它肯定在同步隊(duì)列中    
    if (node.next != null) 
        return true;
    //node.prev非空并不意味著在同步隊(duì)列中,因此需要從后往前遍歷以判斷是否在同步隊(duì)列中    
    return findNodeFromTail(node);
}
/**
 * Returns true if node is on sync queue by searching backwards from tail.
 * 
 * node.prev非空并不意味著在同步隊(duì)列中,原因是CAS(放入同步隊(duì)列)可能失敗
 * 因此需要從后往前遍歷以判斷其是否在同步隊(duì)列中
 * 由于該方法被調(diào)用時(shí),該節(jié)點(diǎn)總是靠近tail,因此除非CAS失敗(可能性很低),否則幾乎無須遍歷
 */
private boolean findNodeFromTail(Node node) {
    //從同步隊(duì)列的隊(duì)尾節(jié)點(diǎn)開始沿著prev依次往前遍歷
    Node t = tail;
    for (;;) {
        //存在,返回true
        if (t == node)
            return true;
        //都遍歷到頭了還不存在,那只能返回false了    
        if (t == null)
            return false;
        //沿著prev依次往前遍歷吧    
        t = t.prev;
    }
}

10.流程圖

10.1 無響應(yīng)中斷

注意:僅當(dāng)調(diào)用signal()的線程再調(diào)用release()方法之后才會(huì)真正解除被阻塞線程的阻塞狀態(tài),但release()之后該線程并不是被立即喚醒,而是重新競爭鎖直到變成head節(jié)點(diǎn)的后繼節(jié)點(diǎn)且head節(jié)點(diǎn)為SIGNAl狀態(tài)時(shí)才能被真正喚醒


條件隊(duì)列.jpg

10.2 響應(yīng)中斷

中斷的條件隊(duì)列.jpg

10.3 響應(yīng)中斷與超時(shí)

超時(shí)中斷的條件隊(duì)列.jpg

1. 通過對(duì)比Object的監(jiān)視器方法和Condition接口

qq_pic_merged_1535605094135.jpg

2 Condition接口與示例

Condition是依賴Lock對(duì)象的
使用示例

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
    lock.lock();
    try {
            condition.await();
    } finally {
            lock.unlock();
    }
}
public void conditionSignal() throws InterruptedException {
    lock.lock();
    try {
            condition.signal();
    } finally {
            lock.unlock();
    }
}

v般都會(huì)將Condition對(duì)象作為成員變量。當(dāng)調(diào)用await()方法后,當(dāng)前線程會(huì)釋放鎖并在此等待,而其他線程調(diào)用Condition對(duì)象的signal()方法,通知當(dāng)前線程后,當(dāng)前線程才從await()方法返回,并且在返回前已經(jīng)獲取了鎖。

Condition定義的方法

qq_pic_merged_1535605905139.jpg

qq_pic_merged_1535605921452.jpg

獲取一個(gè)Condition必須通過Lock的newCondition()方法。

Condition的使用方式
一個(gè)有界隊(duì)列的示例來深入了解Condition的使用方式。有界隊(duì)列是一種特殊的隊(duì)列,當(dāng)隊(duì)列為空時(shí),隊(duì)列的獲取操作將會(huì)阻塞獲取線程,直到隊(duì)列中有新增元素,當(dāng)隊(duì)列已滿時(shí),隊(duì)列的插入操作將會(huì)阻塞插入線程,直到隊(duì)列出現(xiàn)“空位”

public class BoundedQueue<T> {
    private Object[]    items;
    // 添加的下標(biāo),刪除的下標(biāo)和數(shù)組當(dāng)前數(shù)量
    private int addIndex, removeIndex, count;
    private Lock lock     = new ReentrantLock();
    private Condition    notEmpty = lock.newCondition();
    private Condition    notFull = lock.newCondition();
    public BoundedQueue(int size) {
            items = new Object[size];
    }
    // 添加一個(gè)元素,如果數(shù)組滿,則添加線程進(jìn)入等待狀態(tài),直到有"空位"
    public void add(T t) throws InterruptedException {
            lock.lock();
            try {
                    while (count == items.length)
                            notFull.await();
                    items[addIndex] = t;
                    if (++addIndex == items.length)
                            addIndex = 0;

                    ++count;
                    notEmpty.signal();
            } finally {
                    lock.unlock();
            }
    }
    // 由頭部刪除一個(gè)元素,如果數(shù)組空,則刪除線程進(jìn)入等待狀態(tài),直到有新添加元素
    @SuppressWarnings("unchecked")
    public T remove() throws InterruptedException {
            lock.lock();
            try {
                    while (count == 0)
                            notEmpty.await();
                    Object x = items[removeIndex];
                    if (++removeIndex == items.length)
                            removeIndex = 0;
                    --count;
                    notFull.signal();
                    return (T) x;
            } finally {
                    lock.unlock();
            }
    }
}

上述示例中,BoundedQueue通過add(T t)方法添加一個(gè)元素,通過remove()方法移出一個(gè)元素。以添加方法為例。

  • 首先需要獲得鎖,目的是確保數(shù)組修改的可見性和排他性。當(dāng)數(shù)組數(shù)量等于數(shù)組長度時(shí),表示數(shù)組已滿,則調(diào)用notFull.await(),當(dāng)前線程隨之釋放鎖并進(jìn)入等待狀態(tài)。
  • 如果數(shù)組數(shù)量不等于數(shù)組長度,表示數(shù)組未滿,則添加元素到數(shù)組中,同時(shí)通知等待在notEmpty上的線程,數(shù)組中已經(jīng)有新元素可以獲取。
  • 在添加和刪除方法中使用while循環(huán)而非if判斷,目的是防止過早或意外的通知,只有條件符合才能夠退出循環(huán)。

3 Condition的實(shí)現(xiàn)分析

  • ConditionObject是同步器AbstractQueuedSynchronizer的內(nèi)部類,因?yàn)镃ondition的操作需首先需要獲得鎖,
  • 每個(gè)Condition對(duì)象都包含著一個(gè)隊(duì)列(以下稱為等待隊(duì)列),該隊(duì)列是Condition對(duì)象實(shí)現(xiàn)等待/通知功能的關(guān)鍵。

3.1 等待隊(duì)列

  • 等待隊(duì)列是一個(gè)FIFO的隊(duì)列,在隊(duì)列中的每個(gè)節(jié)點(diǎn)都包含了一個(gè)線程引用,該線程就是在Condition對(duì)象上等待的線程,如果一個(gè)線程調(diào)用了Condition.await()方法,那么該線程將會(huì)釋放鎖、構(gòu)造成節(jié)點(diǎn)加入等待隊(duì)列并進(jìn)入等待狀態(tài)。
  • 節(jié)點(diǎn)的定義復(fù)用了同步器中節(jié)點(diǎn)的定義,也就是說,同步隊(duì)列和等待隊(duì)列中節(jié)點(diǎn)類型都是同步器的靜態(tài)內(nèi)部類AbstractQueuedSynchronizer.Node**。
  • 一個(gè)Condition包含一個(gè)等待隊(duì)列,Condition擁有首節(jié)點(diǎn)(firstWaiter)和尾節(jié)點(diǎn)(lastWaiter)。
  • 當(dāng)前線程調(diào)用Condition.await()方法,將會(huì)以當(dāng)前線程構(gòu)造節(jié)點(diǎn),并將節(jié)點(diǎn)從尾部加入等待隊(duì)列

等待隊(duì)列的基本結(jié)構(gòu)圖


qq_pic_merged_1535807449758.jpg

Condition擁有首尾節(jié)點(diǎn)的引用,而新增節(jié)點(diǎn)只需要將原有的尾節(jié)點(diǎn)nextWaiter指向它,并且更新尾節(jié)點(diǎn)即可。

沒有使用CAS保證,原因在于調(diào)用await()方法的線程必定是獲取了鎖的線程,也就是說該過程是由鎖來保證線程安全的。

在Object的監(jiān)視器模型上,一個(gè)對(duì)象擁有一個(gè)同步隊(duì)列和等待隊(duì)列,
并發(fā)包中的Lock(更確切地說是同步器)擁有一個(gè)同步隊(duì)列和多個(gè)等待隊(duì)列

qq_pic_merged_1538233209041.jpg

Condition的實(shí)現(xiàn)是同步器的內(nèi)部類,因此每個(gè)Condition實(shí)例都能夠訪問同步器提供的方法,相當(dāng)于每個(gè)Condition都擁有所屬同步器的引用。

3.2 等待

  • 調(diào)用Condition的await()方法(或者以await開頭的方法),會(huì)使當(dāng)前線程進(jìn)入等待隊(duì)列并釋放鎖,同時(shí)線程狀態(tài)變?yōu)榈却隣顟B(tài)。
  • 當(dāng)從await()方法返回時(shí),當(dāng)前線程一定獲取了Condition相關(guān)聯(lián)的鎖。 如果從隊(duì)列(同步隊(duì)列和等待隊(duì)列)的角度看await()方法,當(dāng)調(diào)用await()方法時(shí),相當(dāng)于同步隊(duì)列的首節(jié)點(diǎn)(獲取了鎖的節(jié)點(diǎn))移動(dòng)到Condition的等待隊(duì)列中。

Condition的await()方法

 public final void await() throws Interrupt
edException {
    if (Thread.interrupted())
            throw new InterruptedException();
    // 當(dāng)前線程加入等待隊(duì)列
    Node node = addConditionWaiter();
    // 釋放同步狀態(tài),也就是釋放鎖
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
            unlinkCancelledWaiters();
    if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
}
  • 調(diào)用該方法的線程成功獲取了鎖的線程,也就是同步隊(duì)列中的首節(jié)點(diǎn),該方法會(huì)將當(dāng)前線程構(gòu)造成節(jié)點(diǎn)并加入等待隊(duì)列中,然后釋放同步狀態(tài),喚醒同步隊(duì)列中的后繼節(jié)點(diǎn),然后當(dāng)前線程會(huì)進(jìn)入等待狀態(tài)。
  • 當(dāng)?shù)却?duì)列中的節(jié)點(diǎn)被喚醒,則喚醒節(jié)點(diǎn)的線程開始嘗試獲取同步狀態(tài)。如果不是通過其他線程調(diào)用Condition.signal()方法喚醒,而是對(duì)等待線程進(jìn)行中斷,則會(huì)拋出InterruptedException。
  • 如果從隊(duì)列的角度去看,當(dāng)前線程加入Condition的等待隊(duì)列,該過程如圖5-11示。
qq_pic_merged_1538234152233.jpg
  • 如圖所示,同步隊(duì)列的首節(jié)點(diǎn)并不會(huì)直接加入等待隊(duì)列,而是通過addConditionWaiter()方法把當(dāng)前線程構(gòu)造成一個(gè)新的節(jié)點(diǎn)并將其加入等待隊(duì)列中。

3.3 通知

調(diào)用Condition的signal()方法,將會(huì)喚醒在等待隊(duì)列中等待時(shí)間最長的節(jié)點(diǎn)(首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)之前,會(huì)將節(jié)點(diǎn)移到同步隊(duì)列中。
ConditionObject的signal方法

  public final void signal() {
    if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
            doSignal(first);
}

調(diào)用該方法的前置條件是當(dāng)前線程必須獲取了鎖,可以看到signal()方法進(jìn)行了isHeldExclusively()檢查,也就是當(dāng)前線程必須是獲取了鎖的線程。接著獲取等待隊(duì)列的首節(jié)點(diǎn),將其移動(dòng)到同步隊(duì)列并使用LockSupport喚醒節(jié)點(diǎn)中的線程。
節(jié)點(diǎn)從等待隊(duì)列移動(dòng)到同步隊(duì)列的過程

qq_pic_merged_1538234438845.jpg

  • 通過調(diào)用同步器的enq(Node node)方法,等待隊(duì)列中的頭節(jié)點(diǎn)線程安全地移動(dòng)到同步隊(duì)列。
  • 當(dāng)節(jié)點(diǎn)移動(dòng)到同步隊(duì)列后,當(dāng)前線程再使用LockSupport喚醒該節(jié)點(diǎn)的線程。 被喚醒后的線程,將從await()方法中的while循環(huán)中退出(isOnSyncQueue(Node node)方法返回true,節(jié)點(diǎn)已經(jīng)在同步隊(duì)列中),進(jìn)而調(diào)用同步器的acquireQueued()方法加入到獲取同步狀態(tài)的競爭中。
  • 成功獲取同步狀態(tài)(或者說鎖)之后,被喚醒的線程將從先前調(diào)用的await()方法返回,此時(shí)該線程已經(jīng)成功地獲取了鎖。
  • Condition的signalAll()方法,相當(dāng)于對(duì)等待隊(duì)列中的每個(gè)節(jié)點(diǎn)均執(zhí)行一次signal()方法,效果就是將等待隊(duì)列中所有節(jié)點(diǎn)全部移動(dòng)到同步隊(duì)列中,并喚醒每個(gè)節(jié)點(diǎn)的線程。

參考

《java并發(fā)編程的藝術(shù)》
并發(fā)番@ConditionObject一文通

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容