Java中線程安全的容器主要包括兩類(lèi):
-
Vector、Hashtable,以及封裝器類(lèi)Collections.synchronizedList和Collections.synchronizedMap; - Java 5.0引入的
java.util.concurrent包,其中包含并發(fā)隊(duì)列、并發(fā)HashMap以及寫(xiě)入時(shí)復(fù)制容器。
依筆者看,早期使用的同步容器主要有兩方面的問(wèn)題:1)通過(guò)對(duì)方法添加synchronized關(guān)鍵字實(shí)現(xiàn)同步,這種粗粒度的加鎖操作在synchronized關(guān)鍵字本身未充分優(yōu)化之前,效率偏低;2)同步容器雖然是線程安全的,但在某些外部復(fù)合操作(例:若沒(méi)有則添加)時(shí),依然需要客戶(hù)端加鎖保證數(shù)據(jù)安全。因此,從Java 5.0以后,并發(fā)編程偏向于使用java.util.concurrent包(作者:Doug Lea)中的容器類(lèi),本文也將著重介紹該包中的容器類(lèi),主要包括:
- 阻塞隊(duì)列
- ConcurrentHashMap
- 寫(xiě)入時(shí)復(fù)制容器
一、阻塞隊(duì)列
在并發(fā)環(huán)境下,阻塞隊(duì)列是常用的數(shù)據(jù)結(jié)構(gòu),它能確保數(shù)據(jù)高效安全的傳輸,為快速搭建高質(zhì)量的多線程應(yīng)用帶來(lái)極大的便利,比如MQ的原理就是基于阻塞隊(duì)列的。java.util.concurrent中包含豐富的隊(duì)列實(shí)現(xiàn),它們之間的關(guān)系如下圖所示:

- BlockingQueue、Deque(雙向隊(duì)列)繼承自Queue接口;
- BlockingDeque同時(shí)繼承自BlockingQueue、Deque接口,提供阻塞的雙向隊(duì)列屬性;
- LinkedBlockingQueue和LinkedBlockingDeque分別實(shí)現(xiàn)了BlockingQueue和BlockingDeque接口;
- DelayQueue實(shí)現(xiàn)了BlockingQueue接口,提供任務(wù)延遲功能;
- TransferQueue是Java 7引入的,用于替代BlockingQueue,LinkedTransferQueue是其實(shí)現(xiàn)類(lèi)。
下面對(duì)這些隊(duì)列進(jìn)行詳細(xì)的介紹:
1.1 BlockingQueue與BlockingDeque
阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作是:
- 在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?/li>
- 當(dāng)隊(duì)列滿(mǎn)時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場(chǎng)景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。
阻塞隊(duì)列提供了四種處理方法:
| 方法 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時(shí)退出 |
|---|---|---|---|---|
| 插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除方法 | remove() | poll() | take() | poll(time,unit) |
| 檢查方法 | element() | peek() | 不可用 | 不可用 |
- 拋出異常:是指當(dāng)阻塞隊(duì)列滿(mǎn)時(shí)候,再往隊(duì)列里插入元素,會(huì)拋出IllegalStateException("Queue full")異常。當(dāng)隊(duì)列為空時(shí),從隊(duì)列里獲取元素時(shí)會(huì)拋出NoSuchElementException異常 。
- 返回特殊值:插入方法會(huì)返回是否成功,成功則返回true。移除方法,則是從隊(duì)列里拿出一個(gè)元素,如果沒(méi)有則返回null
- 一直阻塞:當(dāng)阻塞隊(duì)列滿(mǎn)時(shí),如果生產(chǎn)者線程往隊(duì)列里put元素,隊(duì)列會(huì)一直阻塞生產(chǎn)者線程,直到拿到數(shù)據(jù),或者響應(yīng)中斷退出。當(dāng)隊(duì)列空時(shí),消費(fèi)者線程試圖從隊(duì)列里take元素,隊(duì)列也會(huì)阻塞消費(fèi)者線程,直到隊(duì)列可用。
- 超時(shí)退出:當(dāng)阻塞隊(duì)列滿(mǎn)時(shí),隊(duì)列會(huì)阻塞生產(chǎn)者線程一段時(shí)間,如果超過(guò)一定的時(shí)間,生產(chǎn)者線程就會(huì)退出。
BlockingDeque在BlockingQueue的基礎(chǔ)上,增加了支持雙向隊(duì)列的屬性。如下圖所示,相比于BlockingQueue的插入和移除方法,變?yōu)?code>XxxFirst,XxxLast方法,分別對(duì)應(yīng)隊(duì)列的兩端,既可以在頭部添加或移除,也可以在尾部添加或移除。

