目錄

目錄.png
概念
- 并行是指兩個(gè)或者多個(gè)事件在同一時(shí)刻發(fā)生(cpu多核);而并發(fā)是指兩個(gè)或多個(gè)事件在同一時(shí)間間隔內(nèi)發(fā)生
饑餓,死鎖, 活鎖
- 饑餓: 優(yōu)先級(jí)低的線程得到執(zhí)行的機(jī)會(huì)很小,就可能發(fā)生線程饑餓
- 饑餓預(yù)防: 要避免使用線程優(yōu)先級(jí),因?yàn)檫@會(huì)增加平臺(tái)依賴性,并可能導(dǎo)致活躍性(某件正確的事情最終會(huì)發(fā)生,主要問(wèn)題包括死鎖、饑餓、以及活鎖)問(wèn)題。在大多數(shù)并發(fā)應(yīng)用程序中,都可以使用默認(rèn)的線程優(yōu)先級(jí)
- 活鎖: 過(guò)度的錯(cuò)誤處理代碼導(dǎo)致無(wú)限循環(huán)處理錯(cuò)誤,當(dāng)多線程中出現(xiàn)了相互謙讓,都主動(dòng)將資源釋放給別的線程使用,有可能自動(dòng)解決。
- 活鎖預(yù)防: 比如重試機(jī)制中引入隨機(jī)性(不同時(shí)間)。 例如,在網(wǎng)絡(luò)上,如果有兩臺(tái)機(jī)器嘗試使用相同的載波來(lái)發(fā)送數(shù)據(jù)包,那么這些數(shù)據(jù)包就會(huì)發(fā)生沖突。這兩臺(tái)機(jī)器都檢查到了沖突,并都在稍后再次發(fā)送。 如果二者都選擇了在0.1秒后重試,那么會(huì)再次沖突,并且不斷沖突下去,這時(shí)候需要改變重試時(shí)間。
- 死鎖: 多個(gè)線程相互占用對(duì)方的資源的鎖,而又相互等對(duì)方釋放鎖。
- 死鎖條件:
(1) 互斥條件:一個(gè)資源每次只能被一個(gè)進(jìn)程使用。
(2) 請(qǐng)求與保持條件:一個(gè)進(jìn)程因請(qǐng)求資源而阻塞時(shí),對(duì)已獲得的資源保持不放。
(3) 不剝奪條件:進(jìn)程已獲得的資源,在末使用完之前,不能強(qiáng)行剝奪。
(4) 循環(huán)等待條件:若干進(jìn)程之間形成一種頭尾相接的循環(huán)等待資源關(guān)系
ForkJoinPool
- ForkJoinPool 不是為了替代 ExecutorService,而是它的補(bǔ)充,在某些應(yīng)用場(chǎng)景下性能比 ExecutorService 更好。
- ForkJoinPool 主要用于實(shí)現(xiàn)“分而治之”的算法,特別是分治之后遞歸調(diào)用的函數(shù),例如 quick sort 等。
// newWorkStealingPool線程池的實(shí)現(xiàn)用到了ForkJoinPool,用到了分而治之,遞歸計(jì)算的算法, 搶占式
ExecutorService exec = Executors.newWorkStealingPool();
- ForkJoinPool 最適合的是計(jì)算密集型的任務(wù),如果存在 I/O,線程間同步,sleep() 等會(huì)造成線程長(zhǎng)時(shí)間阻塞的情況, 不太適合
- parallel()并行流并不一定有想象中那么美好,有時(shí)候并行流可能增加時(shí)間,導(dǎo)致時(shí)間增加的一個(gè)重要原因是處理器內(nèi)存緩存限制, parallel底層就是用到了ForkJoinPool
- ForkJoinPool示例
public class TestForkJoinCalculator {
private final ForkJoinPool pool;
//執(zhí)行任務(wù)RecursiveTask:有返回值 RecursiveAction:無(wú)返回值
private static class SumTask extends RecursiveTask<Long> {
private final long[] numbers;
private final int from;
private final int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
//此方法為ForkJoin的核心方法:對(duì)任務(wù)進(jìn)行拆分 拆分的好壞決定了效率的高低
@Override
protected Long compute() {
// 當(dāng)需要計(jì)算的數(shù)字個(gè)數(shù)小于6時(shí),直接采用for loop方式計(jì)算結(jié)果
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
} else { // 否則,把任務(wù)一分為二,遞歸拆分(注意此處有遞歸)到底拆分成多少分 需要根據(jù)具體情況而定
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}
public TestForkJoinCalculator() {
// 也可以使用公用的線程池 ForkJoinPool.commonPool():
// pool = ForkJoinPool.commonPool()
pool = new ForkJoinPool();
}
public long sumUp(long[] numbers) {
Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
pool.shutdown();
return result;
}
public static void main(String[] args) {
long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
TestForkJoinCalculator calculator = new TestForkJoinCalculator();
long result = calculator.sumUp(numbers);
stopWatch.stop();
System.out.println("耗時(shí):" + stopWatch.getTotalTimeMillis() + "ms");
System.out.println("結(jié)果為:" + result);
}
}
-
原理:
參考文章4的圖
原理圖.png
- 每個(gè)工作線程在運(yùn)行中產(chǎn)生新的任務(wù)(通常是因?yàn)檎{(diào)用了 fork())時(shí),會(huì)放入工作隊(duì)列的隊(duì)尾,并且工作線程在處理自己的工作隊(duì)列時(shí),使用的是 LIFO 方式,也就是說(shuō)每次從隊(duì)尾取出任務(wù)來(lái)執(zhí)行。
- 每個(gè)工作線程在處理自己的工作隊(duì)列同時(shí),會(huì)嘗試竊取一個(gè)任務(wù)(或是來(lái)自于剛剛提交到 pool 的任務(wù),或是來(lái)自于其他工作線程的工作隊(duì)列),竊取的任務(wù)位于其他線程的工作隊(duì)列的隊(duì)首,也就是說(shuō)工作線程在竊取其他工作線程的任務(wù)時(shí),使用的是 FIFO 方式
- 在遇到 join() 時(shí),如果需要 join 的任務(wù)尚未完成,則會(huì)先處理其他任務(wù),并等待其完成
- 線程默認(rèn): 如果沒(méi)有指定,則默認(rèn)為Runtime.getRuntime().availableProcessors() - 1. 或者設(shè)置啟動(dòng)參數(shù):
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
- 當(dāng)使用ThreadPoolExecutor時(shí),使用分治法(分治時(shí)任務(wù)會(huì)很多)會(huì)存在問(wèn)題,因?yàn)門(mén)hreadPoolExecutor中的線程無(wú)法像任務(wù)隊(duì)列中再添加一個(gè)任務(wù)并且在等待該任務(wù)完成之后再繼續(xù)執(zhí)行。而使用ForkJoinPool時(shí),就能夠讓其中的線程創(chuàng)建新的任務(wù),并掛起當(dāng)前的任務(wù),此時(shí)線程就能夠從隊(duì)列中選擇子任務(wù)執(zhí)行。ThreadPoolExecutor中的Thread無(wú)法選擇優(yōu)先執(zhí)行子任務(wù)
基礎(chǔ)
while true
- while true運(yùn)行的線程除了中斷,任務(wù)終止的最佳方法是設(shè)置任務(wù)周期性檢查的標(biāo)志
然后任務(wù)可以通過(guò)自己的 shutdown 進(jìn)程并正常終止。不是在任務(wù)中隨機(jī)關(guān)閉線程,而是要求任務(wù)在到達(dá)了一個(gè)較好時(shí)自行終止。
線程等待其他線程
join,countDownLatch, CyclicBarrier
CompletableFuture
-
CompletableFuture是受guava的的listenable future啟發(fā)寫(xiě)的, 比Future更多功能,但是項(xiàng)目中實(shí)現(xiàn)多線程調(diào)用接口,并合并結(jié)果是用的是封裝的RxJava2框架。RxJava2更能更多,比如CompletableFuture不支持lazy,RxJava2支持。復(fù)用了參考文章3的圖
對(duì)比
構(gòu)造方法非線程安全
- 構(gòu)造函數(shù)和普通函數(shù)一樣,并不是默認(rèn)被synchronized 的,有可能出現(xiàn)同步問(wèn)題, 比如有靜態(tài)變量在構(gòu)造函數(shù)處理,就有可能出現(xiàn)問(wèn)題。
- 構(gòu)造方法不需要同步化,因?yàn)樗豢赡馨l(fā)生在一個(gè)線程里,在構(gòu)造方法返回值前沒(méi)有其他線程可以使用該對(duì)象。
- this逃逸問(wèn)題: this逃逸是指在構(gòu)造函數(shù)返回之前其他線程就持有該對(duì)象的引用。this逃逸經(jīng)常發(fā)生在構(gòu)造函數(shù)中啟動(dòng)線程或注冊(cè)監(jiān)聽(tīng)器時(shí)
public class ThisEscape {
public ThisEscape() {
new Thread(new EscapeRunnable()).start();
// ...其他代碼
}
private class EscapeRunnable implements Runnable {
@Override
public void run() {
// 在這里通過(guò)ThisEscape.this就可以引用外圍類對(duì)象, 但是此時(shí)外圍類對(duì)象可能還沒(méi)有構(gòu)造完成, 即發(fā)生了外圍類的this引用的逃逸
}
}
}
改造:
public class ThisEscape {
private Thread t;
public ThisEscape() {
t = new Thread(new EscapeRunnable());
// ...其他代碼
}
public void init() {
t.start();
}
private class EscapeRunnable implements Runnable {
@Override
public void run() {
// 在這里通過(guò)ThisEscape.this就可以引用外圍類對(duì)象, 此時(shí)可以保證外圍類對(duì)象已經(jīng)構(gòu)造完成
}
}
}
并發(fā)編程缺點(diǎn)
- 在線程等待共享資源時(shí)會(huì)降低速度。
- 線程管理產(chǎn)生額外 CPU 開(kāi)銷(xiāo)。
- 糟糕的設(shè)計(jì)決策帶來(lái)無(wú)法彌補(bǔ)的復(fù)雜性。
創(chuàng)建一個(gè) Thread, jvm做的事情
- 程序計(jì)數(shù)器,指明要執(zhí)行的下一個(gè) JVM 字節(jié)碼指令。
- 用于支持 Java 代碼執(zhí)行的棧
- 第二個(gè)則用于 native code(本機(jī)方法代碼)執(zhí)行的棧
- thread-local variables (線程本地變量)的存儲(chǔ)區(qū)域
- 用于控制線程的狀態(tài)管理變量
超線程
- 指一種硬件技巧,能在單個(gè)處理器上產(chǎn)生非??焖俚纳舷挛那袚Q,在某些情況下可以使內(nèi)核看起來(lái)像運(yùn)行兩個(gè)硬件線(Java線程之間切換上下文是有代價(jià))
線程數(shù)量
- 可以有幾千個(gè), 但是要考慮上下文切換代價(jià)
wait, notify
- wait方法需要釋放鎖,前提條件是它已經(jīng)持有鎖。所以wait和notify(或者notifyAll)方法都必須被包裹在synchronized語(yǔ)句塊中,并且synchronized后鎖的對(duì)象應(yīng)該與調(diào)用wait方法的對(duì)象一樣。否則拋出IllegalMonitorStateException。wait是在當(dāng)前線程持有wait對(duì)象鎖的情況下,暫時(shí)放棄鎖,并讓出CPU資源,并積極等待其它線程調(diào)用同一對(duì)象的notify或者notifyAll方法。
- 并發(fā)工具優(yōu)先于wait和notify
volatile
- 如果你正在嘗試調(diào)試其他人的并發(fā)代碼,請(qǐng)首先查找使用 volatile 的代碼并將其替換為Atomic 變量,成本低很多(java8之后)。
字分裂
- (在 Java 中 long 和 double 類型都是 64 位),寫(xiě)入變量的過(guò)程分兩步進(jìn)行,就會(huì)發(fā)生 Word tearing (字分裂)情況。 JVM 被允許將64位數(shù)量的讀寫(xiě)作為兩個(gè)單獨(dú)的32位操作執(zhí)行用 volatile 修飾符定義一個(gè) long 或 double 變量,可阻止字分裂情況
volatile可見(jiàn)性
- 當(dāng)一個(gè)任務(wù)更改標(biāo)志值時(shí),這些更改可以存儲(chǔ)在本地處理器緩存中,而不會(huì)刷新到主內(nèi)存
使用 AtomicBoolean 類型作為標(biāo)志值的辦法替代volatile也可以。發(fā)送一條Lock前綴的指令會(huì)強(qiáng)制將對(duì)緩存的修改操作立即寫(xiě)入主內(nèi)存。 - 如果是寫(xiě)操作,它會(huì)導(dǎo)致其他CPU中對(duì)應(yīng)的緩存行無(wú)效。為了保證各個(gè)處理器緩存一致,每個(gè)處理會(huì)通過(guò)嗅探在總線上傳播的數(shù)據(jù)來(lái)檢查 自己的緩存是否過(guò)期,當(dāng)處理器發(fā)現(xiàn)自己緩存行對(duì)應(yīng)的內(nèi)存地址被修改了,就會(huì)將當(dāng)前處理器的緩存行設(shè)置成無(wú)效狀態(tài)。
volatile 重排與 Happen-Before 原則
happens before 擔(dān)保原則( volatile (易變性)操作通常稱為 memory barrier (通過(guò)Lock前綴指令生內(nèi)存屏障)
one,two,three 變量賦值操作就可以被重排, xyz也是
happens before 擔(dān)保原則確保 volatile 變量的讀寫(xiě)指令不能跨過(guò)內(nèi)存屏障進(jìn)行重排
happens before 擔(dān)保原則還有另一個(gè)作用:當(dāng)線程向一個(gè) volatile 變量寫(xiě)入時(shí),在線程寫(xiě)入之前的其他所有變量(包括非 volatile 變量)也會(huì)刷新到主內(nèi)存。當(dāng)線程讀取一個(gè) volatile 變量時(shí),它也會(huì)讀取其他所有變量(包括非 volatile 變量)與 volatile 變量一起刷新到主內(nèi)存
public void run() {
one = 1;
two = 2;
three = 3;
volaTile = 92;
int x = four;
int y = five;
int z = six;
}
happens-before原則
- JMM規(guī)定了JVM必須遵循一組最小保證
- 在JMM中,如果一個(gè)操作執(zhí)行的結(jié)果需要對(duì)另一個(gè)操作可見(jiàn),那么這兩個(gè)操作之間必須存在happens-before關(guān)系。HB 原則是對(duì)單線程環(huán)境下的指令重排序以及多線程環(huán)境下的線程間數(shù)據(jù)的一致性進(jìn)行的約束。
- happen-before原則是JMM中非常重要的原則,它是判斷數(shù)據(jù)是否存在競(jìng)爭(zhēng)、線程是否安全的主要依據(jù),保證了多線程環(huán)境下的可見(jiàn)性。
- 單線程happen-before原則:在同一個(gè)線程中,前面的操作產(chǎn)生的結(jié)果必須對(duì)后面的操作可見(jiàn),書(shū)寫(xiě)在前面的操作happen-before后面的操作(必須有數(shù)據(jù)依賴,無(wú)數(shù)據(jù)依賴則有可能指令重排)。
- 鎖的happen-before原則:同一個(gè)鎖的unlock操作happen-before此鎖的lock操作
- volatile的happen-before原則: 對(duì)一個(gè)volatile變量的寫(xiě)操作happen-before對(duì)此變量的任意操作。
- happen-before的傳遞性原則: 如果A操作 happen-before B操作,B操作happen-before C操作,那么A操作happen-before C操作。
- 線程啟動(dòng)的happen-before原則:同一個(gè)線程的start方法happen-before此線程的其它方法
- 線程中斷的happen-before原則:對(duì)線程interrupt方法的調(diào)用happen-before被中斷線程的檢測(cè)到中斷發(fā)送的代碼(interrupt 方法改變的狀態(tài)必須對(duì)后續(xù)執(zhí)行的檢測(cè)方法可見(jiàn))
- 線程終結(jié)的happen-before原則:線程中的所有操作都happen-before線程的終止檢測(cè)。
- 對(duì)象創(chuàng)建的happen-before原則:一個(gè)對(duì)象的初始化完成先于他的finalize方法調(diào)用。
原子性
- 同步機(jī)制強(qiáng)制多核處理器系統(tǒng)上的一個(gè)任務(wù)做出的修改必須在應(yīng)用程序中是可見(jiàn)的。如果沒(méi)有同步機(jī)制,那么修改時(shí)可見(jiàn)性將無(wú)法確認(rèn)。原子性并不保證可見(jiàn)性。
- automic原子類: 快速、無(wú)鎖的操作,它們是利用了現(xiàn)代處理器上可用的機(jī)器級(jí)原子性
- 使用顯示鎖ReentrantLock之類的: 如果使用 synchronized 關(guān)鍵字失敗,就會(huì)拋出異常,但是你沒(méi)有機(jī)會(huì)進(jìn)行任何清理以保持系統(tǒng)處于良好狀態(tài)。而使用顯式鎖對(duì)象,可以使用 finally 子句在系統(tǒng)中維護(hù)適當(dāng)?shù)臓顟B(tài)。顯式鎖比起內(nèi)置同步鎖提供更細(xì)粒度的加鎖和解鎖控制。
- 無(wú)鎖: cow cas, ConcurrentHashMap 不會(huì)拋出concurrentmodificationexception(并發(fā)安全的,迭代安全,但是迭代完整沒(méi)辦法保證)CopyOnWriteArrayList 的其中一個(gè)好處是,當(dāng)多個(gè)迭代器遍歷和修改列表時(shí),它不會(huì)拋出 ConcurrentModificationException 異常。對(duì)集合迭代時(shí) 對(duì)原集合進(jìn)行一份拷貝,對(duì)拷貝的新元素進(jìn)行迭代,這叫安全失敗。
- cas: 在 比較并交換 (CAS) 中,你從內(nèi)存中獲取一個(gè)值,并在計(jì)算新值時(shí)保留原始值。然后使用 CAS 指令,它將原始值與當(dāng)前內(nèi)存中的值進(jìn)行比較,如果這兩個(gè)值是相等的,則將內(nèi)存中的舊值替換為計(jì)算新值的結(jié)果,所有操作都在一個(gè)原子操作中完成。許多現(xiàn)代處理器的匯編語(yǔ)言中都有一條 CAS 指令,并且也被 JVM 中的 CAS 操作(例如 Atomic 類中的操作)所使用。CAS 指令在硬件層面中是原子性的,并且與你所期望的操作一樣快。
java并發(fā)中常見(jiàn)的鎖
- 偏向鎖,輕量級(jí)鎖,重量級(jí)鎖
- 偏向鎖: 其核心的思想是,如果程序沒(méi)有競(jìng)爭(zhēng),則取消之前已經(jīng)取得鎖的線程同步操作。也就是鎖消除
- 輕量級(jí)鎖: 其他線程會(huì)通過(guò)自旋的形式嘗試獲取鎖,不會(huì)阻塞,提高性能,自旋等待
- 重量級(jí)鎖:synchronized這類的,追求吞吐量。同步塊執(zhí)行速度較長(zhǎng)。
- 樂(lè)觀鎖,悲觀鎖
- 樂(lè)觀鎖,認(rèn)為多線程競(jìng)爭(zhēng)不激烈,競(jìng)爭(zhēng)不激烈的情況下, 在數(shù)據(jù)庫(kù)表中增加版本號(hào),先查數(shù)據(jù),然后根據(jù)之前的版本號(hào)更新這條數(shù)據(jù),找得到更新,找不到不更,這就是樂(lè)觀鎖的實(shí)現(xiàn)。
- 悲觀鎖: 認(rèn)為多線程競(jìng)爭(zhēng)激烈。
- 公平鎖,非公平鎖
- sychronized為非公平鎖,鎖獲取隨機(jī), 公平鎖的鎖獲取是根據(jù)申請(qǐng)時(shí)間的,非公平鎖處理的快。
- 可重入鎖
- 可重入鎖又名遞歸鎖,是指在同一個(gè)線程在外層方法獲取鎖的時(shí)候,在進(jìn)入內(nèi)層方法會(huì)自動(dòng)獲取鎖。
- 獨(dú)享鎖/共享鎖
- 獨(dú)享鎖是指該鎖一次只能被一個(gè)線程所持有;共享鎖是指該鎖可被多個(gè)線程所持有。
- 對(duì)于Java ReentrantLock而言,其是獨(dú)享鎖。但是對(duì)于Lock的另一個(gè)實(shí)現(xiàn)類ReadWriteLock,其讀鎖是共享鎖,其寫(xiě)鎖是獨(dú)享鎖。
- 分段鎖
- 分段鎖其實(shí)是一種鎖的設(shè)計(jì),并不是具體的一種鎖,對(duì)于ConcurrentHashMap而言,其并發(fā)的實(shí)現(xiàn)就是通過(guò)分段鎖的形式來(lái)實(shí)現(xiàn)高效的并發(fā)操
多線程獲取結(jié)果對(duì)比RxJava2使用
- 多線程版本(偽代碼)
private Response asyncHandle(SearchRequest request){
Response response = new HResponse();
response.setLowRates(new ArrayList<>());
List<Future<Response>> futures = new ArrayList<>();
while (iteratorCity.hasNext()) {
futures.add(threadPool.submit(new SearchRunner(countryID, request));
}
while(iter.hasNext()) {
if(timeout(start)) {
System.ou.println("....");
break;
}
Future<Response> future = iter.next();
try {
Response resp = future.get(Config.getSingleTimeout(), TimeUnit.MILLISECONDS);
if(resp != null) {
response.getLowRates().addAll(resp.getLowRates());
iter.remove();
}
}
// 省略catch
}
return response;
}
- RxJava2版本
- 異步操作,中間執(zhí)行的任務(wù)可以是異步網(wǎng)絡(luò)操作,控制socket timeout之類的可以在這塊處理。更優(yōu)雅。
// 常見(jiàn)的示例,這是一個(gè)異步操作
Single.create(new Single.OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> singleSubscriber) {
// 這里被指定在IO線程
singleSubscriber.onSuccess(addValue(1, 2));
}
})
.subscribeOn(Schedulers.io())// 指定運(yùn)行在IO線程
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
@Override
public void onNext(Integer o) {
// o = 3
}
});
- zip
Single.zip(s1, s2, new Func2<Integer, Integer, String>() {
@Override
public String call(Integer o, Integer o2) {
LogHelper.e("A:" + o + "=" + o2);
return null;
}}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
LogHelper.e("kk:"+s);
}
});

