一、背景
? 在JDK1.5版本中引入了一個(gè)線(xiàn)程安全高性能Map類(lèi)—ConcurrentHashMap,主要為了解決HashMap線(xiàn)程不安全和Hashtable效率不高的問(wèn)題。眾所周知,HashMap在多線(xiàn)程編程中是線(xiàn)程不安全的,而Hashtable由于使用了synchronized修飾方法而導(dǎo)致執(zhí)行效率不高;因此,ConcurrentHashMap的出現(xiàn)是為了在保證線(xiàn)程安全的前提下,提高性能。
二、設(shè)計(jì)猜想
? 在探究源碼之前,首先我們先嘗試用自己的邏輯來(lái)設(shè)計(jì)下ConcurrentHashMap<K,V>的數(shù)據(jù)結(jié)構(gòu)。在我們熟悉的數(shù)據(jù)結(jié)構(gòu)中有 一種hash表的數(shù)據(jù)結(jié)構(gòu)符合ConcurrentHashMap的存儲(chǔ)要求
[散列表](Hash table,也叫哈希表),是根據(jù)關(guān)鍵碼值(Key value)而直接進(jìn)行訪(fǎng)問(wèn)的數(shù)據(jù)結(jié)構(gòu)。也就是 說(shuō),它通過(guò)把關(guān)鍵碼值映射到表中一個(gè)位置來(lái)訪(fǎng)問(wèn)記錄,以加快查找的速度。這個(gè)映射函數(shù)叫做散列函數(shù), 存放記錄的數(shù)組叫做散列表。
在設(shè)計(jì)ConcurrentHashMap中需要關(guān)注的幾個(gè)方面:
- 由于數(shù)組大小、類(lèi)型一旦確定,編譯、運(yùn)行時(shí)都不可修改,必須對(duì)
ConcurrentHashMap的數(shù)組動(dòng)態(tài)擴(kuò)容。 - 通過(guò)Key計(jì)算得到的
Hash值,必須保證元素都能正確的被保存,需要考慮hash沖突的處理。
三、設(shè)計(jì)思路
Hashtable效率不高的主要原因是容器操作過(guò)程中對(duì)整個(gè)對(duì)象進(jìn)行加鎖,其他線(xiàn)程的任何操作都會(huì)被阻塞。解決方式就是從“減少鎖的范圍”、“引入非阻塞鎖”或引入”讀寫(xiě)鎖“等方面進(jìn)行優(yōu)化,針對(duì)以上的思路可以進(jìn)行如下改進(jìn):
- 將
Hashtable整張表拆分成一張張的小表,操作中只有某個(gè)小表受影響,而其他小表可以實(shí)現(xiàn)高性能操作。 - 小表可以實(shí)現(xiàn)非阻塞鎖的”讀寫(xiě)“鎖,在操作小表時(shí),可以減少其他線(xiàn)程的等待時(shí)間。
我們暫稱(chēng)以上的設(shè)計(jì)為“分段鎖”技術(shù)。
Map的底層數(shù)據(jù)結(jié)構(gòu)是數(shù)組+鏈表,基于上面的拆表思路,將上面的小表替換成鏈表的頭節(jié)點(diǎn)即數(shù)組的某個(gè)元素?;谝陨系姆桨?,可以降低方案一的復(fù)雜度和空間使用率。
JDK中提供的ConcurrentHashMap在1.7和1.8的版本中正是基于這個(gè)改進(jìn)思路進(jìn)行設(shè)計(jì)的。
四、分段鎖技術(shù)
? ConcurrentHashMap通過(guò)持有一個(gè)Segment[]數(shù)組對(duì)象來(lái)實(shí)現(xiàn)鎖分段技術(shù),其中每一個(gè)Segment對(duì)象通過(guò)繼承ReentrantLock實(shí)現(xiàn)持有非阻塞的重入鎖。實(shí)則一個(gè)Segment對(duì)象可以理解為一個(gè)HashMap,不同在于Segment持有鎖,而HashMap不持有鎖。Segment中持有一個(gè)HashEntry[]數(shù)組,HashEntry是數(shù)據(jù)節(jié)點(diǎn)對(duì)象。
ConcurrentHashMap結(jié)構(gòu)如下圖:

4.1 名詞術(shù)語(yǔ)
| 字段 | 說(shuō)明 | 備注 |
|---|---|---|
| concurrencyLevel | 并發(fā)級(jí)別 | |
| segmentShift | 段偏移量 | |
| segmentMask | 段掩碼 |
4.2 源碼剖析
4.2.1 容器初始化
? 容器初始化主要是為完成Segment[]數(shù)組對(duì)象的初始化,Segment[]數(shù)組的大小又依賴(lài)于ssize的值,ssize是通過(guò)concurrencyLevel計(jì)算得到,其中ssize應(yīng)該滿(mǎn)足2的N次方(power-of-two sizes),所以ssize >=concurrencyLevel,此處對(duì)于Segment[]數(shù)組的大小的約束是為了通過(guò)按位與的散列算法來(lái)定位Segments數(shù)組的索引。(以下省略無(wú)關(guān)代碼)
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
int ssize = 1;
while (ssize < concurrencyLevel)
ssize <<= 1;
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
}
4.2.2 設(shè)置值
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
-
ConcurrentHashMap的value不能為空,否則拋出NPE異常。 - 獲取key相應(yīng)的
hash值,用于計(jì)算操作元素所在的segment[]數(shù)組的索引下標(biāo)j。 - 通過(guò)索引拿到相應(yīng)的
segment對(duì)象,進(jìn)行put操作
計(jì)算Key的hash值
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String))
return sun.misc.Hashing.stringHash32((String) k);
h ^= k.hashCode();
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
ConcurrentHashMap中對(duì)key的處理采用的是變體的Wang/Jenkins哈希算法。此算法特點(diǎn):雪崩性和可逆性。
計(jì)算Segment數(shù)組下標(biāo)
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
int sshift = 0,ssize = 1;
while (ssize < concurrencyLevel){
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
}
public V put(K key, V value) {
int j = (hash >>> segmentShift) & segmentMask;
}
? j指待操作的Segment[]數(shù)組的下標(biāo),而其中的segmentShift和segmentMask是容器初始化時(shí)完成的賦值。其中的ssize等于Segment[]數(shù)組的長(zhǎng)度,而sshift滿(mǎn)足2^sshif=ssize公式。下標(biāo)j的值域范圍為[0,ssize-1],由于ssize=2^sshif,那么下標(biāo)j可以用1個(gè)sshift位的二進(jìn)制數(shù)字表示。假如:ssize為16,那么sshift=4,j的值域?yàn)閇0,15],而[0000b,1111b]就是j的值域;則求key在Segment[]的下標(biāo)j,就是求key對(duì)應(yīng)的一個(gè)散列的4位二進(jìn)制數(shù)值。而ConcurrentHashMap的源碼求下標(biāo)j的方式非常簡(jiǎn)單,就是取key的hash值的高4位。因此,求key散列到長(zhǎng)度為ssize的Segment數(shù)組的下標(biāo)j,就是求key的hash值的高sshift位。

