本文轉(zhuǎn)自:https://www.javadoop.com/
本文轉(zhuǎn)載自互聯(lián)網(wǎng),侵刪
本系列文章將整理到我在GitHub上的《Java面試指南》倉(cāng)庫(kù),更多精彩內(nèi)容請(qǐng)到我的倉(cāng)庫(kù)里查看
喜歡的話麻煩點(diǎn)下Star哈
文章同步發(fā)于我的個(gè)人博客:
本文是微信公眾號(hào)【Java技術(shù)江湖】的《Java并發(fā)指南》其中一篇,本文大部分內(nèi)容來(lái)源于網(wǎng)絡(luò),為了把本文主題講得清晰透徹,也整合了很多我認(rèn)為不錯(cuò)的技術(shù)博客內(nèi)容,引用其中了一些比較好的博客文章,如有侵權(quán),請(qǐng)聯(lián)系作者。
該系列博文會(huì)告訴你如何全面深入地學(xué)習(xí)Java并發(fā)技術(shù),從Java多線程基礎(chǔ),再到并發(fā)編程的基礎(chǔ)知識(shí),從Java并發(fā)包的入門和實(shí)戰(zhàn),再到JUC的源碼剖析,一步步地學(xué)習(xí)Java并發(fā)編程,并上手進(jìn)行實(shí)戰(zhàn),以便讓你更完整地了解整個(gè)Java并發(fā)編程知識(shí)體系,形成自己的知識(shí)框架。
為了更好地總結(jié)和檢驗(yàn)?zāi)愕膶W(xué)習(xí)成果,本系列文章也會(huì)提供一些對(duì)應(yīng)的面試題以及參考答案。
如果對(duì)本系列文章有什么建議,或者是有什么疑問(wèn)的話,也可以關(guān)注公眾號(hào)【Java技術(shù)江湖】聯(lián)系作者,歡迎你參與本系列博文的創(chuàng)作和修訂。
前言
最近得空,想寫篇文章好好說(shuō)說(shuō) java 線程池問(wèn)題,我相信很多人都一知半解的,包括我自己在仔仔細(xì)細(xì)看源碼之前,也有許多的不解,甚至有些地方我一直都沒(méi)有理解到位。
說(shuō)到線程池實(shí)現(xiàn),那么就不得不涉及到各種 BlockingQueue 的實(shí)現(xiàn),那么我想就 BlockingQueue 的問(wèn)題和大家分享分享我了解的一些知識(shí)。
本文沒(méi)有像之前分析 AQS 那樣一行一行源碼分析了,不過(guò)還是把其中最重要和最難理解的代碼說(shuō)了一遍,所以不免篇幅略長(zhǎng)。本文涉及到比較多的 Doug Lea 對(duì) BlockingQueue 的設(shè)計(jì)思想,希望有心的讀者真的可以有一些收獲,我覺(jué)得自己還是寫了一些干貨的。
本文直接參考 Doug Lea 寫的 Java doc 和注釋,這也是我們?cè)趯W(xué)習(xí) java 并發(fā)包時(shí)最好的材料了。希望大家能有所思、有所悟,學(xué)習(xí) Doug Lea 的代碼風(fēng)格,并將其優(yōu)雅、嚴(yán)謹(jǐn)?shù)淖黠L(fēng)應(yīng)用到我們寫的每一行代碼中。
目錄:
BlockingQueue
開(kāi)篇先介紹下 BlockingQueue 這個(gè)接口的規(guī)則,后面再看其實(shí)現(xiàn)。
首先,最基本的來(lái)說(shuō), BlockingQueue 是一個(gè)先進(jìn)先出的隊(duì)列(Queue),為什么說(shuō)是阻塞(Blocking)的呢?是因?yàn)?BlockingQueue 支持當(dāng)獲取隊(duì)列元素但是隊(duì)列為空時(shí),會(huì)阻塞等待隊(duì)列中有元素再返回;也支持添加元素時(shí),如果隊(duì)列已滿,那么等到隊(duì)列可以放入新元素時(shí)再放入。
BlockingQueue 是一個(gè)接口,繼承自 Queue,所以其實(shí)現(xiàn)類也可以作為 Queue 的實(shí)現(xiàn)來(lái)使用,而 Queue 又繼承自 Collection 接口。
BlockingQueue 對(duì)插入操作、移除操作、獲取元素操作提供了四種不同的方法用于不同的場(chǎng)景中使用:1、拋出異常;2、返回特殊值(null 或 true/false,取決于具體的操作);3、阻塞等待此操作,直到這個(gè)操作成功;4、阻塞等待此操作,直到成功或者超時(shí)指定時(shí)間??偨Y(jié)如下:
| Throws exception | Special value | Blocks | Times out | |
|---|---|---|---|---|
| Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| Remove | remove() | poll() | take() | poll(time, unit) |
| Examine | element() | peek() | not applicable | not applicable |
BlockingQueue 的各個(gè)實(shí)現(xiàn)都遵循了這些規(guī)則,當(dāng)然我們也不用死記這個(gè)表格,知道有這么回事,然后寫代碼的時(shí)候根據(jù)自己的需要去看方法的注釋來(lái)選取合適的方法即可。
對(duì)于 BlockingQueue,我們的關(guān)注點(diǎn)應(yīng)該在 put(e) 和 take() 這兩個(gè)方法,因?yàn)檫@兩個(gè)方法是帶阻塞的。
BlockingQueue 不接受 null 值的插入,相應(yīng)的方法在碰到 null 的插入時(shí)會(huì)拋出 NullPointerException 異常。null 值在這里通常用于作為特殊值返回(表格中的第三列),代表 poll 失敗。所以,如果允許插入 null 值的話,那獲取的時(shí)候,就不能很好地用 null 來(lái)判斷到底是代表失敗,還是獲取的值就是 null 值。
一個(gè) BlockingQueue 可能是有界的,如果在插入的時(shí)候,發(fā)現(xiàn)隊(duì)列滿了,那么 put 操作將會(huì)阻塞。通常,在這里我們說(shuō)的無(wú)界隊(duì)列也不是說(shuō)真正的無(wú)界,而是它的容量是 Integer.MAX_VALUE(21億多)。
BlockingQueue 是設(shè)計(jì)用來(lái)實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者隊(duì)列的,當(dāng)然,你也可以將它當(dāng)做普通的 Collection 來(lái)用,前面說(shuō)了,它實(shí)現(xiàn)了 java.util.Collection 接口。例如,我們可以用 remove(x) 來(lái)刪除任意一個(gè)元素,但是,這類操作通常并不高效,所以盡量只在少數(shù)的場(chǎng)合使用,比如一條消息已經(jīng)入隊(duì),但是需要做取消操作的時(shí)候。
BlockingQueue 的實(shí)現(xiàn)都是線程安全的,但是批量的集合操作如 addAll, containsAll, retainAll 和 removeAll 不一定是原子操作。如 addAll(c) 有可能在添加了一些元素后中途拋出異常,此時(shí) BlockingQueue 中已經(jīng)添加了部分元素,這個(gè)是允許的,取決于具體的實(shí)現(xiàn)。
BlockingQueue 不支持 close 或 shutdown 等關(guān)閉操作,因?yàn)殚_(kāi)發(fā)者可能希望不會(huì)有新的元素添加進(jìn)去,此特性取決于具體的實(shí)現(xiàn),不做強(qiáng)制約束。
最后,BlockingQueue 在生產(chǎn)者-消費(fèi)者的場(chǎng)景中,是支持多消費(fèi)者和多生產(chǎn)者的,說(shuō)的其實(shí)就是線程安全問(wèn)題。
相信上面說(shuō)的每一句都很清楚了,BlockingQueue 是一個(gè)比較簡(jiǎn)單的線程安全容器,下面我會(huì)分析其具體的在 JDK 中的實(shí)現(xiàn),這里又到了 Doug Lea 表演時(shí)間了。
BlockingQueue 實(shí)現(xiàn)之 ArrayBlockingQueue
ArrayBlockingQueue 是 BlockingQueue 接口的有界隊(duì)列實(shí)現(xiàn)類,底層采用數(shù)組來(lái)實(shí)現(xiàn)。
其并發(fā)控制采用可重入鎖來(lái)控制,不管是插入操作還是讀取操作,都需要獲取到鎖才能進(jìn)行操作。
如果讀者看過(guò)我之前寫的《一行一行源碼分析清楚 AbstractQueuedSynchronizer(二)》 的關(guān)于 Condition 的文章的話,那么你一定能很容易看懂 ArrayBlockingQueue 的源碼,它采用一個(gè) ReentrantLock 和相應(yīng)的兩個(gè) Condition 來(lái)實(shí)現(xiàn)。
ArrayBlockingQueue 共有以下幾個(gè)屬性:
// 用于存放元素的數(shù)組
final Object[] items;
// 下一次讀取操作的位置
int takeIndex;
// 下一次寫入操作的位置
int putIndex;
// 隊(duì)列中的元素?cái)?shù)量
int count;
// 以下幾個(gè)就是控制并發(fā)用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
我們用個(gè)示意圖來(lái)描述其同步機(jī)制:

