散列表(Hash table)是字典結構的常用實現(xiàn),它能夠在插入和根據(jù)Key查詢數(shù)據(jù)時都保持O(1)的時間復雜度。大部分語言中都有散列表的默認實現(xiàn),比如Java中的HashMap和Go中的map?;诖蟛糠质褂脠鼍暗男阅芸紤],這些實現(xiàn)都不是并發(fā)安全的。為了在多線程下安全的使用map,大部分基礎庫都額外提供了線程安全的map實現(xiàn),我們從源碼層面看下這些庫的實現(xiàn)原理。
所有讀寫串行執(zhí)行
既然并發(fā)讀寫同一個Map不是線程安全的,那么最簡單的方式就是把所有的操作都串行化。Java最早的Map實現(xiàn)Hashtable就是這么做的,通過將所有方法訪問標識都設置成synchronized,保證了同一時間只有一個線程在訪問Map實例,從而達到線程安全的目的。我們可以看下代碼:
public synchronized V get(Object key) {
Entry<?,?> tab[] = table;
int hash = key.hashCode();
...
}
public synchronized V put(K key, V value) {
...
}
public synchronized int size() {
return count;
}
存在問題
這種實現(xiàn)存在的問題就是,所有操作都是串行的,在沒有修改操作只是并發(fā)讀取的情況下也要串行執(zhí)行,大大影響并發(fā)速度。
普通字典加讀寫鎖
既然前一種實現(xiàn)中讀寫會相互影響,那自然就想到可以將讀寫分離,讓讀取操作可以并發(fā)執(zhí)行,讀寫和寫寫之間串行。java中我們可以使用普通的HashMap和ReadWriteLock來達到這個目的。我們看下示例代碼:
public class ConcurrentMap<K, V> {
private ReadWriteLock lock;
private Map<K, V> dataMap;
public ConcurrentMap() {
dataMap = new HashMap<>();
lock = new ReentrantReadWriteLock();
}
public V get(K key) {
lock.readLock().lock();
try{
return dataMap.get(key);
}finally {
lock.readLock().unlock();
}
}
public void put(K key, V value) {
lock.writeLock().lock();
try{
dataMap.put(key, value);
}finally {
lock.writeLock().unlock();
}
}
public int size() {
lock.readLock().lock();
try{
return dataMap.size();
}finally {
lock.readLock().unlock();
}
}
}
使用JDK中提供的讀寫鎖,可以達到兩個目的。首先,在修改數(shù)據(jù)的時候通過獲取寫鎖,可以阻塞其它線程的讀寫操作,不會造成并發(fā)寫; 同時在讀取數(shù)據(jù)的時候只要沒有寫鎖存在,而讀鎖不會互相阻塞,就可以實現(xiàn)并發(fā)讀。再次,讀寫鎖還可以保證dataMap中數(shù)據(jù)的內存可見性。
上面這種實現(xiàn)方式在寫少讀多的情況下會大大提高并發(fā)執(zhí)行效率 (可以使用JDK1.8中新提供的StampedLock代替ReadWriteLock對上面的代碼性能做進一步優(yōu)化)。
存在問題
這種方式減少了讀寫沖突,但是因為寫鎖只有一個,對于多個并發(fā)中有修改數(shù)據(jù)的操作,仍然需要串行執(zhí)行。而且讀取的時候還是要先嘗試獲取一下讀鎖,總歸還是有影響的。
讀寫字典分開
Go語言的并發(fā)安全的Map實現(xiàn)中,使用了另外一種讀寫分離的方案。即在一個ConcurrentMap的底層使用一個普通的ReadMap和一個普通的dirtyMap來存儲數(shù)據(jù)。兩個map中key有可能是重復的,但是同一個key對應的value通過指針的方式指到同一個內存地址。
新增數(shù)據(jù)時,直接寫到dirtyMap中,并且修改readMap的標記位,標示dirtyMap中存在readMap沒有的數(shù)據(jù)。在達到一定條件后,使用dirtyMap的數(shù)據(jù)覆蓋readMap。在讀取數(shù)據(jù)的時候,先從readMap中不加鎖讀取,如果找到value或者沒找到value但是標記位表示dirtyMap中沒有新數(shù)據(jù),則直接返回。否則對dirtyMap加鎖讀取。
首先來看下Get實現(xiàn):
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
//1. 使用原子操作加載read map
read, _ := m.read.Load().(readOnly)
//2. 從read map中查詢key
e, ok := read.m[key]
//3. 如果沒找到并且dirty map中有新的key
if !ok && read.amended {
//4. 獲取鎖
m.mu.Lock()
//5. 獲取鎖之后從對read map做Double Check
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
//6. 還是沒找到
if !ok && read.amended {
//7. 檢查dirty map中key存不存在
e, ok = m.dirty[key]
//8. 每次從dirty map嘗試讀取都會增加miss計數(shù)
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
//9. 返回結果,這里用的原子Load操作
return e.load()
}
從上面的代碼中可以看到,使用了兩種手段保證并發(fā)安全。首先,每次都使用原子操作加載read map,這樣可以保證內存可見性;再次,如果需要從dirty map中獲取值,則需要加鎖,并且在加鎖成功后再次檢查read map有沒有變化。這里通過read.amended屬性來減少獲取鎖的次數(shù),如果map內數(shù)據(jù)一直沒有變化,amended等于false,就不會進入加鎖檢查讀取dirty map的邏輯。
第8步中的missLocked()方法會增加miss計數(shù),當計數(shù)超過閾值時,會使用dirty map覆蓋read map,這樣dirty map中如果有新的key,一段時間的加鎖讀取之后就會轉換為直接讀取。
下面再來看Put的邏輯:
func (m *Map) Store(key, value interface{}) {
//1. 通過原子操作加載read map,如果key存在通過CAS覆蓋value
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
//2. 獲取鎖
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
//3. 獲取鎖成功后對 read map做double check
if e.unexpungeLocked() {
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
//4. 如果dirty map中存在,更新
e.storeLocked(&value)
} else {
//5. 新的key插入到dirty map中
if !read.amended {
//6. 更新read.amended屬性為true, 表示dirty map中增加了新的key
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
在Put操作的時候,首先是判斷read map中key是否存在,存在則更新。這里使用的是CAS循環(huán)更新保證第3,4行之間即使有其他線程改了key的值也不會有問題。
如果read map中不存在,則加鎖更新或者新增到dirty map中。
存在問題
Go中使用的方法跟前面使用讀寫鎖的辦法相比,通過使用原子load和store方法,減少寫鎖阻塞范圍。同時如果key存在的情況下,更新操作無需加鎖,性能也會有提升。但是對于新的key寫入的情況,仍然需要加鎖串行執(zhí)行。而且因為讀寫數(shù)據(jù)分離,需要做數(shù)據(jù)定期copy,所以這個實現(xiàn)僅適合寫少讀多或者多線程更新不同的key的場景。
使用分段鎖
上面講的讀寫鎖和Go中的實現(xiàn)方案最大的問題是只要有任何一個線程獲取到寫鎖,其他寫線程只能等待,無論多個線程操作的數(shù)據(jù)是否有關系。所以,JDK1.8之前的實現(xiàn)類ConcurrentHashMap采用分段鎖來做優(yōu)化。
首先來看下數(shù)據(jù)結構:

每個Map中包含多個Segment,每個Segment中包含一個類似HashMap的實現(xiàn)。每個key先在計算出hashCode后,首先定位屬于哪個segment, 然后再在Segment包含的table中定位屬于哪個Slot。
當執(zhí)行Get操作時,利用final和volatile變量的內存可見性定義,無需加鎖從內存中直接讀取。執(zhí)行Put操作時,首先定位Segment,然后通過輪詢獲取鎖。這樣鎖定的范圍僅限于當前的Segment,不同Segment的更新操作可以并發(fā)執(zhí)行。
Segment定義
static final class Segment<K,V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K,V>[] table;
}
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
Segment是直接繼承了ReentrantLock,這樣可以少定義一個鎖對象。同時通過將包含的HashEntry table用volatile修飾來保證并發(fā)讀取的時候的內存可見性。HashEntry的hash code和key屬性都是final的,保證初始化后所有線程看到的值是不會變的,value和next也都是volatile的。
Get方法實現(xiàn)
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
//1. 獲取key的hashcode
int h = hash(key);
//2. 計算Segment的index
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//3. 獲取key所屬的Segment和包含的Hash table
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//4. 遍歷key所在slot的List, 獲取value的值
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
Put操作實現(xiàn)
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
//如果Segment尚未初始化,則使用CAS方式初始化Segment
s = ensureSegment(j);
//調用Segment的put方法
return s.put(key, hash, value, false);
}
Put操作除了跟Get操作一樣,首先獲取Segment之外,如果Segment尚未初始化,會采用CAS加循環(huán)的方式初始化Segment,仍然不需要使用鎖。然后調用Segment的put方法。
Segment.put()
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//1. 獲取鎖
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//2. 計算key所在的slot
int index = (tab.length - 1) & hash;
//3. 獲取slot鏈表的第一個元素
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
//4. 如果同一個slot存在HashEntry, 則更新或者一直走到list的末尾
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
//5. Key已存在,更新
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount; //記錄更新次數(shù)
}
break;
}
e = e.next;
}
else {
//6. 如果slot為空,則當前Entry作為第一個元素
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node); //判斷是否需要做rehash
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
在Segment的put方法第一步就先嘗試獲取鎖,這里面的scanAndLockForPut()方法會循環(huán)調用tryLock(),如果超過指定的次數(shù)仍然沒有獲取到,則直接調用lock()方法等待其它線程釋放鎖。
存在的問題
JDK1.8之前的實現(xiàn)中,通過將key分到不同的Segment來減小鎖的力度,只有屬于同一個Segment的Key在更新時才會互相阻塞,大大降低了并發(fā)沖突的可能。但是Lock的實現(xiàn)原理決定了Segment的數(shù)目是有數(shù)量限制的,過多會導致鎖輪詢獲取時,耗費大量的CPU時間。但是如果過少,又會造成單個Segment中元素過多,鎖的粒度會變大。
JDK1.8鎖定策略
在JDK1.8中對ConcurrentHashMap做了重新實現(xiàn),取消了Segment邏輯,而更像是普通的HashMap,并針對每個Slot加了一個同步鎖。
數(shù)據(jù)結構如下:

對于hash table每個slot中鏈表超過一定長度,則轉化為紅黑樹,加快查找速度。同時對Node的屬性定義也使用final和volatile修飾保證內存可見性。
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
transient volatile Node<K,V>[] table;
...
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
}
對于key的查找操作跟之前區(qū)別不大,只是少了Segment定位這一步,直接定位到具體的Slot。重點來看一下Put操作:
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); //hash table延遲初始化
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//1. 如果Slot是空的,直接通過CAS操作將當前值作為第一個元素,無需lock
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
//2. 正在做rehash
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//3. 同步鎖鎖定Slot的第一個Node
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// 鏈表存儲
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
//紅黑樹存儲
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
//4. 是否需要轉成紅黑樹
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//5. 增加計數(shù),用于size()返回
addCount(1L, binCount);
return null;
}
Put操作中首先會檢查Slot是否為空,如果為空,則使用CAS加循環(huán)將當前元素設置為第一個元素,無需加鎖。如果Slot中已經存在元素,使用同步鎖鎖定Slot中第一個元素,做插入或者更新操作。
從上面的鎖定邏輯可以看到,同一個slot中的元素更新時才會互相阻塞,而且新版本中不再使用ReentrantLock,相對于之前版本鎖定粒度更小。那之前版本鎖過多的問題如何解決呢?這個主要是在1.8 Java虛擬機對于同步鎖做了較大的升級,引入了偏向鎖->輕量級鎖->重量級鎖的升級邏輯,顯然JDK開發(fā)者經過測試同步鎖不足以對性能產生較大影響。
實際使用案例
通過對以上并發(fā)安全的Hash表的分析,對我們在解決實際項目中的并發(fā)問題也有一定的啟發(fā)。下面舉一個分段鎖的使用案例。
在分布式系統(tǒng)中,服務之間經常使用消息隊列來解耦,比如電商系統(tǒng)中,一筆訂單的狀態(tài)變化可能需要推送消息給物流系統(tǒng)、ERP系統(tǒng)、CRM系統(tǒng)等,而在接收端Consumer通常是多線程處理消息的,大致架構如下:

在特定業(yè)務場景下,需要同一筆訂單的消息不能并發(fā)處理。最好的解決辦法,將屬于同一筆訂單的消息使用同一個線程來處理,但是對于使用線程池的場景,這個要求是很難滿足的。所以自然想到用互斥鎖來實現(xiàn),我們可以借鑒ConcurrentHashMap中的分段鎖邏輯,比如使用8個Lock,每個訂單消息收到的時候,將訂單id對8取模,然后獲取相應的Lock。
大致的代碼邏輯如下:
public class OrderMessageListener {
private Lock[] locks = new ReentrantLock[8];
public void process(message Message) {
int index = message.getOrderID() % locks.length;
locks[index].lock();
try{
// process message
}finally {
locks[index].unlock();
}
}
}
以上邏輯只能處理單個consumer進程的情況
總結
從以上實現(xiàn)方案可以看到,高并發(fā)場景下需要保證線程安全又能保持高吞吐,優(yōu)先考慮是否可以讀操作不使用互斥鎖,而選擇用原子操作或者volatile變量控制內存可見性。對于寫操作的鎖,需要盡量減小鎖粒度,并盡量快的釋放鎖。