BlockingQueue與Condition原理解析

我在前段時(shí)間寫了一篇關(guān)于AQS的文章,在文章里邊我說幾乎所有在JUC包中的所有多線程相關(guān)的類都和AQS相關(guān),今天我就在這里總結(jié)一下另一個(gè)依賴于AQS來實(shí)現(xiàn)的同步工具類:BlockingQueue。我們主要以ArrayBlockingQueue為主來分析相關(guān)的源碼。

阻塞隊(duì)列

相信大多數(shù)同學(xué)都是在學(xué)習(xí)線程池相關(guān)知識時(shí)了解到阻塞隊(duì)列的概念的。知道各種類型的阻塞隊(duì)列對線程池初始化時(shí)的影響。在java doc中這樣定義阻塞隊(duì)列。當(dāng)從阻塞隊(duì)列獲取元素但是隊(duì)列為空時(shí),當(dāng)前線程會(huì)阻塞直到另一個(gè)線程向阻塞隊(duì)列中添加一個(gè)元素;類似的,當(dāng)向一個(gè)阻塞隊(duì)列加入元素時(shí),如果隊(duì)列已經(jīng)滿了,當(dāng)前線程也會(huì)阻塞知道另外一個(gè)線程從隊(duì)列中讀取一個(gè)元素。阻塞隊(duì)列一般都是FIFO,用來實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模式。阻塞隊(duì)列的方法通過四種不同的方式來處理操作無法被立即完成的情況,這四種情況分別為拋出異常,返回特殊值(null或在是false),阻塞當(dāng)前線程直到執(zhí)行結(jié)束,最后一種是只阻塞固定時(shí)間,然后還未執(zhí)行成功就放棄操作。這些方法都總結(jié)在下邊這種表中了。

BlockingQueue.png

我們就只分析puttake方法。

put和take函數(shù)