ArrayBlockingQueue 實(shí)現(xiàn)并發(fā)同步的原理就是,讀操作和寫操作都需要獲取到 AQS 獨(dú)占鎖才能進(jìn)行操作。如果隊(duì)列為空,這個(gè)時(shí)候讀操作的線程進(jìn)入到讀線程隊(duì)列排隊(duì),等待寫線程寫入新的元素,然后喚醒讀線程隊(duì)列的第一個(gè)等待線程。如果隊(duì)列已滿,這個(gè)時(shí)候?qū)懖僮鞯木€程進(jìn)入到寫線程隊(duì)列排隊(duì),等待讀線程將隊(duì)列元素移除騰出空間,然后喚醒寫線程隊(duì)列的第一個(gè)等待線程。
對(duì)于 ArrayBlockingQueue,我們可以在構(gòu)造的時(shí)候指定以下三個(gè)參數(shù):
- 隊(duì)列容量,其限制了隊(duì)列中最多允許的元素個(gè)數(shù);
- 指定獨(dú)占鎖是公平鎖還是非公平鎖。非公平鎖的吞吐量比較高,公平鎖可以保證每次都是等待最久的線程獲取到鎖;
- 可以指定用一個(gè)集合來(lái)初始化,將此集合中的元素在構(gòu)造方法期間就先添加到隊(duì)列中。
更具體的源碼我就不進(jìn)行分析了,因?yàn)樗褪?AbstractQueuedSynchronizer 中 Condition 的使用,感興趣的讀者請(qǐng)看我寫的《一行一行源碼分析清楚 AbstractQueuedSynchronizer(二)》,因?yàn)橹灰炊四瞧恼?,ArrayBlockingQueue 的代碼就沒(méi)有分析的必要了,當(dāng)然,如果你完全不懂 Condition,那么基本上也就可以說(shuō)看不懂 ArrayBlockingQueue 的源碼了。
BlockingQueue 實(shí)現(xiàn)之 LinkedBlockingQueue
底層基于單向鏈表實(shí)現(xiàn)的阻塞隊(duì)列,可以當(dāng)做無(wú)界隊(duì)列也可以當(dāng)做有界隊(duì)列來(lái)使用??礃?gòu)造方法:
// 傳說(shuō)中的無(wú)界隊(duì)列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 傳說(shuō)中的有界隊(duì)列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
我們看看這個(gè)類有哪些屬性:
// 隊(duì)列容量
private final int capacity;
// 隊(duì)列中的元素?cái)?shù)量
private final AtomicInteger count = new AtomicInteger(0);
// 隊(duì)頭
private transient Node<E> head;
// 隊(duì)尾
private transient Node<E> last;
// take, poll, peek 等讀操作的方法需要獲取到這個(gè)鎖
private final ReentrantLock takeLock = new ReentrantLock();
// 如果讀操作的時(shí)候隊(duì)列是空的,那么等待 notEmpty 條件
private final Condition notEmpty = takeLock.newCondition();
// put, offer 等寫操作的方法需要獲取到這個(gè)鎖
private final ReentrantLock putLock = new ReentrantLock();
// 如果寫操作的時(shí)候隊(duì)列是滿的,那么等待 notFull 條件
private final Condition notFull = putLock.newCondition();
這里用了兩個(gè)鎖,兩個(gè) Condition,簡(jiǎn)單介紹如下:
takeLock 和 notEmpty 怎么搭配:如果要獲取(take)一個(gè)元素,需要獲取 takeLock 鎖,但是獲取了鎖還不夠,如果隊(duì)列此時(shí)為空,還需要隊(duì)列不為空(notEmpty)這個(gè)條件(Condition)。
putLock 需要和 notFull 搭配:如果要插入(put)一個(gè)元素,需要獲取 putLock 鎖,但是獲取了鎖還不夠,如果隊(duì)列此時(shí)已滿,還需要隊(duì)列不是滿的(notFull)這個(gè)條件(Condition)。
首先,這里用一個(gè)示意圖來(lái)看看 LinkedBlockingQueue 的并發(fā)讀寫控制,然后再開(kāi)始分析源碼:

看懂這個(gè)示意圖,源碼也就簡(jiǎn)單了,讀操作是排好隊(duì)的,寫操作也是排好隊(duì)的,唯一的并發(fā)問(wèn)題在于一個(gè)寫操作和一個(gè)讀操作同時(shí)進(jìn)行,只要控制好這個(gè)就可以了。
先上構(gòu)造方法:
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
注意,這里會(huì)初始化一個(gè)空的頭結(jié)點(diǎn),那么第一個(gè)元素入隊(duì)的時(shí)候,隊(duì)列中就會(huì)有兩個(gè)元素。讀取元素時(shí),也總是獲取頭節(jié)點(diǎn)后面的一個(gè)節(jié)點(diǎn)。count 的計(jì)數(shù)值不包括這個(gè)頭節(jié)點(diǎn)。
我們來(lái)看下 put 方法是怎么將元素插入到隊(duì)尾的:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 如果你糾結(jié)這里為什么是 -1,可以看看 offer 方法。這就是個(gè)標(biāo)識(shí)成功、失敗的標(biāo)志而已。
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 必須要獲取到 putLock 才可以進(jìn)行插入操作
putLock.lockInterruptibly();
try {
// 如果隊(duì)列滿,等待 notFull 的條件滿足。
while (count.get() == capacity) {
notFull.await();
}
// 入隊(duì)
enqueue(node);
// count 原子加 1,c 還是加 1 前的值
c = count.getAndIncrement();
// 如果這個(gè)元素入隊(duì)后,還有至少一個(gè)槽可以使用,調(diào)用 notFull.signal() 喚醒等待線程。
// 哪些線程會(huì)等待在 notFull 這個(gè) Condition 上呢?
if (c + 1 < capacity)
notFull.signal();
} finally {
// 入隊(duì)后,釋放掉 putLock
putLock.unlock();
}
// 如果 c == 0,那么代表隊(duì)列在這個(gè)元素入隊(duì)前是空的(不包括head空節(jié)點(diǎn)),
// 那么所有的讀線程都在等待 notEmpty 這個(gè)條件,等待喚醒,這里做一次喚醒操作
if (c == 0)
signalNotEmpty();
}
// 入隊(duì)的代碼非常簡(jiǎn)單,就是將 last 屬性指向這個(gè)新元素,并且讓原隊(duì)尾的 next 指向這個(gè)元素
// 這里入隊(duì)沒(méi)有并發(fā)問(wèn)題,因?yàn)橹挥蝎@取到 putLock 獨(dú)占鎖以后,才可以進(jìn)行此操作
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
// 元素入隊(duì)后,如果需要,調(diào)用這個(gè)方法喚醒讀線程來(lái)讀
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
我們?cè)倏纯?take 方法:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 首先,需要獲取到 takeLock 才能進(jìn)行出隊(duì)操作
takeLock.lockInterruptibly();
try {
// 如果隊(duì)列為空,等待 notEmpty 這個(gè)條件滿足再繼續(xù)執(zhí)行
while (count.get() == 0) {
notEmpty.await();
}
// 出隊(duì)
x = dequeue();
// count 進(jìn)行原子減 1
c = count.getAndDecrement();
// 如果這次出隊(duì)后,隊(duì)列中至少還有一個(gè)元素,那么調(diào)用 notEmpty.signal() 喚醒其他的讀線程
if (c > 1)
notEmpty.signal();
} finally {
// 出隊(duì)后釋放掉 takeLock
takeLock.unlock();
}
// 如果 c == capacity,那么說(shuō)明在這個(gè) take 方法發(fā)生的時(shí)候,隊(duì)列是滿的
// 既然出隊(duì)了一個(gè),那么意味著隊(duì)列不滿了,喚醒寫線程去寫
if (c == capacity)
signalNotFull();
return x;
}
// 取隊(duì)頭,出隊(duì)
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
// 之前說(shuō)了,頭結(jié)點(diǎn)是空的
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
// 設(shè)置這個(gè)為新的頭結(jié)點(diǎn)
head = first;
E x = first.item;
first.item = null;
return x;
}
// 元素出隊(duì)后,如果需要,調(diào)用這個(gè)方法喚醒寫線程來(lái)寫
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
源碼分析就到這里結(jié)束了吧,畢竟還是比較簡(jiǎn)單的源碼,基本上只要讀者認(rèn)真點(diǎn)都看得懂。
BlockingQueue 實(shí)現(xiàn)之 SynchronousQueue
它是一個(gè)特殊的隊(duì)列,它的名字其實(shí)就蘊(yùn)含了它的特征 - - 同步的隊(duì)列。為什么說(shuō)是同步的呢?這里說(shuō)的并不是多線程的并發(fā)問(wèn)題,而是因?yàn)楫?dāng)一個(gè)線程往隊(duì)列中寫入一個(gè)元素時(shí),寫入操作不會(huì)立即返回,需要等待另一個(gè)線程來(lái)將這個(gè)元素拿走;同理,當(dāng)一個(gè)讀線程做讀操作的時(shí)候,同樣需要一個(gè)相匹配的寫線程的寫操作。這里的 Synchronous 指的就是讀線程和寫線程需要同步,一個(gè)讀線程匹配一個(gè)寫線程。
我們比較少使用到 SynchronousQueue 這個(gè)類,不過(guò)它在線程池的實(shí)現(xiàn)類 ThreadPoolExecutor 中得到了應(yīng)用,感興趣的讀者可以在看完這個(gè)后去看看相應(yīng)的使用。
雖然上面我說(shuō)了隊(duì)列,但是 SynchronousQueue 的隊(duì)列其實(shí)是虛的,其不提供任何空間(一個(gè)都沒(méi)有)來(lái)存儲(chǔ)元素。數(shù)據(jù)必須從某個(gè)寫線程交給某個(gè)讀線程,而不是寫到某個(gè)隊(duì)列中等待被消費(fèi)。
你不能在 SynchronousQueue 中使用 peek 方法(在這里這個(gè)方法直接返回 null),peek 方法的語(yǔ)義是只讀取不移除,顯然,這個(gè)方法的語(yǔ)義是不符合 SynchronousQueue 的特征的。SynchronousQueue 也不能被迭代,因?yàn)楦揪蜎](méi)有元素可以拿來(lái)迭代的。雖然 SynchronousQueue 間接地實(shí)現(xiàn)了 Collection 接口,但是如果你將其當(dāng)做 Collection 來(lái)用的話,那么集合是空的。當(dāng)然,這個(gè)類也是不允許傳遞 null 值的(并發(fā)包中的容器類好像都不支持插入 null 值,因?yàn)?null 值往往用作其他用途,比如用于方法的返回值代表操作失?。?。
接下來(lái),我們來(lái)看看具體的源碼實(shí)現(xiàn)吧,它的源碼不是很簡(jiǎn)單的那種,我們需要先搞清楚它的設(shè)計(jì)思想。
源碼加注釋大概有 1200 行,我們先看大框架:
// 構(gòu)造時(shí),我們可以指定公平模式還是非公平模式,區(qū)別之后再說(shuō)
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue() : new TransferStack();
}
abstract static class Transferer {
// 從方法名上大概就知道,這個(gè)方法用于轉(zhuǎn)移元素,從生產(chǎn)者手上轉(zhuǎn)到消費(fèi)者手上
// 也可以被動(dòng)地,消費(fèi)者調(diào)用這個(gè)方法來(lái)從生產(chǎn)者手上取元素
// 第一個(gè)參數(shù) e 如果不是 null,代表場(chǎng)景為:將元素從生產(chǎn)者轉(zhuǎn)移給消費(fèi)者
// 如果是 null,代表消費(fèi)者等待生產(chǎn)者提供元素,然后返回值就是相應(yīng)的生產(chǎn)者提供的元素
// 第二個(gè)參數(shù)代表是否設(shè)置超時(shí),如果設(shè)置超時(shí),超時(shí)時(shí)間是第三個(gè)參數(shù)的值
// 返回值如果是 null,代表超時(shí),或者中斷。具體是哪個(gè),可以通過(guò)檢測(cè)中斷狀態(tài)得到。
abstract Object transfer(Object e, boolean timed, long nanos);
}
Transferer 有兩個(gè)內(nèi)部實(shí)現(xiàn)類,是因?yàn)闃?gòu)造 SynchronousQueue 的時(shí)候,我們可以指定公平策略。公平模式意味著,所有的讀寫線程都遵守先來(lái)后到,F(xiàn)IFO 嘛,對(duì)應(yīng) TransferQueue。而非公平模式則對(duì)應(yīng) TransferStack。

