ConcurrentHashMap (JDK7) 詳解

數(shù)據(jù)結(jié)構(gòu)



ConcurrentHashMap 實(shí)現(xiàn)并發(fā)操作的原理

使用了鎖分段技術(shù):ConcurrentHashMap持有一組鎖(segment[]),并將數(shù)據(jù)盡可能分散在不同的鎖段中(即,每個(gè)鎖只會(huì)控制部分的數(shù)據(jù)HashEntry[])。這樣如果寫(xiě)操作的數(shù)據(jù)分布在不同的鎖中,那么寫(xiě)操作將可并行操作。因此來(lái)實(shí)現(xiàn)一定數(shù)量(即,鎖數(shù)量)并發(fā)線程修改。
同時(shí)通過(guò)Unsafe.putOrderedObject、UNSAFE.getObjectVolatile(??這兩個(gè)方法很重要,下文會(huì)介紹)來(lái)操作segment[]、HashEntry[]的元素使得在提升了性能的情況下在并發(fā)環(huán)境下依舊能獲取到最新的數(shù)據(jù),同時(shí)HashEntry的value為volatile屬性,從而實(shí)現(xiàn)不加鎖的進(jìn)行并發(fā)的讀操作,并且對(duì)該并發(fā)量并無(wú)限制。
注意,中不使用volatile的屬性來(lái)實(shí)現(xiàn)segment[]和HashEntry[]在多線程間的可見(jiàn)性。因?yàn)槿绻切薷牟僮?,則在釋放鎖的時(shí)候就會(huì)將當(dāng)前線程緩存中的數(shù)據(jù)寫(xiě)到主存中,所以就無(wú)需在修改操作的過(guò)程中因修改volatile屬性字段而頻繁的寫(xiě)線程內(nèi)存數(shù)據(jù)到主存中。

源碼解析

重要屬性
//散列映射表的默認(rèn)初始容量為 16。
static final int DEFAULT_INITIAL_CAPACITY = 16;

//散列映射表的默認(rèn)裝載因子為 0.75,用于表示segment中包含的HashEntry元素的個(gè)數(shù)與HashEntry[]數(shù)組長(zhǎng)度的比值。當(dāng)某個(gè)segment中包含的HashEntry元素的個(gè)數(shù)超過(guò)了HashEntry[]數(shù)組的長(zhǎng)度與裝載因子的乘積時(shí),將觸發(fā)擴(kuò)容操作。
static final float DEFAULT_LOAD_FACTOR = 0.75f;

//散列表的默認(rèn)并發(fā)級(jí)別為 16。該值表示segment[]數(shù)組的長(zhǎng)度,也就是鎖的總數(shù)。
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//散列表的最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;

//segment中HashEntry[]數(shù)組最小長(zhǎng)度
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

//散列表的最大段數(shù),也就是segment[]數(shù)組的最大長(zhǎng)度
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

//在執(zhí)行size()和containsValue(value)操作時(shí),ConcurrentHashMap的做法是先嘗試 RETRIES_BEFORE_LOCK 次( 即,2次 )通過(guò)不鎖住segment的方式來(lái)統(tǒng)計(jì)、查詢各個(gè)segment,如果2次執(zhí)行過(guò)程中,容器的modCount發(fā)生了變化,則再采用加鎖的方式來(lái)操作所有的segment
static final int RETRIES_BEFORE_LOCK = 2;

//segmentMask用于定位segment在segment[]數(shù)組中的位置。segmentMask是與運(yùn)算的掩碼,等于segment[]數(shù)組size減1,掩碼的二進(jìn)制各個(gè)位的值都是1( 因?yàn)?,?shù)組長(zhǎng)度總是2^N )。
final int segmentMask;

//segmentShift用于決定H(key)參與segmentMask與運(yùn)算的位數(shù)(高位),這里指在從segment[]數(shù)組定位segment:通過(guò)key的哈希結(jié)果的高位與segmentMask進(jìn)行與運(yùn)算哈希的結(jié)果。(詳見(jiàn)下面舉例)
final int segmentShift;

