【并發(fā)編程系列10】阻塞隊列之SynchronousQueue,LinkedTransferQueue原理分析

前言

前面我們介紹了ArrayBlockingQueue,LinkedBlockingQueue,LinkedBlockingDeque和 PriorityBlockingQueue,DelayQueue五種阻塞隊列,這一次就繼續(xù)介紹Java中提供的7種阻塞隊列中的最后兩種:SynchronousQueue和LinkedTransferQueue。

雙隊列

雙隊列是一個節(jié)點可以表示數(shù)據(jù)或者請求的隊列。即一個存在的節(jié)點可能表示put一個元素進去也可能是take()一個元素出來。

本文中敘述的兩種隊列SynchronousQueue和LinkedTransferQueue均是利用了雙隊列的特性:

  • put(E)操作時,隊列中的節(jié)點代表一個元素,也就說表示的是數(shù)據(jù)
  • take()操作時,如果隊列中無元素,會放一個null的項到隊列中占位,這時候表示的是一個請求,而不是一個數(shù)據(jù)。

SynchronousQueue

對于一個SynchronousQueue來說,每個插入操作都必須對應等待另一個線程的刪除操作,反之亦然。其沒有任何內(nèi)部容量,甚至連1都沒有。所以不能執(zhí)行peek()之類的方法獲取一個元素,因為一個元素只有在嘗試移除的時候才會出現(xiàn),也不能使用任何方法去插入一個元素,除非另一個線程正好在嘗試移除它,也不能去迭代,因為沒有任何東西可以迭代。


【并發(fā)編程系列10】阻塞隊列之SynchronousQueue,LinkedTransferQueue原理分析
在這里插入圖片描述

SynchronousQueue的head是第一個排隊插入線程試圖添加到隊列的元素,如果沒有這樣的排隊線程那么就沒有元素可以被移除,所以執(zhí)行poll()的時候就會返回null。

SynchronousQueue可以看成是一個傳球手,負責把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費者線程。隊列本身并不存儲任何元素,非常適合傳遞性場景。

SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue,因為其內(nèi)部是通過CAS和自旋來實現(xiàn)并發(fā),沒有通過鎖來控制,減少了鎖的開銷。

先來看看類圖:


在這里插入圖片描述

SynchronousQueue提供了兩個構造器,默認構造器是非公平策略,可以通過第二個構造器傳入?yún)?shù)true來構造一個公平策略:


在這里插入圖片描述

可以看到,公平策略構造的是一個TransferQueue,而非公平構造的是一個TransferStack。

公平策略(TransferQueue)

TransferQueue是SynchronousQueue的一個內(nèi)部類,構造器如下:


在這里插入圖片描述

TransferQueue內(nèi)部是通過QNode節(jié)點來維持一個隊列,QNode是TransferQueue的一個內(nèi)部類:


在這里插入圖片描述

put(E)方法就是往隊列里面添加一個元素,并需要等待另一個線程來take(),如果沒有線程來取走,則put(E)線程會阻塞,反之也一樣。put(E)和take()方法必須要要成對出現(xiàn),否則就會一直阻塞。


在這里插入圖片描述
在這里插入圖片描述

從上面代碼可以看到,put(E)和take都是調(diào)用的同一個方法transfer,通過不同的參數(shù)來區(qū)分。

