轉(zhuǎn):ConcurrentHashMap JDK 7 源碼分析

參考文章:http://www.itdecent.cn/p/bd972088a494

數(shù)據(jù)結(jié)構(gòu)

image

構(gòu)造函數(shù)

put 的實(shí)現(xiàn)

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    //定位Segment,讓Hash右移動(dòng)segmentShift位,默認(rèn)情況下就是28位(總長(zhǎng)32位),之后和segmentMask(0XFF)做與操作,得到段的索引
    int j = (hash >>> segmentShift) & segmentMask;
    //利用UNSAFE.getObject中的方法獲取到目標(biāo)的Segment。
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        //如果沒有取到目標(biāo)Segment,所以需要保證能取到這個(gè)Segment,沒有的話創(chuàng)建一個(gè)Segment
        s = ensureSegment(j);
    //代理到Segment的put方法
    return s.put(key, hash, value, false);
}

首先是(Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)), UNSAFE這種用法是在JDK1.6中沒有的,主要是利用Native方法來(lái)快速的定位元素。

還有一個(gè)點(diǎn)是:ensureSegment()

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    //getObjectVolatile是以Volatile的方式獲得目標(biāo)的Segment,Volatile是為了保證可見性。
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        //如果沒有取到,那么證明指定的Segment不存在,那么需要新建Segment,方式是以ss[0]為鏡像創(chuàng)建。
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // 再次檢查
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);//創(chuàng)建新Segment
            //以CAS的方式,將新建的Segment,set到指定的位置。
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

上面的代碼就是保證,在put之前,要保證目標(biāo)的Segment是存在的,不存在需要?jiǎng)?chuàng)建一個(gè)Segment。
put方法代理到了Segment的put方法,Segment extends 了ReentrantLock,以至于它能當(dāng)做一個(gè)Lock使用。那么我們看一下Segment的put的實(shí)現(xiàn):

進(jìn)入Segment的put操作時(shí)先進(jìn)行加鎖保護(hù)。如果加鎖沒有成功,調(diào)用scanAndLockForPut方法(詳細(xì)步驟見下面scanAndLockForPut()源碼分析)進(jìn)入自旋狀態(tài),該方法持續(xù)查找key對(duì)應(yīng)的節(jié)點(diǎn)鏈中是已存在該機(jī)節(jié)點(diǎn),如果沒有找到,則預(yù)創(chuàng)建一個(gè)新節(jié)點(diǎn),并且嘗試n次,直到嘗試次數(shù)操作限制,才真正進(jìn)入加鎖等待狀態(tài),自旋結(jié)束并返回節(jié)點(diǎn)(如果返回了一個(gè)非空節(jié)點(diǎn),則表示在鏈表中沒有找到相應(yīng)的節(jié)點(diǎn))。對(duì)最大嘗試次數(shù),目前的實(shí)現(xiàn)單核次數(shù)為1,多核為64。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    //因?yàn)閜ut操作會(huì)改變整體的結(jié)構(gòu),所以需要保證段的線程安全性,所以首先tryLock
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        //新建tab引用,避免直接引用Volatile導(dǎo)致性能損耗,
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        //Volatile讀,保證可見性
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                //遍歷HashEntry數(shù)組,尋找可替換的HashEntry
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                //如果不存在可替換的HashEntry,如果在scanAndLockForPut中建立了此Node直接SetNext,追加到鏈表頭
                if (node != null)
                    node.setNext(first);
                else
                    //如果沒有則新建一個(gè)Node,添加到鏈表頭
                    node = new HashEntry<K,V>(hash, key, value, first);
                //容量計(jì)數(shù)+1
                int c = count + 1;
                //如果容量不足,那么擴(kuò)容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    //以Volatile寫的方式,替換tab[index]的引用
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

put方法是做了加鎖操作的,所以不用過多的考慮線程安全的問題,但是get操作為了保證性能是沒有加鎖的,所以需要盡量的保證數(shù)據(jù)的可見性,能讓get得到最新的數(shù)據(jù)。

讓我們看看 scanAndLockForPut(key, hash, value) 在做什么:

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

當(dāng)put操作嘗試加鎖沒成功時(shí),它不是直接進(jìn)入等待狀態(tài),而是調(diào)用了scanAndLockForPut()操作,該操作持續(xù)查找key對(duì)應(yīng)的節(jié)點(diǎn)鏈中是已存在該機(jī)節(jié)點(diǎn),如果沒有找到已存在的節(jié)點(diǎn),則預(yù)創(chuàng)建一個(gè)新節(jié)點(diǎn),并且嘗試n次,直到嘗試次數(shù)操作限制,才真正進(jìn)入等待狀態(tài),計(jì)所謂的自旋等待。對(duì)最大嘗試次數(shù),目前的實(shí)現(xiàn)單核次數(shù)為1,多核為64。

