ConcurrentHashMap 源碼深度解析(java7)——原來如此簡單(寫的真好,建議收藏)

一、前言

ConcurrentHashMap算是java基礎中非?;镜闹R點,不僅在日常開發(fā)中經(jīng)常用到,面試中也是經(jīng)久不衰的話題。它基本沿用HashMap的接口定義,使得即使不了解其底層原理,也能無縫切換。

談到ConcurrentHashMap,經(jīng)常會拿java7和java8的實現(xiàn)做對比。雖然現(xiàn)在java的版本更新很快,但是常用的還是java8,而看似java7的實現(xiàn)方式已經(jīng)過時了,好像沒必要去了解了,非也。

ConcurrentHashMap在java7中的實現(xiàn)有很多值得學習借鑒的地方,比如基本的數(shù)據(jù)結(jié)構(gòu)數(shù)組鏈表的應用,并發(fā)開發(fā),哈希算法等都可以學以致用。而且了解了java7的實現(xiàn)細節(jié),才能更好的明白java8中為什么要做一些看似莫名其妙的優(yōu)化?

輪子好用,但是造輪子更好玩。

  1. ConcurrentHashMap的數(shù)據(jù)結(jié)構(gòu)是怎樣的?
  2. ConcurrentHashMap的容量為什么是2的整數(shù)次方?
  3. 如何實現(xiàn)的并發(fā)安全?是讀寫分離嗎?get需要加鎖嗎?
  4. 哈希沖突體現(xiàn)在哪里,如何解決?
  5. 擴容思想是什么,怎么擴容?

帶著問題探索源碼,才能更容易get到作者的良苦用心。

二、初始化

ConcurrentHashMap-數(shù)據(jù)結(jié)構(gòu)

ConcurrentHashMap初始化可自定義傳入初始容量、負載因子和并發(fā)級別,但并不是單純將值賦值給一些成員變量,而是需要經(jīng)過計算,找到適合的初始值:

  • initialCapacity初始容量,用于計算每個SegmentHashEntry數(shù)組的初始長度。
  • loadFactor負載因子,用于計算每個SegmentHashEntry數(shù)組的擴容閾值。
  • concurrencyLevel并發(fā)級別,用于計算Segment數(shù)組的固定長度。

初始化過程主要是對Segment數(shù)組和其內(nèi)部的HashEntry數(shù)組進行初始化:

(1)通過concurrencyLevel計算ssize作為Segment數(shù)組的固定長度,需要確保計算的ssize是大于等于concurrencyLevel且離concurrencyLevel最近的2的整數(shù)次方的數(shù)值。如默認concurrencyLevel=16,則ssize=16;若自定義傳入concurrencyLevel=15,則ssize=16;concurrencyLevel=17,則ssize=32。

(2)計算用于對key的哈希值進行映射成數(shù)組下標的segmentShift(偏移量)和segmentMask(掩碼)(int index=(hash >>> segmentShift) & segmentMask):

  • segmentShift,key的哈希值右移segmentShift位,最高sshift位用于哈希映射。
  • segmentMask,ssize的掩碼,因為ssize為2的整數(shù)次方值,二進制為1后面若干0,掩碼就是ssize-1,如2(10)掩碼(1)、4(100)掩碼3(11)、8(1000)掩碼7(111)。

(3)通過initialCapacityssize平均計算每個Segment內(nèi)部HashEntry數(shù)組的初始長度,但并不是簡單的int c = initialCapacity / ssize,而是像計算ssize一樣,找到一個大于等于c且離c最近的2的整數(shù)次方數(shù)值。

(4)初始化第一個Segment,其他Segment等添加元素時用到了再創(chuàng)建。

(5)初始化Segment數(shù)組。

