ConcurrentHashMap源碼分析JDK1.7&1.8

一、背景

? 在JDK1.5版本中引入了一個(gè)線(xiàn)程安全高性能Map類(lèi)—ConcurrentHashMap,主要為了解決HashMap線(xiàn)程不安全和Hashtable效率不高的問(wèn)題。眾所周知,HashMap在多線(xiàn)程編程中是線(xiàn)程不安全的,而Hashtable由于使用了synchronized修飾方法而導(dǎo)致執(zhí)行效率不高;因此,ConcurrentHashMap的出現(xiàn)是為了在保證線(xiàn)程安全的前提下,提高性能。

二、設(shè)計(jì)猜想

? 在探究源碼之前,首先我們先嘗試用自己的邏輯來(lái)設(shè)計(jì)下ConcurrentHashMap<K,V>的數(shù)據(jù)結(jié)構(gòu)。在我們熟悉的數(shù)據(jù)結(jié)構(gòu)中有 一種hash表的數(shù)據(jù)結(jié)構(gòu)符合ConcurrentHashMap的存儲(chǔ)要求

[散列表](Hash table,也叫哈希表),是根據(jù)關(guān)鍵碼值(Key value)而直接進(jìn)行訪(fǎng)問(wèn)的數(shù)據(jù)結(jié)構(gòu)。也就是 說(shuō),它通過(guò)把關(guān)鍵碼值映射到表中一個(gè)位置來(lái)訪(fǎng)問(wèn)記錄,以加快查找的速度。這個(gè)映射函數(shù)叫做散列函數(shù), 存放記錄的數(shù)組叫做散列表。

在設(shè)計(jì)ConcurrentHashMap中需要關(guān)注的幾個(gè)方面:

  • 由于數(shù)組大小、類(lèi)型一旦確定,編譯、運(yùn)行時(shí)都不可修改,必須對(duì)ConcurrentHashMap的數(shù)組動(dòng)態(tài)擴(kuò)容。
  • 通過(guò)Key計(jì)算得到的Hash值,必須保證元素都能正確的被保存,需要考慮hash沖突的處理。

三、設(shè)計(jì)思路

Hashtable效率不高的主要原因是容器操作過(guò)程中對(duì)整個(gè)對(duì)象進(jìn)行加鎖,其他線(xiàn)程的任何操作都會(huì)被阻塞。解決方式就是從“減少鎖的范圍”、“引入非阻塞鎖”或引入”讀寫(xiě)鎖“等方面進(jìn)行優(yōu)化,針對(duì)以上的思路可以進(jìn)行如下改進(jìn):

  1. Hashtable整張表拆分成一張張的小表,操作中只有某個(gè)小表受影響,而其他小表可以實(shí)現(xiàn)高性能操作。
  2. 小表可以實(shí)現(xiàn)非阻塞鎖的”讀寫(xiě)“鎖,在操作小表時(shí),可以減少其他線(xiàn)程的等待時(shí)間。

我們暫稱(chēng)以上的設(shè)計(jì)為“分段鎖”技術(shù)。

Map的底層數(shù)據(jù)結(jié)構(gòu)是數(shù)組+鏈表,基于上面的拆表思路,將上面的小表替換成鏈表的頭節(jié)點(diǎn)即數(shù)組的某個(gè)元素?;谝陨系姆桨?,可以降低方案一的復(fù)雜度和空間使用率。

JDK中提供的ConcurrentHashMap在1.7和1.8的版本中正是基于這個(gè)改進(jìn)思路進(jìn)行設(shè)計(jì)的。

四、分段鎖技術(shù)

? ConcurrentHashMap通過(guò)持有一個(gè)Segment[]數(shù)組對(duì)象來(lái)實(shí)現(xiàn)鎖分段技術(shù),其中每一個(gè)Segment對(duì)象通過(guò)繼承ReentrantLock實(shí)現(xiàn)持有非阻塞的重入鎖。實(shí)則一個(gè)Segment對(duì)象可以理解為一個(gè)HashMap,不同在于Segment持有鎖,而HashMap不持有鎖。Segment中持有一個(gè)HashEntry[]數(shù)組,HashEntry是數(shù)據(jù)節(jié)點(diǎn)對(duì)象。

ConcurrentHashMap結(jié)構(gòu)如下圖:

img

4.1 名詞術(shù)語(yǔ)

字段 說(shuō)明 備注
concurrencyLevel 并發(fā)級(jí)別
segmentShift 段偏移量
segmentMask 段掩碼

