和肥肥的主管懟了一下午的ForkJoinPool

肥肥的主管: 小飯飯,了解ForkJoinPool嗎

帥氣的小飯飯: 了解啊,Caffeine中默認(rèn)用到的處理線程池就是這個

肥肥的主管: 和ThreadPoolExecutor有什么區(qū)別嗎?

帥氣的小飯飯: (⊙o⊙)…,這個沒了解

肥肥的主管: 有什么應(yīng)用場景嗎?能引入我們的游戲中嗎?有什么弊端嗎?

帥氣的小飯飯: (⊙o⊙)…,我周末研究下

以上就是我,帥氣的小飯飯被小肥肥刁難的日常了。

image-20210620115400862

每次看到這個,總會邪惡的想到FuckXXX的。

其實(shí)好久前就知道這個FuckXXX了,比如Caffeine、Netty中默認(rèn)提供的線程池,只是一直沒有去研究它。

過了個周末,我李漢三終于搞懂了ForkJoinPool了,終于可以和小肥肥正面肝上了

image-20210620154701046

看完該篇文章你大概可以了解到這些知識點(diǎn):

  • ForkJoinPool是什么

  • 和ThreadPoolExecutor有什么區(qū)別

  • 應(yīng)用場景是啥

  • 好處和弊端

  • 常見面試題

小肥肥: 周末不是研究了下ForkJoinPool嗎?說說看是啥東西啊這?

*帥氣的小飯飯: * 是啊是啊,可賣力了,現(xiàn)在已經(jīng)研究透徹了,

ForkJoinPool是JDK7引入的線程池,核心思想是將大的任務(wù)拆分成多個小任務(wù)(即fork),然后在將多個小任務(wù)處理匯總到一個結(jié)果(即join),非常像MapReduce處理原理。

同時呢,它還提供基本的線程池功能,支持設(shè)置最大并發(fā)線程數(shù),支持任務(wù)排隊,支持線程池停止,支持線程池使用情況監(jiān)控,也是AbstractExecutorService的子類,主要引入了“工作竊取”機(jī)制,在多CPU計算機(jī)上處理性能更佳。

為了讓你看清楚這個過程,我還特地畫了個流程圖給你look look

image-20210620120440613

小肥肥: 什么叫“工作竊取”機(jī)制啊,這么屌的

