集合框架 (第 09 篇) 源碼分析:jdk1.7版 ConcurrentHashMap

一、集合框架源碼分析

原文持續(xù)更新鏈接: https://github.com/about-cloud/JavaCore

正文


本文是基于 jdk1.7.0_79 分析

本文內容較多,我刪減后篇幅還是較長,長期有耐心,慢慢解讀吧。

零、非線程安全HashMap

前面文章分析了 HashMap 源碼,但其操作是非現(xiàn)在安全的,比如兩個線程并發(fā)賦值,其中key相同,而value不相同,就有可能造成值覆蓋的情況。再比如一個線程并發(fā)刪操作、另一個線性并發(fā)寫操作,也可能造成空轉問題。java.util.concurrentConcurrentHashMap 是其線程安全的實現(xiàn)。

一、ConcurrentHashMap的擴展關系

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable

抽象類 AbstractMap 實現(xiàn)了一些常用的方法,接口 ConcurrentMap 也是繼承至 Map 接口,它具有并發(fā)操作的支持。

接口 ConcurrentMap 定義的未實現(xiàn)方法如下:

// 添加不存在的元素
V putIfAbsent(K key, V value);
// 刪除元素
boolean remove(Object key, Object value);
// 替換指定key的value,替換成功返回true,否則返回false
boolean replace(K key, V oldValue, V newValue);
// 替換指定key的value
V replace(K key, V value);

??????

二、ConcurrentHashMap數(shù)據(jù)結構

ConcurrentHashMap是基于 分段鎖 機制設計的,將一個大的Map分割成n個小的 段segment,對每段進行加鎖,降低了容器加鎖的粒子度,每段(segment)各自加鎖,互不影響,當一個線程訪問 Map 其中一段數(shù)據(jù)時,其他段的數(shù)據(jù)也能被線程正常訪問。分段鎖使用的鎖是 ReentrantLock 可重入鎖。

ConcurrentHashMap1.7v
重要的字段
/** table的默認初始容量 16,可以通過構造函數(shù)指定初始容量 */
static final int DEFAULT_INITIAL_CAPACITY = 16;
/** table的默認負載因子 0.75,可以通過構造函數(shù)指定負載因子大小 */
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/** table 默認并發(fā)等級 16,可以通過構造函數(shù)指定并發(fā)等級 */
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/** 最大容量 10.7億+ */
static final int MAXIMUM_CAPACITY = 1 << 30;
/** 每個段表的最小容量。容量必須是 2的次冪,
  * 容量最少是2,以避免在延遲構造后,為了下次使用而又立即調整大小。
  */
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
/** 允許的最大段數(shù);可以是使用構造方法的參數(shù)指定。但必須是2的次冪,并且小于1 << 16。 */
static final int MAX_SEGMENTS = 1 << 16; // 略微保守的段數(shù)
/** 加鎖失敗時的重試次數(shù) */
static final int RETRIES_BEFORE_LOCK = 2;
/** 分割段時使用的掩碼值。用來對segment進行定位,判斷應該落在哪個段segment中 */
final int segmentMask;
/** 段內索引的偏移量 */
final int segmentShift;
/** 將原來整個大的哈希表分割成n個小的哈希表,這里的每段就是專用的小的哈希表。 */
final Segment<K,V>[] segments;
元素項

它的元素節(jié)點項 HashEntry<K,V> 是內部獨有靜態(tài)類,不像 HashMapEntry<K, V> 實現(xiàn)至 Map.Entry<K, V>。它們都很相似,但不同的是 節(jié)點HashEntry<K,V>final 修飾表示被會被繼承,在 HashEntry 靜態(tài)內部類的內部 key 和 hash 是被 final 修飾,賦予其不能被修改的特性。 value和指向下一個節(jié)點的變量next是被 volatile 修飾的,表示具有 可見性 ,所以 讀操作 在無需加鎖的情況下總能讀取最新的數(shù)據(jù) 。

static final class HashEntry<K,V> {
    // 具有不可更改特性的 hash 和 key
    final int hash;
    final K key;
    // 具有可見性的 value 和 next
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }

    /** 設置具有 volatile 特性的 next 屬性值 */
    final void setNext(HashEntry<K,V> n) {
        // objectFieldOffset 這個方法的意思是獲取字段屬性的偏移量(也就是內存位置)
        UNSAFE.putOrderedObject(this, nextOffset, n);
    }

    // Unsafe 操作
    ...
}

