
image
1、五種阻塞隊列介紹
- ArrayBlockingQueue
有界隊列,底層使用數組實現,并發(fā)控制使用ReentrantLock控制,不管是插入操作還是讀取操作,都需要獲取鎖之后才能執(zhí)行。 - LinkedBlockingQueue
底層基于單向鏈表實現,既可以當做有界隊列,也可以當做無界隊列使用。使用兩個ReentrantLock實現并發(fā)控制:takelock和putlock。 - SynchronousQueue
底層使用單向鏈表實現,只有一個元素,同步的意思是一個寫操作必須等到一個讀操作之后才返回,指的是讀寫線程的同步。 - PriorityBlockingQueue
帶排序的阻塞隊列的實現,使用數組進行實現。并發(fā)控制使用ReentrantLock,隊列為無界隊列。
有初始化參數指定隊列大小,但是會自動擴容。使用最小堆來實現排序。 - DelayedQueue
DelayedQueue是使用PriorityBlockingQueue和Delayed實現的,內部定義了一個優(yōu)先級隊列,當調用offer的時候,把Delayed對象加入隊列中,使用take先把first對象拿出來(peek),如果沒有到達閾值,進行await處理。
2、poll和peek的區(qū)別
都用于取隊列的頭結點,poll會刪除頭結點,peek不會刪除頭結點。
3、LinkedBlockingQueue
- 是先進先出隊列FIFO。
- 采用ReentrantLock保證線程安全
3.1、功能
3.1.1、增加
增加有三種方式,前提:隊列滿
| 方式 | put | add | offer |
|---|---|---|---|
| 特點 | 一直阻塞 | 拋異常 | 返回false |
3.1.2、刪除
刪除有三種方式,前提:隊列為空
| 方式 | remove | poll | take |
|---|---|---|---|
| 特點 | NoSuchElementException | 返回false | 阻塞 |
3.2、簡單分析
- LinkedBlockingQueue是一個阻塞隊列,內部由兩個ReentrantLock來實現出入隊列的線程安全,由各自的Condition對象的await和signal來實現等待和喚醒功能。
- 基于單向鏈表的、范圍任意的(其實是有界的)、FIFO 阻塞隊列。
- 頭結點和尾結點一開始總是指向一個哨兵的結點,它不持有實際數據,當隊列中有數據時,頭結點仍然指向這個哨兵,尾結點指向有效數據的最后一個結點。這樣做的好處在于,與計數器 count 結合后,對隊頭、隊尾的訪問可以獨立進行,而不需要判斷頭結點與尾結點的關系。