@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        // 1.保證并發(fā)級別是2的整數(shù)次方,即Segment<K,V>[]數(shù)組的長度
        // ssize = ssize << 1
        // ssize = ssize * (2^1)
        ssize <<= 1;
    }
    // 假設 ssize=16, sshift=4
    // segmentShift = 28
    // segmentMask = 15
    // 2.用于對key的哈希值進行映射計算成數(shù)組下標的偏移量和掩碼(int index=(hash >>> segmentShift) & segmentMask)。
    // segmentShift:key的哈希值右移segmentShift位,最高sshift位用于計算。
    // segmentMask:ssize的掩碼,因為ssize為2的整數(shù)次方值,二進制為1后面若干0,掩碼就是ssize-1,如2(10)掩碼(1)、4(100)掩碼3(11)、8(1000)掩碼7(111)。
    this.segmentShift = 32 - sshift; // 偏移量
    this.segmentMask = ssize - 1; // 掩碼
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // 3.Segment<K,V>[]平均計算每一個Segment內(nèi)部的HashEntry[]數(shù)組的長度
    // 但是為了確保HashEntry鏈表的長度也是2的整數(shù)次方,用同樣的方式找到離大于等于c且離c最近的2的整數(shù)次方的數(shù)
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // 4.create segments and segments[0]
    // cap * loadFactor  HashEntry的容量 * loadFactor計算閾值
    // 構(gòu)造第0個Segment
    Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                    (HashEntry<K,V>[])new HashEntry[cap]);
    // 5.Segment數(shù)組不可擴容
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

三、Segment是一個容器也是一把鎖

Segment繼承自ReentrantLock,所以其具備并發(fā)鎖的功能,在初始化時傳入concurrencyLevel計算而得的數(shù)組長度ssize,代表最多有ssize個鎖,也就決定了整個ConcurrentHashMap后續(xù)的最大并發(fā)級別是ssize(最多可以有ssize個線程獲取鎖操作容器)。

Segment內(nèi)部又包含了一個HashEntry數(shù)組,用于存儲key-value

從后續(xù)的擴容原理了解,Segment數(shù)組后續(xù)是不擴容的,真正擴容的是其內(nèi)部的HashEntry數(shù)組,所以Segment數(shù)組的長度的初始設置至關重要。

每一個Segment都有獨立的key-value增刪改和擴容機制,且操作前必須先獲取鎖以確保并發(fā)安全。

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    private static final long serialVersionUID = 2249069246763182397L;
    
    /**
     * 用于在競爭鎖時最大自旋重試次數(shù)
     * The maximum number of times to tryLock in a prescan before
     * possibly blocking on acquire in preparation for a locked
     * segment operation. On multiprocessors, using a bounded
     * number of retries maintains cache acquired while locating
     * nodes.
     */
    static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    /**
     * 每一個HashEntry都是一個鏈表,然后組成一個HashEntry鏈表數(shù)組
     * The per-segment table. Elements are accessed via
     * entryAt/setEntryAt providing volatile semantics.
     */
    transient volatile HashEntry<K,V>[] table;

    /**
     * 元素的個數(shù),不是線程安全的
     * The number of elements. Accessed only either within locks
     * or among other volatile reads that maintain visibility.
     */
    transient int count;

    /**
     * 修改總次數(shù),線程不安全
     * The total number of mutative operations in this segment.
     * Even though this may overflows 32 bits, it provides
     * sufficient accuracy for stability checks in CHM isEmpty()
     * and size() methods.  Accessed only either within locks or
     * among other volatile reads that maintain visibility.
     */
    transient int modCount;

    /**
     * HashEntry數(shù)組的擴容閾值,(int)(cap * loadFactor)
     * count > threshold 且沒有達到最大元素數(shù)量就擴容
     * The table is rehashed when its size exceeds this threshold.
     * (The value of this field is always <tt>(int)(capacity *
     * loadFactor)</tt>.)
     */
    transient int threshold;

    /**
     * 裝載因子,所有的segments都是一樣的
     * The load factor for the hash table.  Even though this value
     * is same for all segments, it is replicated to avoid needing
     * links to outer object.
     * @serial
     */
     final float loadFactor;

    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {... ...}
    final V remove(Object key, int hash, Object value) {... ...}
    final boolean replace(K key, int hash, V oldValue, V newValue) {... ...}
    final V replace(K key, int hash, V value) {... ...}
    final void clear() {... ...}
}

四、HashEntry真正存放元素的鏈表數(shù)組

ConcurrentHashMap解決哈希沖突的方式是鏈表法,即將哈希沖突的節(jié)點鏈接成一個個鏈表。

基本屬性有final修飾的hashkey,一旦賦值不可修改,volatile修飾的valuenext,修改時可以遵循volatile的語義,寫happen-before讀,立即刷新主內(nèi)存。

