深度剖析ConcurrentHashMap

1.原理解析

線程安全的保證:CAS+Synchonized
數(shù)據(jù)存儲實現(xiàn):數(shù)組+鏈表+紅黑樹


image.png

1.1 成員變量

  • table: transient volatile Node<K,V>[] table
    一個Node類型的表,默認為null,初始化在第一次put操作時,默認大小為16,擴容時大小總是2的冪次方
  • nextTable: private transient volatile Node<K,V>[] nextTable
    默認為null,擴容時新生成的數(shù)組,大小為原數(shù)組的2倍
  • sizeCtl: private transient volatile int sizeCtl
    默認為0,用來控制table的初始化和擴容操作。
    -1 表示正在table初始化,-N表示有N-1個線程正在擴容
    如果table未完成初始化,則表示table初始化需要的大小,如果已經(jīng)完成初始化,則表示table的容量,默認為table大小的0.75倍
  • Node: 保存key和value以及key的hash值的數(shù)據(jù)結(jié)構(gòu)。其中value和next都用volatile修飾,保證了并發(fā)中的可見性。
static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
}
  • ForwardingNode: 一個特殊的節(jié)點,hash值為-1,只有在擴容的時候使用,作為一個占位符,表示當前節(jié)點為null或者已經(jīng)遷移。
static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
}

2.常見方法

2.1 初始化table

ConcurrentHashTable的初始化操作是在第一次put操作時進行的,而且只會初始化一次:


putVal操作

這里的initTable,就是對table變量進行初始化操作:

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)//如果一個線程發(fā)現(xiàn)當前sizeCtl小于0,表明當前有其他
                              //線程在對table執(zhí)行cas成功,需要當前線程讓出時間片
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

這里講解下U.compareAndSwapInt(this, SIZECTL, sc, -1)操作哈
這里的U是private static final sun.misc.Unsafe U;,是Unsafe類,這個方法CompareAnsSwapInt,就是樂觀鎖CAS了。這里有四個參數(shù),第一個,第二個參數(shù)用來確定當前操作對象在內(nèi)存中的存儲值,然后和第三個expect value比較,如果相等,則將內(nèi)存值更新為第四個updaet value值。關(guān)于CAS的介紹,在這里詳細介紹。在原子性的保證下,將sc的值設置為-1,表明當前table正在初始化。

2.2 put操作

直接看源碼,參看源碼進行解釋:

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

putVal的操作可以分為三個步驟【假設table已經(jīng)初始化了】:

  • bucket為空時,采用CAS將Node放到對應的bucket中
  • 當前map正在擴容f.hash == MOVED,則先進行擴容,再進行更新值
  • 發(fā)生hash沖突時,先使用synchonized字鎖住。倘若當前的hash值對應的是鏈表的頭節(jié)點,則遍歷鏈表,如果能找到hash對應的節(jié)點,則更改對應的值,否則在鏈表的尾部增加節(jié)點;倘若當前的hash值對應的是紅黑樹的根節(jié)點,則在樹結(jié)構(gòu)上遍歷,進行更新或者插入操作。
    整個putval的操作中,是用for循環(huán)來外包的。當遇到map正在擴容時,沒有break操作,會等待map擴容結(jié)束后,進行更新或者插入值。
    這里一些細節(jié)點再進行介紹下:

2.2.1 hash算法

這里和hashmap一樣:

static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

2.2.2 獲取table所對應的索引元素

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

采用Unsafe.getObjectVolatie()來獲取,而不是直接用table[index]的原因跟ConcurrentHashMap的弱一致性有關(guān)。在java內(nèi)存模型中,我們已經(jīng)知道每個線程都有一個工作內(nèi)存,里面存儲著table的副本,雖然table是volatile修飾的,但不能保證線程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接獲取指定內(nèi)存的數(shù)據(jù),保證了每次拿到數(shù)據(jù)都是最新的。

2.3 擴容

為什么會擴容?

  • 當ConcurrentHashMap的tab長度大于64時,會使用紅黑樹
  • 新增節(jié)點后,如果鏈表的長度大于8時,會調(diào)用treeifyBin把鏈表轉(zhuǎn)換為紅黑樹。在轉(zhuǎn)換結(jié)構(gòu)時,如果tab的長度小于MIN_TREEIFY_CAPACITY,默認是64,則會將數(shù)組長度擴大到原來的兩倍。并觸發(fā)tranfer,重新調(diào)整節(jié)點的位置
  • 新增節(jié)點后,如果addCount中統(tǒng)計的節(jié)點數(shù)超過sizeCtl,也會觸發(fā)tranfer,進行位置調(diào)整

