前言
前面我們介紹了ArrayBlockingQueue,LinkedBlockingQueue,LinkedBlockingDeque和 PriorityBlockingQueue,DelayQueue五種阻塞隊列,這一次就繼續(xù)介紹Java中提供的7種阻塞隊列中的最后兩種:SynchronousQueue和LinkedTransferQueue。
雙隊列
雙隊列是一個節(jié)點可以表示數(shù)據(jù)或者請求的隊列。即一個存在的節(jié)點可能表示put一個元素進去也可能是take()一個元素出來。
本文中敘述的兩種隊列SynchronousQueue和LinkedTransferQueue均是利用了雙隊列的特性:
- put(E)操作時,隊列中的節(jié)點代表一個元素,也就說表示的是數(shù)據(jù)
- take()操作時,如果隊列中無元素,會放一個null的項到隊列中占位,這時候表示的是一個請求,而不是一個數(shù)據(jù)。
SynchronousQueue
對于一個SynchronousQueue來說,每個插入操作都必須對應等待另一個線程的刪除操作,反之亦然。其沒有任何內(nèi)部容量,甚至連1都沒有。所以不能執(zhí)行peek()之類的方法獲取一個元素,因為一個元素只有在嘗試移除的時候才會出現(xiàn),也不能使用任何方法去插入一個元素,除非另一個線程正好在嘗試移除它,也不能去迭代,因為沒有任何東西可以迭代。
SynchronousQueue的head是第一個排隊插入線程試圖添加到隊列的元素,如果沒有這樣的排隊線程那么就沒有元素可以被移除,所以執(zhí)行poll()的時候就會返回null。
SynchronousQueue可以看成是一個傳球手,負責把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費者線程。隊列本身并不存儲任何元素,非常適合傳遞性場景。
SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue,因為其內(nèi)部是通過CAS和自旋來實現(xiàn)并發(fā),沒有通過鎖來控制,減少了鎖的開銷。
先來看看類圖:
SynchronousQueue提供了兩個構造器,默認構造器是非公平策略,可以通過第二個構造器傳入?yún)?shù)true來構造一個公平策略:

可以看到,公平策略構造的是一個TransferQueue,而非公平構造的是一個TransferStack。
公平策略(TransferQueue)
TransferQueue是SynchronousQueue的一個內(nèi)部類,構造器如下:

TransferQueue內(nèi)部是通過QNode節(jié)點來維持一個隊列,QNode是TransferQueue的一個內(nèi)部類:
put(E)方法就是往隊列里面添加一個元素,并需要等待另一個線程來take(),如果沒有線程來取走,則put(E)線程會阻塞,反之也一樣。put(E)和take()方法必須要要成對出現(xiàn),否則就會一直阻塞。
從上面代碼可以看到,put(E)和take都是調(diào)用的同一個方法transfer,通過不同的參數(shù)來區(qū)分。
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);//e==null表示當前是消費者(take操作),e!=null表示當前是生產(chǎn)者(put操作)
for (;;) {
QNode t = tail;
QNode h = head;
//表示還沒有初始化(初始化之后不可能為空),繼續(xù)自旋
if (t == null || h == null) // saw uninitialized value
continue; // spin
//head=tail或者tail節(jié)點的模式和當前操作模式相同
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;//獲得當前tail節(jié)點的next節(jié)點
//t和tail不一致,說明有其他線程操作過,繼續(xù)自旋
if (t != tail) // inconsistent read
continue;
//tail.next正常是null,不為null說明其他線程新增了元素到tail.next
if (tn != null) { // lagging tail
advanceTail(t, tn);//嘗試幫助其他線程將tail.next設置為tail,然后繼續(xù)自旋
continue;
}
//如果當前調(diào)用的是超時方法,且到時間了,直接返回null
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)//第一次進來肯定是null
//如果是put進來,e不為null,如果是take進來,e==null,也就是初始化了一個空節(jié)點
s = new QNode(e, isData);//初始化當前元素為QNode
//原tail節(jié)點為next
if (!t.casNext(null, s)) // failed to link in
continue;//替換tail節(jié)點的next節(jié)點為當前節(jié)點,失敗則繼續(xù)
//走到這里說明上面的CAS成功,需要將s設置為新的tail節(jié)點
//這里是一定會成功的,因為上面的cas完成之后,其他線程只能一直自旋等待
advanceTail(t, s); // swing tail and wait
//節(jié)點添加進去之后,阻塞等待,直接消費者線程來消費
Object x = awaitFulfill(s, e, timed, nanos);
//x==s表示已經(jīng)被取消
if (x == s) { // wait was cancelled
clean(t, s);//清除
return null;//直接返回null
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else {//互補模式 // complementary-mode
QNode m = h.next;//拿到tail.next // node to fulfill
//出現(xiàn)讀不一致的情況,繼續(xù)自旋
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;//拿到節(jié)點中的item
//走到這里時的isData一定等于false,如果false=(x!=null)就說明x==null,元素已經(jīng)被其他元素拿走了,繼續(xù)自旋
if (isData == (x != null) || // m already fulfilled
x == m ||//說明已經(jīng)被取消了 // m cancelled
!m.casItem(x, e)) { // lost CAS
//上面任意一個條件失敗都說明當前已被其他線程取走了元素,所以幫助那個線程cas替換一下頭節(jié)點
advanceHead(h, m); // dequeue and retry
continue;
}
//走到這說明元素是被當前線程取走了,cas替換一下頭節(jié)點
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);//喚醒阻塞線程繼續(xù)傳遞元素
//x!=null說明是被當前線程獲得了元素,那么返回x,否則就是被其他線程拿走了,返回e
return (x != null) ? (E)x : e;
}
}
}
這個方法的看起來很長,其實是因為SynchronousQueue內(nèi)部不通過鎖來控制并發(fā),而是通過CAS和自旋來控制并發(fā),所以會有很多的if判斷。
根據(jù)上面的方法,主要可以分為兩種場景:一種是先put(E)再take(),另一種是先take()再put(E)。
初始化
初始化的時候調(diào)用上面的構造器TransferQueue(),默認得到一個哨兵節(jié)點,里面的元素是空的,這個isData就是說這個節(jié)點是不是一個有效數(shù)據(jù),只有item!=null才表示一個有效數(shù)據(jù):

先put(E)再take()
假如先put(E),因為沒有對用的take()操作,線程會被阻塞直到有take()出現(xiàn)。
線程t1過來put(1)
這時候h==t走的是第一個if分支,至少在第2次自旋的時候將元素1包裝成節(jié)點QNode之后假如隊列,并會進入方法awaitFulfill阻塞等待傳遞元素:
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
final long deadline = timed ? System.nanoTime() + nanos : 0L;//獲得超時時間
Thread w = Thread.currentThread();//當前執(zhí)行的線程
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);//獲得自旋次數(shù)
for (;;) {
if (w.isInterrupted())//如果線程被中斷了
s.tryCancel(e);//嘗試取消,注意這里取消后,會將s中的item和s替換。即s.item變成了s.QNode
Object x = s.item;//拿到當前節(jié)點的item
if (x != e)//不相等說明被取消或者值已經(jīng)被取走或者已經(jīng)有值放進來
return x;//直接返回自己
if (timed) {//如果當前方法是計時方法
nanos = deadline - System.nanoTime();//獲取剩余時間
if (nanos <= 0L) {//如果已經(jīng)到時間了
s.tryCancel(e);//嘗試取消
continue;
}
}
if (spins > 0)
--spins;//自旋次數(shù)>0,則減1后繼續(xù)自旋
else if (s.waiter == null)
s.waiter = w;
else if (!timed)//非計時方法
LockSupport.park(this);//如果當前方法不是帶超時時間的,則直接掛起直到喚醒
else if (nanos > spinForTimeoutThreshold)//如果到了自旋次數(shù),且還沒到指定的超時時間,就掛起指定的剩余時間
LockSupport.parkNanos(this, nanos);
}
}
因為調(diào)用的put(1)沒有帶超時時間,所以會被LockSupport.park(this)阻塞,這時候得到了如下隊列:

主要經(jīng)過如下5個步驟:
- 1、將元素初始化成為一個QNode。
- 2、將tail.next指向當前新構建的QNode(CAS操作)。
- 3、將新構建的QNode設置為tail節(jié)點(CAS操作)。
- 4、將當前QNode節(jié)點中的waiter屬性設置為當前線程(awaitFulfill方法)。
- 5、掛起前線程(awaitFulfill方法)。
線程t2過來put(2)
這時候h和t不相等了,但是isData都為true,所以t2過來的流程一樣,還是會走if分支,然后會繼續(xù)將元素2添加到隊尾,得到如下隊列:
注意,QNode還有一個屬性waiter,是用來記錄當前節(jié)點是哪個線程放進來的,因為后面當節(jié)點被take()走了之后,需要知道當前節(jié)點是由哪個線程放進來的,然后去喚醒對應線程。
我們可以看到,SynchronousQueue號稱是不存儲元素的,但是不存儲元素并不代表它內(nèi)部沒有隊列,內(nèi)部還是會有一個隊列的,只不過每個線程過來put(E)的時候,如果沒有對應的take()來匹配,那么線程就一直卡住了,也就是元素不會一直停留在隊列,而是會等待被轉移(transfer)。
線程t3過來take()
這時候來了一個線程t3過來take(),這時候因為h!=t,且take()的時候isData=false,和tail節(jié)點中的isData不一致了,會走else分支。
因為head節(jié)點是一個哨兵節(jié)點(空元素),而這又是公平模式,也就是必須滿足FIFO,所以會從head.next開始轉移元素。
最終得到如下最新的隊列:
主要經(jīng)過如下步驟:
- 1、將head.next中的item設置為null(CAS操作)。
- 2、將head.next設置為新的head節(jié)點(advanceHead方法)。
- 3、將原h(huán)ead節(jié)點的next指向自己(advanceHead方法)。
- 4、通過原節(jié)點的waiter屬性,將原先線程喚醒。
- 5、返回獲取到的元素。
注意,這里將元素1取走之后,原先的線程t1被喚醒,喚醒之后會在方法awaitFulfill繼續(xù)自旋,這時候執(zhí)行到if (x != e)條件的時候就會成立了,所以會返回x。然后回到transfer方法,將元素返回,t1線程結束。
線程t4過來take()
這時候的步驟和上面也是一樣,最終得到如下隊列:
回到了原始的初始化狀態(tài),只保留了一個哨兵節(jié)點。
先take()再put(E)
假如先take()進來,步驟和上面put(E)基本一致,唯一的區(qū)別就是take()會先搶占一個隊列的位置,將一個item==null的節(jié)點加入隊列。
線程t1過來take()
線程t1過來take()因為一開始h==t,還是會走的if邏輯,最終會得到如下隊列:

主要經(jīng)過如下步驟:
- 1、將一個null元素初始化成為一個QNode。
- 2、將tail.next指向當前新構建的QNode(CAS操作)。
- 3、將新構建的QNode設置為tail節(jié)點(CAS操作)。
- 4、將當前QNode節(jié)點中的waiter屬性設置為當前線程(awaitFulfill方法)。
- 5、掛起前線程(awaitFulfill方法)。
除了第1個步驟,其他都和首先進來put(E)的步驟一樣
線程t2過來put(1)
這時候因為if條件不滿足,會走else分支,先將元素1賦值到之前被線程t1占的位置,最終得到如下隊列:

主要經(jīng)過如下步驟:
- 1、將head.next中的item設置為1(CAS操作)。
- 2、將head.next設置為新的head節(jié)點(advanceHead方法)。
- 3、將原h(huán)ead節(jié)點的next指向自己(advanceHead方法)。
- 4、通過原節(jié)點的waiter屬性,將原先線程喚醒。
- 5、返回成功put進去到的元素。
接下來將原先的t1線程喚醒,t1線程喚醒之后會繼續(xù)將節(jié)點中的item設置為自己,然后返回拿到的元素:

非公平策略(TransferStack)
非公平策略是通過其內(nèi)部類TransferStack來實現(xiàn)的,思想基本和TransferQueue一致,唯一的區(qū)別就是TransferStack是非公平的,也就是LIFO模式,在這里就不詳細介紹了。
LinkedTransferQueue
LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對于其他阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。LinkedTransferQueue和SynchronousQueue中的公平策略使用的算法是一樣的,唯一的區(qū)別是SynchronousQueue內(nèi)部不會存儲哪怕1個元素,而LinkedTransferQueue內(nèi)部會存儲元素。
松弛度
正常隊列中,當移除一個元素的時候,就會同步移動head和tail節(jié)點的指針,為了最大程序的保證性能LinkedTransferQueue不會實時去更新head和tail的指針,而是引入了一個松弛度的概念。
松弛度指的是head值與第一個不匹配節(jié)點之間的目標最大距離,反之對tail也是如此。這個值這一般為1-3(根據(jù)經(jīng)驗得出),在LinkedTransferQueue中松弛度定義為2。因為如果太大了會增加緩存丟失的成本或者長遍歷鏈的風險,而較小的話就會增加CAS的開銷。
LinkedTransferQueue原理分析
LinkedTransferQueue內(nèi)部也是通過CAS和自旋來實現(xiàn)并發(fā)控制,所以也是一種效率比較高的隊列。
下面還是先來看看LinkedTransferQueue類圖:
相比較于其他阻塞隊列,多了一個TransferQueue接口,我們先來看看TransferQueue接口中核心的幾個方法:
| 方法 | 功能 |
|---|---|
| tryTransfer(e) | 傳遞一個元素給正在等待的消費者,如果沒有正在等待的消費者,則返回false |
| transfer(e) | 傳遞一個元素給正在等待的消費者,如果沒有正在等待的消費者,則阻塞等待 |
| tryTransfer(e,time,uint) | 傳遞一個元素給正在等待的消費者,如果沒有正在等待的消費者,則阻塞等待指定時間,過了超時時間之后,仍沒有消費者,則直接返回false |
| hasWaitingConsumer() | 至少有一個消費者正在等待接收元素則返回true |
| getWaitingConsumerCount() | 返回正在等待的消費者數(shù)量,返回的值是一個近似值,因為消費者可能很快就完成消費或者放棄等待 |
初始化
初始化的時候什么也不做,并不會在內(nèi)部構造一個初始節(jié)點,addAll()實際上也是循環(huán)調(diào)用了add(E)方法:
然后我們再看看其他方法,add,put,take,offer等,都是調(diào)用了一個共同的方法xfer,只不過通過不同參數(shù)來控制。
xfer方法
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))//如果當前是put操作,且e==null,則拋出異常
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
//從head開始循環(huán)匹配
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
//如果元素還沒被匹配過,也就是還在隊列里
if (item != p && (item != null) == isData) { // unmatched
//如果相等,就說明是兩個相同操作,直接不用執(zhí)行后面了,互補操作才能往后走
if (isData == haveData) // can't match
break;
//將p中的item替換成e
if (p.casItem(item, e)) { // match
//假如有一個隊列是有元素的,第一次被take()的時候,q==h是進不了for循環(huán)的,所以會直接返回
//第2次進來會先匹配一次head節(jié)點,匹配不上,在匹配第2個節(jié)點,這就相當于松弛度=2了,所以
//這時候是滿足條件的,可以進入for循環(huán),
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
//移動head指針
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
if (how != NOW) { // No matches available
if (s == null)
s = new Node(e, haveData);//初始化節(jié)點
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
if (how != ASYNC)//take()或者帶超時時間的方法會走這里
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
這個方法也要分為兩種方式,先put(E)再take()和先take()再put(E)。
先put(E)再take()
put(E)操作不會進行阻塞,成功之后直接返回。
線程t1過來put(1)
因為head和tail都是null(一開始不會初始化隊列),所以上面的第2個for循環(huán)是進不去的,會走到后面這里初始化node,并加入到隊列中,最終得到如下隊列:

可以看到,這時候tail節(jié)點并沒有被初始化,這是因為利用了松弛度,松弛度要等于2才會移動tail指針(這是一種性能的優(yōu)化),我們看看tryAppend方法(主要是看紅框部分):
主要分為以下步驟:
- 1、初始化Node節(jié)點
- 2、將Node節(jié)點設為head節(jié)點
線程t2過來put(2)
線程t2再進來put的時候,因為滿足松弛度=2了,這時候就會移動tail指針,所以會得到如下隊列:
主要分為以下步驟:
- 1、初始化Node節(jié)點
- 2、將Node節(jié)點設為head.next節(jié)點
- 3、達到松弛度,將新Node設置為tail節(jié)點
后面如果再有元素過來添加,到第3個元素的時候,tail也是不會移動的,要第3個元素才會移動tail,這里就不再繼續(xù)舉例了。
線程t3過來take()
這時候來take會將head節(jié)點的元素設為null,然后直接返回,得到如下隊列:

這時候因為松弛度還沒達到2,不會移動head指針。
主要經(jīng)過如下步驟:
- 1、將head節(jié)點中的item設置為null。
- 2、返回獲取到的item,。
線程t4過來take()
這時候首先會循環(huán)head節(jié)點,發(fā)現(xiàn)不匹配,然后循環(huán)到head.next,得到如下隊列:

這里因為松弛度達到2,所以會移動head指針。
主要經(jīng)過如下步驟:
- 1、循環(huán)head節(jié)點,發(fā)現(xiàn)不匹配。
- 2、循環(huán)head.next,匹配上,將head.next中item設置為null。
- 3、移動head指針,指向head.next。
先take()再put(E)
先take()的時候,因為隊列中還沒有元素,所以會先自旋,自旋一定次數(shù)之后就阻塞,直到有元素put(E)進來然后喚醒線程。
線程t1過來take()
和上面第一次put(E)一樣,上面的第2個for循環(huán)進不去,會走到后面這里初始化node,并加入到隊列中,最終得到如下隊列:

主要經(jīng)過如下步驟:
- 1、初始化一個item=null的Node節(jié)點。
- 2、將Node節(jié)點設置為head節(jié)點。
- 3、自旋一定次數(shù)(awaitMatch方法)。
- 4、達到自旋次數(shù)后還沒有線程過來take(),執(zhí)行park掛起線程(awaitMatch方法)。
注意,上面的Node中也有一個waier屬性用來存儲線程信息,后面喚醒需要獲取waiter中的線程
線程t2過來take()
這時候因為松弛度達到2了,會移動tail指針,最終得到如下隊列:

線程t3過來put(1)
這時候因為前面take()的時候,隊列中已經(jīng)有了item=null的元素(node!=null),所以會直接進入第2個for循環(huán),然后將值替換進head節(jié)點中的item,最后喚醒t1線程,t1線程被喚醒之后,會繼續(xù)自旋,然后返回拿到t3線程put進來的元素:
最終得到如下隊列:

主要經(jīng)過如下步驟:
- 1、將head節(jié)點中的item替換成當前元素1。
- 2、喚醒t1線程。
- 3、t1線程將head節(jié)點中的item指向當前自己的Node,并將waiter設置為null。
- 4、t1拿到put進來的元素1返回。
線程t4過來put(2)
這時候主要流程和上面一樣,但是因為松弛度達到了2,所以會移動head節(jié)點指針,最終得到如下隊列:

主要步驟為:
- 1、循環(huán)head節(jié)點,發(fā)現(xiàn)head已經(jīng)被匹配過了(item=p)。
- 2、繼續(xù)循環(huán)head.next,這時候發(fā)現(xiàn)可以匹配上,將head.next中的item設置為2。
- 3、這時候因為松弛度達到2,會將head節(jié)點后移。
- 4、將舊head節(jié)點的next指向當前自己的節(jié)點(forgetNext方法)。
- 5、喚醒線程t2。
- 6、t2線程將新head節(jié)點中的item指向當前自己的Node,并將waiter設置為null。
- 7、線程t2拿到元素2返回
總結
本文主要講述了SynchronousQueue,LinkedTransferQueue兩種不加鎖的傳遞型隊列,因為不加鎖,所以期性能會高于其他五種加鎖的隊列
下一篇,將為大家介紹一下Java中提供的12個原子類操作。