E transfer(E e, boolean timed, long nanos) {
            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);//e==null表示當前是消費者(take操作),e!=null表示當前是生產(chǎn)者(put操作)

            for (;;) {
                QNode t = tail;
                QNode h = head;
                //表示還沒有初始化(初始化之后不可能為空),繼續(xù)自旋
                if (t == null || h == null)         // saw uninitialized value
                    continue;                       // spin

                //head=tail或者tail節(jié)點的模式和當前操作模式相同
                if (h == t || t.isData == isData) { // empty or same-mode
                    QNode tn = t.next;//獲得當前tail節(jié)點的next節(jié)點
                    //t和tail不一致,說明有其他線程操作過,繼續(xù)自旋
                    if (t != tail)    // inconsistent read
                        continue;
                    //tail.next正常是null,不為null說明其他線程新增了元素到tail.next
                    if (tn != null) {         // lagging tail
                        advanceTail(t, tn);//嘗試幫助其他線程將tail.next設置為tail,然后繼續(xù)自旋
                        continue;
                    }
                    //如果當前調(diào)用的是超時方法,且到時間了,直接返回null
                    if (timed && nanos <= 0)        // can't wait
                        return null;
                    if (s == null)//第一次進來肯定是null
                        //如果是put進來,e不為null,如果是take進來,e==null,也就是初始化了一個空節(jié)點
                        s = new QNode(e, isData);//初始化當前元素為QNode
                    //原tail節(jié)點為next
                    if (!t.casNext(null, s))        // failed to link in
                        continue;//替換tail節(jié)點的next節(jié)點為當前節(jié)點,失敗則繼續(xù)

                    //走到這里說明上面的CAS成功,需要將s設置為新的tail節(jié)點
                    //這里是一定會成功的,因為上面的cas完成之后,其他線程只能一直自旋等待
                    advanceTail(t, s);              // swing tail and wait
                    //節(jié)點添加進去之后,阻塞等待,直接消費者線程來消費
                    Object x = awaitFulfill(s, e, timed, nanos);
                    //x==s表示已經(jīng)被取消
                    if (x == s) { // wait was cancelled
                        clean(t, s);//清除
                        return null;//直接返回null
                    }

                    if (!s.isOffList()) {           // not already unlinked
                        advanceHead(t, s);          // unlink if head
                        if (x != null)              // and forget fields
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null) ? (E)x : e;
                } else {//互補模式          // complementary-mode
                    QNode m = h.next;//拿到tail.next       // node to fulfill
                    //出現(xiàn)讀不一致的情況,繼續(xù)自旋
                    if (t != tail || m == null || h != head)
                        continue;    // inconsistent read

                    Object x = m.item;//拿到節(jié)點中的item
                    //走到這里時的isData一定等于false,如果false=(x!=null)就說明x==null,元素已經(jīng)被其他元素拿走了,繼續(xù)自旋
                    if (isData == (x != null) ||    // m already fulfilled
                        x == m ||//說明已經(jīng)被取消了   // m cancelled
                        !m.casItem(x, e)) {   // lost CAS
                        //上面任意一個條件失敗都說明當前已被其他線程取走了元素,所以幫助那個線程cas替換一下頭節(jié)點
                        advanceHead(h, m);          // dequeue and retry
                        continue;
                    }

                    //走到這說明元素是被當前線程取走了,cas替換一下頭節(jié)點
                    advanceHead(h, m);              // successfully fulfilled
                    LockSupport.unpark(m.waiter);//喚醒阻塞線程繼續(xù)傳遞元素
                    //x!=null說明是被當前線程獲得了元素,那么返回x,否則就是被其他線程拿走了,返回e
                    return (x != null) ? (E)x : e;
                }
            }
        }

這個方法的看起來很長,其實是因為SynchronousQueue內(nèi)部不通過鎖來控制并發(fā),而是通過CAS和自旋來控制并發(fā),所以會有很多的if判斷。
根據(jù)上面的方法,主要可以分為兩種場景:一種是先put(E)再take(),另一種是先take()再put(E)。

初始化

初始化的時候調(diào)用上面的構造器TransferQueue(),默認得到一個哨兵節(jié)點,里面的元素是空的,這個isData就是說這個節(jié)點是不是一個有效數(shù)據(jù),只有item!=null才表示一個有效數(shù)據(jù):


在這里插入圖片描述

先put(E)再take()

假如先put(E),因為沒有對用的take()操作,線程會被阻塞直到有take()出現(xiàn)。

線程t1過來put(1)

