多線程并發(fā)編程12-LinkedBlockingQueue源碼剖析

? ? 前面的文章介紹了使用CAS算法實現(xiàn)的非阻塞有界隊列ConcurrentLinkedQueue(詳情點這里),今天介紹另一個并發(fā)隊列LinkedBlockingQueue。

? ??LinkedBlockingQueue是一個阻塞有界的隊列,使用單鏈表實現(xiàn),和ConcurrentLinkedQueue一樣也有兩個Node,分別存放首、尾節(jié)點,并且還有一個初始值為0的原子變量count,用來記錄隊列元素個數(shù)。還要兩個ReentrantLock對象,分別控制元素的入隊和出隊的原子性。另外,還有兩個條件變量,條件變量內部都有一個條件隊列用來存放進隊和出隊阻塞的線程,其實就是生產者-消費者模型。這些主要的成員變量如下:

transient Node<E> head;

private transient Node<E> last;

private final AtomicInteger count = new AtomicInteger();

private final ReentrantLock takeLock = new ReentrantLock();

private final ReentrantLock putLock = new ReentrantLock();

private final Condition notEmpty = takeLock.newCondition();

private final Condition notFull = putLock.newCondition();

? ? 下面通過源碼來看一看具體的LinkedBlockingQueue隊列的讀寫操作。

offer(E e)

? ? 向隊列尾部插入一個元素,如果隊列未達到指定容量則插入成功返回true,如果達到指定容量則丟棄當前元素然后返回false。

public boolean offer(E e) {

? ? if (e == null) throw new NullPointerException();

? ? final AtomicInteger count = this.count;

//(1)獲取當前元素個數(shù),判斷是否達到指定容量,是則返回false。

? ? if (count.get() == capacity)

? ? ? ? return false;

? ? int c = -1;

//(2)創(chuàng)建Node變量,并嘗試獲取putLock 鎖

? ? Node<E> node = new Node<E>(e);

? ? final ReentrantLock putLock = this.putLock;

? ? putLock.lock();

? ? try {

//(3)當前隊列元素小于容量大小則將Node插入到隊列尾部,并將記錄元素個數(shù)的原子變量count+1

? ? ? ? if (count.get() < capacity) {

? ? ? ? ? ? enqueue(node);

? ? ? ? ? ? c = count.getAndIncrement();

//(4)插入元素之后,隊列大小仍小于容量大小,則調用notFull.signal()通知阻塞的生產者可以繼續(xù)插入元素。

? ? ? ? ? ? if (c + 1 < capacity)

? ? ? ? ? ? ? ? notFull.signal();

? ? ? ? }

? ? } finally {

//(5)釋放鎖

? ? ? ? putLock.unlock();

? ? }

//(6)如果c等于0,則說明在插入這個元素之前隊列中沒有元素,隊列從無元素狀態(tài)切換到有元素狀態(tài),通知消費者可以進行消費獲取隊列中元素。

? ? if (c == 0)

? ? ? ? signalNotEmpty();

? ? return c >= 0;

}

? ? offer方法通過使用putLock鎖保證了在隊尾新增元素操作的原子性,進隊時只操作隊列尾節(jié)點,并在消費者-生產者模式中進行通知。在調用條件變量的方法時,要注意先獲取對應的鎖。

put(E e)

????向隊列隊尾插入一個元素,如果隊列中有空閑則插入后直接返回,如果隊列已滿則阻塞當前線程,直到隊列中有空閑插入成功后返回。線程阻塞時如果被其他線程設置了中斷標志,則該線程會拋出InterruptedException異常。

public void put(E e) throws InterruptedException {

? ? if (e == null) throw new NullPointerException();

? ? int c = -1;

//(1)創(chuàng)建Node變量,并嘗試獲取putLock鎖 ,獲取鎖是用的lockInterruptibly()方法會對中斷進行響應并拋出InterruptedException異常。

? ? Node<E> node = new Node<E>(e);

? ? final ReentrantLock putLock = this.putLock;

? ? final AtomicInteger count = this.count;

? ? putLock.lockInterruptibly();

? ? try {

//(2)如果當前隊列已滿則會調用notFull.await()方法阻塞當前線程,停止生產,即停止向隊列中插入元素。這里使用while判斷隊列是否已滿,是為了防止虛假喚醒。

? ? ? ? while (count.get() == capacity) {

? ? ? ? ? ? notFull.await();

? ? ? ? }

//(3)隊列未滿則在隊列尾部插入node,并將記錄元素的原子變量count+1。

? ? ? ? enqueue(node);

? ? ? ? c = count.getAndIncrement();

//(4)插入元素之后,隊列大小仍小于容量大小,則調用?notFull.signal()通知阻塞的生產者可以繼續(xù)插入元素。

? ? ? ? if (c + 1 < capacity)

? ? ? ? ? ? notFull.signal();

? ? } finally {

//(5)釋放鎖

? ? ? ? putLock.unlock();

? ? }

//(6)如果c等于0,則說明在插入這個元素之前隊列中沒有元素,隊列從無元素狀態(tài)切換到有元素狀態(tài),通知消費者可以進行消費獲取隊列中元素。

? ? if (c == 0)

? ? ? ? signalNotEmpty();

}

? ? put方法相對于offer方法,會在隊列已滿的條件下阻塞當前線程,其余操作和offer方法基本一致。

poll()

? ? 從隊列頭部獲取一個元素,并從隊列里移除該元素,如果隊列為空則返回null。

