ConcurrentHashmap源碼分析

此文部分內(nèi)容來自 https://javadoop.com/post/hashmap

Hashmap

多線程死循環(huán)
主要是多線程同時(shí)put時(shí),如果同時(shí)觸發(fā)了rehash操作,會(huì)導(dǎo)致HashMap中的鏈表中出現(xiàn)循環(huán)節(jié)點(diǎn),進(jìn)而使得后面get的時(shí)候,會(huì)死循環(huán)。

ConcurrentHashmap JDK7

ConcurrentHashMap允許多個(gè)修改(寫)操作并發(fā)進(jìn)行,其關(guān)鍵在于使用了鎖分段技術(shù),它使用了不同的鎖來控制對(duì)哈希表的不同部分進(jìn)行的修改(寫),而 ConcurrentHashMap 內(nèi)部使用段(Segment)來表示這些不同的部分。

segmentMask; // 用于定位段,大小等于segments數(shù)組的大小減 1,是不可變的
segmentShift; // 用于定位段,大小等于32(hash值的位數(shù))減去對(duì)segments的大小取以2為底的對(duì)數(shù)值,是不可變的

1 Segment
Segment 類繼承于 ReentrantLock 類,從而使得 Segment 對(duì)象能充當(dāng)鎖的角色。每個(gè) Segment 對(duì)象用來守護(hù)它的成員對(duì)象 table 中包含的若干個(gè)桶。table 是一個(gè)由 HashEntry 對(duì)象組成的鏈表數(shù)組,table 數(shù)組的每一個(gè)數(shù)組成員就是一個(gè)桶。
segment中的字段

volatile int count 是用于計(jì)數(shù)每個(gè)segment管理的所有 table 數(shù)組包含的 HashEntry 對(duì)象的個(gè)數(shù),count 不是concurrenthashmap的全局變量的原因是 當(dāng)需要更新count的時(shí)候不需要鎖住整個(gè)CHM。

volatile HashEntry<K,V>[] table 鏈表數(shù)組

2 HashEntry
==CHM中不需要對(duì)讀加鎖的原因== 其一是HashEntry內(nèi) key hash next這些字段都時(shí)final的,所以hashentry對(duì)象基本是不可變的 另一個(gè)原因則是 value域也被volatile修飾所以可以確保讀線程讀到最后一次更新后的值。

next是final意味著不能從hash鏈的中間和尾部添加刪除節(jié)點(diǎn),所以所有節(jié)點(diǎn)的修改只能從頭部開始,對(duì)于put操作也是頭插法,而remove()刪除節(jié)點(diǎn)的時(shí)候也是需要重新new這個(gè)節(jié)點(diǎn)前面的所有節(jié)點(diǎn)并刪除原來的節(jié)點(diǎn)再把前部的最后一個(gè)節(jié)點(diǎn)的next指向刪除節(jié)點(diǎn)的后一個(gè)節(jié)點(diǎn)。

3 concurrentHashMap的構(gòu)造函數(shù)
主要有三個(gè)參數(shù) 初始容量、負(fù)載因子 和 并發(fā)級(jí)別,默認(rèn)分別為16,0.15,16 依據(jù)這三個(gè)來初始化segments數(shù)組(長度為2的n次冪),段偏移量 segmentShift(32-n),段掩碼segmentMask(2^n-1 二進(jìn)制為n個(gè)1),每個(gè)segment。

4 concurrentHashMap的數(shù)據(jù)結(jié)構(gòu)

image

5 定位操作

final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}

讓key的高n位和段掩碼做與操作。就可以具體定位到桶位。因?yàn)?segmentshift是 32-n

并發(fā)存取

線程對(duì)映射表做讀操作的時(shí)候不需要加鎖,而對(duì)容器做結(jié)構(gòu)型的修改操作(put remove)則需要加鎖

put操作

==ConcurrentHashMap不同于HashMap,它既不允許key值為null,也不允許value值為null。==
當(dāng)我們向ConcurrentHashMap中put一個(gè)Key/Value對(duì)時(shí),首先會(huì)獲得Key的哈希值并對(duì)其再次哈希,然后根據(jù)最終的hash值定位到這條記錄所應(yīng)該插入的段。(見上面的第5點(diǎn))

JDK7 put源碼

