大飛老師帶你看線程(并發(fā)容器—PriorityBlockingQueue)

本文作者:王一飛,叩丁狼高級講師。原創(chuàng)文章,轉(zhuǎn)載請注明出處。

概述

按api上的解釋,PriorityBlockingQueue 是有一個帶有優(yōu)先級級別的無界阻塞隊列,不支持null元素入列,并且要求隊列對象必須為可以比較對象。這點跟PriorityQueue類 類似,區(qū)別是PriorityBlockingQueue 帶有阻塞功能。

PriorityBlockingQueue 出列具有優(yōu)先級之分,每次出列返回優(yōu)先級最高的元素。其底層通過是二叉樹最小堆實現(xiàn),這也導(dǎo)致了遍歷隊列時,獲取到的元素是無序的。

外部結(jié)構(gòu)

內(nèi)部結(jié)構(gòu):

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private transient Object[] queue;   //存數(shù)據(jù)
    private transient int size;  //隊列大小
    private transient Comparator<? super E> comparator;  //隊列比較器
    private final ReentrantLock lock;  //線程安全鎖
    private final Condition notEmpty;  //鎖條件
    ...
}
基本使用

需求:學(xué)號越大,優(yōu)先級越高

public class Student implements Comparable<Student>{
    private int num;  //學(xué)號
    private String name;

    public Student(int num, String name){
       this.num = num;
       this.name = name;
    }

    public int compareTo(Student o) {
        return o.num - this.num;
    }

    public String toString() {
        return "Student{num=" + num +", name='" + name +"}";
    }
}
public class App {

    public static void  main(String[] args) throws InterruptedException {
        //初始化隊列,容量為3
        PriorityBlockingQueue<Student> queue = new PriorityBlockingQueue<Student>(3);

        queue.offer(new Student(1, "zhangsan"));
        queue.offer(new Student(4, "zhaoliu"));
        queue.offer(new Student(3, "wangwu"));
        queue.offer(new Student(2, "lisi"));  //超過3,自動拓展

        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());

        System.out.println(queue.take()); //超過容量,阻塞
    }
}

上面代碼可以大體看出PriorityBlockingQueue的特性,阻塞,無界,帶優(yōu)先級的隊列。

源碼解析

構(gòu)造器

    //空參數(shù),使用默認的初始容量: 11
    public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }
    //單參數(shù), 指定初始容量
    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
    //2參數(shù)數(shù), 指定初始容量與額外的比較器
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

入列-offer
PriorityBlockingQueue 是一個無界的隊列,所以隨便插入不需要進行邊界限制,一直返回true

public boolean offer(E e) {
        //不允許null入列
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock; 
        lock.lock();  //加鎖
        int n, cap;  //操作臨時變量
        Object[] array;
        //判斷當(dāng)前隊列元素是否超過隊列容量,超過擴容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);  //嘗試擴容,可能會失敗
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                //無比較器使用自然排序
                siftUpComparable(n, e, array); //根據(jù)優(yōu)先級插入數(shù)據(jù)
            else
                //使用指定的比較器
                siftUpUsingComparator(n, e, array, cmp);//根據(jù)優(yōu)先級插入數(shù)據(jù)
            size = n + 1;
            notEmpty.signal();  //隊列非空,喚醒出列阻塞線程
        } finally {
            lock.unlock(); //釋放鎖
        }
        //入列成功返回true
        return true;
    }

入列一個核心點隊列擴容: tryGrow:

private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); //出于性能考慮先釋放鎖
        Object[] newArray = null;  //擴容后的數(shù)組
        //allocationSpinLock  自旋鎖變量,默認為0,需要經(jīng)過原子cas判斷之后才會改值此處控制單線程進入if語句
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                //擴容大小,<64, +2, 如果大于64, + 50%, 封頂為int最大值-8
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                //擴容出新數(shù)組,此處需要queue==array, 原因:已有線程擴容成功不必再擴容
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        //線程不滿足allocationSpinLock 自旋鎖變量的if判斷,表示擴容失敗,讓出cpu
        if (newArray == null) 
            Thread.yield();
        //數(shù)據(jù)拷貝,不允許同時出列入列,需要獲取鎖
        lock.lock();
        //queue==array 如果已經(jīng)擴容, 此處擴容放棄
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

