一、阻塞隊列 BlockingQueue
在java.util.concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”數(shù)據(jù)的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利。
1.1、BlockingQueue的基本原理
先來解釋一下阻塞隊列:

如上圖:
- 1、生產(chǎn)線程1往阻塞隊列里面添加新的數(shù)據(jù),當阻塞隊列滿的時候(針對有界隊列),生產(chǎn)線程1將會處于阻塞狀態(tài),直到消費線程2從隊列中取走一個數(shù)據(jù);
- 2、消費線程2從阻塞隊列取數(shù)據(jù),當阻塞隊列空的時候,消費線程2將會處于阻塞狀態(tài),直到生產(chǎn)線程把一個數(shù)據(jù)放進去。
阻塞隊列的基本原理就這樣,至于隊列是用什么數(shù)據(jù)結(jié)構(gòu)進行存儲的,這里并沒有規(guī)定,所以后面我們可以看到很多阻塞隊列的實現(xiàn)。
阻塞隊列的常用方法
查閱BlockingQueue總結(jié)了以下阻塞隊列的方法:
1、boolean add(E e)
- 在不違反容量限制的情況下,可立即將指定元素插入此隊列,成功返回true,當無可用空間時候,返回IllegalStateException異常。
2、boolean offer(E e)
- 在不違反容量限制的情況下,可立即將指定元素插入此隊列,成功返回true,當無可用空間時候,返回false。
3、void put(E e)
- 直接在隊列中插入元素,當無可用空間時候,阻塞等待。
4、boolean offer(E e, long timeout, TimeUnit unit)
- 將給定元素在給定的時間內(nèi)設(shè)置到隊列中,如果設(shè)置成功返回true, 否則返回false。
5、E take()
- 獲取并移除隊列頭部的元素,無元素時候阻塞等待。
6、E poll( long time, timeunit unit)
- 獲取并移除隊列頭部的元素,無元素時候阻塞等待指定時間。
7、boolean remove()
- 獲取并移除隊列頭部的元素,無元素時候會拋出NoSuchElementException異常。
8、E element()
- 不移除的情況下返回列頭部的元素,無元素時候會拋出NoSuchElementException異常。
9、E peek()
- 不移除的情況下返回列頭部的元素,隊列為空無元素時返回null。
注意:
根據(jù)remove(Object o)方法簽名可知,這個方法可以移除隊列的特定對象,但是這個方法效率并不高。因為需要遍歷隊列匹配到特定的對象之后,再進行移除。
以上支持阻塞和超時的方法都是能夠響應(yīng)中斷的。
1.2、BlockingQueue的實現(xiàn)

