并發(fā)工具類
通常我們所說的并發(fā)包也就是java.util.concurrent(JUC),集中了Java并發(fā)的各種工具類, 合理地使用它們能幫忙我們快速地完成功能 。
- 作者: 博學(xué)谷狂野架構(gòu)師
- GitHub:GitHub地址 (有我精心準(zhǔn)備的130本電子書PDF)
只分享干貨、不吹水,讓我們一起加油!??
1. CountDownLatch
CountDownLatch是一個同步計(jì)數(shù)器,初始化的時候 傳入需要計(jì)數(shù)的線程等待數(shù),可以是需要等待執(zhí)行完成的線程數(shù),或者大于 ,一般稱為發(fā)令槍。\
countdownlatch 是一個同步類工具,不涉及鎖定,當(dāng)count的值為零時當(dāng)前線程繼續(xù)運(yùn)行,不涉及同步,只涉及線程通信的時候,使用它較為合適

1.1 作用
用來協(xié)調(diào)多個線程之間的同步,或者說起到線程之間的通信(而不是用作互斥的作用),是一組線程等待其他的線程完成工作以后在執(zhí)行,相當(dāng)于加強(qiáng)版join。
注意:這是一個一次性操作 - 計(jì)數(shù)無法重置。 如果你需要一個重置的版本計(jì)數(shù),考慮使用CyclicBarrier。
1.2 舉例
我們?nèi)ソM團(tuán)游玩一樣,總共30個人,來的人要等待還沒有到的人,一直等到第30個人到了,我們才開始出發(fā),在等待過程中,其他人(線程)是等待狀態(tài)不做任何事情的,一直等所有人(線程)到齊了(準(zhǔn)備完成)才開始執(zhí)行。
1.3 概念
- countDownLatch這個類使一個線程等待其他線程各自執(zhí)行完畢后再執(zhí)行。
- 是通過一個計(jì)數(shù)器來實(shí)現(xiàn)的,計(jì)數(shù)器的初始值是線程的數(shù)量。每當(dāng)一個線程執(zhí)行完畢后,計(jì)數(shù)器的值就-1,當(dāng)計(jì)數(shù)器的值為0時,表示所有線程都執(zhí)行完畢,然后在閉鎖上等待的線程就可以恢復(fù)工作了。
我們打開CountDownLatch的源代碼分析,我們發(fā)現(xiàn)最重要的方法就是一下這兩個方法:
//阻塞當(dāng)前線程,等待其他線程執(zhí)行完成,直到計(jì)數(shù)器計(jì)數(shù)值減到0。
public void await() throws InterruptedException;
//阻塞當(dāng)前線程指定的時間,如果達(dá)到時間就放行,等待其他線程執(zhí)行完成,直到計(jì)數(shù)器計(jì)數(shù)值減到0。
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
//負(fù)責(zé)計(jì)數(shù)器的減一。
public void countDown():
1.4 應(yīng)用場景
1.4.1 多線程壓測
有時我們想同時啟動多個線程,實(shí)現(xiàn)最大程度的并行性。
例如,我們想測試一個單例類。如果我們創(chuàng)建一個初始計(jì)數(shù)為1的CountDownLatch,并讓所有線程都在這個鎖上等待,那么我們可以很輕松地完成測試。我們只需調(diào)用 一次countDown()方法就可以讓所有的等待線程同時恢復(fù)執(zhí)行。
1.4.2 等待其他線程
例如應(yīng)用程序啟動類要確保在處理用戶請求前,所有N個外部系統(tǒng)已經(jīng)啟動和運(yùn)行了,例如處理excel中多個表單,如果一個一個出來很耗IO和性能,我們可以等100或者1000個線程都完成了表單的操作后一下子寫進(jìn)excel表單中。