我們先采用公平模式分析源碼,然后再說(shuō)說(shuō)公平模式和非公平模式的區(qū)別。
接下來(lái),我們看看 put 方法和 take 方法:
// 寫入值
public void put(E o) throws InterruptedException {
if (o == null) throw new NullPointerException();
if (transferer.transfer(o, false, 0) == null) { // 1
Thread.interrupted();
throw new InterruptedException();
}
}
// 讀取值并移除
public E take() throws InterruptedException {
Object e = transferer.transfer(null, false, 0); // 2
if (e != null)
return (E)e;
Thread.interrupted();
throw new InterruptedException();
}
我們看到,寫操作 put(E o) 和讀操作 take() 都是調(diào)用 Transferer.transfer(…) 方法,區(qū)別在于第一個(gè)參數(shù)是否為 null 值。
我們來(lái)看看 transfer 的設(shè)計(jì)思路,其基本算法如下:
- 當(dāng)調(diào)用這個(gè)方法時(shí),如果隊(duì)列是空的,或者隊(duì)列中的節(jié)點(diǎn)和當(dāng)前的線程操作類型一致(如當(dāng)前操作是 put 操作,而隊(duì)列中的元素也都是寫線程)。這種情況下,將當(dāng)前線程加入到等待隊(duì)列即可。
- 如果隊(duì)列中有等待節(jié)點(diǎn),而且與當(dāng)前操作可以匹配(如隊(duì)列中都是讀操作線程,當(dāng)前線程是寫操作線程,反之亦然)。這種情況下,匹配等待隊(duì)列的隊(duì)頭,出隊(duì),返回相應(yīng)數(shù)據(jù)。
其實(shí)這里有個(gè)隱含的條件被滿足了,隊(duì)列如果不為空,肯定都是同種類型的節(jié)點(diǎn),要么都是讀操作,要么都是寫操作。這個(gè)就要看到底是讀線程積壓了,還是寫線程積壓了。
我們可以假設(shè)出一個(gè)男女配對(duì)的場(chǎng)景:一個(gè)男的過(guò)來(lái),如果一個(gè)人都沒(méi)有,那么他需要等待;如果發(fā)現(xiàn)有一堆男的在等待,那么他需要排到隊(duì)列后面;如果發(fā)現(xiàn)是一堆女的在排隊(duì),那么他直接牽走隊(duì)頭的那個(gè)女的。
既然這里說(shuō)到了等待隊(duì)列,我們先看看其實(shí)現(xiàn),也就是 QNode:
static final class QNode {
volatile QNode next; // 可以看出來(lái),等待隊(duì)列是單向鏈表
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // 將線程對(duì)象保存在這里,用于掛起和喚醒
final boolean isData; // 用于判斷是寫線程節(jié)點(diǎn)(isData == true),還是讀線程節(jié)點(diǎn)
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
......
相信說(shuō)了這么多以后,我們?cè)賮?lái)看 transfer 方法的代碼就輕松多了。
/**
* Puts or takes an item.
*/
Object transfer(Object e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
// 隊(duì)列空,或隊(duì)列中節(jié)點(diǎn)類型和當(dāng)前節(jié)點(diǎn)一致,
// 即我們說(shuō)的第一種情況,將節(jié)點(diǎn)入隊(duì)即可。讀者要想著這塊 if 里面方法其實(shí)就是入隊(duì)
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// t != tail 說(shuō)明剛剛有節(jié)點(diǎn)入隊(duì),continue 即可
if (t != tail) // inconsistent read
continue;
// 有其他節(jié)點(diǎn)入隊(duì),但是 tail 還是指向原來(lái)的,此時(shí)設(shè)置 tail 即可
if (tn != null) { // lagging tail
// 這個(gè)方法就是:如果 tail 此時(shí)為 t 的話,設(shè)置為 tn
advanceTail(t, tn);
continue;
}
//
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
// 將當(dāng)前節(jié)點(diǎn),插入到 tail 的后面
if (!t.casNext(null, s)) // failed to link in
continue;
// 將當(dāng)前節(jié)點(diǎn)設(shè)置為新的 tail
advanceTail(t, s); // swing tail and wait
// 看到這里,請(qǐng)讀者先往下滑到這個(gè)方法,看完了以后再回來(lái)這里,思路也就不會(huì)斷了
Object x = awaitFulfill(s, e, timed, nanos);
// 到這里,說(shuō)明之前入隊(duì)的線程被喚醒了,準(zhǔn)備往下執(zhí)行
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? x : e;
// 這里的 else 分支就是上面說(shuō)的第二種情況,有相應(yīng)的讀或?qū)懴嗥ヅ涞那闆r
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? x : e;
}
}
}
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
// 自旋或阻塞,直到滿足條件,這個(gè)方法返回
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
long lastTime = timed ? System.nanoTime() : 0;
Thread w = Thread.currentThread();
// 判斷需要自旋的次數(shù),
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 如果被中斷了,那么取消這個(gè)節(jié)點(diǎn)
if (w.isInterrupted())
// 就是將當(dāng)前節(jié)點(diǎn) s 中的 item 屬性設(shè)置為 this
s.tryCancel(e);
Object x = s.item;
// 這里是這個(gè)方法的唯一的出口
if (x != e)
return x;
// 如果需要,檢測(cè)是否超時(shí)
if (timed) {
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
// 如果自旋達(dá)到了最大的次數(shù),那么檢測(cè)
else if (s.waiter == null)
s.waiter = w;
// 如果自旋到了最大的次數(shù),那么線程掛起,等待喚醒
else if (!timed)
LockSupport.park(this);
// spinForTimeoutThreshold 這個(gè)之前講 AQS 的時(shí)候其實(shí)也說(shuō)過(guò),剩余時(shí)間小于這個(gè)閾值的時(shí)候,就
// 不要進(jìn)行掛起了,自旋的性能會(huì)比較好
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
Doug Lea 的巧妙之處在于,將各個(gè)代碼湊在了一起,使得代碼非常簡(jiǎn)潔,當(dāng)然也同時(shí)增加了我們的閱讀負(fù)擔(dān),看代碼的時(shí)候,還是得仔細(xì)想想各種可能的情況。
下面,再說(shuō)說(shuō)前面說(shuō)的公平模式和非公平模式的區(qū)別。
相信大家心里面已經(jīng)有了公平模式的工作流程的概念了,我就簡(jiǎn)單說(shuō)說(shuō) TransferStack 的算法,就不分析源碼了。
- 當(dāng)調(diào)用這個(gè)方法時(shí),如果隊(duì)列是空的,或者隊(duì)列中的節(jié)點(diǎn)和當(dāng)前的線程操作類型一致(如當(dāng)前操作是 put 操作,而棧中的元素也都是寫線程)。這種情況下,將當(dāng)前線程加入到等待棧中,等待配對(duì)。然后返回相應(yīng)的元素,或者如果被取消了的話,返回 null。
- 如果棧中有等待節(jié)點(diǎn),而且與當(dāng)前操作可以匹配(如棧里面都是讀操作線程,當(dāng)前線程是寫操作線程,反之亦然)。將當(dāng)前節(jié)點(diǎn)壓入棧頂,和棧中的節(jié)點(diǎn)進(jìn)行匹配,然后將這兩個(gè)節(jié)點(diǎn)出棧。配對(duì)和出棧的動(dòng)作其實(shí)也不是必須的,因?yàn)橄旅娴囊粭l會(huì)執(zhí)行同樣的事情。
- 如果棧頂是進(jìn)行匹配而入棧的節(jié)點(diǎn),幫助其進(jìn)行匹配并出棧,然后再繼續(xù)操作。
應(yīng)該說(shuō),TransferStack 的源碼要比 TransferQueue 的復(fù)雜一些,如果讀者感興趣,請(qǐng)自行進(jìn)行源碼閱讀。
BlockingQueue 實(shí)現(xiàn)之 PriorityBlockingQueue
帶排序的 BlockingQueue 實(shí)現(xiàn),其并發(fā)控制采用的是 ReentrantLock,隊(duì)列為無(wú)界隊(duì)列(ArrayBlockingQueue 是有界隊(duì)列,LinkedBlockingQueue 也可以通過(guò)在構(gòu)造函數(shù)中傳入 capacity 指定隊(duì)列最大的容量,但是 PriorityBlockingQueue 只能指定初始的隊(duì)列大小,后面插入元素的時(shí)候,如果空間不夠的話會(huì)自動(dòng)擴(kuò)容)。
簡(jiǎn)單地說(shuō),它就是 PriorityQueue 的線程安全版本。不可以插入 null 值,同時(shí),插入隊(duì)列的對(duì)象必須是可比較大小的(comparable),否則報(bào) ClassCastException 異常。它的插入操作 put 方法不會(huì) block,因?yàn)樗菬o(wú)界隊(duì)列(take 方法在隊(duì)列為空的時(shí)候會(huì)阻塞)。
它的源碼相對(duì)比較簡(jiǎn)單,本節(jié)將介紹其核心源碼部分。
我們來(lái)看看它有哪些屬性:
// 構(gòu)造方法中,如果不指定大小的話,默認(rèn)大小為 11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 數(shù)組的最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 這個(gè)就是存放數(shù)據(jù)的數(shù)組
private transient Object[] queue;
// 隊(duì)列當(dāng)前大小
private transient int size;
// 大小比較器,如果按照自然序排序,那么此屬性可設(shè)置為 null
private transient Comparator<? super E> comparator;
// 并發(fā)控制所用的鎖,所有的 public 且涉及到線程安全的方法,都必須先獲取到這個(gè)鎖
private final ReentrantLock lock;
// 這個(gè)很好理解,其實(shí)例由上面的 lock 屬性創(chuàng)建
private final Condition notEmpty;
// 這個(gè)也是用于鎖,用于數(shù)組擴(kuò)容的時(shí)候,需要先獲取到這個(gè)鎖,才能進(jìn)行擴(kuò)容操作
// 其使用 CAS 操作
private transient volatile int allocationSpinLock;
// 用于序列化和反序列化的時(shí)候用,對(duì)于 PriorityBlockingQueue 我們應(yīng)該比較少使用到序列化
private PriorityQueue q;
此類實(shí)現(xiàn)了 Collection 和 Iterator 接口中的所有接口方法,對(duì)其對(duì)象進(jìn)行迭代并遍歷時(shí),不能保證有序性。如果你想要實(shí)現(xiàn)有序遍歷,建議采用 Arrays.sort(queue.toArray()) 進(jìn)行處理。PriorityBlockingQueue 提供了 drainTo 方法用于將部分或全部元素有序地填充(準(zhǔn)確說(shuō)是轉(zhuǎn)移,會(huì)刪除原隊(duì)列中的元素)到另一個(gè)集合中。還有一個(gè)需要說(shuō)明的是,如果兩個(gè)對(duì)象的優(yōu)先級(jí)相同(compare 方法返回 0),此隊(duì)列并不保證它們之間的順序。
PriorityBlockingQueue 使用了基于數(shù)組的二叉堆來(lái)存放元素,所有的 public 方法采用同一個(gè) lock 進(jìn)行并發(fā)控制。
二叉堆:一顆完全二叉樹,它非常適合用數(shù)組進(jìn)行存儲(chǔ),對(duì)于數(shù)組中的元素 a[i],其左子節(jié)點(diǎn)為 a[2*i+1],其右子節(jié)點(diǎn)為 a[2*i + 2],其父節(jié)點(diǎn)為 a[(i-1)/2],其堆序性質(zhì)為,每個(gè)節(jié)點(diǎn)的值都小于其左右子節(jié)點(diǎn)的值。二叉堆中最小的值就是根節(jié)點(diǎn),但是刪除根節(jié)點(diǎn)是比較麻煩的,因?yàn)樾枰{(diào)整樹。
簡(jiǎn)單用個(gè)圖解釋一下二叉堆,我就不說(shuō)太多專業(yè)的嚴(yán)謹(jǐn)?shù)男g(shù)語(yǔ)了,這種數(shù)據(jù)結(jié)構(gòu)的優(yōu)點(diǎn)是一目了然的,最小的元素一定是根元素,它是一棵滿的樹,除了最后一層,最后一層的節(jié)點(diǎn)從左到右緊密排列。

