1 ConcurrentLinkedQueue
1.1 引言
在并發(fā)編程中我們有時候需要使用線程安全的隊列。如果我們要實(shí)現(xiàn)一個線程安全的隊列有兩種實(shí)現(xiàn)方式一種是使用阻塞算法,另一種是使用非阻塞算法。使用阻塞算法的隊列可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實(shí)現(xiàn),而非阻塞的實(shí)現(xiàn)方式則可以使用循環(huán)CAS的方式來實(shí)現(xiàn),本文讓我們一起來研究下Doug Lea是如何使用非阻塞的方式來實(shí)現(xiàn)線程安全隊列ConcurrentLinkedQueue的
1.2 ConcurrentLinkedQueue的介紹
ConcurrentLinkedQueue是一個基于鏈接節(jié)點(diǎn)的無界線程安全隊列,它采用先進(jìn)先出的規(guī)則對節(jié)點(diǎn)進(jìn)行排序,當(dāng)我們添加一個元素的時候,它會添加到隊列的尾部,當(dāng)我們獲取一個元素時,它會返回隊列頭部的元素。它采用了wait-free算法來實(shí)現(xiàn)
1.3 ConcurrentLinkedQueue的結(jié)構(gòu)
我們通過ConcurrentLinkedQueue的類圖來分析一下它的結(jié)構(gòu)。
ConcurrentLinkedQueue由head節(jié)點(diǎn)和tair節(jié)點(diǎn)組成,每個節(jié)點(diǎn)(Node)由節(jié)點(diǎn)元素(item)和指向下一個節(jié)點(diǎn)的引用(next)組成,節(jié)點(diǎn)與節(jié)點(diǎn)之間就是通過這個next關(guān)聯(lián)起來,從而組成一張鏈表結(jié)構(gòu)的隊列。默認(rèn)情況下head節(jié)點(diǎn)存儲的元素為空,tair節(jié)點(diǎn)等于head節(jié)點(diǎn)。
private transient volatile Node<e> tail = head;
1.4 入隊列
入隊列就是將入隊節(jié)點(diǎn)添加到隊列的尾部
第一步添加元素1。隊列更新head節(jié)點(diǎn)的next節(jié)點(diǎn)為元素1節(jié)點(diǎn)。又因?yàn)?code>tail節(jié)點(diǎn)默認(rèn)情況下等于head節(jié)點(diǎn),所以它們的next節(jié)點(diǎn)都指向元素1節(jié)點(diǎn)。
第二步添加元素2。隊列首先設(shè)置元素1節(jié)點(diǎn)的next節(jié)點(diǎn)為元素2節(jié)點(diǎn),然后更新tail節(jié)點(diǎn)指向元素2節(jié)點(diǎn)。
第三步添加元素3,設(shè)置tail節(jié)點(diǎn)的next節(jié)點(diǎn)為元素3節(jié)點(diǎn)。
第四步添加元素4,設(shè)置元素3的next節(jié)點(diǎn)為元素4節(jié)點(diǎn),然后將tail節(jié)點(diǎn)指向元素4節(jié)點(diǎn)。
通過debug入隊過程并觀察head節(jié)點(diǎn)和tail節(jié)點(diǎn)的變化,發(fā)現(xiàn)入隊主要做兩件事情,第一是將入隊節(jié)點(diǎn)設(shè)置成當(dāng)前隊列尾節(jié)點(diǎn)的下一個節(jié)點(diǎn)。第二是更新tail節(jié)點(diǎn),如果tail節(jié)點(diǎn)的next節(jié)點(diǎn)不為空,則將入隊節(jié)點(diǎn)設(shè)置成tail節(jié)點(diǎn),如果tail節(jié)點(diǎn)的next節(jié)點(diǎn)為空,則將入隊節(jié)點(diǎn)設(shè)置成tail的next節(jié)點(diǎn),所以tail節(jié)點(diǎn)不總是尾節(jié)點(diǎn),理解這一點(diǎn)對于我們研究源碼會非常有幫助。
上面的分析讓我們從單線程入隊的角度來理解入隊過程,但是多個線程同時進(jìn)行入隊情況就變得更加復(fù)雜,因?yàn)榭赡軙霈F(xiàn)其他線程插隊的情況。如果有一個線程正在入隊,那么它必須先獲取尾節(jié)點(diǎn),然后設(shè)置尾節(jié)點(diǎn)的下一個節(jié)點(diǎn)為入隊節(jié)點(diǎn),但這時可能有另外一個線程插隊了,那么隊列的尾節(jié)點(diǎn)就會發(fā)生變化,這時當(dāng)前線程要暫停入隊操作,然后重新獲取尾節(jié)點(diǎn)。讓我們再通過源碼來詳細(xì)分析下它是如何使用CAS算法來入隊的。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
//入隊前,創(chuàng)建一個入隊節(jié)點(diǎn)
Node</e><e> n = new Node</e><e>(e);
retry:
//死循環(huán),入隊不成功反復(fù)入隊。
for (;;) {
//創(chuàng)建一個指向tail節(jié)點(diǎn)的引用
Node</e><e> t = tail;
//p用來表示隊列的尾節(jié)點(diǎn),默認(rèn)情況下等于tail節(jié)點(diǎn)。
Node</e><e> p = t;
for (int hops = 0; ; hops++) {
//獲得p節(jié)點(diǎn)的下一個節(jié)點(diǎn)。
Node</e><e> next = succ(p);
//next節(jié)點(diǎn)不為空,說明p不是尾節(jié)點(diǎn),需要更新p后在將它指向next節(jié)點(diǎn)
if (next != null) {
//循環(huán)了兩次及其以上,并且當(dāng)前節(jié)點(diǎn)還是不等于尾節(jié)點(diǎn)
if (hops > HOPS && t != tail)
continue retry;
p = next;
}
//如果p是尾節(jié)點(diǎn),則設(shè)置p節(jié)點(diǎn)的next節(jié)點(diǎn)為入隊節(jié)點(diǎn)。
else if (p.casNext(null, n)) {
//如果tail節(jié)點(diǎn)有大于等于1個next節(jié)點(diǎn),則將入隊節(jié)點(diǎn)設(shè)置成tair節(jié)點(diǎn),更新失敗了也沒關(guān)系,因?yàn)槭×吮硎居衅渌€程成功更新了tair節(jié)點(diǎn)。
if (hops >= HOPS)
casTail(t, n); // 更新tail節(jié)點(diǎn),允許失敗
return true;
}
// p有next節(jié)點(diǎn),表示p的next節(jié)點(diǎn)是尾節(jié)點(diǎn),則重新設(shè)置p節(jié)點(diǎn)
else {
p = succ(p);
}
}
}
}
從源代碼角度來看整個入隊過程主要做二件事情。第一是定位出尾節(jié)點(diǎn),第二是使用CAS算法能將入隊節(jié)點(diǎn)設(shè)置成尾節(jié)點(diǎn)的next節(jié)點(diǎn),如不成功則重試
第一步定位尾節(jié)點(diǎn)。
tail節(jié)點(diǎn)并不總是尾節(jié)點(diǎn),所以每次入隊都必須先通過tail節(jié)點(diǎn)來找到尾節(jié)點(diǎn),尾節(jié)點(diǎn)可能就是tail節(jié)點(diǎn),也可能是tail節(jié)點(diǎn)的next節(jié)點(diǎn)。代碼中循環(huán)體中的第一個if就是判斷tail是否有next節(jié)點(diǎn),有則表示next節(jié)點(diǎn)可能是尾節(jié)點(diǎn)。獲取tail節(jié)點(diǎn)的next節(jié)點(diǎn)需要注意的是p節(jié)點(diǎn)等于p的next節(jié)點(diǎn)的情況,只有一種可能就是p節(jié)點(diǎn)和p的next節(jié)點(diǎn)都等于空,表示這個隊列剛初始化,正準(zhǔn)備添加第一次節(jié)點(diǎn),所以需要返回head節(jié)點(diǎn)。獲取p節(jié)點(diǎn)的next節(jié)點(diǎn)代碼如下
final Node</e><e> succ(Node</e><e> p) {
Node</e><e> next = p.getNext();
return (p == next) ? head : next;
}
第二步設(shè)置入隊節(jié)點(diǎn)為尾節(jié)點(diǎn)。
p.casNext(null, n)方法用于將入隊節(jié)點(diǎn)設(shè)置為當(dāng)前隊列尾節(jié)點(diǎn)的next節(jié)點(diǎn),p如果是null表示p是當(dāng)前隊列的尾節(jié)點(diǎn),如果不為null表示有其他線程更新了尾節(jié)點(diǎn),則需要重新獲取當(dāng)前隊列的尾節(jié)點(diǎn)。
hops的設(shè)計意圖。上面分析過對于先進(jìn)先出的隊列入隊所要做的事情就是將入隊節(jié)點(diǎn)設(shè)置成尾節(jié)點(diǎn)
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
Node</e><e> n = new Node</e><e>(e);
for (;;) {
Node</e><e> t = tail;
if (t.casNext(null, n) && casTail(t, n)) {
return true;
}
}
}
讓tail節(jié)點(diǎn)永遠(yuǎn)作為隊列的尾節(jié)點(diǎn),這樣實(shí)現(xiàn)代碼量非常少,而且邏輯非常清楚和易懂。但是這么做有個缺點(diǎn)就是每次都需要使用循環(huán)CAS更新tail節(jié)點(diǎn)。如果能減少CAS更新tail節(jié)點(diǎn)的次數(shù),就能提高入隊的效率,所以doug lea使用hops變量來控制并減少tail節(jié)點(diǎn)的更新頻率,并不是每次節(jié)點(diǎn)入隊后都將 tail節(jié)點(diǎn)更新成尾節(jié)點(diǎn),而是當(dāng) tail節(jié)點(diǎn)和尾節(jié)點(diǎn)的距離大于等于常量HOPS的值(默認(rèn)等于1)時才更新tail節(jié)點(diǎn),tail和尾節(jié)點(diǎn)的距離越長使用CAS更新tail節(jié)點(diǎn)的次數(shù)就會越少,但是距離越長帶來的負(fù)面效果就是每次入隊時定位尾節(jié)點(diǎn)的時間就越長,因?yàn)檠h(huán)體需要多循環(huán)一次來定位出尾節(jié)點(diǎn),但是這樣仍然能提高入隊的效率,因?yàn)閺谋举|(zhì)上來看它通過增加對volatile變量的讀操作來減少了對volatile變量的寫操作,而對volatile變量的寫操作開銷要遠(yuǎn)遠(yuǎn)大于讀操作,所以入隊效率會有所提升。
private static final int HOPS = 1;
還有一點(diǎn)需要注意的是入隊方法永遠(yuǎn)返回true,所以不要通過返回值判斷入隊是否成功。
1.5 出隊列
出隊列的就是從隊列里返回一個節(jié)點(diǎn)元素,并清空該節(jié)點(diǎn)對元素的引用。讓我們通過每個節(jié)點(diǎn)出隊的快照來觀察下head節(jié)點(diǎn)的變化。
出隊列時并不是每次出隊時都更新head節(jié)點(diǎn),當(dāng)head節(jié)點(diǎn)里有元素時,直接彈出head節(jié)點(diǎn)里的元素,而不會更新head節(jié)點(diǎn)。只有當(dāng)head節(jié)點(diǎn)里沒有元素時,出隊操作才會更新head節(jié)點(diǎn)。這種做法也是通過hops變量來減少使用CAS更新head節(jié)點(diǎn)的消耗,從而提高出隊效率。讓我們再通過源碼來深入分析下出隊過程。
public E poll() {
Node</e><e> h = head;
// p表示頭節(jié)點(diǎn),需要出隊的節(jié)點(diǎn)
Node</e><e> p = h;
for (int hops = 0;; hops++) {
// 獲取p節(jié)點(diǎn)的元素
E item = p.getItem();
// 如果p節(jié)點(diǎn)的元素不為空,使用CAS設(shè)置p節(jié)點(diǎn)引用的元素為null,如果成功則返回p節(jié)點(diǎn)的元素。
if (item != null && p.casItem(item, null)) {
if (hops >= HOPS) {
//將p節(jié)點(diǎn)下一個節(jié)點(diǎn)設(shè)置成head節(jié)點(diǎn)
Node</e><e> q = p.getNext();
updateHead(h, (q != null) ? q : p);
}
return item;
}
// 如果頭節(jié)點(diǎn)的元素為空或頭節(jié)點(diǎn)發(fā)生了變化,這說明頭節(jié)點(diǎn)已經(jīng)被另外一個線程修改了。那么獲取p節(jié)點(diǎn)的下一個節(jié)點(diǎn)
Node</e><e> next = succ(p);
// 如果p的下一個節(jié)點(diǎn)也為空,說明這個隊列已經(jīng)空了
if (next == null) {
// 更新頭節(jié)點(diǎn)。
updateHead(h, p);
break;
}
// 如果下一個元素不為空,則將頭節(jié)點(diǎn)的下一個節(jié)點(diǎn)設(shè)置成頭節(jié)點(diǎn)
p = next;
}
return null;
}
首先獲取頭節(jié)點(diǎn)的元素,然后判斷頭節(jié)點(diǎn)元素是否為空,如果為空,表示另外一個線程已經(jīng)進(jìn)行了一次出隊操作將該節(jié)點(diǎn)的元素取走,如果不為空,則使用CAS的方式將頭節(jié)點(diǎn)的引用設(shè)置成null,如果CAS成功,則直接返回頭節(jié)點(diǎn)的元素,如果不成功,表示另外一個線程已經(jīng)進(jìn)行了一次出隊操作更新了head節(jié)點(diǎn),導(dǎo)致元素發(fā)生了變化,需要重新獲取頭節(jié)點(diǎn)。
2 阻塞隊列
2.1 什么是阻塞隊列
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强铡.?dāng)隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產(chǎn)者和消費(fèi)者的場景,生產(chǎn)者是往隊列里添加元素的線程,消費(fèi)者是從隊列里拿元素的線程。阻塞隊列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。
阻塞隊列提供了四種處理方法:
| 方法\處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
|---|---|---|---|---|
| 插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除方法 | remove() | poll() | take() | poll(time,unit) |
| 檢查方法 | element() | peek() | 不可用 | 不可用 |
-
拋出異常:是指當(dāng)阻塞隊列滿時候,再往隊列里插入元素,會拋出IllegalStateException(“Queue full”)異常。當(dāng)隊列為空時,從隊列里獲取元素時會拋出NoSuchElementException異常 。 -
返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回null -
一直阻塞:當(dāng)阻塞隊列滿時,如果生產(chǎn)者線程往隊列里put元素,隊列會一直阻塞生產(chǎn)者線程,直到拿到數(shù)據(jù),或者響應(yīng)中斷退出。當(dāng)隊列空時,消費(fèi)者線程試圖從隊列里take元素,隊列也會阻塞消費(fèi)者線程,直到隊列可用。 -
超時退出:當(dāng)阻塞隊列滿時,隊列會阻塞生產(chǎn)者線程一段時間,如果超過一定的時間,生產(chǎn)者線程就會退出。
2.2 Java里的阻塞隊列
JDK7提供了7個阻塞隊列。分別是
-
ArrayBlockingQueue:一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列。 -
LinkedBlockingQueue:一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列。 -
PriorityBlockingQueue:一個支持優(yōu)先級排序的無界阻塞隊列。 -
DelayQueue:一個使用優(yōu)先級隊列實(shí)現(xiàn)的無界阻塞隊列。 -
SynchronousQueue:一個不存儲元素的阻塞隊列。 -
LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列。 -
LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。
2.2.1 ArrayBlockingQueue
ArrayBlockingQueue是一個用數(shù)組實(shí)現(xiàn)的有界阻塞隊列。此隊列按照先進(jìn)先出(FIFO)的原則對元素進(jìn)行排序。默認(rèn)情況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的所有生產(chǎn)者線程或消費(fèi)者線程,當(dāng)隊列可用時,可以按照阻塞的先后順序訪問隊列,即先阻塞的生產(chǎn)者線程,可以先往隊列里插入元素,先阻塞的消費(fèi)者線程,可以先從隊列里獲取元素。通常情況下為了保證公平性會降低吞吐量。我們可以使用以下代碼創(chuàng)建一個公平的阻塞隊列:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
訪問者的公平性是使用可重入鎖實(shí)現(xiàn)的,代碼如下:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
2.2.2 LinkedBlockingQueue
LinkedBlockingQueue是一個用鏈表實(shí)現(xiàn)的有界阻塞隊列。此隊列的默認(rèn)和最大長度為Integer.MAX_VALUE。此隊列按照先進(jìn)先出的原則對元素進(jìn)行排序。
而 LinkedBlockingQueue 之所以能夠高效的處理并發(fā)數(shù)據(jù),還因?yàn)槠鋵τ谏a(chǎn)者端和消費(fèi)者端分別采用了獨(dú)立的鎖來控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊列中的數(shù)據(jù),以此來提高整個隊列的并發(fā)性能
2.2.3 PriorityBlockingQueue
PriorityBlockingQueue是一個支持優(yōu)先級的無界隊列。默認(rèn)情況下元素采取自然順序排列,也可以通過比較器comparator來指定元素的排序規(guī)則。元素按照升序排列。
2.2.4 DelayQueue
DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實(shí)現(xiàn)。隊列中的元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時可以指定多久才能從隊列中獲取當(dāng)前元素。只有在延遲期滿時才能從隊列中提取元素。我們可以將DelayQueue運(yùn)用在以下應(yīng)用場景:
-
緩存系統(tǒng)的設(shè)計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。 -
定時任務(wù)調(diào)度。使用DelayQueue保存當(dāng)天將會執(zhí)行的任務(wù)和執(zhí)行時間,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行,從比如TimerQueue就是使用DelayQueue實(shí)現(xiàn)的。
隊列中的Delayed必須實(shí)現(xiàn)compareTo來指定元素的順序。比如讓延時時間最長的放在隊列的末尾。實(shí)現(xiàn)代碼如下:
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask x = (ScheduledFutureTask)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
那么如何實(shí)現(xiàn)Delayed接口
我們可以參考ScheduledThreadPoolExecutor里ScheduledFutureTask類。這個類實(shí)現(xiàn)了Delayed接口。首先:在對象創(chuàng)建的時候,使用time記錄前對象什么時候可以使用,代碼如下:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
然后使用getDelay可以查詢當(dāng)前元素還需要延時多久,代碼如下:
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
通過構(gòu)造函數(shù)可以看出延遲時間參數(shù)ns的單位是納秒,自己設(shè)計的時候最好使用納秒,因?yàn)?code>getDelay是可以指定任意單位,一旦以納秒作為單位,而延時的時間又精確不到納秒就麻煩了。使用時請注意當(dāng)time小于當(dāng)前時間時,getDelay會返回負(fù)數(shù)。
如何實(shí)現(xiàn)延時隊列
延時隊列的實(shí)現(xiàn)很簡單,當(dāng)消費(fèi)者從隊列里獲取元素時,如果元素沒有達(dá)到延時時間,就阻塞當(dāng)前線程。
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return q.poll();
else if (leader != null)
available.await();
2.2.5 SynchronousQueue
SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作,否則不能繼續(xù)添加元素。SynchronousQueue可以看成是一個傳球手,負(fù)責(zé)把生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費(fèi)者線程。隊列本身并不存儲任何元素,非常適合于傳遞性場景,比如在一個線程中使用的數(shù)據(jù),傳遞給另外一個線程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue
2.2.6 LinkedTransferQueue
LinkedTransferQueue是一個由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue隊列。相對于其他阻塞隊列LinkedTransferQueue多了tryTransfer和transfer方法。
-
transfer方法:如果當(dāng)前有消費(fèi)者正在等待接收元素(消費(fèi)者使用take()方法或帶時間限制的poll()方法時),transfer方法可以把生產(chǎn)者傳入的元素立刻transfer(傳輸)給消費(fèi)者。如果沒有消費(fèi)者在等待接收元素,transfer方法會將元素存放在隊列的tail節(jié)點(diǎn),并等到該元素被消費(fèi)者消費(fèi)了才返回。transfer方法的關(guān)鍵代碼如下:
Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);
第一行代碼是試圖把存放當(dāng)前元素的s節(jié)點(diǎn)作為tail節(jié)點(diǎn)。
第二行代碼是讓CPU自旋等待消費(fèi)者消費(fèi)元素。因?yàn)樽孕龝?code>CPU,所以自旋一定的次數(shù)后使用Thread.yield()方法來暫停當(dāng)前正在執(zhí)行的線程,并執(zhí)行其他線程。
-
tryTransfer方法:則是用來試探下生產(chǎn)者傳入的元素是否能直接傳給消費(fèi)者。如果沒有消費(fèi)者等待接收元素,則返回false。和transfer方法的區(qū)別是tryTransfer方法無論消費(fèi)者是否接收,方法立即返回。而transfer方法是必須等到消費(fèi)者消費(fèi)了才返回。
對于帶有時間限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,則是試圖把生產(chǎn)者傳入的元素直接傳給消費(fèi)者,但是如果沒有消費(fèi)者消費(fèi)該元素則等待指定的時間再返回,如果超時還沒消費(fèi)元素,則返回false,如果在超時時間內(nèi)消費(fèi)了元素,則返回true。
2.2.7 LinkedBlockingDeque
LinkedBlockingDeque是一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。雙端隊列因?yàn)槎嗔艘粋€操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First單詞結(jié)尾的方法,表示插入,獲?。╬eek)或移除雙端隊列的第一個元素。以Last單詞結(jié)尾的方法,表示插入,獲取或移除雙端隊列的最后一個元素。
另外插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法卻等同于takeFirst,不知道是不是Jdk的bug,使用時還是用帶有First和Last后綴的方法更清楚。在初始化LinkedBlockingDeque時可以初始化隊列的容量,用來防止其再擴(kuò)容時過渡膨脹。另外雙向阻塞隊列可以運(yùn)用在“工作竊取”模式中。
2.3 阻塞隊列的實(shí)現(xiàn)原理
如果隊列是空的,消費(fèi)者會一直等待,當(dāng)生產(chǎn)者添加元素時候,消費(fèi)者是如何知道當(dāng)前隊列有元素的呢?如果讓你來設(shè)計阻塞隊列你會如何設(shè)計,讓生產(chǎn)者和消費(fèi)者能夠高效率的進(jìn)行通訊呢?讓我們先來看看JDK是如何實(shí)現(xiàn)的。
使用通知模式實(shí)現(xiàn)。所謂通知模式,就是當(dāng)生產(chǎn)者往滿的隊列里添加元素時會阻塞住生產(chǎn)者,當(dāng)消費(fèi)者消費(fèi)了一個隊列中的元素后,會通知生產(chǎn)者當(dāng)前隊列可用。通過查看JDK源碼發(fā)現(xiàn)ArrayBlockingQueue使用了Condition來實(shí)現(xiàn),代碼如下:
private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair) {
//省略其他代碼
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
當(dāng)我們往隊列里插入一個元素時,如果隊列不可用,阻塞生產(chǎn)者主要通過LockSupport.park(this);來實(shí)現(xiàn)
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
繼續(xù)進(jìn)入源碼,發(fā)現(xiàn)調(diào)用setBlocker先保存下將要阻塞的線程,然后調(diào)用unsafe.park阻塞當(dāng)前線程。
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.park(false, 0L);
setBlocker(t, null);
}
unsafe.park是個native方法,代碼如下:
public native void park(boolean isAbsolute, long time);
park這個方法會阻塞當(dāng)前線程,只有以下四種情況中的一種發(fā)生時,該方法才會返回。
- 與
park對應(yīng)的unpark執(zhí)行或已經(jīng)執(zhí)行時。注意:已經(jīng)執(zhí)行是指unpark先執(zhí)行,然后再執(zhí)行的park - 線程被中斷時。
- 如果參數(shù)中的
time不是零,等待了指定的毫秒數(shù)時。 - 發(fā)生異?,F(xiàn)象時。這些異常事先無法確定。
2.4 阻塞隊列操作
public class Test {
public static void main(String[] args) throws InterruptedException {
BlockingQueue bqueue = new ArrayBlockingQueue(9);
for (int i = 0; i < 30; i++) {
//將指定元素添加到此隊列中,如果沒有可用空間,將一直等待(如果有必要)。
bqueue.put(i);
System.out.println("向阻塞隊列中添加了元素:" + i);
}
System.out.println("程序到此運(yùn)行結(jié)束,即將退出----");
}
}
運(yùn)行結(jié)果
向阻塞隊列中添加了元素:0
向阻塞隊列中添加了元素:1
向阻塞隊列中添加了元素:2
向阻塞隊列中添加了元素:3
向阻塞隊列中添加了元素:4
向阻塞隊列中添加了元素:5
向阻塞隊列中添加了元素:6
向阻塞隊列中添加了元素:7
向阻塞隊列中添加了元素:8
向阻塞隊列中添加了元素:9
可以看出,輸出到元素9時候,就一直處于等待狀態(tài),因?yàn)殛犃袧M了,程序阻塞了
順便看下阻塞棧
對于阻塞棧,與阻塞隊列相似。不同點(diǎn)在于棧是后入先出的結(jié)構(gòu),每次操作的是棧頂,而隊列是先進(jìn)先出的結(jié)構(gòu),每次操作的是隊列頭。
這里要特別說明一點(diǎn)的是,阻塞棧是Java6的新特征。、
Java為阻塞棧定義了接口:java.util.concurrent.BlockingDeque
下面看一個簡單例子:
public class Test {
public static void main(String[] args) throws InterruptedException {
BlockingDeque bDeque = new LinkedBlockingDeque(9);
for (int i = 0; i < 30; i++) {
//將指定元素添加到此阻塞棧中,如果沒有可用空間,將一直等待(如果有必要)。
bDeque.putFirst(i);
System.out.println("向阻塞棧中添加了元素:" + i);
}
System.out.println("程序到此運(yùn)行結(jié)束,即將退出----");
}
}
輸出結(jié)果:
向阻塞棧中添加了元素:0
向阻塞棧中添加了元素:1
向阻塞棧中添加了元素:2
向阻塞棧中添加了元素:3
向阻塞棧中添加了元素:4
向阻塞棧中添加了元素:5
向阻塞棧中添加了元素:6
向阻塞棧中添加了元素:7
向阻塞棧中添加了元素:8
向阻塞棧中添加了元素:9
從上面結(jié)果可以看到,程序并沒結(jié)束,二是阻塞住了,原因是棧已經(jīng)滿了,后面追加元素的操作都被阻塞了。