17-并發(fā)容器之ConcurrentHashMap

深入分析ConcurrentHashMap

JDK5中添加了新的concurrent包,相對(duì)同步容器而言,并發(fā)容器通過(guò)一些機(jī)制改進(jìn)了并發(fā)性能。因?yàn)橥饺萜鲗⑺袑?duì)容器狀態(tài)的訪問(wèn)都串行化了,這樣保證了線程的安全性,所以這種方法的代價(jià)就是嚴(yán)重降低了并發(fā)性,當(dāng)多個(gè)線程競(jìng)爭(zhēng)容器時(shí),吞吐量嚴(yán)重降低。因此Java5.0開始針對(duì)多線程并發(fā)訪問(wèn)設(shè)計(jì),提供了并發(fā)性能較好的并發(fā)容器,引入了java.util.concurrent包。與Vector和Hashtable、Collections.synchronizedXxx()等同步容器相比,java.util.concurrent中引入的并發(fā)容器主要解決了兩個(gè)問(wèn)題:

1)根據(jù)具體場(chǎng)景進(jìn)行設(shè)計(jì),盡量避免synchronized,提供并發(fā)性;

2)定義了一些并發(fā)安全的復(fù)合操作,并且保證并發(fā)環(huán)境下的迭代操作不會(huì)出錯(cuò)。

java.util.concurrent中的容器在迭代時(shí),可以不封裝在synchronized中,可以保證不拋異常,但是未必每次看到的都是“最新的、當(dāng)前的”數(shù)據(jù)。

下面是對(duì)并發(fā)容器的簡(jiǎn)單介紹:

ConcurrentHashMap代替同步的Map(Collections.synchronizedMap(new HashMap())),ConcurrentHashMap也增加了對(duì)常用復(fù)合操作的支持,比如“若沒(méi)有則添加”:putIfAbsent(),替換:replace()。這2個(gè)操作都是原子操作。

CopyOnWriteArrayList和CopyOnWriteArraySet分別代替List和Set,主要是在遍歷操作為主的情況下來(lái)代替同步的List和同步的Set,這也就是上面所述的思路:迭代過(guò)程要保證不出錯(cuò),除了加鎖,另外一種方法就是“克隆”容器對(duì)象。

ConcurrentLinkedQuerue是一個(gè)先進(jìn)先出的隊(duì)列。它是非阻塞隊(duì)列。

ConcurrentSkipListMap可以在高效并發(fā)中替代SoredMap(例如用Collections.synchronizedMap包裝的TreeMap)。

ConcurrentSkipListSet可以在高效并發(fā)中替代SoredSet(例如用Collections.synchronizedSet包裝的TreeSet)。

ConcurrentHashMap的鎖分段技術(shù)

HashTable使用synchronized來(lái)保證線程安全,但在線程競(jìng)爭(zhēng)激烈的情況下HashTable的效率非常低下。因?yàn)楫?dāng)一個(gè)線程訪問(wèn)HashTable的同步方法時(shí),其他線程訪問(wèn)HashTable的同步方法時(shí),可能會(huì)進(jìn)入阻塞或輪詢狀態(tài)。如線程1使用put進(jìn)行添加元素,線程2不但不能使用put方法添加元素,并且也不能使用get方法來(lái)獲取元素,所以競(jìng)爭(zhēng)越激烈效率越低。

HashTable容器在競(jìng)爭(zhēng)激烈的并發(fā)環(huán)境下表現(xiàn)出效率低下的原因,是因?yàn)樗性L問(wèn)HashTable的線程都必須競(jìng)爭(zhēng)同一把鎖,那假如容器里有多把鎖,每一把鎖用于鎖容器其中一部分?jǐn)?shù)據(jù),那么當(dāng)多線程訪問(wèn)容器里不同數(shù)據(jù)段的數(shù)據(jù)時(shí),線程間就不會(huì)存在鎖競(jìng)爭(zhēng),從而可以有效的提高并發(fā)訪問(wèn)效率,這就是ConcurrentHashMap所使用的鎖分段技術(shù),首先將數(shù)據(jù)分成一段一段的存儲(chǔ),然后給每一段數(shù)據(jù)配一把鎖,當(dāng)一個(gè)線程占用鎖訪問(wèn)其中一個(gè)段數(shù)據(jù)的時(shí)候,其他段的數(shù)據(jù)也能被其他線程訪問(wèn)。

