ConcurrentHashMap源碼分析

前言

本文中的ConcurrentHashMap源碼分析版本是jdk1.8;

問題

在對(duì)ConcurrentHashMap進(jìn)行源碼分析之前,首先需要確定幾個(gè)問題,也就是ConcurrentHashMap相對(duì)HashMap來說需要解決的問題:

    1. 如何保證高并發(fā)時(shí)數(shù)據(jù)插入的同步性以及高效性?
    1. 如何保證高并發(fā)場(chǎng)景下擴(kuò)容時(shí),不會(huì)重復(fù)擴(kuò)容?
    1. 如何保證高并發(fā)場(chǎng)景下擴(kuò)容時(shí),如何處理數(shù)據(jù)讀取和插入?

ConcurrentHashMap常量說明

要理解ConcurrentHashMap,首先需要對(duì)其常量和變量有一定認(rèn)識(shí),必須知道這是用來干什么的,什么場(chǎng)景下使用;

  • MAXIMUM_CAPACITY = 1 << 30;表示哈希表最大容量為2^30,因?yàn)?2位bit中前兩位bit是被用作控制目的的;
  • DEFAULT_CAPACITY = 16;表示哈希表默認(rèn)容量是16,
  • MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;表示虛擬機(jī)棧數(shù)組最大長(zhǎng)度,JDK1.8版本后concurrenthashMap只是在toArray相關(guān)的方法使用到;
  • DEFAULT_CONCURRENCY_LEVEL = 16;默認(rèn)并發(fā)等級(jí),當(dāng)前版本不怎么使用,主要是為了兼容之前版本;
  • LOAD_FACTOR = 0.75f;哈希表加載因子,計(jì)算加載因子容量時(shí)可以利用n-(n>>2)表達(dá)式輕松計(jì)算出來;
  • TREEIFY_THRESHOLD = 8; bin鏈表轉(zhuǎn)化成紅黑樹的閾值;超過該值后查找紅黑樹相對(duì)鏈表來說要效率高些

bin:箱子或者柜子,可以理解為裝node的箱子,每個(gè)箱子里面掛著一個(gè)node的鏈表或者紅黑樹;

  • UNTREEIFY_THRESHOLD = 6;bin紅黑樹轉(zhuǎn)換成鏈表的閾值;當(dāng)nodes數(shù)量小于該值時(shí)紅黑樹將轉(zhuǎn)化為鏈表;這時(shí)查找鏈表速率更快些;
  • MIN_TREEIFY_CAPACITY = 64;表示哈希表容量大于該值時(shí)才會(huì)將鏈表轉(zhuǎn)換成紅黑樹,這是bin鏈表轉(zhuǎn)換成紅黑樹的第二個(gè)條件(如果一個(gè)bin中nodes太多的話,會(huì)選擇擴(kuò)容而不是轉(zhuǎn)化成紅黑樹);

