ForkJoinPool在Java7中開始提供。它將一個(gè)任務(wù)拆分成多個(gè)小任務(wù)并行計(jì)算,從而充分使用cpu的能力。Fork/Join并行算法是我們所熟悉的分治算法的并行版本,典型的用法如下:
Result solve(Problem problem) {
if (problem is small) {
directly solve problem
} else {
split problem into independent parts
fork new subtasks to solve each part
join all subtasks
compose result from subresults
}
}
ForkJoin框架針對(duì)其執(zhí)行的任務(wù)特性對(duì)執(zhí)行框架進(jìn)行了定制性的優(yōu)化,體現(xiàn)在以下幾個(gè)方面,
- 線程池是已經(jīng)準(zhǔn)備好的,通常工作線程個(gè)數(shù)與系統(tǒng)CPU個(gè)數(shù)相同以充分使用CPU
- 所有的ForkJoin任務(wù)都是輕量級(jí)執(zhí)行類的實(shí)例,而不是線程實(shí)例
- 采用特殊的隊(duì)列和調(diào)度原則來管理任務(wù)并通過工作線程來執(zhí)行任務(wù)。
- 提供一個(gè)簡單的控制管理類來啟動(dòng)工作線程池
ForkJoin框架的核心在于提供輕量級(jí)調(diào)度機(jī)制。它的基本調(diào)度策略如下:
- 每個(gè)工作線程維護(hù)自己的調(diào)度隊(duì)列中的可運(yùn)行任務(wù)
- 隊(duì)列以雙端隊(duì)列的形式被維護(hù),支持后進(jìn)先出以及先進(jìn)先出
- 對(duì)于一個(gè)給定的工作線程而言,任務(wù)所產(chǎn)生的子任務(wù)會(huì)被放入工作者自己的雙端隊(duì)列中
- 工作線程使用后進(jìn)先出,通過彈出任務(wù)來處理隊(duì)列中的任務(wù)
- 當(dāng)一個(gè)工作線程本地沒有任務(wù)去運(yùn)行的時(shí)候,它將使用先進(jìn)先出的規(guī)則嘗試從隨機(jī)的從別的工作線程中拿一個(gè)任務(wù)執(zhí)行
- 當(dāng)一個(gè)工作線程觸發(fā)了join操作,如果可能的話它將處理其他任務(wù),
直到目標(biāo)任務(wù)被告知已經(jīng)結(jié)束。所有的任務(wù)都會(huì)無阻塞的完成。 - 當(dāng)一個(gè)工作線程無法再從其他線程中獲取任務(wù)和失敗處理的時(shí)候,它就會(huì)退出,
并經(jīng)過一段時(shí)間之后再次嘗試直到所有的工作線程都被告知他們處于空閑的狀態(tài)。在這種情況下,他們都會(huì)阻塞到直到其他任務(wù)再度被上層調(diào)用。
核心特點(diǎn): 后進(jìn)先出處理自己的任務(wù),先進(jìn)先出處理竊取的任務(wù)。
優(yōu)先處理自己的任務(wù)可以獲得更好的局部性。從其他隊(duì)列竊取任務(wù)的開銷會(huì)比在自己的隊(duì)列中執(zhí)行pop操作的開銷大。
竊取時(shí)從相反的方向進(jìn)行操作可以減小競爭。
更早期竊取可以獲得更大的單元任務(wù),從而竊取線程可以在將來進(jìn)行遞歸分解。
可以通過ForkJoin計(jì)算Fibonacci的例子來學(xué)習(xí)ForkJoin并行框架的使用
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import org.junit.Test;
public class ForkJoinFibonacciTest {
static final long[] FIBONACCI = {1, 1, 2, 3, 5, 8, 13, 21};
@Test
public void testFibonacci() {
int n = 40;
long t0 = System.nanoTime();
long rs = fibRecur(n);
long t1 = System.nanoTime();
System.out.printf(
"recur fibonacci cost %d ms , result %d \n", (t1 - t0) / 1000000, rs);
ForkJoinPool pool = new ForkJoinPool();
FibTask task = new FibTask(n);
t0 = System.nanoTime();
pool.invoke(task);
t1 = System.nanoTime();
System.out.printf(
"fork/join fibonacci cost %d ms, result %d \n", (t1 - t0) / 1000000,
rs);
}
private static long fibRecur(int n) {
if (n < FIBONACCI.length) {
return FIBONACCI[n];
}
return fibRecur(n - 1) + fibRecur(n - 2);
}
private static class FibTask extends RecursiveTask<Long> {
static final int threshold = FIBONACCI.length - 1;
volatile int number;
FibTask(int n) {
number = n;
}
@Override
protected Long compute() {
int n = number;
if (n < threshold) {
return FIBONACCI[n];
} else {
FibTask task1 = new FibTask(n - 1);
FibTask task2 = new FibTask(n - 2);
task1.fork();
task2.fork();
return task1.join() + task2.join();
}
}
}
}
輸出結(jié)果為
recur fibonacci cost 39 ms , result 165580141
fork/join fibonacci cost 960 ms, result 165580141
fork/join反而比普通的遞歸算法慢。個(gè)人認(rèn)為原因可能是Fibonacci計(jì)算本身簡單,因此導(dǎo)致fork/join的任務(wù)分配和管理帶來的開銷遠(yuǎn)超于并行計(jì)算帶來的提升。