此文部分內(nèi)容來自 https://javadoop.com/post/hashmap
Hashmap
多線程死循環(huán)
主要是多線程同時(shí)put時(shí),如果同時(shí)觸發(fā)了rehash操作,會(huì)導(dǎo)致HashMap中的鏈表中出現(xiàn)循環(huán)節(jié)點(diǎn),進(jìn)而使得后面get的時(shí)候,會(huì)死循環(huán)。
ConcurrentHashmap JDK7
ConcurrentHashMap允許多個(gè)修改(寫)操作并發(fā)進(jìn)行,其關(guān)鍵在于使用了鎖分段技術(shù),它使用了不同的鎖來控制對(duì)哈希表的不同部分進(jìn)行的修改(寫),而 ConcurrentHashMap 內(nèi)部使用段(Segment)來表示這些不同的部分。
segmentMask; // 用于定位段,大小等于segments數(shù)組的大小減 1,是不可變的
segmentShift; // 用于定位段,大小等于32(hash值的位數(shù))減去對(duì)segments的大小取以2為底的對(duì)數(shù)值,是不可變的
1 Segment
Segment 類繼承于 ReentrantLock 類,從而使得 Segment 對(duì)象能充當(dāng)鎖的角色。每個(gè) Segment 對(duì)象用來守護(hù)它的成員對(duì)象 table 中包含的若干個(gè)桶。table 是一個(gè)由 HashEntry 對(duì)象組成的鏈表數(shù)組,table 數(shù)組的每一個(gè)數(shù)組成員就是一個(gè)桶。
segment中的字段
volatile int count 是用于計(jì)數(shù)每個(gè)segment管理的所有 table 數(shù)組包含的 HashEntry 對(duì)象的個(gè)數(shù),count 不是concurrenthashmap的全局變量的原因是 當(dāng)需要更新count的時(shí)候不需要鎖住整個(gè)CHM。
volatile HashEntry<K,V>[] table 鏈表數(shù)組
2 HashEntry
==CHM中不需要對(duì)讀加鎖的原因== 其一是HashEntry內(nèi) key hash next這些字段都時(shí)final的,所以hashentry對(duì)象基本是不可變的 另一個(gè)原因則是 value域也被volatile修飾所以可以確保讀線程讀到最后一次更新后的值。
next是final意味著不能從hash鏈的中間和尾部添加刪除節(jié)點(diǎn),所以所有節(jié)點(diǎn)的修改只能從頭部開始,對(duì)于put操作也是頭插法,而remove()刪除節(jié)點(diǎn)的時(shí)候也是需要重新new這個(gè)節(jié)點(diǎn)前面的所有節(jié)點(diǎn)并刪除原來的節(jié)點(diǎn)再把前部的最后一個(gè)節(jié)點(diǎn)的next指向刪除節(jié)點(diǎn)的后一個(gè)節(jié)點(diǎn)。
3 concurrentHashMap的構(gòu)造函數(shù)
主要有三個(gè)參數(shù) 初始容量、負(fù)載因子 和 并發(fā)級(jí)別,默認(rèn)分別為16,0.15,16 依據(jù)這三個(gè)來初始化segments數(shù)組(長度為2的n次冪),段偏移量 segmentShift(32-n),段掩碼segmentMask(2^n-1 二進(jìn)制為n個(gè)1),每個(gè)segment。
4 concurrentHashMap的數(shù)據(jù)結(jié)構(gòu)

