一文讓你徹底理解 Java HashMap 和 ConcurrentHashMap

前言

Map 這樣的 Key Value 在軟件開發(fā)中是非常經(jīng)典的結(jié)構(gòu),常用于在內(nèi)存中存放數(shù)據(jù)。

本篇主要想討論 ConcurrentHashMap 這樣一個并發(fā)容器,在正式開始之前我覺得有必要談談 HashMap,沒有它就不會有后面的 ConcurrentHashMap。歡迎大家關注公眾號:java大牛愛好者

HashMap

眾所周知 HashMap 底層是基于 數(shù)組 + 鏈表 組成的,不過在 jdk1.7 和 1.8 中具體實現(xiàn)稍有不同。

Base 1.7

1.7 中的數(shù)據(jù)結(jié)構(gòu)圖:

先來看看 1.7 中的實現(xiàn)。

這是 HashMap 中比較核心的幾個成員變量;看看分別是什么意思?

  1. 初始化桶大小,因為底層是數(shù)組,所以這是數(shù)組默認的大小。
  2. 桶最大值。
  3. 默認的負載因子(0.75)
  4. table 真正存放數(shù)據(jù)的數(shù)組。
  5. Map 存放數(shù)量的大小。
  6. 桶大小,可在初始化時顯式指定。
  7. 負載因子,可在初始化時顯式指定。

重點解釋下負載因子:

由于給定的 HashMap 的容量大小是固定的,比如默認初始化:

public HashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR);
}
public HashMap(int initialCapacity, float loadFactor) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException("Illegal initial capacity: " +
                                           initialCapacity);
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    if (loadFactor <= 0 || Float.isNaN(loadFactor))
        throw new IllegalArgumentException("Illegal load factor: " +
                                           loadFactor);
    this.loadFactor = loadFactor;
    threshold = initialCapacity;
    init();
}

給定的默認容量為 16,負載因子為 0.75。Map 在使用過程中不斷的往里面存放數(shù)據(jù),當數(shù)量達到了 16 * 0.75 = 12 就需要將當前 16 的容量進行擴容,而擴容這個過程涉及到 rehash、復制數(shù)據(jù)等操作,所以非常消耗性能。

因此通常建議能提前預估 HashMap 的大小最好,盡量的減少擴容帶來的性能損耗。

根據(jù)代碼可以看到其實真正存放數(shù)據(jù)的是

transient Entry<K,V>[] table = (Entry<K,V>[]) EMPTY_TABLE;

這個數(shù)組,那么它又是如何定義的呢?

Entry 是 HashMap 中的一個內(nèi)部類,從他的成員變量很容易看出:

  • key 就是寫入時的鍵。
  • value 自然就是值。
  • 開始的時候就提到 HashMap 是由數(shù)組和鏈表組成,所以這個 next 就是用于實現(xiàn)鏈表結(jié)構(gòu)。
  • hash 存放的是當前 key 的 hashcode。

知曉了基本結(jié)構(gòu),那來看看其中重要的寫入、獲取函數(shù):

put 方法

public V put(K key, V value) {
    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    if (key == null)
        return putForNullKey(value);
    int hash = hash(key);
    int i = indexFor(hash, table.length);
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
    modCount++;
    addEntry(hash, key, value, i);
    return null;
}
  • 判斷當前數(shù)組是否需要初始化。
  • 如果 key 為空,則 put 一個空值進去。
  • 根據(jù) key 計算出 hashcode。
  • 根據(jù)計算出的 hashcode 定位出所在桶。
  • 如果桶是一個鏈表則需要遍歷判斷里面的 hashcode、key 是否和傳入 key 相等,如果相等則進行覆蓋,并返回原來的值。
  • 如果桶是空的,說明當前位置沒有數(shù)據(jù)存入;新增一個 Entry 對象寫入當前位置。
void addEntry(int hash, K key, V value, int bucketIndex) {
    if ((size >= threshold) && (null != table[bucketIndex])) {
        resize(2 * table.length);
        hash = (null != key) ? hash(key) : 0;
        bucketIndex = indexFor(hash, table.length);
    }
    createEntry(hash, key, value, bucketIndex);
}
void createEntry(int hash, K key, V value, int bucketIndex) {
    Entry<K,V> e = table[bucketIndex];
    table[bucketIndex] = new Entry<>(hash, key, value, e);
    size++;
}

當調(diào)用 addEntry 寫入 Entry 時需要判斷是否需要擴容。

如果需要就進行兩倍擴充,并將當前的 key 重新 hash 并定位。

而在 createEntry 中會將當前位置的桶傳入到新建的桶中,如果當前桶有值就會在位置形成鏈表。

get 方法

再來看看 get 函數(shù):