為什么是64呢?因?yàn)楸苊庠跀U(kuò)容和轉(zhuǎn)換成紅黑樹時(shí)的沖突,因?yàn)楣1砣萘枯^小時(shí)是比較容易進(jìn)行擴(kuò)容操作的,如果一個(gè)bin中nodes比較多就轉(zhuǎn)換成紅黑樹,那么發(fā)生擴(kuò)容之后,又得轉(zhuǎn)換成鏈表,會(huì)造成很多不必要的計(jì)算,所以為了平衡兩者的沖突,需要確定一個(gè)數(shù)組容量閾值;

  • MIN_TRANSFER_STRIDE = 16;表示擴(kuò)容時(shí)一個(gè)線程需要轉(zhuǎn)移bins的最小長(zhǎng)度為16,因?yàn)閿U(kuò)容時(shí)如果bins長(zhǎng)度比較大,那么可以借助多線程并發(fā)轉(zhuǎn)移bins,轉(zhuǎn)移bins指的是擴(kuò)容后根據(jù)bin中node的hash值與擴(kuò)容的數(shù)組長(zhǎng)度相與后確定新的bin,然后將該node轉(zhuǎn)移到新的bin去;
  • RESIZE_STAMP_BITS = 16;每次擴(kuò)容時(shí)用來在sizeCtl中記錄擴(kuò)容戳的位數(shù),不同輪次的擴(kuò)容操作的擴(kuò)容戳都是不同的,擴(kuò)容戳可以保證多次擴(kuò)容操作不會(huì)交叉重疊,該變量不是常量也不提供修改方法;
  • MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;表示擴(kuò)容時(shí)最多允許2^16-1個(gè)線程進(jìn)行transfer工作;
  • RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;擴(kuò)容戳移位量,擴(kuò)容時(shí)將擴(kuò)容戳進(jìn)行移位用來當(dāng)做擴(kuò)容線程計(jì)數(shù)的基數(shù),通過反向移位后可以計(jì)算出擴(kuò)容戳;
  • MOVED = -1,表示擴(kuò)容時(shí)FORWARDING節(jié)點(diǎn)的hash值,F(xiàn)ORWARDING節(jié)點(diǎn)是一個(gè)臨時(shí)節(jié)點(diǎn),只在擴(kuò)容時(shí)出現(xiàn),如果舊數(shù)組中某個(gè)bin(箱子)的nodes全部都遷移到了新數(shù)組中,舊數(shù)組就會(huì)在該bin頭部放一個(gè)forwarding節(jié)點(diǎn);如果讀線程在進(jìn)行讀操作或者迭代讀時(shí)遇到了該節(jié)點(diǎn),就會(huì)被轉(zhuǎn)移到新數(shù)組上執(zhí)行讀操作,如果寫線程遇到該節(jié)點(diǎn),則會(huì)嘗試協(xié)助擴(kuò)容;
  • TREEBIN=-2,表示TreeBin的hash值,TreeBin是ConcurrentHashMap中用來代理操作TreeNode的特殊節(jié)點(diǎn),擁有存儲(chǔ)實(shí)際數(shù)據(jù)的紅黑樹的根節(jié)點(diǎn);
  • RESERVED = -3;保留節(jié)點(diǎn),也就是站位符,一般情況下不會(huì)出現(xiàn),在jdk1.8版本中新的函數(shù)式方法computeIfAbsent和compute中才會(huì)出現(xiàn);
  • HASH_BITS=0x7ffffff;用于將節(jié)點(diǎn)hash值為負(fù)數(shù)的轉(zhuǎn)化為正數(shù),hashtable中利用hash值定位bin也是利用該方法;

ConcurrentHashMap變量說明

  • volatile Node<K,V>[] nextTable;nextTabel!=null表示當(dāng)前ConcurrentHashMap正在擴(kuò)容,還有些擴(kuò)容線程沒有完全退出;

  • volatile int sizeCtl;用來控制ConcurrentHashMap初始化和擴(kuò)容的;
    sizeCtl=-1表示進(jìn)行初始化;
    sizeCtl=-(1+n)時(shí)表示擴(kuò)容時(shí)的活躍線程數(shù)為n,n+1為sizeCtl的低32 - RESIZE_STAMP_BITS位值代表的數(shù)值,如只有一個(gè)線程第一次進(jìn)行擴(kuò)容時(shí),sizeCtl=-2145714174,那么其二進(jìn)制為
    -2145714174=1000 0000 0001 1011 0000 0000 0000 0010,
    其低32 - RESIZE_STAMP_BITS=32-16=16位的補(bǔ)碼為0000 0000 0000 0010,剛好為2;
    如果該值減少2如果使得活躍線程位數(shù)代表的值為零,說明擴(kuò)容已經(jīng)結(jié)束;
    sizeCtl>0時(shí)表示初始化中正在使用的數(shù)組初始容量值或者初始化后下一次初始化數(shù)組需要的容量值;
    sizeCtl=0表示初始化中數(shù)組默認(rèn)使用的是初始化容量值;

  • volatile int transferIndex:表示上一個(gè)transfer任務(wù)結(jié)束的位置或者數(shù)組長(zhǎng)度值,ConcurrentHashMap將bin數(shù)組劃分為長(zhǎng)度為stride的m(m=n%stride)個(gè)transfer任務(wù),第k(1<=K<=m)個(gè)transfer任務(wù)處理[(k-1)* stride,k* stride-1]個(gè)bins,而且為了盡量避免和迭代處理相同的bin,transfer任務(wù)是從后往前執(zhí)行的,所以每次設(shè)置transferIndex都是k*stride,第一次設(shè)置transferIndex為bin數(shù)組的長(zhǎng)度n,從n-1開始處理bin轉(zhuǎn)移;第二次transferIndex則為n-stride;每次都是通過cas機(jī)制避免多線程處理同一個(gè)transfer任務(wù);

  • volatile long baseCount:計(jì)數(shù)器基本值,主要在沒有多線程競(jìng)爭(zhēng)情況下使用,需要通過CAS機(jī)制更新;

  • volatile CounterCell[] counterCells:CAS自旋鎖標(biāo)志位,用于初始化,或者counterCells擴(kuò)容時(shí)

  • volatile int cellsBusy:高并發(fā)的計(jì)數(shù)單元,如果進(jìn)行了初始化,那么長(zhǎng)度和bin數(shù)組一樣都是2^n;

