HashMap存在的問(wèn)題:
HashMap線程不安全
因?yàn)槎嗑€程環(huán)境下,使用Hashmap進(jìn)行put操作可能會(huì)引起死循環(huán),導(dǎo)致CPU利用率接近100%,所以在并發(fā)情況下不能使用HashMap。例如如下代碼:
final HashMap<String, String> map = new HashMap<String, String>(2);
for (int i = 0; i < 10000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), "");
}
}).start();
}
Hashtable線程安全但效率低下
Hashtable容器使用synchronized來(lái)保證線程安全,但在線程競(jìng)爭(zhēng)激烈的情況下Hashtable的效率非常低下。因?yàn)楫?dāng)一個(gè)線程訪問(wèn)Hashtable的同步方法時(shí),其他線程訪問(wèn)Hashtable的同步方法時(shí),可能會(huì)進(jìn)入阻塞或輪詢狀態(tài)。如線程1使用put進(jìn)行添加元素,線程2不但不能使用put方法添加元素,并且也不能使用get方法來(lái)獲取元素,所以競(jìng)爭(zhēng)越激烈效率越低。
解決
分段鎖
HashTable容器在競(jìng)爭(zhēng)激烈的并發(fā)環(huán)境下表現(xiàn)出效率低下的原因,是因?yàn)樗性L問(wèn)HashTable的線程都必須競(jìng)爭(zhēng)同一把鎖,那假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分?jǐn)?shù)據(jù),那么當(dāng)多線程訪問(wèn)容器里不同數(shù)據(jù)段的數(shù)據(jù)時(shí),線程間就不會(huì)存在鎖競(jìng)爭(zhēng),從而可以有效的提高并發(fā)訪問(wèn)效率,這就是ConcurrentHashMap所使用的鎖分段技術(shù),首先將數(shù)據(jù)分成一段一段的存儲(chǔ),然后給每一段數(shù)據(jù)配一把鎖,當(dāng)一個(gè)線程占用鎖訪問(wèn)其中一個(gè)段數(shù)據(jù)的時(shí)候,其他段的數(shù)據(jù)也能被其他線程訪問(wèn)。有些方法需要跨段,比如size()和containsValue(),它們可能需要鎖定整個(gè)表而而不僅僅是某個(gè)段,這需要按順序鎖定所有段,操作完畢后,又按順序釋放所有段的鎖。這里“按順序”是很重要的,否則極有可能出現(xiàn)死鎖,在ConcurrentHashMap內(nèi)部,段數(shù)組是final的,并且其成員變量實(shí)際上也是final的,但是,僅僅是將數(shù)組聲明為final的并不保證數(shù)組成員也是final的,這需要實(shí)現(xiàn)上的保證。這可以確保不會(huì)出現(xiàn)死鎖,因?yàn)楂@得鎖的順序是固定的。
ConcurrentHashMap是由Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成。Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap里扮演鎖的角色,HashEntry則用于存儲(chǔ)鍵值對(duì)數(shù)據(jù)。一個(gè)ConcurrentHashMap里包含一個(gè)Segment數(shù)組,Segment的結(jié)構(gòu)和HashMap類似,是一種數(shù)組和鏈表結(jié)構(gòu), 一個(gè)Segment里包含一個(gè)HashEntry數(shù)組,每個(gè)HashEntry是一個(gè)鏈表結(jié)構(gòu)的元素, 每個(gè)Segment守護(hù)者一個(gè)HashEntry數(shù)組里的元素,當(dāng)對(duì)HashEntry數(shù)組的數(shù)據(jù)進(jìn)行修改時(shí),必須首先獲得它對(duì)應(yīng)的Segment鎖。

JDK1.8的實(shí)現(xiàn)已經(jīng)拋棄了Segment分段鎖機(jī)制,利用CAS+Synchronized來(lái)保證并發(fā)更新的安全。數(shù)據(jù)結(jié)構(gòu)采用:數(shù)組+鏈表+紅黑樹(shù)。

