5.1 同步容器類(lèi)
同步容器類(lèi)包括Vector和HashTable以及Collection.synchronizedXxx等工廠(chǎng)方法。這些類(lèi)實(shí)現(xiàn)線(xiàn)程安全的方式是:將它們的狀態(tài)封裝起來(lái),并對(duì)每個(gè)公有方法都進(jìn)行同步,使得每次只有一個(gè)線(xiàn)程能訪(fǎng)問(wèn)容器的狀態(tài)。
5.1.1 同步容器類(lèi)的問(wèn)題
同步容器類(lèi)都是線(xiàn)程安全的,但在某些情況下可能需要額外的客戶(hù)端加鎖來(lái)保護(hù)復(fù)合操作。容器中常見(jiàn)的復(fù)合操作包括:迭代,跳轉(zhuǎn),條件運(yùn)算(比如“若沒(méi)有則添加”)等。
- 迭代問(wèn)題:如下為一個(gè)簡(jiǎn)單的Vector的遍歷,看起來(lái)這段遍歷代碼并沒(méi)有什么問(wèn)題,但實(shí)際上在多線(xiàn)程情況下,假設(shè)線(xiàn)程A在迭代遍歷,同時(shí)線(xiàn)程B在修改Vector,比如刪除操作,那么就可能會(huì)拋出異常。
for (int i = 0; i < vector.size(); i ++)
dosomething(vector.get(i));
為了解決這個(gè)問(wèn)題,我們可以對(duì)其進(jìn)行客戶(hù)端加鎖操作:
synchronized(vector){
for (int i = 0; i < vector.size(); i ++)
dosomething(vector.get(i));
}
- 條件運(yùn)算問(wèn)題
假設(shè)Vector定義兩個(gè)方法:getLast和deleteLast,它們都會(huì)進(jìn)行“先檢查后運(yùn)行”操作。
public static Object getLast(Vector list){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
public static void deleteLast(Vector list){
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
假設(shè)線(xiàn)程A在包含10個(gè)元素的Vector上調(diào)用getLast,同時(shí)線(xiàn)程B在同一個(gè)Vector上調(diào)用deleteLast,這些操作交替執(zhí)行如下圖:

那么當(dāng)執(zhí)行到get操作時(shí),會(huì)報(bào)出異常。
為了解決這個(gè)問(wèn)題,我們可以采用客戶(hù)端加鎖的方法來(lái)保證復(fù)合操作的線(xiàn)程安全性:
public static Object getLast(Vector list){
synchronized(list){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}
public static void deleteLast(Vector list){
synchronized(list){
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
如此,便可以保證線(xiàn)程的安全性
5.1.2 迭代器與ConcurrentModificationException
在設(shè)計(jì)同步容器類(lèi)的迭代器時(shí)并沒(méi)有考慮到并發(fā)修改的問(wèn)題,并且它們表現(xiàn)出的行為是“及時(shí)失敗(fail-fast)”的。這意味著,當(dāng)它們發(fā)現(xiàn)容器在迭代過(guò)程中被修改時(shí),就會(huì)拋出一個(gè)ConcurrentModificationException異常。
注意:這種“及時(shí)失敗”的迭代器并不是一種完備的處理機(jī)制,而只是“善意地”捕獲并發(fā)錯(cuò)誤,因此,只能作為并發(fā)問(wèn)題的預(yù)警指示器。
解決方法:
a. 在迭代期間對(duì)容器加鎖,某些線(xiàn)程必須在等待迭代過(guò)程結(jié)束才可以進(jìn)行訪(fǎng)問(wèn)或修改,如果容器的規(guī)模很大,或者每個(gè)元素執(zhí)行操作的時(shí)間很長(zhǎng),那么這些線(xiàn)程將長(zhǎng)時(shí)間等待。我們知道持有鎖的時(shí)間越長(zhǎng),那么在鎖上的競(jìng)爭(zhēng)就可能越激烈,如果許多線(xiàn)程都在等待鎖被釋放,那么將極大地降低吞吐量和CPU的利用率。
b. ”克隆“容器,并在副本上進(jìn)行迭代。由于副本被封閉在線(xiàn)程內(nèi),因此其他線(xiàn)程無(wú)法在迭代期間對(duì)其進(jìn)行修改。但克隆容器時(shí)存在顯著的性能開(kāi)銷(xiāo)。
5.1.3 隱藏迭代器
public class HiddenIterator{
private final Set<Integer> set = new Hashset<Integer>();
public synchronized void add(Integer i){ set.add(i); }
public synchronized void remove(Integer i){ set.remove(i); }
public void addTenThings(){
Random r = new Random();
for (itn i = 0; i < 10; i ++)
add(r.nextInt());
//進(jìn)行了隱式地迭代
System.out.println("DEBUG: added ten elements to " + set);
}
}
如上,addTenThings方法可能會(huì)拋出ConcurrentModificationException異常,因?yàn)樵谏烧{(diào)試信息時(shí),toString會(huì)對(duì)容器set進(jìn)行迭代。
除此之外,常見(jiàn)的隱式迭代器還有hashCode和equals,containsAll,removeAll等方法。
5.2 并發(fā)容器
- 同步容器將所有對(duì)容器狀態(tài)的訪(fǎng)問(wèn)都進(jìn)行了串行化,以實(shí)現(xiàn)它們的線(xiàn)程安全性。這種方法的代價(jià)是嚴(yán)重降低并發(fā)性,當(dāng)多個(gè)線(xiàn)程競(jìng)爭(zhēng)容器的鎖時(shí),吞吐量將嚴(yán)重減低。
- 為此我們通過(guò)并發(fā)容器來(lái)代替同步容器,從而提高伸縮性并降低風(fēng)險(xiǎn)。
常見(jiàn)的并發(fā)容器:
- ConcurrentHashMap:用于代替同步且基于散列的Map。
- 與HashMap一樣,ConcurrentHashMap也是基于散列的Map,但它使用了一種完全不同的加鎖策略來(lái)提高并發(fā)性和伸縮性。ConcurrentHashMap并不是將每個(gè)方法都在同一個(gè)鎖上同步并使得每次只能一個(gè)線(xiàn)程訪(fǎng)問(wèn)容器,而是使用一種粒度更細(xì)的加鎖機(jī)制來(lái)實(shí)現(xiàn)更大程度的共享,這種機(jī)制稱(chēng)為分段鎖。在這種機(jī)制下,任意數(shù)量的讀取線(xiàn)程可以并發(fā)地訪(fǎng)問(wèn)Map,并且一定數(shù)量的寫(xiě)線(xiàn)程可以并發(fā)地修改Map。
- ConcurrentHashMap提供的迭代器不會(huì)拋出ConcurrentModificationException異常,因此不需要在迭代過(guò)程對(duì)容器進(jìn)行加鎖。因?yàn)镃oncurrentHashMap返回的迭代器具有弱一致性,而并非”及時(shí)失敗“。弱一致性的迭代器可以容忍并發(fā)的修改,當(dāng)創(chuàng)建迭代器時(shí)會(huì)遍歷已有的元素,并可以在迭代器被構(gòu)造后將修改操作反映給容器。
注意:
a. 在多線(xiàn)程環(huán)境下,我們通常選取ConcurrentHashMap,只有當(dāng)應(yīng)用程序需要加鎖Map以進(jìn)行獨(dú)占訪(fǎng)問(wèn)時(shí),才應(yīng)該放棄ConcurrentHashMap。
b. 由于ConcurrentHashMap不能被加鎖來(lái)執(zhí)行獨(dú)占訪(fǎng)問(wèn),因此我們無(wú)法使用客戶(hù)端加鎖的方式來(lái)創(chuàng)建新的原子操作。但ConcurrentHashMap提供了一些”額外的原子Map操作“。
public interface ConcurrentHashMap<k,V> extends Map<K,V>{
//僅當(dāng)K沒(méi)有相應(yīng)的映射值時(shí)才插入
V putIfAbsent(K key, V value);
//僅當(dāng)K被映射到V時(shí)才移除
boolean remove(K key, V value);
//僅當(dāng)K被映射到oldValue時(shí)才被替換
boolean replace(K key, V oldValue, V newValue);
//僅當(dāng)K被映射到某個(gè)值時(shí),才替換為newValue
boolean replace(K key, V newValue);
}
- CopyOnWriteArrayList
a. CopyOnWriteArrayList保證正確地發(fā)布一個(gè)事實(shí)不可變的對(duì)象,那么在訪(fǎng)問(wèn)該對(duì)象是就不需要進(jìn)一步的同步。
b. CopyOnWriteArrayList是通過(guò)在每次修改時(shí),都會(huì)創(chuàng)建并重新發(fā)布一個(gè)新的容器副本,從而實(shí)現(xiàn)可變性。
c. CopyOnWriteArrayList的迭代器保留一個(gè)指向底層的基礎(chǔ)數(shù)組引用,這個(gè)數(shù)組當(dāng)前位于迭代器的起始位置,由于它不會(huì)被修改,因此在對(duì)其進(jìn)行同步時(shí)只需確保數(shù)組內(nèi)容的可見(jiàn)性。因此,多個(gè)線(xiàn)程可以同時(shí)對(duì)這個(gè)容器進(jìn)行迭代,而不會(huì)彼此干擾或者與修改容器的線(xiàn)程相互干擾。
d. CopyOnWriteArrayList的迭代器不會(huì)拋出ConcurrentModificationException異常。
e. CopyOnWriteArrayList的最大缺點(diǎn)是每當(dāng)修改容器時(shí)都需要復(fù)制底層數(shù)組,這需要一定的開(kāi)銷(xiāo)。因此,僅當(dāng)?shù)鞑僮鬟h(yuǎn)遠(yuǎn)大于修改操作時(shí),才應(yīng)該用CopyOnWriteArrayList容器。例如,我們常見(jiàn)的通知系統(tǒng):在分發(fā)通知時(shí)需要迭代已注冊(cè)監(jiān)聽(tīng)器鏈表,并調(diào)用每個(gè)監(jiān)聽(tīng)器。
5.3 阻塞隊(duì)列和生產(chǎn)者-消費(fèi)者模式
阻塞隊(duì)列提供了可阻塞的put和take方法,以及支持定時(shí)的offer和poll方法。如果隊(duì)列已經(jīng)滿(mǎn)了,那么put方法會(huì)阻塞直到有空間可用;如果隊(duì)列為空,那么take方法會(huì)阻塞直到有元素可用。
阻塞隊(duì)列支持生產(chǎn)者-消費(fèi)者模式,當(dāng)數(shù)據(jù)生成時(shí),生產(chǎn)者把數(shù)據(jù)放入隊(duì)列,而當(dāng)消費(fèi)者準(zhǔn)備處理數(shù)據(jù)時(shí),將從隊(duì)列中取出。
兩種特殊情況:
a.如果生產(chǎn)者不能盡快地產(chǎn)生工作使消費(fèi)者保持忙碌,那么消費(fèi)者就只能一直等待,直到有工作可做。在某些情況下,這種方式是合適的,比如在服務(wù)器應(yīng)用程序中,沒(méi)有任何客戶(hù)請(qǐng)求服務(wù)。但在其他另一些情況下,就不適用,比如“網(wǎng)頁(yè)爬蟲(chóng)”中,有無(wú)窮的工作需要完成。
解決方法:
調(diào)整生產(chǎn)者線(xiàn)程數(shù)量和消費(fèi)者線(xiàn)程數(shù)量之間的比率,從而實(shí)現(xiàn)更高的資源利用率。
b.如果生產(chǎn)者生成的速率比消費(fèi)者處理工作的速率快,那么工作項(xiàng)會(huì)在隊(duì)列中累積起來(lái),最終耗盡內(nèi)存。
解決方法:
使用有界隊(duì)列,當(dāng)隊(duì)列充滿(mǎn)時(shí),生產(chǎn)者將被阻塞并且不能工作,而消費(fèi)者就有時(shí)間來(lái)趕上工作處理進(jìn)度。阻塞隊(duì)列提供了offer方法,如果數(shù)據(jù)項(xiàng)不能被添加到隊(duì)列中,那么會(huì)返回一個(gè)失敗狀態(tài)。這樣我們就可以創(chuàng)建更多靈活的策略來(lái)處理負(fù)荷過(guò)載的情況。比如將數(shù)據(jù)項(xiàng)寫(xiě)入磁盤(pán),或者減少生產(chǎn)者線(xiàn)程的數(shù)量。
在構(gòu)建高可靠的應(yīng)用程序時(shí),有界隊(duì)列是一種強(qiáng)大的資源管理工具:它們能抑制并防止產(chǎn)生過(guò)多的工作項(xiàng),使應(yīng)用程序在負(fù)荷過(guò)載的情況下變得更加健壯。
- 常見(jiàn)的阻塞隊(duì)列(BlockingQueue)的實(shí)現(xiàn)
a. LinkedBlockingQueue和ArrayBlockingQueue是FIFO隊(duì)列,二者分別與LinkedList和 ArrayList類(lèi)似,但比同步list有更好的并發(fā)性。
b. PriorityBlockingQueue是一個(gè)按優(yōu)先級(jí)排序的隊(duì)列。它既可以根據(jù)元素的自然順序來(lái)比較元素,也可以使用Comparator方法來(lái)比較。
c. SynchronousQueue并不是一個(gè)真正的隊(duì)列,因?yàn)樗粫?huì)為隊(duì)列中元素維護(hù)存儲(chǔ)空間,它維護(hù)的是一組線(xiàn)程,這些線(xiàn)程在等待把元素加入或移除。因?yàn)镾ynchronousQueue沒(méi)有存儲(chǔ)功能,因此put和take方法會(huì)被阻塞,直到有另一個(gè)線(xiàn)程已經(jīng)準(zhǔn)備好參與到交付過(guò)程中。僅當(dāng)有足夠多的消費(fèi)者,并總有一個(gè)消費(fèi)者是準(zhǔn)備好交付的工作時(shí),才適合使用同步隊(duì)列。
5.3.2 串行線(xiàn)程封閉
通過(guò)將多個(gè)并發(fā)的任務(wù)存入隊(duì)列實(shí)現(xiàn)任務(wù)的串行化,并未這些串行化的任務(wù)創(chuàng)建唯一的一個(gè)工作進(jìn)程處理。
本質(zhì):使用一個(gè)開(kāi)銷(xiāo)更小的鎖(隊(duì)列鎖)去代替另一個(gè)可能開(kāi)銷(xiāo)更大的鎖(非線(xiàn)程安全對(duì)象引用的鎖)。
- 適用場(chǎng)景:
a. 需要使用非線(xiàn)程安全對(duì)象,但又不希望引入鎖。
b. 任務(wù)的執(zhí)行涉及I/O操作,不希望過(guò)多的I/O線(xiàn)程增加上下文切換。
5.3.3 雙端隊(duì)列與工作密取
Deque是一個(gè)雙端隊(duì)列,實(shí)現(xiàn)了隊(duì)列在隊(duì)列頭和隊(duì)列尾的高效插入和移除。具體實(shí)現(xiàn)包括ArrayQueue和LinkedBlockingQueue。
在工作密取中,每個(gè)消費(fèi)者都有各自的雙端隊(duì)列。如果一個(gè)消費(fèi)者完成了自己雙端隊(duì)列中的全部工作,那么它可以從其他消費(fèi)者雙端隊(duì)列末尾秘密地獲取工作。
工作密取模式比傳統(tǒng)的生產(chǎn)者-消費(fèi)者模式具有更高的可伸縮性。因?yàn)樵诖蠖鄶?shù)情況下,它們都只是訪(fǎng)問(wèn)自己的雙端隊(duì)列,從而極大地減少了競(jìng)爭(zhēng),并且當(dāng)它需要訪(fǎng)問(wèn)另一個(gè)隊(duì)列時(shí),它會(huì)從尾部獲取工作。
5.5 同步工具類(lèi)
阻塞隊(duì)列可以作為同步工具類(lèi),其他類(lèi)型的同步工具類(lèi)還包括信號(hào)量(Semaphore),柵欄(Barrier)以及閉鎖(Latch)。
5.5.1 閉鎖
閉鎖是一種同步工具類(lèi),可以延遲進(jìn)程的進(jìn)度直到其到達(dá)終止?fàn)顟B(tài)。閉鎖的作用相當(dāng)于一扇門(mén):在閉鎖達(dá)到結(jié)束狀態(tài)之前,這扇門(mén)一直是關(guān)閉狀態(tài)的,并且沒(méi)有任何線(xiàn)程能通過(guò),當(dāng)達(dá)到結(jié)束狀態(tài)時(shí),這扇門(mén)會(huì)打開(kāi)并允許所有的線(xiàn)程通過(guò)。
閉鎖可以用來(lái)確保某些活動(dòng)直到其他活動(dòng)都完成后才繼續(xù)執(zhí)行
- 確保某個(gè)計(jì)算在其需要的所有資源都被初始化之后才繼續(xù)執(zhí)行
- 確保某個(gè)服務(wù)在其依賴(lài)的所有其他服務(wù)都已經(jīng)啟動(dòng)之后才啟動(dòng)
- 等待直到某個(gè)操作的所有參與者(例如,在多玩游戲中的所有玩家)都就緒再繼續(xù)執(zhí)行
CountDownLatch是一種靈活的閉鎖實(shí)現(xiàn)
- 閉鎖包含一個(gè)計(jì)數(shù)器,該計(jì)數(shù)器初始化為一個(gè)正數(shù),表示需要等待的事件數(shù)量。
- countDown方法遞減計(jì)數(shù)器,表示有一個(gè)事件已經(jīng)發(fā)生。
- await方法等待計(jì)數(shù)器達(dá)到零。如果計(jì)數(shù)器的值為非零,那么await會(huì)一直阻塞直到計(jì)數(shù)器為零。
如下例子:
我們需要保證所有線(xiàn)程就緒后才開(kāi)始計(jì)時(shí),并保證所有線(xiàn)程完成任務(wù)后停止計(jì)時(shí),最終返回時(shí)間。
public class TestHarness{
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException{
//作為保證同時(shí)開(kāi)始執(zhí)行任務(wù)的閉鎖
final CountDownLatch startGate = new CountDownLatch(1);
//作為保證全部執(zhí)行完任務(wù)的閉鎖
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i ++) {
Thread t = new Thread(){
public void run(){
try{
//在此處,線(xiàn)程會(huì)被阻塞,直到startGate執(zhí)行了await方法
startGate.await();
try{
task.run();
} finally{
endGate.countDown();
}
} catch (InterruptedException ignored) { }
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
//在此處當(dāng)前線(xiàn)程會(huì)被阻塞,直到任務(wù)線(xiàn)程全部完成任務(wù),endGate值為0,才釋放
endGate.await();
long end = system.nanoTime();
return end - start;
}
}
我們?cè)谶@里設(shè)置了兩個(gè)閉鎖,其中startGate閉鎖保證了所有線(xiàn)程就緒后才開(kāi)始計(jì)時(shí),endGate閉鎖保證了所有線(xiàn)程完成任務(wù)才結(jié)束計(jì)時(shí)。
FutureTask
FutureTask也可以用作閉鎖,是一種抽象的可生成結(jié)果的Runnable,并且處于以下三種狀態(tài):
等待運(yùn)行,正在運(yùn)行和運(yùn)行完成。
FutrueTask的閉鎖體現(xiàn)在它的get方法,如果任務(wù)完成,那么get會(huì)立即返回結(jié)果,否則get會(huì)阻塞直到任務(wù)完成進(jìn)入完成狀態(tài)。
public class Preloader{
//通過(guò)call方法返回執(zhí)行的結(jié)果
private final FutrueTask<ProductInfo> futrue =
new FutrueTask<ProductInfo>(new Callable<ProductInfo>(){
public ProductInfo call() throws DataLoadException{
return loadProductInfo();
}
});
private final Thread thread = new Thread(futrue);
public void start(){ thread.start(); }
public ProductInfo get()
throws DataLoadException,InterruptedException{
try{
return futrue.get();
} catch (ExecutionException ex){
Throwable cause = e.getCause();
if (cause instanceof DataLoadException) {
throw (DataLoadException) cause;
} else {
throw launderThrowable(cause);
}
}
}
}
如上,我們使用FutrueTask來(lái)提前加載稍后需要的數(shù)據(jù)。Preloader創(chuàng)建了一個(gè)FutureTask,其中包含從數(shù)據(jù)庫(kù)加載產(chǎn)品信息,以及一個(gè)執(zhí)行運(yùn)算的線(xiàn)程。當(dāng)程序稍后需要ProductInfo時(shí),可以調(diào)用get方法,如果數(shù)據(jù)已經(jīng)加載,那么返回這些數(shù)據(jù),否則將等待加載完成后返回。
注意:在Preloader中,當(dāng)get方法拋出ExecutionException時(shí),可能是這三種情況:Callable拋出的異常,RuntimeExcetpion,以及Error。Preloader會(huì)首先檢查已知的受檢查類(lèi)型異常,并重新拋出給它們,剩下的就是未檢查異常交給了launderThrowable來(lái)處理。
public static RuntimeException launderThrowable(Throwable t){
if (t instanceof RuntimeException)
return (RuntimeException) t;
else if (t instanceof Error)
return (Error) t;
else
throw new IllegalStateException("Not unchecked",t);
}
5.5.3 信號(hào)量
- 計(jì)數(shù)信號(hào)量(Counting Semaphore)用來(lái)同時(shí)訪(fǎng)問(wèn)某個(gè)特定資源的操作數(shù)量,或者執(zhí)行某個(gè)指定操作的數(shù)量。
- 它主要包含兩個(gè)操作acquire和release。acquire將阻塞直到有許可,release方法將釋放一個(gè)許可。
- 使用Semaphore將任何一種容器變?yōu)橛薪缱枞萜鳌?/li>
public class BoundedHashSet<T>{
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound){
this.set = Collections.synchronizedSet(new Hashset<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException{
//獲取許可,若容器數(shù)量已經(jīng)達(dá)到bound,則被阻塞直到有許可被釋放
sem.acquire();
boolean wasAdded = false;
try{
wasAdded = set.add(o);
return wasAdded;
} finally{
//若添加失敗,則釋放許可
if (!wasAdded)
sem.release();
}
}
public boolean remove(Object o){
boolean wasRemoved = set.remove(o);
//若刪除失敗,則釋放許可
if (wasRemoved)
sem.release();
return wasRemoved;
}
}
如上,我們通過(guò)Semaphore實(shí)現(xiàn)了一個(gè)可阻塞的有界容器。
5.5.4 柵欄
- 閉鎖是一次性對(duì)象,一旦進(jìn)入終止?fàn)顟B(tài),就不能被重置。
- 柵欄類(lèi)似于閉鎖,它能阻塞一組線(xiàn)程直到某個(gè)事件發(fā)生。
- 柵欄與閉鎖的關(guān)鍵區(qū)別在于:所有線(xiàn)程必須同時(shí)到達(dá)柵欄位置,才能繼續(xù)執(zhí)行。閉鎖用于等待事件,而柵欄用于等待其他線(xiàn)程。
- 當(dāng)線(xiàn)程達(dá)到柵欄時(shí)將調(diào)用await方法,這個(gè)方法將阻塞直到所有的線(xiàn)程都達(dá)到這個(gè)柵欄位置。
舉個(gè)例子:有三個(gè)工人合作建橋,有三個(gè)樁,每人打一個(gè),同時(shí)打完之后才能一起搭橋(搭橋需要三個(gè)人一起合作)。也就是三個(gè)人都打完樁之后才能繼續(xù)工作。
public class CyWork implements Runnable {
private CyclicBarrier cyclicBarrier;
private String name;
public CyWork(CyclicBarrier cyclicBarrier, String name){
this.cyclicBarrier = cyclicBarrier;
this.name = name;
}
@Override
public void run() {
System.out.println(name + "正在打樁....");
try {
Thread.sleep(5000);
System.out.println(name + "完成打樁。");
//阻塞線(xiàn)程,直到所有線(xiàn)程都到達(dá)柵欄
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + ": 其他人都打完樁了,開(kāi)始搭橋了。");
}
public static void main(String[] args){
ExecutorService executorService = Executors.newFixedThreadPool(3);
//定義柵欄
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
CyWork work1 = new CyWork(cyclicBarrier, "張三");
CyWork work2 = new CyWork(cyclicBarrier, "李四");
CyWork work3 = new CyWork(cyclicBarrier, "王五");
executorService.execute(work1);
executorService.execute(work2);
executorService.execute(work3);
executorService.shutdown();
}
}
輸出結(jié)果:
張三正在打樁....
李四正在打樁....
王五正在打樁....
李四完成打樁。
張三完成打樁。
王五完成打樁。
王五: 其他人都打完樁了,開(kāi)始搭橋了。
李四: 其他人都打完樁了,開(kāi)始搭橋了。
張三: 其他人都打完樁了,開(kāi)始搭橋了。
如上,我們使用柵欄的方式保證三個(gè)工人都完成打樁工作后,才進(jìn)行搭橋工作。
Exchanger是一種雙方柵欄,各方在柵欄位置上交換數(shù)據(jù)。
Exchanger可以在兩個(gè)線(xiàn)程之間交換數(shù)據(jù),只能是2個(gè)線(xiàn)程,不支持更多的線(xiàn)程之間交換數(shù)據(jù)。
當(dāng)線(xiàn)程A調(diào)用Exchange對(duì)象的exchange方法之后,它會(huì)陷入阻塞狀態(tài),直到線(xiàn)程B也調(diào)用exchange方法,然后以線(xiàn)程安全的方式交換數(shù)據(jù),之后線(xiàn)程A和B繼續(xù)運(yùn)行。
舉個(gè)例子:我們模擬將錢(qián)和商品進(jìn)行交換,線(xiàn)程A持有錢(qián),線(xiàn)程B持有商品,兩者之間進(jìn)行交換。
public class ExchangeTest {
public static void main(String[] args){
ExchangeTest test = new ExchangeTest();
//定義Exchanger
Exchanger<String> exchanger = new Exchanger<>();
test.new Money(exchanger).start();
test.new Product(exchanger).start();
}
class Money extends Thread{
private String data;
private Exchanger<String> exchanger = null;
Money(Exchanger<String> exchanger){
this.exchanger = exchanger;
data = "錢(qián)";
}
@Override
public void run() {
System.out.println("線(xiàn)程" + Thread.currentThread().getName() + "正在把數(shù)據(jù)<" + data +">交換出去");
try {
Thread.sleep(3000);
//進(jìn)行交換數(shù)據(jù)
data = exchanger.exchange(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線(xiàn)程" + Thread.currentThread().getName() + "換回來(lái)的數(shù)據(jù)為<" + data +">");
}
}
class Product extends Thread{
private String data;
private Exchanger<String> exchanger = null;
Product(Exchanger<String> exchanger){
this.exchanger = exchanger;
data = "商品";
}
@Override
public void run() {
System.out.println("線(xiàn)程" + Thread.currentThread().getName() + "正在把數(shù)據(jù)<" + data +">交換出去");
try {
Thread.sleep(3000);
//進(jìn)行交換數(shù)據(jù)
data = exchanger.exchange(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線(xiàn)程" + Thread.currentThread().getName() + "換回來(lái)的數(shù)據(jù)為<" + data+">");
}
}
}
輸出結(jié)果:
線(xiàn)程Thread-0正在把數(shù)據(jù)<錢(qián)>交換出去
線(xiàn)程Thread-1正在把數(shù)據(jù)<商品>交換出去
線(xiàn)程Thread-0換回來(lái)的數(shù)據(jù)為<商品>
線(xiàn)程Thread-1換回來(lái)的數(shù)據(jù)為<錢(qián)>
如上,我們可以看到兩個(gè)線(xiàn)程之間成功地進(jìn)行了交換數(shù)據(jù)。
- 適用場(chǎng)景:當(dāng)兩個(gè)線(xiàn)程執(zhí)行不對(duì)稱(chēng)的操作時(shí),Exchanger會(huì)非常有用。
比如當(dāng)一個(gè)線(xiàn)程向緩沖區(qū)寫(xiě)入數(shù)據(jù),而另一個(gè)線(xiàn)程從緩存區(qū)讀取數(shù)據(jù)。這些線(xiàn)程可以通過(guò)Exchanger來(lái)進(jìn)行匯合,并將滿(mǎn)的緩沖區(qū)與空的緩沖區(qū)進(jìn)行交換。
5.6 構(gòu)建高效可伸縮的結(jié)果緩存
重用之前的計(jì)算結(jié)果能降低延遲,提高吞吐量,但卻需要消耗更多的內(nèi)存。
- 使用HashMap和同步機(jī)制來(lái)初始化緩存
public interface Computable<A,V>{
V compute(A arg) throws InterruptedException;
}
public class ExpensiveFunction implements Computable<String,BigInteger>{
public BigInteger compute(String arg){
//經(jīng)過(guò)長(zhǎng)時(shí)間的計(jì)算后
return new BigInteger(arg);
}
}
public class Memoizer<A,V> implements Computable<A,V>{
private final Map<A,V> cache = new HashMap<A,V>();
private final Computable<A,V> c;
public Memoizer(Computable<A,V> c){
this.c = c;
}
public synchronized V compute(A arg) throws InterruptedException{
V result = cache.get(arg);
if (result == null){
result = c.compute(arg);
cache.put(arg,result);
}
return result;
}
}
如上,因?yàn)镠ashMap是線(xiàn)程不安全的,因此我們使用了synchronized關(guān)鍵字來(lái)保證線(xiàn)程安全性。雖然這樣的操作能夠保證線(xiàn)程安全,但每次只能有一個(gè)線(xiàn)程能夠執(zhí)行compute方法。如果一個(gè)線(xiàn)程正在計(jì)算結(jié)果,那么其他調(diào)用compute方法的線(xiàn)程將長(zhǎng)時(shí)間被阻塞。
- 使用ConcurrentHashMap代替HashMap
public class Memoizer<A,V> implements Computable<A,V>{
private final Map<A,V> cache = new ConcurrentHashMap<A,V>();
private final Computable<A,V> c;
public Memoizer(Computable<A,V> c){
this.c = c;
}
public V compute(A arg) throws InterruptedException{
V result = cache.get(arg);
if (result == null){
result = c.compute(arg);
cache.put(arg,result);
}
return result;
}
}
這個(gè)方法并第一個(gè)方法有更好的并發(fā)性,多線(xiàn)程可以并發(fā)地訪(fǎng)問(wèn)。但這里卻存在一個(gè)不足之處:當(dāng)兩個(gè)線(xiàn)程同時(shí)調(diào)用compute時(shí)存在一個(gè)漏洞,可能會(huì)導(dǎo)致計(jì)算得到相同的結(jié)果。比如線(xiàn)程A和線(xiàn)程B都攜帶數(shù)據(jù)arg="test",若此時(shí)緩存中不存在這樣的緩存,那么線(xiàn)程A先判斷為null,那么進(jìn)行計(jì)算,計(jì)算時(shí),線(xiàn)程B盡量進(jìn)行判斷為null,同樣B也會(huì)進(jìn)行計(jì)算,那么這樣就完全失去了緩存的意義了,特別是當(dāng)多個(gè)線(xiàn)程在進(jìn)行取相同的值時(shí)。
實(shí)際上,上面的問(wèn)題在于:如果某個(gè)線(xiàn)程啟動(dòng)了一個(gè)開(kāi)銷(xiāo)很大的計(jì)算,而其他線(xiàn)程并不知道這個(gè)計(jì)算正在進(jìn)行,那么很可能重復(fù)這個(gè)計(jì)算。
- 基于FutrueTask的Memoizing封裝器
public class Memoizer<A,V> implements Computable<A,V>{
private final Map<A,Futrue<V>> cache = new ConcurrentHashMap<A,Futrue<V>>();
private final Computable<A,V> c;
public Memoizer(Computable<A,V> c){
this.c = c;
}
public V compute(A arg) throws InterruptedException{
Futrue<V> f = cache.get(arg);
if (f == null) {
Callable<V> cal = new Callable<V>(){
public V call() throws InterruptedException{
return c.compute(arg);
}
};
FutrueTask<V> ft = new FutrueTask<V>(cal);
f = ft;
cache.put(arg,ft);
//在這里調(diào)用c.compute方法
ft.run();
}
try{
return f.get();
} catch (ExecutionException e){
throw launderThrowable(e.getCause());
}
}
}
在這里,使用FutrueTask來(lái)表示返回的結(jié)果,我們知道FutrueTask的特點(diǎn)是:若結(jié)果已經(jīng)計(jì)算出來(lái),則立即返回結(jié)果。如果其他線(xiàn)程正在計(jì)算該結(jié)果,那么新到的線(xiàn)程會(huì)一直等待這個(gè)結(jié)果計(jì)算出來(lái)。
但這里依舊存在一個(gè)缺陷:仍然存在兩個(gè)線(xiàn)程計(jì)算出相同的結(jié)果,但這個(gè)漏洞發(fā)生的幾率遠(yuǎn)小于上面的方法。造成這個(gè)缺陷的原因是compute方法中的if代碼塊是非原子的”先檢查后執(zhí)行“操作。因此,存在兩個(gè)線(xiàn)程同一時(shí)間內(nèi)調(diào)用compute方法判斷到結(jié)果為null,然后進(jìn)行計(jì)算。
- 最終版本:
public class Memoizer<A,V> implements Computable<A,V>{
private final Map<A,Futrue<V>> cache = new ConcurrentHashMap<A,Futrue<V>>();
private final Computable<A,V> c;
public Memoizer(Computable<A,V> c){
this.c = c;
}
public V compute(A arg) throws InterruptedException{
Futrue<V> f = cache.get(arg);
if (f == null) {
Callable<V> cal = new Callable<V>(){
public V call() throws InterruptedException{
return c.compute(arg);
}
};
FutrueTask<V> ft = new FutrueTask<V>(cal);
f = cache.putIfAbsent(arg,ft);
if (f == null) {f = ft; ft.run(); }
}
try{
return f.get();
} catch (ExecutionException e){
throw launderThrowable(e.getCause());
}
}
}
如上,我們采用ConcurrentHashMap中的putIfAbsent方法來(lái)解決這個(gè)非原子操作問(wèn)題,先判斷是否存在這個(gè)計(jì)算,若存在,則不會(huì)添加。
在因式分解servlet中使用Memoizer來(lái)緩存結(jié)果
public class Factorizer implements Servlet{
//調(diào)用計(jì)算方法,并返回結(jié)果
private final Computable<BigInteger,BigInteger[]> c =
new Computable<BigInteger,BigInteger[]>(){
public BigInteger[] compute(BigInteger arg){
return factor(arg);
}
}
//設(shè)置緩存
private final Computable<BigInteger,BigInteger[]> cache =
new Memoizer<BigInteger,BigInteger[]>(c);
public void service(ServletRequest req, ServletResponse resp){
try{
BigInteger i = extractFromRequest(req);
encodeIntoResponse(resp, cache.get(i));
} catch (InterruptedException e){
encodeIntoResponse(resp, "factorization interrupted");
}
}
}