2.3.1 addCount

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

每次put后,會計算節(jié)點的數(shù)量

2.3.2 treeifyBin

private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }

將鏈表轉(zhuǎn)換為紅黑樹

2.3.3 transfer

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

擴容的代碼有點長,簡單的過程描述是,當tab元素的量達到容量閾值sizeCtl時,觸發(fā)擴容:

  1. 構(gòu)建一個NextTable,其大小為Table的兩倍
  2. 把table中的數(shù)據(jù),復制到NextTable中
    在擴容的過程中,依然支持并發(fā)更新操作,也支持并發(fā)插入。
    如何在擴容時,并發(fā)地復制與插入?
  • 遍歷整個table,當前節(jié)點為空,則采用CAS的方式在當前位置放入fwd
  • 當前節(jié)點已經(jīng)為fwd(with hash field “MOVED”),則已經(jīng)有有線程處理完了了,直接跳過 ,這里是控制并發(fā)擴容的核心
  • 當前節(jié)點為鏈表節(jié)點或紅黑樹,重新計算鏈表節(jié)點的hash值,移動到nextTable相應的位置(構(gòu)建了一個反序鏈表和順序鏈表,分別放置在i和i+n的位置上)。移動完成后,用Unsafe.putObjectVolatile在tab的原位置賦為為fwd, 表示當前節(jié)點已經(jīng)完成擴容。

2.4 get 讀操作

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

讀操作比較簡單了,不需要控制并發(fā),如果tab為空,返回null,否則計算hash值,找到bucket中對應的位置,如果是node節(jié)點直接返回,否則,返回null。

3.在jdk1.7和1.8之間的不同

3.1 并發(fā)機制

  • 在jdk1.7中,concurrentHashMap是通過鎖分段技術(shù)。把整個table分為幾個segment,每個segment中分配一個鎖,每個segment下,又分table+HashEntry。通過給segment加一個重入鎖,實現(xiàn)線程安全?!鞠啾扔趆ashtable,就是把hashtable分割為多個段,細化鎖的粒度】。
  • 在jdk1.8中,直接使用CAS + synchronized保證并發(fā)更新。粒度更細,不會在一個segment上加一把鎖了。

3.2 put操作

  • 在1.7中,多個線程同時競爭獲取同一個segment鎖,獲取成功的線程更新map;失敗的線程嘗試多次獲取鎖仍未成功,則掛起線程,等待釋放鎖
  • 在1.8中,訪問相應的bucket時,使用sychronizeded關(guān)鍵字,防止多個線程同時操作同一個bucket,如果該節(jié)點的hash不小于0,則遍歷鏈表更新節(jié)點或插入新節(jié)點;如果該節(jié)點是TreeBin類型的節(jié)點,說明是紅黑樹結(jié)構(gòu),則通過putTreeVal方法往紅黑樹中插入節(jié)點;更新了節(jié)點數(shù)量,還要考慮擴容和鏈表轉(zhuǎn)紅黑樹

4 ConcurrentHashMap能替換HashTable嗎?

hash table雖然性能上不如ConcurrentHashMap,但并不能完全被取代,兩者的迭代器的一致性不同的,hash table的迭代器是強一致性的,而concurrenthashmap是弱一致的。 ConcurrentHashMap的get,clear,iterator 都是弱一致性的。
下面是大白話的解釋:

  • Hashtable的任何操作都會把整個表鎖住,是阻塞的。好處是總能獲取最實時的更新,比如說線程A調(diào)用putAll寫入大量數(shù)據(jù),期間線程B調(diào)用get,線程B就會被阻塞,直到線程A完成putAll,因此線程B肯定能獲取到線程A寫入的完整數(shù)據(jù)。壞處是所有調(diào)用都要排隊,效率較低。
  • ConcurrentHashMap 是設計為非阻塞的。在更新時會局部鎖住某部分數(shù)據(jù),但不會把整個表都鎖住。同步讀取操作則是完全非阻塞的。好處是在保證合理的同步前提下,效率很高。壞處 是嚴格來說讀取操作不能保證反映最近的更新。例如線程A調(diào)用putAll寫入大量數(shù)據(jù),期間線程B調(diào)用get,則只能get到目前為止已經(jīng)順利插入的部分 數(shù)據(jù)。

選擇哪一個,是在性能與數(shù)據(jù)一致性之間權(quán)衡。ConcurrentHashMap適用于追求性能的場景,大多數(shù)線程都只做insert/delete操作,對讀取數(shù)據(jù)的一致性要求較低。

Reference:
[1] https://blog.csdn.net/programmer_at/article/details/79715177

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容