在這段邏輯中,它先獲取key對(duì)應(yīng)的節(jié)點(diǎn)鏈的頭,然后持續(xù)遍歷該鏈,如果節(jié)點(diǎn)鏈中不存在要插入的節(jié)點(diǎn),則預(yù)創(chuàng)建一個(gè)節(jié)點(diǎn),否則retries值資增,直到操作最大嘗試次數(shù)而進(jìn)入等待狀態(tài)。這里需要注意最后一個(gè)else if中的邏輯:當(dāng)在自旋過程中發(fā)現(xiàn)節(jié)點(diǎn)鏈的鏈頭發(fā)生了變化,則更新節(jié)點(diǎn)鏈的鏈頭,并重置retries值為-1,重新為嘗試獲取鎖而自旋遍歷。

為什么要這么做呢?為了事先做數(shù)據(jù)的緩存,讓這些數(shù)據(jù)緩存在CPU的cache中,這樣后續(xù)在使用時(shí)能避免Cache missing。ps:scanAndLockForPut有個(gè)孿生兄弟scanAndLock,作用都差不多。

和 JDK 1.6 的實(shí)現(xiàn)的不同:

V put(K key, int hash, V value, boolean onlyIfAbsent) {  
    lock();  
    try {  
        int c = count;  
        if (c++ > threshold) // ensure capacity  
            rehash();  
        HashEntry<K,V>[] tab = table;  
        int index = hash & (tab.length - 1);  
        HashEntry<K,V> first = tab[index];  
         HashEntry<K,V> e = first;  
         while (e != null && (e.hash != hash || !key.equals(e.key)))  
             e = e.next;  
   
         V oldValue;  
         if (e != null) {  
            oldValue = e.value;  
             if (!onlyIfAbsent)  
                e.value = value;  
         }  
         else {  
            oldValue = null;  
             ++modCount;  
            tab[index] = new HashEntry<K,V>(key, hash, first, value);  
            count = c; // write-volatile  
        }  
        return oldValue;  
    } finally {  
         unlock();  
    }  
}

JDK 1.6 的實(shí)現(xiàn)和 JDK 1.7 的實(shí)現(xiàn)比較相似,但是主要區(qū)別是,沒有使用一些 UNSAFE 的方法去保證內(nèi)存的可見性,而是通過一個(gè)Volatile變量——count去實(shí)現(xiàn)。在開始的時(shí)候讀count保證lock的內(nèi)存語(yǔ)意,最后寫count實(shí)現(xiàn)unlock的內(nèi)存語(yǔ)意。
但是這里存在一個(gè)問題,new HashEntry操作存在重排序問題,導(dǎo)致在getValue的時(shí)候tab[index]不為null,但是value為null。所以在 get 的時(shí)候會(huì)有一步重新檢查的步驟,如果 value 為 null 的話就把 segment 段加鎖在重新 get 一次。具體原因后面詳細(xì)說明。

get 的實(shí)現(xiàn)

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

可以看出來(lái),get方法很簡(jiǎn)單,同時(shí)get是沒有加鎖的,那么get是如何保證可見性的呢?首先獲取指定index的Segment,利用getObjectVolatile獲取指定index的first HashEntry,之后遍歷HashEntry鏈表,這里比較關(guān)鍵的是HashEntry的數(shù)據(jù)結(jié)構(gòu):

static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
}

兩個(gè)變量是volatile的,也就是說,兩個(gè)變量的讀寫能保證數(shù)據(jù)的可見性。
所以在變量HashEntry時(shí),總能保證得到最新的值。

JKD1.6的get方法的實(shí)現(xiàn):

V get(Object key, int hash) {  
    if (count != 0) { // read-volatile 當(dāng)前桶的數(shù)據(jù)個(gè)數(shù)是否為0 
        HashEntry<K,V> e = getFirst(hash);  得到頭節(jié)點(diǎn)
        while (e != null) {  
            if (e.hash == hash && key.equals(e.key)) {  
                V v = e.value;  
                if (v != null)  
                   return v;  
                return readValueUnderLock(e); // recheck  
            }  
            e = e.next;  
        }  
    }  
    return null;  
}

