??最近總是在分析源碼,感覺(jué)源碼也不是想象上的那么難,今天我來(lái)記錄一下我對(duì)ConcurrentHashMap的理解。這里只敢說(shuō)記錄,不敢說(shuō)分析,因?yàn)镃oncurrentHashMap的代碼確實(shí)有點(diǎn)難以理解,本文不對(duì)代碼進(jìn)行死磕,也就是說(shuō),可能不會(huì)對(duì)某一行代碼進(jìn)行死磕,而是對(duì)整個(gè)ConcurrentHashMap類(lèi)進(jìn)行整體的理解。
??本文參考資料:
??1.ConcurrentHashMap源碼分析(JDK8版本)
??2. 死磕 Java 并發(fā):J.U.C 之 Java 并發(fā)容器:ConcurrentHashMap
??3.深入淺出ConcurrentHashMap1.8
??4.深入分析ConcurrentHashMap1.8的擴(kuò)容實(shí)現(xiàn)
??5.方騰飛、魏鵬、程曉明的《Java 并發(fā)編程的藝術(shù)》
1.為什么要使用ConcurrentHashMap?
??其實(shí),樓主都覺(jué)得這個(gè)問(wèn)題問(wèn)的非常傻逼,為什么使用ConcurrentHashMap呢?當(dāng)然是為了線程安全唄。其實(shí)這個(gè)回答是比較籠統(tǒng)的,這里面還有很多的問(wèn)題沒(méi)有涉及到,比如說(shuō),使用最多的HashMap在多線程模型下的缺點(diǎn),傳統(tǒng)線程安全的HashTable的問(wèn)題等等。
(1).線程不安全的HashMap
??我們才學(xué)習(xí)HashMap的時(shí)候,就知道HashMap是線程不安全的,但是不知道HashMap由于多線程會(huì)導(dǎo)致什么問(wèn)題。下面,我們來(lái)看一段代碼:
public class Demo {
public static void main(String[] args) {
final Map<String, String> map = new HashMap<>(2);
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 100000; i++){
new Thread(new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), "");
}
}).start();
}
}
});
t.start();
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
??HashMap在并發(fā)執(zhí)行put操作是會(huì)引起死循環(huán),是因?yàn)槎嗑€程會(huì)導(dǎo)致HashMap的Entry鏈表形成數(shù)據(jù)結(jié)構(gòu),一旦形成環(huán)形數(shù)據(jù)結(jié)構(gòu),Entry的next節(jié)點(diǎn)永遠(yuǎn)不為空,就會(huì)產(chǎn)生死循環(huán)獲取Entry。
(2).效率低下的HashTable
??HashTable使用synchronized關(guān)鍵字來(lái)保證線程安全,但在線程競(jìng)爭(zhēng)激烈的情況下HashTable的效率非常低下。因?yàn)楫?dāng)一個(gè)線程訪問(wèn)HashTable的同步方法,其他線程也訪問(wèn)HashTable的同步方法,會(huì)進(jìn)入阻塞或者輪詢狀態(tài)。如果線程1在使用put進(jìn)行元素添加,線程2不但不能使用put方法添加元素也不能使用get方法來(lái)獲取元素,所以競(jìng)爭(zhēng)越激烈效率越低。
(3).ConcurrentHashMap
??相較于笨重的HashTable,ConcurrentHashMap就顯得非常高效。ConcurrentHashMap降低了鎖的粒度,其中在1.7中,設(shè)置了Segment數(shù)組,來(lái)表示不同數(shù)據(jù)段,在1.8中,取消了Segment數(shù)組,進(jìn)一步降低了鎖的粒度。由于本文是分析1.8的ConcurrentHashMap,所以不對(duì)1.7的版本過(guò)多的解釋。
2.ConcurrentHashMap的模型圖
??1.8在1.7的基礎(chǔ)上進(jìn)一步的優(yōu)化,使得ConcurrentHashMap的鎖的粒度進(jìn)一步降低。我們還是先來(lái)看看ConcurrentHashMap的結(jié)構(gòu)圖:

??我們看到的是,在1.8的內(nèi)部,維持了一個(gè)Node數(shù)組,用來(lái)存儲(chǔ)數(shù)據(jù)。實(shí)際上,ConcurrentHashMap結(jié)構(gòu)與HashMap的結(jié)構(gòu)類(lèi)似,基本結(jié)構(gòu)都是數(shù)組+鏈表+紅黑樹(shù)。但是在ConcurrentHashMap里面,紅黑樹(shù)在Node數(shù)組內(nèi)部存儲(chǔ)的不是一個(gè)TreeNode對(duì)象,而是一個(gè)TreeBin對(duì)象,TreeBin內(nèi)部維持著一個(gè)紅黑樹(shù)。
3.ConcurrentHashMap的內(nèi)部類(lèi)
??簡(jiǎn)單的看過(guò)ConcurrentHashMap的結(jié)構(gòu)圖之后,現(xiàn)在來(lái)了解一下ConcurrentHashMap的內(nèi)部類(lèi),這個(gè)在后面我們理解源碼有一定的幫助。
(1).Node類(lèi)
??Node類(lèi)在ConcurrentHashMap的內(nèi)部類(lèi)是最基本的類(lèi),其中桶里面裝的就是Node元素,還有就是TreeBin、TreeNode等都繼承于Node類(lèi)。我們先來(lái)看看Node的代碼:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
??我們來(lái)看一下Node成員變量:
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
??hash用來(lái)存儲(chǔ)該key的hash值,但是這里面有三個(gè)狀態(tài)需要注意:
| 名稱 | 值 | 描述 |
|---|---|---|
| MOVED | -1 | hash值為-1的Node,表示當(dāng)前節(jié)點(diǎn)已經(jīng)被處理過(guò)了,這中情況將出現(xiàn)多線程擴(kuò)容的情況下,后面會(huì)詳細(xì)的解釋 |
| TREEBIN | -2 | hash值為-2的Node,表示當(dāng)前的節(jié)點(diǎn)是TreeBin |
| RESERVED | -3 | 保留的hash值,用在ReservationNode上面 |
??key用來(lái)存儲(chǔ)的key值,val用來(lái)存儲(chǔ)value值,這個(gè)就不用多說(shuō)。next字段表示下一個(gè)Node,這個(gè)在出現(xiàn)哈希沖突時(shí),將定位在相同位置上的Node使用鏈表連接起來(lái)。
??我們還需要注意一點(diǎn)是,我們不能夠通過(guò)調(diào)用setValue方法來(lái)改變Node的值。
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
(2).TreeNode類(lèi)和TreeBin類(lèi)
??TreeNode類(lèi)表示的是紅黑樹(shù)上的每個(gè)節(jié)點(diǎn)。當(dāng)一個(gè)鏈表上的節(jié)點(diǎn)數(shù)量超過(guò)了指定的值,會(huì)將這個(gè)鏈表變?yōu)榧t黑樹(shù),當(dāng)然每個(gè)節(jié)點(diǎn)就轉(zhuǎn)換為T(mén)reeNode。不像HashMap,ConcurrentHashMap在桶里面直接存儲(chǔ)的不是TreeNode,而是一個(gè)TreeBin,在TreeBin內(nèi)部維護(hù)一個(gè)紅黑樹(shù),也就是說(shuō)TreeNode在TreeBin內(nèi)部使用的。
(3).ForwardingNode類(lèi)
??這個(gè)是輔助類(lèi),通常在擴(kuò)容時(shí)用到。此時(shí)如果一個(gè)線程在進(jìn)行擴(kuò)容操作,另一個(gè)線程put一個(gè)元素進(jìn)來(lái),如果看到對(duì)應(yīng)的位置上面是ForwardingNode對(duì)象的話,那么就參與擴(kuò)容的操作隊(duì)列來(lái)。其中ForwardingNode對(duì)象來(lái)源有兩個(gè):1.原來(lái)的位置為null,但是此時(shí)復(fù)制操作已經(jīng)到當(dāng)前位置的后面了,會(huì)將這個(gè)原來(lái)的桶的這個(gè)位置置為ForwardingNode對(duì)象;2.原來(lái)位置不為null,但是已經(jīng)操作過(guò)這個(gè)位置了。
??我們來(lái)來(lái)ForwardingNode的源碼:
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
//省略了find方法代碼
}
??在ForwardingNode里面,我們發(fā)現(xiàn)了一個(gè)nextTable對(duì)象,這個(gè)對(duì)象表示擴(kuò)容之后的數(shù)組,當(dāng)一個(gè)線程訪問(wèn)到了一個(gè)ForwardingNode對(duì)象,就知道當(dāng)前正在進(jìn)行擴(kuò)容操作,當(dāng)前這個(gè)線程會(huì)幫助擴(kuò)容(擴(kuò)容分為兩步:1.數(shù)組容量增大2倍;2.將原數(shù)組的元素拷貝到新數(shù)組里面去),將原數(shù)組的元素復(fù)制到這個(gè)nextTable里面去。
4.基本成員變量
??我們?cè)诜治鯟oncurrentHashMap的源碼時(shí),還是先來(lái)看看它基本的成員變量。
| 變量名 | 描述 |
|---|---|
| table | 桶數(shù)組,用來(lái)存儲(chǔ)Node元素的。默認(rèn)為null,只在第一次put操作的進(jìn)行初始化,該數(shù)組的長(zhǎng)度永遠(yuǎn)為2的n次方。 |
| nextTable | 默認(rèn)為null,當(dāng)不為null,表示當(dāng)前正在進(jìn)行擴(kuò)容操作,這個(gè)數(shù)組就是擴(kuò)容之后的數(shù)組,長(zhǎng)度為原數(shù)組的兩倍。 |
| baseCount | 記錄map中元素個(gè)數(shù),由于是多線程操作,baseCount記錄的不準(zhǔn)確,所以要結(jié)合counterCells 來(lái)使用保證記錄的正確性。 |
| sizeCtl | 表初始化和擴(kuò)容的控制位。其中,當(dāng)為-1時(shí),表示當(dāng)前table數(shù)組正在被初始化;當(dāng)為-N時(shí),表示有N-1個(gè)線程在進(jìn)行擴(kuò)容操作;當(dāng)為0(默認(rèn)值)時(shí),表示當(dāng)前table還未使用,此時(shí)table為null;當(dāng)為其余正整數(shù)時(shí),表示table的容量,默認(rèn)是table大小的0.75倍,在代碼中使用的是(n - (n>>>2))的方式來(lái)計(jì)算0.75 |
5.構(gòu)造方法
??在ConcurrentHashMap里面,我覺(jué)得有兩個(gè)構(gòu)造方法需要我們關(guān)注,一個(gè)是無(wú)參數(shù)的構(gòu)造方法,這個(gè)構(gòu)造方法非常的簡(jiǎn)單,是一個(gè)空的構(gòu)造方法,還有一個(gè)就是帶初始容量的構(gòu)造方法:
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
??在這個(gè)構(gòu)造方法里面最主要的是,初始化sizeCtl。在這里保證sizeCtl必須是2的n次方,所以這里調(diào)用tableSizeFor方法的目的就是調(diào)整sizeCtl大小為2的n次方。
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
??假設(shè)給出大小為100,調(diào)用這個(gè)方法之后,會(huì)將大小調(diào)整為256。
6.table數(shù)組的初始化
??在前面,對(duì)table數(shù)組的初始化也有所提及。table不會(huì)在構(gòu)造方法里面進(jìn)行初始化,而是在第一次put操作時(shí),進(jìn)行初始化。我們來(lái)看看:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//發(fā)現(xiàn)table數(shù)組尚未初始化,調(diào)用initTable方法來(lái)對(duì)table數(shù)組進(jìn)行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//省略了其余代碼
}
??在調(diào)用put方法時(shí),如果發(fā)現(xiàn)table尚未被初始化,會(huì)調(diào)用initTable方法來(lái)初始化。我們來(lái)看看initTable方法的代碼:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl小于0,表示已經(jīng)有線程在對(duì)table數(shù)組進(jìn)行初始化,現(xiàn)在這個(gè)線程要做的就是讓出cpu時(shí)間,
//全力支持table初始化
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//此時(shí)有一個(gè)線程在對(duì)table數(shù)組進(jìn)行是初始化,如果使用CAS算法對(duì)sizeCtl字段更新成功,
//表示當(dāng)前線程可以對(duì)table數(shù)組進(jìn)行初始化。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
??在initTable方法中,我們發(fā)現(xiàn)table數(shù)組只能被一個(gè)線程初始化。如果一個(gè)線程在初始化table數(shù)組,會(huì)將sizeCtl字段更新為-1,其他線程看到sizeCtl為-1時(shí),就知道當(dāng)前已經(jīng)有線程在初始化table數(shù)組,他們需要做的是,全力支持那個(gè)線程初始化table數(shù)組--讓出cpu占用時(shí)間。
??同時(shí),我們看到當(dāng)table數(shù)組更新成功之后,sizeCtl更新為table數(shù)組大小的0.75倍:
sc = n - (n >>> 2);
7.put操作
??我們使用HashMap最多的操作就是put操作和get操作。這里先對(duì)put進(jìn)行分析。我們先總體看一下put的代碼:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
??這里,我先將整個(gè)put方法的代碼貼出,讓大家有一個(gè)總體上概念,然后再來(lái)一一解釋,整個(gè)put操作的流程。
??整個(gè)put操作分為4種情況:
??1.如果當(dāng)前table沒(méi)有被初始化,會(huì)先初始化,重新進(jìn)行put操作(循環(huán)操作,保證put成功)。
??2.通過(guò)hash計(jì)算,定位位置,如果位置上為null,表示可以直接放進(jìn)去。
??3.如果對(duì)應(yīng)位置的元素已經(jīng)被標(biāo)記為MOVED,即Node的hash為MOVED,那么表示當(dāng)前table數(shù)組在進(jìn)行擴(kuò)容操作,此時(shí),當(dāng)前的線程會(huì)幫助擴(kuò)容。
??4.如果通過(guò)hash計(jì)算之后的位置不為null,表示出現(xiàn)了哈希沖突,此時(shí)會(huì)鎖住當(dāng)前這個(gè)位置上的Node對(duì)象,然后將新添加的Node添加這個(gè)位置鏈表或者紅黑樹(shù)上。
(1).hash計(jì)算
??put操作的第一步就是hash計(jì)算,這里的hash計(jì)算,我認(rèn)為分為兩步:
??1.調(diào)用spread方法將key的hashcode再次hash。
??2.通過(guò)(n - 1) & hash方法來(lái)計(jì)算該元素定位的位置。
??我們先來(lái)看看spread方法:
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
??我先來(lái)解釋一下>>>運(yùn)算符的含義:
??>>>的基本操作就是左移,然后高位補(bǔ)0,這里的左移表示連符號(hào)位都要跟著左移;而>>只是左移數(shù)值位,不移動(dòng)符號(hào)位。
??h ^ (h >>> 16)與運(yùn)算過(guò)程如下圖:

??異或操作完成之后,然后跟HASH_BITS取與操作,我認(rèn)為是為了消除符號(hào)位的影響。
??至于(n - 1) & hash計(jì)算,其實(shí)就是hash % n的計(jì)算,具體的解釋,大家可以參考我的Java 源碼分析-ThreadLocal,這篇文章里面有相關(guān)的解釋。
(2).元素放入數(shù)組
??由于初始化table數(shù)組在前面已經(jīng)說(shuō)了,所以這里直接說(shuō)賦值的情況。在這種情況下,還有分為兩種情況:1.對(duì)應(yīng)位置為null;2.對(duì)應(yīng)位置不為null。這里將將兩種情況分別討論一下。
A.對(duì)應(yīng)位置為null
??這種情況比較簡(jiǎn)單,直接調(diào)用了casTabAt方法進(jìn)行賦值:
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
??這里使用Unsafe類(lèi)來(lái)對(duì)變量進(jìn)行CAS更新,具體的實(shí)現(xiàn)原理,樓主也不是很清楚,實(shí)在抱歉!
A.對(duì)應(yīng)位置不為null
??這種情況下,就比較復(fù)雜了,我們先來(lái)看看代碼:
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
??這種情況下,新添加的Node肯定會(huì)添加到這個(gè)位置后面,當(dāng)然為了保證線程安全,肯定鎖住當(dāng)前的Node,這樣將鎖的粒度降到了每個(gè)Node上,一個(gè)Node的put操作不會(huì)影響其他Node的操作。
??鎖住了之后,當(dāng)然就要添加Node。這里分為兩種情況,一種原來(lái)的Node不是TreeBin,也就是說(shuō)原來(lái)就是一個(gè)鏈表,這樣直接添加到鏈表的內(nèi)部就行了;還有就是原來(lái)的Node本身就是一個(gè)TreeBin,那么我們通過(guò)調(diào)用TreeBin內(nèi)部的方法來(lái)添加一個(gè)Node,至于紅黑樹(shù)的平衡性,讓TreeBin自己來(lái)維持,我們外部不需要關(guān)注。
??添加完成之后,還需要一個(gè)操作,那就是如果當(dāng)前鏈表達(dá)到了閾值,需要將鏈表變?yōu)榧t黑樹(shù)。這一步的解釋在后面在詳細(xì)的說(shuō)。
8.更新baseCount
??在putVal方法的循環(huán)執(zhí)行完畢之后,一個(gè)Node肯定放入進(jìn)去了,此時(shí)就需要調(diào)用addCount方法來(lái)更新baseCount變量:
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//更新baseCount
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//檢測(cè)是否需要擴(kuò)容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
?? addCount方法里面分為主要兩個(gè)操作:1.更新baseCount;2.檢測(cè)是否擴(kuò)容。這里由于在討論更新baseCount,所以不對(duì)擴(kuò)容操作進(jìn)行詳細(xì)的解釋,之后會(huì)做詳細(xì)的解釋。
?? 更新baseCount的操作分成了兩步:1.嘗試更新baseCount變量;2.如果更新失敗,或者counterCells為null會(huì)調(diào)用fullAddCount方法進(jìn)行循環(huán)更新。
??我們來(lái)看看fullAddCount的代碼:
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
// 如果counterCells數(shù)組對(duì)應(yīng)位置上為null,創(chuàng)建一個(gè)cell,
//放在這個(gè)位置上。
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//如果對(duì)應(yīng)位置上不為null,嘗試更新value值
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
//如果對(duì)應(yīng)不為null,并且更新失敗,表示此時(shí)counterCells數(shù)組的容量過(guò)小,
//此時(shí)需要擴(kuò)容。
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
//初始化counterCells數(shù)組
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
??這個(gè)方法的代碼比較長(zhǎng),理解起來(lái)可能麻煩,我來(lái)說(shuō)明整個(gè)方法的作用和執(zhí)行的過(guò)程。
??在解釋這個(gè)方法之前,先解釋一下baseCount和counterCells含義。兩個(gè)都是用記錄元素個(gè)數(shù),只是記錄的時(shí)機(jī)不同的。當(dāng)要更新元素個(gè)數(shù)時(shí),優(yōu)先更新baseCount,如果baseCount更新成功的話,表示更新元素個(gè)數(shù)的操作已經(jīng)完成了;如果更新失敗的話,此時(shí)會(huì)考慮更新counterCells數(shù)組中某一個(gè)(隨機(jī)的)cell的value值。因此,map的元素個(gè)數(shù) = baseCount + 所有的cell的value值。
??在調(diào)用fullAddCount方法之前,在addCount方法里面進(jìn)行了兩次嘗試:1.嘗試更新baseCount;2.嘗試更新counterCells數(shù)組隨機(jī)位置上的一個(gè)cell的value。如果這兩次嘗試都失敗的話,則需要調(diào)用fullAddCount來(lái)保證元素個(gè)數(shù)正確的更新。
??而這個(gè)fullAddCount方法的作用就是更新cell的value值。
??整個(gè)fullAddCount方法的執(zhí)行步驟:
??1.如果counterCells為null,先初始化counterCells數(shù)組,默認(rèn)大小為0。
??2.如果counterCells不為null,根據(jù)產(chǎn)生的隨機(jī),通過(guò)(n - 1) & h找到一個(gè)位置,如果這個(gè)位置為null的話,創(chuàng)建一個(gè)cell對(duì)象放在這個(gè)位置上面。
??3.如果這個(gè)位置上面不為null,首先會(huì)嘗試更新這個(gè)cell的value值,如果更新成功的話,表示更新操作完成,否則執(zhí)行第4步。
??4.如果第3步的更新操作失敗,表示此時(shí)counterCells數(shù)組的容量可能不足以應(yīng)付這么多的線程了。所以此時(shí)counterCells數(shù)組需要擴(kuò)容。
9.擴(kuò)容操作
??當(dāng)我們put一個(gè)元素之后,如果此時(shí)元素已經(jīng)達(dá)到了擴(kuò)容的閾值了,就需要擴(kuò)容。我們先來(lái)看看addCount方里面的擴(kuò)容部分:
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//表示已經(jīng)有線程在進(jìn)行擴(kuò)容操作
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//當(dāng)前線程是唯一的或是第一個(gè)發(fā)起擴(kuò)容的線程 此時(shí)nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
??addcount方法擴(kuò)容部分有兩種情況:1. sc<0情況下,表示當(dāng)前已經(jīng)有線程在進(jìn)行擴(kuò)容操作了,此時(shí)當(dāng)前這個(gè)線程需要參與幫助擴(kuò)容的任務(wù)中來(lái);2. 另一個(gè)情況,當(dāng)前線程只有一個(gè)線程準(zhǔn)備擴(kuò)容操作。第一種情況為什么不調(diào)用helperTransfer方法來(lái)表示幫助擴(kuò)容的含義,這個(gè)原因,我也不是很懂,但是比較了第一種情況的代碼與helperTransfer方法的代碼,兩者其實(shí)比較類(lèi)似。
??現(xiàn)在我們來(lái)看看transfer方法究竟為我們做了什么。由于transfer方法代碼過(guò)長(zhǎng),所以這里不完全展出來(lái),這里使用分段形式來(lái)解釋。
??首先,如果是第一個(gè)線程調(diào)用transfer方法進(jìn)行擴(kuò)容操作,那么nextTable肯定為null。所以transfer方法的第一步就是創(chuàng)建新數(shù)組,新數(shù)組的容量是舊數(shù)組的兩倍:
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
??擴(kuò)容之后,此時(shí)需要將原來(lái)的數(shù)組進(jìn)行復(fù)制操作了。這個(gè)過(guò)程支持多線程操作。在看這個(gè)過(guò)程之前,我們先來(lái)看看幾個(gè)變量的含義。
| 變量名 | 類(lèi)型 | 描述 |
|---|---|---|
| fwd | ForwardingNode | 用來(lái)作為占位符,表示當(dāng)前位置已經(jīng)被處理過(guò) |
| advance | boolean | advance為true,表示當(dāng)前這個(gè)節(jié)點(diǎn)已經(jīng)被處理,這個(gè)與fwd區(qū)別在于,advance為false表示當(dāng)前節(jié)點(diǎn)(i位置)可以被處理 |
| finishing | boolean | finishing為true,表示當(dāng)前擴(kuò)容操作已經(jīng)操作完畢 |
??接下來(lái),我們來(lái)分析一下transfer方法的執(zhí)行過(guò)程。大體的流程是,遍歷這個(gè)原來(lái)的桶數(shù)組,然后復(fù)制到新數(shù)組里面去。
??首先,必須找到一個(gè)可以被處理的Node節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)可以為null,也可以不為null,但是不能為fwd對(duì)象,因?yàn)槿绻莊wd對(duì)象,表示當(dāng)前已經(jīng)被處理過(guò),沒(méi)必要再去處理。
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
??上面的代碼中,只要advance為false,就退出while循環(huán),表示當(dāng)前嘗試著處理這個(gè)位置上的Node。這里說(shuō)的是嘗試著處理,意思就是,可能不會(huì)被處理,比如說(shuō),已經(jīng)有現(xiàn)在在處理了;或者這個(gè)位置已經(jīng)被處理,已經(jīng)為fwd了。
??首先,如果這個(gè)位置上為null的話,那么直接將這個(gè)位置設(shè)置成fwd。
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
??如果當(dāng)前這個(gè)節(jié)點(diǎn)已經(jīng)被處理了,直接將advance設(shè)置為true,進(jìn)行下一次位置的選擇。
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
??其他情況下,表示可以處理這個(gè)節(jié)點(diǎn),這里處理的意思,是可以將這個(gè)位置上的所有節(jié)點(diǎn)拷貝到新數(shù)組里面去。這里分為兩種情況,我們來(lái)一一的分析。
??首先,第一種情況就是節(jié)點(diǎn)本身為一個(gè)鏈表結(jié)構(gòu)。
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//將原來(lái)鏈表上的一部分就放在原位置
setTabAt(nextTab, i, ln);
//將原來(lái)鏈表上的另一部分放在 i + n的位置上
setTabAt(nextTab, i + n, hn);
//處理完畢,將這個(gè)位置上設(shè)置為fwd,后面的線程看到之后,知道這個(gè)節(jié)點(diǎn)已經(jīng)被處理了
setTabAt(tab, i, fwd);
advance = true;
}
??整個(gè)過(guò)程是非常簡(jiǎn)單,就是將原來(lái)的鏈表分為了兩部分,一部分在原來(lái)的 i 位置上,一部分在 i+ n的位置上。這里涉及到了喜聞樂(lè)見(jiàn)的反轉(zhuǎn)鏈表,這里不對(duì)反轉(zhuǎn)鏈表做解釋,后面會(huì)詳細(xì)的解釋反轉(zhuǎn)鏈表的原因,到時(shí)候會(huì)回來(lái)詳細(xì)的解釋這段代碼。
??現(xiàn)在來(lái)看看復(fù)制操作的第二種類(lèi)型。如果當(dāng)前位置上的Node為T(mén)reeBin類(lèi)型,表示為紅黑樹(shù),此時(shí)要做的操作跟鏈表的操作也是非常類(lèi)似,將樹(shù)分為兩個(gè)部分,一個(gè)部分在原來(lái)的i位置上,另一個(gè)部分在i + n的位置上。
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果此時(shí)節(jié)點(diǎn)數(shù)量小于閾值,那么將紅黑樹(shù)變?yōu)殒湵? ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
??最后,如果所有的節(jié)點(diǎn)被賦值完畢之后,表示擴(kuò)容操作已經(jīng)完成了,此時(shí)finishing為true。
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//將nextTable置為null,以便下次擴(kuò)容
nextTable = null;
table = nextTab;
//將sizeCtl的值調(diào)整為長(zhǎng)度的0.75
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//sizeCtl減一,表示當(dāng)前有個(gè)線程參與擴(kuò)容
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
10.反轉(zhuǎn)鏈表的原因
??樓主的思路來(lái)自于深入分析ConcurrentHashMap1.8的擴(kuò)容實(shí)現(xiàn),如果大家可以參考一下。剛剛,我們對(duì)transfer方法的復(fù)制操作有了一個(gè)大概的理解,這里將對(duì)其詳細(xì)分析。
??在解釋這段代碼之前,我們先對(duì)這段代碼里面幾個(gè)變量有一個(gè)初步的理解。
| 變量名 | 類(lèi)型 | 描述 |
|---|---|---|
| runBit | int | 位置為每個(gè)節(jié)點(diǎn)的hash值與n進(jìn)行與操作,經(jīng)過(guò)第一次循環(huán),runBit記錄的是最后一個(gè)hash值變化的Node的hash值??赡苓@里拗口,待會(huì)舉一個(gè)詳細(xì)例子。 |
| lastRun | Node | 默認(rèn)值為當(dāng)前位置第一個(gè)Node,經(jīng)過(guò)第一次循環(huán),lastRun記錄的是最后一個(gè)hash值變化的Node。 |
| ln | Node | 默認(rèn)值為null,是放在新數(shù)組 i 位置上的鏈表頭結(jié)點(diǎn)。 |
| hn | Node | 默認(rèn)值為null,是放在新數(shù)組 i + n 位置上的鏈表頭結(jié)點(diǎn) |
??在上面的表格中,提到最后一個(gè)hash值變化的Node,這句話可能比較拗口,這里舉一個(gè)簡(jiǎn)單的例子來(lái)表達(dá)意思。