4.2 源碼剖析

4.2.1 容器初始化

? 容器初始化主要是為完成Segment[]數(shù)組對(duì)象的初始化,Segment[]數(shù)組的大小又依賴(lài)于ssize的值,ssize是通過(guò)concurrencyLevel計(jì)算得到,其中ssize應(yīng)該滿(mǎn)足2的N次方(power-of-two sizes),所以ssize >=concurrencyLevel,此處對(duì)于Segment[]數(shù)組的大小的約束是為了通過(guò)按位與的散列算法來(lái)定位Segments數(shù)組的索引。(以下省略無(wú)關(guān)代碼)

public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
  int ssize = 1;
  while (ssize < concurrencyLevel)
    ssize <<= 1;
  Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
}

4.2.2 設(shè)置值

public V put(K key, V value) {
  Segment<K,V> s;
  if (value == null)
  throw new NullPointerException();
  int hash = hash(key);
  int j = (hash >>> segmentShift) & segmentMask;
  if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
  (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
  s = ensureSegment(j);
  return s.put(key, hash, value, false);
}
  • ConcurrentHashMap的value不能為空,否則拋出NPE異常。
  • 獲取key相應(yīng)的hash值,用于計(jì)算操作元素所在的segment[]數(shù)組的索引下標(biāo)j。
  • 通過(guò)索引拿到相應(yīng)的segment對(duì)象,進(jìn)行put操作
計(jì)算Key的hash值
private int hash(Object k) {
  int h = hashSeed;
  if ((0 != h) && (k instanceof String))
    return sun.misc.Hashing.stringHash32((String) k);
  h ^= k.hashCode();
  h += (h <<  15) ^ 0xffffcd7d;
  h ^= (h >>> 10);
  h += (h << 3);
  h ^= (h >>> 6);
  h += (h << 2) + (h << 14);
  return h ^ (h >>> 16);
}

ConcurrentHashMap中對(duì)key的處理采用的是變體的Wang/Jenkins哈希算法。此算法特點(diǎn):雪崩性和可逆性。

計(jì)算Segment數(shù)組下標(biāo)
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
  int sshift = 0,ssize = 1;
  while (ssize < concurrencyLevel){
    ++sshift;
    ssize <<= 1;
  }
  this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
}

public V put(K key, V value) {
int j = (hash >>> segmentShift) & segmentMask;
}

? j指待操作的Segment[]數(shù)組的下標(biāo),而其中的segmentShiftsegmentMask是容器初始化時(shí)完成的賦值。其中的ssize等于Segment[]數(shù)組的長(zhǎng)度,而sshift滿(mǎn)足2^sshif=ssize公式。下標(biāo)j的值域范圍為[0,ssize-1],由于ssize=2^sshif,那么下標(biāo)j可以用1個(gè)sshift位的二進(jìn)制數(shù)字表示。假如:ssize為16,那么sshift=4,j的值域?yàn)閇0,15],而[0000b,1111b]就是j的值域;則求key在Segment[]的下標(biāo)j,就是求key對(duì)應(yīng)的一個(gè)散列的4位二進(jìn)制數(shù)值。而ConcurrentHashMap的源碼求下標(biāo)j的方式非常簡(jiǎn)單,就是取key的hash值的高4位。因此,求key散列到長(zhǎng)度為ssize的Segment數(shù)組的下標(biāo)j,就是求key的hash值的高sshift位。

1538405541238
初始化Segment對(duì)象
private Segment<K,V> ensureSegment(int k) {
  final Segment<K,V>[] ss = this.segments;
  long u = (k << SSHIFT) + SBASE; // raw offset
  Segment<K,V> seg;
  if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
    Segment<K,V> proto = ss[0]; // use segment 0 as prototype
    int cap = proto.table.length;
    float lf = proto.loadFactor;
    int threshold = (int)(cap * lf);
    HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheck
      Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
      while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) {
        if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
          break;
      }
    }
  }
  return seg;
}
  • 利用UNSAFE原子操作從Segment[]數(shù)組中嘗試獲取對(duì)應(yīng)Segment對(duì)象,沒(méi)有則進(jìn)行初始化,有則直接返回。
  • Segment對(duì)象的初始化以Segment[]數(shù)組中頭節(jié)點(diǎn)的參數(shù)和UNSAFE原子操作完成初始化。
