Java基礎(chǔ)之ConcurrentHashMap

HashMap存在的問(wèn)題:

HashMap線程不安全

因?yàn)槎嗑€程環(huán)境下,使用Hashmap進(jìn)行put操作可能會(huì)引起死循環(huán),導(dǎo)致CPU利用率接近100%,所以在并發(fā)情況下不能使用HashMap。例如如下代碼:

final HashMap<String, String> map = new HashMap<String, String>(2);
for (int i = 0; i < 10000; i++) {
    new Thread(new Runnable() {
        @Override
        public void run() {
            map.put(UUID.randomUUID().toString(), "");
        }
    }).start();
}

Hashtable線程安全但效率低下

Hashtable容器使用synchronized來(lái)保證線程安全,但在線程競(jìng)爭(zhēng)激烈的情況下Hashtable的效率非常低下。因?yàn)楫?dāng)一個(gè)線程訪問(wèn)Hashtable的同步方法時(shí),其他線程訪問(wèn)Hashtable的同步方法時(shí),可能會(huì)進(jìn)入阻塞或輪詢狀態(tài)。如線程1使用put進(jìn)行添加元素,線程2不但不能使用put方法添加元素,并且也不能使用get方法來(lái)獲取元素,所以競(jìng)爭(zhēng)越激烈效率越低。

解決

分段鎖

HashTable容器在競(jìng)爭(zhēng)激烈的并發(fā)環(huán)境下表現(xiàn)出效率低下的原因,是因?yàn)樗性L問(wèn)HashTable的線程都必須競(jìng)爭(zhēng)同一把鎖,那假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分?jǐn)?shù)據(jù),那么當(dāng)多線程訪問(wèn)容器里不同數(shù)據(jù)段的數(shù)據(jù)時(shí),線程間就不會(huì)存在鎖競(jìng)爭(zhēng),從而可以有效的提高并發(fā)訪問(wèn)效率,這就是ConcurrentHashMap所使用的鎖分段技術(shù),首先將數(shù)據(jù)分成一段一段的存儲(chǔ),然后給每一段數(shù)據(jù)配一把鎖,當(dāng)一個(gè)線程占用鎖訪問(wèn)其中一個(gè)段數(shù)據(jù)的時(shí)候,其他段的數(shù)據(jù)也能被其他線程訪問(wèn)。有些方法需要跨段,比如size()和containsValue(),它們可能需要鎖定整個(gè)表而而不僅僅是某個(gè)段,這需要按順序鎖定所有段,操作完畢后,又按順序釋放所有段的鎖。這里“按順序”是很重要的,否則極有可能出現(xiàn)死鎖,在ConcurrentHashMap內(nèi)部,段數(shù)組是final的,并且其成員變量實(shí)際上也是final的,但是,僅僅是將數(shù)組聲明為final的并不保證數(shù)組成員也是final的,這需要實(shí)現(xiàn)上的保證。這可以確保不會(huì)出現(xiàn)死鎖,因?yàn)楂@得鎖的順序是固定的。
ConcurrentHashMap是由Segment數(shù)組結(jié)構(gòu)和HashEntry數(shù)組結(jié)構(gòu)組成。Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap里扮演鎖的角色,HashEntry則用于存儲(chǔ)鍵值對(duì)數(shù)據(jù)。一個(gè)ConcurrentHashMap里包含一個(gè)Segment數(shù)組,Segment的結(jié)構(gòu)和HashMap類似,是一種數(shù)組和鏈表結(jié)構(gòu), 一個(gè)Segment里包含一個(gè)HashEntry數(shù)組,每個(gè)HashEntry是一個(gè)鏈表結(jié)構(gòu)的元素, 每個(gè)Segment守護(hù)者一個(gè)HashEntry數(shù)組里的元素,當(dāng)對(duì)HashEntry數(shù)組的數(shù)據(jù)進(jìn)行修改時(shí),必須首先獲得它對(duì)應(yīng)的Segment鎖。

ConcurrentHashMap

JDK1.8的實(shí)現(xiàn)已經(jīng)拋棄了Segment分段鎖機(jī)制,利用CAS+Synchronized來(lái)保證并發(fā)更新的安全。數(shù)據(jù)結(jié)構(gòu)采用:數(shù)組+鏈表+紅黑樹(shù)。