static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    // java7之前,next是final修飾,
    // java7進行了優(yōu)化,在put時,若沒有搶到鎖,會自旋同時初始化一個HashEntry節(jié)點,為后面獲取鎖后節(jié)省時間
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }
    /**
     * Sets next field with volatile write semantics.  (See above
     * about use of putOrderedObject.)
     */
    final void setNext(HashEntry<K,V> n) {
        UNSAFE.putOrderedObject(this, nextOffset, n);
    }

    // Unsafe mechanics
    static final sun.misc.Unsafe UNSAFE;
    static final long nextOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = HashEntry.class;
            nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

五、put添加元素

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key); // 計算出對應的哈希值
    // 假設 ssize=16, sshift=4
    // segmentShift = 28
    // segmentMask = 15
    // 計算的hash向右移28位在與segmentMask作與操作,即以hash值的最高4位映射計算對應的Segment數(shù)組下標j
    // 找到對應Segment的位置j,如果該位置的segment還未設置則需要先初始化
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
            (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // cas 初始化segment
        s = ensureSegment(j);
    // segment的put內(nèi)部加鎖,所以是分段鎖
    return s.put(key, hash, value, false);
}

(1)添加元素時,首先對key進行哈希運算,hash()會對非字符串的key做一個補充哈希的處理(Wang/Jenkins hash變體),使得哈希值的高位和低位不相同,減少哈希沖突。

private transient final int hashSeed = randomHashSeed(this);
private int hash(Object k) {
    int h = hashSeed;
    if ((0 != h) && (k instanceof String)) {
        return sun.misc.Hashing.stringHash32((String) k);
    }
    // 小聲說,補充哈希的位運算沒看懂運算過程...
    h ^= k.hashCode();
    h += (h <<  15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h <<   3);
    h ^= (h >>>  6);
    h += (h <<   2) + (h << 14);
    return h ^ (h >>> 16);
}

(2)然后對key的哈希值進行映射計算((hash >>> segmentShift) & segmentMask)找到Segment數(shù)組對應下標。

key的哈希值是一個32位的數(shù)值,右移segmentShift位剩余高位和掩碼segmentMask&運算,這個過程相當于模運算((hash >>> segmentShift) % ssize),計算結(jié)果分布在[0,segmentMask]。

(3)為什么還要對hash映射的下標做(j << SSHIFT) + SBASE))運算呢?

因為通過UNSAFE.getObject可以從主內(nèi)存中獲取最新的Segment,而這個方法需要知道Segment內(nèi)存中的偏移量。同樣計算主內(nèi)存偏移量的方式在后續(xù)獲取HashEntry時也會用到。

映射Segment數(shù)組下標的過程運用了與運算以及利用2的整數(shù)次方數(shù)值減1就是掩碼的特性,而位運算要比通俗意義上的加減乘除的性能要高,這也是為什么Segment數(shù)組的長度必須是2的整數(shù)次方的原因??上攵髡咴诩氈δ┕?jié)上的極致性能追求。

(4)若找到的segment是空的,則先進行cas初始化ensureSegment,可以看出segments數(shù)組是懶加載:

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    // 算出 內(nèi)存偏移量
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 獲取的seg為null則 開始初始化,獲取0位置的segment,取出基本屬性cap、lf、threshold
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // 再檢查一次
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 自旋cas設置segment,保證線程安全
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                    == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

1、Segment#put真正的操作元素

找到對應的segment后,就繼續(xù)對其內(nèi)部的HashEntry鏈表數(shù)組進行操作,這個過程中可能會產(chǎn)生哈希沖突,也可能需要擴容,而作者是如何解決實現(xiàn)的呢?

