前言
本文中的ConcurrentHashMap源碼分析版本是jdk1.8;
問題
在對(duì)ConcurrentHashMap進(jìn)行源碼分析之前,首先需要確定幾個(gè)問題,也就是ConcurrentHashMap相對(duì)HashMap來說需要解決的問題:
- 如何保證高并發(fā)時(shí)數(shù)據(jù)插入的同步性以及高效性?
- 如何保證高并發(fā)場(chǎng)景下擴(kuò)容時(shí),不會(huì)重復(fù)擴(kuò)容?
- 如何保證高并發(fā)場(chǎng)景下擴(kuò)容時(shí),如何處理數(shù)據(jù)讀取和插入?
ConcurrentHashMap常量說明
要理解ConcurrentHashMap,首先需要對(duì)其常量和變量有一定認(rèn)識(shí),必須知道這是用來干什么的,什么場(chǎng)景下使用;
- MAXIMUM_CAPACITY = 1 << 30;表示哈希表最大容量為2^30,因?yàn)?2位bit中前兩位bit是被用作控制目的的;
- DEFAULT_CAPACITY = 16;表示哈希表默認(rèn)容量是16,
- MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;表示虛擬機(jī)棧數(shù)組最大長(zhǎng)度,JDK1.8版本后concurrenthashMap只是在toArray相關(guān)的方法使用到;
- DEFAULT_CONCURRENCY_LEVEL = 16;默認(rèn)并發(fā)等級(jí),當(dāng)前版本不怎么使用,主要是為了兼容之前版本;
- LOAD_FACTOR = 0.75f;哈希表加載因子,計(jì)算加載因子容量時(shí)可以利用n-(n>>2)表達(dá)式輕松計(jì)算出來;
- TREEIFY_THRESHOLD = 8; bin鏈表轉(zhuǎn)化成紅黑樹的閾值;超過該值后查找紅黑樹相對(duì)鏈表來說要效率高些
bin:箱子或者柜子,可以理解為裝node的箱子,每個(gè)箱子里面掛著一個(gè)node的鏈表或者紅黑樹;
- UNTREEIFY_THRESHOLD = 6;bin紅黑樹轉(zhuǎn)換成鏈表的閾值;當(dāng)nodes數(shù)量小于該值時(shí)紅黑樹將轉(zhuǎn)化為鏈表;這時(shí)查找鏈表速率更快些;
- MIN_TREEIFY_CAPACITY = 64;表示哈希表容量大于該值時(shí)才會(huì)將鏈表轉(zhuǎn)換成紅黑樹,這是bin鏈表轉(zhuǎn)換成紅黑樹的第二個(gè)條件(如果一個(gè)bin中nodes太多的話,會(huì)選擇擴(kuò)容而不是轉(zhuǎn)化成紅黑樹);
為什么是64呢?因?yàn)楸苊庠跀U(kuò)容和轉(zhuǎn)換成紅黑樹時(shí)的沖突,因?yàn)楣1砣萘枯^小時(shí)是比較容易進(jìn)行擴(kuò)容操作的,如果一個(gè)bin中nodes比較多就轉(zhuǎn)換成紅黑樹,那么發(fā)生擴(kuò)容之后,又得轉(zhuǎn)換成鏈表,會(huì)造成很多不必要的計(jì)算,所以為了平衡兩者的沖突,需要確定一個(gè)數(shù)組容量閾值;
- MIN_TRANSFER_STRIDE = 16;表示擴(kuò)容時(shí)一個(gè)線程需要轉(zhuǎn)移bins的最小長(zhǎng)度為16,因?yàn)閿U(kuò)容時(shí)如果bins長(zhǎng)度比較大,那么可以借助多線程并發(fā)轉(zhuǎn)移bins,轉(zhuǎn)移bins指的是擴(kuò)容后根據(jù)bin中node的hash值與擴(kuò)容的數(shù)組長(zhǎng)度相與后確定新的bin,然后將該node轉(zhuǎn)移到新的bin去;
- RESIZE_STAMP_BITS = 16;每次擴(kuò)容時(shí)用來在sizeCtl中記錄擴(kuò)容戳的位數(shù),不同輪次的擴(kuò)容操作的擴(kuò)容戳都是不同的,擴(kuò)容戳可以保證多次擴(kuò)容操作不會(huì)交叉重疊,該變量不是常量也不提供修改方法;
- MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;表示擴(kuò)容時(shí)最多允許2^16-1個(gè)線程進(jìn)行transfer工作;
- RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;擴(kuò)容戳移位量,擴(kuò)容時(shí)將擴(kuò)容戳進(jìn)行移位用來當(dāng)做擴(kuò)容線程計(jì)數(shù)的基數(shù),通過反向移位后可以計(jì)算出擴(kuò)容戳;
- MOVED = -1,表示擴(kuò)容時(shí)FORWARDING節(jié)點(diǎn)的hash值,F(xiàn)ORWARDING節(jié)點(diǎn)是一個(gè)臨時(shí)節(jié)點(diǎn),只在擴(kuò)容時(shí)出現(xiàn),如果舊數(shù)組中某個(gè)bin(箱子)的nodes全部都遷移到了新數(shù)組中,舊數(shù)組就會(huì)在該bin頭部放一個(gè)forwarding節(jié)點(diǎn);如果讀線程在進(jìn)行讀操作或者迭代讀時(shí)遇到了該節(jié)點(diǎn),就會(huì)被轉(zhuǎn)移到新數(shù)組上執(zhí)行讀操作,如果寫線程遇到該節(jié)點(diǎn),則會(huì)嘗試協(xié)助擴(kuò)容;
- TREEBIN=-2,表示TreeBin的hash值,TreeBin是ConcurrentHashMap中用來代理操作TreeNode的特殊節(jié)點(diǎn),擁有存儲(chǔ)實(shí)際數(shù)據(jù)的紅黑樹的根節(jié)點(diǎn);
- RESERVED = -3;保留節(jié)點(diǎn),也就是站位符,一般情況下不會(huì)出現(xiàn),在jdk1.8版本中新的函數(shù)式方法computeIfAbsent和compute中才會(huì)出現(xiàn);
- HASH_BITS=0x7ffffff;用于將節(jié)點(diǎn)hash值為負(fù)數(shù)的轉(zhuǎn)化為正數(shù),hashtable中利用hash值定位bin也是利用該方法;
ConcurrentHashMap變量說明
volatile Node<K,V>[] nextTable;nextTabel!=null表示當(dāng)前ConcurrentHashMap正在擴(kuò)容,還有些擴(kuò)容線程沒有完全退出;
volatile int sizeCtl;用來控制ConcurrentHashMap初始化和擴(kuò)容的;
sizeCtl=-1表示進(jìn)行初始化;
sizeCtl=-(1+n)時(shí)表示擴(kuò)容時(shí)的活躍線程數(shù)為n,n+1為sizeCtl的低32 - RESIZE_STAMP_BITS位值代表的數(shù)值,如只有一個(gè)線程第一次進(jìn)行擴(kuò)容時(shí),sizeCtl=-2145714174,那么其二進(jìn)制為
-2145714174=1000 0000 0001 1011 0000 0000 0000 0010,
其低32 - RESIZE_STAMP_BITS=32-16=16位的補(bǔ)碼為0000 0000 0000 0010,剛好為2;
如果該值減少2如果使得活躍線程位數(shù)代表的值為零,說明擴(kuò)容已經(jīng)結(jié)束;
sizeCtl>0時(shí)表示初始化中正在使用的數(shù)組初始容量值或者初始化后下一次初始化數(shù)組需要的容量值;
sizeCtl=0表示初始化中數(shù)組默認(rèn)使用的是初始化容量值;volatile int transferIndex:表示上一個(gè)transfer任務(wù)結(jié)束的位置或者數(shù)組長(zhǎng)度值,ConcurrentHashMap將bin數(shù)組劃分為長(zhǎng)度為stride的m(m=n%stride)個(gè)transfer任務(wù),第k(1<=K<=m)個(gè)transfer任務(wù)處理[(k-1)* stride,k* stride-1]個(gè)bins,而且為了盡量避免和迭代處理相同的bin,transfer任務(wù)是從后往前執(zhí)行的,所以每次設(shè)置transferIndex都是k*stride,第一次設(shè)置transferIndex為bin數(shù)組的長(zhǎng)度n,從n-1開始處理bin轉(zhuǎn)移;第二次transferIndex則為n-stride;每次都是通過cas機(jī)制避免多線程處理同一個(gè)transfer任務(wù);
volatile long baseCount:計(jì)數(shù)器基本值,主要在沒有多線程競(jìng)爭(zhēng)情況下使用,需要通過CAS機(jī)制更新;
volatile CounterCell[] counterCells:CAS自旋鎖標(biāo)志位,用于初始化,或者counterCells擴(kuò)容時(shí)
volatile int cellsBusy:高并發(fā)的計(jì)數(shù)單元,如果進(jìn)行了初始化,那么長(zhǎng)度和bin數(shù)組一樣都是2^n;
ConcurrentHashMap方法解析
putVal()解析
putVal有三個(gè)參數(shù)K key,V value,boolean onlyIfAbsent,onlyIfAbsent默認(rèn)為false,當(dāng)putIfAbsent方法調(diào)用時(shí)則為ture;
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
//將key的hashcode進(jìn)行平鋪,也就是為了避免hash值低位與數(shù)組長(zhǎng)度n-1相與可能會(huì)有更多的hash collide,因此將hash值高16位和低16位先進(jìn)行或運(yùn)算
int hash = spread(key.hashCode());
int binCount = 0;//用來計(jì)算bin鏈表長(zhǎng)度
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)//初始化
tab = initTable();
//將hash和數(shù)組長(zhǎng)度n-1進(jìn)行與運(yùn)算找到bin的下標(biāo)i,然后對(duì)該bin第一個(gè)Node進(jìn)行判斷
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果該node為空,則cas添加一個(gè)新node,成功則退出循環(huán),否則繼續(xù);
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)//如果node的hash為moved,那么表示該bin正在擴(kuò)容中,則協(xié)助擴(kuò)容
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)//判斷bin頭node如果不能替換且old key和new key一致,則直接退出
return fv;
else {//如果bin header node可以替換或者不等于key不一致
V oldVal = null;
synchronized (f) {//利用synchronized方法將header node加鎖,避免競(jìng)爭(zhēng)造成一致性
if (tabAt(tab, i) == f) {//如果header node尚未沒有發(fā)生變化,避免另有線程更新該header node
if (fh >= 0) {//hash值大于零表示,該node是普通node
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {//從header node開始迭代找到key,hash值一致的node進(jìn)行替換,找不到則在tail添加新node
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {//如果header node是紅黑樹TreeBin
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)//保留節(jié)點(diǎn)一般情況不太可能出現(xiàn)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {//如果binCount大于零即bin鏈表長(zhǎng)度大于零
if (binCount >= TREEIFY_THRESHOLD)//判斷是否大于紅黑樹轉(zhuǎn)化閾值
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//添加到高效計(jì)數(shù)器,另外根據(jù)binCount判斷是否需要協(xié)助擴(kuò)容或者紅黑樹轉(zhuǎn)換
return null;
}
transfer方法說明
transfer方法其實(shí)劃分為3個(gè)階段:
- 根據(jù)cpu核數(shù)計(jì)算transfer任務(wù)步長(zhǎng)stride和初始化nextTab;
-
當(dāng)前線程申請(qǐng)transfer任務(wù),transfer任務(wù)劃分如下圖:
transfer任務(wù).png
-
- 擴(kuò)容操作
源碼解析如下:
- 擴(kuò)容操作
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//計(jì)算transfer任務(wù)步長(zhǎng)stride,最小為16,多線程情況下為n/(8*NCPU)
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // nextTab初始化
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;//長(zhǎng)度增加兩倍后復(fù)制給nextTable
transferIndex = n;//一開始transferIndex等于原數(shù)組末尾index+1處
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//轉(zhuǎn)移節(jié)點(diǎn)
boolean advance = true;//表示是否還能繼續(xù)執(zhí)行申請(qǐng)transfer任務(wù)
boolean finishing = false; // 在提交nextTable之前用來保證所有transfer任務(wù)都已經(jīng)完成
for (int i = 0, bound = 0;;) {//i表示當(dāng)前transfer任務(wù)初始化開始bin位置,bound表示當(dāng)前transfer任務(wù)初始化結(jié)束bin的下標(biāo);
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;//定義下一個(gè)transfer任務(wù)開始bin的下標(biāo)和結(jié)束bin的下標(biāo);
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {//如果nextIndex已經(jīng)不大于零,那么說明transfer任務(wù)已經(jīng)結(jié)束;
i = -1;
advance = false;
}
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {//嘗試申請(qǐng)下一個(gè)transfer任務(wù)
//申請(qǐng)成功,則將bound賦值開始和結(jié)束bin的下標(biāo)位置(i,bound)
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//i<0表示transfer任務(wù)已經(jīng)結(jié)束,i大于n表示存在新的擴(kuò)容任務(wù),
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {//執(zhí)行本輪擴(kuò)容的收尾工作
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//將原數(shù)組長(zhǎng)度擴(kuò)大兩倍-減去一半即是新數(shù)組長(zhǎng)度的0.75;
return;
}
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {//嘗試將sizeCtl減1,表明需要減少一個(gè)活躍擴(kuò)容線程
//sc-2如果除了擴(kuò)容戳以外的低位都是零的話,那么表示當(dāng)前線程是否是最后一個(gè)擴(kuò)容線程,如果是直接返回
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit//重新檢查先是否任務(wù)分配的hash bins都已經(jīng)轉(zhuǎn)移完畢
}
}
else if ((f = tabAt(tab, i)) == null)//如果當(dāng)前bin為null,則直接放置一個(gè)forwardnode表明是擴(kuò)容轉(zhuǎn)移
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)//如果當(dāng)前bin是forwardnode,那么表示出現(xiàn)了擴(kuò)容重疊,需要重新申請(qǐng)transfer任務(wù)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
//n=2^k,表示第k位為1,而對(duì)于數(shù)組長(zhǎng)度為n,hash&n時(shí)第k位為零說明是新數(shù)組低位bin,而第k為1則說明是新數(shù)組高位bin;
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {//找到最后一個(gè)node
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {//根據(jù)lastRun生成node鏈
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {//按照紅黑樹的方式轉(zhuǎn)移bin
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
resizeStamp(n)解析
resizeStamp方法是用來生成擴(kuò)容戳的,對(duì)于不同數(shù)組長(zhǎng)度n,則擴(kuò)容戳不一樣,同時(shí)每個(gè)擴(kuò)容戳需要容納最大為MAX_RESIZERS=(1 << (32 - RESIZE_STAMP_BITS)) - 1的活躍擴(kuò)容線程,那么sizeCtl-2=resizeStamp(n)<<RESIZE_STAMP_BITS,就應(yīng)該要能表示這三組信息:負(fù)值,表示不同數(shù)組長(zhǎng)度的擴(kuò)容戳和活躍線程數(shù);
先看下源碼:
static final int resizeStamp(int n) {
//獲取數(shù)組長(zhǎng)度n前面零的個(gè)數(shù)一般為32-log(n),那么RESIZE_STAMP_BITS為擴(kuò)容戳需要的位數(shù)
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
因?yàn)镽ESIZE_STAMP_BITS需要能展示出當(dāng)前數(shù)組擴(kuò)容次數(shù),而數(shù)組可擴(kuò)容次數(shù)最大值為32-log(n),所以RESIZE_STAMP_BITS>32>2^5,所以RESIZE_STAMP_BITS至少為6位;
則假設(shè)n=2^k,那么經(jīng)過resizeStamp之后得到的rs如下圖:

;
經(jīng)過左移RESIZE_STAMP_SHIFT后如下圖:

如上圖所示最高位為1表示負(fù)數(shù),擴(kuò)容戳的后五位表示當(dāng)前擴(kuò)容次數(shù),32-擴(kuò)容戳所占位數(shù)即可表示活躍擴(kuò)容線程數(shù),活躍擴(kuò)容線程數(shù)為1時(shí)表示初始化,為2時(shí)表示當(dāng)前擴(kuò)容線程數(shù)為1;