下面開(kāi)始 PriorityBlockingQueue 的源碼分析,首先我們來(lái)看看構(gòu)造方法:
// 默認(rèn)構(gòu)造方法,采用默認(rèn)值(11)來(lái)進(jìn)行初始化
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
// 指定數(shù)組的初始大小
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
// 指定比較器
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
// 在構(gòu)造方法中就先填充指定的集合中的元素
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
//
boolean heapify = true; // true if not known to be in heap order
boolean screen = true; // true if must screen for nulls
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
this.comparator = (Comparator<? super E>) ss.comparator();
heapify = false;
}
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
this.comparator = (Comparator<? super E>) pq.comparator();
screen = false;
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] a = c.toArray();
int n = a.length;
// If c.toArray incorrectly doesn't return Object[], copy it.
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
this.queue = a;
this.size = n;
if (heapify)
heapify();
}
接下來(lái),我們來(lái)看看其內(nèi)部的自動(dòng)擴(kuò)容實(shí)現(xiàn):
private void tryGrow(Object[] array, int oldCap) {
// 這邊做了釋放鎖的操作
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 用 CAS 操作將 allocationSpinLock 由 0 變?yōu)?1,也算是獲取鎖
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 如果節(jié)點(diǎn)個(gè)數(shù)小于 64,那么增加的 oldCap + 2 的容量
// 如果節(jié)點(diǎn)數(shù)大于等于 64,那么增加 oldCap 的一半
// 所以節(jié)點(diǎn)數(shù)較小時(shí),增長(zhǎng)得快一些
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) :
(oldCap >> 1));
// 這里有可能溢出
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// 如果 queue != array,那么說(shuō)明有其他線程給 queue 分配了其他的空間
if (newCap > oldCap && queue == array)
// 分配一個(gè)新的大數(shù)組
newArray = new Object[newCap];
} finally {
// 重置,也就是釋放鎖
allocationSpinLock = 0;
}
}
// 如果有其他的線程也在做擴(kuò)容的操作
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 重新獲取鎖
lock.lock();
// 將原來(lái)數(shù)組中的元素復(fù)制到新分配的大數(shù)組中
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
擴(kuò)容方法對(duì)并發(fā)的控制也非常的巧妙,釋放了原來(lái)的獨(dú)占鎖 lock,這樣的話,擴(kuò)容操作和讀操作可以同時(shí)進(jìn)行,提高吞吐量。
下面,我們來(lái)分析下寫操作 put 方法和讀操作 take 方法。
public void put(E e) {
// 直接調(diào)用 offer 方法,因?yàn)榍懊嫖覀円舱f(shuō)了,在這里,put 方法不會(huì)阻塞
offer(e);
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
// 首先獲取到獨(dú)占鎖
lock.lock();
int n, cap;
Object[] array;
// 如果當(dāng)前隊(duì)列中的元素個(gè)數(shù) >= 數(shù)組的大小,那么需要擴(kuò)容了
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
// 節(jié)點(diǎn)添加到二叉堆中
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
// 更新 size
size = n + 1;
// 喚醒等待的讀線程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
對(duì)于二叉堆而言,插入一個(gè)節(jié)點(diǎn)是簡(jiǎn)單的,插入的節(jié)點(diǎn)如果比父節(jié)點(diǎn)小,交換它們,然后繼續(xù)和父節(jié)點(diǎn)比較。
// 這個(gè)方法就是將數(shù)據(jù) x 插入到數(shù)組 array 的位置 k 處,然后再調(diào)整樹
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
// 二叉堆中 a[k] 節(jié)點(diǎn)的父節(jié)點(diǎn)位置
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
我們用圖來(lái)示意一下,我們接下來(lái)要將 11 插入到隊(duì)列中,看看 siftUp 是怎么操作的。

我們?cè)倏纯?take 方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 獨(dú)占鎖
lock.lockInterruptibly();
E result;
try {
// dequeue 出隊(duì)
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
// 隊(duì)頭,用于返回
E result = (E) array[0];
// 隊(duì)尾元素先取出
E x = (E) array[n];
// 隊(duì)尾置空
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
dequeue 方法返回隊(duì)頭,并調(diào)整二叉堆的樹,調(diào)用這個(gè)方法必須先獲取獨(dú)占鎖。
廢話不多說(shuō),出隊(duì)是非常簡(jiǎn)單的,因?yàn)殛?duì)頭就是最小的元素,對(duì)應(yīng)的是數(shù)組的第一個(gè)元素。難點(diǎn)是隊(duì)頭出隊(duì)后,需要調(diào)整樹。
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
// 這里得到的 half 肯定是非葉節(jié)點(diǎn)
// a[n] 是最后一個(gè)元素,其父節(jié)點(diǎn)是 a[(n-1)/2]。所以 n >>> 1 代表的節(jié)點(diǎn)肯定不是葉子節(jié)點(diǎn)
// 下面,我們結(jié)合圖來(lái)一行行分析,這樣比較直觀簡(jiǎn)單
// 此時(shí) k 為 0, x 為 17,n 為 9
int half = n >>> 1; // 得到 half = 4
while (k < half) {
// 先取左子節(jié)點(diǎn)
int child = (k << 1) + 1; // 得到 child = 1
Object c = array[child]; // c = 12
int right = child + 1; // right = 2
// 如果右子節(jié)點(diǎn)存在,而且比左子節(jié)點(diǎn)小
// 此時(shí) array[right] = 20,所以條件不滿足
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
// key = 17, c = 12,所以條件不滿足
if (key.compareTo((T) c) <= 0)
break;
// 把 12 填充到根節(jié)點(diǎn)
array[k] = c;
// k 賦值后為 1
k = child;
// 一輪過(guò)后,我們發(fā)現(xiàn),12 左邊的子樹和剛剛的差不多,都是缺少根節(jié)點(diǎn),接下來(lái)處理就簡(jiǎn)單了
}
array[k] = key;
}
}

