阻塞隊列之LinkedBlockingQueue
在前面的文章中,已經對JDK中的BlockingQueue做了一個回顧,同時對ArrayBlockingQueue中的核心方法作了說明,而LinkedBlockingQueue作為JDK中BlockingQueue家族系列中一員,由于其作為固定大小線程池(Executors.newFixedThreadPool())底層所使用的阻塞隊列,分析它的目的主要在于2點:
(1) 與ArrayBlockingQueue進行類比學習,加深各種數據結構的理解;
(2) 了解底層實現(xiàn),能夠更好地理解每一種阻塞隊列對線程池性能的影響,做到真正的知其然,且知其所以然。
源碼分析LinkedBlockingQueue的實現(xiàn)
LinkedBlockingQueue:顧名思義,它是一個基于鏈表的阻塞隊列,首先看一下它的核心組成。
// 所有的元素都通過Node這個靜態(tài)內部類來進行存儲,這與LinkedList的處理方式完全一樣
static class Node<E> {
//使用item來保存元素本身
E item;
//保存當前節(jié)點的后繼節(jié)點
Node<E> next;
Node(E x) { item = x; }
}
/**
阻塞隊列所能存儲的最大容量
用戶可以在創(chuàng)建時手動指定最大容量,如果用戶沒有指定最大容量
那么最默認的最大容量為Integer.MAX_VALUE.
*/
private final int capacity;
/**
當前阻塞隊列中的元素數量
PS:如果你看過ArrayBlockingQueue的源碼,你會發(fā)現(xiàn)
ArrayBlockingQueue底層保存元素數量使用的是一個
普通的int類型變量。其原因是在ArrayBlockingQueue底層
對于元素的入隊列和出隊列使用的是同一個lock對象。而數
量的修改都是在處于線程獲取鎖的情況下進行操作,因此不
會有線程安全問題。
而LinkedBlockingQueue卻不是,它的入隊列和出隊列使用的是兩個
不同的lock對象,因此無論是在入隊列還是出隊列,都會涉及對元素數
量的并發(fā)修改,(之后通過源碼可以更加清楚地看到)因此這里使用了一個原子操作類
來解決對同一個變量進行并發(fā)修改的線程安全問題。
*/
private final AtomicInteger count = new AtomicInteger(0);
/**
* 鏈表的頭部
* LinkedBlockingQueue的頭部具有一個不變性:
* 頭部的元素總是為null,head.item==null
*/
private transient Node<E> head;
/**
* 鏈表的尾部
* LinkedBlockingQueue的尾部也具有一個不變性:
* 即last.next==null
*/
private transient Node<E> last;
/**
元素出隊列時線程所獲取的鎖
當執(zhí)行take、poll等操作時線程需要獲取的鎖
*/
private final ReentrantLock takeLock = new ReentrantLock();
/**
當隊列為空時,通過該Condition讓從隊列中獲取元素的線程處于等待狀態(tài)
*/
private final Condition notEmpty = takeLock.newCondition();
/**
元素入隊列時線程所獲取的鎖
當執(zhí)行add、put、offer等操作時線程需要獲取鎖
*/
private final ReentrantLock putLock = new ReentrantLock();
/**
當隊列的元素已經達到capactiy,通過該Condition讓入隊列的線程處于等待狀態(tài)
*/
private final Condition notFull = putLock.newCondition();
通過上面的分析,我們可以發(fā)現(xiàn)LinkedBlockingQueue在入隊列和出隊列時使用的不是同一個Lock,這也意味著它們之間的操作不會存在互斥操作。在多個CPU的情況下,它們可以做到真正的在同一時刻既消費、又生產,能夠做到并行處理。
下面讓我們看下LinkedBlockingQueue的構造方法:
/**
* 如果用戶沒有顯示指定capacity的值,默認使用int的最大值
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
可以看到,當隊列中沒有任何元素的時候,此時隊列的頭部就等于隊列的尾部,
指向的是同一個節(jié)點,并且元素的內容為null
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
/*
在初始化LinkedBlockingQueue的時候,還可以直接將一個集合
中的元素全部入隊列,此時隊列最大容量依然是int的最大值。
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
//獲取鎖
putLock.lock(); // Never contended, but necessary for visibility
try {
//迭代集合中的每一個元素,讓其入隊列,并且更新一下當前隊列中的元素數量
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
//參考下面的enqueue分析
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
//釋放鎖
putLock.unlock();
}
}
/**
* 我去,這代碼其實可讀性不怎么樣啊。
* 其實下面的代碼等價于如下內容:
* last.next=node;
* last = node;
* 其實也沒有什么花樣:
就是讓新入隊列的元素成為原來的last的next,讓進入的元素成為last
*
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
在分析完LinkedBlockingQueue的核心組成之后,下面讓我們再看下核心的幾個操作方法,首先分析一下元素入隊列的過程:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
/*注意上面這句話,約定所有的put/take操作都會預先設置本地變量,
可以看到下面有一個將putLock賦值給了一個局部變量的操作
*/
int c = -1;
Node<E> node = new Node(e);
/*
在這里首先獲取到putLock,以及當前隊列的元素數量
即上面所描述的預設置本地變量操作
*/
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
/*
執(zhí)行可中斷的鎖獲取操作,即意味著如果線程由于獲取
鎖而處于Blocked狀態(tài)時,線程是可以被中斷而不再繼
續(xù)等待,這也是一種避免死鎖的一種方式,不會因為
發(fā)現(xiàn)到死鎖之后而由于無法中斷線程最終只能重啟應用。
*/
putLock.lockInterruptibly();
try {
/*
當隊列的容量到底最大容量時,此時線程將處于等待狀
態(tài),直到隊列有空閑的位置才繼續(xù)執(zhí)行。使用while判
斷依舊是為了防止線程被"偽喚醒”而出現(xiàn)的情況,即當
線程被喚醒時而隊列的大小依舊等于capacity時,線
程應該繼續(xù)等待。
*/
while (count.get() == capacity) {
notFull.await();
}
//讓元素進行隊列的末尾,enqueue代碼在上面分析過了
enqueue(node);
//首先獲取原先隊列中的元素個數,然后再對隊列中的元素個數+1.
c = count.getAndIncrement();
/*注:c+1得到的結果是新元素入隊列之后隊列元素的總和。
當前隊列中的總元素個數小于最大容量時,此時喚醒其他執(zhí)行入隊列的線程
讓它們可以放入元素,如果新加入元素之后,隊列的大小等于capacity,
那么就意味著此時隊列已經滿了,也就沒有必須要喚醒其他正在等待入隊列的線程,因為喚醒它們之后,它們也還是繼續(xù)等待。
*/
if (c + 1 < capacity)
notFull.signal();
} finally {
//完成對鎖的釋放
putLock.unlock();
}
/*當c=0時,即意味著之前的隊列是空隊列,出隊列的線程都處于等待狀態(tài),
現(xiàn)在新添加了一個新的元素,即隊列不再為空,因此它會喚醒正在等待獲取元素的線程。
*/
if (c == 0)
signalNotEmpty();
}
/*
喚醒正在等待獲取元素的線程,告訴它們現(xiàn)在隊列中有元素了
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//通過notEmpty喚醒獲取元素的線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
在分析完入隊列的過程之后,我們接下來看看LinkedBlockingQueue出隊列的過程;由于BlockingQueue的方法都具有對稱性,此處就只分析take方法的實現(xiàn),其余方法的實現(xiàn)都如出一轍:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//通過takeLock獲取鎖,并且支持線程中斷
takeLock.lockInterruptibly();
try {
//當隊列為空時,則讓當前線程處于等待
while (count.get() == 0) {
notEmpty.await();
}
//完成元素的出隊列
x = dequeue();
/*
隊列元素個數完成原子化操作-1,可以看到count元素會
在插入元素的線程和獲取元素的線程進行并發(fā)修改操作。
*/
c = count.getAndDecrement();
/*
當一個元素出隊列之后,隊列的大小依舊大于1時
當前線程會喚醒其他執(zhí)行元素出隊列的線程,讓它們也
可以執(zhí)行元素的獲取
*/
if (c > 1)
notEmpty.signal();
} finally {
//完成鎖的釋放
takeLock.unlock();
}
/*
當c==capaitcy時,即在獲取當前元素之前,
隊列已經滿了,而此時獲取元素之后,隊列就會
空出一個位置,故當前線程會喚醒執(zhí)行插入操作的線
程通知其中的一個可以進行插入操作。
*/
if (c == capacity)
signalNotFull();
return x;
}
/*
喚醒等待插入元素的線程,告訴它們現(xiàn)在隊列中有空間了
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
/**
* 讓頭部元素出隊列的過程
* 其最終的目的是讓原來的head被GC回收,讓其的next成為head
* 并且新的head的item為null.
* 因為LinkedBlockingQueue的頭部具有一致性:即元素為null。
*/
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

