CopyOnWriteArrayList
CopyOnWriteArrayList 是寫時(shí)復(fù)制的容器。通俗的理解是當(dāng)我們要往容器中添加元素的時(shí)候,不直接往當(dāng)前的數(shù)組天假,而是將當(dāng)前數(shù)組進(jìn)行 Copy,然后往新的數(shù)組中添加元素,添加完元素后,再將數(shù)組引用指向新的數(shù)組。
這樣做的好處是可以并發(fā)的讀,而不需要加鎖同步,而同步鎖只加在寫線程上,也就說很可能讀和寫并不是同一個(gè)數(shù)組。
實(shí)現(xiàn)原理
簡單起見,不妨看看 CopyOnWriteArrayList 的 get 和 add 接口是如何實(shí)現(xiàn)的:
final transient ReentrantLock lock = new ReentrantLock();
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
private E get(Object[] a, int index) {
return (E) a[index];
}
public E get(int index) {
return get(getArray(), index);
}
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
可以看到調(diào)用 get 讀數(shù)據(jù)的時(shí)候并沒有加鎖,多個(gè)線程是并發(fā)的讀。但是調(diào)用 add 接口是會(huì)先獲取鎖,所有寫線程串行訪問。但是寫線程和讀線程是并發(fā)訪問的,所以如果寫線程還在寫的時(shí)候,讀線程還是有可能讀取舊的數(shù)據(jù)。
CopyOnWriteArrayList 返回的迭代器不會(huì)拋出 ConcurrentModificationException 異常,并且返回的元素與迭代器創(chuàng)建時(shí)的元素完全一致,而不必考慮之后修改操作所帶來的影響。
應(yīng)用場(chǎng)景
每當(dāng)修改時(shí),CopyOnWriteArrayList 都會(huì)復(fù)制底層數(shù)組,這需要一定開銷,特別是當(dāng)容器規(guī)模非常大的。僅當(dāng)?shù)僮?strong>遠(yuǎn)遠(yuǎn)多于修改操作時(shí),才應(yīng)該考慮使用“寫入時(shí)復(fù)制”容器。
盡量使用批量添加。因?yàn)槊看翁砑?,容器每次都?huì)進(jìn)行復(fù)制,所以減少添加次數(shù),可以減少容器的復(fù)制次數(shù)。
缺點(diǎn)
CopyOnWriteArrayList 有很多優(yōu)點(diǎn),但是同時(shí)也存在兩個(gè)問題,即內(nèi)存占用問題和數(shù)據(jù)一致性問題。所以在開發(fā)的時(shí)候需要注意一下:
- 內(nèi)存占用問題。因?yàn)閷憰r(shí)復(fù)制機(jī)制,所以在進(jìn)行寫操作的時(shí)候,內(nèi)存里會(huì)同時(shí)駐扎兩個(gè)數(shù)組,如果數(shù)組內(nèi)對(duì)象占用的內(nèi)存比較,那么這個(gè)時(shí)候很有可能造成頻繁的 GC。
- 只能保證數(shù)據(jù)的最終一致性,不能保證數(shù)據(jù)的實(shí)時(shí)一致性。所以如果你希望寫入的的數(shù)據(jù),馬上能讀到,請(qǐng)不要使用寫時(shí)復(fù)制容器。
CopyOnWriteArraySet
CopyOnWriteArraySet 實(shí)現(xiàn) Set 接口,也屬于寫時(shí)復(fù)制容器,其實(shí) CopyOnWriteArraySet 內(nèi)部是用
CopyOnWriteArrayList 實(shí)現(xiàn)的,所以 CopyOnWriteArraySet 的特性幾乎和 CopyOnWriteArrayList 一樣,簡單起見,可以看看以下關(guān)鍵代碼:
private final CopyOnWriteArrayList<E> al;
public boolean add(E e) {
return al.addIfAbsent(e);
}
public Object[] toArray() {
return al.toArray();
}
BlockingQueue
BlockingQueue 繼承自 Queue,Queue 繼承自 Collection,BlockingQueue 提供了可阻塞的 put 和 take 方法,以及支持定時(shí)的 offer 和 poll 方法。如果隊(duì)列已經(jīng)滿了,那么 put 方法將阻塞直到有空間可用;如果隊(duì)列為空,那么 take 方法將會(huì)阻塞知道有元素可用。隊(duì)列可以是有界的也可以是無界的,無界隊(duì)列永遠(yuǎn)不會(huì)充滿,所以無界隊(duì)列的 put 方法也永遠(yuǎn)不會(huì)阻塞。
接口
BlockingQueue 主要方法如下所示:
//添加元素,添加成功則返回 true,隊(duì)列已滿,則拋出 IllegalStateException 異常
boolean add(E e);
//添加元素,添加成功返回 true,添加失敗 返回 false
boolean offer(E e);
//添加元素,如果隊(duì)列已滿,則阻塞等待
void put(E e) throws InterruptedException;
//添加元素,如果隊(duì)列已滿,則阻塞等待一段時(shí)間,等待后還是插入失敗,則返回 false
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//獲取并刪除隊(duì)列第一個(gè)元素,如果隊(duì)列為空,則阻塞等待
E take() throws InterruptedException;
//獲取并刪除隊(duì)列第一個(gè)元素
//如果隊(duì)列為空,則阻塞等待一段時(shí)間,等待后還是為空,則返回 null
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//獲取并刪除隊(duì)列第一個(gè)元素,如果隊(duì)列為空,則拋出 NoSuchElementException 異常
E remove();
//獲取并刪除隊(duì)列第一個(gè)元素,如果隊(duì)列為空,則返回 null
E poll();
//獲取隊(duì)列第一個(gè)元素,但是并不出隊(duì),如果隊(duì)列為空,拋出 NoSuchElementException 異常
E element();
//獲取隊(duì)列第一個(gè)元素,但是并不出隊(duì),如果隊(duì)列為空,則返回 null
E peek()
使用場(chǎng)景
BlockingQueue 經(jīng)常用于生產(chǎn)者-消費(fèi)者設(shè)計(jì)模式中。該模式將“找出需要完成的工作”與“執(zhí)行工作”這兩個(gè)過程分離開來。生產(chǎn)者-消費(fèi)者模式能夠簡化開發(fā)過程,因?yàn)樗軌蛳a(chǎn)者類和消費(fèi)者類之間的代碼依賴性。此外,該模式還將生產(chǎn)數(shù)據(jù)的過程與使用數(shù)據(jù)的過程解耦以簡化工作負(fù)載的管理,因?yàn)檫@兩個(gè)過程在處理數(shù)據(jù)的速率上有所不同。
一種常見的生產(chǎn)者-消費(fèi)者模式就是線程池與工作隊(duì)列的組合,在 Executor 任務(wù)執(zhí)行框架中就體現(xiàn)了這種模式。
ArrayBlockingQueue
它是一個(gè)由數(shù)組支持的有界阻塞隊(duì)列。它的容納大小是固定的。此隊(duì)列按 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序。數(shù)組容量在構(gòu)造函數(shù)里傳入。以下是它的核心成員變量:
/** 數(shù)組作為隊(duì)列*/
final Object[] items;
/** 隊(duì)列頭部坐標(biāo) */
int takeIndex;
/** 隊(duì)尾坐標(biāo) **/
int putIndex;
/** 隊(duì)列中元素個(gè)數(shù) **/
int count;
/** 獨(dú)占鎖,對(duì)隊(duì)列的訪問需要獲取該鎖,保證線程訪問同步 **/
final ReentrantLock lock;
/** 非空等待條件變量,用于阻塞/通知隊(duì)列為空時(shí)的 take 線程 */
private final Condition notEmpty;
/** 未滿等待條件隊(duì)列,用于阻塞/通知隊(duì)列充滿時(shí)的 put 線程 **/
private final Condition notFull;
ArrayBlockingQueue 有以下特點(diǎn):
- 有界數(shù)組,隊(duì)列容量在構(gòu)造時(shí)傳入;
- 通過 takeIndex / putIndex 設(shè)計(jì),數(shù)組是 循環(huán)數(shù)組;
- lock 保護(hù)臨界區(qū),notEmpty 和 notFull 用來阻塞和通知等待線程;
- 不接受 null 元素;
- 構(gòu)造函數(shù)中可以指定公平性,公平性是通過 lock 實(shí)現(xiàn)的。
LinkedBlockingQueue
它是一個(gè)基于已鏈接節(jié)點(diǎn)的,可以無界,也可以有界的阻塞隊(duì)列。此隊(duì)列按 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序。以下是它的核心成員變量:
/** 隊(duì)列容量,默認(rèn)為 Integer.MAX_VALUE */
private final int capacity;
/** 當(dāng)前元素?cái)?shù)量 */
private final AtomicInteger count = new AtomicInteger();
/** 隊(duì)列頭部 */
transient Node<E> head;
/** 隊(duì)列尾部 */
private transient Node<E> last;
/** 從隊(duì)列取數(shù)據(jù)時(shí)的獨(dú)占鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 條件變量,用于阻塞/喚醒隊(duì)列為空時(shí)的 take 線程 */
private final Condition notEmpty = takeLock.newCondition();
/** 向隊(duì)列加入數(shù)據(jù)時(shí)的獨(dú)占鎖 */
private final ReentrantLock putLock = new ReentrantLock();
/** 條件變量,用于阻塞/喚醒隊(duì)列充滿時(shí)的 put 線程 */
private final Condition notFull = putLock.newCondition();
LinkedBlockingQueue 有以下特點(diǎn):
- 內(nèi)部鏈表結(jié)構(gòu),默認(rèn)無界;
- 初始化時(shí),head 和 last 指向同一個(gè)占位 node,該 node 的元素為 null;
- takeLock 保護(hù)多線程同步訪問 head,取數(shù)據(jù);
- putLock 保護(hù)多線程同步訪問 last,往隊(duì)列加數(shù)據(jù);
- 讀線程和寫線程同步分離,提高隊(duì)列的并發(fā)行,同時(shí)可以有一個(gè)讀線程和寫線程訪問隊(duì)列;
- 沒有公平性設(shè)計(jì)。
PriorityBlockingQueue
它是一個(gè)基于數(shù)組的無界數(shù)組。它使用與類PriorityQueue相同的順序規(guī)則,并且提供了阻塞檢索的操作。以下是它的核心成員變量:
/** 元素?cái)?shù)組,數(shù)組結(jié)構(gòu),但是是用極小堆來處理的 */
private transient Object[] queue;
/** 隊(duì)列當(dāng)前大小 */
private transient int size;
/**
* 排序使用的對(duì)比式,如果沒有設(shè)置,則需要對(duì)象實(shí)現(xiàn) Comparable 接口
*/
private transient Comparator<? super E> comparator;
/**
* public 接口都要獲取該鎖,用于保護(hù)臨界區(qū)
*/
private final ReentrantLock lock;
/**
* 當(dāng)取數(shù)據(jù)時(shí),隊(duì)列為空,線程等待的條件變量
*/
private final Condition notEmpty;
/**
* 數(shù)組擴(kuò)容時(shí)的自旋鎖.
*/
private transient volatile int allocationSpinLock;
PriorityBlockingQueue 有以下特點(diǎn):
- 內(nèi)部數(shù)組存儲(chǔ)數(shù)據(jù),讀和寫需要同一個(gè)鎖進(jìn)行同步保護(hù);
- 雖然是數(shù)組存儲(chǔ),但是使用極小堆進(jìn)行排序;
- 初始化可以指定 comparator,如果沒有指定,則需要對(duì)象實(shí)現(xiàn) Comparable 接口,調(diào)用它的 compareTo;
- 不接受 null
- 雖然是數(shù)組,但是必要時(shí)回?cái)U(kuò)容,所以無界,只需要 notEmpty 條件變量
SynchronousQueue
它是一個(gè)沒有數(shù)據(jù)緩沖的 BlockingQueue,生產(chǎn)者線程對(duì)其的插入操作 put 必須等待消費(fèi)者的移除操作take,反過來也一樣。
不像 ArrayBlockingQueue 或 LinkedListBlockingQueue,SynchronousQueue 內(nèi)部并沒有數(shù)據(jù)緩存空間,你不能調(diào)用peek()方法來看隊(duì)列中是否有數(shù)據(jù)元素,因?yàn)閿?shù)據(jù)元素只有當(dāng)你試著取走的時(shí)候才可能存在,不取走而只想偷窺一下是不行的,當(dāng)然遍歷這個(gè)隊(duì)列的操作也是不允許的。隊(duì)列頭元素是第一個(gè)排隊(duì)要插入數(shù)據(jù)的線程,而不是要交換的數(shù)據(jù)。數(shù)據(jù)是在配對(duì)的生產(chǎn)者和消費(fèi)者線程之間直接傳遞的,并不會(huì)將數(shù)據(jù)緩沖數(shù)據(jù)到隊(duì)列中??梢赃@樣來理解:生產(chǎn)者和消費(fèi)者互相等待對(duì)方,握手,然后一起離開。
SynchronousQueue 有以下特點(diǎn):
- 沒有數(shù)據(jù)緩沖,寫線程和讀線程一一對(duì)應(yīng);
- 排隊(duì)等候的線程,而非數(shù)據(jù);
- 支持 FIFO 或 LIFO,公平模式下,使用 TransferQueue 支持 FIFO,非公平模式下,使用 TransferStack 支持 LIFO;
- 內(nèi)部用鏈表存儲(chǔ)等候線程;
- 實(shí)現(xiàn)時(shí),使用輪詢 CAS 來實(shí)現(xiàn)無鎖化處理;
內(nèi)容來源
Java 并發(fā)編程實(shí)戰(zhàn)
http://ifeve.com/java-copy-on-write/
http://wsmajunfeng.iteye.com/blog/1629354