public V get(Object key) {
    if (key == null)
        return getForNullKey();
    Entry<K,V> entry = getEntry(key);
    return null == entry ? null : entry.getValue();
}
final Entry<K,V> getEntry(Object key) {
    if (size == 0) {
        return null;
    }
    int hash = (key == null) ? 0 : hash(key);
    for (Entry<K,V> e = table[indexFor(hash, table.length)];
         e != null;
         e = e.next) {
        Object k;
        if (e.hash == hash &&
            ((k = e.key) == key || (key != null && key.equals(k))))
            return e;
    }
    return null;
}
  • 首先也是根據(jù) key 計算出 hashcode,然后定位到具體的桶中。
  • 判斷該位置是否為鏈表。
  • 不是鏈表就根據(jù) key、key 的 hashcode 是否相等來返回值。
  • 為鏈表則需要遍歷直到 key 及 hashcode 相等時候就返回值。
  • 啥都沒取到就直接返回 null 。

Base 1.8

不知道 1.7 的實現(xiàn)大家看出需要優(yōu)化的點沒有?

其實一個很明顯的地方就是:

當 Hash 沖突嚴重時,在桶上形成的鏈表會變的越來越長,這樣在查詢時的效率就會越來越低;時間復雜度為 O(N)

因此 1.8 中重點優(yōu)化了這個查詢效率。

1.8 HashMap 結(jié)構(gòu)圖:

先來看看幾個核心的成員變量:

static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
/**
 * The maximum capacity, used if a higher value is implicitly specified
 * by either of the constructors with arguments.
 * MUST be a power of two <= 1<<30.
 */
static final int MAXIMUM_CAPACITY = 1 << 30;
/**
 * The load factor used when none specified in constructor.
 */
static final float DEFAULT_LOAD_FACTOR = 0.75f;
static final int TREEIFY_THRESHOLD = 8;
transient Node<K,V>[] table;
/**
 * Holds cached entrySet(). Note that AbstractMap fields are used
 * for keySet() and values().
 */
transient Set<Map.Entry<K,V>> entrySet;
/**
 * The number of key-value mappings contained in this map.
 */
transient int size;

和 1.7 大體上都差不多,還是有幾個重要的區(qū)別:

  • TREEIFY_THRESHOLD 用于判斷是否需要將鏈表轉(zhuǎn)換為紅黑樹的閾值。
  • HashEntry 修改為 Node。

Node 的核心組成其實也是和 1.7 中的 HashEntry 一樣,存放的都是 key value hashcode next 等數(shù)據(jù)。

再來看看核心方法。

put 方法

看似要比 1.7 的復雜,我們一步步拆解:

  1. 判斷當前桶是否為空,空的就需要初始化(resize 中會判斷是否進行初始化)。
  2. 根據(jù)當前 key 的 hashcode 定位到具體的桶中并判斷是否為空,為空表明沒有 Hash 沖突就直接在當前位置創(chuàng)建一個新桶即可。
  3. 如果當前桶有值( Hash 沖突),那么就要比較當前桶中的 key、key 的 hashcode 與寫入的 key 是否相等,相等就賦值給 e,在第 8 步的時候會統(tǒng)一進行賦值及返回。
  4. 如果當前桶為紅黑樹,那就要按照紅黑樹的方式寫入數(shù)據(jù)。
  5. 如果是個鏈表,就需要將當前的 key、value 封裝成一個新節(jié)點寫入到當前桶的后面(形成鏈表)。
  6. 接著判斷當前鏈表的大小是否大于預設的閾值,大于時就要轉(zhuǎn)換為紅黑樹。
  7. 如果在遍歷過程中找到 key 相同時直接退出遍歷。
  8. 如果 e != null 就相當于存在相同的 key,那就需要將值覆蓋。
  9. 最后判斷是否需要進行擴容。

get 方法

public V get(Object key) {
    Node<K,V> e;
    return (e = getNode(hash(key), key)) == null ? null : e.value;
}
final Node<K,V> getNode(int hash, Object key) {
    Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (first = tab[(n - 1) & hash]) != null) {
        if (first.hash == hash && // always check first node
            ((k = first.key) == key || (key != null && key.equals(k))))
            return first;
        if ((e = first.next) != null) {
            if (first instanceof TreeNode)
                return ((TreeNode<K,V>)first).getTreeNode(hash, key);
            do {
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    return e;
            } while ((e = e.next) != null);
        }
    }
    return null;
}

