ConcurrentHashMap鎖的前世今生,了解一下

我們知道,HashMap 是線程不安全的,為了使用線程安全的 HashMap,一種方法是使用 Collections 中的 synchronizedMap 方法。另一種方法是使用使用 ConcurrentHashMap。


一、Collections.synchronizedMap(Map map)鎖的實(shí)現(xiàn)方式。

這里做一下延伸,我們簡單看一下這個(gè)方法的源碼,不出所料這個(gè)方法是重寫了 HashMap 的方法,并且為這個(gè) map 對象加了 synchronized 鎖。如下圖:

ConcurrentHashMap鎖的前世今生

二、ConcurrentHashMap 鎖的實(shí)現(xiàn)方式

在 Java 5 之后,JDK 引入了 java.util.concurrent 并發(fā)包 ,其中最常用的就是 ConcurrentHashMap 了, 它的原理是引用了內(nèi)部的 Segment ( ReentrantLock ) 分段鎖,保證在操作不同段 map 的時(shí)候, 可以并發(fā)執(zhí)行, 操作同段 map 的時(shí)候,進(jìn)行鎖的競爭和等待。從而達(dá)到線程安全, 且效率大于 synchronized。

但是在Java8之后,JDK又棄用了分段鎖的策略,重新使用synchronized來實(shí)現(xiàn)線程安全。

三、分段鎖(java8之前)

我們先來看一下分段鎖如何實(shí)現(xiàn)的。

  • 數(shù)據(jù)結(jié)構(gòu)

ConcurrentHashMap的底層數(shù)據(jù)結(jié)構(gòu)和HashMap一樣,仍然是數(shù)組和鏈表。不同點(diǎn)在于,ConcurrentHashMap的外層不是一個(gè)大數(shù)組,而是一個(gè)Segment數(shù)組,每個(gè)Segment包含一個(gè)和HashMap數(shù)據(jù)結(jié)構(gòu)差不多的鏈表數(shù)組。如下圖所示:

ConcurrentHashMap鎖的前世今生
  • 尋址方式

在讀寫某個(gè)Key時(shí),先取該Key的哈希值。并將哈希值的高N位對Segment個(gè)數(shù)取模從而得到該Key應(yīng)該屬于哪個(gè)Segment,接著如同操作HashMap一樣操作這個(gè)Segment。為了保證不同的值均勻分布到不同的Segment,需要通過如下方法計(jì)算哈希值。

private int hash(Object k) {
  int h = hashSeed;
  if ((0 != h) && (k instanceof String)) {
    return sun.misc.Hashing.stringHash32((String) k);
  }
  h ^= k.hashCode();
  h += (h <<  15) ^ 0xffffcd7d;
  h ^= (h >>> 10);
  h += (h <<   3);
  h ^= (h >>>  6);
  h += (h <<   2) + (h << 14);
  return h ^ (h >>> 16);
}

同樣為了提高取模運(yùn)算效率,通過如下計(jì)算,ssize即為大于concurrencyLevel的最小的2的N次方,同時(shí)segmentMask為2^N-1。這一點(diǎn)跟上文中計(jì)算數(shù)組長度的方法一致。對于某一個(gè)Key的哈希值,只需要向右移segmentShift位以取高sshift位,再與segmentMask取與操作即可得到它在Segment數(shù)組上的索引。

int sshift = 0;  
int ssize = 1;  
while (ssize < concurrencyLevel) {  
++sshift;  
ssize <<= 1;  
}  
this.segmentShift = 32 - sshift;  
this.segmentMask = ssize - 1;  
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
  • 同步方式

Segment繼承自ReentrantLock,所以我們可以很方便的對每一個(gè)Segment上鎖。

對于讀操作,獲取Key所在的Segment時(shí),需要保證可見性(請參考如何保證多線程條件下的可見性)。具體實(shí)現(xiàn)上可以使用volatile關(guān)鍵字,也可使用鎖。但使用鎖開銷太大,而使用volatile時(shí)每次寫操作都會讓所有CPU內(nèi)緩存無效,也有一定開銷。ConcurrentHashMap使用如下方法保證可見性,取得最新的Segment。

Segment<K,V> s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)

獲取Segment中的HashEntry時(shí)也使用了類似方法

HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile  (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)