首先是讀取count變量,因?yàn)閮?nèi)存的可見性,總是能返回最新的結(jié)構(gòu),但是對(duì)于getFirst可能得到的是過時(shí)的HashEntry。接下來(lái)獲取到HashEntry之后getValue。但是這里為什么要做一個(gè)value的判空,原因就是上一步put的重排序問題,如果為null,那么只能加鎖,加鎖之后進(jìn)行重新讀取。但是這樣確實(shí)會(huì)帶來(lái)一些開銷。

為什么 JDK 1.6 的實(shí)現(xiàn)是弱一致性的?

這里比較重要的一點(diǎn)就是,為什么JDK1.6的是弱一致性的?因?yàn)镴DK1.6的所有可見性都是以count實(shí)現(xiàn)的,當(dāng)put和get并發(fā)時(shí),get可能獲取不到最新的結(jié)果,這就是JDK1.6中ConcurrentHashMap弱一致性問題,主要問題是 tab[index] = new HashEntry<K,V>(key, hash, first, value); 不一定 happened before getFirst(hash);

如下圖可以說明:

執(zhí)行put的線程 執(zhí)行g(shù)et的線程
⑧tab[index] = new HashEntry<K,V>(key, hash, first, value)
③if (count != 0)
②count = c
⑨HashEntry e = getFirst(hash);

對(duì)volatile的count的讀時(shí)間上發(fā)生在對(duì)count的寫之前,我們無(wú)法得出② hb ⑨這層關(guān)系了。因此,通過count變量,在這個(gè)軌跡上是無(wú)法得出⑧ hb ⑨的。

而JDK1.7的實(shí)現(xiàn),對(duì)于每一個(gè)操作都是Volatile變量的操作,能保證線程之間的可見性,所以不存在弱一致性的問題。(對(duì)這句話持有保留觀點(diǎn))

remove 的實(shí)現(xiàn)(具體實(shí)現(xiàn)以及和 1.6 對(duì)比要補(bǔ)上)

final V remove(Object key, int hash, Object value) {
    if (!tryLock())
        scanAndLock(key, hash);
    V oldValue = null;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> e = entryAt(tab, index);
        HashEntry<K,V> pred = null;
        while (e != null) {
            K k;
            HashEntry<K,V> next = e.next;
            if ((k = e.key) == key ||
                (e.hash == hash && key.equals(k))) {
                V v = e.value;
                if (value == null || value == v || value.equals(v)) {
                    if (pred == null)
                        setEntryAt(tab, index, next);
                    else
                        pred.setNext(next);
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }
            pred = e;
            e = next;
        }
    } finally {
        unlock();
    }
    return oldValue;
}

這里的 remove 實(shí)現(xiàn)我在公司里看到一篇文章,但是記不清在哪里看到的了。大致上說的是,remove 操作的時(shí)候不需要向 JDK 1.6 的那樣,將要?jiǎng)h除的節(jié)點(diǎn)賦值一遍,然后刪除對(duì)應(yīng)的元素, 再將最后一個(gè)元素的 next 元素連接上去。 而是用了 CAS 的操作實(shí)現(xiàn)的刪除。

size 的實(shí)現(xiàn)

public int size() {
    // 主要的方法是先遍歷 2 次 Segment,如果兩次遍歷的結(jié)果得到的 size 相同
    //那么就認(rèn)為 size 的結(jié)果是準(zhǔn)確的,否則就要對(duì)每一個(gè) Segment 加鎖重新計(jì)算 size 的大小
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

主要的方法是先遍歷 2 次 Segment,如果兩次遍歷的結(jié)果得到的 size 相同,那么就認(rèn)為 size 的結(jié)果是準(zhǔn)確的,否則就要對(duì)每一個(gè) Segment 加鎖重新計(jì)算 size 的大小。獲取鎖的操作在這里

if (retries++ == RETRIES_BEFORE_LOCK) {
    for (int j = 0; j < segments.length; ++j)
        ensureSegment(j).lock(); // force creation
}

JDK 6 和 7 的區(qū)別

總體來(lái)說變化挺多,不過總體的結(jié)構(gòu)并沒有發(fā)生改變,還是采用了 segment 分?jǐn)噫i的實(shí)現(xiàn)。 1.6 主要用 count 的 volatile 語(yǔ)義來(lái)實(shí)現(xiàn)put、get。1.7 主要使用 unsafe 的 CAS 操作來(lái)實(shí)現(xiàn)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容