//Segment 類繼承于 ReentrantLock 類,從而使得 Segment 對(duì)象能充當(dāng)鎖的角色。
final ConcurrentHashMap.Segment<K, V>[] segments;


重要對(duì)象
  • ConcurrentHashMap.Segment
    Segment 類繼承于 ReentrantLock 類,從而使得 Segment 對(duì)象能充當(dāng)鎖的角色,并且是一個(gè)可重入鎖。每個(gè) Segment 對(duì)象維護(hù)其包含的若干個(gè)桶(即,HashEntry[])。
//最大自旋次數(shù),若是單核則為1,多核則為64。該字段用于scanAndLockForPut、scanAndLock方法
static final int MAX_SCAN_RETRIES =
    Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/**
 * table 是由 HashEntry 對(duì)象組成的數(shù)組
 * 如果散列時(shí)發(fā)生碰撞,碰撞的 HashEntry 對(duì)象就以鏈表的形式鏈接成一個(gè)鏈表
 * table 數(shù)組的元素代表散列映射表的一個(gè)桶
 * 每個(gè) table 守護(hù)整個(gè) ConcurrentHashMap 數(shù)據(jù)總數(shù)的一部分
 * 如果并發(fā)級(jí)別為 16,table 則維護(hù) ConcurrentHashMap 數(shù)據(jù)總數(shù)的 1/16 
 */
transient volatile HashEntry<K,V>[] table;
//segment中HashEntry的總數(shù)。 PS:注意JDK 7中該字段不是volatile的
transient int count;
//segment中數(shù)據(jù)被更新的次數(shù)
transient int modCount;
//當(dāng)table中包含的HashEntry元素的個(gè)數(shù)超過(guò)本變量值時(shí),觸發(fā)table的擴(kuò)容
transient int threshold;
//裝載因子
final float loadFactor;
  • ConcurrentHashMap.HashEntry
    HashEntry封裝了key-value對(duì),是一個(gè)單向鏈表結(jié)構(gòu),每個(gè)HashEntry節(jié)點(diǎn)都維護(hù)著next HashEntry節(jié)點(diǎn)的引用。
static final class HashEntry<K,V> 
final int hash;
final K key;
volatile V value;
//HashEntry鏈表中的下一個(gè)entry。PS:JDK 7中該字段不是final的,意味著該字段可修改,而且也確實(shí)在remove方法中對(duì)該地段進(jìn)行了修改
volatile HashEntry<K,V> next;
構(gòu)造方法
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

a) 限制并發(fā)等級(jí)最大為MAX_SEGMENTS,即2^16。
b) 計(jì)算真實(shí)的并發(fā)等級(jí)ssize,必須是2的N次方,即 ssize( actual_concurrency_level ) >= concurrency_level。
舉例:concurrencyLevel等于14,15或16,ssize都會(huì)等于16,即容器里鎖的個(gè)數(shù)也是16。
Q:為什么數(shù)組的長(zhǎng)度都需要設(shè)計(jì)成2^N次方了?
A:這是因?yàn)樵卦跀?shù)組中的定位主要是通過(guò)H(key) & (數(shù)組長(zhǎng)度 - 1)方式實(shí)現(xiàn)的,這樣我們稱(數(shù)組長(zhǎng)度 - 1)為element_mask。那么假設(shè)有一個(gè)長(zhǎng)度為16和長(zhǎng)度為15的數(shù)組,他們element_mask分別為15和14。即array_16_element_mask = 15(二進(jìn)制”1111”);array_15_element_mask = 14(二進(jìn)制”1110”)。你會(huì)發(fā)現(xiàn)所以和”1110”進(jìn)行與操作結(jié)果的最后一位都是0,這就導(dǎo)致數(shù)組的’0001’、’0011’、’1001’、’0101’、’1101’、’0111’位置都無(wú)法存放數(shù)據(jù),這就導(dǎo)致了數(shù)組空間的浪費(fèi),以及數(shù)據(jù)沒(méi)有得到更好的分散。而使用array_16_element_mask = 15(二進(jìn)制”1111”)則不會(huì)有該問(wèn)題,數(shù)據(jù)可以分散到數(shù)組個(gè)每個(gè)索引位置。
c) sshift表示在通過(guò)H(key)來(lái)定位segment的index時(shí),參與到segmentMask掩碼與運(yùn)算的H(key)高位位數(shù)。
d) 計(jì)算每個(gè)Segment中HashEntry[]數(shù)組的長(zhǎng)度,根據(jù)數(shù)據(jù)均勻分配到各個(gè)segment的HashEntry[]中,并且數(shù)組長(zhǎng)度必須是2的N次方的思路來(lái)獲取。注意,HashEntry[]數(shù)組的長(zhǎng)度最小為2。
e) 創(chuàng)建一個(gè)Segment對(duì)象,將新建的Segment對(duì)象放入Segment[]數(shù)組中index為0的位置。這里只先構(gòu)建了Segnemt[]數(shù)組的一個(gè)元素,則其他index的元素在被使用到時(shí)通過(guò)ensureSegment(index)方法來(lái)構(gòu)建。

