Java程序員進(jìn)行并發(fā)編程時,相比于其他語言的程序員而言要倍感幸福,因為并發(fā)編程大師Doug Lea不遺余力地為Java開發(fā)者提供了非常多的并發(fā)容器和框架。如果使用C語言進(jìn)行開發(fā)的話,首先需要自己定義好底層數(shù)據(jù)結(jié)構(gòu)(也可以說是容器,比如鏈表/哈希/樹/...),然后結(jié)合使用mutex/cond等底層并發(fā)API來構(gòu)建并發(fā)安全的容器;如果使用C++的話可以使用STL,但是STL中容器并未對并發(fā)安全有很好的支持。
1 ConcurrentHashMap
在并發(fā)編程中使用HashMap可能導(dǎo)致程序死循環(huán)。而使用線程安全的HashTable效率又非常低下,基于以上兩個原因,便有了ConcurrentHashMap的登場機會,ConcurrentHashMap是并發(fā)安全且高效的HashMap。
(1) 線程不安全的HashMap
在多線程環(huán)境下,使用HashMap進(jìn)行put操作會引起死循環(huán),導(dǎo)致CPU利用率接近100%,所以在并發(fā)情況下不能使用HashMap。
public static void main(String[] args) {
final HashMap<String, String> map = new HashMap<String, String>(2);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), "");
}
}, "ftf" + i).start();
}
}
}, "ftf");
t.start();
t.join();
}
執(zhí)行以上代碼就會導(dǎo)致死循環(huán),HashMap在并發(fā)執(zhí)行put操作時會引起死循環(huán)(具體來說就是put操作中rehash時可能導(dǎo)致鏈表形成循環(huán)結(jié)構(gòu)),是因為多線程會導(dǎo)致HashMap的Entry鏈表形成環(huán)形數(shù)據(jù)結(jié)構(gòu),一旦形成環(huán)形數(shù)據(jù)結(jié)構(gòu),Entry的next節(jié)點永遠(yuǎn)不為空,就會產(chǎn)生死循環(huán)獲取Entry。
(2) 效率低下的HashTable
HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的情況下HashTable的效率非常低下。因為當(dāng)一個線程訪問HashTable的同步方法,其他線程也訪問HashTable的同步方法時,會進(jìn)入阻塞或輪詢狀態(tài)。
(3) ConcurrentHashMap的鎖分段技術(shù)提高并發(fā)訪問效率
HashTable容器在競爭激烈的并發(fā)環(huán)境下表現(xiàn)出效率低下的原因是所有訪問HashTable的線程都必須競爭同一把鎖,假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分?jǐn)?shù)據(jù),那么當(dāng)多線程訪問容器里不同數(shù)據(jù)段的數(shù)據(jù)時,線程間就不會存在鎖競爭,從而可以有效提高并發(fā)訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術(shù)。首先將數(shù)據(jù)分成一段一段地存儲,然后給每一段數(shù)據(jù)配一把鎖,當(dāng)一個線程占用鎖訪問其中一個段數(shù)據(jù)的時候,其他段的數(shù)據(jù)也能被其他線程訪問。(注意:JDK1.8中舍棄了分段鎖技術(shù),改用CAS+Synchronized機制。以下ConcurrentHashMap講解以JDK1.8以例)
ConcurrentHashMap中一個重要的類就是Node,該類存儲鍵值對,所有插入ConcurrentHashMap的數(shù)據(jù)都包裝在這里面。它與HashMap中的定義很相似,但是但是有一些差別它對value和next屬性設(shè)置了volatile同步鎖,它不允許調(diào)用setValue方法直接改變Node的value域,它增加了find方法輔助map.get()方法??稍趃et方法返回的結(jié)果中更改對應(yīng)的value值。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
...
}
ConcurrentHashMap定義了三個原子操作,用于對指定位置的節(jié)點進(jìn)行操作。正是這些原子操作保證了ConcurrentHashMap的線程安全。
@SuppressWarnings("unchecked")
//獲得在i位置上的Node節(jié)點
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//利用CAS算法設(shè)置i位置上的Node節(jié)點。之所以能實現(xiàn)并發(fā)是因為他指定了原來這個節(jié)點的值是多少
//在CAS算法中,會比較內(nèi)存中的值與你指定的這個值是否相等,如果相等才接受你的修改,否則拒絕你的修改
//因此當(dāng)前線程中的值并不是最新的值,這種修改可能會覆蓋掉其他線程的修改結(jié)果 有點類似于SVN
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//利用volatile方法設(shè)置節(jié)點位置的值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
put方法
ConcurrentHashMap最常用的方法就是put和get方法了,put方法依然沿用了HashMap的put方法的思想,據(jù)hash值計算這個新插入的點在table中的位置i,如果i位置是空的,直接放進(jìn)去,否則進(jìn)行判斷,如果i位置是樹節(jié)點,按照樹的方式插入新的節(jié)點,否則把i插入到鏈表的末尾。ConcurrentHashMap中依然沿用這個思想,有一個最重要的不同點就是ConcurrentHashMap不允許key或value為null值。另外由于涉及到多線程,put方法就要復(fù)雜一點。在多線程中可能有以下兩種情況:
- 如果一個或多個線程正在對ConcurrentHashMap進(jìn)行擴容操作,當(dāng)前線程也要進(jìn)入擴容的操作中。這個擴容的操作之所以能被檢測到,是因為transfer方法中在空結(jié)點上插入forward節(jié)點,如果檢測到需要插入的位置被forward節(jié)點占有,就幫助進(jìn)行擴容;
- 如果檢測到要插入的節(jié)點是非空且不是forward節(jié)點,就對這個節(jié)點加鎖,這樣就保證了線程安全。盡管這個有一些影響效率,但是還是會比hashTable的synchronized要好得多。
get方法
get方法比較簡單,給定一個key來確定value的時候,必須滿足兩個條件 key相同 hash值相同,對于節(jié)點可能在鏈表或樹上的情況,需要分別去查找。get方法是不要加鎖的,因為Node節(jié)點中value數(shù)據(jù)類型時volatile的,保證了內(nèi)存可見性。
2 ConcurrentLinkedQueue
線程安全的隊列有2種實現(xiàn)方式,一種是非阻塞的,使用CAS算法實現(xiàn);另一種是阻塞的,使用鎖機制來實現(xiàn)。ConcurrentLinkedQueue是一種非阻塞型基于鏈接節(jié)點的無界線程安全隊列,它采用先進(jìn)先出的規(guī)
則對節(jié)點進(jìn)行排序。
ConcurrentLinkedQueue結(jié)構(gòu)類圖:

ConcurrentLinkedQueue由head節(jié)點和tail節(jié)點組成,每個節(jié)點(Node)由節(jié)點元素(item)和指向下一個節(jié)點(next)的引用組成,節(jié)點與節(jié)點之間就是通過這個next關(guān)聯(lián)起來,從而組成一張鏈表結(jié)構(gòu)的隊列。默認(rèn)情況下head節(jié)點存儲的元素為空,tail節(jié)點等于head節(jié)點。
入隊列
入隊列就是將如對隊節(jié)點添加到隊列尾部,入隊列主要做兩件事情:多線程同時入隊列操作時,使用CAS來保證操作。如果有一個線程正在入隊,那么它必須先獲取尾節(jié)點,然后設(shè)置尾節(jié)點的下一個節(jié)點為入隊節(jié)點,但這時可能有另外一個線程插隊了,那么隊列的尾節(jié)點就會發(fā)生變化,這時當(dāng)前線程要暫停入隊操作,然后重新獲取尾節(jié)點。

從源代碼角度來看,整個入隊過程確實做兩件事情:第一是定位出尾節(jié)點;第二是使用CAS算法將入隊節(jié)點設(shè)置成尾節(jié)點的next節(jié)點,如不成功則重試。
定位尾節(jié)點
tail節(jié)點并不總是尾節(jié)點,所以每次入隊都必須先通過tail節(jié)點來找到尾節(jié)點。尾節(jié)點可能是tail節(jié)點,也可能是tail節(jié)點的next節(jié)點。代碼中循環(huán)體中的第一個if就是判斷tail是否有next節(jié)點,有則表示next節(jié)點可能是尾節(jié)點。獲取tail節(jié)點的next節(jié)點需要注意的是p節(jié)點等于p的next節(jié)點的情況,只有一種可能就是p節(jié)點和p的next節(jié)點都等于空,表示這個隊列剛初始化,正準(zhǔn)備添加節(jié)點,所以需要返回head節(jié)點。獲取p節(jié)點的next節(jié)點代碼如下。

