參考文章:http://www.itdecent.cn/p/bd972088a494
數(shù)據(jù)結(jié)構(gòu)

構(gòu)造函數(shù)
put 的實(shí)現(xiàn)
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
//定位Segment,讓Hash右移動(dòng)segmentShift位,默認(rèn)情況下就是28位(總長(zhǎng)32位),之后和segmentMask(0XFF)做與操作,得到段的索引
int j = (hash >>> segmentShift) & segmentMask;
//利用UNSAFE.getObject中的方法獲取到目標(biāo)的Segment。
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
//如果沒有取到目標(biāo)Segment,所以需要保證能取到這個(gè)Segment,沒有的話創(chuàng)建一個(gè)Segment
s = ensureSegment(j);
//代理到Segment的put方法
return s.put(key, hash, value, false);
}
首先是(Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)), UNSAFE這種用法是在JDK1.6中沒有的,主要是利用Native方法來(lái)快速的定位元素。
還有一個(gè)點(diǎn)是:ensureSegment()
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
//getObjectVolatile是以Volatile的方式獲得目標(biāo)的Segment,Volatile是為了保證可見性。
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//如果沒有取到,那么證明指定的Segment不存在,那么需要新建Segment,方式是以ss[0]為鏡像創(chuàng)建。
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // 再次檢查
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);//創(chuàng)建新Segment
//以CAS的方式,將新建的Segment,set到指定的位置。
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
上面的代碼就是保證,在put之前,要保證目標(biāo)的Segment是存在的,不存在需要?jiǎng)?chuàng)建一個(gè)Segment。
put方法代理到了Segment的put方法,Segment extends 了ReentrantLock,以至于它能當(dāng)做一個(gè)Lock使用。那么我們看一下Segment的put的實(shí)現(xiàn):
進(jìn)入Segment的put操作時(shí)先進(jìn)行加鎖保護(hù)。如果加鎖沒有成功,調(diào)用scanAndLockForPut方法(詳細(xì)步驟見下面scanAndLockForPut()源碼分析)進(jìn)入自旋狀態(tài),該方法持續(xù)查找key對(duì)應(yīng)的節(jié)點(diǎn)鏈中是已存在該機(jī)節(jié)點(diǎn),如果沒有找到,則預(yù)創(chuàng)建一個(gè)新節(jié)點(diǎn),并且嘗試n次,直到嘗試次數(shù)操作限制,才真正進(jìn)入加鎖等待狀態(tài),自旋結(jié)束并返回節(jié)點(diǎn)(如果返回了一個(gè)非空節(jié)點(diǎn),則表示在鏈表中沒有找到相應(yīng)的節(jié)點(diǎn))。對(duì)最大嘗試次數(shù),目前的實(shí)現(xiàn)單核次數(shù)為1,多核為64。
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//因?yàn)閜ut操作會(huì)改變整體的結(jié)構(gòu),所以需要保證段的線程安全性,所以首先tryLock
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
//新建tab引用,避免直接引用Volatile導(dǎo)致性能損耗,
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
//Volatile讀,保證可見性
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
//遍歷HashEntry數(shù)組,尋找可替換的HashEntry
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
//如果不存在可替換的HashEntry,如果在scanAndLockForPut中建立了此Node直接SetNext,追加到鏈表頭
if (node != null)
node.setNext(first);
else
//如果沒有則新建一個(gè)Node,添加到鏈表頭
node = new HashEntry<K,V>(hash, key, value, first);
//容量計(jì)數(shù)+1
int c = count + 1;
//如果容量不足,那么擴(kuò)容
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
//以Volatile寫的方式,替換tab[index]的引用
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
put方法是做了加鎖操作的,所以不用過多的考慮線程安全的問題,但是get操作為了保證性能是沒有加鎖的,所以需要盡量的保證數(shù)據(jù)的可見性,能讓get得到最新的數(shù)據(jù)。
讓我們看看 scanAndLockForPut(key, hash, value) 在做什么:
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
當(dāng)put操作嘗試加鎖沒成功時(shí),它不是直接進(jìn)入等待狀態(tài),而是調(diào)用了scanAndLockForPut()操作,該操作持續(xù)查找key對(duì)應(yīng)的節(jié)點(diǎn)鏈中是已存在該機(jī)節(jié)點(diǎn),如果沒有找到已存在的節(jié)點(diǎn),則預(yù)創(chuàng)建一個(gè)新節(jié)點(diǎn),并且嘗試n次,直到嘗試次數(shù)操作限制,才真正進(jìn)入等待狀態(tài),計(jì)所謂的自旋等待。對(duì)最大嘗試次數(shù),目前的實(shí)現(xiàn)單核次數(shù)為1,多核為64。
在這段邏輯中,它先獲取key對(duì)應(yīng)的節(jié)點(diǎn)鏈的頭,然后持續(xù)遍歷該鏈,如果節(jié)點(diǎn)鏈中不存在要插入的節(jié)點(diǎn),則預(yù)創(chuàng)建一個(gè)節(jié)點(diǎn),否則retries值資增,直到操作最大嘗試次數(shù)而進(jìn)入等待狀態(tài)。這里需要注意最后一個(gè)else if中的邏輯:當(dāng)在自旋過程中發(fā)現(xiàn)節(jié)點(diǎn)鏈的鏈頭發(fā)生了變化,則更新節(jié)點(diǎn)鏈的鏈頭,并重置retries值為-1,重新為嘗試獲取鎖而自旋遍歷。
為什么要這么做呢?為了事先做數(shù)據(jù)的緩存,讓這些數(shù)據(jù)緩存在CPU的cache中,這樣后續(xù)在使用時(shí)能避免Cache missing。ps:scanAndLockForPut有個(gè)孿生兄弟scanAndLock,作用都差不多。
和 JDK 1.6 的實(shí)現(xiàn)的不同:
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash();
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
JDK 1.6 的實(shí)現(xiàn)和 JDK 1.7 的實(shí)現(xiàn)比較相似,但是主要區(qū)別是,沒有使用一些 UNSAFE 的方法去保證內(nèi)存的可見性,而是通過一個(gè)Volatile變量——count去實(shí)現(xiàn)。在開始的時(shí)候讀count保證lock的內(nèi)存語(yǔ)意,最后寫count實(shí)現(xiàn)unlock的內(nèi)存語(yǔ)意。
但是這里存在一個(gè)問題,new HashEntry操作存在重排序問題,導(dǎo)致在getValue的時(shí)候tab[index]不為null,但是value為null。所以在 get 的時(shí)候會(huì)有一步重新檢查的步驟,如果 value 為 null 的話就把 segment 段加鎖在重新 get 一次。具體原因后面詳細(xì)說明。
get 的實(shí)現(xiàn)
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
可以看出來(lái),get方法很簡(jiǎn)單,同時(shí)get是沒有加鎖的,那么get是如何保證可見性的呢?首先獲取指定index的Segment,利用getObjectVolatile獲取指定index的first HashEntry,之后遍歷HashEntry鏈表,這里比較關(guān)鍵的是HashEntry的數(shù)據(jù)結(jié)構(gòu):
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
兩個(gè)變量是volatile的,也就是說,兩個(gè)變量的讀寫能保證數(shù)據(jù)的可見性。
所以在變量HashEntry時(shí),總能保證得到最新的值。
JKD1.6的get方法的實(shí)現(xiàn):
V get(Object key, int hash) {
if (count != 0) { // read-volatile 當(dāng)前桶的數(shù)據(jù)個(gè)數(shù)是否為0
HashEntry<K,V> e = getFirst(hash); 得到頭節(jié)點(diǎn)
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
首先是讀取count變量,因?yàn)閮?nèi)存的可見性,總是能返回最新的結(jié)構(gòu),但是對(duì)于getFirst可能得到的是過時(shí)的HashEntry。接下來(lái)獲取到HashEntry之后getValue。但是這里為什么要做一個(gè)value的判空,原因就是上一步put的重排序問題,如果為null,那么只能加鎖,加鎖之后進(jìn)行重新讀取。但是這樣確實(shí)會(huì)帶來(lái)一些開銷。
為什么 JDK 1.6 的實(shí)現(xiàn)是弱一致性的?
這里比較重要的一點(diǎn)就是,為什么JDK1.6的是弱一致性的?因?yàn)镴DK1.6的所有可見性都是以count實(shí)現(xiàn)的,當(dāng)put和get并發(fā)時(shí),get可能獲取不到最新的結(jié)果,這就是JDK1.6中ConcurrentHashMap弱一致性問題,主要問題是 tab[index] = new HashEntry<K,V>(key, hash, first, value); 不一定 happened before getFirst(hash);
如下圖可以說明:
| 執(zhí)行put的線程 | 執(zhí)行g(shù)et的線程 |
|---|---|
| ⑧tab[index] = new HashEntry<K,V>(key, hash, first, value) | |
| ③if (count != 0) | |
| ②count = c | |
| ⑨HashEntry e = getFirst(hash); |
對(duì)volatile的count的讀時(shí)間上發(fā)生在對(duì)count的寫之前,我們無(wú)法得出② hb ⑨這層關(guān)系了。因此,通過count變量,在這個(gè)軌跡上是無(wú)法得出⑧ hb ⑨的。
而JDK1.7的實(shí)現(xiàn),對(duì)于每一個(gè)操作都是Volatile變量的操作,能保證線程之間的可見性,所以不存在弱一致性的問題。(對(duì)這句話持有保留觀點(diǎn))
remove 的實(shí)現(xiàn)(具體實(shí)現(xiàn)以及和 1.6 對(duì)比要補(bǔ)上)
final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
這里的 remove 實(shí)現(xiàn)我在公司里看到一篇文章,但是記不清在哪里看到的了。大致上說的是,remove 操作的時(shí)候不需要向 JDK 1.6 的那樣,將要?jiǎng)h除的節(jié)點(diǎn)賦值一遍,然后刪除對(duì)應(yīng)的元素, 再將最后一個(gè)元素的 next 元素連接上去。 而是用了 CAS 的操作實(shí)現(xiàn)的刪除。
size 的實(shí)現(xiàn)
public int size() {
// 主要的方法是先遍歷 2 次 Segment,如果兩次遍歷的結(jié)果得到的 size 相同
//那么就認(rèn)為 size 的結(jié)果是準(zhǔn)確的,否則就要對(duì)每一個(gè) Segment 加鎖重新計(jì)算 size 的大小
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
主要的方法是先遍歷 2 次 Segment,如果兩次遍歷的結(jié)果得到的 size 相同,那么就認(rèn)為 size 的結(jié)果是準(zhǔn)確的,否則就要對(duì)每一個(gè) Segment 加鎖重新計(jì)算 size 的大小。獲取鎖的操作在這里
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
JDK 6 和 7 的區(qū)別
總體來(lái)說變化挺多,不過總體的結(jié)構(gòu)并沒有發(fā)生改變,還是采用了 segment 分?jǐn)噫i的實(shí)現(xiàn)。 1.6 主要用 count 的 volatile 語(yǔ)義來(lái)實(shí)現(xiàn)put、get。1.7 主要使用 unsafe 的 CAS 操作來(lái)實(shí)現(xiàn)。