ConcurrentHashMap的內(nèi)部結(jié)構(gòu)

我們通過(guò)ConcurrentHashMap的類圖來(lái)分析ConcurrentHashMap的結(jié)構(gòu):

image.png

ConcurrentHashMap為了提高本身的并發(fā)能力,在內(nèi)部采用了一個(gè)叫做Segment的結(jié)構(gòu),一個(gè)Segment其實(shí)就是一個(gè)類Hash Table的結(jié)構(gòu),Segment內(nèi)部維護(hù)了一個(gè)鏈表數(shù)組,我們用下面這一幅圖來(lái)看下ConcurrentHashMap的內(nèi)部結(jié)構(gòu):

image.png

從上面的結(jié)構(gòu)我們可以了解到,ConcurrentHashMap定位一個(gè)元素的過(guò)程需要進(jìn)行兩次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的鏈表的頭部,因此,這一種結(jié)構(gòu)的帶來(lái)的副作用是Hash的過(guò)程要比普通的HashMap要長(zhǎng),但是帶來(lái)的好處是寫操作的時(shí)候可以只對(duì)元素所在的Segment進(jìn)行加鎖即可,不會(huì)影響到其他的Segment,這樣,在最理想的情況下,ConcurrentHashMap可以最高同時(shí)支持Segment數(shù)量大小的寫操作(剛好這些寫操作都非常平均地分布在所有的Segment上),所以,通過(guò)這一種結(jié)構(gòu),ConcurrentHashMap的并發(fā)能力可以大大的提高。

Segment

我們?cè)賮?lái)具體了解一下Segment的數(shù)據(jù)結(jié)構(gòu):

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile int count;
    transient int modCount;
    transient int threshold;
    transient volatile HashEntry<K,V>[] table;
    final float loadFactor;
}

詳細(xì)解釋一下Segment里面的成員變量的意義:

1)count:Segment中元素的數(shù)量;

2)modCount:對(duì)table的大小造成影響的操作的數(shù)量(比如put或者remove操作);

3)threshold:閾值,Segment里面元素的數(shù)量超過(guò)這個(gè)值就會(huì)對(duì)Segment進(jìn)行擴(kuò)容;

4)table:鏈表數(shù)組,數(shù)組中的每一個(gè)元素代表了一個(gè)鏈表的頭部;

5)loadFactor:負(fù)載因子,用于確定threshold。

HashEntry

Segment中的元素是以HashEntry的形式存放在鏈表數(shù)組中的,看一下HashEntry的結(jié)構(gòu):

static final class HashEntry<K,V> {
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;
}

可以看到HashEntry的一個(gè)特點(diǎn),除了value以外,其他的幾個(gè)變量都是final的,這樣做是為了防止鏈表結(jié)構(gòu)被破壞,出現(xiàn)ConcurrentModification的情況。

ConcurrentHashMap的初始化

下面我們來(lái)結(jié)合源代碼來(lái)具體分析一下ConcurrentHashMap的實(shí)現(xiàn),先看下初始化方法:

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
  
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
  
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    segmentShift = 32 - sshift;
    segmentMask = ssize - 1;
    this.segments = Segment.newArray(ssize);
  
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = 1;
    while (cap < c)
        cap <<= 1;
  
    for (int i = 0; i < this.segments.length; ++i)
        this.segments[i] = new Segment<K,V>(cap, loadFactor);
}

CurrentHashMap的初始化一共有三個(gè)參數(shù),一個(gè)initialCapacity,表示初始的容量,一個(gè)loadFactor,表示負(fù)載參數(shù),最后一個(gè)是concurrentLevel,代表ConcurrentHashMap內(nèi)部的Segment的數(shù)量,ConcurrentLevel一經(jīng)指定,不可改變,后續(xù)如果ConcurrentHashMap的元素?cái)?shù)量增加導(dǎo)致ConrruentHashMap需要擴(kuò)容,ConcurrentHashMap不會(huì)增加Segment的數(shù)量,而只會(huì)增加Segment中鏈表數(shù)組的容量大小,這樣的好處是擴(kuò)容過(guò)程不需要對(duì)整個(gè)ConcurrentHashMap做rehash,而只需要對(duì)Segment里面的元素做一次rehash就可以了。