這時候h==t走的是第一個if分支,至少在第2次自旋的時候將元素1包裝成節(jié)點QNode之后假如隊列,并會進入方法awaitFulfill阻塞等待傳遞元素:

 Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            final long deadline = timed ? System.nanoTime() + nanos : 0L;//獲得超時時間
            Thread w = Thread.currentThread();//當前執(zhí)行的線程
            int spins = ((head.next == s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);//獲得自旋次數(shù)
            for (;;) {
                if (w.isInterrupted())//如果線程被中斷了
                    s.tryCancel(e);//嘗試取消,注意這里取消后,會將s中的item和s替換。即s.item變成了s.QNode
                Object x = s.item;//拿到當前節(jié)點的item
                if (x != e)//不相等說明被取消或者值已經(jīng)被取走或者已經(jīng)有值放進來
                    return x;//直接返回自己
                if (timed) {//如果當前方法是計時方法
                    nanos = deadline - System.nanoTime();//獲取剩余時間
                    if (nanos <= 0L) {//如果已經(jīng)到時間了
                        s.tryCancel(e);//嘗試取消
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;//自旋次數(shù)>0,則減1后繼續(xù)自旋
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)//非計時方法
                    LockSupport.park(this);//如果當前方法不是帶超時時間的,則直接掛起直到喚醒
                else if (nanos > spinForTimeoutThreshold)//如果到了自旋次數(shù),且還沒到指定的超時時間,就掛起指定的剩余時間
                    LockSupport.parkNanos(this, nanos);
            }
        }

因為調(diào)用的put(1)沒有帶超時時間,所以會被LockSupport.park(this)阻塞,這時候得到了如下隊列:


在這里插入圖片描述

主要經(jīng)過如下5個步驟:

  • 1、將元素初始化成為一個QNode。
  • 2、將tail.next指向當前新構建的QNode(CAS操作)。
  • 3、將新構建的QNode設置為tail節(jié)點(CAS操作)。
  • 4、將當前QNode節(jié)點中的waiter屬性設置為當前線程(awaitFulfill方法)。
  • 5、掛起前線程(awaitFulfill方法)。

線程t2過來put(2)

這時候h和t不相等了,但是isData都為true,所以t2過來的流程一樣,還是會走if分支,然后會繼續(xù)將元素2添加到隊尾,得到如下隊列:


在這里插入圖片描述

注意,QNode還有一個屬性waiter,是用來記錄當前節(jié)點是哪個線程放進來的,因為后面當節(jié)點被take()走了之后,需要知道當前節(jié)點是由哪個線程放進來的,然后去喚醒對應線程。

我們可以看到,SynchronousQueue號稱是不存儲元素的,但是不存儲元素并不代表它內(nèi)部沒有隊列,內(nèi)部還是會有一個隊列的,只不過每個線程過來put(E)的時候,如果沒有對應的take()來匹配,那么線程就一直卡住了,也就是元素不會一直停留在隊列,而是會等待被轉移(transfer)。

線程t3過來take()

這時候來了一個線程t3過來take(),這時候因為h!=t,且take()的時候isData=false,和tail節(jié)點中的isData不一致了,會走else分支。

因為head節(jié)點是一個哨兵節(jié)點(空元素),而這又是公平模式,也就是必須滿足FIFO,所以會從head.next開始轉移元素。
最終得到如下最新的隊列:


在這里插入圖片描述

主要經(jīng)過如下步驟:

  • 1、將head.next中的item設置為null(CAS操作)。
  • 2、將head.next設置為新的head節(jié)點(advanceHead方法)。
  • 3、將原h(huán)ead節(jié)點的next指向自己(advanceHead方法)。
  • 4、通過原節(jié)點的waiter屬性,將原先線程喚醒。
  • 5、返回獲取到的元素。

注意,這里將元素1取走之后,原先的線程t1被喚醒,喚醒之后會在方法awaitFulfill繼續(xù)自旋,這時候執(zhí)行到if (x != e)條件的時候就會成立了,所以會返回x。然后回到transfer方法,將元素返回,t1線程結束。

線程t4過來take()

這時候的步驟和上面也是一樣,最終得到如下隊列:


在這里插入圖片描述

回到了原始的初始化狀態(tài),只保留了一個哨兵節(jié)點。

先take()再put(E)

假如先take()進來,步驟和上面put(E)基本一致,唯一的區(qū)別就是take()會先搶占一個隊列的位置,將一個item==null的節(jié)點加入隊列。

線程t1過來take()

線程t1過來take()因為一開始h==t,還是會走的if邏輯,最終會得到如下隊列:


在這里插入圖片描述

主要經(jīng)過如下步驟:

  • 1、將一個null元素初始化成為一個QNode。
  • 2、將tail.next指向當前新構建的QNode(CAS操作)。
  • 3、將新構建的QNode設置為tail節(jié)點(CAS操作)。
  • 4、將當前QNode節(jié)點中的waiter屬性設置為當前線程(awaitFulfill方法)。
  • 5、掛起前線程(awaitFulfill方法)。

除了第1個步驟,其他都和首先進來put(E)的步驟一樣

線程t2過來put(1)

這時候因為if條件不滿足,會走else分支,先將元素1賦值到之前被線程t1占的位置,最終得到如下隊列:


在這里插入圖片描述

主要經(jīng)過如下步驟:

  • 1、將head.next中的item設置為1(CAS操作)。
  • 2、將head.next設置為新的head節(jié)點(advanceHead方法)。
  • 3、將原h(huán)ead節(jié)點的next指向自己(advanceHead方法)。
  • 4、通過原節(jié)點的waiter屬性,將原先線程喚醒。
  • 5、返回成功put進去到的元素。

接下來將原先的t1線程喚醒,t1線程喚醒之后會繼續(xù)將節(jié)點中的item設置為自己,然后返回拿到的元素:


在這里插入圖片描述

非公平策略(TransferStack)

非公平策略是通過其內(nèi)部類TransferStack來實現(xiàn)的,思想基本和TransferQueue一致,唯一的區(qū)別就是TransferStack是非公平的,也就是LIFO模式,在這里就不詳細介紹了。

LinkedTransferQueue

LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對于其他阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。LinkedTransferQueue和SynchronousQueue中的公平策略使用的算法是一樣的,唯一的區(qū)別是SynchronousQueue內(nèi)部不會存儲哪怕1個元素,而LinkedTransferQueue內(nèi)部會存儲元素。

松弛度

正常隊列中,當移除一個元素的時候,就會同步移動head和tail節(jié)點的指針,為了最大程序的保證性能LinkedTransferQueue不會實時去更新head和tail的指針,而是引入了一個松弛度的概念。

松弛度指的是head值與第一個不匹配節(jié)點之間的目標最大距離,反之對tail也是如此。這個值這一般為1-3(根據(jù)經(jīng)驗得出),在LinkedTransferQueue中松弛度定義為2。因為如果太大了會增加緩存丟失的成本或者長遍歷鏈的風險,而較小的話就會增加CAS的開銷。

LinkedTransferQueue原理分析

LinkedTransferQueue內(nèi)部也是通過CAS和自旋來實現(xiàn)并發(fā)控制,所以也是一種效率比較高的隊列。

下面還是先來看看LinkedTransferQueue類圖:


在這里插入圖片描述

相比較于其他阻塞隊列,多了一個TransferQueue接口,我們先來看看TransferQueue接口中核心的幾個方法:

方法 功能
tryTransfer(e) 傳遞一個元素給正在等待的消費者,如果沒有正在等待的消費者,則返回false
transfer(e) 傳遞一個元素給正在等待的消費者,如果沒有正在等待的消費者,則阻塞等待
tryTransfer(e,time,uint) 傳遞一個元素給正在等待的消費者,如果沒有正在等待的消費者,則阻塞等待指定時間,過了超時時間之后,仍沒有消費者,則直接返回false
hasWaitingConsumer() 至少有一個消費者正在等待接收元素則返回true
getWaitingConsumerCount() 返回正在等待的消費者數(shù)量,返回的值是一個近似值,因為消費者可能很快就完成消費或者放棄等待

初始化

初始化的時候什么也不做,并不會在內(nèi)部構造一個初始節(jié)點,addAll()實際上也是循環(huán)調(diào)用了add(E)方法:


在這里插入圖片描述
在這里插入圖片描述

然后我們再看看其他方法,add,put,take,offer等,都是調(diào)用了一個共同的方法xfer,只不過通過不同參數(shù)來控制。


在這里插入圖片描述

xfer方法

private E xfer(E e, boolean haveData, int how, long nanos) {
        if (haveData && (e == null))//如果當前是put操作,且e==null,則拋出異常
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed

        retry:
        for (;;) {                            // restart on append race
            //從head開始循環(huán)匹配
            for (Node h = head, p = h; p != null;) { // find & match first node
                boolean isData = p.isData;
                Object item = p.item;
                //如果元素還沒被匹配過,也就是還在隊列里
                if (item != p && (item != null) == isData) { // unmatched
                    //如果相等,就說明是兩個相同操作,直接不用執(zhí)行后面了,互補操作才能往后走
                    if (isData == haveData)   // can't match
                        break;
                    //將p中的item替換成e
                    if (p.casItem(item, e)) { // match
                        //假如有一個隊列是有元素的,第一次被take()的時候,q==h是進不了for循環(huán)的,所以會直接返回
                        //第2次進來會先匹配一次head節(jié)點,匹配不上,在匹配第2個節(jié)點,這就相當于松弛度=2了,所以
                        //這時候是滿足條件的,可以進入for循環(huán),
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            //移動head指針
                            if (head == h && casHead(h, n == null ? q : n)) {
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                        }
                        LockSupport.unpark(p.waiter);
                        return LinkedTransferQueue.<E>cast(item);
                    }
                }
                Node n = p.next;
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }

            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);//初始化節(jié)點
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    continue retry;           // lost race vs opposite mode
                if (how != ASYNC)//take()或者帶超時時間的方法會走這里
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }

這個方法也要分為兩種方式,先put(E)再take()和先take()再put(E)。

先put(E)再take()

put(E)操作不會進行阻塞,成功之后直接返回。

線程t1過來put(1)

因為head和tail都是null(一開始不會初始化隊列),所以上面的第2個for循環(huán)是進不去的,會走到后面這里初始化node,并加入到隊列中,最終得到如下隊列:


在這里插入圖片描述

可以看到,這時候tail節(jié)點并沒有被初始化,這是因為利用了松弛度,松弛度要等于2才會移動tail指針(這是一種性能的優(yōu)化),我們看看tryAppend方法(主要是看紅框部分):


在這里插入圖片描述

主要分為以下步驟:

  • 1、初始化Node節(jié)點
  • 2、將Node節(jié)點設為head節(jié)點

線程t2過來put(2)

線程t2再進來put的時候,因為滿足松弛度=2了,這時候就會移動tail指針,所以會得到如下隊列:


在這里插入圖片描述

主要分為以下步驟:

  • 1、初始化Node節(jié)點
  • 2、將Node節(jié)點設為head.next節(jié)點
  • 3、達到松弛度,將新Node設置為tail節(jié)點
    后面如果再有元素過來添加,到第3個元素的時候,tail也是不會移動的,要第3個元素才會移動tail,這里就不再繼續(xù)舉例了。

線程t3過來take()

這時候來take會將head節(jié)點的元素設為null,然后直接返回,得到如下隊列:


在這里插入圖片描述

這時候因為松弛度還沒達到2,不會移動head指針。
主要經(jīng)過如下步驟:

  • 1、將head節(jié)點中的item設置為null。
  • 2、返回獲取到的item,。

線程t4過來take()

這時候首先會循環(huán)head節(jié)點,發(fā)現(xiàn)不匹配,然后循環(huán)到head.next,得到如下隊列:


在這里插入圖片描述

這里因為松弛度達到2,所以會移動head指針。
主要經(jīng)過如下步驟:

  • 1、循環(huán)head節(jié)點,發(fā)現(xiàn)不匹配。
  • 2、循環(huán)head.next,匹配上,將head.next中item設置為null。
  • 3、移動head指針,指向head.next。

先take()再put(E)

先take()的時候,因為隊列中還沒有元素,所以會先自旋,自旋一定次數(shù)之后就阻塞,直到有元素put(E)進來然后喚醒線程。

線程t1過來take()

和上面第一次put(E)一樣,上面的第2個for循環(huán)進不去,會走到后面這里初始化node,并加入到隊列中,最終得到如下隊列:


在這里插入圖片描述

主要經(jīng)過如下步驟:

  • 1、初始化一個item=null的Node節(jié)點。
  • 2、將Node節(jié)點設置為head節(jié)點。
  • 3、自旋一定次數(shù)(awaitMatch方法)。
  • 4、達到自旋次數(shù)后還沒有線程過來take(),執(zhí)行park掛起線程(awaitMatch方法)。

注意,上面的Node中也有一個waier屬性用來存儲線程信息,后面喚醒需要獲取waiter中的線程

線程t2過來take()

這時候因為松弛度達到2了,會移動tail指針,最終得到如下隊列:


在這里插入圖片描述

線程t3過來put(1)

這時候因為前面take()的時候,隊列中已經(jīng)有了item=null的元素(node!=null),所以會直接進入第2個for循環(huán),然后將值替換進head節(jié)點中的item,最后喚醒t1線程,t1線程被喚醒之后,會繼續(xù)自旋,然后返回拿到t3線程put進來的元素:


在這里插入圖片描述

最終得到如下隊列:


在這里插入圖片描述

主要經(jīng)過如下步驟:

  • 1、將head節(jié)點中的item替換成當前元素1。
  • 2、喚醒t1線程。
  • 3、t1線程將head節(jié)點中的item指向當前自己的Node,并將waiter設置為null。
  • 4、t1拿到put進來的元素1返回。

線程t4過來put(2)

這時候主要流程和上面一樣,但是因為松弛度達到了2,所以會移動head節(jié)點指針,最終得到如下隊列:


在這里插入圖片描述

主要步驟為:

  • 1、循環(huán)head節(jié)點,發(fā)現(xiàn)head已經(jīng)被匹配過了(item=p)。
  • 2、繼續(xù)循環(huán)head.next,這時候發(fā)現(xiàn)可以匹配上,將head.next中的item設置為2。
  • 3、這時候因為松弛度達到2,會將head節(jié)點后移。
  • 4、將舊head節(jié)點的next指向當前自己的節(jié)點(forgetNext方法)。
  • 5、喚醒線程t2。
  • 6、t2線程將新head節(jié)點中的item指向當前自己的Node,并將waiter設置為null。
  • 7、線程t2拿到元素2返回

總結

本文主要講述了SynchronousQueue,LinkedTransferQueue兩種不加鎖的傳遞型隊列,因為不加鎖,所以期性能會高于其他五種加鎖的隊列

下一篇,將為大家介紹一下Java中提供的12個原子類操作。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容