JAVA并發(fā)梳理(四) 各種安全隊(duì)列

在 Java 多線程應(yīng)用中,隊(duì)列的使用率很高,多數(shù)生產(chǎn)消費(fèi)模型的首選數(shù)據(jù)結(jié)構(gòu)就是隊(duì)列(先進(jìn)先出)。
Java提供的線程安全的 Queue 可以分為

  • 阻塞隊(duì)列,典型例子是 LinkedBlockingQueue
    適用阻塞隊(duì)列的好處:多線程操作共同的隊(duì)列時(shí)不需要額外的同步,另外就是隊(duì)列會(huì)自動(dòng)平衡負(fù)載,即那邊(生產(chǎn)與消費(fèi)兩邊)處理快了就會(huì)被阻塞掉,從而減少兩邊的處理速度差距。
  • 非阻塞隊(duì)列,典型例子是 ConcurrentLinkedQueue
    當(dāng)許多線程共享訪問一個(gè)公共集合時(shí),ConcurrentLinkedQueue 是一個(gè)恰當(dāng)?shù)倪x擇。

LinkedBlockingQueue 多用于任務(wù)隊(duì)列
ConcurrentLinkedQueue 多用于消息隊(duì)列

下面分別介紹下JDK中阻塞隊(duì)列和非阻塞隊(duì)列的各種實(shí)現(xiàn)。

Concurrent Queue的兩種實(shí)現(xiàn)(非阻塞隊(duì)列)
1. ConcurrentLinkedQueue

a) 基于鏈表節(jié)點(diǎn), 無界隊(duì)列。注意size需要遍歷整個(gè)鏈表,且如果有其他修改的操作會(huì)導(dǎo)致size不準(zhǔn)確;盡量使用isEmpty。
b) 通過無鎖CAS的方式操作元素,實(shí)現(xiàn)了高并發(fā)狀態(tài)下的高性能。

2. ConcurrentLinkedDeque

線程安全雙端隊(duì)列,實(shí)現(xiàn)方式基本同ConcurrentLinkedQueue。

BlockingQueue的各種實(shí)現(xiàn)(阻塞隊(duì)列)

BlockingQueue擴(kuò)展Queue的方法,使其在在隊(duì)列已滿的情況下添加元素時(shí)有了4種模式:
其一,拋異常;boolean add(E e);
其二,立即返回false;boolean offer(E e);
其三,等待;void put(E e) throws InterruptedException;
其四,等待一段時(shí)間。 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

JDK中對BlockingQueue提供了近十種實(shí)現(xiàn),學(xué)習(xí)的時(shí)候可以從兩個(gè)主要方面入手,1) 有界還是無界 2)安全性是如何保證的。

1. ArrayBlockingQueue

a) 內(nèi)部維護(hù)了一個(gè)定長數(shù)組,以便緩存隊(duì)列中的數(shù)據(jù)對象。有界隊(duì)列。
b) 從其關(guān)鍵屬性可以看出來,安全訪問控制通過ReentrantLock和Condition配合實(shí)現(xiàn),固每次入列和出列時(shí)需要獲得全局的鎖,因此是不能完全并行的。也有人稱其為內(nèi)部沒有實(shí)現(xiàn)讀寫分離。

   /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
2. LinkedBlockingQueue

基于鏈表的阻塞隊(duì)列。
a) 其內(nèi)部也維持著一個(gè)數(shù)據(jù)緩沖隊(duì)列〈該隊(duì)列由一個(gè)鏈表構(gòu)成);可以指定容量,也可以不指定,不指定的話,默認(rèn)最大Integer.MAX_VALUE。也可以算是個(gè)無界隊(duì)列吧。
b) 其內(nèi)部實(shí)現(xiàn)采用分離鎖(讀寫分離兩個(gè)鎖),從而實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者操作的完全并行運(yùn)行,因此能夠高效的處理并發(fā)數(shù)據(jù)。

 /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
3. SynchronousQueue

一種沒有緩沖的隊(duì)列,生產(chǎn)者產(chǎn)生的數(shù)據(jù)直接會(huì)被消費(fèi)者獲取并消費(fèi)。內(nèi)部提供了TransferQueueTransferStack,其實(shí)是對應(yīng)FIFO, FILO,模擬公平鎖和非公平鎖。注意,在沒有需要的生產(chǎn)者或者消費(fèi)者的時(shí)候,SynchronousQueue會(huì)通過park阻塞線程。

4. PriorityBlockingQueue

基于優(yōu)先級(jí)的阻塞隊(duì)列(優(yōu)先級(jí)的判斷通過構(gòu)造函數(shù)傳入的Comparator對象來決定,傳入隊(duì)列的對象必須實(shí)現(xiàn)Comparable接口),在實(shí)現(xiàn)PriorityBlockingQueue時(shí),內(nèi)部控制線程同步的鎖采用的是公平鎖,他也是一個(gè)無界的隊(duì)列。
a) 底層是HEAP實(shí)現(xiàn),數(shù)組可擴(kuò)展,因此是無界隊(duì)列,因此put永遠(yuǎn)不會(huì)阻塞。
b) 鎖使用ReentrantLock,以及一個(gè)nonEmpty Condition,只有在隊(duì)列為空,take的時(shí)候會(huì)阻塞。

/**
     * Priority queue represented as a balanced binary heap: the two
     * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
     * priority queue is ordered by comparator, or by the elements'
     * natural ordering, if comparator is null: For each node n in the
     * heap and each descendant d of n, n <= d.  The element with the
     * lowest value is in queue[0], assuming the queue is nonempty.
     */
    private transient Object[] queue;

    /**
     * The number of elements in the priority queue.
     */
    private transient int size;

    /**
     * The comparator, or null if priority queue uses elements'
     * natural ordering.
     */
    private transient Comparator<? super E> comparator;

    /**
     * Lock used for all public operations
     */
    private final ReentrantLock lock;

    /**
     * Condition for blocking when empty
     */
    private final Condition notEmpty;
5. 雙端隊(duì)列BlockingDeque及實(shí)現(xiàn)LinkedBlockingDeque
    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();

可以看出來由于只有一個(gè)鎖,其實(shí)也是不能做到真正意義上的兩頭并行操作的。

LinkedBlockingDeque的基本方法
5. DelayQueue

帶有延遲時(shí)間的Queue,其中的元素只有當(dāng)其指定的延遲時(shí)間到了,才能夠從隊(duì)列中獲取到該元素。DelayQueue中的元素必須實(shí)現(xiàn)Delayed接口,DelayQueue是一個(gè)沒有大小限制的隊(duì)列,應(yīng)用場景很多,比如對緩存超時(shí)的數(shù)據(jù)進(jìn)行移除、任務(wù)超時(shí)處理、空閑連接的關(guān)閉等等。
對這個(gè)隊(duì)列的實(shí)現(xiàn)可以結(jié)合ScheduledThreadPoolExecutor相關(guān)來看。

終于基本把線程安全Queue相關(guān)的實(shí)現(xiàn)類都過了一遍了,項(xiàng)目中要結(jié)合應(yīng)用場景選擇最合適的使用。這需要建立在對各種實(shí)現(xiàn)都很熟悉的基礎(chǔ)上。

引用
LinkedBlockingQueue 和 ConcurrentLinkedQueue的用法及區(qū)別
java挑戰(zhàn)高并發(fā)(14): LinkedBlockingQueue和ConcurrentLinkedQueue的區(qū)別及用法
java線程安全之并發(fā)Queue(十三)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容