Java ForkJoin 原理及示例

ForkJoinPool在Java7中開始提供。它將一個(gè)任務(wù)拆分成多個(gè)小任務(wù)并行計(jì)算,從而充分使用cpu的能力。Fork/Join并行算法是我們所熟悉的分治算法的并行版本,典型的用法如下:

Result solve(Problem problem) {
    if (problem is small) {
        directly solve problem
    } else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}

ForkJoin框架針對(duì)其執(zhí)行的任務(wù)特性對(duì)執(zhí)行框架進(jìn)行了定制性的優(yōu)化,體現(xiàn)在以下幾個(gè)方面,

  1. 線程池是已經(jīng)準(zhǔn)備好的,通常工作線程個(gè)數(shù)與系統(tǒng)CPU個(gè)數(shù)相同以充分使用CPU
  2. 所有的ForkJoin任務(wù)都是輕量級(jí)執(zhí)行類的實(shí)例,而不是線程實(shí)例
  3. 采用特殊的隊(duì)列和調(diào)度原則來管理任務(wù)并通過工作線程來執(zhí)行任務(wù)。
  4. 提供一個(gè)簡單的控制管理類來啟動(dòng)工作線程池

ForkJoin框架的核心在于提供輕量級(jí)調(diào)度機(jī)制。它的基本調(diào)度策略如下:

  1. 每個(gè)工作線程維護(hù)自己的調(diào)度隊(duì)列中的可運(yùn)行任務(wù)
  2. 隊(duì)列以雙端隊(duì)列的形式被維護(hù),支持后進(jìn)先出以及先進(jìn)先出
  3. 對(duì)于一個(gè)給定的工作線程而言,任務(wù)所產(chǎn)生的子任務(wù)會(huì)被放入工作者自己的雙端隊(duì)列中
  4. 工作線程使用后進(jìn)先出,通過彈出任務(wù)來處理隊(duì)列中的任務(wù)
  5. 當(dāng)一個(gè)工作線程本地沒有任務(wù)去運(yùn)行的時(shí)候,它將使用先進(jìn)先出的規(guī)則嘗試從隨機(jī)的從別的工作線程中拿一個(gè)任務(wù)執(zhí)行
  6. 當(dāng)一個(gè)工作線程觸發(fā)了join操作,如果可能的話它將處理其他任務(wù),
    直到目標(biāo)任務(wù)被告知已經(jīng)結(jié)束。所有的任務(wù)都會(huì)無阻塞的完成。
  7. 當(dāng)一個(gè)工作線程無法再從其他線程中獲取任務(wù)和失敗處理的時(shí)候,它就會(huì)退出,
    并經(jīng)過一段時(shí)間之后再次嘗試直到所有的工作線程都被告知他們處于空閑的狀態(tài)。在這種情況下,他們都會(huì)阻塞到直到其他任務(wù)再度被上層調(diào)用。

核心特點(diǎn): 后進(jìn)先出處理自己的任務(wù),先進(jìn)先出處理竊取的任務(wù)。
優(yōu)先處理自己的任務(wù)可以獲得更好的局部性。從其他隊(duì)列竊取任務(wù)的開銷會(huì)比在自己的隊(duì)列中執(zhí)行pop操作的開銷大。
竊取時(shí)從相反的方向進(jìn)行操作可以減小競爭。
更早期竊取可以獲得更大的單元任務(wù),從而竊取線程可以在將來進(jìn)行遞歸分解。

可以通過ForkJoin計(jì)算Fibonacci的例子來學(xué)習(xí)ForkJoin并行框架的使用

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import org.junit.Test;

public class ForkJoinFibonacciTest {

  static final long[] FIBONACCI = {1, 1, 2, 3, 5, 8, 13, 21};

  @Test
  public void testFibonacci() {
    int n = 40;

    long t0 = System.nanoTime();
    long rs = fibRecur(n);
    long t1 = System.nanoTime();
    System.out.printf(
        "recur fibonacci cost %d ms , result %d \n", (t1 - t0) / 1000000, rs);

    ForkJoinPool pool = new ForkJoinPool();
    FibTask task = new FibTask(n);
    t0 = System.nanoTime();
    pool.invoke(task);
    t1 = System.nanoTime();
    System.out.printf(
        "fork/join fibonacci cost %d ms, result %d \n", (t1 - t0) / 1000000,
        rs);
  }


  private static long fibRecur(int n) {
    if (n < FIBONACCI.length) {
      return FIBONACCI[n];
    }
    return fibRecur(n - 1) + fibRecur(n - 2);
  }

  private static class FibTask extends RecursiveTask<Long> {

    static final int threshold = FIBONACCI.length - 1;
    volatile int number;

    FibTask(int n) {
      number = n;
    }

    @Override
    protected Long compute() {
      int n = number;
      if (n < threshold) {
        return FIBONACCI[n];
      } else {
        FibTask task1 = new FibTask(n - 1);
        FibTask task2 = new FibTask(n - 2);
        task1.fork();
        task2.fork();
        return task1.join() + task2.join();
      }
    }
  }
}

輸出結(jié)果為

recur fibonacci cost 39 ms , result 165580141 
fork/join fibonacci cost 960 ms, result 165580141 

fork/join反而比普通的遞歸算法慢。個(gè)人認(rèn)為原因可能是Fibonacci計(jì)算本身簡單,因此導(dǎo)致fork/join的任務(wù)分配和管理帶來的開銷遠(yuǎn)超于并行計(jì)算帶來的提升。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容