一. ConcurrentHashMap
ConcurrentHashMap是線程安全且高效的HashMap。
- HashMap線程不安全;
- HashTable使用synchronized保證線程安全,但是效率非常低下;
- ConcurrentHashMap使用鎖分段技術(shù)提升并發(fā)訪問率。
注意:
ConcurrentHashMap的線程安全指的是其內(nèi)部的方法線程安全(如get、put等),即單獨調(diào)用某個方法線程安全的。
當(dāng)代碼塊包含多個操作時,此代碼塊并不是線程安全的,因為不能保證整個代碼塊是互斥執(zhí)行的,因此仍然需要同步處理(加鎖、double check等)。
1. ConcurrentHashMap的結(jié)構(gòu)
JDK1.7版本

HashMap由HashEntry數(shù)組組成,HashEntry是鏈表結(jié)構(gòu)。
ConcurrentHashMap由Segment數(shù)組組成,Segment由HashEntry數(shù)組組成,HashEntry是鏈表結(jié)構(gòu)。
2. ConcurrentHashMap的初始化
ConcurrentHashMap初始化方法是通過initialCapacity、loadFactor和concurrencyLevel等幾個參數(shù)來初始化segment數(shù)組、段偏移量segmentShift、段掩碼segmentMask和每個segment里的HashEntry數(shù)組來實現(xiàn)的。
3. 定位Segment
ConcurrentHashMap使用分段鎖Segment來保護(hù)不同段的數(shù)據(jù),在插入和獲取元素的時候,必須先通過散列算法定位到Segment。
散列算法的目的:是元素盡可能均勻的分布在不同的Segment上,從而提高容器的存取效率。
4. ConcurrentHashMap的操作
(1) get操作
- 對key的hashCode進(jìn)行一次再散列得到一個新的散列值;
- 根據(jù)新的散列值,利用散列運算定位到Segment;
- 再通過散列算法定位到元素;
整個get過程不需要加鎖,除非讀到的值為空才會加鎖重讀。
(2) put操作
- 定位到Segment;
- 判斷是否需要對Segment
(3) size操作
嘗試兩次不鎖住Segment直接統(tǒng)計各Segment大小的方法,如果兩次結(jié)果不相同,則采用加鎖的方式來統(tǒng)計所有Segment的大小。
二. ConcurrentLinkedQueue
ConcurrentLinkedQueue是一個基于鏈接節(jié)點的無界線程安全隊列,采用FIFO的規(guī)則對節(jié)點進(jìn)行排序。
隊列前后分別用head節(jié)點和tail節(jié)點來定位,中間的節(jié)點通過next關(guān)聯(lián)起來。
1. 入隊列
入隊列就是將入隊節(jié)點添加到隊列的尾部。
- 將入隊節(jié)點設(shè)置成當(dāng)前隊列尾節(jié)點的下一個節(jié)點;
- 更新tail節(jié)點,如果tail節(jié)點的next節(jié)點不為空,則將入隊節(jié)點設(shè)置為tail節(jié)點,如果tail節(jié)點的next節(jié)點為空,則將入隊節(jié)點設(shè)置為tail節(jié)點的next節(jié)點。
因此:tail節(jié)點并不一定就是尾節(jié)點,尾節(jié)點可能是tail節(jié)點,也可能是tail節(jié)點的next節(jié)點。
2. 出隊列
出隊列就是從隊列中返回一個節(jié)點元素,并清空該節(jié)點對元素的引用。
- 獲取head節(jié)點的元素,判斷是否為空
- 如果為空,表示另一個線程已經(jīng)進(jìn)行了一次出隊列操作將該節(jié)點的元素取走,如果不為空,則使用CAS將head節(jié)點設(shè)置為null并返回head節(jié)點的元素。
三. 阻塞隊列
阻塞隊列的主要使用場景:生產(chǎn)者和消費者。
四種處理方式:
| 處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
|---|---|---|---|---|
| 插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除方法 | remove() | poll() | take() | poll(time,unit) |
| 檢查方法 | element() | peek() | 不可用 | 不可用 |
拋出異常:隊列滿時,繼續(xù)插入數(shù)據(jù)則拋出IllegalStateException;隊列空時,從隊列取數(shù)據(jù)則拋出NoSuchElementException。
返回特殊值:插入元素時,成功返回true;移除元素時,如果沒有則返回null。
一直阻塞:隊列滿時,繼續(xù)插入數(shù)據(jù)會一直阻塞生產(chǎn)者線程直到隊列可用或響應(yīng)中斷退出;隊列空時,從隊列取數(shù)據(jù)會阻塞消費者線程直到隊列不空。
超時退出:隊列滿時,繼續(xù)插入數(shù)據(jù)會阻塞生產(chǎn)者線程一段時間,超過指定時間則退出。
1. Java里的阻塞隊列
(1) ArrayBlockingQueue
用數(shù)組實現(xiàn)的有界阻塞隊列,F(xiàn)IFO。
(2) LinkedBlockingQueue
用鏈表實現(xiàn)的有界阻塞隊列,默認(rèn)和最大長度為Integer.MAX_VALUE,F(xiàn)IFO。
(3) PriorityBlockingQueue
支持優(yōu)先級的無界阻塞隊列。
(4) DelayQueue
支持延時獲取元素的無界阻塞隊列,在創(chuàng)建元素時可指定延時,只有延遲期滿才能從隊列中提取元素。
(5) SynchronousQueue
不存儲元素的阻塞隊列,每一個put操作必須等待一個take操作,否則不能繼續(xù)添加元素。
(6) LinkedTransferQueue
由鏈表結(jié)構(gòu)組成的無界阻塞TransferQueue隊列,主要多了tryTransfer和transfer方法。
transfer方法:如果當(dāng)前有消費者正在等待接收元素,transfer方法可以把生產(chǎn)者傳入的元素立刻傳輸給消費者;如果沒有消費者在等待接收元素,transfer方法將元素存放在隊列tail節(jié)點,并等到該元素被消費者消費了才返回。
tryTransfer方法:如果當(dāng)前有消費者正在等待接收元素,tryTransfer方法可以把生產(chǎn)者傳入的元素立刻傳輸給消費者;如果沒有消費者在等待接收元素,tryTransfer方法將元素存放在隊列tail節(jié)點,并返回false。
(7) LinkedBlockingDeque
由鏈表結(jié)構(gòu)組成的雙向阻塞隊列,可以從隊列的兩端插入和移除元素。
2. 阻塞隊列的實現(xiàn)原理
使用等待 / 通知模式實現(xiàn)。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private final Condition notEmpty;
private final Condition notFull;
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
}
當(dāng)生產(chǎn)者往滿的隊列中添加元素時會阻塞生產(chǎn)者(await),當(dāng)消費者消費了一個隊列中的元素后,會通知生產(chǎn)者當(dāng)前隊列可用(signal)。
當(dāng)消費者從空的隊列中取元素時會阻塞消費者(await),當(dāng)生產(chǎn)者添加了一個元素后,會通知消費者當(dāng)前隊列可用(signal)。
四. Fork/Join 框架
Fork/Join 框架是Java 7 提供的一個用于并行執(zhí)行任務(wù)的框架,把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果。
- 分割任務(wù);
- 執(zhí)行任務(wù)并合并結(jié)果。