? ? 前面的文章介紹了使用CAS算法實現(xiàn)的非阻塞有界隊列ConcurrentLinkedQueue(詳情點這里),今天介紹另一個并發(fā)隊列LinkedBlockingQueue。
? ??LinkedBlockingQueue是一個阻塞有界的隊列,使用單鏈表實現(xiàn),和ConcurrentLinkedQueue一樣也有兩個Node,分別存放首、尾節(jié)點,并且還有一個初始值為0的原子變量count,用來記錄隊列元素個數(shù)。還要兩個ReentrantLock對象,分別控制元素的入隊和出隊的原子性。另外,還有兩個條件變量,條件變量內部都有一個條件隊列用來存放進隊和出隊阻塞的線程,其實就是生產者-消費者模型。這些主要的成員變量如下:
transient Node<E> head;
private transient Node<E> last;
private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final Condition notFull = putLock.newCondition();
? ? 下面通過源碼來看一看具體的LinkedBlockingQueue隊列的讀寫操作。
offer(E e)
? ? 向隊列尾部插入一個元素,如果隊列未達到指定容量則插入成功返回true,如果達到指定容量則丟棄當前元素然后返回false。
public boolean offer(E e) {
? ? if (e == null) throw new NullPointerException();
? ? final AtomicInteger count = this.count;
//(1)獲取當前元素個數(shù),判斷是否達到指定容量,是則返回false。
? ? if (count.get() == capacity)
? ? ? ? return false;
? ? int c = -1;
//(2)創(chuàng)建Node變量,并嘗試獲取putLock 鎖
? ? Node<E> node = new Node<E>(e);
? ? final ReentrantLock putLock = this.putLock;
? ? putLock.lock();
? ? try {
//(3)當前隊列元素小于容量大小則將Node插入到隊列尾部,并將記錄元素個數(shù)的原子變量count+1
? ? ? ? if (count.get() < capacity) {
? ? ? ? ? ? enqueue(node);
? ? ? ? ? ? c = count.getAndIncrement();
//(4)插入元素之后,隊列大小仍小于容量大小,則調用notFull.signal()通知阻塞的生產者可以繼續(xù)插入元素。
? ? ? ? ? ? if (c + 1 < capacity)
? ? ? ? ? ? ? ? notFull.signal();
? ? ? ? }
? ? } finally {
//(5)釋放鎖
? ? ? ? putLock.unlock();
? ? }
//(6)如果c等于0,則說明在插入這個元素之前隊列中沒有元素,隊列從無元素狀態(tài)切換到有元素狀態(tài),通知消費者可以進行消費獲取隊列中元素。
? ? if (c == 0)
? ? ? ? signalNotEmpty();
? ? return c >= 0;
}
? ? offer方法通過使用putLock鎖保證了在隊尾新增元素操作的原子性,進隊時只操作隊列尾節(jié)點,并在消費者-生產者模式中進行通知。在調用條件變量的方法時,要注意先獲取對應的鎖。
put(E e)
????向隊列隊尾插入一個元素,如果隊列中有空閑則插入后直接返回,如果隊列已滿則阻塞當前線程,直到隊列中有空閑插入成功后返回。線程阻塞時如果被其他線程設置了中斷標志,則該線程會拋出InterruptedException異常。
public void put(E e) throws InterruptedException {
? ? if (e == null) throw new NullPointerException();
? ? int c = -1;
//(1)創(chuàng)建Node變量,并嘗試獲取putLock鎖 ,獲取鎖是用的lockInterruptibly()方法會對中斷進行響應并拋出InterruptedException異常。
? ? Node<E> node = new Node<E>(e);
? ? final ReentrantLock putLock = this.putLock;
? ? final AtomicInteger count = this.count;
? ? putLock.lockInterruptibly();
? ? try {
//(2)如果當前隊列已滿則會調用notFull.await()方法阻塞當前線程,停止生產,即停止向隊列中插入元素。這里使用while判斷隊列是否已滿,是為了防止虛假喚醒。
? ? ? ? while (count.get() == capacity) {
? ? ? ? ? ? notFull.await();
? ? ? ? }
//(3)隊列未滿則在隊列尾部插入node,并將記錄元素的原子變量count+1。
? ? ? ? enqueue(node);
? ? ? ? c = count.getAndIncrement();
//(4)插入元素之后,隊列大小仍小于容量大小,則調用?notFull.signal()通知阻塞的生產者可以繼續(xù)插入元素。
? ? ? ? if (c + 1 < capacity)
? ? ? ? ? ? notFull.signal();
? ? } finally {
//(5)釋放鎖
? ? ? ? putLock.unlock();
? ? }
//(6)如果c等于0,則說明在插入這個元素之前隊列中沒有元素,隊列從無元素狀態(tài)切換到有元素狀態(tài),通知消費者可以進行消費獲取隊列中元素。
? ? if (c == 0)
? ? ? ? signalNotEmpty();
}
? ? put方法相對于offer方法,會在隊列已滿的條件下阻塞當前線程,其余操作和offer方法基本一致。
poll()
? ? 從隊列頭部獲取一個元素,并從隊列里移除該元素,如果隊列為空則返回null。
public E poll() {
? ? final AtomicInteger count = this.count;
//(1)如果隊列為空則直接返回null。
? ? if (count.get() == 0)
? ? ? ? return null;
? ? E x = null;
? ? int c = -1;
//(2)獲取takeLock 鎖。
? ? final ReentrantLock takeLock = this.takeLock;
? ? takeLock.lock();
? ? try {
//(3)如果當前隊列不為空,則將頭節(jié)點元素彈出,并將記錄隊列元素個數(shù)的原子變量count-1。
? ? ? ? if (count.get() > 0) {
? ? ? ? ? ? x = dequeue();
? ? ? ? ? ? c = count.getAndDecrement();
//(4)如果彈出元素之后隊列中還有元素,則調用notEmpty.signal()方法,通知阻塞消費者可以進行消費數(shù)據(jù),即可以從隊列中獲取原始。
? ? ? ? ? ? if (c > 1)
? ? ? ? ? ? ? ? notEmpty.signal();
? ? ? ? }
? ? } finally {
//(5)釋放鎖
? ? ? ? takeLock.unlock();
? ? }
//(6)如果c等于容量大小,則說明隊列狀態(tài)由已滿狀態(tài)轉換為有空閑狀態(tài),通知阻塞的生產者可以進行生產數(shù)據(jù),即向隊列中插入元素。
? ? if (c == capacity)
? ? ? ? signalNotFull();
? ? return x;
}
? ? poll方法獲取元素只操作隊列的頭節(jié)點。
peek()
? ? 獲取隊列頭部的元素但不從隊列中移除該元素,隊列為空的時候返回null。
public E peek() {
//(1)隊列為空則返回null。
? ? if (count.get() == 0)
? ? ? ? return null;
//(2)嘗試獲取takeLock鎖,獲取鎖成功后返回頭結點元素。
? ? final ReentrantLock takeLock = this.takeLock;
? ? takeLock.lock();
? ? try {
? ? ? ? Node<E> first = head.next;
//(3)這里有個細節(jié),判斷頭節(jié)點是否為空。
? ? ? ? if (first == null)
? ? ? ? ? ? return null;
? ? ? ? else
? ? ? ? ? ? return first.item;
? ? } finally {
//(4)釋放鎖。
? ? ? ? takeLock.unlock();
? ? }
}
take()
? ? 獲取隊列中頭節(jié)點元素并從隊列里移除該節(jié)點,如果隊列為空則會阻塞當前線程直到隊列中有元素。如果當前線程在阻塞時被其他線程設置了中斷標志,則會拋出InterruptedException異常。
public E take() throws InterruptedException {
? ? E x;
? ? int c = -1;
? ? final AtomicInteger count = this.count;
//(1)嘗試獲取takeLock 鎖,使用lockInterruptibly方法進行獲取,會對中斷進行響應并拋出InterruptedException異常。
? ? final ReentrantLock takeLock = this.takeLock;
? ? takeLock.lockInterruptibly();
? ? try {
//(2)如果隊列為空則阻塞當前線程,使用while來進行判斷隊列是否為空防止虛假喚醒。
? ? ? ? while (count.get() == 0) {
? ? ? ? ? ? notEmpty.await();
? ? ? ? }
//(3)隊列不為空則將頭節(jié)點移出隊列,并將記錄元素個數(shù)的原子變量count-1。
? ? ? ? x = dequeue();
? ? ? ? c = count.getAndDecrement();
//(4)如果彈出元素之后隊列中還有元素,則調用 notEmpty.signal()方法,通知阻塞消費者可以進行消費數(shù)據(jù),即可以從隊列中獲取原始。
? ? ? ? if (c > 1)
? ? ? ? ? ? notEmpty.signal();
? ? } finally {
//(5)釋放鎖。
? ? ? ? takeLock.unlock();
? ? }
//(6)如果c等于容量大小,則說明隊列狀態(tài)由已滿狀態(tài)轉換為有空閑狀態(tài),通知阻塞的生產者可以進行生產數(shù)據(jù),即向隊列中插入元素。
? ? if (c == capacity)
? ? ? ? signalNotFull();
? ? return x;
}? ?
? ? take方法想比與poll方法不同之處就是前者在隊列為空的時候會阻塞當前線程,其余邏輯一致。
remove(Object o)
? ? 從隊列中移除指定的元素,該方法先從隊列的頭節(jié)點遍歷找到指定的元素,找到指定的元素則將其從隊列中刪除,否則返回false。這里在遍歷的時候有可能有其他的線程往隊列里插入元素,所以需要獲取同時獲取putLock和takeLock兩把鎖。
public boolean remove(Object o) {
? ? if (o == null) return false;
//(1)獲取putLock鎖和takeLock鎖。
? ? fullyLock();
? ? try {
//(2)遍歷查找指定的元素 ,找到則刪除并返回true
? ? ? ? for (Node<E> trail = head, p = trail.next;
? ? ? ? ? ? p != null;
? ? ? ? ? ? trail = p, p = p.next) {
? ? ? ? ? ? if (o.equals(p.item)) {
//(3)從隊列中刪除指定的元素,如果在刪除前隊列已滿,則需要在刪除后喚醒生產者可以進生產消息。
? ? ? ? ? ? ? ? unlink(p, trail);
? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? return false;
? ? } finally {
//(4)是否兩把鎖,要和獲取兩把鎖的順序想法。
? ? ? ? fullyUnlock();
? ? }
}
? ? 由于remove方法在刪除指定元素前加了兩把鎖,所以在遍歷和刪除過程都是線程安全的。在獲取多個資源鎖的順序要和釋放的順序相反。
size()
? ? 返回隊列中的元素個數(shù)。由于入隊和出隊都有加鎖操作,所有記錄元素的原子變量count的操作都是線程安全的,所以在size方法獲取count變量的時候就不需要加鎖了。
public int size() {
? ? return count.get();
}
今天的分享就到這,有看不明白的地方一定是我寫的不夠清楚,所有歡迎提任何問題以及改善方法。