我們知道,HashMap 是線程不安全的,為了使用線程安全的 HashMap,一種方法是使用 Collections 中的 synchronizedMap 方法。另一種方法是使用使用 ConcurrentHashMap。
一、Collections.synchronizedMap(Map map)鎖的實(shí)現(xiàn)方式。
這里做一下延伸,我們簡單看一下這個(gè)方法的源碼,不出所料這個(gè)方法是重寫了 HashMap 的方法,并且為這個(gè) map 對象加了 synchronized 鎖。如下圖:
二、ConcurrentHashMap 鎖的實(shí)現(xiàn)方式
在 Java 5 之后,JDK 引入了 java.util.concurrent 并發(fā)包 ,其中最常用的就是 ConcurrentHashMap 了, 它的原理是引用了內(nèi)部的 Segment ( ReentrantLock ) 分段鎖,保證在操作不同段 map 的時(shí)候, 可以并發(fā)執(zhí)行, 操作同段 map 的時(shí)候,進(jìn)行鎖的競爭和等待。從而達(dá)到線程安全, 且效率大于 synchronized。
但是在Java8之后,JDK又棄用了分段鎖的策略,重新使用synchronized來實(shí)現(xiàn)線程安全。
三、分段鎖(java8之前)
我們先來看一下分段鎖如何實(shí)現(xiàn)的。
- 數(shù)據(jù)結(jié)構(gòu)
ConcurrentHashMap的底層數(shù)據(jù)結(jié)構(gòu)和HashMap一樣,仍然是數(shù)組和鏈表。不同點(diǎn)在于,ConcurrentHashMap的外層不是一個(gè)大數(shù)組,而是一個(gè)Segment數(shù)組,每個(gè)Segment包含一個(gè)和HashMap數(shù)據(jù)結(jié)構(gòu)差不多的鏈表數(shù)組。如下圖所示:
- 尋址方式
在讀寫某個(gè)Key時(shí),先取該Key的哈希值。并將哈希值的高N位對Segment個(gè)數(shù)取模從而得到該Key應(yīng)該屬于哪個(gè)Segment,接著如同操作HashMap一樣操作這個(gè)Segment。為了保證不同的值均勻分布到不同的Segment,需要通過如下方法計(jì)算哈希值。
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
同樣為了提高取模運(yùn)算效率,通過如下計(jì)算,ssize即為大于concurrencyLevel的最小的2的N次方,同時(shí)segmentMask為2^N-1。這一點(diǎn)跟上文中計(jì)算數(shù)組長度的方法一致。對于某一個(gè)Key的哈希值,只需要向右移segmentShift位以取高sshift位,再與segmentMask取與操作即可得到它在Segment數(shù)組上的索引。
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
- 同步方式
Segment繼承自ReentrantLock,所以我們可以很方便的對每一個(gè)Segment上鎖。
對于讀操作,獲取Key所在的Segment時(shí),需要保證可見性(請參考如何保證多線程條件下的可見性)。具體實(shí)現(xiàn)上可以使用volatile關(guān)鍵字,也可使用鎖。但使用鎖開銷太大,而使用volatile時(shí)每次寫操作都會讓所有CPU內(nèi)緩存無效,也有一定開銷。ConcurrentHashMap使用如下方法保證可見性,取得最新的Segment。
Segment<K,V> s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)
獲取Segment中的HashEntry時(shí)也使用了類似方法
HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)
對于寫操作,并不要求同時(shí)獲取所有Segment的鎖,因?yàn)槟菢酉喈?dāng)于鎖住了整個(gè)Map。它會先獲取該Key-Value對所在的Segment的鎖,獲取成功后就可以像操作一個(gè)普通的HashMap一樣操作該Segment,并保證該Segment的安全性。
同時(shí)由于其它Segment的鎖并未被獲取,因此理論上可支持concurrencyLevel(等于Segment的個(gè)數(shù))個(gè)線程安全的并發(fā)讀寫。
獲取鎖時(shí),并不直接使用lock來獲取,因?yàn)樵摲椒ǐ@取鎖失敗時(shí)會掛起(參考可重入鎖)。事實(shí)上,它使用了自旋鎖,如果tryLock獲取鎖失敗,說明鎖被其它線程占用,此時(shí)通過循環(huán)再次以tryLock的方式申請鎖。如果在循環(huán)過程中該Key所對應(yīng)的鏈表頭被修改,則重置retry次數(shù)。如果retry次數(shù)超過一定值,則使用lock方法申請鎖。
這里使用自旋鎖是因?yàn)樽孕i的效率比較高,但是它消耗CPU資源比較多,因此在自旋次數(shù)超過閾值時(shí)切換為互斥鎖。
- size操作
put、remove和get操作只需要關(guān)心一個(gè)Segment,而size操作需要遍歷所有的Segment才能算出整個(gè)Map的大小。一個(gè)簡單的方案是,先鎖住所有Sgment,計(jì)算完后再解鎖。但這樣做,在做size操作時(shí),不僅無法對Map進(jìn)行寫操作,同時(shí)也無法進(jìn)行讀操作,不利于對Map的并行操作。
為更好支持并發(fā)操作,ConcurrentHashMap會在不上鎖的前提逐個(gè)Segment計(jì)算3次size,如果某相鄰兩次計(jì)算獲取的所有Segment的更新次數(shù)(每個(gè)Segment都與HashMap一樣通過modCount跟蹤自己的修改次數(shù),Segment每修改一次其modCount加一)相等,說明這兩次計(jì)算過程中無更新操作,則這兩次計(jì)算出的總size相等,可直接作為最終結(jié)果返回。如果這三次計(jì)算過程中Map有更新,則對所有Segment加鎖重新計(jì)算Size。該計(jì)算方法代碼如下
public int size() {
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
四、基于CAS的ConcurrentHashMap
Java 7為實(shí)現(xiàn)并行訪問,引入了Segment這一結(jié)構(gòu),實(shí)現(xiàn)了分段鎖,理論上最大并發(fā)度與Segment個(gè)數(shù)相等。Java 8為進(jìn)一步提高并發(fā)性,摒棄了分段鎖的方案,而是直接使用一個(gè)大的數(shù)組。同時(shí)為了提高哈希碰撞下的尋址性能,Java 8在鏈表長度超過一定閾值(8)時(shí)將鏈表(尋址時(shí)間復(fù)雜度為O(N))轉(zhuǎn)換為紅黑樹(尋址時(shí)間復(fù)雜度為O(long(N)))。其數(shù)據(jù)結(jié)構(gòu)如下圖所示:
- 尋址方式
Java 8的ConcurrentHashMap同樣是通過Key的哈希值與數(shù)組長度取模確定該Key在數(shù)組中的索引。同樣為了避免不太好的Key的hashCode設(shè)計(jì),它通過如下方法計(jì)算得到Key的最終哈希值。不同的是,Java 8的ConcurrentHashMap作者認(rèn)為引入紅黑樹后,即使哈希沖突比較嚴(yán)重,尋址效率也足夠高,所以作者并未在哈希值的計(jì)算上做過多設(shè)計(jì),只是將Key的hashCode值與其高16位作異或并保證最高位為0(從而保證最終結(jié)果為正整數(shù))。
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
- 同步方式
對于put操作,如果Key對應(yīng)的數(shù)組元素為null,則通過CAS操作將其設(shè)置為當(dāng)前值。如果Key對應(yīng)的數(shù)組元素(也即鏈表表頭或者樹的根元素)不為null,則對該元素使用synchronized關(guān)鍵字申請鎖,然后進(jìn)行操作。如果該put操作使得當(dāng)前鏈表長度超過一定閾值,則將該鏈表轉(zhuǎn)換為樹,從而提高尋址效率。
對于讀操作,由于數(shù)組被volatile關(guān)鍵字修飾,因此不用擔(dān)心數(shù)組的可見性問題。同時(shí)每個(gè)元素是一個(gè)Node實(shí)例(Java 7中每個(gè)元素是一個(gè)HashEntry),它的Key值和hash值都由final修飾,不可變更,無須關(guān)心它們被修改后的可見性問題。而其Value及對下一個(gè)元素的引用由volatile修飾,可見性也有保障。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
對于Key對應(yīng)的數(shù)組元素的可見性,由Unsafe的getObjectVolatile方法保證。
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
- size操作
put方法和remove方法都會通過addCount方法維護(hù)Map的size。size方法通過sumCount獲取由addCount方法維護(hù)的Map的size。
下面,我們結(jié)合部分源碼來看一下:
- put操作
首先通過 hash 找到對應(yīng)鏈表過后, 查看是否是第一個(gè)object, 如果是, 直接用cas原則插入,無需加鎖。
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 這里在整個(gè)map第一次操作時(shí),初始化hash桶, 也就是一個(gè)table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果是第一個(gè)object, 則直接cas放入, 不用鎖
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
然后, 如果不是鏈表第一個(gè)object, 則直接用鏈表第一個(gè)object加鎖,這里加的鎖是synchronized,雖然效率不如 ReentrantLock, 但節(jié)約了空間,這里會一直用第一個(gè)object為鎖, 直到重新計(jì)算map大小, 比如擴(kuò)容或者操作了第一個(gè)object為止。
synchronized (f) {// 這里的f即為第一個(gè)鏈表的object
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}else if (f instanceof TreeBin) { // 太長會用紅黑樹
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}