整個(gè)ConcurrentHashMap的初始化方法還是非常簡(jiǎn)單的,先是根據(jù)concurrentLevel來(lái)new出Segment,這里Segment的數(shù)量是不大于concurrentLevel的最大的2的指數(shù),就是說(shuō)Segment的數(shù)量永遠(yuǎn)是2的指數(shù)個(gè),這樣的好處是方便采用移位操作來(lái)進(jìn)行hash,加快hash的過(guò)程。接下來(lái)就是根據(jù)intialCapacity確定Segment的容量的大小,每一個(gè)Segment的容量大小也是2的指數(shù),同樣使為了加快hash的過(guò)程。

這邊需要特別注意一下兩個(gè)變量,分別是segmentShift和segmentMask,這兩個(gè)變量在后面將會(huì)起到很大的作用,假設(shè)構(gòu)造函數(shù)確定了Segment的數(shù)量是2的n次方,那么segmentShift就等于32減去n,而segmentMask就等于2的n次方減一。

ConcurrentHashMap的get操作

前面提到過(guò)ConcurrentHashMap的get操作是不用加鎖的,我們這里看一下其實(shí)現(xiàn):

public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}

看第三行,segmentFor這個(gè)函數(shù)用于確定操作應(yīng)該在哪一個(gè)segment中進(jìn)行,幾乎對(duì)ConcurrentHashMap的所有操作都需要用到這個(gè)函數(shù),我們看下這個(gè)函數(shù)的實(shí)現(xiàn):

final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}

這個(gè)函數(shù)用了位操作來(lái)確定Segment,根據(jù)傳入的hash值向右無(wú)符號(hào)右移segmentShift位,然后和segmentMask進(jìn)行與操作,結(jié)合我們之前說(shuō)的segmentShift和segmentMask的值,就可以得出以下結(jié)論:假設(shè)Segment的數(shù)量是2的n次方,根據(jù)元素的hash值的高n位就可以確定元素到底在哪一個(gè)Segment中。

在確定了需要在哪一個(gè)segment中進(jìn)行操作以后,接下來(lái)的事情就是調(diào)用對(duì)應(yīng)的Segment的get方法:

V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}

先看第二行代碼,這里對(duì)count進(jìn)行了一次判斷,其中count表示Segment中元素的數(shù)量,我們可以來(lái)看一下count的定義:

transient volatile int count;

可以看到count是volatile的,實(shí)際上這里面利用了volatile的語(yǔ)義:

對(duì)volatile字段的寫入操作happens-before于每一個(gè)后續(xù)的同一個(gè)字段的讀操作。
因?yàn)閷?shí)際上put、remove等操作也會(huì)更新count的值,所以當(dāng)競(jìng)爭(zhēng)發(fā)生的時(shí)候,volatile的語(yǔ)義可以保證寫操作在讀操作之前,也就保證了寫操作對(duì)后續(xù)的讀操作都是可見的,這樣后面get的后續(xù)操作就可以拿到完整的元素內(nèi)容。

然后,在第三行,調(diào)用了getFirst()來(lái)取得鏈表的頭部:

HashEntry<K,V> getFirst(int hash) {
    HashEntry<K,V>[] tab = table;
    return tab[hash & (tab.length - 1)];
}

同樣,這里也是用位操作來(lái)確定鏈表的頭部,hash值和HashTable的長(zhǎng)度減一做與操作,最后的結(jié)果就是hash值的低n位,其中n是HashTable的長(zhǎng)度以2為底的結(jié)果。

在確定了鏈表的頭部以后,就可以對(duì)整個(gè)鏈表進(jìn)行遍歷,看第4行,取出key對(duì)應(yīng)的value的值,如果拿出的value的值是null,則可能這個(gè)key,value對(duì)正在put的過(guò)程中,如果出現(xiàn)這種情況,那么就加鎖來(lái)保證取出的value是完整的,如果不是null,則直接返回value。

ConcurrentHashMap的put操作

看完了get操作,再看下put操作,put操作的前面也是確定Segment的過(guò)程,這里不再贅述,直接看關(guān)鍵的segment的put方法:

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock();
    try {
        int c = count;
        if (c++ > threshold) // ensure capacity
            rehash();
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;
  
        V oldValue;
        if (e != null) {
            oldValue = e.value;
            if (!onlyIfAbsent)
                e.value = value;
        }
        else {
            oldValue = null;
            ++modCount;
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            count = c; // write-volatile
        }
        return oldValue;
    } finally {
        unlock();
    }
}