*帥氣的小飯飯: *和它的名字一樣啊,就是竊取啊,work-stealing(工作竊?。?,F(xiàn)orkJoinPool提供了一個更有效的利用線程的機(jī)制,當(dāng)ThreadPoolExecutor還在用單個隊列存放任務(wù)時,F(xiàn)orkJoinPool已經(jīng)分配了與線程數(shù)相等的隊列,當(dāng)有任務(wù)加入線程池時,會被平均分配到對應(yīng)的隊列上,各線程進(jìn)行正常工作,當(dāng)有線程提前完成時,會從隊列的末端“竊取”其他線程未執(zhí)行完的任務(wù),當(dāng)任務(wù)量特別大時,CPU多的計算機(jī)會表現(xiàn)出更好的性能,就是這么強(qiáng)啊。

image-20210620123535386

小肥肥: 我擦,好強(qiáng),原理是啥啊

*帥氣的小飯飯: *原理啊,我羅列下吧

  • ForkJoinPool 的每個工作線程都維護(hù)著一個工作隊列(WorkQueue),這是一個雙端隊列(Deque),里面存放的對象是任務(wù)(ForkJoinTask)。

  • 每個工作線程在運(yùn)行中產(chǎn)生新的任務(wù),通常是因為調(diào)用了fork(),會放入工作隊列的隊尾,并且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務(wù)來執(zhí)行。

  • 每個工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(wù),這個竊取行為或是來自于剛剛提交到 pool 的任務(wù),或是來自于其他工作線程的工作隊列,竊取的任務(wù)位于其他線程的工作隊列的隊首,也就是說工作線程在竊取其他工作線程的任務(wù)時,使用的是 FIFO 方式。

  • 在遇到 join() 時,如果需要 join 的任務(wù)尚未完成,則會先處理其他任務(wù),并等待其完成。

  • 在既沒有自己的任務(wù),也沒有可以竊取的任務(wù)時,進(jìn)入休眠。

*小肥肥: *了解了,那除此之外還和ThreadPoolExecutor有什么區(qū)別嗎

*帥氣的小飯飯: *(⊙o⊙)…,最大的區(qū)別就是這個工作竊取機(jī)制了,其他的,我想想看哦,突然想到了我們游戲內(nèi)的線程池模型

我們游戲內(nèi)使用的ThreadPoolExecutor線程池內(nèi)的對應(yīng)線程池是有明確規(guī)定的,比如A線程只能處理一批玩家的數(shù)據(jù),B線程只能處理另一批玩家的數(shù)據(jù),這里用了取摩的思想,而ForkJoinPool 這種竊取思想,自身是并行計算,無法對對應(yīng)線程進(jìn)行明確的工作劃分,單純這點(diǎn)來說,其實(shí)就不適合用來做我們的線程模型的載體了。

小肥肥:嗯,也是,那它有啥應(yīng)用場景呢

*帥氣的小飯飯: *ForkJoinPool 最適合的是計算密集型的任務(wù),如果存在 I/O,線程間同步,sleep() 等會造成線程長時間阻塞的情況時,最好配合使用 ManagedBlocker,或者不用。

為了讓小肥肥看清楚這個應(yīng)用場景,我打開了idea,怒寫了以下代碼【其實(shí)是從網(wǎng)上看到的例子】

方案一:平平無奇的for循環(huán)解決

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n58" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; color: rgb(51, 51, 51); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">public interface Calculator {

/**

  • 把傳進(jìn)來的所有numbers 做求和處理
  • @param numbers
  • @return 總和
    */
    long sumUp(long[] numbers);
    }
    </pre>

日常代碼

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n60" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; color: rgb(51, 51, 51); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">public class ForLoopCalculator implements Calculator {

@Override
public long sumUp(long[] numbers) {
long total = 0;
for (long i : numbers) {
total += i;
}
return total;
}
}
</pre>

執(zhí)行類

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n62" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; color: rgb(51, 51, 51); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;"> public static void main(String[] args) {
long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();

Instant start = Instant.now();
Calculator calculator = new ForLoopCalculator();
long result = calculator.sumUp(numbers);
Instant end = Instant.now();
System.out.println("耗時:" + Duration.between(start, end).toMillis() + "ms");

System.out.println("結(jié)果為:" + result);
}
輸出:
耗時:10ms
結(jié)果為:50000005000000</pre>

方案二:ExecutorService多線程方式實(shí)現(xiàn)

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n64" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; color: rgb(51, 51, 51); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">public class ExecutorServiceCalculator implements Calculator {

private int parallism;
private ExecutorService pool;

public ExecutorServiceCalculator() {
parallism = Runtime.getRuntime().availableProcessors(); // CPU的核心數(shù) 默認(rèn)就用cpu核心數(shù)了
pool = Executors.newFixedThreadPool(parallism);
}

//處理計算任務(wù)的線程
private static class SumTask implements Callable<Long> {
private long[] numbers;
private int from;
private int to;

public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}

@Override
public Long call() {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
}
}

@Override
public long sumUp(long[] numbers) {
List<Future<Long>> results = new ArrayList<>();

// 把任務(wù)分解為 n 份,交給 n 個線程處理 4核心 就等分成4份唄
// 然后把每一份都扔個一個SumTask線程 進(jìn)行處理
int part = numbers.length / parallism;
for (int i = 0; i < parallism; i++) {
int from = i * part; //開始位置
int to = (i == parallism - 1) ? numbers.length - 1 : (i + 1) * part - 1; //結(jié)束位置

//扔給線程池計算
results.add(pool.submit(new SumTask(numbers, from, to)));
}

// 把每個線程的結(jié)果相加,得到最終結(jié)果 get()方法 是阻塞的
// 優(yōu)化方案:可以采用CompletableFuture來優(yōu)化 JDK1.8的新特性
long total = 0L;
for (Future<Long> f : results) {
try {
total += f.get();
} catch (Exception ignore) {
}
}

return total;
}
}</pre>

執(zhí)行類

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n66" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; color: rgb(51, 51, 51); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;"> public static void main(String[] args) {
long[] numbers = LongStream.rangeClosed(1, 10000000).toArray();

Instant start = Instant.now();
Calculator calculator = new ExecutorServiceCalculator();
long result = calculator.sumUp(numbers);
Instant end = Instant.now();
System.out.println("耗時:" + Duration.between(start, end).toMillis() + "ms");

System.out.println("結(jié)果為:" + result); // 打印結(jié)果500500
}
輸出:
耗時:30ms
結(jié)果為:50000005000000</pre>

