多線程并發(fā)編程14-PriorityBlockingQueue源碼剖析

? ? PriorityBlockingQueue是帶優(yōu)先級的無界阻塞隊列,每次出隊都返回優(yōu)先級最高或最低的元素。其內(nèi)部使用平衡二叉樹堆實現(xiàn)的,所以遍歷隊列元素不能保證有序性。默認(rèn)使用對象的compareTo方法進行比較,也可以自定義comparators。

? ??PriorityBlockingQueue內(nèi)部有一個數(shù)組用來存放隊列元素,在前面介紹的ArrayBlockingQueue類中也有一個數(shù)組存放隊列元素,為什么ArrayBlockingQueue是有界隊列而PriorityBlockingQueue是無界隊列呢?因為在PriorityBlockingQueue內(nèi)部會對存放隊列元素的數(shù)據(jù)進行擴容,擴容要保證只能一個線程進行,所以PriorityBlockingQueue內(nèi)部有一個自旋鎖?allocationSpinLock,其使用CAS操作來保證只有一個線程可以擴容。? ?PriorityBlockingQueue類中還有一個ReentrantLock對象鎖,隊列的讀寫操作需要獲取該對象。由于是無界隊列生成元素并不受限制,但是隊列為空時消費數(shù)據(jù)會被限制(阻塞),所以PriorityBlockingQueue內(nèi)部只有一個條件變量來實現(xiàn)消費模式。

? ??PriorityBlockingQueue內(nèi)部主要的成員變量:

private transient Object[]queue;

private transient volatile int allocationSpinLock;

private final ReentrantLocklock;

private final ConditionnotEmpty;

? ? 下面對主要函數(shù)原理進行講解。

offer(E e)

? ? offer方法向隊列中插入一個元素,由于是無界隊列,所以插入操作總是返回true。

public boolean offer(E e) {

? ? if (e == null)

? ? ? ? throw new NullPointerException();

//(1)嘗試獲取獨占鎖對象。

? ? final ReentrantLock lock = this.lock;

? ? lock.lock();

? ? int n, cap;

? ? Object[] array;

//(2)判斷隊列是否需要進行擴容。

? ? while ((n = size) >= (cap = (array = queue).length))

? ? ? ? tryGrow(array, cap);

? ? try {

? ? ? ? Comparator<? super E> cmp = comparator;

//(3)使用默認(rèn)對比器或自定義對比器進行建二叉樹堆

? ? ? ? if (cmp == null)

? ? ? ? ? ? siftUpComparable(n, e, array);

? ? ? ? else

? ? ? ? ? ? siftUpUsingComparator(n, e, array, cmp);

? ? ? ? size = n + 1;

//(4)通知因為隊列為空而阻塞的消費者可以進行獲取數(shù)據(jù)。

? ? ? ? notEmpty.signal();

? ? } finally {

//(5)釋放鎖。

? ? ? ? lock.unlock();

? ? }

? ? return true;

}

tryGrow(Object[] array, int oldCap)

? ? 對隊列進行擴容,使用自旋鎖和CAS算法保證只有一個線程能進行擴容。

private void tryGrow(Object[] array, int oldCap) {

//(1)釋放獲取的獨占鎖,在進行擴容的過程讓別的線程也可以獲取到該鎖。

? ? lock.unlock();?

? ? Object[] newArray = null;

//(2)CAS成功則進行擴容。

? ? if (allocationSpinLock == 0 &&

? ? ? ? UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0, 1)) {

? ? ? ? try {

//(3)進行擴容,oldGap<64則增加oldCap+2,否則增加oldCap的一半,并且容量最大值為MAX_ARRAY_SIZE。從這來看雖然隊列會進行擴容,但也不是無限擴容,嚴(yán)格來說也應(yīng)該算是有界的。

? ? ? ? ? ? int newCap = oldCap + ((oldCap < 64) ?

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (oldCap + 2) : // grow faster if small

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (oldCap >> 1));

? ? ? ? ? ? if (newCap - MAX_ARRAY_SIZE > 0) {? ? // possible overflow

? ? ? ? ? ? ? ? int minCap = oldCap + 1;

? ? ? ? ? ? ? ? if (minCap < 0 || minCap > MAX_ARRAY_SIZE)

? ? ? ? ? ? ? ? ? ? throw new OutOfMemoryError();

? ? ? ? ? ? ? ? newCap = MAX_ARRAY_SIZE;

? ? ? ? ? ? }

? ? ? ? ? ? if (newCap > oldCap && queue == array)

? ? ? ? ? ? ? ? newArray = new Object[newCap];

? ? ? ? } finally {

? ? //(4)擴完容之后將自旋鎖allocationSpinLock? 設(shè)置為0,允許下次進行擴容。

? ? ? ? ? ? allocationSpinLock = 0;

? ? ? ? }

? ? }
//(5)第一個線程CAS成功后,第二個線程會進入這段代碼,然后第二個線程會讓出cpu,盡量讓第一個線程獲取鎖,但這不保證一定可以。

? ? if (newArray == null) // back off if another thread is allocating

? ? ? ? Thread.yield();

//(6)獲取鎖,將原來隊列中的元素拷貝到擴容后的隊列中。

? ? lock.lock();

? ? if (newArray != null && queue == array) {

? ? ? ? queue = newArray;

? ? ? ? System.arraycopy(array, 0, newArray, 0, oldCap);

? ? }

}

poll()

? ? 獲取隊列內(nèi)部堆樹的根節(jié)點元素,如果隊列為空則返回null。

public E poll() {

//(1)嘗試獲取鎖。

? ? final ReentrantLock lock = this.lock;

? ? lock.lock();

? ? try {

//(2)獲取隊列的第一個元素,并整理二叉樹堆。

? ? ? ? return dequeue();

? ? } finally {

//(3)釋放鎖。

? ? ? ? lock.unlock();

? ? }

}

put()

? ? 由于是無界隊列,不需要阻塞,put方法內(nèi)部調(diào)用的offer方法,這就不進行贅述了。

public void put(E e) {

? ? offer(e); // never need to block

}

take()

? ? 獲取隊列內(nèi)部堆樹的根節(jié)點元素,如果隊列為空則阻塞。

public E take() throws InterruptedException {

//(1)嘗試獲取鎖對象,調(diào)用的是lockInterruptibly方法,所以在當(dāng)其他線程設(shè)置了中斷標(biāo)志,該線程會拋出InterruptedException異常。

? ? final ReentrantLock lock = this.lock;

? ? lock.lockInterruptibly();

? ? E result;

? ? try {

//(2)如果隊列為空,則阻塞,停止消費。

? ? ? ? while ( (result = dequeue()) == null)

? ? ? ? ? ? notEmpty.await();

? ? } finally {

//(3)釋放鎖。

? ? ? ? lock.unlock();

? ? }

? ? return result;

}

size()

? ??size方法需要獲取鎖,因為本類中的size變量沒有volatile變量修飾無法保證內(nèi)存的可見性。

public int size() {

? ? final ReentrantLock lock = this.lock;

? ? lock.lock();

? ? try {

? ? ? ? return size;

? ? } finally {

? ? ? ? lock.unlock();

? ? }

}

? ??PriorityBlockingQueue隊列在內(nèi)部使用二叉樹堆維護元素的優(yōu)先級,使用數(shù)組作為元素的存儲的數(shù)據(jù)結(jié)構(gòu),該數(shù)組可進行擴容,但是容量也是有限制的,使用CAS來保證擴容時的唯一性。

? ???今天的分享就到這,有看不明白的地方一定是我寫的不夠清楚,所有歡迎提任何問題以及改善方法。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容