首先對(duì)Segment的put操作是加鎖完成的,然后在第五行,如果Segment中元素的數(shù)量超過(guò)了閾值(由構(gòu)造函數(shù)中的loadFactor算出)這需要進(jìn)行對(duì)Segment擴(kuò)容,并且要進(jìn)行rehash,關(guān)于rehash的過(guò)程大家可以自己去了解,這里不詳細(xì)講了。

第8和第9行的操作就是getFirst的過(guò)程,確定鏈表頭部的位置。

第11行這里的這個(gè)while循環(huán)是在鏈表中尋找和要put的元素相同key的元素,如果找到,就直接更新更新key的value,如果沒(méi)有找到,則進(jìn)入21行這里,生成一個(gè)新的HashEntry并且把它加到整個(gè)Segment的頭部,然后再更新count的值。

ConcurrentHashMap的remove操作

Remove操作的前面一部分和前面的get和put操作一樣,都是定位Segment的過(guò)程,然后再調(diào)用Segment的remove方法:

V remove(Object key, int hash, Object value) {
    lock();
    try {
        int c = count - 1;
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;
  
        V oldValue = null;
        if (e != null) {
            V v = e.value;
            if (value == null || value.equals(v)) {
                oldValue = v;
                // All entries following removed node can stay
                // in list, but all preceding ones need to be
                // cloned.
                ++modCount;
                HashEntry<K,V> newFirst = e.next;
                for (HashEntry<K,V> p = first; p != e; p = p.next)
                    newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                  newFirst, p.value);
                tab[index] = newFirst;
                count = c; // write-volatile
            }
        }
        return oldValue;
    } finally {
        unlock();
    }
}

首先remove操作也是確定需要?jiǎng)h除的元素的位置,不過(guò)這里刪除元素的方法不是簡(jiǎn)單地把待刪除元素的前面一個(gè)元素的next指向后面一個(gè)就完事了,我們之前已經(jīng)說(shuō)過(guò)HashEntry中的next是final的,一經(jīng)賦值以后就不可修改,在定位到待刪除元素的位置以后,程序就將待刪除元素前面的那一些元素全部復(fù)制一遍,然后再一個(gè)一個(gè)重新接到鏈表上去,看一下下面這一幅圖來(lái)了解這個(gè)過(guò)程:

image.png

假設(shè)鏈表中原來(lái)的元素如上圖所示,現(xiàn)在要?jiǎng)h除元素3,那么刪除元素3以后的鏈表就如下圖所示:

image.png

ConcurrentHashMap的size操作

如果我們要統(tǒng)計(jì)整個(gè)ConcurrentHashMap里元素的大小,就必須統(tǒng)計(jì)所有Segment里元素的大小后求和。Segment里的全局變量count是一個(gè)volatile變量,那么在多線程場(chǎng)景下,我們是不是直接把所有Segment的count相加就可以得到整個(gè)ConcurrentHashMap大小了呢?不是的,雖然相加時(shí)可以獲取每個(gè)Segment的count的最新值,但是拿到之后可能累加前使用的count發(fā)生了變化,那么統(tǒng)計(jì)結(jié)果就不準(zhǔn)了。所以最安全的做法,是在統(tǒng)計(jì)size的時(shí)候把所有Segment的put,remove和clear方法全部鎖住,但是這種做法顯然非常低效。

因?yàn)樵诶奂觕ount操作過(guò)程中,之前累加過(guò)的count發(fā)生變化的幾率非常小,所以ConcurrentHashMap的做法是先嘗試2次通過(guò)不鎖住Segment的方式來(lái)統(tǒng)計(jì)各個(gè)Segment大小,如果統(tǒng)計(jì)的過(guò)程中,容器的count發(fā)生了變化,則再采用加鎖的方式來(lái)統(tǒng)計(jì)所有Segment的大小。

那么ConcurrentHashMap是如何判斷在統(tǒng)計(jì)的時(shí)候容器是否發(fā)生了變化呢?使用modCount變量,在put , remove和clear方法里操作元素前都會(huì)將變量modCount進(jìn)行加1,那么在統(tǒng)計(jì)size前后比較modCount是否發(fā)生變化,從而得知容器的大小是否發(fā)生變化。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容