// java.util.concurrent.ConcurrentHashMap.Segment#put
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // (1)若獲取鎖,則node=null,沒有獲取鎖,也會做一些事情scanAndLockForPut:
    // 拿不到鎖,不立即阻塞,而是先自旋,若自旋到一定次數(shù)仍未拿到鎖,再調(diào)用lock()阻塞;
    // 在自旋的過程中遍歷鏈表,若發(fā)現(xiàn)沒有重復的節(jié)點,則提前新建一個節(jié)點,為后面再插入節(jié)省時間。
    HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        //(2)tab.length是2的整數(shù)次方,所以tab.length-1 的二進制就是若干1,對hash 做與運算,算出的index不會超出tab.length-1
        int index = (tab.length - 1) & hash;
        // 定位到第index個HashEntry
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
               // (3)e不為空則說明發(fā)生hash沖突,解決hash沖突的辦法:鏈表法,新節(jié)點作為鏈表的頭節(jié)點
                // 如果hash值算的不好,經(jīng)常發(fā)生hash沖突,就會造成某一個鏈表很長,性能就會很低。
                K k;
                // HashEntry key地址相等 or (hash值相等且key值相等)時
                if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // onlyIfAbsent 為false,則舊值替換為新值,然后break,否則直接break
                        e.value = value;
                        ++modCount;
                    }
                    break;
                    // put key存在時會新值替換舊值
                    // putIfAbsent key存在時不替換。
                }
                // 在e不為null的情況下,向下遍歷,直到找到HashEntry的末尾或者 一個key或者hash相等的HashEntry
                e = e.next;
            }
            else {
                if (node != null)
                    // node不為null,說明前面沒有搶到鎖時,順便初始化了node
                    // 同時node放在了HashEntry單列表的頭部
                    node.setNext(first);
                else
                    // 初始化一個node,并放在first鏈表的頭部
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    //(4) 因為count是對一個segment中的HashEntry節(jié)點個數(shù)的統(tǒng)計,
                    // 如果hash沖突嚴重,鍵值對只添加到HashEntry[]中某幾個HashEntry鏈表中,
                    // 就造成HashEntry[]有空閑位置,也會造成無味的擴容,內(nèi)存利用率持續(xù)下降
                    // count 超過了閾值,默認是HashEntry<K,V>[]初始容量*0.75
                    // 擴容
                    rehash(node);
                else
                   // (5)更新tab index位置的node
                    setEntryAt(tab, index, node);
                // modCount++
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

(1)segment本身是一把鎖,且為了確保put元素的過程是線程安全的,必須先嘗試獲取鎖。若tryLock()獲取鎖失敗,不會立即阻塞,而是執(zhí)行scanAndLockForPut,自旋重試一定次數(shù),并且在這個過程中不只是單純的自旋,還會初始化添加元素需要的節(jié)點,為后續(xù)獲取鎖后節(jié)省時間,這又是一處體現(xiàn)作者追求極致性能的地方。

/**
 * @return a new node if key not found, else null
 */
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    // 找到對應的HashEntry節(jié)點first
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                // 若 找到的節(jié)點是空的,則進行初始化,為后面拿到鎖后,節(jié)省時間
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                // retries為0,防止重復初始化
                retries = 0;
            }
            else if (key.equals(e.key))
                // 若 找到節(jié)點不為空,且 key和準備添加的key相等
                retries = 0;
            else
                // e不為空且 key不相等,則向下遍歷該鏈表,e設置為下一個節(jié)點,
                // 直到找到下一個節(jié)點空的或者key相等的(可能是替換操作,無需初始化節(jié)點)
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            // 自旋一定次數(shù)后lock阻塞
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                (f = entryForHash(this, hash)) != first) {
            // (retries & 1) == 0 偶數(shù)次檢查頭節(jié)點是否有修改
            // 若first節(jié)點被修改了,則重置自旋重試機制,
            // 為什么鏈表的第一個節(jié)點會變呢,是因為,新增元素時在頭節(jié)點的位置添加。
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

(2)獲取鎖之后,hash映射HashEntry數(shù)組的下標(int index= (tab.length - 1) & hash),并獲取主內(nèi)存中的first節(jié)點(entryAt(tab, index))。

static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
    return (tab == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
                    (tab, ((long)i << TSHIFT) + TBASE);
}

和映射Segment數(shù)組下標不同的是,這里并沒有對hash做移位操作,也就是映射HashEntry數(shù)組下標用了hash值的低位,映射Segment數(shù)組下標用了hash值的高位,這樣做的目的也是為了使得元素分布均勻,減少哈希沖突(hash()補充哈希使得哈希值高位和低位不同)。

(3)找到的first節(jié)點不為空,則發(fā)生了哈希沖突,需要遍歷鏈表,看看是否有key和hash相同的節(jié)點,有則判斷是否需要替換,不論是否需要替換,都不需要加入新節(jié)點,則結(jié)束本次put操作。若遍歷到末尾依然找不到相同的節(jié)點,則需要將新節(jié)點加到鏈表頭部(頭插法)。


ConcurrentHash-put

(4)元素個數(shù)count+1,若HashEntry數(shù)組元素count超出閾值且長度未達到最大值,則擴容rehash(node)。

(5)將新增的節(jié)點更新到主內(nèi)存對應位置(setEntryAt):

static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                   HashEntry<K,V> e) {
    UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}