初始化Segment對(duì)象
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
- 利用
UNSAFE原子操作從Segment[]數(shù)組中嘗試獲取對(duì)應(yīng)Segment對(duì)象,沒(méi)有則進(jìn)行初始化,有則直接返回。 -
Segment對(duì)象的初始化以Segment[]數(shù)組中頭節(jié)點(diǎn)的參數(shù)和UNSAFE原子操作完成初始化。
存放元素
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//嘗試獲取Segment的Lock,獲取成功返回null,否則非阻塞重試3次,超過(guò)3次則阻塞等待鎖,返回對(duì)應(yīng)的鏈表節(jié)點(diǎn)。
HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
//遞歸從鏈表頭切換到后續(xù)節(jié)點(diǎn)
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
- 首先嘗試獲取
Segment的Lock,Segment持有的是ReentrantLock非阻塞鎖,如果失敗則重試獲取 - 獲取鏈表的頭結(jié)點(diǎn),
Segment持有的HashEntry[]數(shù)組操作的是頭結(jié)點(diǎn)開(kāi)始:- 如果頭節(jié)點(diǎn)e不為空,如果key相同,則更新值,否則執(zhí)行
e = e.next進(jìn)行鏈表遍歷 - 如果頭節(jié)點(diǎn)為空,則新增節(jié)點(diǎn)node,滿(mǎn)足
node.next=first。
- 如果頭節(jié)點(diǎn)e不為空,如果key相同,則更新值,否則執(zhí)行
4.2.3 容器擴(kuò)容
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
4.2.4 獲取值
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s=(Segment<K,V>)UNSAFE.getObjectVolatile(segments,u))!=null&&(tab = s.table)!=null) {
for (HashEntry<K,V> e=(HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
- 獲取key相應(yīng)的
hash值,通過(guò)計(jì)算得到索引u,從而定位到Segment[]數(shù)組中的Segment對(duì)象。 - 在通過(guò)計(jì)算得到
HashEntry[]數(shù)組中的HashEntry對(duì)象,再通過(guò)循環(huán)遍歷鏈表,最終定位到數(shù)據(jù)。
五、行鎖技術(shù)
5.1 源碼剖析
5.2.1 容器初始化
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4;
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
……………………
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}
在進(jìn)行初始化容器時(shí),會(huì)設(shè)置兩個(gè)參數(shù),initialCapacity表示容器數(shù)組的容量,loadFactor表示容器的負(fù)載因子。其中tableSizeFor會(huì)對(duì)initialCapacity參數(shù)值進(jìn)行二進(jìn)制的位左移,使得容量是2的N次方。
5.2.2 設(shè)置值
計(jì)算Key的hash值
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
其中 hash(key) 方法就是計(jì)算key的hash值, (h = key.hashCode()) ^ (h >>> 16) 使用高位16位和低16位異或運(yùn)算得到Hash值,主要為了使hash分布盡可能的均勻。
[圖片上傳失敗...(image-237caa-1576651270043)]
初始化容器數(shù)組
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
- 通過(guò)sc變量來(lái)控制初始化過(guò)程,
sc=-1在進(jìn)行容器初始化,其他線(xiàn)程循環(huán)等待并讓出CPU(Thread.yield()) - 初始化
Node[]數(shù)組對(duì)象,并返回
存放元素
final V putVal(K key, V value, boolean onlyIfAbsent) {
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
- 獲取容器中
Node[]數(shù)組的頭結(jié)點(diǎn),如果頭結(jié)點(diǎn)為空,則構(gòu)造新節(jié)點(diǎn),并添加到該頭結(jié)點(diǎn)上(CAS)。 - 如果不是頭節(jié)點(diǎn),則使用
synchronized鎖住頭節(jié)點(diǎn),循環(huán)進(jìn)行鏈表尾部添加
5.2.3 容器擴(kuò)容
https://www.cnblogs.com/menghuantiancheng/p/10462370.html
5.2.4 獲取值
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) >0&&(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
} else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
- 獲取key相應(yīng)的
hash值,定位到Node[]數(shù)組的索引位,依次循環(huán)遍歷鏈表,最終定位到數(shù)據(jù)。
參考文檔:
https://www.cnblogs.com/weiweiblog/archive/2018/09/09/9611271.html