ConcurrentHashMap源碼分析(JDK1.8)——擴容

前言

在分析ConcurrentHashMap之前,希望大家先讀完HashMap的源碼,因為ConcurrentHashMap基本算法和HashMap是一致的,只是增加了并發(fā)控制而已,有了HashMap的基礎才能更好的理解ConcurrentHashMap,推薦大家先看看這兩篇文章:
HashMap的hash機制詳解
HashMap源碼分析

1. 重要成員

/**
     * 初始化和擴容標志,也是并發(fā)控制非常重要的一員,當sizeCtl<0時
     * 表明當前正在初始化或擴容,sizeCtl=-1正在初始化,sizeCtl<-1說明正在擴容
     * 而且此時sizeCtl = -(1+正在擴容的線程數(shù)量).  
     * 當還未進行初始化時sizeCtl為初始化容量大小,默認16,
     */
    private transient volatile int sizeCtl;

    /**
     * 擴容時使用
     */
    private transient volatile int transferIndex;


    /**
    *真正存儲數(shù)據(jù)的數(shù)組
    */
    transient volatile Node<K,V>[] table;

這里要注意,此處沒有了JDK1.7中的分段鎖的概念了,全部都是基于CAS的。

2. 并發(fā)基礎——CAS

整個ConcurrentHashMap完全沒有方法級別的鎖,到底是什么機制來保證并發(fā)的呢?這里簡單的介紹下。
首先大家對樂觀鎖和悲觀鎖要有個大致理解:

  • 悲觀鎖 :悲觀鎖認為競爭一定會發(fā)生,所以不管如何都會鎖住資源,不允許其他線程進入,sychorinized關鍵字就是標準的悲觀鎖
  • 樂觀鎖: 所謂的樂觀鎖就是認為競爭不一定會發(fā)生,比如有個變量A=3,我希望將它變成A=4,那么可以先比較 如果A=3,那么說明沒有其他線程競爭修改這個變量,我可以直接設置A=4, 這個比較和設置過程在硬件上是原子級別的,如果比較時發(fā)現(xiàn)A!=3,說明有其他線程修改了,這個情況會被返回,調用者可以針對這個情況特殊處理。
    我們看下ConcurrentHashMap是如何將一個Node節(jié)點放到數(shù)組table的一個位置上的:
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

這里重點關注傳入的c和v,整個compareAndSwapObject就是先比較tab數(shù)組上某個位置(通過內存偏移量算出來的) 上的節(jié)點是不是 c,如果是就認為沒有競爭,直接將該位置設置為 v,否則返回false。一般都是通過一個死循環(huán)來調用這個方法的,比如:

for (Node<K,V>[] tab = table;;) {  
       if (casTabAt(tab, i, null, r)) {
             //修改成功,會繼續(xù)執(zhí)行其他業(yè)務
            break;            
       }
       //修改失敗,會死循環(huán)下次重試
}

這種機制就是保證ConcurrentHashMap高效并發(fā)的基礎了。

由于ConcurrentHashMap處理hash沖突以及hash算法都是一樣的,所以這里一些基本功能不再分析,我們重點分析一些由于并發(fā)導致的和HashMap區(qū)別較大的方法。

3. 擴容

ConcurrentHashMap的擴容是支持多線程并發(fā)擴容的,所以擴容效率很高,在看源碼之前,我們先大致看下并發(fā)擴容的思想,擴容的核心就在于將舊的table數(shù)組中的數(shù)據(jù)遷移到新的數(shù)組中來。我們先看張圖:


并發(fā)擴容.png

之所以能并發(fā)擴容就在于這里,將現(xiàn)有的數(shù)據(jù)分成了幾部分,每個線程領一個自己的部分 ,線程領到了自己的部分后如何復制到新的數(shù)組的呢?


單個線程復制過程

對于擴容的單個線程來說,每次復制都是從尾部開始,一個節(jié)點復制完畢后會在這個位置放置一個ForwardingNode節(jié)點,表明這個節(jié)點已經(jīng)處理過了。

有了上述基礎我們再結合代碼分析.

3.1 確定每個線程擴容時負責的數(shù)組部分長度

        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range

這里主要是根據(jù)cpu的數(shù)量來計算的,但是如果算出來小于16的話,stride=16,也就是說一個線程處理的數(shù)量最少是16.

3.2 申請新空間

擴容第一步就是申請新的table數(shù)組,這個和HashMap一樣,都是直接兩倍擴容:

       if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }

3.2 遍歷自己負責的所有元素

        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)   //分支1
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {   //分支2
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt    // 分支3
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
             //省略一些具體實現(xiàn) 
        }

這里以一個長度32的ConcurrentHashMap擴容到64為例,記住,之前在申請空間時:

        if (nextTab == null) {            // initiating
         //省略代碼
            nextTable = nextTab;
            transferIndex = n;
        }

這也就意味著,一個線程剛開始擴容時,transferIndex = 32, i=0,bound=0;,所以會進入分支3,
這個時候會通過CAS操作將transferIndex賦值為16,bound=16, i=31. 記住transferIndex,每個線程擴容起始位置都是由它決定的,這個線程將他改成了16,那么下個并發(fā)線程擴容就會從16開始了,從而做到每個線程負責自己的數(shù)據(jù)。最終大部分操作都會進入分支1,通過--i來遍歷該線程負責的部分數(shù)組,從而拷貝數(shù)據(jù)。

3.3 遷移table數(shù)組中的單個元素。

3.3.1 該位置沒有數(shù)據(jù)

          else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);

這個時候無需拷貝,只要將再這個位置放置一個ForwardingNode節(jié)點即可。

3.3.2 該位置已經(jīng)被拷貝過了

else if ((fh = f.hash) == MOVED)
        advance = true; // already processed

此處的MOVED為-1,F(xiàn)orwardingNode的hash值都為-1,說明這個節(jié)點已經(jīng)被處理過了。所有數(shù)據(jù)拷貝完成后會重新遍歷一遍檢查,這個時候時會進入這個分支。

3.3.3 該位置是一個鏈表

            synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
              //省略部分代碼
            }

這里的思想其實和HashMap一樣(不熟悉的可以先看看本文開始推薦的兩篇文章),都是把鏈表拆成兩部分,一部分放在nextTab的i位置,一部分放在nextTab的i+n位置。
注意,這里使用了synchronized鎖住了當前節(jié)點,這也是一種沒辦法的事。但由于鎖住的只是一個節(jié)點,并不會影響到其他擴容線程。

3.3.4 該位置是一個紅黑樹

                  else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }

紅黑樹道理也是一樣,也是拆分成兩部分,但這里會統(tǒng)計放在nextTab的i位置的數(shù)量和i+n位置的數(shù)量,如果低于6會退化成鏈表。

3.3.5 擴容結束

          if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {  //分支1
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {  //分支2
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        //走到這里說明所有的擴容線程都結束了,也就是此次擴容徹底結束。
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }

當自己的部分數(shù)組都copy完畢后,通常會先進入分支2,將finishing置為true,i=n,由于此時i=n,會重新遍歷一遍自己負責的部分數(shù)組,確保每個節(jié)點都被復制了,最后會進入分支1,將sizeCtl 置為下次擴容的閾值,其實sizeCtl = n2-n/2 = 2n 0.75. 也就是新的容量乘以擴容因子。至此,此次該線程擴容結束。

4. 總結

總體而言,本文并沒有再重復介紹一些和HashMap中一樣的算法,比如具體的hash算法,比如如何判斷擴容時怎樣將一個鏈表拆成兩部分,主要介紹了思路以及并發(fā)相關的,個人理解,如果有錯誤懇請指正。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容