存放元素
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  //嘗試獲取Segment的Lock,獲取成功返回null,否則非阻塞重試3次,超過(guò)3次則阻塞等待鎖,返回對(duì)應(yīng)的鏈表節(jié)點(diǎn)。   
  HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
  V oldValue;
  try {
    HashEntry<K,V>[] tab = table;
    int index = (tab.length - 1) & hash;
    HashEntry<K,V> first = entryAt(tab, index);
    for (HashEntry<K,V> e = first;;) {
      if (e != null) {
        K k;
        if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {
          oldValue = e.value;
          if (!onlyIfAbsent) {
            e.value = value;
            ++modCount;
          }
          break;
        }
        //遞歸從鏈表頭切換到后續(xù)節(jié)點(diǎn)
        e = e.next;
      }
      else {
        if (node != null)
          node.setNext(first);
        else
          node = new HashEntry<K,V>(hash, key, value, first);
        int c = count + 1;
        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
          rehash(node);
        else
          setEntryAt(tab, index, node);
        ++modCount;
        count = c;
        oldValue = null;
        break;
      }
    }
  } finally {
    unlock();
  }
  return oldValue;
}
  1. 首先嘗試獲取Segment的Lock,Segment持有的是ReentrantLock非阻塞鎖,如果失敗則重試獲取
  2. 獲取鏈表的頭結(jié)點(diǎn),Segment持有的HashEntry[]數(shù)組操作的是頭結(jié)點(diǎn)開(kāi)始:
    1. 如果頭節(jié)點(diǎn)e不為空,如果key相同,則更新值,否則執(zhí)行e = e.next進(jìn)行鏈表遍歷
    2. 如果頭節(jié)點(diǎn)為空,則新增節(jié)點(diǎn)node,滿(mǎn)足node.next=first。

4.2.3 容器擴(kuò)容

private void rehash(HashEntry<K,V> node) {
  HashEntry<K,V>[] oldTable = table;
  int oldCapacity = oldTable.length;
  int newCapacity = oldCapacity << 1;
  threshold = (int)(newCapacity * loadFactor);
  HashEntry<K,V>[] newTable =
    (HashEntry<K,V>[]) new HashEntry[newCapacity];
  int sizeMask = newCapacity - 1;
  for (int i = 0; i < oldCapacity ; i++) {
    HashEntry<K,V> e = oldTable[i];
    if (e != null) {
      HashEntry<K,V> next = e.next;
      int idx = e.hash & sizeMask;
      if (next == null)   //  Single node on list
        newTable[idx] = e;
      else { // Reuse consecutive sequence at same slot
        HashEntry<K,V> lastRun = e;
        int lastIdx = idx;
        for (HashEntry<K,V> last = next;
             last != null;
             last = last.next) {
          int k = last.hash & sizeMask;
          if (k != lastIdx) {
            lastIdx = k;
            lastRun = last;
          }
        }
        newTable[lastIdx] = lastRun;
        // Clone remaining nodes
        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
          V v = p.value;
          int h = p.hash;
          int k = h & sizeMask;
          HashEntry<K,V> n = newTable[k];
          newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
        }
      }
    }
  }
  int nodeIndex = node.hash & sizeMask; // add the new node
  node.setNext(newTable[nodeIndex]);
  newTable[nodeIndex] = node;
  table = newTable;
}

4.2.4 獲取值