:tropical_drink:這里介紹一下 Unsafe 類,它位于 sun.misc. 包下,具有像 C語言 中指針一樣的功能,能大大提升效率,也能夠操作內存空間,更會存在像 指針 那樣的問題。它不屬于標準Java類庫,可以看成第三方開發(fā)工具庫。Oracle原計劃從Java 9中去掉Unsafe類,現(xiàn)在我看了下 Java 11 它已經(jīng)被移到 jdk.internal.misc 這個包下(jdk8時還在sun.misc包中 )。

三、segment段

ConcurrentHashMap 的主干就是Segment<K,V> segments,每個 segment 又含有 HashEntry<K, V> table ,你可以寬泛的認為每段就是一個 HashEntry 數(shù)組,就像 HashMap 的 Entry<K, V> table 一樣,但粒子度不在一個 level 上。

分段(segment) 的設計是 ConcurrentHashMap 所特有的功能。Segment<K,V> 類恰逢其時的繼承了 ReentrantLock 重入鎖的特性。作為 ReentrantLock 的子類,只是為了簡化一些 鎖:lock: 的設計,避免分開創(chuàng)建其他鎖。

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    private static final long serialVersionUID = 2249069246763182397L;

    /** 當獲取鎖失敗時,重試次數(shù)。與Java虛擬機可用的CPU處理器的個數(shù)有關 */
    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
        // Runtime.getRuntime().availableProcessors() 獲取Java虛擬機可用的處理器數(shù)量

    /**
     * 每段被volatile修飾的 table散列表(實際存放元素的地方)。
     * 通過 entryAt/setEntryAt 方法訪問元素
     */
    transient volatile HashEntry<K,V>[] table;

    /** 元素的個數(shù)。僅在加鎖或者保持volatile可見性的情況下讀取(訪問)*/
    transient int count;

    /** 此段的被更改操作的總次數(shù) */
    transient int modCount;

    /** 閾值(容量 * 負載因子) */
    transient int threshold;

    /** 負載因子 */
    final float loadFactor;
    /** 有參構造方法 */
    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
...
}

四、構造方法

ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)
/**
 * @param initialCapacity  初始化容量
 * @param loadFactor       負載因子
 * @param concurrencyLevel 并發(fā)等級
 */
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    // 負載因子必須大于0,初始化容量不能小于0,并發(fā)等級必須大于0
    // 否則就會拋出非法參數(shù)異常
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 保證并發(fā)等級最大為 1 << 16
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // 下面這些操作用戶計算 segments 長度、具體要分成多少段
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // 創(chuàng)建單個segment用于填充segments[0]位置(因為實際存儲元素是在HashEntry上)
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    // 創(chuàng)建 segments數(shù)組
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 將 s0段填充到 segments[0]
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    // 設置 segments
    this.segments = ss;
}

下面的這兩個構造方法調用的是上面這個構造方法,如果指定參數(shù)就使用指定的參數(shù),否則使用默認參數(shù)。