重要方法
  • segment的定位
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u))

通過(guò)key的二次哈希運(yùn)算后再進(jìn)行移位和與運(yùn)算得到key在segment[]數(shù)組中所對(duì)應(yīng)的segment
a) hash(key)

    private int hash(Object k) {
        int h = hashSeed;

        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }

??這里之所以需要將key.hashCode再進(jìn)行一次hash計(jì)算,是為了減少哈希沖突,使元素能夠均勻的分布在不同的Segment上,從而提高容器的存取效率。
b) 取hash(key)結(jié)果的(32 - segmentShift)位數(shù)的高位和segmentMask掩碼進(jìn)行與運(yùn)算。(其實(shí),與運(yùn)算時(shí),就是“hash(key)的高segmentMask(十進(jìn)制值)位"于“segmentMask的二進(jìn)制值”進(jìn)行與操作,此時(shí)進(jìn)行與操作的兩個(gè)數(shù)的有效二進(jìn)制位數(shù)是一樣的了。)
c) 將b)的結(jié)果j進(jìn)行 (j << SSHIFT) + SBASE 以得到key在segement[]數(shù)組中的位置
舉例:假如哈希的質(zhì)量差到極點(diǎn),那么所有的元素都在一個(gè)Segment中,不僅存取元素緩慢,分段鎖也會(huì)失去意義。我做了一個(gè)測(cè)試,不通過(guò)再哈希而直接執(zhí)行哈希計(jì)算。

        System.out.println(Integer.parseInt("0001111", 2) & 15);
        System.out.println(Integer.parseInt("0011111", 2) & 15);
        System.out.println(Integer.parseInt("0111111", 2) & 15);
        System.out.println(Integer.parseInt("1111111", 2) & 15);

計(jì)算后輸出的哈希值全是15,通過(guò)這個(gè)例子可以發(fā)現(xiàn)如果不進(jìn)行再哈希,哈希沖突會(huì)非常嚴(yán)重,因?yàn)橹灰臀灰粯?,無(wú)論高位是什么數(shù),其哈希值總是一樣。我們?cè)侔焉厦娴亩M(jìn)制數(shù)據(jù)進(jìn)行再哈希后結(jié)果如下,為了方便閱讀,不足32位的高位補(bǔ)了0,每隔四位用豎線分割下。

0100 0111 0110 0111 1101 1010 0100 1110
1111 0111 0100 0011 0000 0001 1011 1000
0111 0111 0110 1001 0100 0110 0011 1110
1000 0011 0000 0000 1100 1000 0001 1010

可以發(fā)現(xiàn)每一位的數(shù)據(jù)都散列開(kāi)了,通過(guò)這種再哈希能讓數(shù)字的每一位都能參加到哈希運(yùn)算當(dāng)中,從而減少哈希沖突。

  • HashEntry定位
