數(shù)據(jù)結(jié)構(gòu)

ConcurrentHashMap 實(shí)現(xiàn)并發(fā)操作的原理
使用了鎖分段技術(shù):ConcurrentHashMap持有一組鎖(segment[]),并將數(shù)據(jù)盡可能分散在不同的鎖段中(即,每個(gè)鎖只會(huì)控制部分的數(shù)據(jù)HashEntry[])。這樣如果寫(xiě)操作的數(shù)據(jù)分布在不同的鎖中,那么寫(xiě)操作將可并行操作。因此來(lái)實(shí)現(xiàn)一定數(shù)量(即,鎖數(shù)量)并發(fā)線程修改。
同時(shí)通過(guò)Unsafe.putOrderedObject、UNSAFE.getObjectVolatile(??這兩個(gè)方法很重要,下文會(huì)介紹)來(lái)操作segment[]、HashEntry[]的元素使得在提升了性能的情況下在并發(fā)環(huán)境下依舊能獲取到最新的數(shù)據(jù),同時(shí)HashEntry的value為volatile屬性,從而實(shí)現(xiàn)不加鎖的進(jìn)行并發(fā)的讀操作,并且對(duì)該并發(fā)量并無(wú)限制。
注意,中不使用volatile的屬性來(lái)實(shí)現(xiàn)segment[]和HashEntry[]在多線程間的可見(jiàn)性。因?yàn)槿绻切薷牟僮?,則在釋放鎖的時(shí)候就會(huì)將當(dāng)前線程緩存中的數(shù)據(jù)寫(xiě)到主存中,所以就無(wú)需在修改操作的過(guò)程中因修改volatile屬性字段而頻繁的寫(xiě)線程內(nèi)存數(shù)據(jù)到主存中。
源碼解析
重要屬性
//散列映射表的默認(rèn)初始容量為 16。
static final int DEFAULT_INITIAL_CAPACITY = 16;
//散列映射表的默認(rèn)裝載因子為 0.75,用于表示segment中包含的HashEntry元素的個(gè)數(shù)與HashEntry[]數(shù)組長(zhǎng)度的比值。當(dāng)某個(gè)segment中包含的HashEntry元素的個(gè)數(shù)超過(guò)了HashEntry[]數(shù)組的長(zhǎng)度與裝載因子的乘積時(shí),將觸發(fā)擴(kuò)容操作。
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//散列表的默認(rèn)并發(fā)級(jí)別為 16。該值表示segment[]數(shù)組的長(zhǎng)度,也就是鎖的總數(shù)。
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//散列表的最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//segment中HashEntry[]數(shù)組最小長(zhǎng)度
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//散列表的最大段數(shù),也就是segment[]數(shù)組的最大長(zhǎng)度
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
//在執(zhí)行size()和containsValue(value)操作時(shí),ConcurrentHashMap的做法是先嘗試 RETRIES_BEFORE_LOCK 次( 即,2次 )通過(guò)不鎖住segment的方式來(lái)統(tǒng)計(jì)、查詢各個(gè)segment,如果2次執(zhí)行過(guò)程中,容器的modCount發(fā)生了變化,則再采用加鎖的方式來(lái)操作所有的segment
static final int RETRIES_BEFORE_LOCK = 2;
//segmentMask用于定位segment在segment[]數(shù)組中的位置。segmentMask是與運(yùn)算的掩碼,等于segment[]數(shù)組size減1,掩碼的二進(jìn)制各個(gè)位的值都是1( 因?yàn)?,?shù)組長(zhǎng)度總是2^N )。
final int segmentMask;
//segmentShift用于決定H(key)參與segmentMask與運(yùn)算的位數(shù)(高位),這里指在從segment[]數(shù)組定位segment:通過(guò)key的哈希結(jié)果的高位與segmentMask進(jìn)行與運(yùn)算哈希的結(jié)果。(詳見(jiàn)下面舉例)
final int segmentShift;
//Segment 類繼承于 ReentrantLock 類,從而使得 Segment 對(duì)象能充當(dāng)鎖的角色。
final ConcurrentHashMap.Segment<K, V>[] segments;
重要對(duì)象
- ConcurrentHashMap.Segment
Segment 類繼承于 ReentrantLock 類,從而使得 Segment 對(duì)象能充當(dāng)鎖的角色,并且是一個(gè)可重入鎖。每個(gè) Segment 對(duì)象維護(hù)其包含的若干個(gè)桶(即,HashEntry[])。
//最大自旋次數(shù),若是單核則為1,多核則為64。該字段用于scanAndLockForPut、scanAndLock方法
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/**
* table 是由 HashEntry 對(duì)象組成的數(shù)組
* 如果散列時(shí)發(fā)生碰撞,碰撞的 HashEntry 對(duì)象就以鏈表的形式鏈接成一個(gè)鏈表
* table 數(shù)組的元素代表散列映射表的一個(gè)桶
* 每個(gè) table 守護(hù)整個(gè) ConcurrentHashMap 數(shù)據(jù)總數(shù)的一部分
* 如果并發(fā)級(jí)別為 16,table 則維護(hù) ConcurrentHashMap 數(shù)據(jù)總數(shù)的 1/16
*/
transient volatile HashEntry<K,V>[] table;
//segment中HashEntry的總數(shù)。 PS:注意JDK 7中該字段不是volatile的
transient int count;
//segment中數(shù)據(jù)被更新的次數(shù)
transient int modCount;
//當(dāng)table中包含的HashEntry元素的個(gè)數(shù)超過(guò)本變量值時(shí),觸發(fā)table的擴(kuò)容
transient int threshold;
//裝載因子
final float loadFactor;
- ConcurrentHashMap.HashEntry
HashEntry封裝了key-value對(duì),是一個(gè)單向鏈表結(jié)構(gòu),每個(gè)HashEntry節(jié)點(diǎn)都維護(hù)著next HashEntry節(jié)點(diǎn)的引用。
static final class HashEntry<K,V>
final int hash;
final K key;
volatile V value;
//HashEntry鏈表中的下一個(gè)entry。PS:JDK 7中該字段不是final的,意味著該字段可修改,而且也確實(shí)在remove方法中對(duì)該地段進(jìn)行了修改
volatile HashEntry<K,V> next;
構(gòu)造方法
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
a) 限制并發(fā)等級(jí)最大為MAX_SEGMENTS,即2^16。
b) 計(jì)算真實(shí)的并發(fā)等級(jí)ssize,必須是2的N次方,即 ssize( actual_concurrency_level ) >= concurrency_level。
舉例:concurrencyLevel等于14,15或16,ssize都會(huì)等于16,即容器里鎖的個(gè)數(shù)也是16。
Q:為什么數(shù)組的長(zhǎng)度都需要設(shè)計(jì)成2^N次方了?
A:這是因?yàn)樵卦跀?shù)組中的定位主要是通過(guò)H(key) & (數(shù)組長(zhǎng)度 - 1)方式實(shí)現(xiàn)的,這樣我們稱(數(shù)組長(zhǎng)度 - 1)為element_mask。那么假設(shè)有一個(gè)長(zhǎng)度為16和長(zhǎng)度為15的數(shù)組,他們element_mask分別為15和14。即array_16_element_mask = 15(二進(jìn)制”1111”);array_15_element_mask = 14(二進(jìn)制”1110”)。你會(huì)發(fā)現(xiàn)所以和”1110”進(jìn)行與操作結(jié)果的最后一位都是0,這就導(dǎo)致數(shù)組的’0001’、’0011’、’1001’、’0101’、’1101’、’0111’位置都無(wú)法存放數(shù)據(jù),這就導(dǎo)致了數(shù)組空間的浪費(fèi),以及數(shù)據(jù)沒(méi)有得到更好的分散。而使用array_16_element_mask = 15(二進(jìn)制”1111”)則不會(huì)有該問(wèn)題,數(shù)據(jù)可以分散到數(shù)組個(gè)每個(gè)索引位置。
c) sshift表示在通過(guò)H(key)來(lái)定位segment的index時(shí),參與到segmentMask掩碼與運(yùn)算的H(key)高位位數(shù)。
d) 計(jì)算每個(gè)Segment中HashEntry[]數(shù)組的長(zhǎng)度,根據(jù)數(shù)據(jù)均勻分配到各個(gè)segment的HashEntry[]中,并且數(shù)組長(zhǎng)度必須是2的N次方的思路來(lái)獲取。注意,HashEntry[]數(shù)組的長(zhǎng)度最小為2。
e) 創(chuàng)建一個(gè)Segment對(duì)象,將新建的Segment對(duì)象放入Segment[]數(shù)組中index為0的位置。這里只先構(gòu)建了Segnemt[]數(shù)組的一個(gè)元素,則其他index的元素在被使用到時(shí)通過(guò)ensureSegment(index)方法來(lái)構(gòu)建。
重要方法
- segment的定位
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u))
通過(guò)key的二次哈希運(yùn)算后再進(jìn)行移位和與運(yùn)算得到key在segment[]數(shù)組中所對(duì)應(yīng)的segment
a) hash(key)
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
??這里之所以需要將key.hashCode再進(jìn)行一次hash計(jì)算,是為了減少哈希沖突,使元素能夠均勻的分布在不同的Segment上,從而提高容器的存取效率。
b) 取hash(key)結(jié)果的(32 - segmentShift)位數(shù)的高位和segmentMask掩碼進(jìn)行與運(yùn)算。(其實(shí),與運(yùn)算時(shí),就是“hash(key)的高segmentMask(十進(jìn)制值)位"于“segmentMask的二進(jìn)制值”進(jìn)行與操作,此時(shí)進(jìn)行與操作的兩個(gè)數(shù)的有效二進(jìn)制位數(shù)是一樣的了。)
c) 將b)的結(jié)果j進(jìn)行 (j << SSHIFT) + SBASE 以得到key在segement[]數(shù)組中的位置
舉例:假如哈希的質(zhì)量差到極點(diǎn),那么所有的元素都在一個(gè)Segment中,不僅存取元素緩慢,分段鎖也會(huì)失去意義。我做了一個(gè)測(cè)試,不通過(guò)再哈希而直接執(zhí)行哈希計(jì)算。
System.out.println(Integer.parseInt("0001111", 2) & 15);
System.out.println(Integer.parseInt("0011111", 2) & 15);
System.out.println(Integer.parseInt("0111111", 2) & 15);
System.out.println(Integer.parseInt("1111111", 2) & 15);
計(jì)算后輸出的哈希值全是15,通過(guò)這個(gè)例子可以發(fā)現(xiàn)如果不進(jìn)行再哈希,哈希沖突會(huì)非常嚴(yán)重,因?yàn)橹灰臀灰粯?,無(wú)論高位是什么數(shù),其哈希值總是一樣。我們?cè)侔焉厦娴亩M(jìn)制數(shù)據(jù)進(jìn)行再哈希后結(jié)果如下,為了方便閱讀,不足32位的高位補(bǔ)了0,每隔四位用豎線分割下。
| 0100 | 0111 | 0110 | 0111 | 1101 | 1010 | 0100 | 1110 |
|---|---|---|---|---|---|---|---|
| 1111 | 0111 | 0100 | 0011 | 0000 | 0001 | 1011 | 1000 |
| 0111 | 0111 | 0110 | 1001 | 0100 | 0110 | 0011 | 1110 |
| 1000 | 0011 | 0000 | 0000 | 1100 | 1000 | 0001 | 1010 |
可以發(fā)現(xiàn)每一位的數(shù)據(jù)都散列開(kāi)了,通過(guò)這種再哈希能讓數(shù)字的每一位都能參加到哈希運(yùn)算當(dāng)中,從而減少哈希沖突。
- HashEntry定位
int h = hash(key);
((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)
主要通過(guò)對(duì)key進(jìn)行二次hash運(yùn)算,再講哈希結(jié)果和HashEntry[]的長(zhǎng)度掩碼進(jìn)行與運(yùn)算得到key所對(duì)應(yīng)的HashEntry在數(shù)組中的索引。HashEntry的定位和Segment的定位方式很像,但是HashEntry少了將hash(key)的結(jié)果進(jìn)行掩碼取高位后再與數(shù)組長(zhǎng)度與操作,而是直接將hash(key)的結(jié)果和數(shù)組長(zhǎng)度的掩碼進(jìn)行與操作。其目的是避免兩次哈希后的值一樣,導(dǎo)致元素雖然在Segment里散列開(kāi)了,但是卻沒(méi)有在HashEntry里散列開(kāi)( 也就是說(shuō),如果Segment和HashEntry的定位方式一樣,那么到同一個(gè)Segment的key都會(huì)落到該segment中的同一個(gè)HashEntry了 )。
Unsafe類中的putOrderedObject、getObjectVolatile方法
getObjectVolatile:使用volatile讀的語(yǔ)義獲取數(shù)據(jù),也就是通過(guò)getObjectVolatile獲取數(shù)據(jù)時(shí)回去主存中獲取最新的數(shù)據(jù)放到線程的緩存中,這能保證正確的獲取最新的數(shù)據(jù)。
putOrderedObject:為了控制特定條件下的指令重排序和內(nèi)存可見(jiàn)性問(wèn)題,Java編譯器使用一種叫內(nèi)存屏障(Memory Barrier,或叫做內(nèi)存柵欄,Memory Fence)的CPU指令來(lái)禁止指令重排序。java中volatile寫(xiě)入使用了內(nèi)存屏障中的LoadStore屏障規(guī)則,對(duì)于這樣的語(yǔ)句Load1; LoadStore; Store2,在Store2及后續(xù)寫(xiě)入操作被刷出前,保證Load1要讀取的數(shù)據(jù)被讀取完畢。volatile的寫(xiě)所插入的storeLoad是一個(gè)耗時(shí)的操作,因此出現(xiàn)了一個(gè)對(duì)volatile寫(xiě)的升級(jí)版本,利用lazySet方法進(jìn)行性能優(yōu)化,在實(shí)現(xiàn)上對(duì)volatile的寫(xiě)只會(huì)在之前插入StoreStore屏障,對(duì)于這樣的語(yǔ)句Store1; StoreStore; Store2,在Store2及后續(xù)寫(xiě)入操作執(zhí)行前,保證Store1的寫(xiě)入操作對(duì)其它處理器可見(jiàn),也就是按順序的寫(xiě)入。UNSAFE.putOrderedObject正是提供了這樣的語(yǔ)義,避免了寫(xiě)寫(xiě)指令重排序,但不保證內(nèi)存可見(jiàn)性,因此讀取時(shí)需借助volatile讀保證可見(jiàn)性。ensureSegment(k)
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
根據(jù)計(jì)算得到的index從segment[]數(shù)組中獲取segment,如果segment不存在,則創(chuàng)建一個(gè)segment并通過(guò)CAS算法放入segment[]數(shù)組中。這里的獲取和插入分別通過(guò)UNSAGE.getObjectVolatile(??保證獲取segment[]最新數(shù)據(jù))和UNSAFE.cmpareAndSwapObject(??保證原子性的將新建的segment插入segment[]數(shù)組,并使其他線程可見(jiàn))實(shí)現(xiàn),并不直接對(duì)segment[]數(shù)組操作。
- HashEntry<K,V> scanAndLockForPut(K key, int hash, V value)
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
在put操作嘗試加鎖沒(méi)成功時(shí),不是直接進(jìn)入等待狀態(tài),而是調(diào)用??scanAndLockForPut()方法,該方法實(shí)現(xiàn)了:
a) 首次進(jìn)入該方法,重試次數(shù)retries初始值為-1。
b) 若retries為-1,則判斷查詢key對(duì)應(yīng)的HashEntry節(jié)點(diǎn)鏈中是否已經(jīng)存在了該節(jié)點(diǎn),如果還沒(méi)則預(yù)先創(chuàng)建一個(gè)新節(jié)點(diǎn)。然后將retries=0;
c) 然后嘗試MAX_SCAN_RETRIES次獲取鎖( 自旋鎖 ),如果依舊沒(méi)能成功獲得鎖,則進(jìn)入等待狀態(tài)(互斥鎖)。
JDK7嘗試使用自旋鎖來(lái)提升性能,好處在于:自旋鎖當(dāng)前的線程不會(huì)掛起,而是一直處于running狀態(tài),這樣一旦能夠獲得鎖時(shí)就key在不進(jìn)行上下文切換的情況下獲取到鎖。
d) 如果在嘗試MAX_SCAN_RETRIES次獲取鎖的過(guò)程,key對(duì)應(yīng)的entry發(fā)送了變化,則將嘗試次數(shù)重置為-1,從第b)步驟重新開(kāi)始
- void scanAndLock(Object key, int hash)
private void scanAndLock(Object key, int hash) {
// similar to but simpler than scanAndLockForPut
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
int retries = -1;
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null || key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}
在replace、remove操作嘗試加鎖沒(méi)成功時(shí),不是直接進(jìn)入等待狀態(tài),而是調(diào)用??scanAndLock()方法。該方法是實(shí)現(xiàn)和scanAndLockForPut()差不了多少,主要的區(qū)別在于scanAndLockForPut()方法在key對(duì)應(yīng)entry不存在時(shí)是不會(huì)去創(chuàng)建一個(gè)HashEntry對(duì)象的。
- V get(Object key)
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
在JDK 7中g(shù)et的實(shí)現(xiàn)原理已經(jīng)和JDK 6不同了,JDK 6通過(guò)volatile實(shí)現(xiàn)多線程間內(nèi)存的可見(jiàn)性。而JDK 7為了提升性能,用UNSAFE.getObjectVolatile(...)來(lái)獲取segment[]數(shù)組和HashEntry[]數(shù)組中對(duì)應(yīng)index的最新值。同時(shí)值得說(shuō)明的是,當(dāng)volatile引用一個(gè)數(shù)組時(shí),數(shù)組中的元素是不具有volatile特性的,所以,也需要通過(guò)UNSAFE.getObjectVolatile(…)來(lái)獲取數(shù)組中真實(shí)的數(shù)據(jù)。
- put操作
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
a) 通過(guò)key算出對(duì)應(yīng)的segment在segment[]中的位置,如果對(duì)應(yīng)的segment不存在,則創(chuàng)建。
b) 將key、value插入到segment中對(duì)應(yīng)的HashEntry中
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
a) 嘗試獲得鎖,如果失敗,則調(diào)用scanAndLockForPut(...)通過(guò)自旋等待的方式獲得鎖。注意,這里鎖操作鎖的只是當(dāng)前這個(gè)segment,而不會(huì)影響segment[]數(shù)組中其他的segment對(duì)象的寫(xiě)操作。這是ConcurrentHashMap實(shí)現(xiàn)并發(fā)寫(xiě)操作的精髓所在。通過(guò)分段鎖來(lái)支持一定并發(fā)量的寫(xiě)操作,并通過(guò)volatile以及UNSAFE.getObjectVolatile、UNSAFE.putOrderedObject來(lái)實(shí)現(xiàn)不加鎖的讀操作,也就是支持任何并發(fā)量的讀操作。
b) 計(jì)算key應(yīng)插入的HashEntry在HashEntry[]數(shù)組的index,并通過(guò)UNSAFE.getObjectVolatile(...)方式獲取最新的到HashEntry對(duì)象
c) 判斷HashEntry鏈中是否已經(jīng)存在該key了,如果存在則將key的value替換成新值,并將modCount加1
d) 如果HashEntry鏈中不存在該key,則將key-value出入到HashEntry鏈頭處,并將count加1,但此時(shí)count還未更新到segment中。
e) 如果在count加1后發(fā)現(xiàn)目前HashEntry鏈表長(zhǎng)度以及達(dá)到了閾值并且HashEntry的鏈表長(zhǎng)度小于限制的最大長(zhǎng)度,則會(huì)進(jìn)行HashEntry的擴(kuò)容操作。注意,在JDK 7中是確定當(dāng)前put操作是會(huì)加入一個(gè)新節(jié)點(diǎn)情況下才會(huì)觸發(fā)擴(kuò)容操作,而在JDK 6中,可能存在put操作只是替換一個(gè)已經(jīng)存在的key的value值的情況下也會(huì)觸發(fā)擴(kuò)容操作。
f) 如果count加1未觸發(fā)閾值,則通過(guò)UNSAFE.putOrderedObject(…)方式將最新的HashEntry更新到HashEntry[]數(shù)組中。
g) 更新segment中的modCount、count值
h) 釋放鎖。釋放鎖的操作會(huì)將當(dāng)前線程緩存里的數(shù)據(jù)寫(xiě)到主存中。
- rehash(HashEntry<K,V> node)
private void rehash(HashEntry<K,V> node) {
/*
* Reclassify nodes in each list to new table. Because we
* are using power-of-two expansion, the elements from
* each bin must either stay at same index, or move with a
* power of two offset. We eliminate unnecessary node
* creation by catching cases where old nodes can be
* reused because their next fields won't change.
* Statistically, at the default threshold, only about
* one-sixth of them need cloning when a table
* doubles. The nodes they replace will be garbage
* collectable as soon as they are no longer referenced by
* any reader thread that may be in the midst of
* concurrently traversing table. Entry accesses use plain
* array indexing because they are followed by volatile
* table write.
*/
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
當(dāng)HashEntry的數(shù)量達(dá)到閾值時(shí)就會(huì)觸發(fā)HashEntry[]數(shù)組的擴(kuò)容操作
a) 創(chuàng)建new HashEntry[]數(shù)組,new HashEntry[]數(shù)組的容量為old HashEntry的2倍
b) 設(shè)置新的閾值
c) 將old HashEntry[]數(shù)組中的內(nèi)容放入new HashEntry[]中,這并不是盲目的將元素一一取出然后計(jì)算元素在new HashEntry的位置,然后插入。這里Doug Lea做了一些優(yōu)化。
如果old HashEntry[]數(shù)組的元素HashEntry鏈表,若該HashEntry鏈表的頭節(jié)點(diǎn)不存在next節(jié)點(diǎn),即說(shuō)明該HashEntry鏈表是個(gè)單節(jié)點(diǎn),則直接將HashEntry插入到new HashEntry[]數(shù)組對(duì)應(yīng)的位置中。
因?yàn)閚ew HashEntry[]的length是old HashEntry[]的2倍,所以對(duì)應(yīng)的new sizeMask比old sizeMask多了old HashEntry[] length的大小( 比如,old_HashEntry_array_length為8,則old sizeMask為’0000 0111’;new_HashEntry_array_length為16,則new sizeMask為’0000 1111’)。所以元素在new HashEntry[]的new index要么和old index一樣,要么就是old_index + old_HashEntry_array_length。因此我們可通過(guò)對(duì)節(jié)點(diǎn)的復(fù)用來(lái)減少不必要的節(jié)點(diǎn)創(chuàng)建,通過(guò)計(jì)算每個(gè)HashEntry鏈表中每個(gè)entry的new index值,如果存在從某個(gè)entry開(kāi)始到該HashEntry鏈表末尾的所有entrys,它們的new index值都一樣,那么就該entry直接插入到new HashEntry[newIndex]中,當(dāng)然最壞的請(qǐng)求就是該entry就是HashEntry鏈的最后一個(gè)entry。然后只需重建HashEntry中該entry之前的到鏈表頭的entry節(jié)點(diǎn),分別將新構(gòu)建的entry插入到new HashEntry[]中。
再者,經(jīng)統(tǒng)計(jì),在使用默認(rèn)閾值的情況下,一般只有1/6的節(jié)點(diǎn)需要重新構(gòu)建最后將當(dāng)前操作新構(gòu)建的節(jié)點(diǎn)加入到new HashEntry[]數(shù)組中
d) old HashEntry如果沒(méi)有其他讀線程操作引用時(shí),將會(huì)盡快被垃圾回收。
e) 擴(kuò)容操作因?yàn)橐匦聵?gòu)建正整個(gè)HashEntry[]數(shù)組,所以不需要通過(guò)UNSAFE.putOrderedObject(...)方式將元素插入一個(gè)已經(jīng)存在的HashEntry[]中,而是直接通過(guò)索引操作插入到new HashEntry[]數(shù)組就好,最后我們會(huì)將new HashEntry[]直接賦值給volatile tables字段,這樣就可以保證new HashEntry[]對(duì)其他線程可見(jiàn)了remove操作
public V remove(Object key) {
int hash = hash(key);
Segment<K,V> s = segmentForHash(hash);
return s == null ? null : s.remove(key, hash, null);
}
a) 根據(jù)key計(jì)算出該key對(duì)應(yīng)的segment在segment[]數(shù)組中的index,并獲取該segment。
b) 將key從該segment中移除
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
a) 嘗試獲得鎖,如果失敗則調(diào)用scanAndLock(...)通過(guò)自旋等待的方式獲得鎖。
b) 獲取key鎖對(duì)應(yīng)的HashEntry鏈表,并在該HashEntry中找到key對(duì)應(yīng)entry節(jié)點(diǎn)
c) 如果key對(duì)應(yīng)的節(jié)點(diǎn)是在HashEntry鏈表頭,則直接將key的next節(jié)點(diǎn)通過(guò)UNSAFE.putOrderedObject的方式這是為對(duì)HashEntry[]數(shù)組中對(duì)應(yīng)的位置,即使得next節(jié)點(diǎn)稱為成為鏈表頭。
d) 如果key不是HashEntry的鏈表頭節(jié)點(diǎn),則將key的前一個(gè)節(jié)點(diǎn)的next節(jié)點(diǎn)修改為key的next節(jié)點(diǎn)。額,這么說(shuō)太繞了,舉個(gè)例子吧~
key對(duì)應(yīng)的節(jié)點(diǎn):current_HashEntry;current_HashEntry的前一個(gè)節(jié)點(diǎn):pre_HashEntry;current_HashEntry的下一個(gè)節(jié)點(diǎn):next_HashEntry
刪除前:
pre_HashEntry.next ——> current_HashEntry
current_HashEntry.next ——> next_HashEntry
刪除后:
pre_HashEntry.next ——> next_HashEntry
e) 修改segment屬性:modCount加1,count減1
f) 釋放鎖
- size()
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
a) 會(huì)先嘗試RETRIES_BEFORE_LOCK次( 即2次 )不加鎖的情況下,將segment[]數(shù)組中每個(gè)segment的count累加,同時(shí)也會(huì)將每個(gè)segment的modCount進(jìn)行累加。如果兩次不加鎖的操作后,modCountSum值是一樣的,這說(shuō)明在這兩次累加segmentcount的過(guò)程中ConcurrentHashMap沒(méi)有發(fā)生結(jié)構(gòu)性變化,那么就直接返回累加的count值
b) 如果在兩次累加segment的count操作期間ConcurrentHashMap發(fā)生了結(jié)構(gòu)性改變,則會(huì)通過(guò)將所有的segment都加鎖,然后重新進(jìn)行count的累加操作。在完成count的累加操作后,釋放所有的鎖。最后返回累加的count值。
c) 注意,如果累加的count值大于了Integer.MAX_VALUE,則返回Integer.MAX_VALUE。
弱一致性
相對(duì)于HashMap的fast-fail,ConcurrentHashMap的迭代器并不會(huì)拋出ConcurrentModificationException異常。這是由于ConcurrentHashMap的讀行為是弱一致性的。
也就是說(shuō),在同時(shí)對(duì)一個(gè)segment進(jìn)行讀線程和寫(xiě)線程操作時(shí),并不保證寫(xiě)操作的行為能被并行允許的讀線程所感知。
比如,當(dāng)一個(gè)寫(xiě)線程和讀線程并發(fā)的對(duì)同一個(gè)key進(jìn)行操作時(shí):寫(xiě)線程在操作一個(gè)put操作,如果這個(gè)時(shí)候put的是一個(gè)已經(jīng)存在的key值,則會(huì)替換該key對(duì)應(yīng)的value值,因?yàn)関alue是volatile屬性的,所以該替換操作時(shí)能立即被讀線程感知的。但如果此時(shí)的put是要新插入一個(gè)entry,則存在兩種情況:①在寫(xiě)線程通過(guò)UNSAFE.putOrderedObject方式將新entry插入到HashEntry鏈表后,讀線程才通過(guò)UNSAFE.getObjectVolatile來(lái)獲取對(duì)應(yīng)的HashEntry鏈表,那么這個(gè)時(shí)候讀線程是能夠獲取到這個(gè)新插入的entry的;②反之,如果讀線程的UNSAFE.getObjectVolatile操作在寫(xiě)線程的UNSAFE.putOrderedObject之前,則就無(wú)法感知到這個(gè)新加入的entry了。
其實(shí)在大多數(shù)并發(fā)的業(yè)務(wù)邏輯下,我們是允許這樣的弱一致性存在的。如果你的業(yè)務(wù)邏輯不允許這樣的弱一致性存在的,你可以考慮對(duì)segment中的HashEntry鏈表的讀操作加鎖,或者將segment改造成讀寫(xiě)鎖模式。但這都將大大降低ConcurrentHashMap的性能并且使得你的程序變得復(fù)雜且難以維護(hù)。或許你該考慮使用其他的存儲(chǔ)模型代替ConcurrentHashMap。
后記
雖然JDK 8已經(jīng)出來(lái)很久了,但是我還是花了很多時(shí)間在JDK 7的ConcurrentHashMap上,一個(gè)很重要的原因是,我認(rèn)為ConcurrentHashMap在并發(fā)模式下的設(shè)計(jì)思想是很值得我們深究和學(xué)習(xí)的,無(wú)論是jdk7相對(duì)于jdk6的各種細(xì)節(jié)和性能上的優(yōu)化,還是jdk8的大改造都是對(duì)并發(fā)編程各種模式很好的學(xué)習(xí)。文章還有很多可以繼續(xù)深入挖掘的點(diǎn),希望在后期的學(xué)習(xí)中能夠繼續(xù)完善~
參考
http://www.infoq.com/cn/articles/ConcurrentHashMap/
http://www.blogjava.net/DLevin/archive/2013/10/18/405030.html
https://my.oschina.net/7001/blog/896587