我們都知道,使用同步隊(duì)列可以很輕松的實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式,其實(shí),同步隊(duì)列就是按照生產(chǎn)者-消費(fèi)者的模式來實(shí)現(xiàn)的,我們可以將put函數(shù)看作生產(chǎn)者的操作,take是消費(fèi)者的操作。
?put函數(shù)會(huì)在隊(duì)列末尾添加元素,如果隊(duì)列已經(jīng)滿了,無法添加元素的話,就一直阻塞等待到可以加入為止。函數(shù)的源碼如下所示。

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //先獲得鎖
        try {
            while (count == items.length) 
            //如果隊(duì)列滿了,就NotFull這個(gè)condition對象上進(jìn)行等待
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    private void enqueue(E x) {
        final Object[] items = this.items;
        items[putIndex] = x;
        //這里可以注意的是ArrayBlockingList實(shí)際上使用Array實(shí)現(xiàn)了一個(gè)環(huán)形數(shù)組,
       //當(dāng)putIndex達(dá)到最大時(shí),就返回到起點(diǎn),繼續(xù)插入,
       //當(dāng)然,如果此時(shí)0位置的元素還沒有被取走,
       //下次put時(shí),就會(huì)因?yàn)閏out == item.length未被阻塞。
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //因?yàn)椴迦肓嗽?,通知等待notEmpty事件的線程。
        notEmpty.signal();
    }

我們會(huì)發(fā)現(xiàn)put函數(shù)也是使用了wait/notify的機(jī)制。與一般生產(chǎn)者-消費(fèi)者的實(shí)現(xiàn)方式不同,同步隊(duì)列使用ReentrantLockCondition相結(jié)合的先獲得鎖,再等待的機(jī)制;而不是synchronizedObject.wait的機(jī)制。這里的區(qū)別我們下一節(jié)再詳細(xì)講解。
?看完了生產(chǎn)者相關(guān)的put函數(shù),我們再來看一下消費(fèi)者調(diào)用的take函數(shù)。take函數(shù)在隊(duì)列為空時(shí)會(huì)被阻塞,一直到阻塞隊(duì)列加入了新的元素。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
            //如果隊(duì)列為空,那么在notEmpty對象上等待,
            //當(dāng)put函數(shù)調(diào)用時(shí),會(huì)調(diào)用notEmpty的notify進(jìn)行通知。
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    private E dequeue() {
        E x = (E) items[takeIndex];
        items[takeIndex] = null; //取出takeIndex位置的元素
        if (++takeIndex == items.length)
            //如果到了尾部,將指針重新調(diào)整到頭部
            takeIndex = 0;
        count--;
        ....
        //通知notFull對象上等待的線程
        notFull.signal();
        return x;
    }

Condition.await和Object.wait

我們發(fā)現(xiàn)ArrayBlockingList并沒有使用Object.wait,而是使用的Condition.await,這是為什么呢?其中又有哪些原因呢?
?Condition對象可以提供和Objectwaitnotify一樣的行為,但是后者必須使用synchronized這個(gè)內(nèi)置的monitor鎖,而Condition使用的是RenentranceLock。這兩種方式在阻塞等待時(shí)都會(huì)將相應(yīng)的鎖釋放掉,但是Condition的等待可以中斷,這是二者唯一的區(qū)別。
 Condition的流程大致如下邊兩張圖所示.

await
notify

我們首先來看一下await函數(shù)的實(shí)現(xiàn),詳細(xì)的講解都在代碼中.

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //在condition wait隊(duì)列上添加新的節(jié)點(diǎn)
    Node node = addConditionWaiter();
    //釋放當(dāng)前持有的鎖
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //由于node在之前是添加到condition wait queue上的,現(xiàn)在判斷這個(gè)node
    //是否被添加到Sync的獲得鎖的等待隊(duì)列上。
    //node在condition queue上說明還在等待事件的notify,
    //notify函數(shù)會(huì)將condition queue 上的node轉(zhuǎn)化到Sync的隊(duì)列上。
    while (!isOnSyncQueue(node)) {
        //node還沒有被添加到Sync Queue上,說明還在等待事件通知
        //所以調(diào)用park函數(shù)來停止線程執(zhí)行
        LockSupport.park(this);
        //判斷是否被中斷,線程從park函數(shù)返回有兩種情況,一種是
        //其他線程調(diào)用了unpark,另外一種是線程被中斷
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //代碼執(zhí)行到這里,已經(jīng)有其他線程調(diào)用notify函數(shù),或則被中斷,該線程可以繼續(xù)執(zhí)行,但是必須先
    //再次獲得調(diào)用await函數(shù)時(shí)的鎖.a(chǎn)cquireQueued函數(shù)在AQS文章中做了介紹.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
   ....
}

final int fullyRelease(Node node) {
    //AQS的方法,當(dāng)前已經(jīng)在鎖中了,所以直接操作
    boolean failed = true;
    try {
        int savedState = getState();
        //獲取state當(dāng)前的值,然后保存,以待以后恢復(fù)
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

/**
 * Checks for interrupt, returning THROW_IE if interrupted
 * before signalled, REINTERRUPT if after signalled, or
 * 0 if not interrupted.
 */
private int checkInterruptWhileWaiting(Node node) {
    //中斷可能發(fā)生在兩個(gè)階段中,一是在等待singla,另外一個(gè)是在獲得signal之后
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

final boolean transferAfterCancelledWait(Node node) {
    //這里要和下邊的transferForSignal對應(yīng)著看,這是線程中斷進(jìn)入的邏輯.那邊是signal的邏輯
    //兩邊可能有并發(fā)沖突,但是成功的一方必須調(diào)用enq來進(jìn)入acquire lock queue中.
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    //如果失敗了,說明transferForSignal那邊成功了,等待node 進(jìn)入acquire lock queue
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

signal函數(shù)將等待事件最長時(shí)間的線程節(jié)點(diǎn)從等待condition的隊(duì)列移動(dòng)到獲得lock的等待隊(duì)列中.

public final void signal() {
    //
    if (!isHeldExclusively())
    //如果當(dāng)前線程沒有獲得鎖,拋出異常
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //將Condition wait queue中的第一個(gè)node轉(zhuǎn)移到acquire lock queue中.
        doSignal(first);
}

private void doSignal(Node first) {
    do {
   //由于生產(chǎn)者的signal在有消費(fèi)者等待的情況下,必須要通知
        //一個(gè)消費(fèi)者,所以這里有一個(gè)循環(huán),直到隊(duì)列為空
        //把first 這個(gè)node從condition queue中刪除掉
        //condition queue的頭指針指向node的后繼節(jié)點(diǎn),如果node后續(xù)節(jié)點(diǎn)為null,那么也將尾指針也置為null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
    //transferForSignal將node轉(zhuǎn)而添加到Sync的acquire lock 隊(duì)列
}

final boolean transferForSignal(Node node) {
    //如果設(shè)置失敗,說明該node已經(jīng)被取消了,所以返回false,讓doSignal繼續(xù)向下通知其他未被取消的node
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //將node添加到acquire lock queue中.
    Node p = enq(node);
    int ws = p.waitStatus;
    //需要注意的是這里的node進(jìn)行了轉(zhuǎn)化
    //ws>0代表canceled的含義所以直接unpark線程
    //如果compareAndSetWaitStatus失敗,所以直接unpark,讓線程繼續(xù)執(zhí)行await中的
    //進(jìn)行isOnSyncQueue判斷的while循環(huán),然后進(jìn)入acquireQueue函數(shù).
    //這里失敗的原因可能是Lock其他線程釋放掉了鎖,同步設(shè)置p的waitStatus
    //如果compareAndSetWaitStatus成功了呢?那么該node就一直在acquire lock queue中
    //等待鎖被釋放掉再次搶奪鎖,然后再unpark
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

后記

?后邊一篇文章主要講解如何自己使用AQS來創(chuàng)建符合自己業(yè)務(wù)需求的鎖,請大家繼續(xù)關(guān)注我的文章啦.一起進(jìn)步偶.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容