目錄
最近在閱讀Spark源碼的過程中,又重新接觸到了一些Java并發(fā)方面的知識,于是就見縫插針地將它們記錄下來,當做復習與備忘。
阻塞隊列簡介
阻塞隊列的定義
根據(jù)Doug Lea在JavaDoc中的解釋,所謂阻塞隊列,就是在普通隊列的基礎之上,支持以下兩種操作的隊列:
- 當某線程從隊列獲取元素時,如果隊列為空,就等待(阻塞)直至隊列中有元素;
- 當某線程向隊列插入元素時,如果隊列已滿,就等待(阻塞)直至隊列中有空間。
也就是說,阻塞隊列是自帶同步機制的隊列。它最常用來解決線程同步中經典的生產者-消費者問題,前面講過的Spark Core異步事件總線中,就采用阻塞隊列作為事件存儲。
Java中的阻塞隊列
Java中阻塞隊列的基類是j.u.c.BlockingQueue接口,它繼承自Queue接口,并且定義了額外的方法實現(xiàn)同步:
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
上述put()與offer()方法用于向隊列插入元素,take()與poll()方法則是從隊列獲取元素。不同的是,put()與take()方法在插入/獲取時,如果必須等待,就會一直阻塞下去;而offer()與poll()方法可以指定阻塞的時間長度。
以BlockingQueue接口為中心的繼承關系如下圖所示。

平時開發(fā)中比較常用的阻塞隊列是基于數(shù)組實現(xiàn)的ArrayBlockingQueue,與基于單鏈表實現(xiàn)的LinkedBlockingQueue。本文選擇后者來深入看一下阻塞隊列的實現(xiàn)細節(jié),因為它的性能在多數(shù)情況下更優(yōu),可以自行寫benchmark程序來測測。
LinkedBlockingQueue
LinkedBlockingQueue(以下簡稱LBQ)是基于單鏈表實現(xiàn)的,先進先出(FIFO)的有界阻塞隊列。
單鏈表定義
LBQ的單鏈表結點數(shù)據(jù)結構定義在靜態(tài)內部類Node中。
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
在類的內部還定義了單鏈表的頭結點與尾結點。
transient Node<E> head;
private transient Node<E> last;
head始終指向鏈表的第一個結點,該結點是哨兵結點,不存儲數(shù)據(jù),只標記鏈表的開始,即head.item == null。這樣可以避免只有一個結點時造成混亂。
tail始終指向鏈表的最后一個結點,該結點是有數(shù)據(jù)的,并滿足last.next == null。
LBQ在隊頭獲取及彈出元素,在隊尾插入元素。
鎖和等待隊列
LBQ采用雙鎖機制保證入隊和出隊可以同時進行,互不干擾。
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
可見定義有兩個ReentrantLock,takeLock用于控制出隊,putLock用于控制入隊。另外,還有這兩個鎖分別對應的條件變量notEmpty和notFull,分別維護出隊和入隊線程的等待隊列。ReentrantLock和Condition都是Java AQS機制的重要組成部分,之后也會細說。
值得注意的是,在某些方法中需要同時對takeLock與putLock加鎖與解鎖,所以LBQ內部也提供了這樣的方法。
void fullyLock() {
putLock.lock();
takeLock.lock();
}
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
這兩個方法總會成對調用,保證所有需要同時加鎖和解鎖的地方,其順序都一致并且不可中斷,也防止了前一個鎖操作成功執(zhí)行,后一個鎖操作被打斷導致死鎖的風險。
另外,LBQ也對條件變量的Condition.signal()方法進行了簡單封裝,分別用來喚醒阻塞的出隊操作線程和入隊操作線程。
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
容量和計數(shù)
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
capacity是LBQ的最大容量,可以在構造方法中隨同名參數(shù)傳入,默認值是Integer.MAX_VALUE。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
count則是LBQ內當前元素的計數(shù),由于入隊和出隊動作可以并發(fā)執(zhí)行,所以要用原子類型AtomicInteger保證線程安全。
入隊操作
由于put()和offer()方法的邏輯基本相同,所以只看offer()方法就好了。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
在入隊時,首先將putLock加鎖,然后用衛(wèi)語句count.get() == capacity判斷隊列是否已滿,若已滿,則進入等待循環(huán)。當阻塞的時間超時后,判定入隊操作失敗,并返回false。
如果隊列未滿,或者在超時時間未到時有了空間,就調用enqueue()方法在隊尾插入元素,并將計數(shù)器自增。入隊后若還有更多的剩余空間,則喚醒其他等待的入隊線程。
最后將putLock解鎖,并檢查由count.getAndIncrement()返回的值是否為0。如果為0,表示隊列剛剛由空變?yōu)榉强諣顟B(tài),因此也要喚醒等待的出隊線程。
出隊操作
同理,只看poll()方法。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
將講解入隊方法時的描述反著說一遍就行了:
在出隊時,首先將takeLock加鎖,然后用衛(wèi)語句count.get() == 0判斷隊列是否為空,若為空,則進入等待循環(huán)。當阻塞的時間超時后,判定出隊操作失敗,并返回false。
如果隊列不為空,或者在超時時間未到時進了新元素,就調用dequeue()方法彈出隊頭元素,并將計數(shù)器自減。出隊后若還有更多的剩余元素,則喚醒其他等待的出隊線程。
最后將takeLock解鎖,并檢查由count.getAndDecrement()返回的值是否為capacity。如果為capacity,表示隊列剛剛由滿變?yōu)椴粷M狀態(tài),因此也要喚醒等待的入隊線程。
需要操作雙鎖的情況
以remove()方法為例。
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}