ConcurrentHashMap(int initialCapacity)
public ConcurrentHashMap(int initialCapacity) {
    this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
ConcurrentHashMap()
public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

另一個構造方法這里就不分析了

五、添加方法

通過上面的分析,我們知道 segments數(shù)組本身不是用來存放 元素 的,它是用來存儲 HashEntry<K, V>[] tab 數(shù)組的, 真正存儲元素的是 HashEntry<K, V>[] tab。下面我們通過源碼來分析元素是如何存儲的:

添加鍵值對的 put 方法

public V put(K key, V value) {
    Segment<K,V> s;
    // 不允許 value 為 null 值,否則拋出NPE
    if (value == null)
        throw new NullPointerException();
    // 計算key的哈希碼
    int hash = hash(key);
    // 計算應該落到哪一段segment(段號)
    int j = (hash >>> segmentShift) & segmentMask;
    // 如果該片段為null,那么進入ensureSegment(int k)方法處理
    if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
        s = ensureSegment(j);
    // 添加并返回value
    return s.put(key, hash, value, false);
}

確保分段不為null的方法

/**
 * 返回指定的index處的segment段。如果不存在,就在 segments數(shù)組中(通過CAS自旋)創(chuàng)建并記錄
 *
 * @param k 指定段的索引
 * @return 此段segment
 */
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
    // segment集合數(shù)組
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // 原始偏移量
    Segment<K,V> seg;
    // 獲取 segments(這里是ss)集合數(shù)組在偏移量u位置的那一段
    // 如果此段為null,那么就創(chuàng)建
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 使用 segments[0]這一段作為原型
        Segment<K,V> proto = ss[0];
        // cap指的是 HashEntry<K, V> table數(shù)組的容量(capacity),也就是數(shù)組的長度
        int cap = proto.table.length;
        // 負載因子
        float lf = proto.loadFactor;
        // 閾值 = 容量 * 負載因子
        int threshold = (int)(cap * lf);
        // 有了必要的參數(shù)后,就開始構造實際存放元素的HashEntry數(shù)組了
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // 再次判斷想要的segment是否存在
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) {
            // 如果還未null那么久創(chuàng)建一個segment(并把構造號的HashEntry數(shù)組放入)
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 通過CAS自旋搶占資源方式來確保剛構造的segment這一段放入 ss(既segments)中
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    // 返回
    return seg;
}

實際將鍵值對添加到集合中的方法