對于寫操作,并不要求同時(shí)獲取所有Segment的鎖,因?yàn)槟菢酉喈?dāng)于鎖住了整個(gè)Map。它會先獲取該Key-Value對所在的Segment的鎖,獲取成功后就可以像操作一個(gè)普通的HashMap一樣操作該Segment,并保證該Segment的安全性。
同時(shí)由于其它Segment的鎖并未被獲取,因此理論上可支持concurrencyLevel(等于Segment的個(gè)數(shù))個(gè)線程安全的并發(fā)讀寫。

獲取鎖時(shí),并不直接使用lock來獲取,因?yàn)樵摲椒ǐ@取鎖失敗時(shí)會掛起(參考可重入鎖)。事實(shí)上,它使用了自旋鎖,如果tryLock獲取鎖失敗,說明鎖被其它線程占用,此時(shí)通過循環(huán)再次以tryLock的方式申請鎖。如果在循環(huán)過程中該Key所對應(yīng)的鏈表頭被修改,則重置retry次數(shù)。如果retry次數(shù)超過一定值,則使用lock方法申請鎖。

這里使用自旋鎖是因?yàn)樽孕i的效率比較高,但是它消耗CPU資源比較多,因此在自旋次數(shù)超過閾值時(shí)切換為互斥鎖。

  • size操作

put、remove和get操作只需要關(guān)心一個(gè)Segment,而size操作需要遍歷所有的Segment才能算出整個(gè)Map的大小。一個(gè)簡單的方案是,先鎖住所有Sgment,計(jì)算完后再解鎖。但這樣做,在做size操作時(shí),不僅無法對Map進(jìn)行寫操作,同時(shí)也無法進(jìn)行讀操作,不利于對Map的并行操作。

為更好支持并發(fā)操作,ConcurrentHashMap會在不上鎖的前提逐個(gè)Segment計(jì)算3次size,如果某相鄰兩次計(jì)算獲取的所有Segment的更新次數(shù)(每個(gè)Segment都與HashMap一樣通過modCount跟蹤自己的修改次數(shù),Segment每修改一次其modCount加一)相等,說明這兩次計(jì)算過程中無更新操作,則這兩次計(jì)算出的總size相等,可直接作為最終結(jié)果返回。如果這三次計(jì)算過程中Map有更新,則對所有Segment加鎖重新計(jì)算Size。該計(jì)算方法代碼如下

public int size() {  
 final Segment<K,V>[] segments = this.segments;  
 int size;  
 boolean overflow; // true if size overflows 32 bits  
 long sum;         // sum of modCounts  
 long last = 0L;   // previous sum  
 int retries = -1; // first iteration isn't retry  
 try {  
 for (;;) {  
 if (retries++ == RETRIES_BEFORE_LOCK) {  
 for (int j = 0; j < segments.length; ++j)  
 ensureSegment(j).lock(); // force creation  
 }  
 sum = 0L;  
 size = 0;  
 overflow = false;  
 for (int j = 0; j < segments.length; ++j) {  
 Segment<K,V> seg = segmentAt(segments, j);  
 if (seg != null) {  
 sum += seg.modCount;  
 int c = seg.count;  
 if (c < 0 || (size += c) < 0)  
 overflow = true;  
 }  
 }  
 if (sum == last)  
 break;  
 last = sum;  
 }  
 } finally {  
 if (retries > RETRIES_BEFORE_LOCK) {  
 for (int j = 0; j < segments.length; ++j)  
 segmentAt(segments, j).unlock();  
 }  
 }  
 return overflow ? Integer.MAX_VALUE : size;  
}

四、基于CAS的ConcurrentHashMap

Java 7為實(shí)現(xiàn)并行訪問,引入了Segment這一結(jié)構(gòu),實(shí)現(xiàn)了分段鎖,理論上最大并發(fā)度與Segment個(gè)數(shù)相等。Java 8為進(jìn)一步提高并發(fā)性,摒棄了分段鎖的方案,而是直接使用一個(gè)大的數(shù)組。同時(shí)為了提高哈希碰撞下的尋址性能,Java 8在鏈表長度超過一定閾值(8)時(shí)將鏈表(尋址時(shí)間復(fù)雜度為O(N))轉(zhuǎn)換為紅黑樹(尋址時(shí)間復(fù)雜度為O(long(N)))。其數(shù)據(jù)結(jié)構(gòu)如下圖所示:

ConcurrentHashMap鎖的前世今生
  • 尋址方式

