LinkedBlockingQueue是用鏈表實現(xiàn)的FIFO隊列,該隊列大小默認是Integer.MAX_VALUE,可以認為是無界的,也可指定大小,使其有界?;阪湵淼年犃型ǔ1然跀到M的隊列有更高的吞吐量,但大多數并發(fā)場景下,實際的性能優(yōu)劣不易測量。
LinkedBlockingQueue通過一個啞鈴節(jié)點、takeLock和putLock、(AtomicInteger類型的)count,以及級聯(lián)通知實現(xiàn)了雙鎖隊列算法(的變體),提高了性能。
java.util包下的迭代器是fail-fast的,即當迭代器創(chuàng)建后,若外部通過非迭代器自身的方法修改集合的內容,會拋出ConcurrentModificationException異常;而java.util.concurrent包下的迭代器是弱一致性的,即當迭代器創(chuàng)建后,其他線程通過非迭代器自身的方法修改集合的內容并不會拋出異常。
1. LinkedBlockingQueue繼承關系圖

2. LinkedBlockingQueue源碼分析
2.1 內部類
// 底層單向鏈表的Node
static class Node<E> {
E item;
// 為了LinkedBlockingQueue.Itr的弱一致性,可能出現(xiàn)next指向當前節(jié)點自身的情況
Node<E> next;
Node(E x) { item = x; }
}
2.2 字段
// 隊列容量(未指定時默認是Integer.MAX_VALUE)
private final int capacity;
// 隊列中元素的個數
// 因為添加、獲取(刪除)操作使用各自的鎖,且兩種操作都會修改count
// 的值,因此使用AtomicInteger類型的count來保證原子性(和可見性)
private final AtomicInteger count = new AtomicInteger();
// 底層鏈表的頭節(jié)點
transient Node<E> head;
// 底層鏈表的尾節(jié)點
private transient Node<E> last;
// take、poll等方法使用的ReentrantLock和Condition
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
// put、offer等方法使用的ReentrantLock和Condition
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
2.3 三個構造方法
(1)LinkedBlockingQueue(int)
public LinkedBlockingQueue(int capacity) {
// 檢查capacity
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// last和head初始時都指向該啞鈴節(jié)點,正是由于該節(jié)點,實現(xiàn)了添加元素和獲取(刪除)元素
// 兩種操作的解耦,使得LinkedBlockingQueue這個基于鏈表的阻塞隊列可以采用雙鎖隊列算法
last = head = new Node<E>(null);
}
(2)LinkedBlockingQueue()
public LinkedBlockingQueue() {
// 調用LinkedBlockingQueue(int)
this(Integer.MAX_VALUE);
}
(3)LinkedBlockingQueue(Collection)
public LinkedBlockingQueue(Collection<? extends E> c) {
// 調用LinkedBlockingQueue(int)
this(Integer.MAX_VALUE);
// 要(在隊尾)添加元素,所以使用putLock鎖
final ReentrantLock putLock = this.putLock;
// 實例化時不存在多線程競爭,這里上鎖是為了可見性
putLock.lock();
try {
int n = 0;
for (E e : c) {
// e不能為null
if (e == null)
throw new NullPointerException();
// 注意n從0開始
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e)); // 入隊
++n;
}
// 更新count.value的值
count.set(n);
} finally {
putLock.unlock();
}
}
2.4 輔助方法
(1)enqueue
// 將node添加到隊列末尾
private void enqueue(Node<E> node) {
// 該語句等價轉化為:
// last.next = node;
// last = last.next;
last = last.next = node;
}
(2)dequeue
// 從頭部刪除一個節(jié)點
private E dequeue() {
// 注意:head指向的是啞鈴節(jié)點
Node<E> h = head;
Node<E> first = h.next;
// 不直接將h.next置為null是為了LinkedBlockingQueue.Itr的弱一致性
h.next = h; // help GC
head = first;
// 獲取first.item作為返回值
E x = first.item;
// 將first.item置為null,作為新的啞鈴節(jié)點
first.item = null;
return x;
}
(3)signalNotEmpty
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
// 先獲取takeLock鎖
// notEmpty.signal中會判斷當前線程是否持有鎖,所以要調用
// notEmpty的signal方法,必須持有takeLock鎖,否則會拋出異常
takeLock.lock();
try {
// 調用notEmpty.signal后,會將條件隊列中第一個調用了notEmpty.await或awaitNanos(nanos)
// 的線程對應的Node轉移到同步隊列中,前驅節(jié)點調用unpark后才算真正喚醒該線程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
(4)signalNotFull
// 與signalNotEmpty類似
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
// 先獲取putLock鎖
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
(5)fullyLock
// 上鎖
void fullyLock() {
putLock.lock();
takeLock.lock();
}
(6)fullyUnlock
// 解鎖
// 以一種順序上鎖,以相反的順序釋放鎖,可避免死鎖
// (這里上鎖順序或解鎖順序顛倒也不會出現(xiàn)死鎖)
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
2.5 添加元素
(1)put-無限期阻塞版
public void put(E e) throws InterruptedException {
// e不能為null
if (e == null) throw new NullPointerException();
// 約定put、take等方法中將c初始化為-1,表示入隊失敗
int c = -1;
// 將e封裝成Node
Node<E> node = new Node<E>(e);
// 要(在隊尾)添加元素,所以使用putLock鎖
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 獲取count.value的值,若為capacity則阻塞當前線程
// 可能出現(xiàn)當前線程被喚醒后隊列大小仍是capacity的情況,當前線程會再次阻塞
while (count.get() == capacity) {
notFull.await();
}
// 入隊
enqueue(node);
// 注意:getAndIncrement中會以CAS的方式將count.value加1并返回count.value原來的值
c = count.getAndIncrement();
// 注意c是本次入隊前count.value的值
// 這兩行語句算是級聯(lián)通知結構的一部分,若不采用級聯(lián)通知,即將這兩行語句替換為
// signalNotEmpty();則每次入隊都要獲取takeLock,調用notEmpty.signal,極大降低了性能
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// c為0表示隊列中有一個元素
if (c == 0)
// 為了降低添加元素相關操作對takeLock的依賴,采用了級聯(lián)通知
// 級聯(lián)通知:
// 只要隊列中有一個元素,就通過signalNotEmpty獲取takeLock鎖,并調用notEmpty.signal
// 將一個消費者線程(若存在的話)加入同步隊列(此時可能沒有消費者線程,這種情況下,后
// 面過來的消費者線程是不會阻塞的,因為隊列中已經有元素了、也可能有多個阻塞著的消費
// 者線程),后面過來的線程和同步隊列中的線程競爭鎖,競爭到鎖的線程會執(zhí)行出隊操作,
// 若隊列不為空,該線程會調用notEmpty.signal,如此往復......若隊列為空了,消費者線程
// 會阻塞,又會通過這里的signalNotEmpty進行處理。
signalNotEmpty();
}
(2)offer(E)-非阻塞版
public boolean offer(E e) {
// e不能為null(略)
final AtomicInteger count = this.count;
// 先獲取count.value的值,嘗試判斷一下
if (count.get() == capacity)
return false;
int c = -1;
// 與put中相同的代碼不再貼出
// 當其他線程在上面if之后又添加元素時再次獲取的
// count.value就為capacity,條件為false,c為-1
if (count.get() < capacity) {
// 入隊
enqueue(node);
// 再次強調:c是count.value入隊前的值
c = count.getAndIncrement();
// 級聯(lián)通知相關
if (c + 1 < capacity)
notFull.signal();
}
// 若c仍為-1,表示入隊失敗,返回false,否則返回true
return c >= 0;
}
(3)offer(E,long,TimeUnit)-限期阻塞版
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 與put中相同的代碼不再貼出
// 轉化為納秒數
long nanos = unit.toNanos(timeout);
while (count.get() == capacity) {
// 隊列仍是滿的,但阻塞時長已到,在這里跳出
if (nanos <= 0)
return false;
// 阻塞指定時長,也可能被提前喚醒
nanos = notFull.awaitNanos(nanos);
}
return true;
}
2.6 獲取(刪除)元素
(1)take-阻塞版
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 要(在隊t頭)刪除元素,所以使用takeLock鎖
takeLock.lockInterruptibly();
try {
// 隊列為空則阻塞
while (count.get() == 0) {
notEmpty.await();
}
// 出隊
x = dequeue();
// 強調:c是count.value入隊前的值
c = count.getAndDecrement();
// c為1說明經過上面的出隊操作隊列已空
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// c為capacity說明當前隊列中的元素個數是capacity-1
if (c == capacity)
// 為了降低添加元素相關操作對putLock的依賴,采用了級聯(lián)通知
// 與put的級聯(lián)通知是相對應的,不再介紹
signalNotFull();
return x;
}
poll()-非阻塞版和poll(long,TimeUnit)-限期阻塞版與offer(E)-非阻塞版和offer(E,long,TimeUnit)-限期阻塞版是相對應的,不再介紹。
(2)remove-直接刪除元素
// 注意take、poll等方法是獲取元素,remove是直接刪除隊列中第一個滿足o.equals(p.item)的元素
public boolean remove(Object o) {
// 隊列中的元素不允許為null,所以這里o也不能為null
if (o == null) return false;
// 因為要操作的是整個隊列,所以使用雙鎖
fullyLock();
try {
// 遍歷鏈表
// trail是p的前一個節(jié)點,trail初始為啞鈴節(jié)點
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
// 找到滿足o.equals(p.item)的節(jié)點,斷開該節(jié)點
unlink(p, trail);
// 已移除一個,返回true
return true;
}
}
// 未找到匹配的節(jié)點,返回false
return false;
} finally {
// 釋放雙鎖
fullyUnlock();
}
}
void unlink(Node<E> p, Node<E> trail) {
// 為了LinkedBlockingQueue.Itr的弱一致性,不將p.next置為null
p.item = null;
// 進行連接
trail.next = p.next;
// p是否是最后一個節(jié)點
if (last == p)
last = trail;
// 若隊列中元素個數為capacity-1,則進行級聯(lián)通知
if (count.getAndDecrement() == capacity)
notFull.signal();
}
2.7 查看元素
public E peek() {
// 條件成立說明隊列為空
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
// 要查看對頭元素的值,因此使用takeLock
takeLock.lock();
try {
// 獲取頭節(jié)點的下一個節(jié)點
// 注意頭節(jié)點是啞鈴節(jié)點
Node<E> first = head.next;
// 當在上面if之后,其他線程將隊列中的元素取完時,這里first就為null
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
2.8 弱一致性迭代器
下面通過兩個例子對LinkedBlockingQueue.Itr.nextNode中的條件進行復現(xiàn),留作參考。
(1)nextNode中"s == p"為true的例子
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue(3);
lbq.put("a");
lbq.put("b");
lbq.put("c");
Iterator<String> iterator = lbq.iterator();
lbq.take();
lbq.take();
// 此處設斷點,進入nextNode方法,會發(fā)現(xiàn)s == p成立
iterator.next();
}
(2)nextNode中"s != null && s.item == null"為true的例子
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue(3);
lbq.put("a");
lbq.put("b");
lbq.put("c");
Iterator<String> iterator = lbq.iterator();
lbq.remove("a");
lbq.take();
// 此處設斷點調試,進入nextNode方法,會
// 發(fā)現(xiàn)s != null && s.item == null成立
iterator.next();
}