// 這里很像HashMap添加元素的操作
// (可參考往期文章:https://github.com/about-cloud/JavaCore)
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 先嘗試獲取鎖,如果成功獲取那么返回??null
    // 如果加鎖失敗,那么就通過scanAndLockForPut方法掃描加鎖來存放元素(詳見下面??分析)
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // 此段中存放元素的哈希表
        HashEntry<K,V>[] tab = table;
        // 通過 邏輯與&?? 計算出桶號(哈希槽位置)(請參考HashMap)
        int index = (tab.length - 1) & hash;
        // 根據(jù)桶號獲取桶頂?shù)脑?        HashEntry<K,V> first = entryAt(tab, index);
        // 遍歷鏈表
        for (HashEntry<K,V> e = first;;) {
            // 元素不為空的情況下
            if (e != null) {
                K k;
                // 通過比較 key 或者 key的哈希碼和key來判斷key是否相等
                // 如果 key 相等,那么替換value
                if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 指定的key與此元素的key不相等,那么就移到下一個元素
                e = e.next;
            }
            // 迭代到當前元素的為null情況
            else {
                // 通過scanAndLockForPut得到方法放入的元素
                if (node != null)
                    // 使用UNSAFE.putOrderedObject方法以“下沉”的方式,
                    // 將元素鏈入鏈表,新的元素在桶頂,舊的元素在下面
                    node.setNext(first);
                // 如果 node == null,以為著通過tryLock()獲得了鎖??
                else
                    // 構造新??元素
                    node = new HashEntry<K,V>(hash, key, value, first);
                // 判斷新添加此元素后,是否超過容量閾值
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    // 如果超過了容器的負載量,那么進行擴容
                    rehash(node);
                else
                    // 如果沒有容量夠用,那么就在指定的位置,
                    // 間接UNSAFE.putOrderedObject方法通過添加?元素
                    setEntryAt(tab, index, node);
                ++modCount;
                // 記錄??實際存放元素的大小
                count = c;
                oldValue = null; // 釋放舊值的引用,讓GC去處理吧
                break;
            }
        }
    } finally {
        // ??解鎖
        unlock();
    }
    return oldValue;
}
(獲取鎖失敗后,線程就會進入等待狀態(tài))此時通過自定義掃描、加鎖的方式來存放元素
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    // 通過當前段segment和指定key的哈希碼來獲取元素
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    // 重試次數(shù)
    int retries = -1;
    // 如果加鎖失敗,就循環(huán)
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        // 重試次數(shù)小于0表示重來都沒有創(chuàng)建過元素
        if (retries < 0) {
            // 指定哈希碼定位的位置存在null元素,表示此處沒有元素,下面就創(chuàng)建元素
            if (e == null) {
                // 如果node再為null的化,就大膽的創(chuàng)建元素吧
                if (node == null)
                    node = new HashEntry<K,V>(hash, key, value, null);
                // 賦值0,表示首次操作
                retries = 0;
            }
            // 指定哈希碼定位的位置有非空元素,那就比較key是否相同
            else if (key.equals(e.key))
                // 既然非空,就意味著創(chuàng)建過
                retries = 0;
            else
                // 遍歷鏈表中下一個元素
                e = e.next;
        }
        // 當達到最大重試次數(shù)時,加鎖跳出循環(huán)
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        // 在重試次數(shù)允許范圍內
        // (retries & 1) == 0 為true,表示retries為偶數(shù)(請注意上面做了++retries操作)
        else if ((retries & 1) == 0 &&
                 // 為true,意味著節(jié)點被更改了
                 (f = entryForHash(this, hash)) != first) {
            // 節(jié)點被更改了,那就替換芯節(jié)點
            e = first = f;
            // 再重新開始
            retries = -1;
        }
    }
    return node;
}
私有的擴容方法
/**
 * table(指的是HashEntrH<K, V>[])2倍擴容,
 * 然后在新的table重新排放所有的元素
 * (你看看,擴容多麻煩、又消耗性能,所以初始時一定要合理的指定初始容量)
 * 并且將給定的節(jié)點添加到散列表
 */
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
    // 當前的散列表(哈希是音譯,我覺得“散列”更能表達其真實含義)
    HashEntry<K,V>[] oldTable = table;
    // 當前散列表的長度(即容量)
    int oldCapacity = oldTable.length;
    // 新的容量(2倍于當前散列表的容量)
    int newCapacity = oldCapacity << 1;
    // 新閾值
    threshold = (int)(newCapacity * loadFactor);
    // 新的散列表
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 掩碼(數(shù)組的最大索引值)
    int sizeMask = newCapacity - 1;
    // 迭代數(shù)組(散列表),將當前散列表中的元素轉移到新的、擴容的散列表中
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            // 如果哈希槽處(桶頂)的元素不為空,就準備遍歷這個鏈表
            // 提前獲取下一個節(jié)點的引用
            HashEntry<K,V> next = e.next;
            // 計算出該元素在新散列表中哈希槽的位置(在新數(shù)組中的索引)
            int idx = e.hash & sizeMask;
            // 桶頂處的下一個元素為空,意味著這是個單元素的鏈表,
            // 直接將元素移到新散列表中
            if (next == null)
                newTable[idx] = e;
            // 否則就意為著鏈表中有多個元素
            else {
                 // 轉移鏈表中元素的思路,就是遍歷這個鏈表,將元素一個一個的轉移
                HashEntry<K,V> lastRun = e; // 記錄??鏈表中的最后一個元素
                int lastIdx = idx; // 記錄??鏈表中的最后一個元素應該落在新散列表中的索引
                // 這里的for循環(huán)不是遍歷、轉移元素,而是獲取鏈表中最后一個元素的信息
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    // 計算出該元素在新散列表中哈希槽的位置(在新數(shù)組中的索引)
                    // 直到遍歷鏈表中的最后一個元素,那么也就獲取到鏈表中的最后一個元素啦
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 把鏈表中的元素轉移到新鏈表中
                newTable[lastIdx] = lastRun;
                // 外層for循環(huán)的一開始已經(jīng)把鏈表中第一個元素(桶頂?shù)脑?轉移走了,
                // 下面就轉移接續(xù)的元素
                // 克隆剩下的元素節(jié)點
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    // 從外面看,這里是深復制(上面這個鏈表中的最后一個節(jié)點是淺復制)
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    
    // 擴容的也擴容了,所有的元素也都轉移了
    // 這才開始添加用戶要添加的新節(jié)點
    int nodeIndex = node.hash & sizeMask;
    // 借助UNSAFE.putOrderedObject方法來實現(xiàn)節(jié)點的添加
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

六、刪除方法

刪除元素的思路:定位到 段segment ,segment 是用來存放 HashEntry散列表的,散列表table是實際存放元素的地方,然后再定位散列表的位置,判斷桶頂是否有元素,如果有的話再遍歷鏈表。刪除元素的最復雜的操作是刪除鏈中的元素(注意解鏈、再接鏈就可以了),可以參考這里關于LinkedList源碼分析更詳細的文章:
https://github.com/about-cloud/JavaCore
當然后面還會持續(xù)更新本文,有興趣可以關注上面GitHub文章。

原文持續(xù)更新鏈接: https://github.com/about-cloud/JavaCore

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容