5 定位操作
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
讓key的高n位和段掩碼做與操作。就可以具體定位到桶位。因?yàn)?segmentshift是 32-n
并發(fā)存取
線程對(duì)映射表做讀操作的時(shí)候不需要加鎖,而對(duì)容器做結(jié)構(gòu)型的修改操作(put remove)則需要加鎖
put操作
==ConcurrentHashMap不同于HashMap,它既不允許key值為null,也不允許value值為null。==
當(dāng)我們向ConcurrentHashMap中put一個(gè)Key/Value對(duì)時(shí),首先會(huì)獲得Key的哈希值并對(duì)其再次哈希,然后根據(jù)最終的hash值定位到這條記錄所應(yīng)該插入的段。(見上面的第5點(diǎn))
JDK7 put源碼
//主流程
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 1. 計(jì)算 key 的 hash 值
int hash = hash(key);
// 2. 根據(jù) hash 值找到 Segment 數(shù)組中的位置 j
// hash 是 32 位,無符號(hào)右移 segmentShift(28) 位,剩下低 4 位,
// 然后和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的最后 4 位,也就是槽的數(shù)組下標(biāo)
int j = (hash >>> segmentShift) & segmentMask;
// 剛剛說了,初始化的時(shí)候初始化了 segment[0],但是其他位置還是 null,
// ensureSegment(j) 對(duì) segment[j] 進(jìn)行初始化
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
// 3. 插入新值到 槽 s 中
return s.put(key, hash, value, false);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 在往該 segment 寫入前,需要先獲取該 segment 的獨(dú)占鎖
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// 這個(gè)是 segment 內(nèi)部的數(shù)組
HashEntry<K,V>[] tab = table;
// 再利用 hash 值,求應(yīng)該放置的數(shù)組下標(biāo)
int index = (tab.length - 1) & hash;
// first 是數(shù)組該位置處的鏈表的表頭
HashEntry<K,V> first = entryAt(tab, index);
// 下面這串 for 循環(huán)雖然很長,不過也很好理解,想想該位置沒有任何元素和已經(jīng)存在一個(gè)鏈表這兩種情況
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
// 覆蓋舊值
e.value = value;
++modCount;
}
break;
}
// 繼續(xù)順著鏈表走
e = e.next;
}
else {
// node 到底是不是 null,這個(gè)要看獲取鎖的過程,不過和這里都沒有關(guān)系。
// 如果不為 null,那就直接將它設(shè)置為鏈表表頭;如果是null,初始化并設(shè)置為鏈表表頭。
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 如果超過了該 segment 的閾值,這個(gè) segment 需要擴(kuò)容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); // 擴(kuò)容后面也會(huì)具體分析
else
// 沒有達(dá)到閾值,將 node 放到數(shù)組 tab 的 index 位置,
// 其實(shí)就是將新的節(jié)點(diǎn)設(shè)置成原鏈表的表頭
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 解鎖
unlock();
}
return oldValue;
獲取寫入鎖
node = tryLock() ? null : scanAndLockForPut(key, hash, value)根據(jù)tryLock()快速獲取這個(gè)segment的獨(dú)占鎖,如果獲取失敗 就進(jìn)入到 scanAndLockForPut(key,hash,value)來獲取鎖該方法的源碼如下
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
// 循環(huán)獲取鎖
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
// 進(jìn)到這里說明數(shù)組該位置的鏈表是空的,沒有任何元素
// 當(dāng)然,進(jìn)到這里的另一個(gè)原因是 tryLock() 失敗,所以該槽存在并發(fā),不一定是該位置
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
// 順著鏈表往下走
e = e.next;
}
// 重試次數(shù)如果超過 MAX_SCAN_RETRIES(單核1多核64),那么不搶了,進(jìn)入到阻塞隊(duì)列等待鎖
// lock() 是阻塞方法,直到獲取鎖后返回
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
// 這個(gè)時(shí)候是有大問題了,那就是有新的元素進(jìn)到了鏈表,成為了新的表頭
// 所以這邊的策略是,相當(dāng)于重新進(jìn)入 scanAndLockForPut 方法
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
這個(gè)方法有兩個(gè)出口,一個(gè)是 tryLock() 成功了,循環(huán)終止,另一個(gè)就是重試次數(shù)超過了 MAX_SCAN_RETRIES,進(jìn)到 lock() 方法,此方法會(huì)阻塞等待,直到成功拿到獨(dú)占鎖。
實(shí)質(zhì)上這個(gè)方法的功能是獲取segment的獨(dú)占鎖,然后實(shí)例化node
rehash
segment數(shù)組不能擴(kuò)容 ,擴(kuò)容是針對(duì)于HashEntry[]
/*
* Reclassify nodes in each list to new table. Because we
* are using power-of-two expansion, the elements from
* each bin must either stay at same index, or move with a
* power of two offset. We eliminate unnecessary node
* creation by catching cases where old nodes can be
* reused because their next fields won't change.
* Statistically, at the default threshold, only about
* one-sixth of them need cloning when a table
* doubles. The nodes they replace will be garbage
* collectable as soon as they are no longer referenced by
* any reader thread that may be in the midst of
* concurrently traversing table. Entry accesses use plain
* array indexing because they are followed by volatile
* table write.
*/
// 方法參數(shù)上的 node 是這次擴(kuò)容后,需要添加到新的數(shù)組中的數(shù)據(jù)。
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 2 倍
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
// 創(chuàng)建新數(shù)組
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩碼,如從 16 擴(kuò)容到 32,那么 sizeMask 為 31,對(duì)應(yīng)二進(jìn)制 ‘000...00011111’
int sizeMask = newCapacity - 1;
// 遍歷原數(shù)組,老套路,將原數(shù)組位置 i 處的鏈表拆分到 新數(shù)組位置 i 和 i+oldCap 兩個(gè)位置
for (int i = 0; i < oldCapacity ; i++) {
// e 是鏈表的第一個(gè)元素
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 計(jì)算應(yīng)該放置在新數(shù)組中的位置,
// 假設(shè)原數(shù)組長度為 16,e 在 oldTable[3] 處,那么 idx 只可能是 3 或者是 3 + 16 = 19
int idx = e.hash & sizeMask;
if (next == null) // 該位置處只有一個(gè)元素,那比較好辦
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// e 是鏈表表頭
HashEntry<K,V> lastRun = e;
// idx 是當(dāng)前鏈表的頭結(jié)點(diǎn) e 的新位置
int lastIdx = idx;
// 下面這個(gè) for 循環(huán)會(huì)找到一個(gè) lastRun 節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)之后的所有元素是將要放到一起的
// 尋找k值相同的子鏈,該子鏈尾節(jié)點(diǎn)與父鏈的尾節(jié)點(diǎn)必須是同一個(gè)
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 將 lastRun 及其之后的所有節(jié)點(diǎn)組成的這個(gè)鏈表放到 lastIdx 這個(gè)位置
newTable[lastIdx] = lastRun;
// 下面的操作是處理 lastRun 之前的節(jié)點(diǎn),
// 這些節(jié)點(diǎn)可能分配在另一個(gè)鏈表中,也可能分配到上面的那個(gè)鏈表中
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 將新來的 node 放到新數(shù)組中剛剛的 兩個(gè)鏈表之一 的 頭部
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
擴(kuò)容后鍵值對(duì)的新位置要么和原位置一樣,要么就是原位+舊數(shù)組的長度
上面的代碼先找出擴(kuò)容前后需要轉(zhuǎn)移的點(diǎn),先執(zhí)行轉(zhuǎn)移,再把該鏈條上剩下的節(jié)點(diǎn)轉(zhuǎn)移,這么寫是起到了復(fù)用的效果,官方注釋中也說了 at the default threshold, only about one-sixth of them need cloning when a table doubles 在默認(rèn)閾值下只有大約1/6的節(jié)點(diǎn)需要被克隆。
get比較簡(jiǎn)單不需要加鎖步驟是
- 計(jì)算hash值找到segment數(shù)組的位置
- segment中根據(jù)hash找到hashEntry數(shù)組中具體的位置對(duì)應(yīng)的鏈表
- 遍歷鏈表找到具體的KV
源碼如下
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
// 1. hash 值
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 2. 根據(jù) hash 找到對(duì)應(yīng)的 segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 3. 找到segment 內(nèi)部數(shù)組相應(yīng)位置的鏈表,遍歷
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
Q:如果在執(zhí)行g(shù)et的時(shí)候在同一個(gè)segment中發(fā)生了put或remove操作呢?
1.put操作的線程安全性的體現(xiàn)
- 使用CAS在初始化segment數(shù)組
- 鏈表中添加節(jié)點(diǎn)使用頭插法,如果此時(shí)get遍歷鏈表已經(jīng)到了中間就不會(huì)對(duì)get操作有影響,如果put happens-before get 需要get到剛剛插入到的頭節(jié)點(diǎn),就依賴setEntryAt方法中使用的 UNSAFE.putOrderedObject.
- 擴(kuò)容,創(chuàng)建了新的數(shù)組如果這個(gè)時(shí)候有g(shù)et,get先行就在舊table上查詢,put先行,put的操作可見性保證就是table使用了volatile關(guān)鍵字
transient volatile HashEntry<K,V>[] table;
- remove操作的線程安全性
remove操作會(huì)更改鏈表的結(jié)構(gòu),如果remove的是get已經(jīng)遍歷過的節(jié)點(diǎn),則不會(huì)發(fā)生問題。
如果 remove 先破壞了一個(gè)節(jié)點(diǎn),分兩種情況考慮。 1、如果此節(jié)點(diǎn)是頭結(jié)點(diǎn),那么需要將頭結(jié)點(diǎn)的 next 設(shè)置為數(shù)組該位置的元素,table 雖然使用了 volatile 修飾,但是 volatile 并不能提供數(shù)組內(nèi)部操作的可見性保證,所以源碼中使用了 UNSAFE 來操作數(shù)組,請(qǐng)看方法 setEntryAt。2、如果要?jiǎng)h除的節(jié)點(diǎn)不是頭結(jié)點(diǎn),它會(huì)將要?jiǎng)h除節(jié)點(diǎn)的后繼節(jié)點(diǎn)接到前驅(qū)節(jié)點(diǎn)中,這里的并發(fā)保證就是 next 屬性是 volatile 的。
ConcurrentHashmap JDK8
類似于8中的HashMap的改進(jìn),ConcurrentHashMap也引入了紅黑樹。