由于單鏈表刪除結點涉及到對鏈表的遍歷,以及對前驅和后繼結點的斷鏈和補鏈,因此必須將兩個鎖都加上,禁止一切修改。待刪除成功后才能解鎖,繼續(xù)正常的入隊和出隊操作。
生產者-消費者問題示例
生產者-消費者問題的解決方法用操作系統(tǒng)理論中的信號量PV(wait-signal)原語描述如下:
semaphore filled = 0;
semaphore empty = BUF_CAPACITY;
mutex_semaphore mutex = 1;
procedure producer() {
while (true) {
item = produce();
wait(empty);
wait(mutex);
buffer.put(item);
signal(mutex);
signal(filled);
}
}
procedure consumer() {
while (true) {
wait(filled);
wait(mutex);
item = buffer.get();
signal(mutex);
signal(empty);
consume(item);
}
}
利用阻塞隊列可以免去自己實現(xiàn)同步機制的麻煩,從而非常方便地實現(xiàn)。一個極簡的示例如下:
public class ProducerConsumerExample {
private static final int BUF_CAPACITY = 16;
public static void main(String[] args) {
BlockingQueue<Long> blockingQueue = new LinkedBlockingQueue<>(BUF_CAPACITY);
Thread producerThread = new Thread(() -> {
try {
while (true) {
long value = System.currentTimeMillis() % 1000;
blockingQueue.put(value);
Thread.sleep(value);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "producer");
Thread consumerThread = new Thread(() -> {
try {
while (true) {
System.out.println(blockingQueue.take());
Thread.sleep(System.currentTimeMillis() % 1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "consumer");
producerThread.start();
consumerThread.start();
}
}
一個?。ǎ浚﹩栴}
在上面的代碼(以及j.u.c包中很多類的代碼)的方法體中,經常能看到類似以下的語句:
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
也就是有些類中定義的字段,在方法中使用時會先賦值給一個局部變量。這樣做到底是為了什么?以目前我所了解到的而言,還沒有特別確切的答案,但可以確定是一個非常微小的優(yōu)化,與JVM及緩存有關。
以下是reference傳送門:
- https://stackoverflow.com/questions/8019831/java-lock-variable-assignment-before-use-why
- https://stackoverflow.com/questions/2785964/in-arrayblockingqueue-why-copy-final-member-field-into-local-final-variable
- http://cs.oswego.edu/pipermail/concurrency-interest/2013-February/010768.html
順便,StackOverflow最近(不知道是哪一天)改版成了1998年的樣式,滿滿的懷舊感。上面concurrency-interest郵件列表中關于這個問題也是眾說紛紜,如果仔細爬樓還會發(fā)現(xiàn)Doug Lea本人的回復,不過有些令人費解。