int h = hash(key);
((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)

主要通過(guò)對(duì)key進(jìn)行二次hash運(yùn)算,再講哈希結(jié)果和HashEntry[]的長(zhǎng)度掩碼進(jìn)行與運(yùn)算得到key所對(duì)應(yīng)的HashEntry在數(shù)組中的索引。HashEntry的定位和Segment的定位方式很像,但是HashEntry少了將hash(key)的結(jié)果進(jìn)行掩碼取高位后再與數(shù)組長(zhǎng)度與操作,而是直接將hash(key)的結(jié)果和數(shù)組長(zhǎng)度的掩碼進(jìn)行與操作。其目的是避免兩次哈希后的值一樣,導(dǎo)致元素雖然在Segment里散列開(kāi)了,但是卻沒(méi)有在HashEntry里散列開(kāi)( 也就是說(shuō),如果Segment和HashEntry的定位方式一樣,那么到同一個(gè)Segment的key都會(huì)落到該segment中的同一個(gè)HashEntry了 )。

  • Unsafe類中的putOrderedObject、getObjectVolatile方法
    getObjectVolatile:使用volatile讀的語(yǔ)義獲取數(shù)據(jù),也就是通過(guò)getObjectVolatile獲取數(shù)據(jù)時(shí)回去主存中獲取最新的數(shù)據(jù)放到線程的緩存中,這能保證正確的獲取最新的數(shù)據(jù)。
    putOrderedObject:為了控制特定條件下的指令重排序和內(nèi)存可見(jiàn)性問(wèn)題,Java編譯器使用一種叫內(nèi)存屏障(Memory Barrier,或叫做內(nèi)存柵欄,Memory Fence)的CPU指令來(lái)禁止指令重排序。java中volatile寫(xiě)入使用了內(nèi)存屏障中的LoadStore屏障規(guī)則,對(duì)于這樣的語(yǔ)句Load1; LoadStore; Store2,在Store2及后續(xù)寫(xiě)入操作被刷出前,保證Load1要讀取的數(shù)據(jù)被讀取完畢。volatile的寫(xiě)所插入的storeLoad是一個(gè)耗時(shí)的操作,因此出現(xiàn)了一個(gè)對(duì)volatile寫(xiě)的升級(jí)版本,利用lazySet方法進(jìn)行性能優(yōu)化,在實(shí)現(xiàn)上對(duì)volatile的寫(xiě)只會(huì)在之前插入StoreStore屏障,對(duì)于這樣的語(yǔ)句Store1; StoreStore; Store2,在Store2及后續(xù)寫(xiě)入操作執(zhí)行前,保證Store1的寫(xiě)入操作對(duì)其它處理器可見(jiàn),也就是按順序的寫(xiě)入。UNSAFE.putOrderedObject正是提供了這樣的語(yǔ)義,避免了寫(xiě)寫(xiě)指令重排序,但不保證內(nèi)存可見(jiàn)性,因此讀取時(shí)需借助volatile讀保證可見(jiàn)性。

  • ensureSegment(k)

    private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

根據(jù)計(jì)算得到的index從segment[]數(shù)組中獲取segment,如果segment不存在,則創(chuàng)建一個(gè)segment并通過(guò)CAS算法放入segment[]數(shù)組中。這里的獲取和插入分別通過(guò)UNSAGE.getObjectVolatile(??保證獲取segment[]最新數(shù)據(jù))和UNSAFE.cmpareAndSwapObject(??保證原子性的將新建的segment插入segment[]數(shù)組,并使其他線程可見(jiàn))實(shí)現(xiàn),并不直接對(duì)segment[]數(shù)組操作。

  • HashEntry<K,V> scanAndLockForPut(K key, int hash, V value)
        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

在put操作嘗試加鎖沒(méi)成功時(shí),不是直接進(jìn)入等待狀態(tài),而是調(diào)用??scanAndLockForPut()方法,該方法實(shí)現(xiàn)了:
a) 首次進(jìn)入該方法,重試次數(shù)retries初始值為-1。
b) 若retries為-1,則判斷查詢key對(duì)應(yīng)的HashEntry節(jié)點(diǎn)鏈中是否已經(jīng)存在了該節(jié)點(diǎn),如果還沒(méi)則預(yù)先創(chuàng)建一個(gè)新節(jié)點(diǎn)。然后將retries=0;
c) 然后嘗試MAX_SCAN_RETRIES次獲取鎖( 自旋鎖 ),如果依舊沒(méi)能成功獲得鎖,則進(jìn)入等待狀態(tài)(互斥鎖)。
JDK7嘗試使用自旋鎖來(lái)提升性能,好處在于:自旋鎖當(dāng)前的線程不會(huì)掛起,而是一直處于running狀態(tài),這樣一旦能夠獲得鎖時(shí)就key在不進(jìn)行上下文切換的情況下獲取到鎖。
d) 如果在嘗試MAX_SCAN_RETRIES次獲取鎖的過(guò)程,key對(duì)應(yīng)的entry發(fā)送了變化,則將嘗試次數(shù)重置為-1,從第b)步驟重新開(kāi)始

  • void scanAndLock(Object key, int hash)
        private void scanAndLock(Object key, int hash) {
            // similar to but simpler than scanAndLockForPut
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            int retries = -1;
            while (!tryLock()) {
                HashEntry<K,V> f;
                if (retries < 0) {
                    if (e == null || key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                }
                else if (++retries > MAX_SCAN_RETRIES) {
                    lock();
                    break;
                }
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f;
                    retries = -1;
                }
            }
        }