注意:一個線程不一定只能做countDown一次,也可以countDown多次
1.5 示例
1.5.1 準(zhǔn)備完成后執(zhí)行
在實(shí)際項(xiàng)目中可能有些線程需要資源準(zhǔn)備完成后才能進(jìn)行執(zhí)行,這個時候就可以使用countDownLatch
package chapter02.countdownlatch;
import java.util.Random;
import java.util.concurrent.*;
/**
* countdownlatch 示例
*/
public class CountDownLatchTest {
private static ExecutorService executorService = Executors.newFixedThreadPool(10);
private static Random random = new Random();
public static void execute(CountDownLatch countDownLatch) {
//獲取一個隨機(jī)數(shù)
long sleepTime = random.nextInt(10);
//獲取線程ID
long threadId = Thread.currentThread().getId();
System.out.println("線程ID" + threadId + ",開始執(zhí)行--countDown");
try {
//睡眠隨機(jī)秒
Thread.sleep(sleepTime * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//計(jì)數(shù)器減1
countDownLatch.countDown();
System.out.println("線程ID" + threadId + ",準(zhǔn)備任務(wù)完成耗時:" + sleepTime + "當(dāng)前時間" + System.currentTimeMillis());
try {
//線程等待其他任務(wù)完成后喚醒
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程ID" + threadId + ",開始執(zhí)行任務(wù),當(dāng)前時間:" + System.currentTimeMillis());
}
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
executorService.submit(() -> {
execute(countDownLatch);
});
}
//線程等待其他任務(wù)完成后喚醒
countDownLatch.await();
Thread.sleep(1000);
executorService.shutdown();
System.out.println("全部任務(wù)執(zhí)行完成");
}
}
1.5.2 多線程壓測
在實(shí)戰(zhàn)項(xiàng)目中,我們除了使用 jemter 等工具進(jìn)行壓測外,還可以自己動手使用 CountDownLatch 類編寫壓測代碼。
可以說 jemter 的并發(fā)壓測背后也是使用的 CountDownLatch,可見掌握 CountDownLatch 類的使用是有多么的重要, **CountDownLatch**是Java多線程同步器的四大金剛之一,CountDownLatch能夠使一個線程等待其他線程完成各自的工作后再執(zhí)行。
package chapter02.countdownlatch;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* countDownLatch 壓測
*/
public class CountDownLatchPressure {
/**
* 壓測業(yè)務(wù)代碼
*/
public void testLoad() {
System.out.println("壓測:" + Thread.currentThread().getId() + ":" + System.currentTimeMillis());
}
/**
* 壓測啟動
* 主線程負(fù)責(zé)壓測線程準(zhǔn)備工作
* 壓測線程準(zhǔn)備完成后 調(diào)用 start.countDown(); 啟動線程執(zhí)行
* @throws InterruptedException
*/
private void latchTest() throws InterruptedException {
//壓測線程數(shù)
int testThreads = 300;
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch end = new CountDownLatch(testThreads);
//創(chuàng)建線程池
ExecutorService exce = Executors.newFixedThreadPool(testThreads);
//準(zhǔn)備線程準(zhǔn)備
for (int i = 0; i < testThreads; i++) {
//添加到線程池
exce.submit(() -> {
try {
//啟動后等待 喚醒
start.await();
testLoad();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//壓測完成
end.countDown();
}
});
}
//連接池線程初始化完成 開始壓測
start.countDown();
//壓測完成后結(jié)束
end.await();
//關(guān)閉線程池
exce.shutdown();
}
public static void main(String[] args) throws InterruptedException {
CountDownLatchPressure countDownLatchPressure = new CountDownLatchPressure();
//開始壓測
countDownLatchPressure.latchTest();
}
}
2. CyclicBarrier

2.1 簡介
CyclicBarrier,是JDK1.5的java.util.concurrent(JUC)并發(fā)包中提供的一個并發(fā)工具類
C yclicBarrier可以使一定數(shù)量的線程反復(fù)地在柵欄位置處匯集,當(dāng)線程到達(dá)柵欄位置時將調(diào)用await方法,這個方法將阻塞直到所有線程都到達(dá)柵欄位置,如果所有線程都到達(dá)柵欄位置,那么柵欄將打開,此時所有的線程都將被釋放,而柵欄將被重置以便下次使用。
2.2 舉例
就像生活中我們會約朋友們到某個餐廳一起吃飯,有些朋友可能會早到,有些朋友可能會晚到,但是這個餐廳規(guī)定必須等到所有人到齊之后才會讓我們進(jìn)去。
這里的朋友們就是各個線程,餐廳就是CyclicBarrier,感覺和 CountDownLatch是一樣的,但是他們是有區(qū)別的,吃完飯之后可以選擇去玩一會,去處理任務(wù),然后等待第二次聚餐,重復(fù)循環(huán)。
2.3 功能
CyclicBarrier和CountDownLatch是非常類似的,CyclicBarrier核心的概念是在于設(shè)置一個等待線程的數(shù)量邊界,到達(dá)了此邊界之后進(jìn)行執(zhí)行。
**CyclicBarrier**類是一個同步輔助類,它允許一組線程互相等待,直到到達(dá)某個公共屏障點(diǎn)(Common Barrier Point)。
**CyclicBarrier**類是一種同步機(jī)制,它能夠?qū)μ幚硪恍┧惴ǖ木€程實(shí)現(xiàn)同。換句話講,它就是一個所有線程必須等待的一個柵欄,直到所有線程都到達(dá)這里,然后所有線程才可以繼續(xù)做其他事情。
通過調(diào)用**CyclicBarrier**對象的**await()**方法,兩個線程可以實(shí)現(xiàn)互相等待,一旦N個線程在等待**CyclicBarrier**達(dá)成,所有線程將被釋放掉去繼續(xù)執(zhí)行。
2.4 構(gòu)造方法
我們可以看下 CyclicBarrier源碼的構(gòu)造方法
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)
2.4.1 參數(shù)介紹
parties : 是參與線程的個數(shù) , 其參數(shù)表示屏障攔截的線程數(shù)量,每個線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。
barrierAction : 優(yōu)先執(zhí)行線程 ,用于在線程到達(dá)屏障時,優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場景,一般用于數(shù)據(jù)的整理以及匯總,例如excel插入一樣,等所有線程都插入完了,到達(dá)了屏障后,barrierAction線程開始進(jìn)行保存操作,完成后,接下來由其他線程開始進(jìn)行插入,然后到達(dá)屏障接著就是保存,不斷循環(huán)。

