阿里面試官經(jīng)常問的 HashMap 和 ConcurrentHashMap,相信看完這篇沒有面試官能再難住你

老讀者就請(qǐng)肆無忌憚地點(diǎn)贊吧,微信搜索【沉默王二】關(guān)注這個(gè)在九朝古都洛陽茍且偷生的程序員。
本文 GitHub github.com/itwanger 已收錄,里面還有我精心為你準(zhǔn)備的一線大廠面試題。

HashMap 是 Java 中非常強(qiáng)大的數(shù)據(jù)結(jié)構(gòu),使用頻率非常高,幾乎所有的應(yīng)用程序都會(huì)用到它。但 HashMap 不是線程安全的,不能在多線程環(huán)境下使用,該怎么辦呢?

1)Hashtable,一個(gè)老掉牙的同步哈希表,t 竟然還是小寫的,一看就非常不專業(yè):

public class Hashtable<K,V>
        extends Dictionary<K,V>
        implements Map<K,V>, Cloneable, java.io.Serializable {
    public synchronized V put(K key, V value) {}
    public synchronized int size() {}
    public synchronized V get(Object key) {}
}

里面的方法全部是 synchronized,同步的力度非常大,對(duì)不對(duì)?這樣的話,性能就沒法保證了。pass。

2)Collections.synchronizedMap(new HashMap<String, String>()),可以把一個(gè) HashMap 包裝成同步的 SynchronizedMap:

private static class SynchronizedMap<K,V>
        implements Map<K,V>, Serializable {
    public int size() {
        synchronized (mutex) {return m.size();}
    }
    public V get(Object key) {
        synchronized (mutex) {return m.get(key);}
    }
    public V put(K key, V value) {
        synchronized (mutex) {return m.put(key, value);}
    }
}

可以看得出,SynchronizedMap 確實(shí)比 Hashtable 改進(jìn)了,synchronized 不再放在方法上,而是放在方法內(nèi)部,作為同步塊出現(xiàn),但仍然是對(duì)象級(jí)別的同步鎖,讀和寫操作都需要獲取鎖,本質(zhì)上,仍然只允許一個(gè)線程訪問,其他線程被排斥在外。

3)ConcurrentHashMap,本篇的主角,唯一正確的答案。Concurrent 這個(gè)單詞就是并發(fā)、并行的意思,所以 ConcurrentHashMap 就是一個(gè)可以在多線程環(huán)境下使用的 HashMap。

ConcurrentHashMap 一直在進(jìn)化,Java 7 和 Java 8 就有很大的不同。Java 7 版本的 ConcurrentHashMap 是基于分段鎖的,就是將內(nèi)部分成不同的 Segment(段),每個(gè)段里面是 HashEntry 數(shù)組。

來看一下 Segment:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
       transient volatile HashEntry<K,V>[] table;
       transient int count;
       transient int modCount;
       transient int threshold;
       final float loadFactor;
}

再來看一下 HashEntry:

static final class HashEntry<K,V> {
        final K key;                       // 聲明 key 為 final 型
        final int hash;                   // 聲明 hash 值為 final 型
        volatile V value;                 // 聲明 value 為 volatile 型
        final HashEntry<K,V> next;      // 聲明 next 為 final 型

        HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
            this.key = key;
            this.hash = hash;
            this.next = next;
            this.value = value;
        }
 }

和 HashMap 非常相似,唯一的區(qū)別就是 value 是 volatile 的,保證 get 時(shí)候的可見性。

Segment 繼承自 ReentrantLock,所以不會(huì)像 Hashtable 那樣不管是 put 還是 get 都需要 synchronized,鎖的力度變小了,每個(gè)線程只鎖一個(gè) Segment,對(duì)其他線程訪問的 Segment 沒有影響。

Java 8 和之后的版本在此基礎(chǔ)上做了很大的改進(jìn),不再采用分段鎖的機(jī)制了,而是利用 CAS(Compare and Swap,即比較并替換,實(shí)現(xiàn)并發(fā)算法時(shí)常用到的一種技術(shù))和 synchronized 來保證并發(fā),雖然內(nèi)部仍然定義了 Segment,但僅僅是為了保證序列化時(shí)的兼容性,代碼注釋上就可以看得出來:

