1.原理解析
線程安全的保證:CAS+Synchonized
數(shù)據(jù)存儲實現(xiàn):數(shù)組+鏈表+紅黑樹

1.1 成員變量
- table:
transient volatile Node<K,V>[] table
一個Node類型的表,默認為null,初始化在第一次put操作時,默認大小為16,擴容時大小總是2的冪次方 - nextTable:
private transient volatile Node<K,V>[] nextTable
默認為null,擴容時新生成的數(shù)組,大小為原數(shù)組的2倍 - sizeCtl:
private transient volatile int sizeCtl
默認為0,用來控制table的初始化和擴容操作。
-1 表示正在table初始化,-N表示有N-1個線程正在擴容
如果table未完成初始化,則表示table初始化需要的大小,如果已經(jīng)完成初始化,則表示table的容量,默認為table大小的0.75倍 - Node: 保存key和value以及key的hash值的數(shù)據(jù)結(jié)構(gòu)。其中value和next都用volatile修飾,保證了并發(fā)中的可見性。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
- ForwardingNode: 一個特殊的節(jié)點,hash值為-1,只有在擴容的時候使用,作為一個占位符,表示當前節(jié)點為null或者已經(jīng)遷移。
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
2.常見方法
2.1 初始化table
ConcurrentHashTable的初始化操作是在第一次put操作時進行的,而且只會初始化一次:

這里的initTable,就是對table變量進行初始化操作:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)//如果一個線程發(fā)現(xiàn)當前sizeCtl小于0,表明當前有其他
//線程在對table執(zhí)行cas成功,需要當前線程讓出時間片
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
這里講解下U.compareAndSwapInt(this, SIZECTL, sc, -1)操作哈
這里的U是private static final sun.misc.Unsafe U;,是Unsafe類,這個方法CompareAnsSwapInt,就是樂觀鎖CAS了。這里有四個參數(shù),第一個,第二個參數(shù)用來確定當前操作對象在內(nèi)存中的存儲值,然后和第三個expect value比較,如果相等,則將內(nèi)存值更新為第四個updaet value值。關(guān)于CAS的介紹,在這里詳細介紹。在原子性的保證下,將sc的值設置為-1,表明當前table正在初始化。
2.2 put操作
直接看源碼,參看源碼進行解釋:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
putVal的操作可以分為三個步驟【假設table已經(jīng)初始化了】:
- bucket為空時,采用CAS將Node放到對應的bucket中
- 當前map正在擴容
f.hash == MOVED,則先進行擴容,再進行更新值 - 發(fā)生hash沖突時,先使用synchonized字鎖住。倘若當前的hash值對應的是鏈表的頭節(jié)點,則遍歷鏈表,如果能找到hash對應的節(jié)點,則更改對應的值,否則在鏈表的尾部增加節(jié)點;倘若當前的hash值對應的是紅黑樹的根節(jié)點,則在樹結(jié)構(gòu)上遍歷,進行更新或者插入操作。
整個putval的操作中,是用for循環(huán)來外包的。當遇到map正在擴容時,沒有break操作,會等待map擴容結(jié)束后,進行更新或者插入值。
這里一些細節(jié)點再進行介紹下:
2.2.1 hash算法
這里和hashmap一樣:
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
2.2.2 獲取table所對應的索引元素
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
采用Unsafe.getObjectVolatie()來獲取,而不是直接用table[index]的原因跟ConcurrentHashMap的弱一致性有關(guān)。在java內(nèi)存模型中,我們已經(jīng)知道每個線程都有一個工作內(nèi)存,里面存儲著table的副本,雖然table是volatile修飾的,但不能保證線程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接獲取指定內(nèi)存的數(shù)據(jù),保證了每次拿到數(shù)據(jù)都是最新的。
2.3 擴容
為什么會擴容?
- 當ConcurrentHashMap的tab長度大于64時,會使用紅黑樹
- 新增節(jié)點后,如果鏈表的長度大于8時,會調(diào)用
treeifyBin把鏈表轉(zhuǎn)換為紅黑樹。在轉(zhuǎn)換結(jié)構(gòu)時,如果tab的長度小于MIN_TREEIFY_CAPACITY,默認是64,則會將數(shù)組長度擴大到原來的兩倍。并觸發(fā)tranfer,重新調(diào)整節(jié)點的位置 - 新增節(jié)點后,如果
addCount中統(tǒng)計的節(jié)點數(shù)超過sizeCtl,也會觸發(fā)tranfer,進行位置調(diào)整
2.3.1 addCount
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
每次put后,會計算節(jié)點的數(shù)量
2.3.2 treeifyBin
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
將鏈表轉(zhuǎn)換為紅黑樹
2.3.3 transfer
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
擴容的代碼有點長,簡單的過程描述是,當tab元素的量達到容量閾值sizeCtl時,觸發(fā)擴容:
- 構(gòu)建一個NextTable,其大小為Table的兩倍
- 把table中的數(shù)據(jù),復制到NextTable中
在擴容的過程中,依然支持并發(fā)更新操作,也支持并發(fā)插入。
如何在擴容時,并發(fā)地復制與插入?
- 遍歷整個table,當前節(jié)點為空,則采用CAS的方式在當前位置放入fwd
- 當前節(jié)點已經(jīng)為fwd(with hash field “MOVED”),則已經(jīng)有有線程處理完了了,直接跳過 ,這里是控制并發(fā)擴容的核心
- 當前節(jié)點為鏈表節(jié)點或紅黑樹,重新計算鏈表節(jié)點的hash值,移動到nextTable相應的位置(構(gòu)建了一個反序鏈表和順序鏈表,分別放置在i和i+n的位置上)。移動完成后,用Unsafe.putObjectVolatile在tab的原位置賦為為fwd, 表示當前節(jié)點已經(jīng)完成擴容。
2.4 get 讀操作
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
讀操作比較簡單了,不需要控制并發(fā),如果tab為空,返回null,否則計算hash值,找到bucket中對應的位置,如果是node節(jié)點直接返回,否則,返回null。
3.在jdk1.7和1.8之間的不同
3.1 并發(fā)機制
- 在jdk1.7中,concurrentHashMap是通過鎖分段技術(shù)。把整個table分為幾個segment,每個segment中分配一個鎖,每個segment下,又分table+HashEntry。通過給segment加一個重入鎖,實現(xiàn)線程安全?!鞠啾扔趆ashtable,就是把hashtable分割為多個段,細化鎖的粒度】。
- 在jdk1.8中,直接使用CAS + synchronized保證并發(fā)更新。粒度更細,不會在一個segment上加一把鎖了。
3.2 put操作
- 在1.7中,多個線程同時競爭獲取同一個segment鎖,獲取成功的線程更新map;失敗的線程嘗試多次獲取鎖仍未成功,則掛起線程,等待釋放鎖
- 在1.8中,訪問相應的bucket時,使用sychronizeded關(guān)鍵字,防止多個線程同時操作同一個bucket,如果該節(jié)點的hash不小于0,則遍歷鏈表更新節(jié)點或插入新節(jié)點;如果該節(jié)點是TreeBin類型的節(jié)點,說明是紅黑樹結(jié)構(gòu),則通過putTreeVal方法往紅黑樹中插入節(jié)點;更新了節(jié)點數(shù)量,還要考慮擴容和鏈表轉(zhuǎn)紅黑樹
4 ConcurrentHashMap能替換HashTable嗎?
hash table雖然性能上不如ConcurrentHashMap,但并不能完全被取代,兩者的迭代器的一致性不同的,hash table的迭代器是強一致性的,而concurrenthashmap是弱一致的。 ConcurrentHashMap的get,clear,iterator 都是弱一致性的。
下面是大白話的解釋:
- Hashtable的任何操作都會把整個表鎖住,是阻塞的。好處是總能獲取最實時的更新,比如說線程A調(diào)用putAll寫入大量數(shù)據(jù),期間線程B調(diào)用get,線程B就會被阻塞,直到線程A完成putAll,因此線程B肯定能獲取到線程A寫入的完整數(shù)據(jù)。壞處是所有調(diào)用都要排隊,效率較低。
- ConcurrentHashMap 是設計為非阻塞的。在更新時會局部鎖住某部分數(shù)據(jù),但不會把整個表都鎖住。同步讀取操作則是完全非阻塞的。好處是在保證合理的同步前提下,效率很高。壞處 是嚴格來說讀取操作不能保證反映最近的更新。例如線程A調(diào)用putAll寫入大量數(shù)據(jù),期間線程B調(diào)用get,則只能get到目前為止已經(jīng)順利插入的部分 數(shù)據(jù)。
選擇哪一個,是在性能與數(shù)據(jù)一致性之間權(quán)衡。ConcurrentHashMap適用于追求性能的場景,大多數(shù)線程都只做insert/delete操作,對讀取數(shù)據(jù)的一致性要求較低。
Reference:
[1] https://blog.csdn.net/programmer_at/article/details/79715177