//主流程
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 計(jì)算 key 的 hash 值
    int hash = hash(key);
    // 2. 根據(jù) hash 值找到 Segment 數(shù)組中的位置 j
    //    hash 是 32 位,無符號(hào)右移 segmentShift(28) 位,剩下低 4 位,
    //    然后和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的最后 4 位,也就是槽的數(shù)組下標(biāo)
    int j = (hash >>> segmentShift) & segmentMask;
    // 剛剛說了,初始化的時(shí)候初始化了 segment[0],但是其他位置還是 null,
    // ensureSegment(j) 對(duì) segment[j] 進(jìn)行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 3. 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往該 segment 寫入前,需要先獲取該 segment 的獨(dú)占鎖
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // 這個(gè)是 segment 內(nèi)部的數(shù)組
        HashEntry<K,V>[] tab = table;
        // 再利用 hash 值,求應(yīng)該放置的數(shù)組下標(biāo)
        int index = (tab.length - 1) & hash;
        // first 是數(shù)組該位置處的鏈表的表頭
        HashEntry<K,V> first = entryAt(tab, index);
 
        // 下面這串 for 循環(huán)雖然很長,不過也很好理解,想想該位置沒有任何元素和已經(jīng)存在一個(gè)鏈表這兩種情況
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆蓋舊值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 繼續(xù)順著鏈表走
                e = e.next;
            }
            else {
                // node 到底是不是 null,這個(gè)要看獲取鎖的過程,不過和這里都沒有關(guān)系。
                // 如果不為 null,那就直接將它設(shè)置為鏈表表頭;如果是null,初始化并設(shè)置為鏈表表頭。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
 
                int c = count + 1;
                // 如果超過了該 segment 的閾值,這個(gè) segment 需要擴(kuò)容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 擴(kuò)容后面也會(huì)具體分析
                else
                    // 沒有達(dá)到閾值,將 node 放到數(shù)組 tab 的 index 位置,
                    // 其實(shí)就是將新的節(jié)點(diǎn)設(shè)置成原鏈表的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解鎖
        unlock();
    }
    return oldValue;

獲取寫入鎖

node = tryLock() ? null : scanAndLockForPut(key, hash, value)根據(jù)tryLock()快速獲取這個(gè)segment的獨(dú)占鎖,如果獲取失敗 就進(jìn)入到 scanAndLockForPut(key,hash,value)來獲取鎖該方法的源碼如下

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
 
    // 循環(huán)獲取鎖
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    // 進(jìn)到這里說明數(shù)組該位置的鏈表是空的,沒有任何元素
                    // 當(dāng)然,進(jìn)到這里的另一個(gè)原因是 tryLock() 失敗,所以該槽存在并發(fā),不一定是該位置
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                // 順著鏈表往下走
                e = e.next;
        }
        // 重試次數(shù)如果超過 MAX_SCAN_RETRIES(單核1多核64),那么不搶了,進(jìn)入到阻塞隊(duì)列等待鎖
        //    lock() 是阻塞方法,直到獲取鎖后返回
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 // 這個(gè)時(shí)候是有大問題了,那就是有新的元素進(jìn)到了鏈表,成為了新的表頭
                 //     所以這邊的策略是,相當(dāng)于重新進(jìn)入 scanAndLockForPut 方法
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

這個(gè)方法有兩個(gè)出口,一個(gè)是 tryLock() 成功了,循環(huán)終止,另一個(gè)就是重試次數(shù)超過了 MAX_SCAN_RETRIES,進(jìn)到 lock() 方法,此方法會(huì)阻塞等待,直到成功拿到獨(dú)占鎖。
實(shí)質(zhì)上這個(gè)方法的功能是獲取segment的獨(dú)占鎖,然后實(shí)例化node

rehash
segment數(shù)組不能擴(kuò)容 ,擴(kuò)容是針對(duì)于HashEntry[]


/*
     * Reclassify nodes in each list to new table.  Because we
     * are using power-of-two expansion, the elements from
     * each bin must either stay at same index, or move with a
     * power of two offset. We eliminate unnecessary node
     * creation by catching cases where old nodes can be
     * reused because their next fields won't change.
     * Statistically, at the default threshold, only about
     * one-sixth of them need cloning when a table
     * doubles. The nodes they replace will be garbage
     * collectable as soon as they are no longer referenced by
     * any reader thread that may be in the midst of
     * concurrently traversing table. Entry accesses use plain
     * array indexing because they are followed by volatile
     * table write.
     */