在replace、remove操作嘗試加鎖沒(méi)成功時(shí),不是直接進(jìn)入等待狀態(tài),而是調(diào)用??scanAndLock()方法。該方法是實(shí)現(xiàn)和scanAndLockForPut()差不了多少,主要的區(qū)別在于scanAndLockForPut()方法在key對(duì)應(yīng)entry不存在時(shí)是不會(huì)去創(chuàng)建一個(gè)HashEntry對(duì)象的。

  • V get(Object key)
    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

在JDK 7中g(shù)et的實(shí)現(xiàn)原理已經(jīng)和JDK 6不同了,JDK 6通過(guò)volatile實(shí)現(xiàn)多線程間內(nèi)存的可見(jiàn)性。而JDK 7為了提升性能,用UNSAFE.getObjectVolatile(...)來(lái)獲取segment[]數(shù)組和HashEntry[]數(shù)組中對(duì)應(yīng)index的最新值。同時(shí)值得說(shuō)明的是,當(dāng)volatile引用一個(gè)數(shù)組時(shí),數(shù)組中的元素是不具有volatile特性的,所以,也需要通過(guò)UNSAFE.getObjectVolatile(…)來(lái)獲取數(shù)組中真實(shí)的數(shù)據(jù)。

  • put操作
    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

a) 通過(guò)key算出對(duì)應(yīng)的segment在segment[]中的位置,如果對(duì)應(yīng)的segment不存在,則創(chuàng)建。
b) 將key、value插入到segment中對(duì)應(yīng)的HashEntry中

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