??上圖中是一個(gè)鏈表的模型,其中,我將這個(gè)鏈表中的節(jié)點(diǎn)分為兩類(lèi):1.一類(lèi)是節(jié)點(diǎn)的hash值相同的;2.第二類(lèi)是跟第一類(lèi)的hash值不同的。所以,從這個(gè)鏈表中我們可以得到,lastRun為節(jié)點(diǎn)6,runBit也是節(jié)點(diǎn)6與n進(jìn)行與操作得到的值。為什么是節(jié)點(diǎn)6呢?因?yàn)樵?之后,節(jié)點(diǎn)類(lèi)型沒(méi)有再變了。
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//將原來(lái)鏈表上的一部分就放在原位置
setTabAt(nextTab, i, ln);
//將原來(lái)鏈表上的另一部分放在 i + n的位置上
setTabAt(nextTab, i + n, hn);
//處理完畢,將這個(gè)位置上設(shè)置為fwd,后面的線程看到之后,知道這個(gè)節(jié)點(diǎn)已經(jīng)被處理了
setTabAt(tab, i, fwd);
advance = true;
}
??結(jié)合源碼來(lái)分析,runBit默認(rèn)為節(jié)點(diǎn)1的hash和n與操作的值。后面依次記錄與上一次runBit的不同值,最后通過(guò)runBit == 0將所有的節(jié)點(diǎn)分為了兩類(lèi)。
??現(xiàn)在我們結(jié)合上面的圖做一個(gè)假設(shè)。我們假設(shè)紫色節(jié)點(diǎn)的hash值&n為0,那么藍(lán)色節(jié)點(diǎn)的hash值&n就不為0。經(jīng)過(guò)第一次循環(huán),得出 ln = lastRun = 節(jié)點(diǎn)6,hn = null。
??此時(shí)我們來(lái)分析第二次循環(huán),先來(lái)看看紫色節(jié)點(diǎn)構(gòu)成ln鏈表的過(guò)程。

