? ? PriorityBlockingQueue是帶優(yōu)先級的無界阻塞隊列,每次出隊都返回優(yōu)先級最高或最低的元素。其內(nèi)部使用平衡二叉樹堆實現(xiàn)的,所以遍歷隊列元素不能保證有序性。默認(rèn)使用對象的compareTo方法進行比較,也可以自定義comparators。
? ??PriorityBlockingQueue內(nèi)部有一個數(shù)組用來存放隊列元素,在前面介紹的ArrayBlockingQueue類中也有一個數(shù)組存放隊列元素,為什么ArrayBlockingQueue是有界隊列而PriorityBlockingQueue是無界隊列呢?因為在PriorityBlockingQueue內(nèi)部會對存放隊列元素的數(shù)據(jù)進行擴容,擴容要保證只能一個線程進行,所以PriorityBlockingQueue內(nèi)部有一個自旋鎖?allocationSpinLock,其使用CAS操作來保證只有一個線程可以擴容。? ?PriorityBlockingQueue類中還有一個ReentrantLock對象鎖,隊列的讀寫操作需要獲取該對象。由于是無界隊列生成元素并不受限制,但是隊列為空時消費數(shù)據(jù)會被限制(阻塞),所以PriorityBlockingQueue內(nèi)部只有一個條件變量來實現(xiàn)消費模式。
? ??PriorityBlockingQueue內(nèi)部主要的成員變量:
private transient Object[]queue;
private transient volatile int allocationSpinLock;
private final ReentrantLocklock;
private final ConditionnotEmpty;
? ? 下面對主要函數(shù)原理進行講解。
offer(E e)
? ? offer方法向隊列中插入一個元素,由于是無界隊列,所以插入操作總是返回true。
public boolean offer(E e) {
? ? if (e == null)
? ? ? ? throw new NullPointerException();
//(1)嘗試獲取獨占鎖對象。
? ? final ReentrantLock lock = this.lock;
? ? lock.lock();
? ? int n, cap;
? ? Object[] array;
//(2)判斷隊列是否需要進行擴容。
? ? while ((n = size) >= (cap = (array = queue).length))
? ? ? ? tryGrow(array, cap);
? ? try {
? ? ? ? Comparator<? super E> cmp = comparator;
//(3)使用默認(rèn)對比器或自定義對比器進行建二叉樹堆
? ? ? ? if (cmp == null)
? ? ? ? ? ? siftUpComparable(n, e, array);
? ? ? ? else
? ? ? ? ? ? siftUpUsingComparator(n, e, array, cmp);
? ? ? ? size = n + 1;
//(4)通知因為隊列為空而阻塞的消費者可以進行獲取數(shù)據(jù)。
? ? ? ? notEmpty.signal();
? ? } finally {
//(5)釋放鎖。
? ? ? ? lock.unlock();
? ? }
? ? return true;
}
tryGrow(Object[] array, int oldCap)
? ? 對隊列進行擴容,使用自旋鎖和CAS算法保證只有一個線程能進行擴容。
private void tryGrow(Object[] array, int oldCap) {
//(1)釋放獲取的獨占鎖,在進行擴容的過程讓別的線程也可以獲取到該鎖。
? ? lock.unlock();?
? ? Object[] newArray = null;
//(2)CAS成功則進行擴容。
? ? if (allocationSpinLock == 0 &&
? ? ? ? UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0, 1)) {
? ? ? ? try {
//(3)進行擴容,oldGap<64則增加oldCap+2,否則增加oldCap的一半,并且容量最大值為MAX_ARRAY_SIZE。從這來看雖然隊列會進行擴容,但也不是無限擴容,嚴(yán)格來說也應(yīng)該算是有界的。
? ? ? ? ? ? int newCap = oldCap + ((oldCap < 64) ?
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (oldCap + 2) : // grow faster if small
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (oldCap >> 1));
? ? ? ? ? ? if (newCap - MAX_ARRAY_SIZE > 0) {? ? // possible overflow
? ? ? ? ? ? ? ? int minCap = oldCap + 1;
? ? ? ? ? ? ? ? if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
? ? ? ? ? ? ? ? ? ? throw new OutOfMemoryError();
? ? ? ? ? ? ? ? newCap = MAX_ARRAY_SIZE;
? ? ? ? ? ? }
? ? ? ? ? ? if (newCap > oldCap && queue == array)
? ? ? ? ? ? ? ? newArray = new Object[newCap];
? ? ? ? } finally {
? ? //(4)擴完容之后將自旋鎖allocationSpinLock? 設(shè)置為0,允許下次進行擴容。
? ? ? ? ? ? allocationSpinLock = 0;
? ? ? ? }
? ? }
//(5)第一個線程CAS成功后,第二個線程會進入這段代碼,然后第二個線程會讓出cpu,盡量讓第一個線程獲取鎖,但這不保證一定可以。? ? if (newArray == null) // back off if another thread is allocating
? ? ? ? Thread.yield();
//(6)獲取鎖,將原來隊列中的元素拷貝到擴容后的隊列中。
? ? lock.lock();
? ? if (newArray != null && queue == array) {
? ? ? ? queue = newArray;
? ? ? ? System.arraycopy(array, 0, newArray, 0, oldCap);
? ? }
}
poll()
? ? 獲取隊列內(nèi)部堆樹的根節(jié)點元素,如果隊列為空則返回null。
public E poll() {
//(1)嘗試獲取鎖。
? ? final ReentrantLock lock = this.lock;
? ? lock.lock();
? ? try {
//(2)獲取隊列的第一個元素,并整理二叉樹堆。
? ? ? ? return dequeue();
? ? } finally {
//(3)釋放鎖。
? ? ? ? lock.unlock();
? ? }
}
put()
? ? 由于是無界隊列,不需要阻塞,put方法內(nèi)部調(diào)用的offer方法,這就不進行贅述了。
public void put(E e) {
? ? offer(e); // never need to block
}
take()
? ? 獲取隊列內(nèi)部堆樹的根節(jié)點元素,如果隊列為空則阻塞。
public E take() throws InterruptedException {
//(1)嘗試獲取鎖對象,調(diào)用的是lockInterruptibly方法,所以在當(dāng)其他線程設(shè)置了中斷標(biāo)志,該線程會拋出InterruptedException異常。
? ? final ReentrantLock lock = this.lock;
? ? lock.lockInterruptibly();
? ? E result;
? ? try {
//(2)如果隊列為空,則阻塞,停止消費。
? ? ? ? while ( (result = dequeue()) == null)
? ? ? ? ? ? notEmpty.await();
? ? } finally {
//(3)釋放鎖。
? ? ? ? lock.unlock();
? ? }
? ? return result;
}
size()
? ??size方法需要獲取鎖,因為本類中的size變量沒有volatile變量修飾無法保證內(nèi)存的可見性。
public int size() {
? ? final ReentrantLock lock = this.lock;
? ? lock.lock();
? ? try {
? ? ? ? return size;
? ? } finally {
? ? ? ? lock.unlock();
? ? }
}
? ??PriorityBlockingQueue隊列在內(nèi)部使用二叉樹堆維護元素的優(yōu)先級,使用數(shù)組作為元素的存儲的數(shù)據(jù)結(jié)構(gòu),該數(shù)組可進行擴容,但是容量也是有限制的,使用CAS來保證擴容時的唯一性。
? ???今天的分享就到這,有看不明白的地方一定是我寫的不夠清楚,所有歡迎提任何問題以及改善方法。