BlockingQueue底層也是基于AQS實現(xiàn)的,隊列的阻塞使用ReentrantLock的Condition實現(xiàn)的。
下面我們來看看各個實現(xiàn)類的原理。以下分析我都會基于支持阻塞的put和take方法來分析。
二、LinkedBlockingDeque
LinkedBlockingDeque是一個由鏈表結(jié)構(gòu)(雙向鏈表)組成的雙向阻塞隊列,即可以從隊列的兩端插入和移除元素,支持FIFO和FILO。
相比于其他阻塞隊列,LinkedBlockingDeque多了addFirst、addLast、peekFirst、peekLast等方法。
LinkedBlockingDeque是可選容量的,默認容量大小為Integer.MAX_VALUE。
LinkedBlockingDequeConcurrentLinkedDeque類似,都是一種雙端隊列的結(jié)構(gòu),只不過LinkedBlockingDeque同時也是一種阻塞隊列。
注意:LinkedBlockingDeque底層利用ReentrantLock實現(xiàn)同步,并不像ConcurrentLinkedDeque那樣采用無鎖算法。
如何使用 LinkedBlockingDeque
- 1、并發(fā)場景下,需要作為雙端隊列使用時,如果只是作為 FIFO 隊列使用,則 LinkedBlockingQueue 的性能更高。
- 2、指定隊列的容量,以避免生產(chǎn)速率遠高于消費速率時資源耗盡的問題。
使用 LinkedBlockingDeque 的風(fēng)險
- 1、未指定容量的情況下,生產(chǎn)速率遠高于消費速率時,會導(dǎo)致內(nèi)存耗盡而 OOM。
- 2、高并發(fā)場景下,性能遠低于 LinkedBlockingQueue。
- 3、由于需要維持前后節(jié)點的鏈接,內(nèi)存消耗也高于 LinkedBlockingQueue。
2.1、內(nèi)部結(jié)構(gòu)
LinkedBlockingDeque內(nèi)部是雙鏈表的結(jié)構(gòu),結(jié)點Node的定義如下:
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
/** 雙向鏈表節(jié)點 */
static final class Node<E> {
/**
* 節(jié)點元素,如果節(jié)點已經(jīng)被移除,則為 null
*/
E item;
/**
* 前驅(qū)結(jié)點指針.
*/
Node<E> prev;
/**
* 后驅(qū)結(jié)點指針.
*/
Node<E> next;
Node(E x) {
item = x;
}
}
}
2.2、成員屬性
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
/**
* 頭結(jié)點
*/
transient Node<E> first;
/**
* 尾結(jié)點
*/
transient Node<E> last;
/** 隊列中個數(shù) */
private transient int count;
/** 隊列長度,可以使用構(gòu)造注入,如未設(shè)定,默認為無界隊列 */
private final int capacity;
/** 顯示鎖 */
final ReentrantLock lock = new ReentrantLock();
/** 消費隊列(隊列為空時,無法消費,線程阻塞) */
private final Condition notEmpty = lock.newCondition();
/** 生產(chǎn)隊列(隊列滿時,無法入隊,線程阻塞) */
private final Condition notFull = lock.newCondition();
}
2.3、構(gòu)造函數(shù)
LinkedBlockingDeque一共三種構(gòu)造器,不指定容量時,默認為Integer.MAX_VALUE:
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
/**
* 默認構(gòu)造器.
*/
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
/**
* 指定容量的構(gòu)造器.
*/
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
/**
* 從已有集合構(gòu)造隊列.
*/
public LinkedBlockingDeque(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock lock = this.lock;
lock.lock(); // Never contended, but necessary for visibility
try {
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (!linkLast(new Node<E>(e)))
throw new IllegalStateException("Deque full");
}
} finally {
lock.unlock();
}
}
}
2.4、隊首入隊
初始:

隊首插入結(jié)點node:

- 1、將目標元素 e 添加到隊列頭部,如果隊列已滿,則阻塞等待有可用空間后重試
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
/**
* 將目標元素 e 添加到隊列頭部,如果隊列已滿,則阻塞等待有可用空間后重試
*/
public void putFirst(E e) throws InterruptedException {
// 如果存入的值為null,直接拋出空指針異常
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
//加鎖
lock.lock();
try {
// 嘗試在頭部添加元素
while (!linkFirst(node))
// 當前線程在非滿條件上等待
notFull.await();
} finally {
//釋放鎖
lock.unlock();
}
}
private boolean linkFirst(Node<E> node) {
// 隊列已滿,則直接返回 false
if (count >= capacity)
return false;
// 讀取頭節(jié)點
Node<E> f = first;
// 將舊頭結(jié)點鏈接到目標節(jié)點之后
node.next = f;
// 寫入新頭節(jié)點
first = node;
// 1)當前元素為第一個添加到隊列中的元素
if (last == null)
// 寫入尾節(jié)點
last = node;
else
// 將舊頭節(jié)點的前置節(jié)點設(shè)置為新頭結(jié)點
f.prev = node;
// 遞增計數(shù)
++count;
// 喚醒在非空條件上阻塞等待的線程來讀取元素
notEmpty.signal();
return true;
}
}
- 2、如果隊列已滿,則直接返回 false,否則將目標元素 e 添加到隊列頭部
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
/**
* 如果隊列已滿,則直接返回 false,否則將目標元素 e 添加到隊列頭部
*/
public boolean offerFirst(E e) {
// 如果存入的值為null,直接拋出空指針異常
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
//加鎖
lock.lock();
try {
// 嘗試在頭部添加元素
return linkFirst(node);
} finally {
//釋放鎖
lock.unlock();
}
}
}
- 3、在指定的超時時間內(nèi)嘗試將目標元素 e 添加到隊列頭部,成功則返回 true
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
/**
* 在指定的超時時間內(nèi)嘗試將目標元素 e 添加到隊列頭部,成功則返回 true
*/
public boolean offerFirst(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 如果存入的值為null,直接拋出空指針異常
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 頭結(jié)點添加失敗
while (!linkFirst(node)) {
// 已經(jīng)超時則直接返回
if (nanos <= 0)
return false;
// 當前線程在非滿條件上阻塞等待,喚醒后再次嘗試添加
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}
}
2.4、隊尾入隊
初始:

隊尾插入結(jié)點node:

- 1、將目標元素 e 添加到隊列尾部,如果隊列已滿,則阻塞等待有可用空間后重試
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
public void put(E e) throws InterruptedException {
putLast(e);
}
/**
* 將目標元素 e 添加到隊列尾部,如果隊列已滿,則阻塞等待有可用空間后重試
*/
public void putLast(E e) throws InterruptedException {
// 如果存入的值為null,直接拋出空指針異常
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
//加鎖
lock.lock();
try {
// 嘗試將節(jié)點鏈接到隊列尾部
while (!linkLast(node))
// 隊列已滿,當前線程在非滿條件上阻塞等待,被喚醒后再次嘗試
notFull.await();
} finally {
//釋放鎖
lock.unlock();
}
}
private boolean linkLast(Node<E> node) {
// 隊列已滿,則直接返回 false
if (count >= capacity)
return false;
// 讀取尾節(jié)點
Node<E> l = last;
// 將目標節(jié)點鏈接到尾節(jié)點之后
node.prev = l;
// 寫入尾節(jié)點為新增節(jié)點
last = node;
// 1)當前元素是第一個加入隊列的元素
if (first == null)
// 寫入頭結(jié)點
first = node;
else
// 將舊尾節(jié)點的后置節(jié)點更新為新增節(jié)點
l.next = node;
// 遞增總數(shù)
++count;
// 喚醒在非空條件上等待的線程
notEmpty.signal();
return true;
}
}
- 2、如果隊列已滿,則直接返回 false,否則將目標元素 e 添加到隊列尾部
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
public boolean offer(E e) {
return offerLast(e);
}
/**
* 如果隊列已滿,則直接返回 false,否則將目標元素 e 添加到隊列尾部
*/
public boolean offerLast(E e) {
// 如果存入的值為null,直接拋出空指針異常
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
//加鎖
lock.lock();
try {
// 嘗試將節(jié)點鏈接到隊列尾部
return linkLast(node);
} finally {
//釋放鎖
lock.unlock();
}
}
}
- 3、在指定的超時時間內(nèi)嘗試將目標元素 e 添加到隊列尾部,成功則返回 true
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
return offerLast(e, timeout, unit);
}
/**
* 在指定的超時時間內(nèi)嘗試將目標元素 e 添加到隊列尾部,成功則返回 true
*/
public boolean offerLast(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 如果存入的值為null,直接拋出空指針異常
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
// 嘗試將目標元素 e 添加到隊列尾部
while (!linkLast(node)) {
// 已經(jīng)超時則直接返回 false
if (nanos <= 0)
return false;
// 當前線程在非滿條件上阻塞等待,被喚醒后再次嘗試
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
//釋放鎖
lock.unlock();
}
}
}
2.5、隊首出隊
初始:

刪除隊首結(jié)點:

- 1、移除并返回頭部節(jié)點,如果隊列為空,則阻塞等待有可用元素之后重試
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
public E take() throws InterruptedException {
return takeFirst();
}
/**
* 移除并返回頭部節(jié)點,如果隊列為空,則阻塞等待有可用元素之后重試
*/
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
// 嘗試移除并返回頭部節(jié)點
while ( (x = unlinkFirst()) == null)
// 隊列為空,則阻塞等待有可用元素之后重試
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
}
- 2、如果隊列為空,則立即返回 null,否則移除并返回頭部元素
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
public E poll() {
return pollFirst();
}
/**
* 如果隊列為空,則立即返回 null,否則移除并返回頭部元素
*/
public E pollFirst() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return unlinkFirst();
} finally {
lock.unlock();
}
}
/**
* 從隊首刪除一個元素, 失敗則返回null.
*/
private E unlinkFirst() {
// 獲取首節(jié)點
Node<E> f = first;
// 首節(jié)點為null,則返回null
if (f == null)
return null;
// 獲取首節(jié)點的后繼節(jié)點
Node<E> n = f.next;
// 移除first,將首節(jié)點更新為n
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
// 移除首節(jié)點后,為空隊列
if (n == null)
last = null;
else
// 將新的首節(jié)點的前驅(qū)節(jié)點設(shè)置為null
n.prev = null;
--count;
// 喚醒阻塞在notFull上的線程
notFull.signal();
return item;
}
}
- 3、在指定的超時時間內(nèi)嘗試移除并返回頭部元素,如果已經(jīng)超時,則返回 null
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return pollFirst(timeout, unit);
}
/**
* 在指定的超時時間內(nèi)嘗試移除并返回頭部元素,如果已經(jīng)超時,則返回 null
*/
public E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
// 嘗試移除并返回頭部元素
while ( (x = unlinkFirst()) == null) {
// 已經(jīng)超時則返回 null
if (nanos <= 0)
return null;
// 當前線程在非空條件上阻塞等待,被喚醒后進行重試
nanos = notEmpty.awaitNanos(nanos);
}
// 移除成功則直接返回頭部元素
return x;
} finally {
lock.unlock();
}
}
}
2.6、隊尾出隊
初始:

刪除隊尾結(jié)點:

- 1、移除并返回尾部節(jié)點,如果隊列為空,則阻塞等待有可用元素之后重試
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
/**
* 移除并返回尾部節(jié)點,如果隊列為空,則阻塞等待有可用元素之后重試
*/
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
// 嘗試移除并返回尾部節(jié)點
while ( (x = unlinkLast()) == null)
// 隊列為空,則阻塞等待有可用元素之后重試
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
/**
* 從隊尾刪除一個元素, 失敗則返回null.
*/
private E unlinkLast() {
// 獲取尾節(jié)點
Node<E> l = last;
// 尾節(jié)點為null,則返回null
if (l == null)
return null;
// 獲取尾節(jié)點的前驅(qū)節(jié)點
Node<E> p = l.prev;
// 移除尾節(jié)點,將尾節(jié)點更新為p
E item = l.item;
l.item = null;
l.prev = l; // help GC
last = p;
// 移除尾節(jié)點后,為空隊列
if (p == null)
first = null;
else
// 將新的尾節(jié)點的后繼節(jié)點設(shè)置為null
p.next = null;
--count;
// 喚醒阻塞在notFull上的線程
notFull.signal();
return item;
}
}
- 2、如果隊列為空,則立即返回 null,否則移除并返回尾部元素
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
/**
* 如果隊列為空,則拋出異常,否則移除并返回尾部元素
*/
public E removeLast() {
E x = pollLast();
if (x == null) throw new NoSuchElementException();
return x;
}
/**
* 如果隊列為空,則立即返回 null,否則移除并返回尾部元素
*/
public E pollLast() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//從隊尾刪除一個元素, 失敗則返回null.
return unlinkLast();
} finally {
lock.unlock();
}
}
}
- 3、在指定的超時時間內(nèi)嘗試移除并返回尾部元素,如果已經(jīng)超時,則返回 null
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
public E pollLast(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
// 嘗試移除并返回尾部元素
while ( (x = unlinkLast()) == null) {
// 已經(jīng)超時則返回 null
if (nanos <= 0)
return null;
// 當前線程在非空條件上阻塞等待,被喚醒后進行重試
nanos = notEmpty.awaitNanos(nanos);
}
// 移除成功則直接返回尾部元素
return x;
} finally {
lock.unlock();
}
}
}
三、LinkedBlockingQueue與LinkedBlockingDeque對比
LinkedBlockingQueue
- FIFO;
- 讀寫分開兩個ReentrantLock;
LinkedBlockingDeque
- FIFO & FILO;
- 全局一把ReentrantLock;
參考:
https://www.itzhai.com/articles/graphical-blocking-queue.html
https://www.cnblogs.com/zhuxudong/p/10079511.html