a) 嘗試獲得鎖,如果失敗,則調(diào)用scanAndLockForPut(...)通過(guò)自旋等待的方式獲得鎖。注意,這里鎖操作鎖的只是當(dāng)前這個(gè)segment,而不會(huì)影響segment[]數(shù)組中其他的segment對(duì)象的寫(xiě)操作。這是ConcurrentHashMap實(shí)現(xiàn)并發(fā)寫(xiě)操作的精髓所在。通過(guò)分段鎖來(lái)支持一定并發(fā)量的寫(xiě)操作,并通過(guò)volatile以及UNSAFE.getObjectVolatile、UNSAFE.putOrderedObject來(lái)實(shí)現(xiàn)不加鎖的讀操作,也就是支持任何并發(fā)量的讀操作。
b) 計(jì)算key應(yīng)插入的HashEntry在HashEntry[]數(shù)組的index,并通過(guò)UNSAFE.getObjectVolatile(...)方式獲取最新的到HashEntry對(duì)象
c) 判斷HashEntry鏈中是否已經(jīng)存在該key了,如果存在則將key的value替換成新值,并將modCount加1
d) 如果HashEntry鏈中不存在該key,則將key-value出入到HashEntry鏈頭處,并將count加1,但此時(shí)count還未更新到segment中。
e) 如果在count加1后發(fā)現(xiàn)目前HashEntry鏈表長(zhǎng)度以及達(dá)到了閾值并且HashEntry的鏈表長(zhǎng)度小于限制的最大長(zhǎng)度,則會(huì)進(jìn)行HashEntry的擴(kuò)容操作。注意,在JDK 7中是確定當(dāng)前put操作是會(huì)加入一個(gè)新節(jié)點(diǎn)情況下才會(huì)觸發(fā)擴(kuò)容操作,而在JDK 6中,可能存在put操作只是替換一個(gè)已經(jīng)存在的key的value值的情況下也會(huì)觸發(fā)擴(kuò)容操作。
f) 如果count加1未觸發(fā)閾值,則通過(guò)UNSAFE.putOrderedObject(…)方式將最新的HashEntry更新到HashEntry[]數(shù)組中。
g) 更新segment中的modCount、count值
h) 釋放鎖。釋放鎖的操作會(huì)將當(dāng)前線程緩存里的數(shù)據(jù)寫(xiě)到主存中。

  • rehash(HashEntry<K,V> node)
        private void rehash(HashEntry<K,V> node) {
            /*
             * Reclassify nodes in each list to new table.  Because we
             * are using power-of-two expansion, the elements from
             * each bin must either stay at same index, or move with a
             * power of two offset. We eliminate unnecessary node
             * creation by catching cases where old nodes can be
             * reused because their next fields won't change.
             * Statistically, at the default threshold, only about
             * one-sixth of them need cloning when a table
             * doubles. The nodes they replace will be garbage
             * collectable as soon as they are no longer referenced by
             * any reader thread that may be in the midst of
             * concurrently traversing table. Entry accesses use plain
             * array indexing because they are followed by volatile
             * table write.
             */
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }

當(dāng)HashEntry的數(shù)量達(dá)到閾值時(shí)就會(huì)觸發(fā)HashEntry[]數(shù)組的擴(kuò)容操作
a) 創(chuàng)建new HashEntry[]數(shù)組,new HashEntry[]數(shù)組的容量為old HashEntry的2倍
b) 設(shè)置新的閾值
c) 將old HashEntry[]數(shù)組中的內(nèi)容放入new HashEntry[]中,這并不是盲目的將元素一一取出然后計(jì)算元素在new HashEntry的位置,然后插入。這里Doug Lea做了一些優(yōu)化。

  • 如果old HashEntry[]數(shù)組的元素HashEntry鏈表,若該HashEntry鏈表的頭節(jié)點(diǎn)不存在next節(jié)點(diǎn),即說(shuō)明該HashEntry鏈表是個(gè)單節(jié)點(diǎn),則直接將HashEntry插入到new HashEntry[]數(shù)組對(duì)應(yīng)的位置中。

  • 因?yàn)閚ew HashEntry[]的length是old HashEntry[]的2倍,所以對(duì)應(yīng)的new sizeMask比old sizeMask多了old HashEntry[] length的大小( 比如,old_HashEntry_array_length為8,則old sizeMask為’0000 0111’;new_HashEntry_array_length為16,則new sizeMask為’0000 1111’)。所以元素在new HashEntry[]的new index要么和old index一樣,要么就是old_index + old_HashEntry_array_length。因此我們可通過(guò)對(duì)節(jié)點(diǎn)的復(fù)用來(lái)減少不必要的節(jié)點(diǎn)創(chuàng)建,通過(guò)計(jì)算每個(gè)HashEntry鏈表中每個(gè)entry的new index值,如果存在從某個(gè)entry開(kāi)始到該HashEntry鏈表末尾的所有entrys,它們的new index值都一樣,那么就該entry直接插入到new HashEntry[newIndex]中,當(dāng)然最壞的請(qǐng)求就是該entry就是HashEntry鏈的最后一個(gè)entry。然后只需重建HashEntry中該entry之前的到鏈表頭的entry節(jié)點(diǎn),分別將新構(gòu)建的entry插入到new HashEntry[]中。
    再者,經(jīng)統(tǒng)計(jì),在使用默認(rèn)閾值的情況下,一般只有1/6的節(jié)點(diǎn)需要重新構(gòu)建

  • 最后將當(dāng)前操作新構(gòu)建的節(jié)點(diǎn)加入到new HashEntry[]數(shù)組中
    d) old HashEntry如果沒(méi)有其他讀線程操作引用時(shí),將會(huì)盡快被垃圾回收。
    e) 擴(kuò)容操作因?yàn)橐匦聵?gòu)建正整個(gè)HashEntry[]數(shù)組,所以不需要通過(guò)UNSAFE.putOrderedObject(...)方式將元素插入一個(gè)已經(jīng)存在的HashEntry[]中,而是直接通過(guò)索引操作插入到new HashEntry[]數(shù)組就好,最后我們會(huì)將new HashEntry[]直接賦值給volatile tables字段,這樣就可以保證new HashEntry[]對(duì)其他線程可見(jiàn)了

  • remove操作

    public V remove(Object key) {
        int hash = hash(key);
        Segment<K,V> s = segmentForHash(hash);
        return s == null ? null : s.remove(key, hash, null);
    }

