本文不是討論ConcurrentHashMap的數(shù)據(jù)結(jié)構(gòu),而是討論它的內(nèi)存語義和并發(fā)操作。
ConcurrentHashMap的三個并發(fā)目標(biāo):
(寫者指的是會修改數(shù)據(jù)結(jié)構(gòu)(鏈表/紅黑樹)的線程)
- 任何時刻讀者都不需要等待
- 如果在時刻A寫者 完成了 修改,在時刻B讀者 開始 讀,而且時刻B晚于時刻A,那么讀者將會看到這些修改(i.e. Happens-before) 。所以,我們也可以說:在時刻A寫者 開始 修改,在時刻B讀者 開始 讀,而且時刻B晚于時刻A,那么讀者可能看到這些修改,也可能看不到。
- 任何時刻不會有兩個寫者。
預(yù)備知識(本文假設(shè)你都已經(jīng)了解這些知識):
- 第二個目標(biāo)其實就是一個Happens-Before關(guān)系。Happens-Before關(guān)系是一個內(nèi)存語義,用volatile變量就可以獲得這種內(nèi)存語義。
- 如果一個類含有final域,那么這個類的對象不可能是部分構(gòu)造的。
- CAS操作
如何實現(xiàn)這三個并發(fā)目標(biāo):(分兩種情況)
bin是鏈表:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
...
對于第一個目標(biāo),get()函數(shù)中直接就是不加鎖地遍歷整個鏈表
public V get(Object key) {
...
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
...
}
對于第二個目標(biāo),可以分成三種情況:
- 修改一個已存在的結(jié)點的val域
由于val是一個volatile變量,第二個目標(biāo)已達成。 - 在鏈表的末尾插入一個結(jié)點。比如現(xiàn)在需要將B插入到end的后面,那么:
next域也是一個volatile變量,而且B一定是一個完全構(gòu)造好的對象,第二個目標(biāo)也達成。Node<K, V> B = new Node<>(.., .., .., null); end.next = B; - 刪除一個已存在的結(jié)點,比如有:...->A->B->...,現(xiàn)在要刪除B,那么:
和插入一樣,next域也是一個volatile變量,第二個目標(biāo)也達成。A.next = B.next;
對于第三個目標(biāo),每次寫者都需要獲得bin鎖(以bin的第一個結(jié)點作為這個bin的鎖)才能進行修改操作。還有一個細節(jié),如果這是一個空bin(i.e. bin的第一個結(jié)點為null),那么直接用CAS操作將新創(chuàng)建出的結(jié)點賦給bin的第一個結(jié)點。
final V putVal(K key, V value, boolean onlyIfAbsent) {
...
f是一個Node<K,V>類型的對象,它現(xiàn)在指向是bin的第一個結(jié)點
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
...
else { 進入這里說明鍵key不存在,需要插入一個結(jié)點
synchronized (f) { 加鎖
if (tabAt(tab, i) == f) { 確認(rèn)f仍然是bin的第一個結(jié)點
if(f是鏈表結(jié)點)
...鏈表的插入操作
else if (f instanceof TreeBin)
...紅黑樹的插入操作(在后面分析,在這里只需要記住,即使是樹插入,也要先獲得bin鎖
}//if
}//synchronized
}//else
...
bin是紅黑樹(重點):
有必要先知道ConcurrentHashMap是怎樣表示紅黑樹的:
如果一個bin是紅黑樹結(jié)構(gòu),那么這個bin的第一個結(jié)點是TreeBin類型的(正如上面的代碼所示)。TreeBin中不存放key-value對,root指向真正紅黑樹的根,而它的first指向鏈表的第一個結(jié)點。在這里要注意:即使樹化后,ConcurrentHashMap仍然維持鏈表結(jié)構(gòu),因為next域是在父類Node<K,V>中。所以,可以這樣說:如果這個bin樹化了,那么既可以用紅黑樹來遍歷它的結(jié)點,也可以用鏈表來遍歷它的結(jié)點。還有一個細節(jié)就是,TreeNode<K,V>中有一個prev,用來(在鏈表結(jié)構(gòu)中)保存前一個結(jié)點,在用紅黑樹的刪除算法時會用到。
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
....
}
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
...
}
回到并發(fā),先談總體策略:(下面都假設(shè)bin已樹化)
- 與鏈表結(jié)構(gòu)的bin不同,如果一個寫者正在修改紅黑樹結(jié)構(gòu),那么讀者不能用紅黑樹來讀?!驗檫@樣沒有保證第二個目標(biāo)。Doug Leg給出了以下注釋:
* TreeBins also require an additional locking mechanism. While
* list traversal is always possible by readers even during
* updates, tree traversal is not, mainly because of tree-rotations
* that may change the root node and/or its linkages.
再結(jié)合Java對java.util.concurrent.collection的說明:
The methods of all classes in java.util.concurrent and its subpackages extend these guarantees to higher-levelsynchronization. In particular: Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread.
紅黑樹的插入/刪除是有可能旋轉(zhuǎn)樹的,如果寫者和讀者能并發(fā)執(zhí)行,有可能發(fā)生這樣一種情況:假設(shè)結(jié)點N已存在于紅黑樹中。寫者插入M結(jié)點,讀者讀N結(jié)點。因為寫者旋轉(zhuǎn)了樹,導(dǎo)致讀者讀不到N。這種情境下placing an object(N) into collection Happens-Before access that element(N)的關(guān)系不成立。所以,讀者和寫者一定不能并發(fā)執(zhí)行。
- 如果一個寫者正在做修改,讀者就用鏈表結(jié)構(gòu)去遍歷(這會慢一點,而且沒有利用紅黑樹)。這就對應(yīng)了第一個并發(fā)目標(biāo)。
- 現(xiàn)在,其實就是一個讀者-寫者問題。
3.1 同一時刻允許多個讀者進行讀。
3.2 只要有讀者在讀,寫者就不能進入,此時寫者就變成了等待者,它在等待最后一個讀者離開。
3.3 最后一個離開的讀者必須喚醒等待者(如果有的話)
3.4 只有一個寫者(Recall: bin鎖)
其實這個問題可以用信號量非常優(yōu)雅地解決,作為熱身,先看一下CS:APP給出的解法。
/*
* Readers-writers solution with weak reader priority
* From Courtois et al, CACM, 1971.
*/
#include "csapp.h"
int readcnt; /* Initially = 0 */
sem_t mutex, w; /* Both initially = 1 */
void reader(void)
{
while (1) {
P(&mutex);
readcnt++;
if (readcnt == 1) /* First in */
P(&w);
V(&mutex);
/* Critical section */
/* Reading happens */
P(&mutex);
readcnt--;
if (readcnt == 0) /* Last out */
V(&w);
V(&mutex);
}
}
void writer(void)
{
while (1) {
P(&w);
/* Critical section */
/* Writing happens */
V(&w);
}
}
mutex可以理解為訪問readcnt的鎖,每次訪問readcnt之前都要去獲取這個鎖。w作為寫鎖,寫者只有獲得這個鎖才可以進入。這里的思想是:第一個進入的讀者首先去獲取寫鎖,這樣寫者就無辦法進入了(于是,它變?yōu)榈却撸W詈笠粋€離開的讀者必須釋放寫鎖(然后等待者就可以進入了)。
為什么要用兩個鎖呢?只用一個鎖會怎樣?(這部分可能有點啰嗦了)
/*
* Naive readers-writers solution
*/
#include "csapp.h"
/* Global variable */
sem_t w; /* Initially = 1 */
void reader(void)
{
while (1) {
P(&w);
/* Critical section: */
/* Reading happens */
V(&w);
}
}
void writer(void)
{
while (1) {
P(&w);
/* Critical section: */
/* Writing happens */
V(&w);
}
}
上面這樣的寫法,雖然也可以保證讀者和寫者不會并發(fā)執(zhí)行,但也使得多個讀者不能并發(fā)執(zhí)行。OK,下面來看ConcurrentHashMap的解法...
下面的代碼有點復(fù)雜,用了位操作,留意注釋,涉及到TreeBin中的:
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; //set while holding write lock
static final int WAITER = 2; //set when waiting for write lock
static final int READER = 4; //increment value for setting read lock
分兩個角度 (以下:‘我’指當(dāng)前線程)
讀者
- 如果已有等待者進入,或已有寫者進入,那么我無法進入紅黑樹,只能按鏈表讀 ——for循環(huán)中第一個if
- 如果既沒有等待者,又沒有寫者,那么嘗試將讀者的數(shù)量加一,并進入——for循環(huán)中的else if
- 讀完后,我嘗試離開。如果嘗試離開成功,而且發(fā)現(xiàn)自己是最后一個讀者,而且有等待者在等待,那么將等待者喚醒?!猣or循環(huán)中的最后一個if
final Node<K,V> find(int h, Object k) {
if (k != null) {
first是鏈表的第一個結(jié)點,而且它是'volatile'修飾的
當(dāng)走到鏈表末尾時,退出循環(huán)
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
if (((s = lockState) & (WAITER|WRITER)) != 0) {
if (e是我們要找的結(jié)點嗎)
return e;
e = e.next; 鏈表遍歷操作
}
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try {
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null)); 紅黑樹讀
} finally {
Thread w;
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
(READER|WAITER) && (w = waiter) != null)
LockSupport.unpark(w); 使等待者可以被調(diào)度
}
return p;
}
}
}
return null;
}
上面的全部是CAS操作,所以我用“嘗試”這個字眼,如果嘗試失敗,那么下一次循環(huán)再試。而waiter, lockState是volatile變量,它們一定是最新的。
寫者
- 所有的修改方法(removeTreeNode(), putTreeVal())都調(diào)用lockRoot()方法
- lockRoot()中首先判斷有無讀者?有無等待者?有無寫者?如果都無,我進入,成為寫者。如果其中有任意一個,那么調(diào)用contendedLock()——競爭鎖
- 如果有一個等待者,或沒有讀者,那么我就進入成為寫者?!猣or循環(huán)中的第一個if
3.1關(guān)鍵:任何時刻,如果有等待者,就不可能有寫者
3.2 任何時刻,如果有讀者,就不可能有等待者,也不可能有寫者
3.2 如果在這一個判斷中發(fā)現(xiàn)已存在一個等待者,其實就是當(dāng)前線程自身。因為它之前嘗試進入失敗,所以成為了等待者。現(xiàn)在沒有讀者,它也就能成為寫者了。 - 如果嘗試成為寫者失敗,又沒有等待者,那么我就要成為等待者。——for循環(huán)中的else if
/**
* Acquires write lock for tree restructuring.
*/
private final void lockRoot() {
if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
contendedLock(); // offload to separate method
}
/**
* Possibly blocks awaiting root lock.
*/
private final void contendedLock() {
boolean waiting = false;
for (int s;;) {
if (((s = lockState) & ~WAITER) == 0) {
if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
if (waiting)
waiter = null;
return;
}
}
else if ((s & WAITER) == 0) {
if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
waiting = true;
waiter = Thread.currentThread();
}
}
else if (waiting)
LockSupport.park(this); 禁止等待者被系統(tǒng)調(diào)度,等待者需要被最后一個離開的讀者顯式地喚醒
}
}
回顧和對比
ConcurrentHashMap中的算法和CS:APP的思想都是一樣的,只不過用了非阻塞算法(CAS),使得它復(fù)雜很多。比如,一個線程想嘗試獲取某個共享變量,如果用阻塞算法(同步 / 信號量),那么就是嘗試獲取,如果失敗,則阻塞(被掛起)。如果使用的CAS,那么如果嘗試失敗,下一次再嘗試。
所以使用CAS,會有多個狀態(tài)。而用同步,狀態(tài)就比較少。例如,在ConcurrentHashMap中,有可能出現(xiàn)這樣一個狀態(tài):lockState == WAITER,這說明沒有讀者,沒有寫者,只有一個等待者(PS:這個狀態(tài)在contendedLock()的for循環(huán)的第一個if中被處理)。你可能會對這個狀態(tài)感到疑惑,既然沒有讀者,那么為什么這個線程不成為寫者,卻成為等待者呢?其實,只要考慮一下什么時候會出現(xiàn)這個狀態(tài),就很容易理解了。
1. 當(dāng)一個想修改紅黑樹數(shù)據(jù)結(jié)構(gòu)的線程嘗試進入時,發(fā)現(xiàn)已經(jīng)存在讀者。它嘗試進入失敗,于是,它嘗試成為等待者,嘗試成功,它真的成為等待者了。2. 之后,過了一段時間,最后一個讀者嘗試離開,如果嘗試離開成功了,那么保證會喚起等待者。3. 等待者醒了,此時在等待者眼中,就會出現(xiàn)上述狀態(tài)
現(xiàn)在在看下CS:APP的解法,發(fā)現(xiàn)是真的簡單啊。。。
補充:
CAS一般都會結(jié)合volatile變量一起使用,CAS實現(xiàn)原子性,而volatile實現(xiàn)內(nèi)存可見性(i.e.happens-before)。參卡AtomicInteger,AtomicReference等
附加
resize
* The table is resized when occupancy exceeds a percentage
* threshold (nominally, 0.75, but see below). Any thread
* noticing an overfull bin may assist in resizing after the
* initiating thread allocates and sets up the replacement array.
* However, rather than stalling, these other threads may proceed
* with insertions etc. The use of TreeBins shields us from the
* worst case effects of overfilling while resizes are in
* progress. Resizing proceeds by transferring bins, one by one,
* from the table to the next table. However, threads claim small
* blocks of indices to transfer (via field transferIndex) before
* doing so, reducing contention. A generation stamp in field
* sizeCtl ensures that resizings do not overlap. Because we are
* using power-of-two expansion, the elements from each bin must
* either stay at same index, or move with a power of two
* offset. We eliminate unnecessary node creation by catching
* cases where old nodes can be reused because their next fields
* won't change. On average, only about one-sixth of them need
* cloning when a table doubles. The nodes they replace will be
* garbage collectable as soon as they are no longer referenced by
* any reader thread that may be in the midst of concurrently
* traversing table. Upon transfer, the old table bin contains
* only a special forwarding node (with hash field "MOVED") that
* contains the next table as its key. On encountering a
* forwarding node, access and update operations restart, using
* the new table.
resize時的并發(fā)
resize是通過transfer完成的,在resize時,仍然能達成三個并發(fā)目標(biāo)。讀操作(get)線程不會helpTransfer,但是所有寫操作線程(put remove)都會helpTransfer。resize時,不會改動原table,所以讀線程繼續(xù)讀原table是安全的。而且,因為此時任何寫線程都在helpTransfer,它們的寫操作都沒有完成,所以讀線程即使讀不到它們的修改,也是允許的(符合happens-before)。
數(shù)據(jù)結(jié)構(gòu)如何變化
table是Bin數(shù)組,每一個Bin裝Node,每個Node裝一個<key, value>
invariants:
key.hashCode() --> compute hash --> hash % table.length作為table數(shù)組的下標(biāo)。所以,resize時,要重新決定每一個結(jié)點要放到哪一個Bin中。
tricks to speed up
Resizing proceeds by transferring bins, one by one,
from the table to the next table. However, threads claim small
blocks of indices to transfer (via field transferIndex) before
doing so, reducing contention. A generation stamp in field
sizeCtl ensures that resizings do not overlap
Because we are using power-of-two expansion, the elements from each bin must either stay at same index, or move with a power of two offset.
We eliminate unnecessary node creation by catching cases where old nodes can be reused because their next fields won't change.