原博客:
https://www.cnblogs.com/lujiango/p/7580558.html
http://www.importnew.com/28263.html
CouncurrentHashMap 線程安全
一、CouncurrentHashMap<jdk1.7>
1、底層:
(1)底層數(shù)據(jù)結(jié)構(gòu):
<jdk1.7>:數(shù)組(Segment) + 數(shù)組(HashEntry) + 鏈表(HashEntry節(jié)點)
底層一個Segments數(shù)組,存儲一個Segments對象,一個Segments中儲存一個Entry數(shù)組,存儲的每個Entry對象又是一個鏈表頭結(jié)點。
(2)基本屬性:
jdk1,7
兩個主要的內(nèi)部類:
class Segment內(nèi)部類,繼承ReentrantLock,有一個HashEntry數(shù)組,用來存儲鏈表頭結(jié)點
int count ; // 此對象中存放的HashEntry個數(shù)
int threshold ; //擴容閾值
volatile HashEntry<K,V>[] table; //儲存entry的數(shù)組,每一個entry都是鏈表的頭部
float loadFactor; //加載因子
方法:
v get(Object key, int hash); 獲取相應(yīng)元素
注意:此方法并不加鎖,因為只是讀操作,
V put(K key, int hash, V value, boolean onlyIfAbsent)
注意:此方法加鎖
class HashEntry 定義的節(jié)點,里面存儲的數(shù)據(jù)和下一個節(jié)點,在此不分析
(3)主要方法:
get():
1、第一次哈希 找到 對應(yīng)的Segment段,
調(diào)用Segment中的get方法
2、再次哈希找到對應(yīng)的鏈表,
3、最后在鏈表中查找。
// 外部類方法
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash); // 第一次hash 確定段的位置
}
//以下方法是在Segment對象中的方法;
//確定段之后在段中再次hash,找出所屬鏈表的頭結(jié)點。
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
V get(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
put():
1、首先確定段的位置,
調(diào)用Segment中的put方法:
2、加鎖
3、檢查當(dāng)前Segment數(shù)組中包含的HashEntry節(jié)點的個數(shù),如果超過閾值就重新hash
4、然后再次hash確定放的鏈表。
5、在對應(yīng)的鏈表中查找是否相同節(jié)點,如果有直接覆蓋,如果沒有將其放置鏈表尾部
//外部類方法
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
return segmentFor(hash).put(key, hash, value, false); //先確定段的位置
}
// Segment類中的方法
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
int c = count;
if (c++ > threshold) // 如果當(dāng)個數(shù)超過閾值,就重新hash當(dāng)前段的元素 ,
rehash();
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
(4) 重哈希方式 :重點:
重哈希的方式 :只是對 Segments對象中的Hashentry數(shù)組進行重哈希
2、通過什么保證線程安全
<JDK1.7>,
分段鎖 對整個桶數(shù)組進行了分割分段(Segment),每一把鎖只鎖容器其中一部分數(shù)據(jù),多線程訪問容器里不同數(shù)據(jù)段的數(shù)據(jù),就不會存在鎖競爭,提高并發(fā)訪問率。
<jdk1.8>
使用的是優(yōu)化的synchronized 關(guān)鍵字同步代碼塊 和 cas操作了維護并發(fā)。
3、和 hashTable的保證線程安全的機制有何聯(lián)系
Hashtable通過synchronized修飾方法 來保證線程安全
通過segment(繼承了ReentrantLock)調(diào)用父類的鎖對象加鎖來實現(xiàn),
4、hashMap、 hashTable、 和 ConcurrentHashMap的區(qū)別
主要區(qū)別:
(1):實現(xiàn)線程安全的方式
hashMap是線程不安全的,
hashTable是線程安全的,實現(xiàn)線程安全的機制是使用Synchronized關(guān)鍵字修飾方法。
ConcurrentHashMap
<JDK1.7>,
ConcurrentHashMap(分段鎖) 對整個桶數(shù)組進行了分割分段(Segment),每一把鎖只鎖容器其中一部分數(shù)據(jù),多線程訪問容器里不同數(shù)據(jù)段的數(shù)據(jù),就不會存在鎖競爭,提高并發(fā)訪問率。
<jdk1.8>
使用的是優(yōu)化的synchronized 關(guān)鍵字 和 cas操作了維護并發(fā)。
(2):底層數(shù)據(jù)結(jié)構(gòu):
hashMap同hashTable;都是使用數(shù)組 + 鏈表結(jié)構(gòu)
ConcurrentHashMap
<jdk1.7> :使用 Segment數(shù)組 + HashEntry數(shù)組 + 鏈表
<jdk1.8> :使用 Node數(shù)組+鏈表+ 紅黑樹
(3) : 效率
hashMap只能單線程操作,效率低下
hashTable使用的是synchronized方法鎖,若一個線程搶奪了鎖,其他線程只能等到持鎖線程操作完成之后才能搶鎖操作
《1.7》ConcurrentHashMap 使用的分段鎖,如果一個線程占用一段,別的線程可以操作別的部分,
《1.8》簡化結(jié)構(gòu),put和get不用二次哈希,一把鎖只鎖住一個鏈表或者一棵樹,并發(fā)效率更加提升。
二、CouncurrentHashMap<jdk1.8>底層:
(1)數(shù)據(jù)結(jié)構(gòu):
Node數(shù)組+鏈表 / 紅黑樹: 類似hashMap<jdk1.8>
Node數(shù)組使用來存放樹或者鏈表的頭結(jié)點,當(dāng)一個鏈表中的數(shù)量到達一個數(shù)目時,會使查詢速率降低,所以到達一定閾值時,會將一個鏈表轉(zhuǎn)換為一個紅黑二叉樹,通告查詢的速率。
(2)主要屬性:
外部類的基本屬性
volatile Node<K,V>[] table; // Node數(shù)組用于存放鏈表或者樹的頭結(jié)點
static final int TREEIFY_THRESHOLD = 8; // 鏈表轉(zhuǎn)紅黑樹的閾值 > 8 時
static final int UNTREEIFY_THRESHOLD = 6; // 紅黑樹轉(zhuǎn)鏈表的閾值 <= 6 時
static final int TREEBIN = -2; // 樹根節(jié)點的hash值
static final float LOAD_FACTOR = 0.75f;// 負載因子
static final int DEFAULT_CAPACITY = 16; // 默認大小為16
內(nèi)部類
class Node<K,V> implements Map.Entry<K,V> {
int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
jdk1.8中雖然不在使用分段鎖,但是仍然有Segment這個類,但是沒有實際作用
3)主要方法:
1、構(gòu)造方法:
構(gòu)造方法并沒有直接new出來一個Node的數(shù)組,只是檢查數(shù)值之后確定了容量大小。
ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
// 如果傳入的數(shù)值>= 最大容量的一半,就使用最大容量,否則使用
//1.5*initialCapacity +1 ,然后向上取最近的 2 的 n 次方數(shù);
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
2、put方法:
步驟:
1、檢查Key或者Value是否為null,
2、得到Kye的hash值
3、如果Node數(shù)組是空的,此時才初始化 initTable(),
4、如果找的對應(yīng)的下標的位置為空,直接new一個Node節(jié)點并放入, break;
5、
6、如果對應(yīng)頭結(jié)點不為空, 進入同步代碼塊
判斷此頭結(jié)點的hash值,是否大于零,大于零則說明是鏈表的頭結(jié)點在鏈表中尋找,
如果有相同hash值并且key相同,就直接覆蓋,返回舊值 結(jié)束
如果沒有則就直接放置在鏈表的尾部
此頭節(jié)點的Hash值小于零,則說明此節(jié)點是紅黑二叉樹的根節(jié)點
調(diào)用樹的添加元素方法
判斷當(dāng)前數(shù)組是否要轉(zhuǎn)變?yōu)榧t黑樹
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());// 得到 hash 值
int binCount = 0; // 用于記錄相應(yīng)鏈表的長度
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果數(shù)組"空",進行數(shù)組初始化
if (tab == null || (n = tab.length) == 0)
// 初始化數(shù)組
tab = initTable();
// 找該 hash 值對應(yīng)的數(shù)組下標,得到第一個節(jié)點 f
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果數(shù)組該位置為空,
// 用一次 CAS 操作將新new出來的 Node節(jié)點放入數(shù)組i下標位置
// 如果 CAS 失敗,那就是有并發(fā)操作,進到下一個循環(huán)
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// hash 居然可以等于 MOVED,這個需要到后面才能看明白,不過從名字上也能猜到,肯定是因為在擴容
else if ((fh = f.hash) == MOVED)
// 幫助數(shù)據(jù)遷移,這個等到看完數(shù)據(jù)遷移部分的介紹后,再理解這個就很簡單了
tab = helpTransfer(tab, f);
else { // 到這里就是說,f 是該位置的頭結(jié)點,而且不為空
V oldVal = null;
// 獲取鏈表頭結(jié)點監(jiān)視器對象
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 頭結(jié)點的 hash 值大于 0,說明是鏈表
// 用于累加,記錄鏈表的長度
binCount = 1;
// 遍歷鏈表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果發(fā)現(xiàn)了"相等"的 key,判斷是否要進行值覆蓋,然后也就可以 break 了
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 到了鏈表的最末端,將這個新值放到鏈表的最后面
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { // 紅黑樹
Node<K,V> p;
binCount = 2;
// 調(diào)用紅黑樹的插值方法插入新節(jié)點
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// binCount != 0 說明上面在做鏈表操作
if (binCount != 0) {
// 判斷是否要將鏈表轉(zhuǎn)換為紅黑樹,臨界值: 8
if (binCount >= TREEIFY_THRESHOLD)
// 如果當(dāng)前數(shù)組的長度小于 64,那么會進行數(shù)組擴容,而不是轉(zhuǎn)換為紅黑樹
treeifyBin(tab, i); // 如果超過64,會轉(zhuǎn)成紅黑樹
if (oldVal != null)
return oldVal;
break;
}
}
}
//
addCount(1L, binCount);
return null;
}
3、get 方法
首先獲取到Key的hash值,
然后找到對應(yīng)的數(shù)組下標處的元素
如果次元素是我們要找的,直接返回,
如果次元素是null 返回null
如果Key的值< 0 ,說明是紅黑樹,
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); //獲得Hash值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) { // 比較 此頭結(jié)點e是否是我們需要的元素
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val; // 如果是,就返回
}
else if (eh < 0) // 如果小于零,說明此節(jié)點是紅黑樹
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
// 開始循環(huán) 查找
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
4、擴容:tryPresize()
容后數(shù)組容量為原來的 2 倍。
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
5.其他內(nèi)部類結(jié)構(gòu)
Node:
ConcurrentHashMap存儲結(jié)構(gòu)的基本單元,實現(xiàn)了Map.Entry接口,用于存儲數(shù)據(jù)。它對value和next屬性設(shè)置了volatile同步鎖(與JDK7的Segment相同),它不允許調(diào)用setValue方法直接改變Node的value域,它增加了find方法輔助map.get()方法。
TreeNode:
繼承于Node,但是數(shù)據(jù)結(jié)構(gòu)換成了二叉樹結(jié)構(gòu),它是紅黑樹的數(shù)據(jù)的存儲結(jié)構(gòu),用于紅黑樹中存儲數(shù)據(jù),當(dāng)鏈表的節(jié)點數(shù)大于8時會轉(zhuǎn)換成紅黑樹的結(jié)構(gòu),他就是通過TreeNode作為存儲結(jié)構(gòu)代替Node來轉(zhuǎn)換成黑紅樹。
TreeBin:
從字面含義中可以理解為存儲樹形結(jié)構(gòu)的容器,而樹形結(jié)構(gòu)就是指TreeNode,所以TreeBin就是封裝TreeNode的容器,它提供轉(zhuǎn)換黑紅樹的一些條件和鎖的控制。
ForwardingNode:
一個用于連接兩個table的節(jié)點類。它包含一個nextTable指針,用于指向下一張表。而且這個節(jié)點的key value next指針全部為null,它的hash值為-1. 這里面定義的find的方法是從nextTable里進行查詢節(jié)點,而不是以自身為頭節(jié)點進行查找。
Unsafe和CAS:
在ConcurrentHashMap中,隨處可以看到U, 大量使用了U.compareAndSwapXXX的方法,這個方法是利用一個CAS算法實現(xiàn)無鎖化的修改值的操作,他可以大大降低鎖代理的性能消耗。這個算法的基本思想就是不斷地去比較當(dāng)前內(nèi)存中的變量值與你指定的一個變量值是否相等,如果相等,則接受你指定的修改的值,否則拒絕你的操作。因為當(dāng)前線程中的值已經(jīng)不是最新的值,你的修改很可能會覆蓋掉其他線程修改的結(jié)果。這一點與樂觀鎖,SVN的思想是比較類似的。
6、通過什么保證線程安全
通過使用Synchroized關(guān)鍵字來同步代碼塊,而且只是在put方法中加鎖,在get方法中沒有加鎖
在加鎖時是使用頭結(jié)點作為同步鎖對象。,并且定義了三個原子操作方法
/ 獲取tab數(shù)組的第i個node<br>
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
// 利用CAS算法設(shè)置i位置上的node節(jié)點。csa(你叫私有空間的值和內(nèi)存中的值是否相等),即這個操作有可能不成功。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
// 利用volatile方法設(shè)置第i個節(jié)點的值,這個操作一定是成功的。
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
3、和 hashTable的保證線程安全的機制有何聯(lián)系
Hashtable通過synchronized修飾方法 來保證線程安全
通過synchronized同步代碼塊和 CAS操作來實現(xiàn)線程安全
由此拋出的問題:
為什么要用synchronized,cas不是已經(jīng)可以保證操作的線程安全嗎?
原因:
CAS也是適用一些場合的,比如資源競爭小時,是非常適用的,不用進行內(nèi)核態(tài)和用戶態(tài)之間
的線程上下文切換,同時自旋概率也會大大減少,提升性能,但資源競爭激烈時(比如大量線
程對同一資源進行寫和讀操作)并不適用,自旋概率會大大增加,從而浪費CPU資源,降低性
能