原文出處http://cmsblogs.com/ 『chenssy』
【注】:SynchronousQueue實現(xiàn)算法看的暈乎乎的,寫了好久才寫完,如果當(dāng)中有什么錯誤之處,忘各位指正
作為BlockingQueue中的一員,SynchronousQueue與其他BlockingQueue有著不同特性:
- SynchronousQueue沒有容量。與其他BlockingQueue不同,SynchronousQueue是一個不存儲元素的BlockingQueue。每一個put操作必須要等待一個take操作,否則不能繼續(xù)添加元素,反之亦然。
- 因為沒有容量,所以對應(yīng) peek, contains, clear, isEmpty ... 等方法其實是無效的。例如clear是不執(zhí)行任何操作的,contains始終返回false,peek始終返回null。
- SynchronousQueue分為公平和非公平,默認(rèn)情況下采用非公平性訪問策略,當(dāng)然也可以通過構(gòu)造函數(shù)來設(shè)置為公平性訪問策略(為true即可)。
- 若使用 TransferQueue, 則隊列中永遠(yuǎn)會存在一個 dummy node(這點后面詳細(xì)闡述)。
SynchronousQueue非常適合做交換工作,生產(chǎn)者的線程和消費者的線程同步以傳遞某些信息、事件或者任務(wù)。
SynchronousQueue
與其他BlockingQueue一樣,SynchronousQueue同樣繼承AbstractQueue和實現(xiàn)BlockingQueue接口:
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
SynchronousQueue提供了兩個構(gòu)造函數(shù):
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
// 通過 fair 值來決定公平性和非公平性
// 公平性使用TransferQueue,非公平性采用TransferStack
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
TransferQueue、TransferStack繼承Transferer,Transferer為SynchronousQueue的內(nèi)部類,它提供了一個方法transfer(),該方法定義了轉(zhuǎn)移數(shù)據(jù)的規(guī)范,如下:
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
transfer()方法主要用來完成轉(zhuǎn)移數(shù)據(jù)的,如果e != null,相當(dāng)于將一個數(shù)據(jù)交給消費者,如果e == null,則相當(dāng)于從一個生產(chǎn)者接收一個消費者交出的數(shù)據(jù)。
SynchronousQueue采用隊列TransferQueue來實現(xiàn)公平性策略,采用堆棧TransferStack來實現(xiàn)非公平性策略,他們兩種都是通過鏈表實現(xiàn)的,其節(jié)點分別為QNode,SNode。TransferQueue和TransferStack在SynchronousQueue中扮演著非常重要的作用,SynchronousQueue的put、take操作都是委托這兩個類來實現(xiàn)的。
TransferQueue
TransferQueue是實現(xiàn)公平性策略的核心類,其節(jié)點為QNode,其定義如下:
static final class TransferQueue<E> extends Transferer<E> {
/** 頭節(jié)點 */
transient volatile QNode head;
/** 尾節(jié)點 */
transient volatile QNode tail;
// 指向一個取消的結(jié)點
//當(dāng)一個節(jié)點中最后一個插入時,它被取消了但是可能還沒有離開隊列
transient volatile QNode cleanMe;
/**
* 省略很多代碼O(∩_∩)O
*/
}
在TransferQueue中除了頭、尾節(jié)點外還存在一個cleanMe節(jié)點。該節(jié)點主要用于標(biāo)記,當(dāng)刪除的節(jié)點是尾節(jié)點時則需要使用該節(jié)點。
同時,對于TransferQueue需要注意的是,其隊列永遠(yuǎn)都存在一個dummy node,在構(gòu)造時創(chuàng)建:
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
在TransferQueue中定義了QNode類來表示隊列中的節(jié)點,QNode節(jié)點定義如下:
static final class QNode {
// next 域
volatile QNode next;
// item數(shù)據(jù)項
volatile Object item;
// 等待線程,用于park/unpark
volatile Thread waiter; // to control park/unpark
//模式,表示當(dāng)前是數(shù)據(jù)還是請求,只有當(dāng)匹配的模式相匹配時才會交換
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
/**
* CAS next域,在TransferQueue中用于向next推進
*/
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* CAS itme數(shù)據(jù)項
*/
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/**
* 取消本結(jié)點,將item域設(shè)置為自身
*/
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
/**
* 是否被取消
* 與tryCancel相照應(yīng)只需要判斷item釋放等于自身即可
*/
boolean isCancelled() {
return item == this;
}
boolean isOffList() {
return next == this;
}
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
上面代碼沒啥好看的,需要注意的一點就是isData,該屬性在進行數(shù)據(jù)交換起到關(guān)鍵性作用,兩個線程進行數(shù)據(jù)交換的時候,必須要兩者的模式保持一致。
TransferStack
TransferStack用于實現(xiàn)非公平性,定義如下:
static final class TransferStack<E> extends Transferer<E> {
static final int REQUEST = 0;
static final int DATA = 1;
static final int FULFILLING = 2;
volatile SNode head;
/**
* 省略一堆代碼 O(∩_∩)O~
*/
}
TransferStack中定義了三個狀態(tài):REQUEST表示消費數(shù)據(jù)的消費者,DATA表示生產(chǎn)數(shù)據(jù)的生產(chǎn)者,F(xiàn)ULFILLING,表示匹配另一個生產(chǎn)者或消費者。任何線程對TransferStack的操作都屬于上述3種狀態(tài)中的一種(對應(yīng)著SNode節(jié)點的mode)。同時還包含一個head域,表示頭結(jié)點。
內(nèi)部節(jié)點SNode定義如下:
static final class SNode {
// next 域
volatile SNode next;
// 相匹配的節(jié)點
volatile SNode match;
// 等待的線程
volatile Thread waiter;
// item 域
Object item; // data; or null for REQUESTs
// 模型
int mode;
/**
* item域和mode域不需要使用volatile修飾,因為它們在volatile/atomic操作之前寫,之后讀
*/
SNode(Object item) {
this.item = item;
}
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
/**
* 將s結(jié)點與本結(jié)點進行匹配,匹配成功,則unpark等待線程
*/
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
上面簡單介紹了TransferQueue、TransferStack,由于SynchronousQueue的put、take操作都是調(diào)用Transfer的transfer()方法,只不過是傳遞的參數(shù)不同而已,put傳遞的是e參數(shù),所以模式為數(shù)據(jù)(公平isData = true,非公平mode= DATA),而take操作傳遞的是null,所以模式為請求(公平isData = false,非公平mode = REQUEST),如下:
// put操作
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
// take操作
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
公平模式
公平性調(diào)用TransferQueue的transfer方法:
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
// 當(dāng)前節(jié)點模式
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
// 頭、尾節(jié)點 為null,沒有初始化
if (t == null || h == null)
continue;
// 頭尾節(jié)點相等(隊列為null) 或者當(dāng)前節(jié)點和隊列節(jié)點模式一樣
if (h == t || t.isData == isData) {
// tn = t.next
QNode tn = t.next;
// t != tail表示已有其他線程操作了,修改了tail,重新再來
if (t != tail)
continue;
// tn != null,表示已經(jīng)有其他線程添加了節(jié)點,tn 推進,重新處理
if (tn != null) {
// 當(dāng)前線程幫忙推進尾節(jié)點,就是嘗試將tn設(shè)置為尾節(jié)點
advanceTail(t, tn);
continue;
}
// 調(diào)用的方法的 wait 類型的, 并且 超時了, 直接返回 null
// timed 在take操作闡述
if (timed && nanos <= 0)
return null;
// s == null,構(gòu)建一個新節(jié)點Node
if (s == null)
s = new QNode(e, isData);
// 將新建的節(jié)點加入到隊列中,如果不成功,繼續(xù)處理
if (!t.casNext(null, s))
continue;
// 替換尾節(jié)點
advanceTail(t, s);
// 調(diào)用awaitFulfill, 若節(jié)點是 head.next, 則進行自旋
// 若不是的話, 直接 block, 直到有其他線程 與之匹配, 或它自己進行線程的中斷
Object x = awaitFulfill(s, e, timed, nanos);
// 若返回的x == s表示,當(dāng)前線程已經(jīng)超時或者中斷,不然的話s == null或者是匹配的節(jié)點
if (x == s) {
// 清理節(jié)點S
clean(t, s);
return null;
}
// isOffList:用于判斷節(jié)點是否已經(jīng)從隊列中離開了
if (!s.isOffList()) {
// 嘗試將S節(jié)點設(shè)置為head,移出t
advanceHead(t, s);
if (x != null)
s.item = s;
// 釋放線程 ref
s.waiter = null;
}
// 返回
return (x != null) ? (E)x : e;
}
// 這里是從head.next開始,因為TransferQueue總是會存在一個dummy node節(jié)點
else {
// 節(jié)點
QNode m = h.next;
// 不一致讀,重新開始
// 有其他線程更改了線程結(jié)構(gòu)
if (t != tail || m == null || h != head)
continue;
/**
* 生產(chǎn)者producer和消費者consumer匹配操作
*/
Object x = m.item;
// isData == (x != null):判斷isData與x的模式是否相同,相同表示已經(jīng)匹配了
// x == m :m節(jié)點被取消了
// !m.casItem(x, e):如果嘗試將數(shù)據(jù)e設(shè)置到m上失敗
if (isData == (x != null) || x == m || !m.casItem(x, e)) {
// 將m設(shè)置為頭結(jié)點,h出列,然后重試
advanceHead(h, m);
continue;
}
// 成功匹配了,m設(shè)置為頭結(jié)點h出列,向前推進
advanceHead(h, m);
// 喚醒m上的等待線程
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
整個transfer的算法如下:
- 如果隊列為null或者尾節(jié)點模式與當(dāng)前節(jié)點模式一致,則嘗試將節(jié)點加入到等待隊列中(采用自旋的方式),直到被匹配或、超時或者取消。匹配成功的話要么返回null(producer返回的)要么返回真正傳遞的值(consumer返回的),如果返回的是node節(jié)點本身則表示當(dāng)前線程超時或者取消了。
- 如果隊列不為null,且隊列的節(jié)點是當(dāng)前節(jié)點匹配的節(jié)點,則進行數(shù)據(jù)的傳遞匹配并返回匹配節(jié)點的數(shù)據(jù)
- 在整個過程中都會檢測并幫助其他線程推進
當(dāng)隊列為空時,節(jié)點入列然后通過調(diào)用awaitFulfill()方法自旋,該方法主要用于自旋/阻塞節(jié)點,直到節(jié)點被匹配返回或者取消、中斷。
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
// 超時控制
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 自旋次數(shù)
// 如果節(jié)點Node恰好是head節(jié)點,則自旋一段時間,這里主要是為了效率問題,如果里面阻塞,會存在喚醒、線程上下文切換的問題
// 如果生產(chǎn)者、消費者者里面到來的話,就避免了這個阻塞的過程
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
// 自旋
for (;;) {
// 線程中斷了,剔除當(dāng)前節(jié)點
if (w.isInterrupted())
s.tryCancel(e);
// 如果線程進行了阻塞 -> 喚醒或者中斷了,那么x != e 肯定成立,直接返回當(dāng)前節(jié)點即可
Object x = s.item;
if (x != e)
return x;
// 超時判斷
if (timed) {
nanos = deadline - System.nanoTime();
// 如果超時了,取消節(jié)點,continue,在if(x != e)肯定會成立,直接返回x
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 自旋- 1
if (spins > 0)
--spins;
// 等待線程
else if (s.waiter == null)
s.waiter = w;
// 進行沒有超時的 park
else if (!timed)
LockSupport.park(this);
// 自旋次數(shù)過了, 直接 + timeout 方式 park
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
在自旋/阻塞過程中做了一點優(yōu)化,就是判斷當(dāng)前節(jié)點是否為對頭元素,如果是的則先自旋,如果自旋次數(shù)過了,則才阻塞,這樣做的主要目的就在如果生產(chǎn)者、消費者立馬來匹配了則不需要阻塞,因為阻塞、喚醒會消耗資源。在整個自旋的過程中會不斷判斷是否超時或者中斷了,如果中斷或者超時了則調(diào)用tryCancel()取消該節(jié)點。
tryCancel
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
取消過程就是將節(jié)點的item設(shè)置為自身(itemOffset是item的偏移量)。所以在調(diào)用awaitFulfill()方法時,如果當(dāng)前線程被取消、中斷、超時了那么返回的值肯定時S,否則返回的則是匹配的節(jié)點。如果返回值是節(jié)點S,那么if(x == s)必定成立,如下:
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
如果返回的x == s成立,則調(diào)用clean()方法清理節(jié)點S:
void clean(QNode pred, QNode s) {
//
s.waiter = null;
while (pred.next == s) {
QNode h = head;
QNode hn = h.next;
// hn節(jié)點被取消了,向前推進
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
// 隊列為空,直接return null
QNode t = tail;
if (t == h)
return;
QNode tn = t.next;
// 不一致,說明有其他線程改變了tail節(jié)點,重新開始
if (t != tail)
continue;
// tn != null 推進tail節(jié)點,重新開始
if (tn != null) {
advanceTail(t, tn);
continue;
}
// s 不是尾節(jié)點 移出
if (s != t) {
QNode sn = s.next;
// 如果s已經(jīng)被移除退出循環(huán),否則嘗試斷開s
if (sn == s || pred.casNext(s, sn))
return;
}
// s是尾節(jié)點,則有可能會有其他線程在添加新節(jié)點,則cleanMe出場
QNode dp = cleanMe;
// 如果dp不為null,說明是前一個被取消節(jié)點,將其移除
if (dp != null) {
QNode d = dp.next;
QNode dn;
if (d == null || // 節(jié)點d已經(jīng)刪除
d == dp || // 原來的節(jié)點 cleanMe 已經(jīng)通過 advanceHead 進行刪除
!d.isCancelled() || // 原來的節(jié)點 s已經(jīng)刪除
(d != t && // d 不是tail節(jié)點
(dn = d.next) != null && //
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
// 清除 cleanMe 節(jié)點, 這里的 dp == pred 若成立, 說明清除節(jié)點s,成功, 直接return, 不然的話要再次循環(huán)
casCleanMe(dp, null);
if (dp == pred)
return;
} else if (casCleanMe(null, pred)) // 原來的 cleanMe 是 null, 則將 pred 標(biāo)記為 cleamMe 為下次 清除 s 節(jié)點做標(biāo)識
return;
}
}
這個clean()方法感覺有點兒難度,我也看得不是很懂。這里是引用http://www.itdecent.cn/p/95cb570c8187
刪除的節(jié)點不是queue尾節(jié)點, 這時 直接 pred.casNext(s, s.next) 方式來進行刪除(和ConcurrentLikedQueue中差不多)
刪除的節(jié)點是隊尾節(jié)點
- 此時 cleanMe == null, 則 前繼節(jié)點pred標(biāo)記為 cleanMe, 為下次刪除做準(zhǔn)備
- 此時 cleanMe != null, 先刪除上次需要刪除的節(jié)點, 然后將 cleanMe至null, 讓后再將 pred 賦值給 cleanMe
非公平模式
非公平模式transfer方法如下:
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
// 棧為空或者當(dāng)前節(jié)點模式與頭節(jié)點模式一樣,將節(jié)點壓入棧內(nèi),等待匹配
if (h == null || h.mode == mode) {
// 超時
if (timed && nanos <= 0) {
// 節(jié)點被取消了,向前推進
if (h != null && h.isCancelled())
// 重新設(shè)置頭結(jié)點(彈出之前的頭結(jié)點)
casHead(h, h.next);
else
return null;
}
// 不超時
// 生成一個SNode節(jié)點,并嘗試替換掉頭節(jié)點head (head -> s)
else if (casHead(h, s = snode(s, e, h, mode))) {
// 自旋,等待線程匹配
SNode m = awaitFulfill(s, timed, nanos);
// 返回的m == s 表示該節(jié)點被取消了或者超時、中斷了
if (m == s) {
// 清理節(jié)點S,return null
clean(s);
return null;
}
// 因為通過前面一步將S替換成了head,如果h.next == s ,則表示有其他節(jié)點插入到S前面了,變成了head
// 且該節(jié)點就是與節(jié)點S匹配的節(jié)點
if ((h = head) != null && h.next == s)
// 將s.next節(jié)點設(shè)置為head,相當(dāng)于取消節(jié)點h、s
casHead(h, s.next);
// 如果是請求則返回匹配的域,否則返回節(jié)點S的域
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
// 如果棧不為null,且兩者模式不匹配(h != null && h.mode != mode)
// 說明他們是一隊對等匹配的節(jié)點,嘗試用當(dāng)前節(jié)點s來滿足h節(jié)點
else if (!isFulfilling(h.mode)) {
// head 節(jié)點已經(jīng)取消了,向前推進
if (h.isCancelled())
casHead(h, h.next);
// 嘗試將當(dāng)前節(jié)點打上"正在匹配"的標(biāo)記,并設(shè)置為head
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 循環(huán)loop
for (;;) {
// s為當(dāng)前節(jié)點,m是s的next節(jié)點,
// m節(jié)點是s節(jié)點的匹配節(jié)點
SNode m = s.next;
// m == null,其他節(jié)點把m節(jié)點匹配走了
if (m == null) {
// 將s彈出
casHead(s, null);
// 將s置空,下輪循環(huán)的時候還會新建
s = null;
// 退出該循環(huán),繼續(xù)主循環(huán)
break;
}
// 獲取m的next節(jié)點
SNode mn = m.next;
// 嘗試匹配
if (m.tryMatch(s)) {
// 匹配成功,將s 、 m彈出
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
// 如果沒有匹配成功,說明有其他線程已經(jīng)匹配了,把m移出
s.casNext(m, mn);
}
}
}
// 到這最后一步說明節(jié)點正在匹配階段
else {
// head 的next的節(jié)點,是正在匹配的節(jié)點,m 和 h配對
SNode m = h.next;
// m == null 其他線程把m節(jié)點搶走了,彈出h節(jié)點
if (m == null)
casHead(h, null);
else {
SNode mn = m.next;
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}
整個處理過程分為三種情況,具體如下:
- 如果當(dāng)前棧為空獲取節(jié)點模式與棧頂模式一樣,則嘗試將節(jié)點加入棧內(nèi),同時通過自旋方式等待節(jié)點匹配,最后返回匹配的節(jié)點或者null(被取消)
- 如果棧不為空且節(jié)點的模式與首節(jié)點模式匹配,則嘗試將該節(jié)點打上FULFILLING標(biāo)記,然后加入棧中,與相應(yīng)的節(jié)點匹配,成功后將這兩個節(jié)點彈出棧并返回匹配節(jié)點的數(shù)據(jù)
- 如果有節(jié)點在匹配,那么幫助這個節(jié)點完成匹配和出棧操作,然后在主循環(huán)中繼續(xù)執(zhí)行
當(dāng)節(jié)點加入棧內(nèi)后,通過調(diào)用awaitFulfill()方法自旋等待節(jié)點匹配:
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 超時
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 當(dāng)前線程
Thread w = Thread.currentThread();
// 自旋次數(shù)
// shouldSpin 用于檢測當(dāng)前節(jié)點是否需要自旋
// 如果棧為空、該節(jié)點是首節(jié)點或者該節(jié)點是匹配節(jié)點,則先采用自旋,否則阻塞
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 線程中斷了,取消該節(jié)點
if (w.isInterrupted())
s.tryCancel();
// 匹配節(jié)點
SNode m = s.match;
// 如果匹配節(jié)點m不為空,則表示匹配成功,直接返回
if (m != null)
return m;
// 超時
if (timed) {
nanos = deadline - System.nanoTime();
// 節(jié)點超時,取消
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 自旋;每次自旋的時候都需要檢查自身是否滿足自旋條件,滿足就 - 1,否則為0
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
// 第一次阻塞時,會將當(dāng)前線程設(shè)置到s上
else if (s.waiter == null)
s.waiter = w;
// 阻塞 當(dāng)前線程
else if (!timed)
LockSupport.park(this);
// 超時
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
awaitFulfill()方法會一直自旋/阻塞直到匹配節(jié)點。在S節(jié)點阻塞之前會先調(diào)用shouldSpin()方法判斷是否采用自旋方式,為的就是如果有生產(chǎn)者或者消費者馬上到來,就不需要阻塞了,在多核條件下這種優(yōu)化是有必要的。同時在調(diào)用park()阻塞之前會將當(dāng)前線程設(shè)置到S節(jié)點的waiter上。匹配成功,返回匹配節(jié)點m。
shouldSpin()方法如下:
boolean shouldSpin(SNode s) {
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
}
同時在阻塞過程中會一直檢測當(dāng)前線程是否中斷了,如果中斷了,則調(diào)用tryCancel()方法取消該節(jié)點,取消過程就是將當(dāng)前節(jié)點的math設(shè)置為當(dāng)前節(jié)點。所以如果線程中斷了,那么在返回m時一定是S節(jié)點自身。
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
awaitFullfill()方法如果返回的m == s,則表示當(dāng)前節(jié)點已經(jīng)中斷取消了,則需要調(diào)用clean()方法,清理節(jié)點S:
void clean(SNode s) {
// 清理item域
s.item = null;
// 清理waiter域
s.waiter = null;
// past節(jié)點
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// 從棧頂head節(jié)點,取消從棧頂head到past節(jié)點之間所有已經(jīng)取消的節(jié)點
// 注意:這里如果遇到一個節(jié)點沒有取消,則會退出while
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next); // 如果p節(jié)點已經(jīng)取消了,則剔除該節(jié)點
// 如果經(jīng)歷上面while p節(jié)點還沒有取消,則再次循環(huán)取消掉所有p 到past之間的取消節(jié)點
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
clean()方法就是將head節(jié)點到S節(jié)點之間所有已經(jīng)取消的節(jié)點全部移出?!静磺宄楹我脙蓚€while,一個不行么】
至此,SynchronousQueue的源碼分析完成了,說下我個人感覺吧:個人感覺SynchronousQueue實現(xiàn)好復(fù)雜(可能是自己智商不夠吧~~~(>_<)~~~),源碼看了好久,這篇博客寫了將近一個星期,如果有什么錯誤之處,煩請各位指正?。?/strong>