出隊列
出隊列的就是從隊列里返回一個節(jié)點元素,并清空該節(jié)點對元素的引用。與入隊列類似,并不是每次出隊時都更新head節(jié)點,當(dāng)head節(jié)點里有元素時,直接彈出head節(jié)點里的元素,而不會更新head節(jié)點。只有當(dāng)head節(jié)點里沒有元素時,出隊操作才會更新head節(jié)點。這種做法也是通過hops變量來減少使用CAS更新head節(jié)點的消耗,從而提高出隊效率。

首先獲取頭節(jié)點的元素,然后判斷頭節(jié)點元素是否為空,如果為空,表示另外一個線程已經(jīng)進(jìn)行了一次出隊操作將該節(jié)點的元素取走,如果不為空,則使用CAS的方式將頭節(jié)點的引用設(shè)置成null,如果CAS成功,則直接返回頭節(jié)點的元素,如果不成功,表示另外一個線程已經(jīng)進(jìn)行了一次出隊操作更新了head節(jié)點,導(dǎo)致元素發(fā)生了變化,需要重新獲取頭節(jié)點。
3 ConcurrentBlockingQueue
阻塞隊列就是支持阻塞插入和刪除的一個隊列。
支持阻塞的插入方法:意思是當(dāng)隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
支持阻塞的移除方法:意思是在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强铡?/p>
阻塞隊列常用于生產(chǎn)者和消費者的場景,生產(chǎn)者是向隊列里添加元素的線程,消費者是從隊列里取元素的線程。阻塞隊列就是生產(chǎn)者用來存放元素、消費者用來獲取元素的容器。
在阻塞隊列不可用時(添加時隊列已滿,移除時隊列為空),這兩個附加操作提供了4種處理方式:

注意:如果是無界阻塞隊列,隊列不可能會出現(xiàn)滿的情況,所以使用put或offer方法永遠(yuǎn)不會被阻塞,而且使用offer方法時,該方法永遠(yuǎn)返回true。
Java中的阻塞隊列
JDK7提供了以下7種隊列:
- ArrayBlockingQueue:一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列。
- LinkedBlockingQueue:一個由鏈表結(jié)構(gòu)組成的有界阻塞隊列,默認(rèn)最大長度為Integer.MAX_VALUE。
- PriorityBlockingQueue:一個支持優(yōu)先級排序的無界阻塞隊列,注意不能保證同優(yōu)先級的相對順序。
- DelayQueue:一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。
- LinkedTransferQueue:一個由鏈表結(jié)構(gòu)組成的無界阻塞隊列。
- LinkedBlockingDeque:一個由鏈表結(jié)構(gòu)組成的雙向阻塞隊列。
ArrayBlockingQueue
一個用數(shù)組實現(xiàn)的有界阻塞隊列。此隊列按照先進(jìn)先出(FIFO)的原則對元素進(jìn)行排序。
默認(rèn)情況下不保證線程公平的訪問隊列,所謂公平訪問隊列是指阻塞的線程,可以按照阻塞的先后順序訪問隊列,即先阻塞線程先訪問隊列。非公平性是對先等待的線程是非公平的,當(dāng)隊列可用時,阻塞的線程都可以爭奪訪問隊列的資格,有可能先阻塞的線程最后才訪問隊列。為了保證公平性,通常會降低吞吐量。
DelayQueue
DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue來實現(xiàn)。隊列中的元素必須實現(xiàn)Delayed接口,在創(chuàng)建元素時可以指定多久才能從隊列中獲取當(dāng)前元素。只有在延遲期滿時才能從隊列中提取元素。
DelayQueue非常有用,可以將DelayQueue運用在以下應(yīng)用場景。
● ·緩存系統(tǒng)的設(shè)計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。
● ·定時任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天將會執(zhí)行的任務(wù)和執(zhí)行時間,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行,比如TimerQueue就是使用DelayQueue實現(xiàn)的。
參考資料:
1《Java并發(fā)編程的藝術(shù)》
2 JAVA HASHMAP的死循環(huán) - 酷殼
3 ConcurrentHashMap源碼分析(JDK8版本)