對于LinkedBlockingQueue的源碼分析就到這里,下面讓我們將LinkedBlockingQueue與ArrayBlockingQueue進行一個比較。
LinkedBlockingQueue與ArrayBlockingQueue的比較
ArrayBlockingQueue由于其底層基于數組,并且在創(chuàng)建時指定存儲的大小,在完成后就會立即在內存分配固定大小容量的數組元素,因此其存儲通常有限,故其是一個“有界“的阻塞隊列;而LinkedBlockingQueue可以由用戶指定最大存儲容量,也可以無需指定,如果不指定則最大存儲容量將是Integer.MAX_VALUE,即可以看作是一個“無界”的阻塞隊列,由于其節(jié)點的創(chuàng)建都是動態(tài)創(chuàng)建,并且在節(jié)點出隊列后可以被GC所回收,因此其具有靈活的伸縮性。但是由于ArrayBlockingQueue的有界性,因此其能夠更好的對于性能進行預測,而LinkedBlockingQueue由于沒有限制大小,當任務非常多的時候,不停地向隊列中存儲,就有可能導致內存溢出的情況發(fā)生。
其次,ArrayBlockingQueue中在入隊列和出隊列操作過程中,使用的是同一個lock,所以即使在多核CPU的情況下,其入隊和出隊的操作都無法做到并行,而LinkedBlockingQueue的讀取和插入操作所使用的鎖是兩個不同的lock,它們之間的操作互相不受干擾,因此兩種操作可以并行完成,故LinkedBlockingQueue的吞吐量要高于ArrayBlockingQueue。
選擇LinkedBlockingQueue的理由
/**
下面的代碼是Executors創(chuàng)建固定大小線程池的代碼,其使用了
LinkedBlockingQueue來作為任務隊列。
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
JDK中選用LinkedBlockingQueue作為阻塞隊列的原因就在于其無界性。因為線程大小固定的線程池,其線程的數量是不具備伸縮性的,當任務非常繁忙的時候,就勢必會導致所有的線程都處于工作狀態(tài),如果使用一個有界的阻塞隊列來進行處理,那么就非常有可能很快導致隊列滿的情況發(fā)生,從而導致任務無法提交而拋出RejectedExecutionException,而使用無界隊列由于其良好的存儲容量的伸縮性,可以很好的去緩沖任務繁忙情況下的場景,即使任務非常多,也可以進行動態(tài)擴容,當任務被處理完成之后,隊列中的節(jié)點也會被隨之被GC回收,非常靈活。