2、rehash擴容

擴容比較復雜重要,所以單獨拎出來探討,默認情況下當一個SegmentHashEntry數(shù)組的元素個數(shù)大于初始容量的3/4且小于最大長度時觸發(fā)擴容,從函數(shù)命名rehash()也可以看出是一個再哈希的過程:

private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // newCapacity = oldCapacity * 2^1,即為擴容為原來的2倍
    int newCapacity = oldCapacity << 1;
    // 重新計算新的threshold
    threshold = (int)(newCapacity * loadFactor);
    HashEntry<K,V>[] newTable =
            (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 計算e在新數(shù)組中的位置
            // 對newCapacity - 1 做&運算
            int idx = e.hash & sizeMask;
            if (next == null)   //  Single node on list
                // next=null 說明是鏈表的最后一個節(jié)點了,直接賦值
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // 不是鏈表的最后一個節(jié)點,則需要尋找鏈表的最后一個元素
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 找到lastRun節(jié)點,因為lastRun后面的節(jié)點的hash都和lastRun一樣,
                // 所以可以直接把lastRun連同后面的節(jié)點一起復制到新數(shù)組的對應位置。
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                // 而 lastRun前面的節(jié)點則需要重新計算在新數(shù)組中的位置,
                // 并拷貝(new HashEntry)到新數(shù)組中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 將新元素添加到數(shù)組中
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    // 將新鏈表賦值給table, copy on write
    // 只能保證最終一致性,不能保證實時一致性
    table = newTable;
}

(1)首先新建一個數(shù)組,長度為舊數(shù)組的2倍(oldCapacity << 1)。

(2)按新數(shù)組的長度重新計算擴容閾值(threshold = (int)(newCapacity * loadFactor))。

(3)從舊數(shù)組復制遷移節(jié)點到新數(shù)組。這里又有一個追求極致性能的點,按道理舊數(shù)組中的節(jié)點都需要重新哈希然后映射到新數(shù)組,但是,作者做了一個小優(yōu)化,找到每條鏈表中最后一個與前一個節(jié)點哈希映射新數(shù)組下標不同的點,稱之為lastRun,這樣就把一個鏈表截成了兩半,lastRun之后節(jié)點的哈希映射結(jié)果和lastRun相同,所以只需要復制遷移lastRun節(jié)點即可,其后的節(jié)點可以順帶過去;而lastRun前的節(jié)點則還需要一個個重新和新數(shù)組做哈希映射并復制。如圖:

ConcurrentHashMap-reHash

(4)將新元素添加到新數(shù)組對應位置中。

(5)新數(shù)組賦值給舊數(shù)組(table = newTable),copy on write 思想,所以只能保證最終一致性,不能保證實時一致性,在擴容的過程中也不會影響get的使用。

六、remove刪除元素

刪除元素的方法有兩個:

remove(Object key),刪除鍵為key的元素。

remove(Object key, Object value),刪除鍵為key,值為value的元素。

public V remove(Object key) {
    int hash = hash(key);
    Segment<K,V> s = segmentForHash(hash);
    return s == null ? null : s.remove(key, hash, null);
}

public boolean remove(Object key, Object value) {
    int hash = hash(key);
    Segment<K,V> s;
    return value != null && (s = segmentForHash(hash)) != null &&
            s.remove(key, hash, value) != null;
}

代碼基本相同,都是先找到對應的Segment,然后調(diào)用Segmentremove方法。

private Segment<K,V> segmentForHash(int h) {
    // hash 映射 segments數(shù)組下標的同時 計算出 該segment在內(nèi)存中的偏移量
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // segments數(shù)組是沒有被volatile修飾的,使用getObjectVolatile,
    // 可為segments增加volatile語義
    return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
}