get 方法看起來就要簡單許多了。

  • 首先將 key hash 之后取得所定位的桶。
  • 如果桶為空則直接返回 null 。
  • 否則判斷桶的第一個位置(有可能是鏈表、紅黑樹)的 key 是否為查詢的 key,是就直接返回 value。
  • 如果第一個不匹配,則判斷它的下一個是紅黑樹還是鏈表。
  • 紅黑樹就按照樹的查找方式返回值。
  • 不然就按照鏈表的方式遍歷匹配返回值。

從這兩個核心方法(get/put)可以看出 1.8 中對大鏈表做了優(yōu)化,修改為紅黑樹之后查詢效率直接提高到了 O(logn)。

但是 HashMap 原有的問題也都存在,比如在并發(fā)場景下使用時容易出現(xiàn)死循環(huán)。

final HashMap<String, String> map = new HashMap<String, String>();
for (int i = 0; i < 1000; i++) {
    new Thread(new Runnable() {
        @Override
        public void run() {
            map.put(UUID.randomUUID().toString(), "");
        }
    }).start();
}

但是為什么呢?簡單分析下。

看過上文的還記得在 HashMap 擴容的時候會調(diào)用 resize() 方法,就是這里的并發(fā)操作容易在一個桶上形成環(huán)形鏈表;這樣當獲取一個不存在的 key 時,計算出的 index 正好是環(huán)形鏈表的下標就會出現(xiàn)死循環(huán)。

如下圖:

遍歷方式

還有一個值得注意的是 HashMap 的遍歷方式,通常有以下幾種:

Iterator<Map.Entry<String, Integer>> entryIterator = map.entrySet().iterator();
        while (entryIterator.hasNext()) {
            Map.Entry<String, Integer> next = entryIterator.next();
            System.out.println("key=" + next.getKey() + " value=" + next.getValue());
        }

Iterator<String> iterator = map.keySet().iterator();
        while (iterator.hasNext()){
            String key = iterator.next();
            System.out.println("key=" + key + " value=" + map.get(key));
        }

強烈建議使用第一種 EntrySet 進行遍歷。

第一種可以把 key value 同時取出,第二種還得需要通過 key 取一次 value,效率較低。

簡單總結(jié)下 HashMap:無論是 1.7 還是 1.8 其實都能看出 JDK 沒有對它做任何的同步操作,所以并發(fā)會出問題,甚至 1.7 中出現(xiàn)死循環(huán)導致系統(tǒng)不可用(1.8 已經(jīng)修復死循環(huán)問題)。

因此 JDK 推出了專項專用的 ConcurrentHashMap ,該類位于 java.util.concurrent 包下,專門用于解決并發(fā)問題。

堅持看到這里的朋友算是已經(jīng)把 ConcurrentHashMap 的基礎已經(jīng)打牢了,下面正式開始分析。

ConcurrentHashMap

ConcurrentHashMap 同樣也分為 1.7 、1.8 版,兩者在實現(xiàn)上略有不同。

Base 1.7

先來看看 1.7 的實現(xiàn),下面是他的結(jié)構(gòu)圖:

如圖所示,是由 Segment 數(shù)組、HashEntry 組成,和 HashMap 一樣,仍然是數(shù)組加鏈表。

它的核心成員變量:

/**
 * Segment 數(shù)組,存放數(shù)據(jù)時首先需要定位到具體的 Segment 中。
 */
final Segment<K,V>[] segments;
transient Set<K> keySet;
transient Set<Map.Entry<K,V>> entrySet;

Segment 是 ConcurrentHashMap 的一個內(nèi)部類,主要的組成如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
       private static final long serialVersionUID = 2249069246763182397L;

       // 和 HashMap 中的 HashEntry 作用一樣,真正存放數(shù)據(jù)的桶
       transient volatile HashEntry<K,V>[] table;
       transient int count;
       transient int modCount;
       transient int threshold;
       final float loadFactor;

}

看看其中 HashEntry 的組成:

和 HashMap 非常類似,唯一的區(qū)別就是其中的核心數(shù)據(jù)如 value ,以及鏈表都是 Volatile 修飾的,保證了獲取時的可見性。

原理上來說:ConcurrentHashMap 采用了分段鎖技術,其中 Segment 繼承于 ReentrantLock。不會像 HashTable 那樣不管是 put 還是 get 操作都需要做同步處理,理論上 ConcurrentHashMap 支持 CurrencyLevel (Segment 數(shù)組數(shù)量)的線程并發(fā)。每當一個線程占用鎖訪問一個 Segment 時,不會影響到其他的 Segment。

下面也來看看核心的 put get 方法。

put 方法

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

首先是通過 key 定位到 Segment,之后在對應的 Segment 中進行具體的 put。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

雖然 HashEntry 中的 value 是用 volatile 關鍵詞修飾的,但是并不能保證并發(fā)的原子性,所以 put 操作時仍然需要加鎖處理。

