JAVA 并發(fā)容器和阻塞隊(duì)列
JAVA 并發(fā)容器
ConcurrentHashMapjdk7 vs jdk8 異同和優(yōu)缺點(diǎn)
-
數(shù)據(jù)結(jié)構(gòu)
JDK7 采用segment 分段鎖的思想 ,jdk8 中是使用 數(shù)組+鏈表+紅黑樹(shù) 實(shí)現(xiàn)。
image.png

-
并發(fā)度
JDK7 每個(gè)segment 獨(dú)立加鎖, 最大并發(fā)個(gè)數(shù)是 segment 個(gè)數(shù),默認(rèn)是16。
JDK8中 ,鎖的粒度更細(xì),理想情況下數(shù)組長(zhǎng)度就是就是支持并發(fā)的最大個(gè)數(shù),并發(fā)度提高很多。
-
保證并發(fā)安全的原理
JDK7 采用繼承自 ReentrantLock 的segment 分段鎖的思想,來(lái)保證線程安全。
JDK8 放棄了Segment 的設(shè)計(jì), 采用 node+ CAS + synchronizad 保證線程安全。
-
查詢(xún)時(shí)間復(fù)雜度
JDK7 遍歷鏈表的事件復(fù)雜度是 O(n), 【n 為鏈表的長(zhǎng)度】。
JDK8 如果變成遍歷紅黑樹(shù),那么時(shí)間復(fù)雜度為 O(logn) 【n 為紅黑樹(shù)節(jié)點(diǎn)個(gè)數(shù)】。
ConcurrentHashMap 與 HashTable 的區(qū)別
-
出現(xiàn)的版本不同
HashTable 是在 JDK1.0 的時(shí)候存在,在JDK1.2 的版本實(shí)現(xiàn)了Map 接口,成為了集合框架中的一員。
ConcurrentHashMap 則是在JDK1.5 中出現(xiàn)的。出現(xiàn)的年代不同,所以在實(shí)現(xiàn)方式和性能上都有很大的不同。
-
實(shí)現(xiàn)線程安全的方式不同
HashTable 中的每個(gè)方法都被 synchronized 關(guān)鍵字修飾了, 這也就保證了線程的安全。但是這種方式在高并發(fā)的情況下性能并不是非常良好的。
ConcurrentHashMap 是通過(guò) node+CAS+synchronized 的方式實(shí)現(xiàn)的
-
性能不同
當(dāng)線程數(shù)量急劇增多的時(shí)候,因?yàn)槊恳淮涡薷亩夹枰i住整個(gè)對(duì)象,其他線程再此時(shí)是不能操作的,導(dǎo)致性能會(huì)急劇下降,不僅如此還會(huì)帶來(lái)額外的上下文切換開(kāi)銷(xiāo), 所以它的吞吐量甚至沒(méi)有單線程的情況下要好。
ConcurrentHashMap 上鎖也僅僅是**對(duì)一部分上鎖 ** ,多線程的性能帶來(lái)的是增幅的效果。
-
迭代時(shí)元素內(nèi)容修改不同
HashTable 在迭代期間不能修改內(nèi)容,否則會(huì)因?yàn)?strong>modCount != expectedModeCount 拋出 ConcurrentModificationException 異常, 而 ConcurrentHashMap 在迭代期間修改內(nèi)容**也不會(huì)拋出ConcurrentModificationException **異常。
有關(guān)Map 的一些問(wèn)題
為什么Map 桶中超過(guò)8 個(gè)才轉(zhuǎn)為紅黑樹(shù)?
- 采用拉鏈法在相同的hash 位的值, 當(dāng)鏈表的長(zhǎng)度大于等于閾值 (TREEIFY_THRESHOLD=8) 同時(shí)還滿(mǎn)足容量大于等于( MIN_TREEIFY_CAPACITY = 64) ** 的要求, 就會(huì)把鏈表轉(zhuǎn)化為紅黑樹(shù), 后續(xù)如果鏈表的容量小于等于(UNTREEIFY_THRESHOLD = 6)時(shí),又會(huì)恢復(fù)為鏈表**的形式。
為什么不一開(kāi)始就采用紅黑樹(shù)的結(jié)構(gòu)呢?
- 單個(gè)TreeNode 節(jié)點(diǎn)占用的空間大約是普通node 的兩倍, 當(dāng)node 足夠多時(shí)才轉(zhuǎn)化為 TreeNode,
- 如果hashCode 分布良好,hash 計(jì)算離散性比較好的情況下,那么紅黑樹(shù)這種數(shù)據(jù)結(jié)構(gòu)是很少會(huì)用到的,在理想情況下,鏈表的長(zhǎng)度符合泊松分布,各個(gè)鏈表長(zhǎng)度的命中概率依次遞減,當(dāng)長(zhǎng)度為8 時(shí) 概率僅僅為 0.00000006是一個(gè)非常小的概率,,通常情況下并不會(huì)發(fā)生鏈表向紅黑樹(shù)的轉(zhuǎn)換。
- 如果發(fā)現(xiàn) hashMap 或 ConcurrrentHashMap 內(nèi)部出現(xiàn)了 紅黑樹(shù)的結(jié)構(gòu),這個(gè)時(shí)候說(shuō)明我們的hash算法出現(xiàn)了問(wèn)題,(hashCode 的實(shí)現(xiàn)邏輯)
CopyOnWriteList 有什么特點(diǎn)?
適用場(chǎng)景
- 讀操作可以盡可能的快,而寫(xiě)操作即使慢一下也沒(méi)有關(guān)系。
- 讀多寫(xiě)少
讀寫(xiě)規(guī)則
-
讀寫(xiě)鎖的思想
讀寫(xiě)鎖的思想是 讀讀共享 ,其他都是互斥的(讀寫(xiě),寫(xiě)寫(xiě),寫(xiě)讀),并發(fā)讀并不會(huì)修改數(shù)據(jù),不會(huì)帶來(lái)線程安全問(wèn)題,寫(xiě)操作會(huì)修改原來(lái)的數(shù)據(jù),因此寫(xiě)操作的時(shí)候,不允許讀線程和寫(xiě)線程的加入。
-
對(duì)讀寫(xiě)鎖規(guī)則的升級(jí)
copyOnWriteArrayList 讀取是不用加鎖的,,寫(xiě)入也不會(huì)阻塞讀取操作,即 寫(xiě)讀不互斥。但是寫(xiě)寫(xiě)是不能線程共享的,寫(xiě)寫(xiě)互斥
特點(diǎn)
- 當(dāng)容器內(nèi)的內(nèi)容被個(gè)修改時(shí),copy 出一個(gè)新的容器, 然后在新的容器中進(jìn)行操作,修改完成 再將原來(lái)容器的引用指向新的容器
- 迭代期間允許修改集合內(nèi)容
缺點(diǎn)
- 內(nèi)存占用問(wèn)題
- 在元素較多或者復(fù)雜的情況下。復(fù)制的開(kāi)銷(xiāo)很大
- 數(shù)據(jù)一致性問(wèn)題
阻塞隊(duì)列與非阻塞隊(duì)列
常見(jiàn)的阻塞隊(duì)列
- ArrayBlockingQueue
- LinkedBlockingQueue
- SynchronousQueue
- PriorityBlockingQueue
- DelayQueue
ArrayBlockingQueue
-
是典型的 有界隊(duì)列 ,其內(nèi)部是用 數(shù)組 結(jié)構(gòu)存儲(chǔ)元素的, 利用 ReentrantLock 實(shí)現(xiàn)線程安全。內(nèi)部排序方式是 先進(jìn)先出(FIFO)的。
/** The queued items */ final Object[] items; /** Main lock guarding all access */ final ReentrantLock lock;/** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, * returning {@code true} upon success and {@code false} if this queue * is full. This method is generally preferable to method {@link #add}, * which can fail to insert an element only by throwing an exception. * * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
-
在創(chuàng)建時(shí) 指定容量和排隊(duì)任務(wù)是否公平 , (通常非公平的吞吐量要優(yōu)于公平的場(chǎng)景)
/** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
LinkedBlockingQueue
-
是一個(gè)內(nèi)部用 鏈表 實(shí)現(xiàn)的BlockingQueue ,如果不指定 初始容量,那么他的默認(rèn)容量為 Integer.MAX_VALUE。 會(huì)成為一個(gè)無(wú)界隊(duì)列, 內(nèi)部排序方式是 先進(jìn)先出(FIFO)的。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } -
同樣內(nèi)部也是通過(guò)ReentrantLock 來(lái)保證線程安全。
public E peek() { if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) return null; else return first.item; } finally { takeLock.unlock(); } }
SynchronousQueue
-
最大的不同之處在于 它的容量是 0 ,因此每次取數(shù)據(jù)都要先阻塞,直到有數(shù)據(jù)被放入,每次放入數(shù)據(jù)也會(huì)阻塞,直到有消費(fèi)者來(lái)取出。
/** * Inserts the specified element into this queue, waiting if necessary * up to the specified wait time for another thread to receive it. * * @return {@code true} if successful, or {@code false} if the * specified waiting time elapses before a consumer appears * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); } 他會(huì)把數(shù)據(jù)直接從生產(chǎn)者傳給消費(fèi)者(direct handoff), 不需要存儲(chǔ)。
PriorityBlockingQueue
-
是一個(gè)支持 優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列 ,可以通過(guò)自定義類(lèi)實(shí)現(xiàn) compareTo() 方法,或初始化是通過(guò)構(gòu)造函數(shù)傳入Comparator 來(lái)指定元素排序規(guī)則。
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } -
他的默認(rèn)初始化容量為 11,最大容量是 Integer.MAX_VALUE - 8 內(nèi)部是 用 **數(shù)組 **的數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)元素,(transient : 阻止屬性被序列化,盡在內(nèi)存中,不會(huì)序列化到磁盤(pán))
/** * Default array capacity. */ private static final int DEFAULT_INITIAL_CAPACITY = 11; /** * The maximum size of array to allocate. * Some VMs reserve some header words in an array. * Attempts to allocate larger arrays may result in * OutOfMemoryError: Requested array size exceeds VM limit */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; private transient Object[] queue;
插入隊(duì)列的對(duì)象必須是可以比較大小的, 否則會(huì)拋出 ClassCastException。
-
消費(fèi)數(shù)據(jù)在隊(duì)列為空的是時(shí)候會(huì)阻塞 (take, pool),放入數(shù)據(jù)的時(shí)候不會(huì)被阻塞(因?yàn)殛?duì)列是無(wú)界的)。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; }
DelayQueue
- 是一個(gè)具有延遲功能的隊(duì)列, 可以設(shè)定讓隊(duì)列中的任務(wù)延遲多久后再執(zhí)行。
- 它是無(wú)界隊(duì)列,放入的元素必須實(shí)現(xiàn)了 Delayed 接口。(Delayed 繼承了 Comparable 接口, 擁有了比較和排序)
- 元素會(huì)根據(jù)延遲時(shí)間的長(zhǎng)短被放到隊(duì)列的不同位置, 越靠近隊(duì)列頭則早過(guò)期。
- DelayQueue 內(nèi)部使用了 PriorityQueue 的能力來(lái)排序。
阻塞隊(duì)列常用的方法
操作失敗拋出異常
| 方法 | 含義 | 特點(diǎn) |
|---|---|---|
| add | 添加一個(gè)元素 | 如果隊(duì)列滿(mǎn)了,拋出IllegalStateException |
| remove | 返回并刪除隊(duì)列的頭元素 | 如果隊(duì)列為空,拋出NoSuchElementException |
| element | 返回隊(duì)列的頭元素 | 如果隊(duì)列為空,拋出NoSuchElementException |
返回操作結(jié)果,不拋出異常
| 方法 | 含義 | 特點(diǎn) |
|---|---|---|
| offer | 添加一個(gè)元素 | 隊(duì)列滿(mǎn),返回false ,添加成功 返回true |
| poll | 返回并刪除隊(duì)列的頭元素 | 隊(duì)列空,刪除失敗,返回null |
| peeK | 返回隊(duì)列的頭元素 | 隊(duì)列空,刪除失敗,返回null |
陷入阻塞
| 方法 | 含義 | 特點(diǎn) |
|---|---|---|
| put | 添加一個(gè)元素 | 如果隊(duì)列滿(mǎn),則阻塞,直至隊(duì)列有空閑 |
| take | 返回并刪除隊(duì)列的頭元素 | 如果隊(duì)列空,則阻塞。直至隊(duì)列有元素 |
非阻塞隊(duì)列
ConCurrentLinkedQueue
- 使用CAS 非阻塞算法 + 重試 來(lái)實(shí)現(xiàn)線程安全。
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