public V get(Object key) {
  Segment<K,V> s; // manually integrate access methods to reduce overhead
  HashEntry<K,V>[] tab;
  int h = hash(key);
  long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
  if ((s=(Segment<K,V>)UNSAFE.getObjectVolatile(segments,u))!=null&&(tab = s.table)!=null) {
    for (HashEntry<K,V> e=(HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {
      K k;
      if ((k = e.key) == key || (e.hash == h && key.equals(k)))
        return e.value;
    }
  }
  return null;
}
  • 獲取key相應(yīng)的hash值,通過(guò)計(jì)算得到索引u,從而定位到Segment[]數(shù)組中的Segment對(duì)象。
  • 在通過(guò)計(jì)算得到HashEntry[]數(shù)組中的HashEntry對(duì)象,再通過(guò)循環(huán)遍歷鏈表,最終定位到數(shù)據(jù)。

五、行鎖技術(shù)

5.1 源碼剖析

5.2.1 容器初始化

static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4;
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
    ……………………
    this.loadFactor = loadFactor;
  this.threshold = tableSizeFor(initialCapacity);
}

在進(jìn)行初始化容器時(shí),會(huì)設(shè)置兩個(gè)參數(shù),initialCapacity表示容器數(shù)組的容量,loadFactor表示容器的負(fù)載因子。其中tableSizeFor會(huì)對(duì)initialCapacity參數(shù)值進(jìn)行二進(jìn)制的位左移,使得容量是2的N次方。

5.2.2 設(shè)置值

計(jì)算Key的hash值
static final int spread(int h) {
  return (h ^ (h >>> 16)) & HASH_BITS;
}

其中 hash(key) 方法就是計(jì)算key的hash值, (h = key.hashCode()) ^ (h >>> 16) 使用高位16位和低16位異或運(yùn)算得到Hash值,主要為了使hash分布盡可能的均勻。

[圖片上傳失敗...(image-237caa-1576651270043)]

初始化容器數(shù)組
private final Node<K,V>[] initTable() {
  Node<K,V>[] tab; int sc;
  while ((tab = table) == null || tab.length == 0) {
    if ((sc = sizeCtl) < 0)
      Thread.yield(); // lost initialization race; just spin
    else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
      try {
        if ((tab = table) == null || tab.length == 0) {
          int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
          @SuppressWarnings("unchecked")
          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
          table = tab = nt;
          sc = n - (n >>> 2);
        }
      } finally {
        sizeCtl = sc;
      }
      break;
    }
  }
  return tab;
}
  • 通過(guò)sc變量來(lái)控制初始化過(guò)程,sc=-1在進(jìn)行容器初始化,其他線(xiàn)程循環(huán)等待并讓出CPU(Thread.yield())
  • 初始化Node[]數(shù)組對(duì)象,并返回
存放元素
final V putVal(K key, V value, boolean onlyIfAbsent) {
  for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
    if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
      if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
        break;                   // no lock when adding to empty bin
    }else {
      V oldVal = null;
      synchronized (f) {
        if (tabAt(tab, i) == f) {
          if (fh >= 0) {
            binCount = 1;
            for (Node<K,V> e = f;; ++binCount) {
              K ek;
              if (e.hash == hash &&
                  ((ek = e.key) == key ||
                   (ek != null && key.equals(ek)))) {
                oldVal = e.val;
                if (!onlyIfAbsent)
                  e.val = value;
                break;
              }
              Node<K,V> pred = e;
              if ((e = e.next) == null) {
                pred.next = new Node<K,V>(hash, key,
                                          value, null);
                break;
              }
            }
          }
          else if (f instanceof TreeBin) {
            Node<K,V> p;
            binCount = 2;
            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                  value)) != null) {
              oldVal = p.val;
              if (!onlyIfAbsent)
                p.val = value;
            }
          }
        }
      }
      if (binCount != 0) {
        if (binCount >= TREEIFY_THRESHOLD)
          treeifyBin(tab, i);
        if (oldVal != null)
          return oldVal;
        break;
      }
    }
  }
  addCount(1L, binCount);
  return null;
}
  1. 獲取容器中Node[]數(shù)組的頭結(jié)點(diǎn),如果頭結(jié)點(diǎn)為空,則構(gòu)造新節(jié)點(diǎn),并添加到該頭結(jié)點(diǎn)上(CAS)。
  2. 如果不是頭節(jié)點(diǎn),則使用synchronized鎖住頭節(jié)點(diǎn),循環(huán)進(jìn)行鏈表尾部添加

5.2.3 容器擴(kuò)容

https://www.cnblogs.com/menghuantiancheng/p/10462370.html

5.2.4 獲取值

public V get(Object key) {
  Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  int h = spread(key.hashCode());
  if ((tab = table) != null && (n = tab.length) >0&&(e = tabAt(tab, (n - 1) & h)) != null) {
    if ((eh = e.hash) == h) {
      if ((ek = e.key) == key || (ek != null && key.equals(ek)))
        return e.val;
    } else if (eh < 0)
      return (p = e.find(h, key)) != null ? p.val : null;
    while ((e = e.next) != null) {
      if (e.hash == h &&
          ((ek = e.key) == key || (ek != null && key.equals(ek))))
        return e.val;
    }
  }
  return null;
}
  • 獲取key相應(yīng)的hash值,定位到Node[]數(shù)組的索引位,依次循環(huán)遍歷鏈表,最終定位到數(shù)據(jù)。

參考文檔:

https://www.cnblogs.com/weiweiblog/archive/2018/09/09/9611271.html

https://blog.csdn.net/majinggogogo/article/details/80260400

https://www.cnblogs.com/lonelyJay/p/9736027.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容