1.簡介:
本文分析的ConcurrentHashMap是基于jdk1.6版本,jdk1.8版本的ConcurrentHashMap發(fā)生了較大變化將在下文分析,相比于傳統(tǒng)的線程安全容器hashtable所有方法都是synchronized,對整張哈希表加鎖,ConcurrentHashMap使用分段的思想,及每個鎖鎖住一段數(shù)據(jù)。
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
private static final float LOAD_FACTOR = 0.75f;
static final int TREEIFY_THRESHOLD = 8;
static final int MAX_SEGMENTS = 1 << 16;
和hashmap一樣,其默認的初始大小為16,最大容量1<<30,默認裝載因子0.75。concurrentHashMap默認并發(fā)數(shù)量是16.最大并發(fā)數(shù)量為1<<16.
final Segment<K,V>[] segments;
ConcurrentHashMap的主干為一個Segment數(shù)組,其大小必須為2的冪,而Segment類似一個hashmap,每個segment內部維護一個table數(shù)組,tabel中的每個桶為一個hashEntry,hashEntry類似hashmap中的entry,內部維護一個鍵值對。

static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
}
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V>[] newArray(int i) {
return new HashEntry[i];
}
}
ConcurrentHashMap允許多個讀操作并發(fā)進行,并沒有對讀操作加鎖.其next是用final修飾的,說明只能在鏈表的頭部新增一個節(jié)點,刪除時也不能像普通hashmap那樣刪除,需要將刪除節(jié)點之前的節(jié)點復制一遍。
segment為ConcurrentHashMap中的一個嵌套類,繼承ReentrantLock,與hashmap類似,其主體為一個hashEntry類型的table,成員變量業(yè)與hashmap類似。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
transient volatile int count;
transient int modCount;
transient int threshold;
transient volatile HashEntry<K,V>[] table;
final float loadFactor;
構造函數(shù):
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
//找到比concurrencyLevel小的最大的2的冪
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = 1;
//找到比c大的最小的2的冪
while (cap < c)
cap <<= 1;
for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}
由此可見ConcurrentHashMap的concurrencyLevel并發(fā)級別就是segment數(shù)組的大小,每個segment的容量為總容量/segment的數(shù)量。
二.主要方法:
get方法:
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
segmentFor方法確定元素在那個segment中,segmentMask=segment大小-1
V get(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
//在put的時候new了一個entry,
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
get方法沒有加鎖是一種樂觀的設計方法,只有在put方法做了安全檢查。
第一步判段每個桶中鏈表的長度,由于count是volatile的且put和remove都是加鎖的,且修改count值的操作總是在方法的最后一步。所以可能出現(xiàn)一個一個線程正在執(zhí)行get方法,對應桶上鏈表的長度為0,但另一個線程正在執(zhí)行put方法向該桶中添加元素,put方法執(zhí)行到一半還沒有修改count的值。這樣該方法會返回null.
由于value為volatile的,所以當其他線程修改了節(jié)點的value,get方法會立即可見。next為final不可變的,就算刪除操作將節(jié)點刪除暴露出去,由于next不可變,所以也不會發(fā)生并發(fā)安全問題。但getFirst()可能返回過期的頭結點,這一點是允許的。
當value的值為null的時候,調用加鎖的get方法,理論上不可能因為put的時候做了判段,如果鍵為空則拋出空指針異常,但由于在put的時候new了一個Entry:tab[index] = new HashEntry<K,V>(key, hash, first, value);在這個語句中對tab[index]的賦值和對HashEntry的賦值可能發(fā)生指令重排序,導致節(jié)點的值為空。當這個值為空時說明有別的線程正在改變節(jié)點,會引起數(shù)據(jù)不一致,需要加鎖。
put方法:
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
return segmentFor(hash).put(key, hash, value, false);
}
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash();
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
put方法在鏈表的頭部插入節(jié)點。
void rehash() {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity >= MAXIMUM_CAPACITY)
return;
HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
threshold = (int)(newTable.length * loadFactor);
int sizeMask = newTable.length - 1;
for (int i = 0; i < oldCapacity ; i++) {
// We need to guarantee that any existing reads of old Map can
// proceed. So we cannot yet null out each bin.
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
// Single node on list
if (next == null)
newTable[idx] = e;
else {
// Reuse trailing consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone all remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(p.key, p.hash,
n, p.value);
}
}
}
}
table = newTable;
}