CyclicBarrier可以用于多線程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的場景。
2.5 重要方法
我們上面介紹了構(gòu)造方法,下面我們介紹下CyclicBarrier中重要的方法
//阻塞當(dāng)前線程,等待其他線程執(zhí)行完成。
public int await() throws InterruptedException, BrokenBarrierException
//阻塞當(dāng)前線程指定的時間,如果達(dá)到時間就放行,等待其他線程執(zhí)行完成,
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
- 線程調(diào)用 await() 表示自己已經(jīng)到達(dá)柵欄
- BrokenBarrierException 表示柵欄已經(jīng)被破壞,破壞的原因可能是其中一個線程 await() 時被中斷或者超時
2.6 基本使用
一個線程組的線程需要等待所有線程完成任務(wù)后再繼續(xù)執(zhí)行下一次任務(wù)
package chapter02.cyclicbarrier;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
private static Random random = new Random();
/**
* 執(zhí)行任務(wù)
*
* @param barrier
*/
public static void execute(CyclicBarrier barrier) {
//獲取一個隨機(jī)數(shù)
long sleepTime = random.nextInt(10);
//獲取線程id
long threadId = Thread.currentThread().getId();
try {
//睡眠隨機(jī)秒
Thread.sleep(sleepTime * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程ID" + threadId + ",準(zhǔn)備任務(wù)完成耗時:" + sleepTime + "當(dāng)前時間" + System.currentTimeMillis());
//線程等待其他任務(wù)完成后喚醒
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("線程ID" + threadId + ",開始執(zhí)行任務(wù),當(dāng)前時間:" + System.currentTimeMillis());
}
public static void main(String[] args) {
//初始化線程數(shù)量
int threadNum = 5;
//初始化一般的線程
CyclicBarrier barrier = new CyclicBarrier(5, () -> System.out.println("整理任務(wù)開始..."));
ExecutorService executor = Executors.newFixedThreadPool(threadNum);
for (int i = 0; i < threadNum; i++) {
executor.submit(() -> {
execute(barrier);
});
}
}
}
2.7 CyclicBarrier 與 CountDownLatch 區(qū)別
- CountDownLatch 是一次性的,CyclicBarrier 是可循環(huán)利用的
- CountDownLatch.await一般阻塞工作線程,所有的進(jìn)行預(yù)備工作的線程執(zhí)行countDown,而CyclicBarrier通過工作線程調(diào)用await從而自行阻塞,直到所有工作線程達(dá)到指定屏障,再大家一起往下走。
- CountDownLatch 參與的線程的職責(zé)是不一樣的,有的在倒計(jì)時,有的在等待倒計(jì)時結(jié)束。CyclicBarrier 參與的線程職責(zé)是一樣的。
- 在控制多個線程同時運(yùn)行上,CountDownLatch可以不限線程數(shù)量,而CyclicBarrier是固定線程數(shù)。
- CyclicBarrier還可以提供一個barrierAction,合并多線程計(jì)算結(jié)果。
3. Semaphore

3.1 簡介
Semaphore也叫信號量,在JDK1.5被引入,可以用來控制同時訪問特定資源的線程數(shù)量,通過協(xié)調(diào)各個線程,以保證合理的使用資源。
Semaphore內(nèi)部維護(hù)了一組虛擬的許可,許可的數(shù)量可以通過構(gòu)造函數(shù)的參數(shù)指定。
訪問特定資源前,必須使用acquire方法獲得許可,如果許可數(shù)量為0,該線程則一直阻塞,直到有可用許可。
-
訪問資源后,使用release釋放許可。
**Semaphore**是一種在多線程環(huán)境下使用的設(shè)施,該設(shè)施負(fù)責(zé)協(xié)調(diào)各個線程,以保證它們能夠正確、合理的使用公共資源的設(shè)施,也是操作系統(tǒng)中用于控制進(jìn)程同步互斥的量。Semaphore是一種計(jì)數(shù)信號量,用于管理一組資源,內(nèi)部是基于AQS的共享模式。它相當(dāng)于給線程規(guī)定一個量從而控制允許活動的線程數(shù)。 可以用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源
3.2 舉例
這里面令牌就像停車位一樣,來了十輛車,停車位只有三個,只有三輛車能夠進(jìn)行,只有等其他車開走后,其他車才能開進(jìn)去,和鎖的不一樣的地方是,鎖一次只能進(jìn)入一輛車,但是Semaphore允許一次進(jìn)入很多車,這個令牌是可以調(diào)整的,隨時可以增減令牌。
3.3 應(yīng)用場景
Semaphore 是 synchronized 的加強(qiáng)版,作用是控制線程的并發(fā)數(shù)量。就這一點(diǎn)而言,單純的synchronized 關(guān)鍵字是實(shí)現(xiàn)不了的。
Semaphore可以用于做流量控制,特別是公用資源有限的應(yīng)用場景,比如數(shù)據(jù)庫連接。假如有一個需求,要讀取幾萬個文件的數(shù)據(jù),因?yàn)槎际荌O密集型任務(wù),我們可以啟動幾十個線程并發(fā)地讀取,但是如果讀到內(nèi)存后,還需要存儲到數(shù)據(jù)庫中,而數(shù)據(jù)庫的連接數(shù)只有10個,這時我們必須控制只有10個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù),否則會報(bào)錯無法獲取數(shù)據(jù)庫連接。這個時候,就可以使用Semaphore來做流量控制
3.4 工作原理
以一個停車場是運(yùn)作為例,為了簡單起見,假設(shè)停車場只有三個車位,一開始三個車位都是空的。
這時如果同時來了五輛車,看門人允許其中三輛不受阻礙的進(jìn)入,然后放下車攔,剩下的車則必須在入口等待,此后來的車也都不得不在入口處等待。
這時,有一輛車離開停車場,看門人得知后,打開車攔,放入一輛,如果又離開兩輛,則又可以放入兩輛,如此往復(fù)。
這個停車系統(tǒng)中,每輛車就好比一個線程,看門人就好比一個信號量,看門人限制了可以活動的線程,假如里面依然是三個車位,但是看門人改變了規(guī)則,要求每次只能停兩輛車,那么一開始進(jìn)入兩輛車,后面得等到有車離開才能有車進(jìn)入,但是得保證最多停兩輛車。
對于Semaphore類而言,就如同一個看門人,限制了可活動的線程數(shù)。
3.5 構(gòu)造方法
創(chuàng)建具有給定許可數(shù)的計(jì)數(shù)信號量并設(shè)置為非公平信號量
查看Semaphore源碼發(fā)現(xiàn)他有這兩個構(gòu)造方法
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
3.5.1 參數(shù)介紹
permits 是設(shè)置同時允許通過的線程數(shù)
fair 等于true時,創(chuàng)建具有給定許可數(shù)的計(jì)數(shù)信號量并設(shè)置為公平信號量。
3.6 其他方法
Semaphore類里面還有一些重要的方法
//從此信號量獲取一個許可前線程將一直阻塞。相當(dāng)于一輛車占了一個車位
public void acquire() throws InterruptedException
//從此信號量獲取給定數(shù)目許可,在提供這些許可前一直將線程阻塞。比如n=2,就相當(dāng)于一輛車占了兩個車位。
public void acquire(int permits) throws InterruptedException
//釋放一個許可,將其返回給信號量。就如同車開走返回一個車位。
public void release()
//獲取當(dāng)前可用許可數(shù)
public void release(int permits)
//獲取當(dāng)前可用許可數(shù)
public int availablePermits()
3.7 示例代碼
共有5個車位但是有100個線程進(jìn)行占用,車停幾秒后會離開,釋放車位給其他線程。
package chapter02.semaphore;
import java.util.Random;
import java.util.concurrent.*;
public class SemaphoreTest {
private static ExecutorService executorService = Executors.newCachedThreadPool();
private static Random random = new Random();
//阻塞隊(duì)列
private static BlockingQueue<String> parks = new LinkedBlockingQueue<>(5);
public static void execute(Semaphore semaphore) {
//獲取一個隨機(jī)數(shù)
long sleepTime = random.nextInt(10);
long threadId = Thread.currentThread().getId();
String park = null;
try {
/**
* 獲取許可,首先判斷semaphore內(nèi)部的數(shù)字是否大于0,如果大于0,
* 才能獲得許可,然后將初始值5減去1,線程才會接著去執(zhí)行;如果沒有
* 獲得許可(原因是因?yàn)橐呀?jīng)有5個線程獲得到許可,semaphore內(nèi)部的數(shù)字為0),
* 線程會阻塞直到已經(jīng)獲得到許可的線程,調(diào)用release()方法,釋放掉許可,
* 也就是將semaphore內(nèi)部的數(shù)字加1,該線程才有可能獲得許可。
*/
semaphore.acquire();
/**
* 對應(yīng)的線程會到阻塞對,對應(yīng)車輛去獲取到車位,如果沒有拿到一致阻塞,
* 直到其他車輛歸還車位。
*/
park = parks.take();
System.out.println("線程ID" + threadId + ",開始占用車位:" + park + ",當(dāng)前剩余車位" + semaphore.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//睡眠隨機(jī)秒
Thread.sleep(sleepTime * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//歸還車位
parks.offer(park);
System.out.println("線程ID" + threadId + ",開始?xì)w還車位:" + park + ",共占用" + sleepTime + "秒");
//線程釋放掉許可,通俗來將就是將semaphore內(nèi)部的數(shù)字加1
semaphore.release();
}
public static void main(String[] args) {
//初始化線程數(shù)量
int threadNum = 100;
parks.offer("車位一");
parks.offer("車位二");
parks.offer("車位三");
parks.offer("車位四");
parks.offer("車位五");
// 初始化5個許可證
Semaphore semaphore = new Semaphore(5);
//可以提前釋放但是車位就會被多個線程同時占用
//semaphore.release(5);
for (int i = 0; i < threadNum; i++) {
executorService.submit(() -> {
execute(semaphore);
});
}
}
}
3.8 注意事項(xiàng)
即使創(chuàng)建信號量的時候,指定了信號量的大小 ,但是在通過 release()操作釋放信號量仍然能釋放超過配置的大小,也就有可能同時執(zhí)行的線程數(shù)量比最開始設(shè)置的要大,沒有任何線程獲取信號量的時候,依然能夠釋放并且釋放的有效。
推薦的做法是一個線程先 acquire 然后 release,如果釋放線程和獲取線程不是同一個,那么最好保證這種對應(yīng)關(guān)系。不要釋放過多的許可證。
4. Fork/Join

4.1 簡介
java下多線程的開發(fā)可以我們自己啟用多線程,線程池,還可以使用forkjoin,forkjoin可以讓我們不去了解諸如Thread,Runnable等相關(guān)的知識,只要遵循forkjoin的開發(fā)模式,就可以寫出很好的多線程并發(fā)程序
Fork/Join框架是Java7提供了的一個用于并行執(zhí)行任務(wù)的框架, 是一個把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。
Fork/Join框架是一個實(shí)現(xiàn)了ExecutorService接口的多線程處理器。它可以把一個大的任務(wù)劃分為若干個小的任務(wù)并發(fā)執(zhí)行,充分利用可用的資源,進(jìn)而提高應(yīng)用的執(zhí)行效率。
Fork/Join框架簡化了并行程序的原因有 :
- 它簡化了線程的創(chuàng)建,在框架中線程是自動被創(chuàng)建和管理。
- 它自動使用多個處理器,因此程序可以擴(kuò)展到使用可用處理器。
4.2 舉例
就像我需要處理一百萬行的excel,普通的處理是一個一個的excel進(jìn)行處理,但是使用Fork/Join框架后的處理方式呢,加入我們定義100條數(shù)據(jù)為一個批次,那么Fork/Join就會拆分這個excel先到中間拆分成各有50萬的數(shù)據(jù),然后還比100大就繼續(xù)拆分,不斷的細(xì)分,最后分到了每一個線程分得到了100條然后才開始執(zhí)行。
4.3 分而治之
“分而治之” 一直是一個有效的處理大量數(shù)據(jù)的方法。著名的 MapReduce 也是采取了分而治之的思想。
簡單來說,就是如果你要處理1000個數(shù)據(jù),但是你并不具備處理1000個數(shù)據(jù)的能力,那么你可以只處理其中的10個,然后,分階段處理100次,將100次的結(jié)果進(jìn)行合成,那就是最終想要的對原始的1000個數(shù)據(jù)的處理結(jié)果。
同時forkjoin在處理某一類問題時非常的有用,哪一類問題?分而治之的問題。十大計(jì)算機(jī)經(jīng)典算法:快速排序、堆排序、歸并排序、二分查找、線性查找、深度優(yōu)先、廣度優(yōu)先、Dijkstra、動態(tài)規(guī)劃、樸素貝葉斯分類,有幾個屬于分而治之?3個,快速排序、歸并排序、二分查找,還有大數(shù)據(jù)中M/R都是。
4.3.1 分治法的設(shè)計(jì)思想
將一個難以直接解決的大問題,分割成一些規(guī)模較小的相同問題,以便各個擊破,分而治之。
4.3.2 分治策略
對于一個規(guī)模為n的問題,若該問題可以容易地解決(比如說規(guī)模n較?。﹦t直接解決,否則將其分解為k個規(guī)模較小的子問題,**這些子問題互相獨(dú)立且與原問題形式相同**(**子問題相互之間有聯(lián)系就會變?yōu)閯討B(tài)規(guī)范算法**),遞歸地解這些子問題,然后將各子問題的解合并得到原問題的解。這種算法設(shè)計(jì)策略叫做分治法。
4.4 Fork-Join原理
Fork/Join實(shí)現(xiàn)了ExecutorService,所以它的任務(wù)也需要放在線程池中執(zhí)行。它的不同在于它使用了工作竊取算法,空閑的線程可以從滿負(fù)荷的線程中竊取任務(wù)來幫忙執(zhí)行。
由于線程池中的每個線程都有一個隊(duì)列,而且線程間互不影響,那么線程每次都從自己的任務(wù)隊(duì)列的頭部獲取一個任務(wù)出來執(zhí)行。如果某個時候一個線程的任務(wù)隊(duì)列空了,而其余的線程任務(wù)隊(duì)列中還有任務(wù),那么這個線程就會從其他線程的任務(wù)隊(duì)列中取一個任務(wù)出來幫忙執(zhí)行。就像偷取了其他人的工作一樣
4.4.1 任務(wù)分割和合并
Fork/Join框架的基本思想就是將一個大任務(wù)分解(Fork)成一系列子任務(wù),子任務(wù)可以繼續(xù)往下分解,當(dāng)多個不同的子任務(wù)都執(zhí)行完成后,可以將它們各自的結(jié)果合并(Join)成一個大結(jié)果,最終合并成大任務(wù)的結(jié)果

我們看下面這個圖

首先main Task 先fork成 0,1兩個任務(wù) 接著,因?yàn)檫€是太大,繼續(xù)fork成 0-0,0-1,1-0,1-1 然后進(jìn)行計(jì)算計(jì)算完成后進(jìn)行join操作,0-0,1-1 join到0, 1-0,1-1 join到1 然后 0和1繼續(xù)join到mainTask,完成計(jì)算任務(wù)。
4.4.2 工作密取
即當(dāng)前線程的Task已經(jīng)全被執(zhí)行完畢,則自動取到其他線程的Task池中取出Task繼續(xù)執(zhí)行即如果一個工作線程沒有事情要做,它可以從其他仍然忙碌的線程竊取任務(wù)。
ForkJoinPool中維護(hù)著多個線程(一般為CPU核數(shù))在不斷地執(zhí)行Task,每個線程除了執(zhí)行自己職務(wù)內(nèi)的Task之外,還會根據(jù)自己工作線程的閑置情況去獲取其他繁忙的工作線程的Task,如此一來就能能夠減少線程阻塞或是閑置的時間,提高CPU利用率。

4.5 相關(guān)子類
我們已經(jīng)很清楚 Fork/Join 框架的需求了,那么我們可以思考一下,如果讓我們來設(shè)計(jì)一個 Fork/Join 框架,該如何設(shè)計(jì)?這個思考有助于你理解 Fork/Join 框架的設(shè)計(jì)。
第一步分割任務(wù)。首先我們需要有一個 fork 類來把大任務(wù)分割成子任務(wù),有可能子任務(wù)還是很大,所以還需要不停的分割,直到分割出的子任務(wù)足夠小。
第二步執(zhí)行任務(wù)并合并結(jié)果。分割的子任務(wù)分別放在雙端隊(duì)列里,然后幾個啟動線程分別從雙端隊(duì)列里獲取任務(wù)執(zhí)行。子任務(wù)執(zhí)行完的結(jié)果都統(tǒng)一放在一個隊(duì)列里,啟動一個線程從隊(duì)列里拿數(shù)據(jù),然后合并這些數(shù)據(jù)。
Fork/Join 使用兩個類來完成以上兩件事情:
4.5.1 ForkJoinTask
我們要使用 ForkJoin 框架,必須首先創(chuàng)建一個 ForkJoin 任務(wù)。它提供在任務(wù)中執(zhí)行 fork() 和 join() 操作的機(jī)制,通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,F(xiàn)ork/Join 框架提供了以下兩個子類:
4.5.1.1 RecursiveAction
用于沒有返回結(jié)果的任務(wù)
4.5.1.2 RecursiveTask
用于有返回結(jié)果的任務(wù)。
4.5.2 ForkJoinPool
ForkJoinTask 需要通過 ForkJoinPool 來執(zhí)行,任務(wù)分割出的子任務(wù)會添加到當(dāng)前工作線程所維護(hù)的雙端隊(duì)列中,進(jìn)入隊(duì)列的頭部。當(dāng)一個工作線程的隊(duì)列里暫時沒有任務(wù)時,它會隨機(jī)從其他工作線程的隊(duì)列的尾部獲取一個任務(wù)
4.6 Fork/Join使用
Task要通過ForkJoinPool來執(zhí)行,使用submit 或 invoke 提交,兩者的區(qū)別是:invoke是同步執(zhí)行,調(diào)用之后需要等待任務(wù)完成,才能執(zhí)行后面的代碼;submit是異步執(zhí)行,join()和get方法當(dāng)任務(wù)完成的時候返回計(jì)算結(jié)果

在我們自己實(shí)現(xiàn)的compute方法里,首先需要判斷任務(wù)是否足夠小,如果足夠小就直接執(zhí)行任務(wù)。如果不足夠小,就必須分割成兩個子任務(wù),每個子任務(wù)在調(diào)用invokeAll方法時,又會進(jìn)入compute方法,看看當(dāng)前子任務(wù)是否需要繼續(xù)分割成孫任務(wù),如果不需要繼續(xù)分割,則執(zhí)行當(dāng)前子任務(wù)并返回結(jié)果。使用join方法會等待子任務(wù)執(zhí)行完并得到其結(jié)果。
4.6.1 任務(wù)的提交邏輯
fork/join其實(shí)大部分邏輯處理操作都集中在提交任務(wù)和處理任務(wù)這兩塊,了解任務(wù)的提交基本上后面就很容易理解了, fork/join提交任務(wù)主要分為兩種:
4.6.1.1 第一次提交到forkJoinPool
//創(chuàng)建初始化任務(wù)
SubmitTask submitTask = new SubmitTask(start, end);
//將初始任務(wù)扔進(jìn)連接池中執(zhí)行
forkJoinPool.invoke(submitTask);
4.6.1.2 任務(wù)切分之后的提交
//沒有達(dá)到閾值 計(jì)算一個中間值
long mid = (start + end) / 2;
//拆分 左邊的
SubmitTask left = new SubmitTask(start, mid);
//拆分右邊的
SubmitTask right = new SubmitTask(mid + 1, end);
//添加到任務(wù)列表
invokeAll(left, right);
4.6.1.3 合并任務(wù)
//合并結(jié)果并返回
return left.join() + right.join();
4.6.1.4 代碼案例
package chapter02.forkjoin;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* 計(jì)算 0-10000 階乘
*/
public class SubmitTask extends RecursiveTask<Long> {
/**
* 起始值
*/
private long start;
/**
* 結(jié)束值
*/
private long end;
/**
* 閾值
*/
private long threshold = 10L;
public SubmitTask(long start, long end) {
this.start = start;
this.end = end;
}
/**
* 計(jì)算邏輯
* 進(jìn)行任務(wù)的拆分 以及 達(dá)到閾值的計(jì)算
*
* @return
*/
@Override
protected Long compute() {
//校驗(yàn)是否達(dá)到了閾值
if (isLessThanThreshold()) {
//處理并返回結(jié)果
return handle();
} else {
//沒有達(dá)到閾值 計(jì)算一個中間值
long mid = (start + end) / 2;
//拆分 左邊的
SubmitTask left = new SubmitTask(start, mid);
//拆分右邊的
SubmitTask right = new SubmitTask(mid + 1, end);
//添加到任務(wù)列表
invokeAll(left, right);
//合并結(jié)果并返回
return left.join() + right.join();
}
}
/**
* 處理的任務(wù)
*
* @return
*/
public Long handle() {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return sum;
}
/*是否達(dá)到了閾值*/
private boolean isLessThanThreshold() {
return end - start <= threshold;
}
/**
* forkJoin 方式調(diào)用
*
* @param start
* @param end
*/
public static void forkJoinInvok(long start, long end) {
long sum = 0;
long currentTime = System.currentTimeMillis();
//創(chuàng)建ForkJoinPool 連接池
ForkJoinPool forkJoinPool = new ForkJoinPool();
//創(chuàng)建初始化任務(wù)
SubmitTask submitTask = new SubmitTask(start, end);
//將初始任務(wù)扔進(jìn)連接池中執(zhí)行
forkJoinPool.invoke(submitTask);
//forkJoinPool.submit(submitTask);
// System.out.println("異步方式,任務(wù)結(jié)束才會調(diào)用該方法,當(dāng)前耗時"+(System.currentTimeMillis() - currentTime));
//等待返回結(jié)果
sum = submitTask.join();
//forkjoin調(diào)用方式耗時
System.out.println("forkJoin調(diào)用:result:" + sum);
System.out.println("forkJoin調(diào)用耗時:" + (System.currentTimeMillis() - currentTime));
}
/**
* 普通方式調(diào)用
*
* @param start
* @param end
*/
public static void normalInvok(long start, long end) {
long sum = 0;
long currentTime = System.currentTimeMillis();
for (long i = start; i <= end; i++) {
sum += i;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//普通調(diào)動方式耗時
System.out.println("普通調(diào)用:result:" + sum);
System.out.println("普通調(diào)用耗時:" + (System.currentTimeMillis() - currentTime));
}
public static void main(String[] args) {
//起始值的大小
long start = 0;
//結(jié)束值的大小
long end = 10000;
//forkJoin 調(diào)用
forkJoinInvok(start, end);
System.out.println("========================");
//普通調(diào)用
normalInvok(start, end);
}
}
本文由
傳智教育博學(xué)谷狂野架構(gòu)師教研團(tuán)隊(duì)發(fā)布。如果本文對您有幫助,歡迎
關(guān)注和點(diǎn)贊;如果您有任何建議也可留言評論或私信,您的支持是我堅(jiān)持創(chuàng)作的動力。轉(zhuǎn)載請注明出處!