LinkedBlockingQueue繼承關系圖.png
3.2.1、節(jié)點與屬性
//鏈表節(jié)點內部類
static class Node<E> {
//節(jié)點元素
E item;
Node<E> next;
Node(E x) {
item = x;
}
}
//容量界限,如果未設定,則為Integer最大值
private final int capacity;
//當前元素個數
private final AtomicInteger count = new AtomicInteger();
//鏈表的頭:head.item == null
transient Node<E> head;
//鏈表的尾:last.next == null
private transient Node<E> last;
//take,poll等獲取鎖
private final ReentrantLock takeLock = new ReentrantLock();
//等待任務的等待隊列
private final Condition notEmpty = takeLock.newCondition();
//put,offer等插入鎖
private final ReentrantLock putLock = new ReentrantLock();
//等待插入的等待隊列
private final Condition notFull = putLock.newCondition();
3.2.2、插入線程與獲取線程的相互通知
signalNotEmpty()方法,在插入線程發(fā)現隊列為空時調用,告知獲取線程需要等待。
signalNotFull()方法,在獲取線程發(fā)現隊列已滿時調用,告知插入線程需要等待。
//表示等待take。put/offer調用,否則通常不會鎖定takeLock。
private void signalNotEmpty() {
//獲取takeLock
final ReentrantLock takeLock = this.takeLock;
//鎖定takeLock
takeLock.lock();
try {
//喚醒take線程等待隊列
notEmpty.signal();
} finally {
//釋放鎖
takeLock.unlock();
}
}
//表示等待put,take/poll 調用
private void signalNotFull() {
//獲取putLock
final ReentrantLock putLock = this.putLock;
//鎖定putLock
putLock.lock();
try {
//喚醒插入線程等待隊列
notFull.signal();
} finally {
//釋放鎖
putLock.unlock();
}
}
3.2.3、入隊與出隊操作
enqueue()方法只能在持有 putLock 鎖下執(zhí)行,dequeue()在持有 takeLock 鎖下執(zhí)行。
//在隊列尾部插入
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
//last.next指向當前node
//尾指針后移
last = last.next = node;
}
//移除隊列頭
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
//保存頭指針
Node<E> h = head;
//獲取當前鏈表第一個元素
Node<E> first = h.next;
//頭指針的next指向自己
h.next = h; // help GC
//頭指針指向第一個元素
head = first;
//獲取第一個元素的值
E x = first.item;
//將第一個元素的值置空
first.item = null;
//返回第一個元素的值
return x;
}
3.2.4、對兩把鎖的加鎖與釋放
在需要對兩把鎖同時加鎖時,把加鎖的順序與釋放的順序封裝成方法,確保所有地方都是一致的。而且獲取鎖時都是不響應中斷的,一直獲取直到加鎖成功,這就避免了第一把鎖加鎖成功,而第二把鎖加鎖失敗導致鎖不釋放的風險。
//鎖定putLock和takeLock
void fullyLock() {
putLock.lock();
takeLock.lock();
}
//與fullyLock的加鎖順序相反,先解鎖takeLock,再解鎖putLock
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
3.3、源碼解讀
簡單介紹一下LinkedBlockingQueue中API的源碼,如構造方法,新增,獲取,刪除,drainTo。
3.3.1、構造函數
LinkedBlockingQueue有三個構造方法,其中無參構造盡量少用,因為容量為Integer的最大值,操作不當會出現內存溢出。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
//參數校驗
if (capacity <= 0) throw new IllegalArgumentException();
//設置容量
this.capacity = capacity;
//首尾節(jié)點指向一個空節(jié)點
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
//獲取putLock
final ReentrantLock putLock = this.putLock;
//鎖定
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
3.3.2、offer(E e)
將給定的元素設置到隊列中,如果設置成功返回true, 否則返回false。 e的值不能為空,否則拋出空指針異常。
//如果可以在不超過隊列容量的情況下立即插入指定的元素到隊列的尾部,成功后返回true,如果隊列已滿,返回false。當使用容量受限的隊列時,此方法通常比方法BlockingQueue#add更可取,后者只能通過拋出異常才能插入元素。
public boolean offer(E e) {
//非空判斷
if (e == null) throw new NullPointerException();
//計數器
final AtomicInteger count = this.count;
//如果隊列已滿,直接返回插入失敗
if (count.get() == capacity)
return false;
int c = -1;
//新建節(jié)點
Node<E> node = new Node<E>(e);
//獲取插入鎖
final ReentrantLock putLock = this.putLock;
//鎖定
putLock.lock();
try {
//如果隊列未滿
if (count.get() < capacity) {
//插入隊列
enqueue(node);
//計數
c = count.getAndIncrement();
//還有空余空間
if (c + 1 < capacity)
//喚醒插入線程
notFull.signal();
}
} finally {
//解鎖
putLock.unlock();
}
//如果隊列為空
if (c == 0)
//通知獲取線程阻塞
signalNotEmpty();
//返回成功或者插入失敗
return c >= 0;
}
3.3.3、put(E e)
將元素設置到隊列中,如果隊列中沒有多余的空間,該方法會一直阻塞,直到隊列中有多余的空間。
public void put(E e) throws InterruptedException {
//不可以插入空元素
if (e == null) throw new NullPointerException();
//所有put/take/etc中的約定都是預先設置本地var
//除非設置,否則保持計數為負數表示失敗。
int c = -1;
//新建節(jié)點
Node<E> node = new Node<E>(e);
//獲取putLock
final ReentrantLock putLock = this.putLock;
//獲取計數器
final AtomicInteger count = this.count;
//可中斷加鎖,即在鎖獲取過程中不處理中斷狀態(tài),而是直接拋出中斷異常,由上層調用者處理中斷。
putLock.lockInterruptibly();
try {
/*
* 注意count在wait守衛(wèi)線程中使用,即使它沒有被鎖保護。
* 這是因為count只能在此時減少(所有其他put都被鎖定關閉),
* 如果它從容量更改,我們(或其他一些等待put)將收到信號。
* 類似地,count在其他等待守衛(wèi)線程中的所有其他用途也是如此。
*/
//只要當前隊列已滿
while (count.get() == capacity) {
//通知插入線程等待
notFull.await();
}
//插入隊列
enqueue(node);
//數量加1
c = count.getAndIncrement();
//如果隊列增加1個元素還未滿
if (c + 1 < capacity)
//喚醒插入進程
notFull.signal();
} finally {
//解鎖
putLock.unlock();
}
//如果隊列中沒有元素了
if (c == 0)
//通知獲取線程等待
signalNotEmpty();
}
3.3.4、peek()
非阻塞的獲取隊列中的第一個元素,不出隊列。
public E peek() {
//隊列為空,直接返回
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//獲取第一個元素,非哨兵
Node<E> first = head.next;
//元素為空,返回null
if (first == null)
return null;
else
//返回第一個元素值
return first.item;
} finally {
takeLock.unlock();
}
}
3.3.5、poll()
非阻塞的獲取隊列中的值,未獲取到返回null。
public E poll() {
final AtomicInteger count = this.count;
//隊列為空,直接返回
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//隊列非空,獲取隊列中元素
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
3.3.6、remove(Object o)
從隊列中移除指定的值。將兩把鎖都鎖定。
public boolean remove(Object o) {
//不支持null
if (o == null) return false;
//鎖定兩個鎖
fullyLock();
try {
//迭代隊列
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//通過equals方法匹配待刪除元素
if (o.equals(p.item)) {
//移除p節(jié)點
unlink(p, trail);
//成功
return true;
}
}
//失敗
return false;
} finally {
//解鎖
fullyUnlock();
}
}
// 將內部節(jié)點p與前一個跟蹤斷開連接
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
//p節(jié)點內容置空
p.item = null;
//trail節(jié)點的next指向p的next
trail.next = p.next;
//如果p是隊尾
if (last == p)
//trail變?yōu)殛犖? last = trail;
//如果隊列已滿
if (count.getAndDecrement() == capacity)
//通知插入線程阻塞
notFull.signal();
}
3.3.7、clear()
清空隊列。
//原子性地從隊列中刪除所有元素。此調用返回后,隊列將為空。
public void clear() {
//鎖定
fullyLock();
try {
//清空數據,幫助垃圾回收
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
//如果容量為0
if (count.getAndSet(0) == capacity)
//喚醒插入線程
notFull.signal();
} finally {
//解鎖
fullyUnlock();
}
}
3.3.8、drainTo(Collection c)
將隊列中值,全部移除,并發(fā)設置到給定的集合中。
public int drainTo(Collection<? super E> c, int maxElements) {
//各種判斷
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
//鎖
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//獲取要轉移的數量
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
//組裝集合
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}

tencent.jpg