Segmentremove方法:

  • 首先嘗試獲取鎖,獲取失敗會自旋重試一定次數(shù)后依然沒有獲取鎖則阻塞。
  • 獲取鎖后,通過哈希映射找到對應的HashEntry節(jié)點e,若為空則說明沒必要刪除,若不為空則開始遍歷鏈表。
  • 找到key相同的節(jié)點,若有傳遞value,還需判斷該節(jié)點的value是否相同。
  • 刪除節(jié)點時需分兩種情況,若被刪除節(jié)點的前驅(qū)為空,則說明是從鏈表頭部刪除,被刪除的節(jié)點的next節(jié)點作為index位置的新頭節(jié)點;若被刪除節(jié)點的前驅(qū)不為空,則說明是從鏈表中間刪除,將被刪除節(jié)點的next節(jié)點鏈接到其前驅(qū)的next指針上。
  • 最后釋放鎖。
final V remove(Object key, int hash, Object value) {
    if (!tryLock())
        // 1.獲取鎖失敗,會自旋重試一段時間,如果還沒獲取鎖則阻塞
        scanAndLock(key, hash);
    V oldValue = null;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        // 找到 對應的hashentry
        HashEntry<K,V> e = entryAt(tab, index);
        HashEntry<K,V> pred = null;
        while (e != null) {
            K k;
            HashEntry<K,V> next = e.next;
            if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                V v = e.value;
                if (value == null || value == v || value.equals(v)) {
                    if (pred == null)
                        // 被刪節(jié)點的前驅(qū)為空,則說明刪除的節(jié)點是頭節(jié)點,
                        // 則被刪節(jié)點的next節(jié)點作為index位置的頭節(jié)點。
                        setEntryAt(tab, index, next);
                    else
                        // 若pred不為null,則是從鏈表中間位置刪除的節(jié)點,
                        // 可將刪除節(jié)點的pred與被刪除節(jié)點的next相連
                        pred.setNext(next);
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }
            pred = e;
            e = next;
        }
    } finally {
        unlock();
    }
    return oldValue;
}

scanAndLockscanAndLockForPut的目的基本一樣,scanAndLock少了初始化節(jié)點操作。

private void scanAndLock(Object key, int hash) {
    // similar to but simpler than scanAndLockForPut
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    int retries = -1;
    while (!tryLock()) {
        HashEntry<K,V> f;
        if (retries < 0) {
            if (e == null || key.equals(e.key))
                // e為null or key存在則置retries=0,開始自旋計數(shù)
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            // 超過最大自旋次數(shù),則阻塞
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                (f = entryForHash(this, hash)) != first) {
            // 偶數(shù)次檢查頭節(jié)點被修改,則 retries=-1 重置重試機制
            e = first = f;
            retries = -1;
        }
    }
}

七、get獲取元素

replace方法較為簡單就不做分析了,來看看get方法:

  • 首先通過hash計算segment[]下標同時計算內(nèi)存偏移量。
  • getObjectVolatile從主內(nèi)存中獲取對應Segment。
  • 第二次hash計算找到HashEntry
  • 遍歷鏈表找到key相同的節(jié)點返回即可。

可以看到get沒有加鎖,為什么可以不加鎖呢?

  • put添加元素時更改的是鏈表的頭節(jié)點不會影響get的遍歷,且putremove修改的是HashEntrynext指針,nextvolatile修飾,replace修改的是HashEntryvalue,valuevolatile修飾,都是利用volatile語義(寫happen-before讀),使得修改后立即刷新主內(nèi)存,并且通知其他線程獲取到最新值。
  • put操作中若發(fā)生擴容,其利用了copy on write思想,在擴容沒有完前,get獲取的數(shù)據(jù)都是一份獨立的舊數(shù)據(jù);又因為SegmentHashEntry數(shù)組被volatile修飾,擴容完成后重新賦值table會立即刷新主內(nèi)存,通知其他線程獲取最新值。
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    // 計算segment[]索引的同時計算內(nèi)存偏移量
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // 第一次hash計算找到segment,
    // getObjectVolatile 加上volatile語義,強制從主存中獲取屬性值。
    // 這個方法要求被使用的屬性被volatile or final(具有happen-before的修飾符)修飾,否則功能和getObject方法相同。
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
        // 第二次hash計算找到HashEntry
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // key地址相等 or (hash相等&& key值相等)
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

八、size獲取元素個數(shù)

一個ConcurrentHashMap被分成了多個Segment,那獲取元素的個數(shù)就是所有Segment中元素個數(shù)之和。而size的過程中有可能在put、remove等影響每個Segment內(nèi)部元素個數(shù)操作,所以需要一個個獲取鎖來統(tǒng)計。但是這樣一個不太重要的size操作把整個Segment數(shù)組鎖住豈不是非常影響寫性能。

