【java】并發(fā)-ForkJoinPool

Fork 就是把一個(gè)大任務(wù)切分為若干個(gè)子任務(wù)并行地執(zhí)行,Join 就是合并這些子任務(wù)的執(zhí)行結(jié)果,最后得到這個(gè)大任務(wù)的結(jié)果。Fork/Join 框架使用的是工作竊取算法。

工作竊取算法

工作竊取算法是指某個(gè)線程從其他隊(duì)列里竊取任務(wù)來執(zhí)行。對(duì)于一個(gè)比較大的任務(wù),可以把它分割為若干個(gè)互不依賴的子任務(wù),為了減少線程間的競爭,把這些子任務(wù)分別放到不同的隊(duì)列里,并為每個(gè)隊(duì)列創(chuàng)建一個(gè)單獨(dú)的線程來執(zhí)行隊(duì)列里的任務(wù),線程和隊(duì)列一一對(duì)應(yīng)。但是,有的線程會(huì)先把自己隊(duì)列里的任務(wù)干完,而其他線程對(duì)應(yīng)的隊(duì)列里還有任務(wù)需要處理,于是它就去其他線程的隊(duì)列里竊取一個(gè)任務(wù)來執(zhí)行。由于此時(shí)它們訪問同一個(gè)隊(duì)列,為了減小競爭,通常會(huì)使用雙端隊(duì)列。被竊取任務(wù)的線程永遠(yuǎn)從雙端隊(duì)列的頭部獲取任務(wù),竊取任務(wù)的線程永遠(yuǎn)從雙端隊(duì)列的尾部獲取任務(wù)。


image.png

工作竊取算法的優(yōu)缺點(diǎn)

優(yōu)點(diǎn):充分利用線程進(jìn)行并行計(jì)算,減少了線程間的競爭。
缺點(diǎn):雙端隊(duì)列只存在一個(gè)任務(wù)時(shí)會(huì)導(dǎo)致競爭,會(huì)消耗更多的系統(tǒng)資源,因?yàn)樾枰獎(jiǎng)?chuàng)建多個(gè)線程和多個(gè)雙端隊(duì)列。

使用 ForkJoinPool 進(jìn)行分叉和合并

ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一點(diǎn)不同。ForkJoinPool 讓我們可以很方便地把任務(wù)分裂成幾個(gè)更小的任務(wù),這些分裂出來的任務(wù)也將會(huì)提交給 ForkJoinPool。任務(wù)可以繼續(xù)分割成更小的子任務(wù),只要它還能分割。可能聽起來有些抽象,因此本節(jié)中我們將會(huì)解釋 ForkJoinPool 是如何工作的,還有任務(wù)分割是如何進(jìn)行的。

分叉和合并解釋

在我們開始看 ForkJoinPool 之前我們先來簡要解釋一下分叉和合并的原理。
分叉和合并原理包含兩個(gè)遞歸進(jìn)行的步驟。兩個(gè)步驟分別是分叉步驟和合并步驟。

分叉

一個(gè)使用了分叉和合并原理的任務(wù)可以將自己分叉(分割)為更小的子任務(wù),這些子任務(wù)可以被并發(fā)執(zhí)行。如下圖所示:


image.png

通過把自己分割成多個(gè)子任務(wù),每個(gè)子任務(wù)可以由不同的 CPU 并行執(zhí)行,或者被同一個(gè) CPU 上的不同線程執(zhí)行。
只有當(dāng)給的任務(wù)過大,把它分割成幾個(gè)子任務(wù)才有意義。把任務(wù)分割成子任務(wù)有一定開銷,因此對(duì)于小型任務(wù),這個(gè)分割的消耗可能比每個(gè)子任務(wù)并發(fā)執(zhí)行的消耗還要大。
什么時(shí)候把一個(gè)任務(wù)分割成子任務(wù)是有意義的,這個(gè)界限也稱作一個(gè)閥值。這要看每個(gè)任務(wù)對(duì)有意義閥值的決定。很大程度上取決于它要做的工作的種類。

合并

當(dāng)一個(gè)任務(wù)將自己分割成若干子任務(wù)之后,該任務(wù)將進(jìn)入等待所有子任務(wù)的結(jié)束之中。
一旦子任務(wù)執(zhí)行結(jié)束,該任務(wù)可以把所有結(jié)果合并到同一個(gè)結(jié)果。圖示如下:


image.png

當(dāng)然,并非所有類型的任務(wù)都會(huì)返回一個(gè)結(jié)果。如果這個(gè)任務(wù)并不返回一個(gè)結(jié)果,它只需等待所有子任務(wù)執(zhí)行完畢。也就不需要結(jié)果的合并啦。