方案三:采用ForkJoinPool(Fork/Join)

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n68" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; color: rgb(51, 51, 51); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
public class ForkJoinCalculator implements Calculator {

private ForkJoinPool pool;

//執(zhí)行任務(wù)RecursiveTask:有返回值 RecursiveAction:無返回值
private static class SumTask extends RecursiveTask<Long> {
private long[] numbers;
private int from;
private int to;

public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}

//此方法為ForkJoin的核心方法:對任務(wù)進(jìn)行拆分 拆分的好壞決定了效率的高低
@Override
protected Long compute() {

// 當(dāng)需要計算的數(shù)字個數(shù)小于6時,直接采用for loop方式計算結(jié)果
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
} else { // 否則,把任務(wù)一分為二,遞歸拆分(注意此處有遞歸)到底拆分成多少分 需要根據(jù)具體情況而定
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}

public ForkJoinCalculator() {
// 也可以使用公用的線程池 ForkJoinPool.commonPool():
// pool = ForkJoinPool.commonPool()
pool = new ForkJoinPool();
}

@Override
public long sumUp(long[] numbers) {
Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
pool.shutdown();
return result;
}
}
輸出:
耗時:390ms
結(jié)果為:50000005000000</pre>

方案四:采用并行流

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n70" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; color: rgb(51, 51, 51); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;"> public static void main(String[] args) {

Instant start = Instant.now();
long result = LongStream.rangeClosed(0, 10000000L).parallel().reduce(0, Long::sum);
Instant end = Instant.now();
System.out.println("耗時:" + Duration.between(start, end).toMillis() + "ms");

System.out.println("結(jié)果為:" + result); // 打印結(jié)果500500

}
輸出:
耗時:130ms
結(jié)果為:50000005000000</pre>

并行流底層也是Fork/Join框架,只是任務(wù)拆分優(yōu)化得很好。

耗時效率方面解釋:Fork/Join 并行流等當(dāng)計算的數(shù)字非常大的時候,優(yōu)勢才能體現(xiàn)出來。也就是說,如果你的計算比較小,或者不是CPU密集型的任務(wù),不太建議使用并行處理。

小肥肥: 不明覺厲,看起來就是不適用,使用它的好處和弊端呢?

帥氣的小飯飯: 好處就是對CPU的充分利用了,特別是在計算密集型的任務(wù)的時候,弊端就是需要對任務(wù)進(jìn)行拆分,拆分的好壞也意味著任務(wù)的好壞,對開發(fā)人員來說,還是有點(diǎn)成本存在的。

小肥肥: 嗯,確實(shí),我這邊有份簡歷,有某初級游戲開發(fā)說用上了這個技術(shù),你設(shè)計下相關(guān)面試題問問人家吧。

帥氣的小飯飯: 好吧,那這邊大概設(shè)計以下幾道面試題吧

  • 什么是ForkJoin?

  • 說說看ForkJoin的內(nèi)部原理?

  • 大概聊聊工作竊取機(jī)制?

  • 如何充分利用CPU,計算超大數(shù)組中所有整數(shù)的和?

這些面試題的答案基本上都在我們聊的話題中啦。

小肥肥: 挺好的,等著升資深開發(fā)吧你,周末還學(xué)習(xí),對了,總結(jié)下,將剛剛的聊天記錄總結(jié)下,然后后期做個分享。

帥氣的小飯飯: 哦豁,可以啊,先加點(diǎn)錢啊,窮啊,我總結(jié)下吧

  • ForkJoinPool特別適合于“分而治之”算法的實(shí)現(xiàn);

  • ForkJoinPool和ThreadPoolExecutor是互補(bǔ)的,不是誰替代誰的關(guān)系,二者適用的場景不同;

  • ForkJoinTask有兩個核心方法——fork()和join(),有三個重要子類——RecursiveAction、RecursiveTask和CountedCompleter;

  • ForkjoinPool內(nèi)部基于“工作竊取”算法實(shí)現(xiàn);

  • 每個線程有自己的工作隊列,它是一個雙端隊列,自己從隊列頭存取任務(wù),其它線程從尾部竊取任務(wù);

  • ForkJoinPool最適合于計算密集型任務(wù),但也可以使用ManagedBlocker以便用于阻塞型任務(wù);

  • RecursiveTask內(nèi)部可以少調(diào)用一次fork(),利用當(dāng)前線程處理,這是一種技巧;

原文鏈接:https://mp.weixin.qq.com/s/z7yBgd3qqecCAyXwOmwJDQ

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容