在 Java 多線程應(yīng)用中,隊(duì)列的使用率很高,多數(shù)生產(chǎn)消費(fèi)模型的首選數(shù)據(jù)結(jié)構(gòu)就是隊(duì)列(先進(jìn)先出)。
Java提供的線程安全的 Queue 可以分為
-
阻塞隊(duì)列,典型例子是
LinkedBlockingQueue
適用阻塞隊(duì)列的好處:多線程操作共同的隊(duì)列時(shí)不需要額外的同步,另外就是隊(duì)列會(huì)自動(dòng)平衡負(fù)載,即那邊(生產(chǎn)與消費(fèi)兩邊)處理快了就會(huì)被阻塞掉,從而減少兩邊的處理速度差距。 -
非阻塞隊(duì)列,典型例子是
ConcurrentLinkedQueue
當(dāng)許多線程共享訪問一個(gè)公共集合時(shí),ConcurrentLinkedQueue是一個(gè)恰當(dāng)?shù)倪x擇。
LinkedBlockingQueue 多用于任務(wù)隊(duì)列
ConcurrentLinkedQueue 多用于消息隊(duì)列
下面分別介紹下JDK中阻塞隊(duì)列和非阻塞隊(duì)列的各種實(shí)現(xiàn)。
Concurrent Queue的兩種實(shí)現(xiàn)(非阻塞隊(duì)列)
1. ConcurrentLinkedQueue
a) 基于鏈表節(jié)點(diǎn), 無界隊(duì)列。注意size需要遍歷整個(gè)鏈表,且如果有其他修改的操作會(huì)導(dǎo)致size不準(zhǔn)確;盡量使用isEmpty。
b) 通過無鎖CAS的方式操作元素,實(shí)現(xiàn)了高并發(fā)狀態(tài)下的高性能。
2. ConcurrentLinkedDeque
線程安全雙端隊(duì)列,實(shí)現(xiàn)方式基本同ConcurrentLinkedQueue。
BlockingQueue的各種實(shí)現(xiàn)(阻塞隊(duì)列)
BlockingQueue擴(kuò)展Queue的方法,使其在在隊(duì)列已滿的情況下添加元素時(shí)有了4種模式:
其一,拋異常;boolean add(E e);
其二,立即返回false;boolean offer(E e);
其三,等待;void put(E e) throws InterruptedException;
其四,等待一段時(shí)間。 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
JDK中對BlockingQueue提供了近十種實(shí)現(xiàn),學(xué)習(xí)的時(shí)候可以從兩個(gè)主要方面入手,1) 有界還是無界 2)安全性是如何保證的。
1. ArrayBlockingQueue
a) 內(nèi)部維護(hù)了一個(gè)定長數(shù)組,以便緩存隊(duì)列中的數(shù)據(jù)對象。有界隊(duì)列。
b) 從其關(guān)鍵屬性可以看出來,安全訪問控制通過ReentrantLock和Condition配合實(shí)現(xiàn),固每次入列和出列時(shí)需要獲得全局的鎖,因此是不能完全并行的。也有人稱其為內(nèi)部沒有實(shí)現(xiàn)讀寫分離。
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
2. LinkedBlockingQueue
基于鏈表的阻塞隊(duì)列。
a) 其內(nèi)部也維持著一個(gè)數(shù)據(jù)緩沖隊(duì)列〈該隊(duì)列由一個(gè)鏈表構(gòu)成);可以指定容量,也可以不指定,不指定的話,默認(rèn)最大Integer.MAX_VALUE。也可以算是個(gè)無界隊(duì)列吧。
b) 其內(nèi)部實(shí)現(xiàn)采用分離鎖(讀寫分離兩個(gè)鎖),從而實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者操作的完全并行運(yùn)行,因此能夠高效的處理并發(fā)數(shù)據(jù)。
/**
* Linked list node class
*/
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
3. SynchronousQueue
一種沒有緩沖的隊(duì)列,生產(chǎn)者產(chǎn)生的數(shù)據(jù)直接會(huì)被消費(fèi)者獲取并消費(fèi)。內(nèi)部提供了TransferQueue和TransferStack,其實(shí)是對應(yīng)FIFO, FILO,模擬公平鎖和非公平鎖。注意,在沒有需要的生產(chǎn)者或者消費(fèi)者的時(shí)候,SynchronousQueue會(huì)通過park阻塞線程。
4. PriorityBlockingQueue
基于優(yōu)先級(jí)的阻塞隊(duì)列(優(yōu)先級(jí)的判斷通過構(gòu)造函數(shù)傳入的Comparator對象來決定,傳入隊(duì)列的對象必須實(shí)現(xiàn)Comparable接口),在實(shí)現(xiàn)PriorityBlockingQueue時(shí),內(nèi)部控制線程同步的鎖采用的是公平鎖,他也是一個(gè)無界的隊(duì)列。
a) 底層是HEAP實(shí)現(xiàn),數(shù)組可擴(kuò)展,因此是無界隊(duì)列,因此put永遠(yuǎn)不會(huì)阻塞。
b) 鎖使用ReentrantLock,以及一個(gè)nonEmpty Condition,只有在隊(duì)列為空,take的時(shí)候會(huì)阻塞。
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
private transient Object[] queue;
/**
* The number of elements in the priority queue.
*/
private transient int size;
/**
* The comparator, or null if priority queue uses elements'
* natural ordering.
*/
private transient Comparator<? super E> comparator;
/**
* Lock used for all public operations
*/
private final ReentrantLock lock;
/**
* Condition for blocking when empty
*/
private final Condition notEmpty;
5. 雙端隊(duì)列BlockingDeque及實(shí)現(xiàn)LinkedBlockingDeque
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
可以看出來由于只有一個(gè)鎖,其實(shí)也是不能做到真正意義上的兩頭并行操作的。

5. DelayQueue
帶有延遲時(shí)間的Queue,其中的元素只有當(dāng)其指定的延遲時(shí)間到了,才能夠從隊(duì)列中獲取到該元素。DelayQueue中的元素必須實(shí)現(xiàn)Delayed接口,DelayQueue是一個(gè)沒有大小限制的隊(duì)列,應(yīng)用場景很多,比如對緩存超時(shí)的數(shù)據(jù)進(jìn)行移除、任務(wù)超時(shí)處理、空閑連接的關(guān)閉等等。
對這個(gè)隊(duì)列的實(shí)現(xiàn)可以結(jié)合ScheduledThreadPoolExecutor相關(guān)來看。
終于基本把線程安全Queue相關(guān)的實(shí)現(xiàn)類都過了一遍了,項(xiàng)目中要結(jié)合應(yīng)用場景選擇最合適的使用。這需要建立在對各種實(shí)現(xiàn)都很熟悉的基礎(chǔ)上。
引用
LinkedBlockingQueue 和 ConcurrentLinkedQueue的用法及區(qū)別
java挑戰(zhàn)高并發(fā)(14): LinkedBlockingQueue和ConcurrentLinkedQueue的區(qū)別及用法
java線程安全之并發(fā)Queue(十三)