看源碼學數(shù)據(jù)結構,如何實現(xiàn)并發(fā)安全的Hash表

散列表(Hash table)是字典結構的常用實現(xiàn),它能夠在插入和根據(jù)Key查詢數(shù)據(jù)時都保持O(1)的時間復雜度。大部分語言中都有散列表的默認實現(xiàn),比如Java中的HashMap和Go中的map?;诖蟛糠质褂脠鼍暗男阅芸紤],這些實現(xiàn)都不是并發(fā)安全的。為了在多線程下安全的使用map,大部分基礎庫都額外提供了線程安全的map實現(xiàn),我們從源碼層面看下這些庫的實現(xiàn)原理。

所有讀寫串行執(zhí)行

既然并發(fā)讀寫同一個Map不是線程安全的,那么最簡單的方式就是把所有的操作都串行化。Java最早的Map實現(xiàn)Hashtable就是這么做的,通過將所有方法訪問標識都設置成synchronized,保證了同一時間只有一個線程在訪問Map實例,從而達到線程安全的目的。我們可以看下代碼:

public synchronized V get(Object key) {
        Entry<?,?> tab[] = table;
        int hash = key.hashCode();
        ...
 }
public synchronized V put(K key, V value) {
      ...
}
public synchronized int size() {
       return count;
}

存在問題
這種實現(xiàn)存在的問題就是,所有操作都是串行的,在沒有修改操作只是并發(fā)讀取的情況下也要串行執(zhí)行,大大影響并發(fā)速度。

普通字典加讀寫鎖

既然前一種實現(xiàn)中讀寫會相互影響,那自然就想到可以將讀寫分離,讓讀取操作可以并發(fā)執(zhí)行,讀寫和寫寫之間串行。java中我們可以使用普通的HashMapReadWriteLock來達到這個目的。我們看下示例代碼:

public class ConcurrentMap<K, V> {
    private ReadWriteLock lock;
    private Map<K, V> dataMap;

    public ConcurrentMap() {
        dataMap = new HashMap<>();
        lock = new ReentrantReadWriteLock();
    }

    public V get(K key) {
        lock.readLock().lock();
        try{
            return dataMap.get(key);
        }finally {
            lock.readLock().unlock();
        }
    }

    public void put(K key, V value) {
        lock.writeLock().lock();
        try{
            dataMap.put(key, value);
        }finally {
            lock.writeLock().unlock();
        }
    }

    public int size() {
        lock.readLock().lock();
        try{
            return dataMap.size();
        }finally {
            lock.readLock().unlock();
        }
    }
}

使用JDK中提供的讀寫鎖,可以達到兩個目的。首先,在修改數(shù)據(jù)的時候通過獲取寫鎖,可以阻塞其它線程的讀寫操作,不會造成并發(fā)寫; 同時在讀取數(shù)據(jù)的時候只要沒有寫鎖存在,而讀鎖不會互相阻塞,就可以實現(xiàn)并發(fā)讀。再次,讀寫鎖還可以保證dataMap中數(shù)據(jù)的內存可見性。
上面這種實現(xiàn)方式在寫少讀多的情況下會大大提高并發(fā)執(zhí)行效率 (可以使用JDK1.8中新提供的StampedLock代替ReadWriteLock對上面的代碼性能做進一步優(yōu)化)。
存在問題
這種方式減少了讀寫沖突,但是因為寫鎖只有一個,對于多個并發(fā)中有修改數(shù)據(jù)的操作,仍然需要串行執(zhí)行。而且讀取的時候還是要先嘗試獲取一下讀鎖,總歸還是有影響的。

讀寫字典分開