話不多說(shuō),還是看源碼吧:
構(gòu)造:
//構(gòu)造方法
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)//判斷參數(shù)是否合法
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY ://最大為2^30
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));//根據(jù)參數(shù)調(diào)整table的大小
this.sizeCtl = cap;//獲取容量
//ConcurrentHashMap在構(gòu)造函數(shù)中只會(huì)初始化sizeCtl值,并不會(huì)直接初始化table
}
//調(diào)整table的大小
private static final int tableSizeFor(int c) {//返回一個(gè)大于輸入?yún)?shù)且最小的為2的n次冪的數(shù)。
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
tableSizeFor(int c)的原理:將c最高位以下通過(guò)|=運(yùn)算全部變成1,最后返回的時(shí)候,返回n+1;
eg:當(dāng)輸入為25的時(shí)候,n等于24,轉(zhuǎn)成二進(jìn)制為11000,右移1位為01100,將11000與01100進(jìn)行或("|")操作,得到11100。接下來(lái)右移兩位得00111,再進(jìn)行或操作得11111,接下來(lái)操作n的值就不會(huì)變化了。最后返回的時(shí)候,返回n+1,也就是100000,十進(jìn)制為32。按照這種邏輯得到2的n次冪的數(shù)。
那么為什么要先-1再+1呢?輸入若是為0,那么不論怎么操作,n還是0,但是HashMap的容量只有大于0時(shí)才有意義。
table初始化:
table初始化操作會(huì)延緩到第一次put行為。但是put是可以并發(fā)執(zhí)行的,那么是如何實(shí)現(xiàn)table只初始化一次的?接著上源碼:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)//判斷table還未初始化
tab = initTable();//初始化table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
...省略一部分源碼
}
}
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//如果一個(gè)線程發(fā)現(xiàn)sizeCtl<0,意味著另外的線程執(zhí)行CAS操作成功,當(dāng)前線程只需要讓出cpu時(shí)間片,
//由于sizeCtl是volatile的,保證了順序性和可見(jiàn)性
if ((sc = sizeCtl) < 0)//sc保存了sizeCtl的值
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {//cas操作判斷并置為-1
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//DEFAULT_CAPACITY = 16,若沒(méi)有參數(shù)則大小默認(rèn)為16
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
put操作
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());//哈希算法
int binCount = 0;
for (Node<K,V>[] tab = table;;) {//無(wú)限循環(huán),確保插入成功
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)//表為空或表長(zhǎng)度為0
tab = initTable();//初始化表
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//i = (n - 1) & hash為索引值,查找該元素,
//如果為null,說(shuō)明第一次插入
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)//MOVED=-1;當(dāng)前正在擴(kuò)容,一起進(jìn)行擴(kuò)容操作
tab = helpTransfer(tab, f);
else if (onlyIfAbsent && fh == hash && // check first node
((fk = f.key) == key || fk != null && key.equals(fk)) &&
(fv = f.val) != null)
return fv;
else {
V oldVal = null;
synchronized (f) {//其他情況加鎖同步
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
//哈希算法
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
//保證拿到最新的數(shù)據(jù)
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
//CAS操作插入節(jié)點(diǎn),比較數(shù)組下標(biāo)為i的節(jié)點(diǎn)是否為c,若是,用v交換,否則不操作。
//如果CAS成功,表示插入成功,結(jié)束循環(huán)進(jìn)行addCount(1L, binCount)看是否需要擴(kuò)容
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
table擴(kuò)容
當(dāng)table容量不足的時(shí)候,即table的元素?cái)?shù)量達(dá)到容量閾值sizeCtl,需要對(duì)table進(jìn)行擴(kuò)容。 整個(gè)擴(kuò)容分為兩部分:
- 構(gòu)建一個(gè)nextTable,大小為table的兩倍。
- 把table的數(shù)據(jù)復(fù)制到nextTable中。
這兩個(gè)過(guò)程在單線程下實(shí)現(xiàn)很簡(jiǎn)單,但是ConcurrentHashMap是支持并發(fā)插入的,擴(kuò)容操作自然也會(huì)有并發(fā)的出現(xiàn),這種情況下,第二步可以支持節(jié)點(diǎn)的并發(fā)復(fù)制,這樣性能自然提升不少,但實(shí)現(xiàn)的復(fù)雜度也上升了一個(gè)臺(tái)階。
繼續(xù)上源碼:
第一步,構(gòu)建nextTable,毫無(wú)疑問(wèn),這個(gè)過(guò)程只能只有單個(gè)線程進(jìn)行nextTable的初始化.
private final void addCount(long x, int check) {
... 省略部分代碼
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {// sc < 0 表明此時(shí)有別的線程正在進(jìn)行擴(kuò)容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 不滿足前面5個(gè)條件時(shí),嘗試參與此次擴(kuò)容,把正在執(zhí)行transfer任務(wù)的線程數(shù)加1,+2代表有1個(gè),+1代表有0個(gè)
transfer(tab, nt);
}
//試著讓自己成為第一個(gè)執(zhí)行transfer任務(wù)的線程
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);// 去執(zhí)行transfer任務(wù)
s = sumCount();// 重新計(jì)數(shù),判斷是否需要開(kāi)啟下一輪擴(kuò)容
}
}
}
節(jié)點(diǎn)從table移動(dòng)到nextTable,大體思想是遍歷、復(fù)制的過(guò)程。遍歷過(guò)所有的節(jié)點(diǎn)以后就完成了復(fù)制工作,把table指向nextTable,并更新sizeCtl為新數(shù)組大小的0.75倍 ,擴(kuò)容完成。
get操作
- 判斷table是否為空,如果為空,直接返回null。
- 計(jì)算key的hash值,并獲取指定table中指定位置的Node節(jié)點(diǎn),通過(guò)遍歷鏈表或則樹(shù)結(jié)構(gòu)找到對(duì)應(yīng)的節(jié)點(diǎn),返回value值。
源碼:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
和HashTable的區(qū)別:
ConcurrentHashMap 是一個(gè)并發(fā)散列映射表,它允許完全并發(fā)的讀取,并且支持給定數(shù)量的并發(fā)更新。
而HashTable和同步包裝器包裝的 HashMap,使用一個(gè)全局的鎖來(lái)同步不同線程間的并發(fā)訪問(wèn),同一時(shí)間點(diǎn),只能有一個(gè)線程持有鎖,也就是說(shuō)在同一時(shí)間點(diǎn),只能有一個(gè)線程能訪問(wèn)容器,這雖然保證多線程間的安全并發(fā)訪問(wèn),但同時(shí)也導(dǎo)致對(duì)容器的訪問(wèn)變成串行化的了。
總結(jié):
Hashtable的任何操作都會(huì)把整個(gè)表鎖住,是阻塞的。好處是總能獲取最實(shí)時(shí)的更新,比如說(shuō)線程A調(diào)用putAll寫(xiě)入大量數(shù)據(jù),期間線程B調(diào)用get,線程B就會(huì)被阻塞,直到線程A完成putAll,因此線程B肯定能獲取到線程A寫(xiě)入的完整數(shù)據(jù)。壞處是所有調(diào)用都要排隊(duì),效率較低。
ConcurrentHashMap 是設(shè)計(jì)為非阻塞的。在更新時(shí)會(huì)局部鎖住某部分?jǐn)?shù)據(jù),但不會(huì)把整個(gè)表都鎖住。同步讀取操作則是完全非阻塞的。好處是在保證合理的同步前提下,效率很高。壞處是嚴(yán)格來(lái)說(shuō)讀取操作不能保證反映最近的更新。例如線程A調(diào)用putAll寫(xiě)入大量數(shù)據(jù),期間線程B調(diào)用get,則只能get到目前為止已經(jīng)順利插入的部分?jǐn)?shù)據(jù)。
應(yīng)該根據(jù)具體的應(yīng)用場(chǎng)景選擇合適的HashMap。