ConcurrentHashMap方法解析

putVal()解析

putVal有三個(gè)參數(shù)K key,V value,boolean onlyIfAbsent,onlyIfAbsent默認(rèn)為false,當(dāng)putIfAbsent方法調(diào)用時(shí)則為ture;

  final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        //將key的hashcode進(jìn)行平鋪,也就是為了避免hash值低位與數(shù)組長(zhǎng)度n-1相與可能會(huì)有更多的hash collide,因此將hash值高16位和低16位先進(jìn)行或運(yùn)算
        int hash = spread(key.hashCode());
        int binCount = 0;//用來計(jì)算bin鏈表長(zhǎng)度
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)//初始化
                tab = initTable();
            //將hash和數(shù)組長(zhǎng)度n-1進(jìn)行與運(yùn)算找到bin的下標(biāo)i,然后對(duì)該bin第一個(gè)Node進(jìn)行判斷
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果該node為空,則cas添加一個(gè)新node,成功則退出循環(huán),否則繼續(xù);
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//如果node的hash為moved,那么表示該bin正在擴(kuò)容中,則協(xié)助擴(kuò)容
                tab = helpTransfer(tab, f);
            else if (onlyIfAbsent // check first node without acquiring lock
                     && fh == hash
                     && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                     && (fv = f.val) != null)//判斷bin頭node如果不能替換且old key和new key一致,則直接退出
                return fv;
            else {//如果bin header node可以替換或者不等于key不一致
                V oldVal = null;
                synchronized (f) {//利用synchronized方法將header node加鎖,避免競(jìng)爭(zhēng)造成一致性
                    if (tabAt(tab, i) == f) {//如果header node尚未沒有發(fā)生變化,避免另有線程更新該header node
                        if (fh >= 0) {//hash值大于零表示,該node是普通node
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {//從header node開始迭代找到key,hash值一致的node進(jìn)行替換,找不到則在tail添加新node
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {//如果header node是紅黑樹TreeBin
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)//保留節(jié)點(diǎn)一般情況不太可能出現(xiàn)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {//如果binCount大于零即bin鏈表長(zhǎng)度大于零
                    if (binCount >= TREEIFY_THRESHOLD)//判斷是否大于紅黑樹轉(zhuǎn)化閾值
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);//添加到高效計(jì)數(shù)器,另外根據(jù)binCount判斷是否需要協(xié)助擴(kuò)容或者紅黑樹轉(zhuǎn)換
        return null;
    }

transfer方法說明

transfer方法其實(shí)劃分為3個(gè)階段:

    1. 根據(jù)cpu核數(shù)計(jì)算transfer任務(wù)步長(zhǎng)stride和初始化nextTab;
    1. 當(dāng)前線程申請(qǐng)transfer任務(wù),transfer任務(wù)劃分如下圖:


      transfer任務(wù).png
    1. 擴(kuò)容操作
      源碼解析如下:
  private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
         //計(jì)算transfer任務(wù)步長(zhǎng)stride,最小為16,多線程情況下為n/(8*NCPU)        
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // nextTab初始化
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;//長(zhǎng)度增加兩倍后復(fù)制給nextTable
            transferIndex = n;//一開始transferIndex等于原數(shù)組末尾index+1處
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//轉(zhuǎn)移節(jié)點(diǎn)
        boolean advance = true;//表示是否還能繼續(xù)執(zhí)行申請(qǐng)transfer任務(wù)
        boolean finishing = false; // 在提交nextTable之前用來保證所有transfer任務(wù)都已經(jīng)完成
        for (int i = 0, bound = 0;;) {//i表示當(dāng)前transfer任務(wù)初始化開始bin位置,bound表示當(dāng)前transfer任務(wù)初始化結(jié)束bin的下標(biāo);
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;//定義下一個(gè)transfer任務(wù)開始bin的下標(biāo)和結(jié)束bin的下標(biāo);
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {//如果nextIndex已經(jīng)不大于零,那么說明transfer任務(wù)已經(jīng)結(jié)束;
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSetInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {//嘗試申請(qǐng)下一個(gè)transfer任務(wù)
                    //申請(qǐng)成功,則將bound賦值開始和結(jié)束bin的下標(biāo)位置(i,bound)
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //i<0表示transfer任務(wù)已經(jīng)結(jié)束,i大于n表示存在新的擴(kuò)容任務(wù),
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {//執(zhí)行本輪擴(kuò)容的收尾工作
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);//將原數(shù)組長(zhǎng)度擴(kuò)大兩倍-減去一半即是新數(shù)組長(zhǎng)度的0.75;
                    return;
                }
                if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {//嘗試將sizeCtl減1,表明需要減少一個(gè)活躍擴(kuò)容線程
                   //sc-2如果除了擴(kuò)容戳以外的低位都是零的話,那么表示當(dāng)前線程是否是最后一個(gè)擴(kuò)容線程,如果是直接返回
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit//重新檢查先是否任務(wù)分配的hash bins都已經(jīng)轉(zhuǎn)移完畢
                }
            }
            else if ((f = tabAt(tab, i)) == null)//如果當(dāng)前bin為null,則直接放置一個(gè)forwardnode表明是擴(kuò)容轉(zhuǎn)移
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)//如果當(dāng)前bin是forwardnode,那么表示出現(xiàn)了擴(kuò)容重疊,需要重新申請(qǐng)transfer任務(wù)
                advance = true; // already processed
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            //n=2^k,表示第k位為1,而對(duì)于數(shù)組長(zhǎng)度為n,hash&n時(shí)第k位為零說明是新數(shù)組低位bin,而第k為1則說明是新數(shù)組高位bin;
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {//找到最后一個(gè)node
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {//根據(jù)lastRun生成node鏈
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {//按照紅黑樹的方式轉(zhuǎn)移bin
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

resizeStamp(n)解析

resizeStamp方法是用來生成擴(kuò)容戳的,對(duì)于不同數(shù)組長(zhǎng)度n,則擴(kuò)容戳不一樣,同時(shí)每個(gè)擴(kuò)容戳需要容納最大為MAX_RESIZERS=(1 << (32 - RESIZE_STAMP_BITS)) - 1的活躍擴(kuò)容線程,那么sizeCtl-2=resizeStamp(n)<<RESIZE_STAMP_BITS,就應(yīng)該要能表示這三組信息:負(fù)值,表示不同數(shù)組長(zhǎng)度的擴(kuò)容戳和活躍線程數(shù);
先看下源碼:

  static final int resizeStamp(int n) {
       //獲取數(shù)組長(zhǎng)度n前面零的個(gè)數(shù)一般為32-log(n),那么RESIZE_STAMP_BITS為擴(kuò)容戳需要的位數(shù)
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

因?yàn)镽ESIZE_STAMP_BITS需要能展示出當(dāng)前數(shù)組擴(kuò)容次數(shù),而數(shù)組可擴(kuò)容次數(shù)最大值為32-log(n),所以RESIZE_STAMP_BITS>32>2^5,所以RESIZE_STAMP_BITS至少為6位;
則假設(shè)n=2^k,那么經(jīng)過resizeStamp之后得到的rs如下圖:


rs.png

;

經(jīng)過左移RESIZE_STAMP_SHIFT后如下圖:


sizeCtl.png

如上圖所示最高位為1表示負(fù)數(shù),擴(kuò)容戳的后五位表示當(dāng)前擴(kuò)容次數(shù),32-擴(kuò)容戳所占位數(shù)即可表示活躍擴(kuò)容線程數(shù),活躍擴(kuò)容線程數(shù)為1時(shí)表示初始化,為2時(shí)表示當(dāng)前擴(kuò)容線程數(shù)為1;

未完待續(xù).....

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容