ForkJoinPool

ForkJoinPool 是一個(gè)特殊的線程池,它的設(shè)計(jì)是為了更好的配合 分叉-和-合并 任務(wù)分割的工作。ForkJoinPool 也在 java.util.concurrent 包中,其完整類名為 java.util.concurrent.ForkJoinPool。

創(chuàng)建一個(gè) ForkJoinPool

你可以通過其構(gòu)造子創(chuàng)建一個(gè) ForkJoinPool。作為傳遞給 ForkJoinPool 構(gòu)造子的一個(gè)參數(shù),你可以定義你期望的并行級(jí)別。并行級(jí)別表示你想要傳遞給 ForkJoinPool 的任務(wù)所需的線程或 CPU 數(shù)量。以下是一個(gè) ForkJoinPool 示例:

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

這個(gè)示例創(chuàng)建了一個(gè)并行級(jí)別為 4 的 ForkJoinPool。

提交任務(wù)到 ForkJoinPool

就像提交任務(wù)到 ExecutorService 那樣,把任務(wù)提交到 ForkJoinPool。你可以提交兩種類型的任務(wù)。一種是沒有任何返回值的(一個(gè) "行動(dòng)"),另一種是有返回值的(一個(gè)"任務(wù)")。這兩種類型分別由 RecursiveAction 和 RecursiveTask 表示。接下來介紹如何使用這兩種類型的任務(wù),以及如何對(duì)它們進(jìn)行提交。

RecursiveAction

RecursiveAction 是一種沒有任何返回值的任務(wù)。它只是做一些工作,比如寫數(shù)據(jù)到磁盤,然后就退出了。
一個(gè) RecursiveAction 可以把自己的工作分割成更小的幾塊,這樣它們可以由獨(dú)立的線程或者 CPU 執(zhí)行。
你可以通過繼承來實(shí)現(xiàn)一個(gè) RecursiveAction。示例如下:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;
 
public class MyRecursiveAction extends RecursiveAction {
 
    private long workLoad = 0;
 
    public MyRecursiveAction(long workLoad) {
        this.workLoad = workLoad;
    }
 
    @Override
    protected void compute() {
 
        //if work is above threshold, break tasks up into smaller tasks
        if(this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);
 
            List<MyRecursiveAction> subtasks =
                new ArrayList<MyRecursiveAction>();
 
            subtasks.addAll(createSubtasks());
 
            for(RecursiveAction subtask : subtasks){
                subtask.fork();
            }
 
        } else {
            System.out.println("Doing workLoad myself: " + this.workLoad);
        }
    }
 
    private List<MyRecursiveAction> createSubtasks() {
        List<MyRecursiveAction> subtasks =
            new ArrayList<MyRecursiveAction>();
 
        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);
 
        subtasks.add(subtask1);
        subtasks.add(subtask2);
 
        return subtasks;
    }
 
}

例子很簡單。MyRecursiveAction 將一個(gè)虛構(gòu)的 workLoad 作為參數(shù)傳給自己的構(gòu)造子。如果 workLoad 高于一個(gè)特定閥值,該工作將被分割為幾個(gè)子工作,子工作繼續(xù)分割。如果 workLoad 低于特定閥值,該工作將由 MyRecursiveAction 自己執(zhí)行。
你可以這樣規(guī)劃一個(gè) MyRecursiveAction 的執(zhí)行:

MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);
 
forkJoinPool.invoke(myRecursiveAction);

RecursiveTask

RecursiveTask 是一種會(huì)返回結(jié)果的任務(wù)。它可以將自己的工作分割為若干更小任務(wù),并將這些子任務(wù)的執(zhí)行結(jié)果合并到一個(gè)集體結(jié)果。可以有幾個(gè)水平的分割和合并。以下是一個(gè) RecursiveTask 示例:


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveTask;
    
    
public class MyRecursiveTask extends RecursiveTask<Long> {
 
    private long workLoad = 0;
 
    public MyRecursiveTask(long workLoad) {
        this.workLoad = workLoad;
    }
 
    protected Long compute() {
 
        //if work is above threshold, break tasks up into smaller tasks
        if(this.workLoad > 16) {
            System.out.println("Splitting workLoad : " + this.workLoad);
 
            List<MyRecursiveTask> subtasks =
                new ArrayList<MyRecursiveTask>();
            subtasks.addAll(createSubtasks());
 
            for(MyRecursiveTask subtask : subtasks){
                subtask.fork();
            }
 
            long result = 0;
            for(MyRecursiveTask subtask : subtasks) {
                result += subtask.join();
            }
            return result;
 
        } else {
            System.out.println("Doing workLoad myself: " + this.workLoad);
            return workLoad * 3;
        }
    }
    