1.2 LinkedBlockingQueue與LinkedBlockingDeque
LinkedBlockingQueue是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列。此隊(duì)列的默認(rèn)和最大長(zhǎng)度為Integer.MAX_VALUE,按照先進(jìn)先出的原則對(duì)元素進(jìn)行排序。
首先看下LinkedBlockingQueue中核心的域:
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
-
LinkedBlockingQueue和LinkedList類(lèi)似,通過(guò)靜態(tài)內(nèi)部類(lèi)Node<E>進(jìn)行元素的存儲(chǔ); -
capacity表示阻塞隊(duì)列所能存儲(chǔ)的最大容量,在創(chuàng)建時(shí)可以手動(dòng)指定最大容量,默認(rèn)的最大容量為Integer.MAX_VALUE; -
count表示當(dāng)前隊(duì)列中的元素?cái)?shù)量,LinkedBlockingQueue的入隊(duì)列和出隊(duì)列使用了兩個(gè)不同的lock對(duì)象,因此無(wú)論是在入隊(duì)列還是出隊(duì)列,都會(huì)涉及對(duì)元素?cái)?shù)量的并發(fā)修改,因此這里使用了一個(gè)原子操作類(lèi)來(lái)解決對(duì)同一個(gè)變量進(jìn)行并發(fā)修改的線程安全問(wèn)題。 -
head和last分別表示鏈表的頭部和尾部; -
takeLock表示元素出隊(duì)列時(shí)線程所獲取的鎖,當(dāng)執(zhí)行take、poll等操作時(shí)線程獲??;notEmpty當(dāng)隊(duì)列為空時(shí),通過(guò)該Condition讓獲取元素的線程處于等待狀態(tài); -
putLock表示元素入隊(duì)列時(shí)線程所獲取的鎖,當(dāng)執(zhí)行put、offer等操作時(shí)獲??;notFull當(dāng)隊(duì)列容量達(dá)到capacity時(shí),通過(guò)該Condition讓加入元素的線程處于等待狀態(tài)。
其次,LinkedBlockingQueue有三個(gè)構(gòu)造方法,分別如下:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
默認(rèn)構(gòu)造函數(shù)直接調(diào)用LinkedBlockingQueue(int capacity),LinkedBlockingQueue(int capacity)會(huì)初始化首尾節(jié)點(diǎn),并置位null。LinkedBlockingQueue(Collection<? extends E> c)在初始化隊(duì)列的同時(shí),將一個(gè)集合的全部元素加入隊(duì)列。
最后分析下put和take的過(guò)程,這里重點(diǎn)關(guān)注:LinkedBlockingQueue如何實(shí)現(xiàn)添加/移除并行的?
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
之所以把put和take放在一起,是因?yàn)樗鼈兪且粚?duì)互逆的過(guò)程:
-
put在插入元素前首先獲得putLock和當(dāng)前隊(duì)列的元素?cái)?shù)量,take在去除元素前首先獲得takeLock和當(dāng)前隊(duì)列的元素?cái)?shù)量; -
put時(shí)需要判斷當(dāng)前隊(duì)列是否已滿(mǎn),已滿(mǎn)時(shí)當(dāng)前線程進(jìn)行等待,take時(shí)需要判斷隊(duì)列是否已空,隊(duì)列為空時(shí)當(dāng)前線程進(jìn)行等待; -
put調(diào)用enqueue在隊(duì)尾插入元素,并修改尾指針,take調(diào)用dequeue將head指向原來(lái)first的位置,并將first的數(shù)據(jù)域置位null,實(shí)現(xiàn)刪除原first指針,并產(chǎn)生新的head,同時(shí),切斷原head節(jié)點(diǎn)的引用,便于垃圾回收。
private void enqueue(Node<E> node) {
last = last.next = node;
}
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
- 最后,
put根據(jù)count決定是否觸發(fā)隊(duì)列未滿(mǎn)和隊(duì)列空;take根據(jù)count決定是否觸發(fā)隊(duì)列未空和隊(duì)列滿(mǎn)。
回到剛才的問(wèn)題:LinkedBlockingQueue如何實(shí)現(xiàn)添加/移除并行的?
LinkedBlockingQueue在入隊(duì)列和出隊(duì)列時(shí)使用的是不同的Lock,這也意味著它們之間的操作不會(huì)存在互斥。在多個(gè)CPU的情況下,可以做到在同一時(shí)刻既消費(fèi)、又生產(chǎn),做到并行處理。
同樣的,LinkedBlockingDeque在LinkedBlockingQueue的基礎(chǔ)上,增加了雙向操作的屬性。繼續(xù)以put和take為例,LinkedBlockingDeque增加了putFirst/putLast、takeFirst/takeLast方法,分別用于在隊(duì)列頭、尾進(jìn)行添加和刪除。與LinkedBlockingQueue不同的是,LinkedBlockingDeque的入隊(duì)列和出隊(duì)列不再使用不同的Lock。
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
其中,lock表示讀寫(xiě)的主鎖,notEmpty和notFull依然表示相應(yīng)的控制線程狀態(tài)條件量。以putFirst和takeFirst為例:
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
putFirst不支持插入null元素,首先新建一個(gè)Node對(duì)象,然后調(diào)用ReentrantLock的lock方法獲取鎖,插入操作通過(guò)boolean linkFirst(Node<E> node)實(shí)現(xiàn),如果當(dāng)前隊(duì)列頭已滿(mǎn),那么該線程等待(linkFirst方法在寫(xiě)入元素成功后會(huì)釋放該鎖信號(hào)),最后,在finally塊中釋放鎖(ReentrantLock的使用)。
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
與putFirst類(lèi)似,takeFirst首先獲取鎖,然后在try中解除尾元素對(duì)象的引用,如果unlinkFirst為空,表示隊(duì)列為空,沒(méi)有元素可刪,那么該線程等待。同樣,最后在finally塊中釋放鎖。
那么問(wèn)題來(lái)了,LinkedBlockingDeque為什么不使用LinkedBlockingQueue讀寫(xiě)鎖分離的方式呢?LinkedBlockingDeque與LinkedBlockingQueue的使用場(chǎng)景有什么區(qū)別呢?
1.3 DelayQueue
DelayQueue主要用于實(shí)現(xiàn)延時(shí)任務(wù),比如:等待一段時(shí)間之后關(guān)閉連接,緩存對(duì)象過(guò)期刪除,任務(wù)超時(shí)處理等等,這些任務(wù)的共同特點(diǎn)是等待一段時(shí)間之后執(zhí)行(類(lèi)似于TimerTask)。DelayQueue的實(shí)現(xiàn)包括三個(gè)核心特征:
- 延時(shí)任務(wù):
DelayQueue的泛型類(lèi)需要繼承自Delayed接口,而Delayed接口繼承自Comparable<Delayed>,用于隊(duì)列中優(yōu)先排序的比較; - 優(yōu)先隊(duì)列:
DelayQueue的實(shí)現(xiàn)采用了優(yōu)先隊(duì)列PriorityQueue,即延遲時(shí)間越短的任務(wù)越優(yōu)先(回憶下優(yōu)先隊(duì)列中二叉堆的實(shí)現(xiàn))。 - 阻塞隊(duì)列:支持并發(fā)讀寫(xiě),采用
ReentrantLock來(lái)實(shí)現(xiàn)讀寫(xiě)的鎖操作。
因此,DelayQueue = Delayed + PriorityQueue + BlockingQueue。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
}
接下來(lái)看下DelayQueue的讀寫(xiě)操作如何實(shí)現(xiàn)延時(shí)任務(wù)的?
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
首先執(zhí)行加鎖操作,然后往優(yōu)先隊(duì)列中插入元素e,優(yōu)先隊(duì)列會(huì)調(diào)用泛型E的compareTo方法進(jìn)行比較(具體關(guān)于二叉堆的操作,這里不再贅述,請(qǐng)參考數(shù)據(jù)結(jié)構(gòu)部分相關(guān)分析),將延遲時(shí)間最短的任務(wù)添加到隊(duì)頭。最后檢查下元素是否為隊(duì)頭,如果是隊(duì)頭的話,設(shè)置leader為空,喚醒所有等待的隊(duì)列,釋放鎖。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
- 首先執(zhí)行加鎖操作,然后取出優(yōu)先隊(duì)列的隊(duì)頭,如果對(duì)頭為空,則該線程阻塞;
- 獲得對(duì)頭元素的延遲時(shí)間,如果延遲時(shí)間小于等于0,說(shuō)明該元素已經(jīng)到了可以使用的時(shí)間,調(diào)用poll方法彈出該元素;
- 在延遲時(shí)間大于0時(shí),首先釋放元素first的引用(避免內(nèi)存泄露),其次判斷如果leader線程不為空,則該線程阻塞(表示已有線程在等待)。否則,把當(dāng)前線程賦值給leader元素,然后阻塞delay的時(shí)間,即等待隊(duì)頭到達(dá)延遲時(shí)間,在finally塊中釋放leader元素的引用。循環(huán)后,取出對(duì)頭元素,退出for循環(huán)。
- 最后,如果leader為空并且優(yōu)先級(jí)隊(duì)列不為空的情況下(判斷還有沒(méi)有其他后續(xù)節(jié)點(diǎn)),調(diào)用signal通知其他的線程,并執(zhí)行解鎖操作。
1.4 TransferQueue與LinkedTransferQueue
TransferQueue是一個(gè)繼承了BlockingQueue的接口,并且增加若干新的方法。LinkedTransferQueue是TransferQueue接口的實(shí)現(xiàn)類(lèi),其定義為一個(gè)無(wú)界的隊(duì)列,具有先進(jìn)先出(FIFO)的特性。
TransferQueue接口主要包含以下方法:
public interface TransferQueue<E> extends BlockingQueue<E> {
boolean tryTransfer(E e);
void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
}
- transfer(E e):若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程,即立刻移交之;否則,會(huì)插入當(dāng)前元素e到隊(duì)列尾部,并且等待進(jìn)入阻塞狀態(tài),到有消費(fèi)者線程取走該元素。
- tryTransfer(E e):若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程(使用take()或者poll()函數(shù)),使用該方法會(huì)即刻轉(zhuǎn)移/傳輸對(duì)象元素e;若不存在,則返回false,并且不進(jìn)入隊(duì)列。這是一個(gè)不阻塞的操作。
- tryTransfer(E e, long timeout, TimeUnit unit):若當(dāng)前存在一個(gè)正在等待獲取的消費(fèi)者線程,會(huì)立即傳輸給它;否則將插入元素e到隊(duì)列尾部,并且等待被消費(fèi)者線程獲取消費(fèi)掉;若在指定的時(shí)間內(nèi)元素e無(wú)法被消費(fèi)者線程獲取,則返回false,同時(shí)該元素被移除。
- hasWaitingConsumer():判斷是否存在消費(fèi)者線程。
- getWaitingConsumerCount():獲取所有等待獲取元素的消費(fèi)線程數(shù)量。
LinkedTransferQueue實(shí)現(xiàn)了上述方法,較之于LinkedBlockingQueue在隊(duì)列滿(mǎn)時(shí),入隊(duì)操作會(huì)被阻塞的特性,LinkedTransferQueue在隊(duì)列不滿(mǎn)時(shí)也可以阻塞,只要沒(méi)有消費(fèi)者使用元素。下面來(lái)看下LinkedTransferQueue的入隊(duì)和和出隊(duì)操作:transfer和take方法。
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
LinkedTransferQueue入隊(duì)和和出隊(duì)都使用了一個(gè)關(guān)鍵方法:
private E xfer(E e, boolean haveData, int how, long nanos) {}
其中,E表示被操作的元素,haveData為true表示添加數(shù)據(jù),false表示移除數(shù)據(jù);how有四種取值:NOW, ASYNC, SYNC, 或者TIMED,分別表示執(zhí)行的時(shí)機(jī);nanos表示how為TIMED時(shí)的時(shí)間限制。
(xfer方法具體流程較為復(fù)雜,這里不再展開(kāi)。另外,LinkedTransferQueue采用了CAS非阻塞同步機(jī)制,后面會(huì)具體講到)