??我看到ln鏈表只是部分反轉(zhuǎn)了,默認(rèn)部分是沒(méi)有反轉(zhuǎn)了。其中得到的ln鏈表放在新數(shù)組的i位置上。
??然后我們?cè)賮?lái)看看hn鏈表的構(gòu)成過(guò)程。

??我們發(fā)現(xiàn)hn鏈表是完全反轉(zhuǎn)的。從而,我們得出一個(gè)結(jié)論,哪個(gè)鏈表默認(rèn)為null,那個(gè)鏈表就是完全反轉(zhuǎn)的。
??最終,我們終于知道了,一個(gè)反轉(zhuǎn)鏈表得出的原因,其實(shí)不是故意為之,而是加快復(fù)制操作速度,因?yàn)閘astRun之后的節(jié)點(diǎn)進(jìn)行復(fù)制操作不需要?jiǎng)?chuàng)建新節(jié)點(diǎn)。
11.get操作
??前面詳細(xì)的說(shuō)了put操作和擴(kuò)容操作,這里簡(jiǎn)單的分析一下get操作,get操作相比于前面的操作就非常的簡(jiǎn)單。先來(lái)看看源碼:
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//匹配成功
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//當(dāng)節(jié)點(diǎn)為T(mén)reeBin或者ForwardingNode
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//遍歷鏈表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
??整個(gè)get操作分為三種情況:1.table數(shù)組指定位置的第一個(gè)節(jié)點(diǎn)就匹配成功,直接返回;2.table數(shù)組指定位置上的hash值小于0,此時(shí)當(dāng)前位置可能已經(jīng)被其他在擴(kuò)容時(shí)處理過(guò),或者當(dāng)前位置的Node為一個(gè)TreeBin,不管是那種類(lèi)型的Node,調(diào)用的都是find方法來(lái)獲取Node節(jié)點(diǎn);3.其余情況下,直接遍歷鏈表查找。
12.size操作
??size操作里面,主要有兩個(gè)方法提供,一個(gè)是傳統(tǒng)的size方法,一個(gè)是ConcurrentHashMap比較提倡的mappingCount方法。我們先來(lái)看看size方法:
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
??size方法里面調(diào)用sumCount方法,sumCount方法才是真正計(jì)算元素個(gè)數(shù)的方法。我們來(lái)看看sumCount方法:
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
??可能大家對(duì)這個(gè)方法計(jì)算方式不太好理解,其實(shí)這種計(jì)算方式取決于前面的設(shè)計(jì)方式。在前面我說(shuō)到過(guò),map的元素個(gè)數(shù) = baseCount + 所有的cell的value值。這個(gè)結(jié)論是因?yàn)槎嗑€程更新個(gè)數(shù)時(shí),如果只在baseCount里面記錄,一個(gè)原因是多線程更新可能會(huì)出錯(cuò),另一個(gè)原因是競(jìng)爭(zhēng)性太大了。所以,設(shè)計(jì)理念就變成了,當(dāng)baseCount更新失敗時(shí),表示baseCount變量當(dāng)前忙碌,可以將這個(gè)增量添加到一個(gè)不忙的cell里面去,這樣競(jìng)爭(zhēng)性就降低了。
??我們?cè)賮?lái)看看mappingCount方法:
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
??達(dá)到的效果感覺(jué)跟size都差不多,只不過(guò)是一個(gè)返回int,一個(gè)返回long而已。