一、前言
ConcurrentHashMap算是java基礎中非?;镜闹R點,不僅在日常開發(fā)中經(jīng)常用到,面試中也是經(jīng)久不衰的話題。它基本沿用HashMap的接口定義,使得即使不了解其底層原理,也能無縫切換。
談到ConcurrentHashMap,經(jīng)常會拿java7和java8的實現(xiàn)做對比。雖然現(xiàn)在java的版本更新很快,但是常用的還是java8,而看似java7的實現(xiàn)方式已經(jīng)過時了,好像沒必要去了解了,非也。
ConcurrentHashMap在java7中的實現(xiàn)有很多值得學習借鑒的地方,比如基本的數(shù)據(jù)結(jié)構(gòu)數(shù)組鏈表的應用,并發(fā)開發(fā),哈希算法等都可以學以致用。而且了解了java7的實現(xiàn)細節(jié),才能更好的明白java8中為什么要做一些看似莫名其妙的優(yōu)化?
輪子好用,但是造輪子更好玩。
-
ConcurrentHashMap的數(shù)據(jù)結(jié)構(gòu)是怎樣的? -
ConcurrentHashMap的容量為什么是2的整數(shù)次方? - 如何實現(xiàn)的并發(fā)安全?是讀寫分離嗎?get需要加鎖嗎?
- 哈希沖突體現(xiàn)在哪里,如何解決?
- 擴容思想是什么,怎么擴容?
帶著問題探索源碼,才能更容易get到作者的良苦用心。
二、初始化
ConcurrentHashMap初始化可自定義傳入初始容量、負載因子和并發(fā)級別,但并不是單純將值賦值給一些成員變量,而是需要經(jīng)過計算,找到適合的初始值:
-
initialCapacity初始容量,用于計算每個Segment中HashEntry數(shù)組的初始長度。 -
loadFactor負載因子,用于計算每個Segment中HashEntry數(shù)組的擴容閾值。 -
concurrencyLevel并發(fā)級別,用于計算Segment數(shù)組的固定長度。
初始化過程主要是對Segment數(shù)組和其內(nèi)部的HashEntry數(shù)組進行初始化:
(1)通過concurrencyLevel計算ssize作為Segment數(shù)組的固定長度,需要確保計算的ssize是大于等于concurrencyLevel且離concurrencyLevel最近的2的整數(shù)次方的數(shù)值。如默認concurrencyLevel=16,則ssize=16;若自定義傳入concurrencyLevel=15,則ssize=16;concurrencyLevel=17,則ssize=32。
(2)計算用于對key的哈希值進行映射成數(shù)組下標的segmentShift(偏移量)和segmentMask(掩碼)(int index=(hash >>> segmentShift) & segmentMask):
-
segmentShift,key的哈希值右移segmentShift位,最高sshift位用于哈希映射。 -
segmentMask,ssize的掩碼,因為ssize為2的整數(shù)次方值,二進制為1后面若干0,掩碼就是ssize-1,如2(10)掩碼(1)、4(100)掩碼3(11)、8(1000)掩碼7(111)。
(3)通過initialCapacity和ssize平均計算每個Segment內(nèi)部HashEntry數(shù)組的初始長度,但并不是簡單的int c = initialCapacity / ssize,而是像計算ssize一樣,找到一個大于等于c且離c最近的2的整數(shù)次方數(shù)值。
(4)初始化第一個Segment,其他Segment等添加元素時用到了再創(chuàng)建。
(5)初始化Segment數(shù)組。
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
// 1.保證并發(fā)級別是2的整數(shù)次方,即Segment<K,V>[]數(shù)組的長度
// ssize = ssize << 1
// ssize = ssize * (2^1)
ssize <<= 1;
}
// 假設 ssize=16, sshift=4
// segmentShift = 28
// segmentMask = 15
// 2.用于對key的哈希值進行映射計算成數(shù)組下標的偏移量和掩碼(int index=(hash >>> segmentShift) & segmentMask)。
// segmentShift:key的哈希值右移segmentShift位,最高sshift位用于計算。
// segmentMask:ssize的掩碼,因為ssize為2的整數(shù)次方值,二進制為1后面若干0,掩碼就是ssize-1,如2(10)掩碼(1)、4(100)掩碼3(11)、8(1000)掩碼7(111)。
this.segmentShift = 32 - sshift; // 偏移量
this.segmentMask = ssize - 1; // 掩碼
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 3.Segment<K,V>[]平均計算每一個Segment內(nèi)部的HashEntry[]數(shù)組的長度
// 但是為了確保HashEntry鏈表的長度也是2的整數(shù)次方,用同樣的方式找到離大于等于c且離c最近的2的整數(shù)次方的數(shù)
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 4.create segments and segments[0]
// cap * loadFactor HashEntry的容量 * loadFactor計算閾值
// 構(gòu)造第0個Segment
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
// 5.Segment數(shù)組不可擴容
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
三、Segment是一個容器也是一把鎖
Segment繼承自ReentrantLock,所以其具備并發(fā)鎖的功能,在初始化時傳入concurrencyLevel計算而得的數(shù)組長度ssize,代表最多有ssize個鎖,也就決定了整個ConcurrentHashMap后續(xù)的最大并發(fā)級別是ssize(最多可以有ssize個線程獲取鎖操作容器)。
Segment內(nèi)部又包含了一個HashEntry數(shù)組,用于存儲key-value。
從后續(xù)的擴容原理了解,Segment數(shù)組后續(xù)是不擴容的,真正擴容的是其內(nèi)部的HashEntry數(shù)組,所以Segment數(shù)組的長度的初始設置至關重要。
每一個Segment都有獨立的key-value增刪改和擴容機制,且操作前必須先獲取鎖以確保并發(fā)安全。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
/**
* 用于在競爭鎖時最大自旋重試次數(shù)
* The maximum number of times to tryLock in a prescan before
* possibly blocking on acquire in preparation for a locked
* segment operation. On multiprocessors, using a bounded
* number of retries maintains cache acquired while locating
* nodes.
*/
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
/**
* 每一個HashEntry都是一個鏈表,然后組成一個HashEntry鏈表數(shù)組
* The per-segment table. Elements are accessed via
* entryAt/setEntryAt providing volatile semantics.
*/
transient volatile HashEntry<K,V>[] table;
/**
* 元素的個數(shù),不是線程安全的
* The number of elements. Accessed only either within locks
* or among other volatile reads that maintain visibility.
*/
transient int count;
/**
* 修改總次數(shù),線程不安全
* The total number of mutative operations in this segment.
* Even though this may overflows 32 bits, it provides
* sufficient accuracy for stability checks in CHM isEmpty()
* and size() methods. Accessed only either within locks or
* among other volatile reads that maintain visibility.
*/
transient int modCount;
/**
* HashEntry數(shù)組的擴容閾值,(int)(cap * loadFactor)
* count > threshold 且沒有達到最大元素數(shù)量就擴容
* The table is rehashed when its size exceeds this threshold.
* (The value of this field is always <tt>(int)(capacity *
* loadFactor)</tt>.)
*/
transient int threshold;
/**
* 裝載因子,所有的segments都是一樣的
* The load factor for the hash table. Even though this value
* is same for all segments, it is replicated to avoid needing
* links to outer object.
* @serial
*/
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
final V put(K key, int hash, V value, boolean onlyIfAbsent) {... ...}
final V remove(Object key, int hash, Object value) {... ...}
final boolean replace(K key, int hash, V oldValue, V newValue) {... ...}
final V replace(K key, int hash, V value) {... ...}
final void clear() {... ...}
}
四、HashEntry真正存放元素的鏈表數(shù)組
ConcurrentHashMap解決哈希沖突的方式是鏈表法,即將哈希沖突的節(jié)點鏈接成一個個鏈表。
基本屬性有final修飾的hash和key,一旦賦值不可修改,volatile修飾的value和next,修改時可以遵循volatile的語義,寫happen-before讀,立即刷新主內(nèi)存。
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
// java7之前,next是final修飾,
// java7進行了優(yōu)化,在put時,若沒有搶到鎖,會自旋同時初始化一個HashEntry節(jié)點,為后面獲取鎖后節(jié)省時間
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
/**
* Sets next field with volatile write semantics. (See above
* about use of putOrderedObject.)
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
// Unsafe mechanics
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
五、put添加元素
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key); // 計算出對應的哈希值
// 假設 ssize=16, sshift=4
// segmentShift = 28
// segmentMask = 15
// 計算的hash向右移28位在與segmentMask作與操作,即以hash值的最高4位映射計算對應的Segment數(shù)組下標j
// 找到對應Segment的位置j,如果該位置的segment還未設置則需要先初始化
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
// cas 初始化segment
s = ensureSegment(j);
// segment的put內(nèi)部加鎖,所以是分段鎖
return s.put(key, hash, value, false);
}
(1)添加元素時,首先對key進行哈希運算,hash()會對非字符串的key做一個補充哈希的處理(Wang/Jenkins hash變體),使得哈希值的高位和低位不相同,減少哈希沖突。
private transient final int hashSeed = randomHashSeed(this);
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
// 小聲說,補充哈希的位運算沒看懂運算過程...
h ^= k.hashCode();
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
(2)然后對key的哈希值進行映射計算((hash >>> segmentShift) & segmentMask)找到Segment數(shù)組對應下標。
key的哈希值是一個32位的數(shù)值,右移segmentShift位剩余高位和掩碼segmentMask做&運算,這個過程相當于模運算((hash >>> segmentShift) % ssize),計算結(jié)果分布在[0,segmentMask]。
(3)為什么還要對hash映射的下標做(j << SSHIFT) + SBASE))運算呢?
因為通過UNSAFE.getObject可以從主內(nèi)存中獲取最新的Segment,而這個方法需要知道Segment在內(nèi)存中的偏移量。同樣計算主內(nèi)存偏移量的方式在后續(xù)獲取HashEntry時也會用到。
映射Segment數(shù)組下標的過程運用了與運算以及利用2的整數(shù)次方數(shù)值減1就是掩碼的特性,而位運算要比通俗意義上的加減乘除的性能要高,這也是為什么Segment數(shù)組的長度必須是2的整數(shù)次方的原因??上攵髡咴诩氈δ┕?jié)上的極致性能追求。
(4)若找到的segment是空的,則先進行cas初始化ensureSegment,可以看出segments數(shù)組是懶加載:
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
// 算出 內(nèi)存偏移量
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
// 獲取的seg為null則 開始初始化,獲取0位置的segment,取出基本屬性cap、lf、threshold
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
// 再檢查一次
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
// 自旋cas設置segment,保證線程安全
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
1、Segment#put真正的操作元素
找到對應的segment后,就繼續(xù)對其內(nèi)部的HashEntry鏈表數(shù)組進行操作,這個過程中可能會產(chǎn)生哈希沖突,也可能需要擴容,而作者是如何解決實現(xiàn)的呢?
// java.util.concurrent.ConcurrentHashMap.Segment#put
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// (1)若獲取鎖,則node=null,沒有獲取鎖,也會做一些事情scanAndLockForPut:
// 拿不到鎖,不立即阻塞,而是先自旋,若自旋到一定次數(shù)仍未拿到鎖,再調(diào)用lock()阻塞;
// 在自旋的過程中遍歷鏈表,若發(fā)現(xiàn)沒有重復的節(jié)點,則提前新建一個節(jié)點,為后面再插入節(jié)省時間。
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
//(2)tab.length是2的整數(shù)次方,所以tab.length-1 的二進制就是若干1,對hash 做與運算,算出的index不會超出tab.length-1
int index = (tab.length - 1) & hash;
// 定位到第index個HashEntry
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
// (3)e不為空則說明發(fā)生hash沖突,解決hash沖突的辦法:鏈表法,新節(jié)點作為鏈表的頭節(jié)點
// 如果hash值算的不好,經(jīng)常發(fā)生hash沖突,就會造成某一個鏈表很長,性能就會很低。
K k;
// HashEntry key地址相等 or (hash值相等且key值相等)時
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
// onlyIfAbsent 為false,則舊值替換為新值,然后break,否則直接break
e.value = value;
++modCount;
}
break;
// put key存在時會新值替換舊值
// putIfAbsent key存在時不替換。
}
// 在e不為null的情況下,向下遍歷,直到找到HashEntry的末尾或者 一個key或者hash相等的HashEntry
e = e.next;
}
else {
if (node != null)
// node不為null,說明前面沒有搶到鎖時,順便初始化了node
// 同時node放在了HashEntry單列表的頭部
node.setNext(first);
else
// 初始化一個node,并放在first鏈表的頭部
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
//(4) 因為count是對一個segment中的HashEntry節(jié)點個數(shù)的統(tǒng)計,
// 如果hash沖突嚴重,鍵值對只添加到HashEntry[]中某幾個HashEntry鏈表中,
// 就造成HashEntry[]有空閑位置,也會造成無味的擴容,內(nèi)存利用率持續(xù)下降
// count 超過了閾值,默認是HashEntry<K,V>[]初始容量*0.75
// 擴容
rehash(node);
else
// (5)更新tab index位置的node
setEntryAt(tab, index, node);
// modCount++
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
(1)segment本身是一把鎖,且為了確保put元素的過程是線程安全的,必須先嘗試獲取鎖。若tryLock()獲取鎖失敗,不會立即阻塞,而是執(zhí)行scanAndLockForPut,自旋重試一定次數(shù),并且在這個過程中不只是單純的自旋,還會初始化添加元素需要的節(jié)點,為后續(xù)獲取鎖后節(jié)省時間,這又是一處體現(xiàn)作者追求極致性能的地方。
/**
* @return a new node if key not found, else null
*/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
// 找到對應的HashEntry節(jié)點first
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
// 若 找到的節(jié)點是空的,則進行初始化,為后面拿到鎖后,節(jié)省時間
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
// retries為0,防止重復初始化
retries = 0;
}
else if (key.equals(e.key))
// 若 找到節(jié)點不為空,且 key和準備添加的key相等
retries = 0;
else
// e不為空且 key不相等,則向下遍歷該鏈表,e設置為下一個節(jié)點,
// 直到找到下一個節(jié)點空的或者key相等的(可能是替換操作,無需初始化節(jié)點)
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
// 自旋一定次數(shù)后lock阻塞
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
// (retries & 1) == 0 偶數(shù)次檢查頭節(jié)點是否有修改
// 若first節(jié)點被修改了,則重置自旋重試機制,
// 為什么鏈表的第一個節(jié)點會變呢,是因為,新增元素時在頭節(jié)點的位置添加。
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
(2)獲取鎖之后,hash映射HashEntry數(shù)組的下標(int index= (tab.length - 1) & hash),并獲取主內(nèi)存中的first節(jié)點(entryAt(tab, index))。
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
和映射Segment數(shù)組下標不同的是,這里并沒有對hash做移位操作,也就是映射HashEntry數(shù)組下標用了hash值的低位,映射Segment數(shù)組下標用了hash值的高位,這樣做的目的也是為了使得元素分布均勻,減少哈希沖突(hash()補充哈希使得哈希值高位和低位不同)。
(3)找到的first節(jié)點不為空,則發(fā)生了哈希沖突,需要遍歷鏈表,看看是否有key和hash相同的節(jié)點,有則判斷是否需要替換,不論是否需要替換,都不需要加入新節(jié)點,則結(jié)束本次put操作。若遍歷到末尾依然找不到相同的節(jié)點,則需要將新節(jié)點加到鏈表頭部(頭插法)。
(4)元素個數(shù)count+1,若HashEntry數(shù)組元素count超出閾值且長度未達到最大值,則擴容rehash(node)。
(5)將新增的節(jié)點更新到主內(nèi)存對應位置(setEntryAt):
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
HashEntry<K,V> e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
2、rehash擴容
擴容比較復雜重要,所以單獨拎出來探討,默認情況下當一個Segment中HashEntry數(shù)組的元素個數(shù)大于初始容量的3/4且小于最大長度時觸發(fā)擴容,從函數(shù)命名rehash()也可以看出是一個再哈希的過程:
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
// newCapacity = oldCapacity * 2^1,即為擴容為原來的2倍
int newCapacity = oldCapacity << 1;
// 重新計算新的threshold
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 計算e在新數(shù)組中的位置
// 對newCapacity - 1 做&運算
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
// next=null 說明是鏈表的最后一個節(jié)點了,直接賦值
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// 不是鏈表的最后一個節(jié)點,則需要尋找鏈表的最后一個元素
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 找到lastRun節(jié)點,因為lastRun后面的節(jié)點的hash都和lastRun一樣,
// 所以可以直接把lastRun連同后面的節(jié)點一起復制到新數(shù)組的對應位置。
newTable[lastIdx] = lastRun;
// Clone remaining nodes
// 而 lastRun前面的節(jié)點則需要重新計算在新數(shù)組中的位置,
// 并拷貝(new HashEntry)到新數(shù)組中
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 將新元素添加到數(shù)組中
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
// 將新鏈表賦值給table, copy on write
// 只能保證最終一致性,不能保證實時一致性
table = newTable;
}
(1)首先新建一個數(shù)組,長度為舊數(shù)組的2倍(oldCapacity << 1)。
(2)按新數(shù)組的長度重新計算擴容閾值(threshold = (int)(newCapacity * loadFactor))。
(3)從舊數(shù)組復制遷移節(jié)點到新數(shù)組。這里又有一個追求極致性能的點,按道理舊數(shù)組中的節(jié)點都需要重新哈希然后映射到新數(shù)組,但是,作者做了一個小優(yōu)化,找到每條鏈表中最后一個與前一個節(jié)點哈希映射新數(shù)組下標不同的點,稱之為lastRun,這樣就把一個鏈表截成了兩半,lastRun之后節(jié)點的哈希映射結(jié)果和lastRun相同,所以只需要復制遷移lastRun節(jié)點即可,其后的節(jié)點可以順帶過去;而lastRun前的節(jié)點則還需要一個個重新和新數(shù)組做哈希映射并復制。如圖:
(4)將新元素添加到新數(shù)組對應位置中。
(5)新數(shù)組賦值給舊數(shù)組(table = newTable),copy on write 思想,所以只能保證最終一致性,不能保證實時一致性,在擴容的過程中也不會影響get的使用。
六、remove刪除元素
刪除元素的方法有兩個:
remove(Object key),刪除鍵為key的元素。
remove(Object key, Object value),刪除鍵為key,值為value的元素。
public V remove(Object key) {
int hash = hash(key);
Segment<K,V> s = segmentForHash(hash);
return s == null ? null : s.remove(key, hash, null);
}
public boolean remove(Object key, Object value) {
int hash = hash(key);
Segment<K,V> s;
return value != null && (s = segmentForHash(hash)) != null &&
s.remove(key, hash, value) != null;
}
代碼基本相同,都是先找到對應的Segment,然后調(diào)用Segment的remove方法。
private Segment<K,V> segmentForHash(int h) {
// hash 映射 segments數(shù)組下標的同時 計算出 該segment在內(nèi)存中的偏移量
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// segments數(shù)組是沒有被volatile修飾的,使用getObjectVolatile,
// 可為segments增加volatile語義
return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
}
Segment的remove方法:
- 首先嘗試獲取鎖,獲取失敗會自旋重試一定次數(shù)后依然沒有獲取鎖則阻塞。
- 獲取鎖后,通過哈希映射找到對應的
HashEntry節(jié)點e,若為空則說明沒必要刪除,若不為空則開始遍歷鏈表。 - 找到
key相同的節(jié)點,若有傳遞value,還需判斷該節(jié)點的value是否相同。 - 刪除節(jié)點時需分兩種情況,若被刪除節(jié)點的前驅(qū)為空,則說明是從鏈表頭部刪除,被刪除的節(jié)點的
next節(jié)點作為index位置的新頭節(jié)點;若被刪除節(jié)點的前驅(qū)不為空,則說明是從鏈表中間刪除,將被刪除節(jié)點的next節(jié)點鏈接到其前驅(qū)的next指針上。 - 最后釋放鎖。
final V remove(Object key, int hash, Object value) {
if (!tryLock())
// 1.獲取鎖失敗,會自旋重試一段時間,如果還沒獲取鎖則阻塞
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
// 找到 對應的hashentry
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
// 被刪節(jié)點的前驅(qū)為空,則說明刪除的節(jié)點是頭節(jié)點,
// 則被刪節(jié)點的next節(jié)點作為index位置的頭節(jié)點。
setEntryAt(tab, index, next);
else
// 若pred不為null,則是從鏈表中間位置刪除的節(jié)點,
// 可將刪除節(jié)點的pred與被刪除節(jié)點的next相連
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
scanAndLock和scanAndLockForPut的目的基本一樣,scanAndLock少了初始化節(jié)點操作。
private void scanAndLock(Object key, int hash) {
// similar to but simpler than scanAndLockForPut
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
int retries = -1;
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null || key.equals(e.key))
// e為null or key存在則置retries=0,開始自旋計數(shù)
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
// 超過最大自旋次數(shù),則阻塞
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
// 偶數(shù)次檢查頭節(jié)點被修改,則 retries=-1 重置重試機制
e = first = f;
retries = -1;
}
}
}
七、get獲取元素
replace方法較為簡單就不做分析了,來看看get方法:
- 首先通過hash計算
segment[]下標同時計算內(nèi)存偏移量。 -
getObjectVolatile從主內(nèi)存中獲取對應Segment。 - 第二次hash計算找到
HashEntry。 - 遍歷鏈表找到key相同的節(jié)點返回即可。
可以看到get沒有加鎖,為什么可以不加鎖呢?
-
put添加元素時更改的是鏈表的頭節(jié)點不會影響get的遍歷,且put和remove修改的是HashEntry的next指針,next被volatile修飾,replace修改的是HashEntry的value,value被volatile修飾,都是利用volatile語義(寫happen-before讀),使得修改后立即刷新主內(nèi)存,并且通知其他線程獲取到最新值。 - put操作中若發(fā)生擴容,其利用了
copy on write思想,在擴容沒有完前,get獲取的數(shù)據(jù)都是一份獨立的舊數(shù)據(jù);又因為Segment中HashEntry數(shù)組被volatile修飾,擴容完成后重新賦值table會立即刷新主內(nèi)存,通知其他線程獲取最新值。
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
// 計算segment[]索引的同時計算內(nèi)存偏移量
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 第一次hash計算找到segment,
// getObjectVolatile 加上volatile語義,強制從主存中獲取屬性值。
// 這個方法要求被使用的屬性被volatile or final(具有happen-before的修飾符)修飾,否則功能和getObject方法相同。
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 第二次hash計算找到HashEntry
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
// key地址相等 or (hash相等&& key值相等)
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
八、size獲取元素個數(shù)
一個ConcurrentHashMap被分成了多個Segment,那獲取元素的個數(shù)就是所有Segment中元素個數(shù)之和。而size的過程中有可能在put、remove等影響每個Segment內(nèi)部元素個數(shù)操作,所以需要一個個獲取鎖來統(tǒng)計。但是這樣一個不太重要的size操作把整個Segment數(shù)組鎖住豈不是非常影響寫性能。
所以作者用了一種樂觀鎖的方式,假設在結(jié)算size的過程Segment內(nèi)部沒有發(fā)生修改操作,如果發(fā)生了修改則重試重新計算。
判斷Segment內(nèi)部沒有發(fā)生修改的方式是比對最近兩次總的修改次數(shù)是否一致。而重試也不是無限重試,而是重試2次,加上初始的一次就是3次。
重試3次之后依然檢查到Segment內(nèi)部有修改,則遍歷segments數(shù)組加鎖統(tǒng)計。假設第3次重試和第4次加鎖統(tǒng)計的修改總次數(shù)sum相等則結(jié)束統(tǒng)計返回size,若不相等則第5次統(tǒng)計(重試次數(shù)不等2了,則不會發(fā)生鎖重入)。
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
// 統(tǒng)計所有segment中元素的個數(shù)
int size;
// size的長度超過了32位,代表溢出了,設置為true
boolean overflow; // true if size overflows 32 bits
// 統(tǒng)計所有segment中修改的次數(shù),后續(xù)可判斷segments是否有修改
long sum; // sum of modCounts
// 最近的一次sum
long last = 0L; // previous sum
// 重試次數(shù)
int retries = -1; // first iteration isn't retry
try {
for (;;) {
// RETRIES_BEFORE_LOCK = 2
// 重試三次,樂觀的認為在size的時候,segments內(nèi)部沒有變動
// 注意retries++
if (retries++ == RETRIES_BEFORE_LOCK) {
// 超過三次重試次數(shù),遍歷segments數(shù)組,分別獲取鎖,
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
// 遍歷獲取內(nèi)存中的Segment
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
// 統(tǒng)計總修改次數(shù)sum
sum += seg.modCount;
// 統(tǒng)計元素個數(shù),并判斷是否有溢出
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
// 統(tǒng)計的修改總次數(shù)sum和上次記錄的相同則停止重試
if (sum == last)
break;
last = sum;
}
} finally {
// 若重試超過了三次,說明分別獲取過鎖,則需要遍歷釋放鎖
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
// 返回size,若size溢出則為Integer.MAX_VALUE
return overflow ? Integer.MAX_VALUE : size;
}
九、總結(jié)
看完java7的ConcurrentHashMap源碼,了解了其實現(xiàn)原理后,心里的疑云基本都解開了:
-
ConcurrentHashMap中segment數(shù)組的長度以及HashEntry數(shù)組的長度之所以要保持為2的整數(shù)次方就是為了利用2的整數(shù)次方數(shù)值減1就是掩碼的特性,哈希值與掩碼做與運算相當于模運算來快速映射計算數(shù)組的下標。 -
ConcurrentHashMap使用分段鎖的方式,每一個segment相等于一把鎖,在修改map時首先會先獲取鎖。而get也不需要加鎖的原因是HashEntry中next指針,value都是volatile修飾的,可以很好的利用其寫happen-before讀的語義,以及修改后立即刷新到主內(nèi)存并通知其他線程獲取最新值。 - 哈希沖突的體現(xiàn)不只是在鏈表中,還是在第一次哈希映射
segment數(shù)組時,多個元素到同一個segment也算是哈希沖突。而解決哈希沖突的方式是鏈表法。 - 哈希映射
segment數(shù)組下標時用了哈希值的高位,而哈希映射HashEntry數(shù)組下標時用的哈希值低位,為的是盡可能利用哈希值,使得元素節(jié)點分布均勻減少沖突。 - 擴容是容量擴大為原來的2倍,然后遍歷每個元素重新哈希映射到新數(shù)組中,但是作者有一個小優(yōu)化就是把一個鏈表截成兩半,以
lastRun為界,lastRun后面的節(jié)點因為和lastRun哈希映射新數(shù)組的結(jié)果一樣,所以可以跟隨lastRun一起復制到新數(shù)組,而lastRun之前的節(jié)點則需要一個個重新哈希映射。 - size統(tǒng)計所有
segment的元素個數(shù),以樂觀重試的方式判斷segment內(nèi)部是否有修改,最近兩次修改次數(shù)一致則返回統(tǒng)計的size。 -
segment數(shù)組一旦初始化后期不可擴容。
PS: 如若文章中有錯誤理解,歡迎批評指正,同時非常期待你的評論、點贊和收藏。我是徐同學,愿與你共同進步!