首先第一步的時候會嘗試獲取鎖,如果獲取失敗肯定就有其他線程存在競爭,則利用 scanAndLockForPut() 自旋獲取鎖。

  1. 嘗試自旋獲取鎖。
  2. 如果重試的次數(shù)達到了 MAX_SCAN_RETRIES 則改為阻塞鎖獲取,保證能獲取成功。

再結(jié)合圖看看 put 的流程。

  1. 將當前 Segment 中的 table 通過 key 的 hashcode 定位到 HashEntry。
  2. 遍歷該 HashEntry,如果不為空則判斷傳入的 key 和當前遍歷的 key 是否相等,相等則覆蓋舊的 value。
  3. 不為空則需要新建一個 HashEntry 并加入到 Segment 中,同時會先判斷是否需要擴容。
  4. 最后會解除在 1 中所獲取當前 Segment 的鎖。

get 方法

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

get 邏輯比較簡單:

只需要將 Key 通過 Hash 之后定位到具體的 Segment ,再通過一次 Hash 定位到具體的元素上。

由于 HashEntry 中的 value 屬性是用 volatile 關鍵詞修飾的,保證了內(nèi)存可見性,所以每次獲取時都是最新值。

ConcurrentHashMap 的 get 方法是非常高效的,因為整個過程都不需要加鎖。

Base 1.8

1.7 已經(jīng)解決了并發(fā)問題,并且能支持 N 個 Segment 這么多次數(shù)的并發(fā),但依然存在 HashMap 在 1.7 版本中的問題。

那就是查詢遍歷鏈表效率太低。

因此 1.8 做了一些數(shù)據(jù)結(jié)構(gòu)上的調(diào)整。

首先來看下底層的組成結(jié)構(gòu):

image

看起來是不是和 1.8 HashMap 結(jié)構(gòu)類似?

其中拋棄了原有的 Segment 分段鎖,而采用了 CAS + synchronized 來保證并發(fā)安全性。

image

也將 1.7 中存放數(shù)據(jù)的 HashEntry 改為 Node,但作用都是相同的。

其中的 val next 都用了 volatile 修飾,保證了可見性。

put 方法

重點來看看 put 函數(shù):

image
  • 根據(jù) key 計算出 hashcode 。
  • 判斷是否需要進行初始化。
  • f 即為當前 key 定位出的 Node,如果為空表示當前位置可以寫入數(shù)據(jù),利用 CAS 嘗試寫入,失敗則自旋保證成功。
  • 如果當前位置的 hashcode == MOVED == -1,則需要進行擴容。
  • 如果都不滿足,則利用 synchronized 鎖寫入數(shù)據(jù)。
  • 如果數(shù)量大于 TREEIFY_THRESHOLD 則要轉(zhuǎn)換為紅黑樹。

get 方法

image
  • 根據(jù)計算出來的 hashcode 尋址,如果就在桶上那么直接返回值。
  • 如果是紅黑樹那就按照樹的方式獲取值。
  • 就不滿足那就按照鏈表的方式遍歷獲取值。

1.8 在 1.7 的數(shù)據(jù)結(jié)構(gòu)上做了大的改動,采用紅黑樹之后可以保證查詢效率(O(logn)),甚至取消了 ReentrantLock 改為了 synchronized,這樣可以看出在新版的 JDK 中對 synchronized 優(yōu)化是很到位的。

總結(jié)

看完了整個 HashMap 和 ConcurrentHashMap 在 1.7 和 1.8 中不同的實現(xiàn)方式相信大家對他們的理解應該會更加到位。

其實這塊也是面試的重點內(nèi)容,通常的套路是:

  1. 談談你理解的 HashMap,講講其中的 get put 過程。
  2. 1.8 做了什么優(yōu)化?
  3. 是線程安全的嘛?
  4. 不安全會導致哪些問題?
  5. 如何解決?有沒有線程安全的并發(fā)容器?
  6. ConcurrentHashMap 是如何實現(xiàn)的? 1.7、1.8 實現(xiàn)有何不同?為什么這么做?

這一串問題相信大家仔細看完都能懟回面試官。

除了面試會問到之外平時的應用其實也蠻多,像之前談到的 Guava 中 Cache 的實現(xiàn)就是利用 ConcurrentHashMap 的思想。

同時也能學習 JDK 作者大牛們的優(yōu)化思路以及并發(fā)解決方案。

其實寫這篇的前提是源于 GitHub 上的一個 Issues,也希望大家能參與進來,共同維護好這個項目。

更多資料請點擊、關注:

你真的掌握面向?qū)ο蟮乃季S方法嗎?

Java Map遍歷方式方式及性能測試

歡迎大家關注公眾號:java大牛愛好者

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容