總結(jié)一下上代碼, tryGrow的最終的目的是進行擴容,先釋放鎖是出于性能考慮,比較擴容操作需要消耗一定時間,而期間無法進行入列出列,那隊列的并發(fā)性就大大打折扣了。但是,如果釋放鎖之后,那么擴容的安全就需要另辟蹊徑了。

所以tryGrow方法采用了cas原子操作方式實現(xiàn):

if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) {
    .....
}

allocationSpinLock 變量最初為0, UNSAFE.compareAndSwapInt方法比較與置換allocationSpinLock =1, 因為compareAndSwapInt原子操作性,導(dǎo)致同一時刻只有一個線程進入執(zhí)行if語句塊。這樣也可以達到加鎖目的。

無法進入if語句塊的線程,則執(zhí)行Thread.yield();語句,讓cpu, 別占著茅坑不拉屎。

 if (newArray == null) 
            Thread.yield();

當(dāng)幸運線程執(zhí)行if語句塊之后,新隊列空間已經(jīng)開辟好了, 接下來就是數(shù)據(jù)拷貝,這里就又得注意的,為防止數(shù)據(jù)拷貝出亂子,又得爭奪鎖,保證隊列數(shù)據(jù)安全,所有需要重新加鎖。

lock.lock();
if (newArray != null && queue == array) {
    queue = newArray;
    System.arraycopy(array, 0, newArray, 0, oldCap);
}

入列另外一個核心點,數(shù)據(jù)存儲:siftUpComparable / siftUpUsingComparator

    private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
    private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator<? super T> cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }

PriorityBlockingQueue使用的是二叉樹最小堆的方式進行存儲數(shù)據(jù)。
要想理解上面代碼, 先得明白二叉樹最小堆操作概念:
1>二叉樹最小堆:是一種經(jīng)過排序的完全二叉樹,其中任一非終端節(jié)點的數(shù)據(jù)值均不大于其左子節(jié)點和右子節(jié)點的值。
2>使用數(shù)組來實現(xiàn)二叉堆最小堆,如果把最新入列元素下標(biāo)設(shè)置 i,那么該節(jié)點的父節(jié)點是i/2。
3>如果父節(jié)點的值大于子節(jié)點子需要進行交互。
有這個概念之后,我們來看 siftUpComparable代碼,進入siftUpComparable方法,先算出父parent的索引,再判斷父節(jié)點跟入列數(shù)據(jù)比對,如果父節(jié)點數(shù)據(jù)小于入列數(shù)據(jù)跳過,尋找上一個父節(jié)點。再重復(fù)剛剛判斷即可,如果父節(jié)點數(shù)據(jù)大于入列數(shù)據(jù),則交換。

出列-take

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //加鎖
        E result;
        try {
            //出列失敗,等待
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }

出列核心方法dequeue:

    private E dequeue() {
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            E result = (E) array[0];  //出列剔除數(shù)組第一個,也是最小堆的樹根
            E x = (E) array[n];
            array[n] = null;
            Comparator<? super E> cmp = comparator;
            //樹根出列之后,需要向上調(diào)整堆結(jié)構(gòu),弄出新的最小堆。
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }
總結(jié)

PriorityBlockingQueue是一個無界阻塞隊列,出隊的元素是優(yōu)先級最高的元素,而優(yōu)先級的規(guī)則可以自己指定,如果沒指定默認使用自然趴下規(guī)則。
PriorityBlockingQueue內(nèi)部通過使用一個二叉樹最小堆算法來維護內(nèi)部數(shù)組,這個數(shù)組是可擴容的,當(dāng)前元素個數(shù)>=最大容量時候會通過算法(小于64+2 大于64 + 50%)擴容。

想獲取更多技術(shù)干貨,請前往叩丁狼官網(wǎng):http://www.wolfcode.cn/all_article.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容