Go語言的并發(fā)安全的Map實現(xiàn)中,使用了另外一種讀寫分離的方案。即在一個ConcurrentMap的底層使用一個普通的ReadMap和一個普通的dirtyMap來存儲數(shù)據(jù)。兩個map中key有可能是重復的,但是同一個key對應的value通過指針的方式指到同一個內存地址。
新增數(shù)據(jù)時,直接寫到dirtyMap中,并且修改readMap的標記位,標示dirtyMap中存在readMap沒有的數(shù)據(jù)。在達到一定條件后,使用dirtyMap的數(shù)據(jù)覆蓋readMap。在讀取數(shù)據(jù)的時候,先從readMap中不加鎖讀取,如果找到value或者沒找到value但是標記位表示dirtyMap中沒有新數(shù)據(jù),則直接返回。否則對dirtyMap加鎖讀取。
首先來看下Get實現(xiàn):

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    //1. 使用原子操作加載read map
    read, _ := m.read.Load().(readOnly)
    //2. 從read map中查詢key
    e, ok := read.m[key]
    //3. 如果沒找到并且dirty map中有新的key
    if !ok && read.amended {
        //4. 獲取鎖
        m.mu.Lock()
        //5. 獲取鎖之后從對read map做Double Check
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        //6. 還是沒找到
        if !ok && read.amended {
            //7. 檢查dirty map中key存不存在
            e, ok = m.dirty[key]
            //8. 每次從dirty map嘗試讀取都會增加miss計數(shù)
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if !ok {
        return nil, false
    }
    //9. 返回結果,這里用的原子Load操作
    return e.load()
}

從上面的代碼中可以看到,使用了兩種手段保證并發(fā)安全。首先,每次都使用原子操作加載read map,這樣可以保證內存可見性;再次,如果需要從dirty map中獲取值,則需要加鎖,并且在加鎖成功后再次檢查read map有沒有變化。這里通過read.amended屬性來減少獲取鎖的次數(shù),如果map內數(shù)據(jù)一直沒有變化,amended等于false,就不會進入加鎖檢查讀取dirty map的邏輯。
第8步中的missLocked()方法會增加miss計數(shù),當計數(shù)超過閾值時,會使用dirty map覆蓋read map,這樣dirty map中如果有新的key,一段時間的加鎖讀取之后就會轉換為直接讀取。
下面再來看Put的邏輯:

func (m *Map) Store(key, value interface{}) {
   //1. 通過原子操作加載read map,如果key存在通過CAS覆蓋value
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }
   //2.  獲取鎖
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
       //3. 獲取鎖成功后對 read map做double check
        if e.unexpungeLocked() {
            m.dirty[key] = e
        }
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        //4. 如果dirty map中存在,更新
        e.storeLocked(&value)
    } else {
        //5. 新的key插入到dirty map中
        if !read.amended {
            //6. 更新read.amended屬性為true, 表示dirty map中增加了新的key
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

在Put操作的時候,首先是判斷read map中key是否存在,存在則更新。這里使用的是CAS循環(huán)更新保證第3,4行之間即使有其他線程改了key的值也不會有問題。
如果read map中不存在,則加鎖更新或者新增到dirty map中。
存在問題
Go中使用的方法跟前面使用讀寫鎖的辦法相比,通過使用原子load和store方法,減少寫鎖阻塞范圍。同時如果key存在的情況下,更新操作無需加鎖,性能也會有提升。但是對于新的key寫入的情況,仍然需要加鎖串行執(zhí)行。而且因為讀寫數(shù)據(jù)分離,需要做數(shù)據(jù)定期copy,所以這個實現(xiàn)僅適合寫少讀多或者多線程更新不同的key的場景。

使用分段鎖

上面講的讀寫鎖和Go中的實現(xiàn)方案最大的問題是只要有任何一個線程獲取到寫鎖,其他寫線程只能等待,無論多個線程操作的數(shù)據(jù)是否有關系。所以,JDK1.8之前的實現(xiàn)類ConcurrentHashMap采用分段鎖來做優(yōu)化。
首先來看下數(shù)據(jù)結構:

數(shù)據(jù)結構

每個Map中包含多個Segment,每個Segment中包含一個類似HashMap的實現(xiàn)。每個key先在計算出hashCode后,首先定位屬于哪個segment, 然后再在Segment包含的table中定位屬于哪個Slot。
當執(zhí)行Get操作時,利用final和volatile變量的內存可見性定義,無需加鎖從內存中直接讀取。執(zhí)行Put操作時,首先定位Segment,然后通過輪詢獲取鎖。這樣鎖定的范圍僅限于當前的Segment,不同Segment的更新操作可以并發(fā)執(zhí)行。
Segment定義

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile HashEntry<K,V>[] table;
}
static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
}

Segment是直接繼承了ReentrantLock,這樣可以少定義一個鎖對象。同時通過將包含的HashEntry table用volatile修飾來保證并發(fā)讀取的時候的內存可見性。HashEntry的hash code和key屬性都是final的,保證初始化后所有線程看到的值是不會變的,value和next也都是volatile的。
Get方法實現(xiàn)

public V get(Object key) {
        Segment<K,V> s; 
        HashEntry<K,V>[] tab;
        //1. 獲取key的hashcode
        int h = hash(key);
        //2. 計算Segment的index
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        //3. 獲取key所屬的Segment和包含的Hash table
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            //4. 遍歷key所在slot的List, 獲取value的值
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
 }

Put操作實現(xiàn)

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject         
             (segments, (j << SSHIFT) + SBASE)) == null) 
             //如果Segment尚未初始化,則使用CAS方式初始化Segment
            s = ensureSegment(j);
            //調用Segment的put方法
        return s.put(key, hash, value, false);
}