Java 8的ConcurrentHashMap同樣是通過Key的哈希值與數(shù)組長度取模確定該Key在數(shù)組中的索引。同樣為了避免不太好的Key的hashCode設(shè)計(jì),它通過如下方法計(jì)算得到Key的最終哈希值。不同的是,Java 8的ConcurrentHashMap作者認(rèn)為引入紅黑樹后,即使哈希沖突比較嚴(yán)重,尋址效率也足夠高,所以作者并未在哈希值的計(jì)算上做過多設(shè)計(jì),只是將Key的hashCode值與其高16位作異或并保證最高位為0(從而保證最終結(jié)果為正整數(shù))。

static final int spread(int h) {
  return (h ^ (h >>> 16)) & HASH_BITS;
}
  • 同步方式

對于put操作,如果Key對應(yīng)的數(shù)組元素為null,則通過CAS操作將其設(shè)置為當(dāng)前值。如果Key對應(yīng)的數(shù)組元素(也即鏈表表頭或者樹的根元素)不為null,則對該元素使用synchronized關(guān)鍵字申請鎖,然后進(jìn)行操作。如果該put操作使得當(dāng)前鏈表長度超過一定閾值,則將該鏈表轉(zhuǎn)換為樹,從而提高尋址效率。

對于讀操作,由于數(shù)組被volatile關(guān)鍵字修飾,因此不用擔(dān)心數(shù)組的可見性問題。同時(shí)每個(gè)元素是一個(gè)Node實(shí)例(Java 7中每個(gè)元素是一個(gè)HashEntry),它的Key值和hash值都由final修飾,不可變更,無須關(guān)心它們被修改后的可見性問題。而其Value及對下一個(gè)元素的引用由volatile修飾,可見性也有保障。

static class Node<K,V> implements Map.Entry<K,V> {
  final int hash;
  final K key;
  volatile V val;
  volatile Node<K,V> next;
}

對于Key對應(yīng)的數(shù)組元素的可見性,由Unsafe的getObjectVolatile方法保證。

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
  return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
  • size操作

put方法和remove方法都會通過addCount方法維護(hù)Map的size。size方法通過sumCount獲取由addCount方法維護(hù)的Map的size。

下面,我們結(jié)合部分源碼來看一下:

  • put操作

首先通過 hash 找到對應(yīng)鏈表過后, 查看是否是第一個(gè)object, 如果是, 直接用cas原則插入,無需加鎖。

Node<K,V>  f;  int  n,  i,  fh;  K  fk;  V  fv;
if  (tab  ==  null  ||  (n  =  tab.length)  ==  0)
    tab  =  initTable();  // 這里在整個(gè)map第一次操作時(shí),初始化hash桶, 也就是一個(gè)table
else  if  ((f  =  tabAt(tab,  i  =  (n  -  1)  &  hash))  ==  null)  {
//如果是第一個(gè)object, 則直接cas放入, 不用鎖
    if  (casTabAt(tab,  i,  null,  new  Node<K,V>(hash,  key,  value)))
        break;                  
}

然后, 如果不是鏈表第一個(gè)object, 則直接用鏈表第一個(gè)object加鎖,這里加的鎖是synchronized,雖然效率不如 ReentrantLock, 但節(jié)約了空間,這里會一直用第一個(gè)object為鎖, 直到重新計(jì)算map大小, 比如擴(kuò)容或者操作了第一個(gè)object為止。

synchronized  (f)  {// 這里的f即為第一個(gè)鏈表的object
    if  (tabAt(tab,  i)  ==  f)  {
        if  (fh  >=  0)  {
            binCount  =  1;
            for  (Node<K,V>  e  =  f;;  ++binCount)  {
                K  ek;
                if  (e.hash  ==  hash  &&
                    ((ek  =  e.key)  ==  key  ||
 (ek  !=  null  &&  key.equals(ek))))  {
                    oldVal  =  e.val;
                    if  (!onlyIfAbsent)
                        e.val  =  value;
                    break;
                }
                Node<K,V>  pred  =  e;
                if  ((e  =  e.next)  ==  null)  {
                    pred.next  =  new  Node<K,V>(hash,  key,  value);
                    break;
                }
            }
        }else  if  (f  instanceof  TreeBin)  {  // 太長會用紅黑樹
            Node<K,V>  p;
            binCount  =  2;
            if  ((p  =  ((TreeBin<K,V>)f).putTreeVal(hash,  key, value))  !=  null)  {
                oldVal  =  p.val;
                if  (!onlyIfAbsent)
                    p.val  =  value;
            }
        }else  if  (f  instanceof  ReservationNode)
            throw  new  IllegalStateException("Recursive update");
    }
}

來源:https://www.tuicool.com/articles/zeQzmeU

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容