詳解Condition的await和signal等待/通知機(jī)制

原創(chuàng)文章&經(jīng)驗(yàn)總結(jié)&從校招到A廠一路陽光一路滄桑

詳情請戳www.codercc.com

image

1.Condition簡介

任何一個java對象都天然繼承于Object類,在線程間實(shí)現(xiàn)通信的往往會應(yīng)用到Object的幾個方法,比如wait(),wait(long timeout),wait(long timeout, int nanos)與notify(),notifyAll()幾個方法實(shí)現(xiàn)等待/通知機(jī)制,同樣的, 在java Lock體系下依然會有同樣的方法實(shí)現(xiàn)等待/通知機(jī)制。從整體上來看Object的wait和notify/notify是與對象監(jiān)視器配合完成線程間的等待/通知機(jī)制,而Condition與Lock配合完成等待通知機(jī)制,前者是java底層級別的,后者是語言級別的,具有更高的可控制性和擴(kuò)展性。兩者除了在使用方式上不同外,在功能特性上還是有很多的不同:

  1. Condition能夠支持不響應(yīng)中斷,而通過使用Object方式不支持;
  2. Condition能夠支持多個等待隊(duì)列(new 多個Condition對象),而Object方式只能支持一個;
  3. Condition能夠支持超時時間的設(shè)置,而Object不支持

參照Object的wait和notify/notifyAll方法,Condition也提供了同樣的方法:

針對Object的wait方法

  1. void await() throws InterruptedException:當(dāng)前線程進(jìn)入等待狀態(tài),如果其他線程調(diào)用condition的signal或者signalAll方法并且當(dāng)前線程獲取Lock從await方法返回,如果在等待狀態(tài)中被中斷會拋出被中斷異常;
  2. long awaitNanos(long nanosTimeout):當(dāng)前線程進(jìn)入等待狀態(tài)直到被通知,中斷或者超時;
  3. boolean await(long time, TimeUnit unit)throws InterruptedException:同第二種,支持自定義時間單位
  4. boolean awaitUntil(Date deadline) throws InterruptedException:當(dāng)前線程進(jìn)入等待狀態(tài)直到被通知,中斷或者到了某個時間

針對Object的notify/notifyAll方法

  1. void signal():喚醒一個等待在condition上的線程,將該線程從等待隊(duì)列中轉(zhuǎn)移到同步隊(duì)列中,如果在同步隊(duì)列中能夠競爭到Lock則可以從等待方法中返回。
  2. void signalAll():與1的區(qū)別在于能夠喚醒所有等待在condition上的線程

2.Condition實(shí)現(xiàn)原理分析

2.1 等待隊(duì)列

要想能夠深入的掌握condition還是應(yīng)該知道它的實(shí)現(xiàn)原理,現(xiàn)在我們一起來看看condiiton的源碼。創(chuàng)建一個condition對象是通過lock.newCondition(),而這個方法實(shí)際上是會new出一個ConditionObject對象,該類是AQS(AQS的實(shí)現(xiàn)原理的文章)的一個內(nèi)部類,有興趣可以去看看。前面我們說過,condition是要和lock配合使用的也就是condition和Lock是綁定在一起的,而lock的實(shí)現(xiàn)原理又依賴于AQS,自然而然ConditionObject作為AQS的一個內(nèi)部類無可厚非。我們知道在鎖機(jī)制的實(shí)現(xiàn)上,AQS內(nèi)部維護(hù)了一個同步隊(duì)列,如果是獨(dú)占式鎖的話,所有獲取鎖失敗的線程的尾插入到同步隊(duì)列,同樣的,condition內(nèi)部也是使用同樣的方式,內(nèi)部維護(hù)了一個 等待隊(duì)列,所有調(diào)用condition.await方法的線程會加入到等待隊(duì)列中,并且線程狀態(tài)轉(zhuǎn)換為等待狀態(tài)。另外注意到ConditionObject中有兩個成員變量:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

這樣我們就可以看出來ConditionObject通過持有等待隊(duì)列的頭尾指針來管理等待隊(duì)列。主要注意的是Node類復(fù)用了在AQS中的Node類,其節(jié)點(diǎn)狀態(tài)和相關(guān)屬性可以去看AQS的實(shí)現(xiàn)原理的文章,如果您仔細(xì)看完這篇文章對condition的理解易如反掌,對lock體系的實(shí)現(xiàn)也會有一個質(zhì)的提升。Node類有這樣一個屬性:

//后繼節(jié)點(diǎn)
Node nextWaiter;

進(jìn)一步說明,等待隊(duì)列是一個單向隊(duì)列,而在之前說AQS時知道同步隊(duì)列是一個雙向隊(duì)列。接下來我們用一個demo,通過debug進(jìn)去看是不是符合我們的猜想:

public static void main(String[] args) {
    for (int i = 0; i < 10; i++) {
        Thread thread = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        });
        thread.start();
    }
}

這段代碼沒有任何實(shí)際意義,甚至很臭,只是想說明下我們剛才所想的。新建了10個線程,沒有線程先獲取鎖,然后調(diào)用condition.await方法釋放鎖將當(dāng)前線程加入到等待隊(duì)列中,通過debug控制當(dāng)走到第10個線程的時候查看firstWaiter即等待隊(duì)列中的頭結(jié)點(diǎn),debug模式下情景圖如下:

debug模式下情景圖

從這個圖我們可以很清楚的看到這樣幾點(diǎn):1. 調(diào)用condition.await方法后線程依次尾插入到等待隊(duì)列中,如圖隊(duì)列中的線程引用依次為Thread-0,Thread-1,Thread-2....Thread-8;2. 等待隊(duì)列是一個單向隊(duì)列。通過我們的猜想然后進(jìn)行實(shí)驗(yàn)驗(yàn)證,我們可以得出等待隊(duì)列的示意圖如下圖所示:

等待隊(duì)列的示意圖

同時還有一點(diǎn)需要注意的是:我們可以多次調(diào)用lock.newCondition()方法創(chuàng)建多個condition對象,也就是一個lock可以持有多個等待隊(duì)列。而在之前利用Object的方式實(shí)際上是指在對象Object對象監(jiān)視器上只能擁有一個同步隊(duì)列和一個等待隊(duì)列,而并發(fā)包中的Lock擁有一個同步隊(duì)列和多個等待隊(duì)列。示意圖如下:

AQS持有多個Condition.png

如圖所示,ConditionObject是AQS的內(nèi)部類,因此每個ConditionObject能夠訪問到AQS提供的方法,相當(dāng)于每個Condition都擁有所屬同步器的引用。

2.2 await實(shí)現(xiàn)原理

當(dāng)調(diào)用condition.await()方法后會使得當(dāng)前獲取lock的線程進(jìn)入到等待隊(duì)列,如果該線程能夠從await()方法返回的話一定是該線程獲取了與condition相關(guān)聯(lián)的lock。接下來,我們還是從源碼的角度去看,只有熟悉了源碼的邏輯我們的理解才是最深的。await()方法源碼為:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 1. 將當(dāng)前線程包裝成Node,尾插入到等待隊(duì)列中
    Node node = addConditionWaiter();
    // 2. 釋放當(dāng)前線程所占用的lock,在釋放的過程中會喚醒同步隊(duì)列中的下一個節(jié)點(diǎn)
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 3. 當(dāng)前線程進(jìn)入到等待狀態(tài)
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 4. 自旋等待獲取到同步狀態(tài)(即獲取到lock)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 5. 處理被中斷的情況
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

代碼的主要邏輯請看注釋,我們都知道當(dāng)當(dāng)前線程調(diào)用condition.await()方法后,會使得當(dāng)前線程釋放lock然后加入到等待隊(duì)列中,直至被signal/signalAll后會使得當(dāng)前線程從等待隊(duì)列中移至到同步隊(duì)列中去,直到獲得了lock后才會從await方法返回,或者在等待時被中斷會做中斷處理。那么關(guān)于這個實(shí)現(xiàn)過程我們會有這樣幾個問題:1. 是怎樣將當(dāng)前線程添加到等待隊(duì)列中去的?2.釋放鎖的過程?3.怎樣才能從await方法退出?而這段代碼的邏輯就是告訴我們這三個問題的答案。具體請看注釋,在第1步中調(diào)用addConditionWaiter將當(dāng)前線程添加到等待隊(duì)列中,該方法源碼為:

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //將當(dāng)前線程包裝成Node
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        //尾插入
        t.nextWaiter = node;
    //更新lastWaiter
    lastWaiter = node;
    return node;
}

這段代碼就很容易理解了,將當(dāng)前節(jié)點(diǎn)包裝成Node,如果等待隊(duì)列的firstWaiter為null的話(等待隊(duì)列為空隊(duì)列),則將firstWaiter指向當(dāng)前的Node,否則,更新lastWaiter(尾節(jié)點(diǎn))即可。就是通過尾插入的方式將當(dāng)前線程封裝的Node插入到等待隊(duì)列中即可,同時可以看出等待隊(duì)列是一個不帶頭結(jié)點(diǎn)的鏈?zhǔn)疥?duì)列,之前我們學(xué)習(xí)AQS時知道同步隊(duì)列是一個帶頭結(jié)點(diǎn)的鏈?zhǔn)疥?duì)列,這是兩者的一個區(qū)別。將當(dāng)前節(jié)點(diǎn)插入到等待對列之后,會使當(dāng)前線程釋放lock,由fullyRelease方法實(shí)現(xiàn),fullyRelease源碼為:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            //成功釋放同步狀態(tài)
            failed = false;
            return savedState;
        } else {
            //不成功釋放同步狀態(tài)拋出異常
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

這段代碼就很容易理解了,調(diào)用AQS的模板方法release方法釋放AQS的同步狀態(tài)并且喚醒在同步隊(duì)列中頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)引用的線程,如果釋放成功則正常返回,若失敗的話就拋出異常。到目前為止,這兩段代碼已經(jīng)解決了前面的兩個問題的答案了,還剩下第三個問題,怎樣從await方法退出?現(xiàn)在回過頭再來看await方法有這樣一段邏輯:

while (!isOnSyncQueue(node)) {
    // 3. 當(dāng)前線程進(jìn)入到等待狀態(tài)
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

很顯然,當(dāng)線程第一次調(diào)用condition.await()方法時,會進(jìn)入到這個while()循環(huán)中,然后通過LockSupport.park(this)方法使得當(dāng)前線程進(jìn)入等待狀態(tài),那么要想退出這個await方法第一個前提條件自然而然的是要先退出這個while循環(huán),出口就只剩下兩個地方:1. 邏輯走到break退出while循環(huán);2. while循環(huán)中的邏輯判斷為false。再看代碼出現(xiàn)第1種情況的條件是當(dāng)前等待的線程被中斷后代碼會走到break退出,第二種情況是當(dāng)前節(jié)點(diǎn)被移動到了同步隊(duì)列中(即另外線程調(diào)用的condition的signal或者signalAll方法),while中邏輯判斷為false后結(jié)束while循環(huán)。總結(jié)下,就是當(dāng)前線程被中斷或者調(diào)用condition.signal/condition.signalAll方法當(dāng)前節(jié)點(diǎn)移動到了同步隊(duì)列后 ,這是當(dāng)前線程退出await方法的前提條件。當(dāng)退出while循環(huán)后就會調(diào)用acquireQueued(node, savedState),這個方法在介紹AQS的底層實(shí)現(xiàn)時說過了,若感興趣的話可以去看這篇文章,該方法的作用是在自旋過程中線程不斷嘗試獲取同步狀態(tài),直至成功(線程獲取到lock)。這樣也說明了退出await方法必須是已經(jīng)獲得了condition引用(關(guān)聯(lián))的lock。到目前為止,開頭的三個問題我們通過閱讀源碼的方式已經(jīng)完全找到了答案,也對await方法的理解加深。await方法示意圖如下圖:

await方法示意圖

如圖,調(diào)用condition.await方法的線程必須是已經(jīng)獲得了lock,也就是當(dāng)前線程是同步隊(duì)列中的頭結(jié)點(diǎn)。調(diào)用該方法后會使得當(dāng)前線程所封裝的Node尾插入到等待隊(duì)列中。

超時機(jī)制的支持

condition還額外支持了超時機(jī)制,使用者可調(diào)用方法awaitNanos,awaitUtil。這兩個方法的實(shí)現(xiàn)原理,基本上與AQS中的tryAcquire方法如出一轍,關(guān)于tryAcquire可以仔細(xì)閱讀這篇文章的第3.4部分。

不響應(yīng)中斷的支持

要想不響應(yīng)中斷可以調(diào)用condition.awaitUninterruptibly()方法,該方法的源碼為:

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

這段方法與上面的await方法基本一致,只不過減少了對中斷的處理,并省略了reportInterruptAfterWait方法拋被中斷的異常。

2.3 signal/signalAll實(shí)現(xiàn)原理

調(diào)用condition的signal或者signalAll方法可以將等待隊(duì)列中等待時間最長的節(jié)點(diǎn)移動到同步隊(duì)列中,使得該節(jié)點(diǎn)能夠有機(jī)會獲得lock。按照等待隊(duì)列是先進(jìn)先出(FIFO)的,所以等待隊(duì)列的頭節(jié)點(diǎn)必然會是等待時間最長的節(jié)點(diǎn),也就是每次調(diào)用condition的signal方法是將頭節(jié)點(diǎn)移動到同步隊(duì)列中。我們來通過看源碼的方式來看這樣的猜想是不是對的,signal方法源碼為:

public final void signal() {
    //1. 先檢測當(dāng)前線程是否已經(jīng)獲取lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //2. 獲取等待隊(duì)列中第一個節(jié)點(diǎn),之后的操作都是針對這個節(jié)點(diǎn)
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

signal方法首先會檢測當(dāng)前線程是否已經(jīng)獲取lock,如果沒有獲取lock會直接拋出異常,如果獲取的話再得到等待隊(duì)列的頭指針引用的節(jié)點(diǎn),之后的操作的doSignal方法也是基于該節(jié)點(diǎn)。下面我們來看看doSignal方法做了些什么事情,doSignal方法源碼為:

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        //1. 將頭結(jié)點(diǎn)從等待隊(duì)列中移除
        first.nextWaiter = null;
        //2. while中transferForSignal方法對頭結(jié)點(diǎn)做真正的處理
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

具體邏輯請看注釋,真正對頭節(jié)點(diǎn)做處理的邏輯在transferForSignal放,該方法源碼為:

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    //1. 更新狀態(tài)為0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //2.將該節(jié)點(diǎn)移入到同步隊(duì)列中去
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

關(guān)鍵邏輯請看注釋,這段代碼主要做了兩件事情1.將頭結(jié)點(diǎn)的狀態(tài)更改為CONDITION;2.調(diào)用enq方法,將該節(jié)點(diǎn)尾插入到同步隊(duì)列中,關(guān)于enq方法請看AQS的底層實(shí)現(xiàn)這篇文章。現(xiàn)在我們可以得出結(jié)論:調(diào)用condition的signal的前提條件是當(dāng)前線程已經(jīng)獲取了lock,該方法會使得等待隊(duì)列中的頭節(jié)點(diǎn)即等待時間最長的那個節(jié)點(diǎn)移入到同步隊(duì)列,而移入到同步隊(duì)列后才有機(jī)會使得等待線程被喚醒,即從await方法中的LockSupport.park(this)方法中返回,從而才有機(jī)會使得調(diào)用await方法的線程成功退出。signal執(zhí)行示意圖如下圖:

signal執(zhí)行示意圖

signalAll

sigllAll與sigal方法的區(qū)別體現(xiàn)在doSignalAll方法上,前面我們已經(jīng)知道doSignal方法只會對等待隊(duì)列的頭節(jié)點(diǎn)進(jìn)行操作,,而doSignalAll的源碼為:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

該方法只不過時間等待隊(duì)列中的每一個節(jié)點(diǎn)都移入到同步隊(duì)列中,即“通知”當(dāng)前調(diào)用condition.await()方法的每一個線程。

3. await與signal/signalAll的結(jié)合思考

文章開篇提到等待/通知機(jī)制,通過使用condition提供的await和signal/signalAll方法就可以實(shí)現(xiàn)這種機(jī)制,而這種機(jī)制能夠解決最經(jīng)典的問題就是“生產(chǎn)者與消費(fèi)者問題”,關(guān)于“生產(chǎn)者消費(fèi)者問題”之后會用單獨(dú)的一篇文章進(jìn)行講解,這也是面試的高頻考點(diǎn)。await和signal和signalAll方法就像一個開關(guān)控制著線程A(等待方)和線程B(通知方)。它們之間的關(guān)系可以用下面一個圖來表現(xiàn)得更加貼切:

condition下的等待通知機(jī)制.png

如圖,線程awaitThread先通過lock.lock()方法獲取鎖成功后調(diào)用了condition.await方法進(jìn)入等待隊(duì)列,而另一個線程signalThread通過lock.lock()方法獲取鎖成功后調(diào)用了condition.signal或者signalAll方法,使得線程awaitThread能夠有機(jī)會移入到同步隊(duì)列中,當(dāng)其他線程釋放lock后使得線程awaitThread能夠有機(jī)會獲取lock,從而使得線程awaitThread能夠從await方法中退出執(zhí)行后續(xù)操作。如果awaitThread獲取lock失敗會直接進(jìn)入到同步隊(duì)列。

3. 一個例子

我們用一個很簡單的例子說說condition的用法:

public class AwaitSignal {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();
    private static volatile boolean flag = false;

    public static void main(String[] args) {
        Thread waiter = new Thread(new waiter());
        waiter.start();
        Thread signaler = new Thread(new signaler());
        signaler.start();
    }

    static class waiter implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                while (!flag) {
                    System.out.println(Thread.currentThread().getName() + "當(dāng)前條件不滿足等待");
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "接收到通知條件滿足");
            } finally {
                lock.unlock();
            }
        }
    }

    static class signaler implements Runnable {

        @Override
        public void run() {
            lock.lock();
            try {
                flag = true;
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
}

輸出結(jié)果為:

Thread-0當(dāng)前條件不滿足等待
Thread-0接收到通知,條件滿足

開啟了兩個線程waiter和signaler,waiter線程開始執(zhí)行的時候由于條件不滿足,執(zhí)行condition.await方法使該線程進(jìn)入等待狀態(tài)同時釋放鎖,signaler線程獲取到鎖之后更改條件,并通知所有的等待線程后釋放鎖。這時,waiter線程獲取到鎖,并由于signaler線程更改了條件此時相對于waiter來說條件滿足,繼續(xù)執(zhí)行。

參考文獻(xiàn)

《java并發(fā)編程的藝術(shù)》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容