/**
 * Stripped-down version of helper class used in previous version,
 * declared for the sake of serialization compatibility.
 */
static class Segment<K,V> extends ReentrantLock implements Serializable {
    final float loadFactor;
    Segment(float lf) { this.loadFactor = lf; }
}

底層結(jié)構(gòu)和 Java 7 也有所不同,更接近 HashMap(數(shù)組+雙向鏈表+紅黑樹):

來看一下新版 ConcurrentHashMap 定義的關(guān)鍵字段:

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
        implements ConcurrentMap<K,V>, Serializable {
    transient volatile Node<K,V>[] table;
    private transient volatile Node<K,V>[] nextTable;
    private transient volatile int sizeCtl;
}

1)table,默認(rèn)為 null,第一次 put 的時(shí)候初始化,默認(rèn)大小為 16,用來存儲(chǔ) Node 節(jié)點(diǎn),擴(kuò)容時(shí)大小總是 2 的冪次方。

順帶看一下 Node 的定義:

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        // … 
}

hash 和 key 是 final 的,和 HashMap 的 Node 一樣,因?yàn)?key 是不會(huì)發(fā)生變化的。val 和 next 是 volatile 的,保證多線程環(huán)境下的可見性。

2)nextTable,默認(rèn)為 null,擴(kuò)容時(shí)新生成的數(shù)組,大小為原數(shù)組的兩倍。

3)sizeCtl,默認(rèn)為 0,用來控制 table 的初始化和擴(kuò)容操作。-1 表示 table 正在初始化;-(1+線程數(shù)) 表示正在被多個(gè)線程擴(kuò)容。

Map 最重要的方法就是 put,ConcurrentHashMap 也不例外:

public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        ...省略部分代碼
    }
    addCount(1L, binCount);
    return null;
}

1)spread() 是一個(gè)哈希算法,和 HashMap 的 hash() 方法類似:

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

2)如果是第一次 put 的話,會(huì)調(diào)用 initTable() 對(duì) table 進(jìn)行初始化。

private final ConcurrentHashMap.Node<K,V>[] initTable() {
    ConcurrentHashMap.Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

外層用了一個(gè) while 循環(huán),如果發(fā)現(xiàn) sizeCtl 小于 0 的話,就意味著其他線程正在初始化,yield 讓出 CPU。

第一次 put 的時(shí)候會(huì)執(zhí)行 U.compareAndSetInt(this, SIZECTL, sc, -1),把 sizeCtl 賦值為 -1,表示當(dāng)前線程正在初始化。

private static final Unsafe U = Unsafe.getUnsafe();
private static final long SIZECTL
        = U.objectFieldOffset(ConcurrentHashMap.class, "sizeCtl");

U 是一個(gè) Unsafe(可以提供硬件級(jí)別的原子操作,可以獲取某個(gè)屬性在內(nèi)存中的位置,也可以修改對(duì)象的字段值)對(duì)象,compareAndSetInt() 是 Unsafe 的一個(gè)本地(native)方法,它就負(fù)責(zé)把 ConcurrentHashMap 的 sizeCtl 修改為指定的值(-1)。

初始化后的 table 大小為 16(DEFAULT_CAPACITY)。

不是第一次 put 的話,會(huì)調(diào)用 tabAt() 取出 key 位置((n - 1) & hash)上的值(f):

static final <K,V> ConcurrentHashMap.Node<K,V> tabAt(ConcurrentHashMap.Node<K,V>[] tab, int i) {
    return (ConcurrentHashMap.Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
}

U.getReferenceAcquire() 會(huì)調(diào)用 Unsafe 的 本地方法 getReferenceVolatile() 獲取指定內(nèi)存中的數(shù)據(jù),保證每次拿到的數(shù)據(jù)都是最新的。

如果 f 為 null,說明 table 中這個(gè)位置上是第一次 put 元素,調(diào)用 casTabAt() 插入 Node。

static final <K,V> boolean casTabAt(ConcurrentHashMap.Node<K,V>[] tab, int i,
                                    ConcurrentHashMap.Node<K,V> c, ConcurrentHashMap.Node<K,V> v) {
    return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

如果 CAS 成功,說明 Node 插入成功,執(zhí)行 addCount() 方法檢查是否需要擴(kuò)容。

如果失敗,說明有其他線程提前插入了 Node,進(jìn)行下一輪 for 循環(huán)繼續(xù)嘗試,俗稱自旋。

如果 f 的 hash 為 MOVED(-1),意味著有其他線程正在擴(kuò)容,執(zhí)行 helpTransfer() 一起擴(kuò)容。

否則,把 Node 按鏈表或者紅黑樹的方式插入到合適的位置,這個(gè)過程是通過 synchronized 塊實(shí)現(xiàn)的。

synchronized (f) {
    if (tabAt(tab, i) == f) {
        if (fh >= 0) {
            binCount = 1;
            for (Node<K,V> e = f;; ++binCount) {
                K ek;
                if (e.hash == hash &&
                    ((ek = e.key) == key ||
                     (ek != null && key.equals(ek)))) {
                    oldVal = e.val;
                    if (!onlyIfAbsent)
                        e.val = value;
                    break;
                }
                Node<K,V> pred = e;
                if ((e = e.next) == null) {
                    pred.next = new Node<K,V>(hash, key,
                                              value, null);
                    break;
                }
            }
        }
        else if (f instanceof TreeBin) {
            Node<K,V> p;
            binCount = 2;
            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                           value)) != null) {
                oldVal = p.val;
                if (!onlyIfAbsent)
                    p.val = value;
            }
        }
    }
}