public E poll() {

? ? final AtomicInteger count = this.count;

//(1)如果隊列為空則直接返回null。

? ? if (count.get() == 0)

? ? ? ? return null;

? ? E x = null;

? ? int c = -1;

//(2)獲取takeLock 鎖。

? ? final ReentrantLock takeLock = this.takeLock;

? ? takeLock.lock();

? ? try {

//(3)如果當前隊列不為空,則將頭節(jié)點元素彈出,并將記錄隊列元素個數(shù)的原子變量count-1。

? ? ? ? if (count.get() > 0) {

? ? ? ? ? ? x = dequeue();

? ? ? ? ? ? c = count.getAndDecrement();

//(4)如果彈出元素之后隊列中還有元素,則調用notEmpty.signal()方法,通知阻塞消費者可以進行消費數(shù)據(jù),即可以從隊列中獲取原始。

? ? ? ? ? ? if (c > 1)

? ? ? ? ? ? ? ? notEmpty.signal();

? ? ? ? }

? ? } finally {

//(5)釋放鎖

? ? ? ? takeLock.unlock();

? ? }

//(6)如果c等于容量大小,則說明隊列狀態(tài)由已滿狀態(tài)轉換為有空閑狀態(tài),通知阻塞的生產者可以進行生產數(shù)據(jù),即向隊列中插入元素。

? ? if (c == capacity)

? ? ? ? signalNotFull();

? ? return x;

}

? ? poll方法獲取元素只操作隊列的頭節(jié)點。

peek()

? ? 獲取隊列頭部的元素但不從隊列中移除該元素,隊列為空的時候返回null。

public E peek() {

//(1)隊列為空則返回null。

? ? if (count.get() == 0)

? ? ? ? return null;

//(2)嘗試獲取takeLock鎖,獲取鎖成功后返回頭結點元素。

? ? final ReentrantLock takeLock = this.takeLock;

? ? takeLock.lock();

? ? try {

? ? ? ? Node<E> first = head.next;

//(3)這里有個細節(jié),判斷頭節(jié)點是否為空。

? ? ? ? if (first == null)

? ? ? ? ? ? return null;

? ? ? ? else

? ? ? ? ? ? return first.item;

? ? } finally {

//(4)釋放鎖。

? ? ? ? takeLock.unlock();

? ? }

}

take()

? ? 獲取隊列中頭節(jié)點元素并從隊列里移除該節(jié)點,如果隊列為空則會阻塞當前線程直到隊列中有元素。如果當前線程在阻塞時被其他線程設置了中斷標志,則會拋出InterruptedException異常。

public E take() throws InterruptedException {

? ? E x;

? ? int c = -1;

? ? final AtomicInteger count = this.count;

//(1)嘗試獲取takeLock 鎖,使用lockInterruptibly方法進行獲取,會對中斷進行響應并拋出InterruptedException異常。

? ? final ReentrantLock takeLock = this.takeLock;

? ? takeLock.lockInterruptibly();

? ? try {

//(2)如果隊列為空則阻塞當前線程,使用while來進行判斷隊列是否為空防止虛假喚醒。

? ? ? ? while (count.get() == 0) {

? ? ? ? ? ? notEmpty.await();

? ? ? ? }

//(3)隊列不為空則將頭節(jié)點移出隊列,并將記錄元素個數(shù)的原子變量count-1。

? ? ? ? x = dequeue();

? ? ? ? c = count.getAndDecrement();

//(4)如果彈出元素之后隊列中還有元素,則調用 notEmpty.signal()方法,通知阻塞消費者可以進行消費數(shù)據(jù),即可以從隊列中獲取原始。

? ? ? ? if (c > 1)

? ? ? ? ? ? notEmpty.signal();

? ? } finally {

//(5)釋放鎖。

? ? ? ? takeLock.unlock();

? ? }

//(6)如果c等于容量大小,則說明隊列狀態(tài)由已滿狀態(tài)轉換為有空閑狀態(tài),通知阻塞的生產者可以進行生產數(shù)據(jù),即向隊列中插入元素。

? ? if (c == capacity)

? ? ? ? signalNotFull();

? ? return x;

}? ?

? ? take方法想比與poll方法不同之處就是前者在隊列為空的時候會阻塞當前線程,其余邏輯一致。

remove(Object o)

? ? 從隊列中移除指定的元素,該方法先從隊列的頭節(jié)點遍歷找到指定的元素,找到指定的元素則將其從隊列中刪除,否則返回false。這里在遍歷的時候有可能有其他的線程往隊列里插入元素,所以需要獲取同時獲取putLock和takeLock兩把鎖。

public boolean remove(Object o) {

? ? if (o == null) return false;

//(1)獲取putLock鎖和takeLock鎖。

? ? fullyLock();

? ? try {

//(2)遍歷查找指定的元素 ,找到則刪除并返回true

? ? ? ? for (Node<E> trail = head, p = trail.next;

? ? ? ? ? ? p != null;

? ? ? ? ? ? trail = p, p = p.next) {

? ? ? ? ? ? if (o.equals(p.item)) {

//(3)從隊列中刪除指定的元素,如果在刪除前隊列已滿,則需要在刪除后喚醒生產者可以進生產消息。

? ? ? ? ? ? ? ? unlink(p, trail);

? ? ? ? ? ? ? ? return true;

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? return false;

? ? } finally {

//(4)是否兩把鎖,要和獲取兩把鎖的順序想法。

? ? ? ? fullyUnlock();

? ? }

}

? ? 由于remove方法在刪除指定元素前加了兩把鎖,所以在遍歷和刪除過程都是線程安全的。在獲取多個資源鎖的順序要和釋放的順序相反。

size()

? ? 返回隊列中的元素個數(shù)。由于入隊和出隊都有加鎖操作,所有記錄元素的原子變量count的操作都是線程安全的,所以在size方法獲取count變量的時候就不需要加鎖了。

public int size() {

? ? return count.get();

}

今天的分享就到這,有看不明白的地方一定是我寫的不夠清楚,所有歡迎提任何問題以及改善方法。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容