Put操作除了跟Get操作一樣,首先獲取Segment之外,如果Segment尚未初始化,會采用CAS加循環(huán)的方式初始化Segment,仍然不需要使用鎖。然后調用Segment的put方法。
Segment.put()

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            //1. 獲取鎖
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                //2. 計算key所在的slot
                int index = (tab.length - 1) & hash;
                //3. 獲取slot鏈表的第一個元素
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        //4. 如果同一個slot存在HashEntry, 則更新或者一直走到list的末尾
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            //5. Key已存在,更新
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount; //記錄更新次數(shù)
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        //6. 如果slot為空,則當前Entry作為第一個元素
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node); //判斷是否需要做rehash
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
 }

Segment的put方法第一步就先嘗試獲取鎖,這里面的scanAndLockForPut()方法會循環(huán)調用tryLock(),如果超過指定的次數(shù)仍然沒有獲取到,則直接調用lock()方法等待其它線程釋放鎖。
存在的問題
JDK1.8之前的實現(xiàn)中,通過將key分到不同的Segment來減小鎖的力度,只有屬于同一個Segment的Key在更新時才會互相阻塞,大大降低了并發(fā)沖突的可能。但是Lock的實現(xiàn)原理決定了Segment的數(shù)目是有數(shù)量限制的,過多會導致鎖輪詢獲取時,耗費大量的CPU時間。但是如果過少,又會造成單個Segment中元素過多,鎖的粒度會變大。

JDK1.8鎖定策略

在JDK1.8中對ConcurrentHashMap做了重新實現(xiàn),取消了Segment邏輯,而更像是普通的HashMap,并針對每個Slot加了一個同步鎖。
數(shù)據(jù)結構如下:

數(shù)據(jù)結構1.8

對于hash table每個slot中鏈表超過一定長度,則轉化為紅黑樹,加快查找速度。同時對Node的屬性定義也使用final和volatile修飾保證內存可見性。

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {
    transient volatile Node<K,V>[] table;
    ...
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
    }
}

對于key的查找操作跟之前區(qū)別不大,只是少了Segment定位這一步,直接定位到具體的Slot。重點來看一下Put操作:

    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable(); //hash table延遲初始化
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //1. 如果Slot是空的,直接通過CAS操作將當前值作為第一個元素,無需lock
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                //2. 正在做rehash
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //3. 同步鎖鎖定Slot的第一個Node 
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            // 鏈表存儲
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            //紅黑樹存儲
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        //4. 是否需要轉成紅黑樹
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //5. 增加計數(shù),用于size()返回
        addCount(1L, binCount);
        return null;
   }

Put操作中首先會檢查Slot是否為空,如果為空,則使用CAS加循環(huán)將當前元素設置為第一個元素,無需加鎖。如果Slot中已經存在元素,使用同步鎖鎖定Slot中第一個元素,做插入或者更新操作。
從上面的鎖定邏輯可以看到,同一個slot中的元素更新時才會互相阻塞,而且新版本中不再使用ReentrantLock,相對于之前版本鎖定粒度更小。那之前版本鎖過多的問題如何解決呢?這個主要是在1.8 Java虛擬機對于同步鎖做了較大的升級,引入了偏向鎖->輕量級鎖->重量級鎖的升級邏輯,顯然JDK開發(fā)者經過測試同步鎖不足以對性能產生較大影響。

實際使用案例

通過對以上并發(fā)安全的Hash表的分析,對我們在解決實際項目中的并發(fā)問題也有一定的啟發(fā)。下面舉一個分段鎖的使用案例。
在分布式系統(tǒng)中,服務之間經常使用消息隊列來解耦,比如電商系統(tǒng)中,一筆訂單的狀態(tài)變化可能需要推送消息給物流系統(tǒng)、ERP系統(tǒng)、CRM系統(tǒng)等,而在接收端Consumer通常是多線程處理消息的,大致架構如下:

消息處理

在特定業(yè)務場景下,需要同一筆訂單的消息不能并發(fā)處理。最好的解決辦法,將屬于同一筆訂單的消息使用同一個線程來處理,但是對于使用線程池的場景,這個要求是很難滿足的。所以自然想到用互斥鎖來實現(xiàn),我們可以借鑒ConcurrentHashMap中的分段鎖邏輯,比如使用8個Lock,每個訂單消息收到的時候,將訂單id對8取模,然后獲取相應的Lock。
大致的代碼邏輯如下:

public class OrderMessageListener {
    private Lock[] locks = new ReentrantLock[8];
    
    public void process(message Message) {
        int index = message.getOrderID() % locks.length;
        locks[index].lock();
        try{
            // process message
        }finally {
            locks[index].unlock();
        }
    }
}

以上邏輯只能處理單個consumer進程的情況

總結

從以上實現(xiàn)方案可以看到,高并發(fā)場景下需要保證線程安全又能保持高吞吐,優(yōu)先考慮是否可以讀操作不使用互斥鎖,而選擇用原子操作或者volatile變量控制內存可見性。對于寫操作的鎖,需要盡量減小鎖粒度,并盡量快的釋放鎖。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容