ConcurrentHashMap in JDK1.8

話不多說(shuō),還是看源碼吧:

構(gòu)造:

    //構(gòu)造方法
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)//判斷參數(shù)是否合法
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY ://最大為2^30
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));//根據(jù)參數(shù)調(diào)整table的大小
        this.sizeCtl = cap;//獲取容量
        //ConcurrentHashMap在構(gòu)造函數(shù)中只會(huì)初始化sizeCtl值,并不會(huì)直接初始化table
    }
    //調(diào)整table的大小
    private static final int tableSizeFor(int c) {//返回一個(gè)大于輸入?yún)?shù)且最小的為2的n次冪的數(shù)。
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

tableSizeFor(int c)的原理:將c最高位以下通過(guò)|=運(yùn)算全部變成1,最后返回的時(shí)候,返回n+1;
eg:當(dāng)輸入為25的時(shí)候,n等于24,轉(zhuǎn)成二進(jìn)制為11000,右移1位為01100,將11000與01100進(jìn)行或("|")操作,得到11100。接下來(lái)右移兩位得00111,再進(jìn)行或操作得11111,接下來(lái)操作n的值就不會(huì)變化了。最后返回的時(shí)候,返回n+1,也就是100000,十進(jìn)制為32。按照這種邏輯得到2的n次冪的數(shù)。
那么為什么要先-1再+1呢?輸入若是為0,那么不論怎么操作,n還是0,但是HashMap的容量只有大于0時(shí)才有意義。

table初始化:

table初始化操作會(huì)延緩到第一次put行為。但是put是可以并發(fā)執(zhí)行的,那么是如何實(shí)現(xiàn)table只初始化一次的?接著上源碼:

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)//判斷table還未初始化
                tab = initTable();//初始化table
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                    break;                   // no lock when adding to empty bin
            }
           ...省略一部分源碼
        }
    } 
    
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
        //如果一個(gè)線程發(fā)現(xiàn)sizeCtl<0,意味著另外的線程執(zhí)行CAS操作成功,當(dāng)前線程只需要讓出cpu時(shí)間片,
        //由于sizeCtl是volatile的,保證了順序性和可見(jiàn)性
            if ((sc = sizeCtl) < 0)//sc保存了sizeCtl的值
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {//cas操作判斷并置為-1
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//DEFAULT_CAPACITY = 16,若沒(méi)有參數(shù)則大小默認(rèn)為16
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }  

put操作

    
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());//哈希算法
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {//無(wú)限循環(huán),確保插入成功
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)//表為空或表長(zhǎng)度為0
                tab = initTable();//初始化表
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//i = (n - 1) & hash為索引值,查找該元素,
            //如果為null,說(shuō)明第一次插入
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)//MOVED=-1;當(dāng)前正在擴(kuò)容,一起進(jìn)行擴(kuò)容操作
                tab = helpTransfer(tab, f);
            else if (onlyIfAbsent && fh == hash &&  // check first node
                     ((fk = f.key) == key || fk != null && key.equals(fk)) &&
                     (fv = f.val) != null)
                return fv;
            else {
                V oldVal = null;
                synchronized (f) {//其他情況加鎖同步
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
    //哈希算法
    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }
    //保證拿到最新的數(shù)據(jù)
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
    }
    //CAS操作插入節(jié)點(diǎn),比較數(shù)組下標(biāo)為i的節(jié)點(diǎn)是否為c,若是,用v交換,否則不操作。
    //如果CAS成功,表示插入成功,結(jié)束循環(huán)進(jìn)行addCount(1L, binCount)看是否需要擴(kuò)容
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

table擴(kuò)容