1)插入之前,再次調(diào)用 tabAt(tab, i) == f 來判斷 f 是否被其他線程修改。

2)如果 fh(f 的哈希值) >= 0,說明 f 是鏈表的頭節(jié)點(diǎn),遍歷鏈表,找到對(duì)應(yīng)的 Node,更新值,否則插入到末尾。

3)如果 f 是紅黑樹,則按照紅黑樹的方式插入或者更新節(jié)點(diǎn)。

分析完 put() 方法后,再來看 get() 方法:

public V get(Object key) {
    ConcurrentHashMap.Node<K,V>[] tab; ConcurrentHashMap.Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

是不是簡(jiǎn)單很多?

1)如果哈希值相等((eh = e.hash) == h),直接返回 table 數(shù)組中的元素。

2)如果是紅黑樹(eh < 0),按照紅黑樹的方式 find 返回。

3)如果是鏈表,進(jìn)行遍歷,然后根據(jù) key 獲取 value。

最后,來寫一個(gè) ConcurrentHashMap 的應(yīng)用實(shí)例吧!

/**
 * @author 沉默王二,一枚有趣的程序員
 */
public class ConcurrentHashMapDemo {
    public final static int THREAD_POOL_SIZE = 5;

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> map = new ConcurrentHashMap<>();

        long startTime = System.nanoTime();
        ExecutorService crunchifyExServer = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
        for (int j = 0; j < THREAD_POOL_SIZE; j++) {
            crunchifyExServer.execute(new Runnable() {
                @SuppressWarnings("unused")
                @Override
                public void run() {
                    for (int i = 0; i < 500000; i++) {
                        map.put("itwanger"+i, "沉默王二");
                    }
                }
            });
        }

        crunchifyExServer.shutdown();
        crunchifyExServer.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);

        long entTime = System.nanoTime();
        long totalTime = (entTime - startTime) / 1000000L;
        System.out.println(totalTime + "ms");
    }
}

給同學(xué)們留一道作業(yè)題,感興趣的話可以嘗試下,把 ConcurrentHashMap 換成 SynchronizedMap,比較一下兩者性能上的差異,差距還是挺明顯的。


我是沉默王二,一枚在九朝古都洛陽茍且偷生的程序員。關(guān)注即可提升學(xué)習(xí)效率,感謝你的三連支持,奧利給??。

注:如果文章有任何問題,歡迎毫不留情地指正。

如果你覺得文章對(duì)你有些幫助,歡迎微信搜索「沉默王二」第一時(shí)間閱讀,回復(fù)關(guān)鍵字「小白」可以免費(fèi)獲取我肝了 4 萬+字的 《Java 小白從入門到放肆》2.0 版;本文 GitHub github.com/itwanger 已收錄,歡迎 star。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容