a) 根據(jù)key計(jì)算出該key對(duì)應(yīng)的segment在segment[]數(shù)組中的index,并獲取該segment。
b) 將key從該segment中移除

        final V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> e = entryAt(tab, index);
                HashEntry<K,V> pred = null;
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) {
                            if (pred == null)
                                setEntryAt(tab, index, next);
                            else
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

a) 嘗試獲得鎖,如果失敗則調(diào)用scanAndLock(...)通過(guò)自旋等待的方式獲得鎖。
b) 獲取key鎖對(duì)應(yīng)的HashEntry鏈表,并在該HashEntry中找到key對(duì)應(yīng)entry節(jié)點(diǎn)
c) 如果key對(duì)應(yīng)的節(jié)點(diǎn)是在HashEntry鏈表頭,則直接將key的next節(jié)點(diǎn)通過(guò)UNSAFE.putOrderedObject的方式這是為對(duì)HashEntry[]數(shù)組中對(duì)應(yīng)的位置,即使得next節(jié)點(diǎn)稱為成為鏈表頭。
d) 如果key不是HashEntry的鏈表頭節(jié)點(diǎn),則將key的前一個(gè)節(jié)點(diǎn)的next節(jié)點(diǎn)修改為key的next節(jié)點(diǎn)。額,這么說(shuō)太繞了,舉個(gè)例子吧~
key對(duì)應(yīng)的節(jié)點(diǎn):current_HashEntry;current_HashEntry的前一個(gè)節(jié)點(diǎn):pre_HashEntry;current_HashEntry的下一個(gè)節(jié)點(diǎn):next_HashEntry
刪除前:
pre_HashEntry.next ——> current_HashEntry
current_HashEntry.next ——> next_HashEntry
刪除后:
pre_HashEntry.next ——> next_HashEntry
e) 修改segment屬性:modCount加1,count減1
f) 釋放鎖

  • size()
    public int size() {
        // Try a few times to get accurate count. On failure due to
        // continuous async changes in table, resort to locking.
        final Segment<K,V>[] segments = this.segments;
        int size;
        boolean overflow; // true if size overflows 32 bits
        long sum;         // sum of modCounts
        long last = 0L;   // previous sum
        int retries = -1; // first iteration isn't retry
        try {
            for (;;) {
                if (retries++ == RETRIES_BEFORE_LOCK) {
                    for (int j = 0; j < segments.length; ++j)
                        ensureSegment(j).lock(); // force creation
                }
                sum = 0L;
                size = 0;
                overflow = false;
                for (int j = 0; j < segments.length; ++j) {
                    Segment<K,V> seg = segmentAt(segments, j);
                    if (seg != null) {
                        sum += seg.modCount;
                        int c = seg.count;
                        if (c < 0 || (size += c) < 0)
                            overflow = true;
                    }
                }
                if (sum == last)
                    break;
                last = sum;
            }
        } finally {
            if (retries > RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    segmentAt(segments, j).unlock();
            }
        }
        return overflow ? Integer.MAX_VALUE : size;
    }