所以作者用了一種樂觀鎖的方式,假設在結(jié)算size的過程Segment內(nèi)部沒有發(fā)生修改操作,如果發(fā)生了修改則重試重新計算。

判斷Segment內(nèi)部沒有發(fā)生修改的方式是比對最近兩次總的修改次數(shù)是否一致。而重試也不是無限重試,而是重試2次,加上初始的一次就是3次。

重試3次之后依然檢查到Segment內(nèi)部有修改,則遍歷segments數(shù)組加鎖統(tǒng)計。假設第3次重試和第4次加鎖統(tǒng)計的修改總次數(shù)sum相等則結(jié)束統(tǒng)計返回size,若不相等則第5次統(tǒng)計(重試次數(shù)不等2了,則不會發(fā)生鎖重入)。

public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    // 統(tǒng)計所有segment中元素的個數(shù)
    int size;
    // size的長度超過了32位,代表溢出了,設置為true
    boolean overflow; // true if size overflows 32 bits
    // 統(tǒng)計所有segment中修改的次數(shù),后續(xù)可判斷segments是否有修改
    long sum;         // sum of modCounts
    // 最近的一次sum
    long last = 0L;   // previous sum
    // 重試次數(shù)
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            // RETRIES_BEFORE_LOCK = 2
            // 重試三次,樂觀的認為在size的時候,segments內(nèi)部沒有變動
            // 注意retries++
            if (retries++ == RETRIES_BEFORE_LOCK) {
                // 超過三次重試次數(shù),遍歷segments數(shù)組,分別獲取鎖,
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                // 遍歷獲取內(nèi)存中的Segment
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    // 統(tǒng)計總修改次數(shù)sum
                    sum += seg.modCount;
                    // 統(tǒng)計元素個數(shù),并判斷是否有溢出
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            // 統(tǒng)計的修改總次數(shù)sum和上次記錄的相同則停止重試
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        // 若重試超過了三次,說明分別獲取過鎖,則需要遍歷釋放鎖
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    // 返回size,若size溢出則為Integer.MAX_VALUE
    return overflow ? Integer.MAX_VALUE : size;
}

九、總結(jié)

看完java7的ConcurrentHashMap源碼,了解了其實現(xiàn)原理后,心里的疑云基本都解開了:

  1. ConcurrentHashMapsegment數(shù)組的長度以及HashEntry數(shù)組的長度之所以要保持為2的整數(shù)次方就是為了利用2的整數(shù)次方數(shù)值減1就是掩碼的特性,哈希值與掩碼做與運算相當于模運算來快速映射計算數(shù)組的下標。
  2. ConcurrentHashMap使用分段鎖的方式,每一個segment相等于一把鎖,在修改map時首先會先獲取鎖。而get也不需要加鎖的原因是HashEntrynext指針,value都是volatile修飾的,可以很好的利用其寫happen-before讀的語義,以及修改后立即刷新到主內(nèi)存并通知其他線程獲取最新值。
  3. 哈希沖突的體現(xiàn)不只是在鏈表中,還是在第一次哈希映射segment數(shù)組時,多個元素到同一個segment也算是哈希沖突。而解決哈希沖突的方式是鏈表法。
  4. 哈希映射segment數(shù)組下標時用了哈希值的高位,而哈希映射HashEntry數(shù)組下標時用的哈希值低位,為的是盡可能利用哈希值,使得元素節(jié)點分布均勻減少沖突。
  5. 擴容是容量擴大為原來的2倍,然后遍歷每個元素重新哈希映射到新數(shù)組中,但是作者有一個小優(yōu)化就是把一個鏈表截成兩半,以lastRun為界,lastRun后面的節(jié)點因為和lastRun哈希映射新數(shù)組的結(jié)果一樣,所以可以跟隨lastRun一起復制到新數(shù)組,而lastRun之前的節(jié)點則需要一個個重新哈希映射。
  6. size統(tǒng)計所有segment的元素個數(shù),以樂觀重試的方式判斷segment內(nèi)部是否有修改,最近兩次修改次數(shù)一致則返回統(tǒng)計的size。
  7. segment數(shù)組一旦初始化后期不可擴容。

PS: 如若文章中有錯誤理解,歡迎批評指正,同時非常期待你的評論、點贊和收藏。我是徐同學,愿與你共同進步!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容