記住二叉堆是一棵完全二叉樹,那么根節(jié)點(diǎn) 10 拿掉后,最后面的元素 17 必須找到合適的地方放置。首先,17 和 10 不能直接交換,那么先將根節(jié)點(diǎn) 10 的左右子節(jié)點(diǎn)中較小的節(jié)點(diǎn)往上滑,即 12 往上滑,然后原來(lái) 12 留下了一個(gè)空節(jié)點(diǎn),然后再把這個(gè)空節(jié)點(diǎn)的較小的子節(jié)點(diǎn)往上滑,即 13 往上滑,最后,留出了位子,17 補(bǔ)上即可。
我稍微調(diào)整下這個(gè)樹,以便讀者能更明白:

好了, PriorityBlockingQueue 我們也說(shuō)完了。
總結(jié)
我知道本文過(guò)長(zhǎng),相信一字不漏看完的讀者肯定是少數(shù)。
ArrayBlockingQueue 底層是數(shù)組,有界隊(duì)列,如果我們要使用生產(chǎn)者-消費(fèi)者模式,這是非常好的選擇。
LinkedBlockingQueue 底層是鏈表,可以當(dāng)做無(wú)界和有界隊(duì)列來(lái)使用,所以大家不要以為它就是無(wú)界隊(duì)列。
SynchronousQueue 本身不帶有空間來(lái)存儲(chǔ)任何元素,使用上可以選擇公平模式和非公平模式。
PriorityBlockingQueue 是無(wú)界隊(duì)列,基于數(shù)組,數(shù)據(jù)結(jié)構(gòu)為二叉堆,數(shù)組第一個(gè)也是樹的根節(jié)點(diǎn)總是最小值。
(全文完)