當(dāng)table容量不足的時(shí)候,即table的元素?cái)?shù)量達(dá)到容量閾值sizeCtl,需要對(duì)table進(jìn)行擴(kuò)容。 整個(gè)擴(kuò)容分為兩部分:

  1. 構(gòu)建一個(gè)nextTable,大小為table的兩倍。
  2. 把table的數(shù)據(jù)復(fù)制到nextTable中。
    這兩個(gè)過(guò)程在單線程下實(shí)現(xiàn)很簡(jiǎn)單,但是ConcurrentHashMap是支持并發(fā)插入的,擴(kuò)容操作自然也會(huì)有并發(fā)的出現(xiàn),這種情況下,第二步可以支持節(jié)點(diǎn)的并發(fā)復(fù)制,這樣性能自然提升不少,但實(shí)現(xiàn)的復(fù)雜度也上升了一個(gè)臺(tái)階。
    繼續(xù)上源碼:
    第一步,構(gòu)建nextTable,毫無(wú)疑問(wèn),這個(gè)過(guò)程只能只有單個(gè)線程進(jìn)行nextTable的初始化.
private final void addCount(long x, int check) {
    ... 省略部分代碼
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {// sc < 0 表明此時(shí)有別的線程正在進(jìn)行擴(kuò)容
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                // 不滿足前面5個(gè)條件時(shí),嘗試參與此次擴(kuò)容,把正在執(zhí)行transfer任務(wù)的線程數(shù)加1,+2代表有1個(gè),+1代表有0個(gè)
                    transfer(tab, nt);
            }
            //試著讓自己成為第一個(gè)執(zhí)行transfer任務(wù)的線程
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);// 去執(zhí)行transfer任務(wù)
            s = sumCount();// 重新計(jì)數(shù),判斷是否需要開(kāi)啟下一輪擴(kuò)容
        }
    }
}

節(jié)點(diǎn)從table移動(dòng)到nextTable,大體思想是遍歷、復(fù)制的過(guò)程。遍歷過(guò)所有的節(jié)點(diǎn)以后就完成了復(fù)制工作,把table指向nextTable,并更新sizeCtl為新數(shù)組大小的0.75倍 ,擴(kuò)容完成。

get操作

  1. 判斷table是否為空,如果為空,直接返回null。
  2. 計(jì)算key的hash值,并獲取指定table中指定位置的Node節(jié)點(diǎn),通過(guò)遍歷鏈表或則樹(shù)結(jié)構(gòu)找到對(duì)應(yīng)的節(jié)點(diǎn),返回value值。

源碼:

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

和HashTable的區(qū)別:

ConcurrentHashMap 是一個(gè)并發(fā)散列映射表,它允許完全并發(fā)的讀取,并且支持給定數(shù)量的并發(fā)更新。
而HashTable和同步包裝器包裝的 HashMap,使用一個(gè)全局的鎖來(lái)同步不同線程間的并發(fā)訪問(wèn),同一時(shí)間點(diǎn),只能有一個(gè)線程持有鎖,也就是說(shuō)在同一時(shí)間點(diǎn),只能有一個(gè)線程能訪問(wèn)容器,這雖然保證多線程間的安全并發(fā)訪問(wèn),但同時(shí)也導(dǎo)致對(duì)容器的訪問(wèn)變成串行化的了。

總結(jié):

Hashtable的任何操作都會(huì)把整個(gè)表鎖住,是阻塞的。好處是總能獲取最實(shí)時(shí)的更新,比如說(shuō)線程A調(diào)用putAll寫(xiě)入大量數(shù)據(jù),期間線程B調(diào)用get,線程B就會(huì)被阻塞,直到線程A完成putAll,因此線程B肯定能獲取到線程A寫(xiě)入的完整數(shù)據(jù)。壞處是所有調(diào)用都要排隊(duì),效率較低。
ConcurrentHashMap 是設(shè)計(jì)為非阻塞的。在更新時(shí)會(huì)局部鎖住某部分?jǐn)?shù)據(jù),但不會(huì)把整個(gè)表都鎖住。同步讀取操作則是完全非阻塞的。好處是在保證合理的同步前提下,效率很高。壞處是嚴(yán)格來(lái)說(shuō)讀取操作不能保證反映最近的更新。例如線程A調(diào)用putAll寫(xiě)入大量數(shù)據(jù),期間線程B調(diào)用get,則只能get到目前為止已經(jīng)順利插入的部分?jǐn)?shù)據(jù)。
應(yīng)該根據(jù)具體的應(yīng)用場(chǎng)景選擇合適的HashMap。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容