a) 會(huì)先嘗試RETRIES_BEFORE_LOCK次( 即2次 )不加鎖的情況下,將segment[]數(shù)組中每個(gè)segment的count累加,同時(shí)也會(huì)將每個(gè)segment的modCount進(jìn)行累加。如果兩次不加鎖的操作后,modCountSum值是一樣的,這說(shuō)明在這兩次累加segmentcount的過(guò)程中ConcurrentHashMap沒(méi)有發(fā)生結(jié)構(gòu)性變化,那么就直接返回累加的count值
b) 如果在兩次累加segment的count操作期間ConcurrentHashMap發(fā)生了結(jié)構(gòu)性改變,則會(huì)通過(guò)將所有的segment都加鎖,然后重新進(jìn)行count的累加操作。在完成count的累加操作后,釋放所有的鎖。最后返回累加的count值。
c) 注意,如果累加的count值大于了Integer.MAX_VALUE,則返回Integer.MAX_VALUE。

弱一致性

相對(duì)于HashMap的fast-fail,ConcurrentHashMap的迭代器并不會(huì)拋出ConcurrentModificationException異常。這是由于ConcurrentHashMap的讀行為是弱一致性的。
也就是說(shuō),在同時(shí)對(duì)一個(gè)segment進(jìn)行讀線程和寫(xiě)線程操作時(shí),并不保證寫(xiě)操作的行為能被并行允許的讀線程所感知。
比如,當(dāng)一個(gè)寫(xiě)線程和讀線程并發(fā)的對(duì)同一個(gè)key進(jìn)行操作時(shí):寫(xiě)線程在操作一個(gè)put操作,如果這個(gè)時(shí)候put的是一個(gè)已經(jīng)存在的key值,則會(huì)替換該key對(duì)應(yīng)的value值,因?yàn)関alue是volatile屬性的,所以該替換操作時(shí)能立即被讀線程感知的。但如果此時(shí)的put是要新插入一個(gè)entry,則存在兩種情況:①在寫(xiě)線程通過(guò)UNSAFE.putOrderedObject方式將新entry插入到HashEntry鏈表后,讀線程才通過(guò)UNSAFE.getObjectVolatile來(lái)獲取對(duì)應(yīng)的HashEntry鏈表,那么這個(gè)時(shí)候讀線程是能夠獲取到這個(gè)新插入的entry的;②反之,如果讀線程的UNSAFE.getObjectVolatile操作在寫(xiě)線程的UNSAFE.putOrderedObject之前,則就無(wú)法感知到這個(gè)新加入的entry了。
其實(shí)在大多數(shù)并發(fā)的業(yè)務(wù)邏輯下,我們是允許這樣的弱一致性存在的。如果你的業(yè)務(wù)邏輯不允許這樣的弱一致性存在的,你可以考慮對(duì)segment中的HashEntry鏈表的讀操作加鎖,或者將segment改造成讀寫(xiě)鎖模式。但這都將大大降低ConcurrentHashMap的性能并且使得你的程序變得復(fù)雜且難以維護(hù)。或許你該考慮使用其他的存儲(chǔ)模型代替ConcurrentHashMap。

后記

雖然JDK 8已經(jīng)出來(lái)很久了,但是我還是花了很多時(shí)間在JDK 7的ConcurrentHashMap上,一個(gè)很重要的原因是,我認(rèn)為ConcurrentHashMap在并發(fā)模式下的設(shè)計(jì)思想是很值得我們深究和學(xué)習(xí)的,無(wú)論是jdk7相對(duì)于jdk6的各種細(xì)節(jié)和性能上的優(yōu)化,還是jdk8的大改造都是對(duì)并發(fā)編程各種模式很好的學(xué)習(xí)。文章還有很多可以繼續(xù)深入挖掘的點(diǎn),希望在后期的學(xué)習(xí)中能夠繼續(xù)完善~

參考

http://www.infoq.com/cn/articles/ConcurrentHashMap/
http://www.blogjava.net/DLevin/archive/2013/10/18/405030.html
https://my.oschina.net/7001/blog/896587

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容