    private List<MyRecursiveTask> createSubtasks() {
        List<MyRecursiveTask> subtasks =
        new ArrayList<MyRecursiveTask>();
 
        MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
        MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);
 
        subtasks.add(subtask1);
        subtasks.add(subtask2);
 
        return subtasks;
    }
}

除了有一個(gè)結(jié)果返回之外,這個(gè)示例和 RecursiveAction 的例子很像。MyRecursiveTask 類繼承自 RecursiveTask<Long>,這也就意味著它將返回一個(gè) Long 類型的結(jié)果。
MyRecursiveTask 示例也會(huì)將工作分割為子任務(wù),并通過 fork() 方法對(duì)這些子任務(wù)計(jì)劃執(zhí)行。
此外,本示例還通過調(diào)用每個(gè)子任務(wù)的 join() 方法收集它們返回的結(jié)果。子任務(wù)的結(jié)果隨后被合并到一個(gè)更大的結(jié)果,并最終將其返回。對(duì)于不同級(jí)別的遞歸,這種子任務(wù)的結(jié)果合并可能會(huì)發(fā)生遞歸。
你可以這樣規(guī)劃一個(gè) RecursiveTask:

MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
 
long mergedResult = forkJoinPool.invoke(myRecursiveTask);
 
System.out.println("mergedResult = " + mergedResult);

Fork/Join 案例Demo

需求:使用 Fork/Join 計(jì)算 1-10000的和,當(dāng)一個(gè)任務(wù)的計(jì)算數(shù)量大于3000時(shí)拆分任務(wù),數(shù)量小于3000時(shí)計(jì)算。

image.png

因?yàn)?~10000求和,耗時(shí)較少。下面我們將數(shù)據(jù)調(diào)大,求和1 ~ 59999999999(599億),然后來對(duì)比一下使用 Fork/Join求和 和 普通求和之間的效率差異。

普通求和

public class ForkJoinDemo {
    public static void main(String[] args) {
        //開始時(shí)間
        Long start = System.currentTimeMillis();
        long sum = 0l;
        for (long i = 1; i <= 9999999999L; i++) {
            sum+=i;
        }
        System.out.println(sum);
        //結(jié)束時(shí)間
        Long end = System.currentTimeMillis();
        System.out.println("消耗時(shí)間:"+(end-start));
    }
}

結(jié)果:從CPU利用率發(fā)現(xiàn),其實(shí)每個(gè)核心的利用率并不同,因?yàn)樽约汉说墓ぷ魍瓿闪司托菹⒘耍⒉粫?huì)去竊取其他核心的任務(wù),所以資源并不能得到充分利用;最終耗時(shí):14394毫秒。

Fork/Join求和

public class ForkJoinDemo {

    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //放入線程池
        ForkJoinPool pool = new ForkJoinPool();
        SumRecursiveTask task = new SumRecursiveTask(1, 59999999999L);
        Long result = pool.invoke(task);
        System.out.println("result="+result);
        Long end = System.currentTimeMillis();
        System.out.println("消耗時(shí)間:"+(end-start));
    }
}

//1.創(chuàng)建一個(gè)求和的任務(wù)
//RecursiveTask:表示一個(gè)任務(wù)
class SumRecursiveTask extends RecursiveTask<Long>{

    //大于3000要拆分(創(chuàng)建一個(gè)變量)
    //是否要拆分的臨界值
    private static final long THRESHOLD = 3000L;

    //起始值
    private final long start;
    //結(jié)束值
    private final long end;

    //構(gòu)造方法(傳遞起始值、結(jié)束值)
    public SumRecursiveTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    //任務(wù)編寫完成
    @Override
    protected Long compute() {
        long length = end - start;
        //計(jì)算
        if(length < THRESHOLD){
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum +=i;
            }
            return sum;
        }else{
            //拆分
            long middle = (start + end) /2;
            SumRecursiveTask left = new SumRecursiveTask(start,middle);
            left.fork();

            SumRecursiveTask right = new SumRecursiveTask(middle+1,end);
            right.fork();
            return left.join() +right.join();
        }
    }
}

小結(jié): Fork/Join 算法使用工作竊取算法,我們發(fā)現(xiàn)在求和的過程中,每個(gè)核心的利用率都為100%,所以資源能夠得到充分的利用;最終耗時(shí):8880毫秒

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容