// 方法參數(shù)上的 node 是這次擴(kuò)容后,需要添加到新的數(shù)組中的數(shù)據(jù)。
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 2 倍
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    // 創(chuàng)建新數(shù)組
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩碼,如從 16 擴(kuò)容到 32,那么 sizeMask 為 31,對(duì)應(yīng)二進(jìn)制 ‘000...00011111’
    int sizeMask = newCapacity - 1;
 
    // 遍歷原數(shù)組,老套路,將原數(shù)組位置 i 處的鏈表拆分到 新數(shù)組位置 i 和 i+oldCap 兩個(gè)位置
    for (int i = 0; i < oldCapacity ; i++) {
        // e 是鏈表的第一個(gè)元素
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 計(jì)算應(yīng)該放置在新數(shù)組中的位置,
            // 假設(shè)原數(shù)組長度為 16,e 在 oldTable[3] 處,那么 idx 只可能是 3 或者是 3 + 16 = 19
            int idx = e.hash & sizeMask;
            if (next == null)   // 該位置處只有一個(gè)元素,那比較好辦
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // e 是鏈表表頭
                HashEntry<K,V> lastRun = e;
                // idx 是當(dāng)前鏈表的頭結(jié)點(diǎn) e 的新位置
                int lastIdx = idx;
 
                // 下面這個(gè) for 循環(huán)會(huì)找到一個(gè) lastRun 節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)之后的所有元素是將要放到一起的 
                // 尋找k值相同的子鏈,該子鏈尾節(jié)點(diǎn)與父鏈的尾節(jié)點(diǎn)必須是同一個(gè)
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 將 lastRun 及其之后的所有節(jié)點(diǎn)組成的這個(gè)鏈表放到 lastIdx 這個(gè)位置
                newTable[lastIdx] = lastRun;
                // 下面的操作是處理 lastRun 之前的節(jié)點(diǎn),
                //    這些節(jié)點(diǎn)可能分配在另一個(gè)鏈表中,也可能分配到上面的那個(gè)鏈表中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 將新來的 node 放到新數(shù)組中剛剛的 兩個(gè)鏈表之一 的 頭部
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

擴(kuò)容后鍵值對(duì)的新位置要么和原位置一樣,要么就是原位+舊數(shù)組的長度
上面的代碼先找出擴(kuò)容前后需要轉(zhuǎn)移的點(diǎn),先執(zhí)行轉(zhuǎn)移,再把該鏈條上剩下的節(jié)點(diǎn)轉(zhuǎn)移,這么寫是起到了復(fù)用的效果,官方注釋中也說了 at the default threshold, only about one-sixth of them need cloning when a table doubles 在默認(rèn)閾值下只有大約1/6的節(jié)點(diǎn)需要被克隆。

get比較簡(jiǎn)單不需要加鎖步驟是

  1. 計(jì)算hash值找到segment數(shù)組的位置
  2. segment中根據(jù)hash找到hashEntry數(shù)組中具體的位置對(duì)應(yīng)的鏈表
  3. 遍歷鏈表找到具體的KV

源碼如下

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // 1. hash 值
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 2. 根據(jù) hash 找到對(duì)應(yīng)的 segment
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // 3. 找到segment 內(nèi)部數(shù)組相應(yīng)位置的鏈表,遍歷
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

Q:如果在執(zhí)行g(shù)et的時(shí)候在同一個(gè)segment中發(fā)生了put或remove操作呢?
1.put操作的線程安全性的體現(xiàn)

  • 使用CAS在初始化segment數(shù)組
  • 鏈表中添加節(jié)點(diǎn)使用頭插法,如果此時(shí)get遍歷鏈表已經(jīng)到了中間就不會(huì)對(duì)get操作有影響,如果put happens-before get 需要get到剛剛插入到的頭節(jié)點(diǎn),就依賴setEntryAt方法中使用的 UNSAFE.putOrderedObject.
  • 擴(kuò)容,創(chuàng)建了新的數(shù)組如果這個(gè)時(shí)候有g(shù)et,get先行就在舊table上查詢,put先行,put的操作可見性保證就是table使用了volatile關(guān)鍵字transient volatile HashEntry<K,V>[] table;
  1. remove操作的線程安全性
    remove操作會(huì)更改鏈表的結(jié)構(gòu),如果remove的是get已經(jīng)遍歷過的節(jié)點(diǎn),則不會(huì)發(fā)生問題。
    如果 remove 先破壞了一個(gè)節(jié)點(diǎn),分兩種情況考慮。 1、如果此節(jié)點(diǎn)是頭結(jié)點(diǎn),那么需要將頭結(jié)點(diǎn)的 next 設(shè)置為數(shù)組該位置的元素,table 雖然使用了 volatile 修飾,但是 volatile 并不能提供數(shù)組內(nèi)部操作的可見性保證,所以源碼中使用了 UNSAFE 來操作數(shù)組,請(qǐng)看方法 setEntryAt。2、如果要?jiǎng)h除的節(jié)點(diǎn)不是頭結(jié)點(diǎn),它會(huì)將要?jiǎng)h除節(jié)點(diǎn)的后繼節(jié)點(diǎn)接到前驅(qū)節(jié)點(diǎn)中,這里的并發(fā)保證就是 next 屬性是 volatile 的。

ConcurrentHashmap JDK8

類似于8中的HashMap的改進(jìn),ConcurrentHashMap也引入了紅黑樹。


image
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容