手把手教你 Fork/Join 并發(fā)處理

ForkJoinPool 是 Java 7 加入的一個(gè)并發(fā)處理類(lèi),位于 java.util.concurrent 包。

Fork / Join 回顧

ForkJoinPool 讓我們可以很方便地把任務(wù)分裂成幾個(gè)更小的任務(wù),這些分裂出來(lái)的任務(wù)也將會(huì)提交給 ForkJoinPool。任務(wù)可以繼續(xù)分割成更小的子任務(wù),只要它還能分割。分叉和合并原理包含兩個(gè)遞歸進(jìn)行的步驟。兩個(gè)步驟分別是分叉步驟和合并步驟。

一個(gè)使用了分叉和合并原理的任務(wù)可以將自己分叉(分割)為更小的子任務(wù),這些子任務(wù)可以被并發(fā)執(zhí)行。如下圖所示:


通過(guò)把自己分割成多個(gè)子任務(wù),每個(gè)子任務(wù)可以由不同的 CPU 并行執(zhí)行,或者被同一個(gè) CPU 上的不同線(xiàn)程執(zhí)行。

只有當(dāng)給的任務(wù)過(guò)大,把它分割成幾個(gè)子任務(wù)才有意義。把任務(wù)分割成子任務(wù)有一定開(kāi)銷(xiāo),因此對(duì)于小型任務(wù),這個(gè)分割的消耗可能比每個(gè)子任務(wù)并發(fā)執(zhí)行的消耗還要大。

什么時(shí)候把一個(gè)任務(wù)分割成子任務(wù)是有意義的,這個(gè)界限也稱(chēng)作一個(gè)閥值。這要看每個(gè)任務(wù)對(duì)有意義閥值的決定。很大程度上取決于它要做的工作的種類(lèi)。

當(dāng)一個(gè)任務(wù)將自己分割成若干子任務(wù)之后,該任務(wù)將等待所有子任務(wù)結(jié)束。一旦子任務(wù)執(zhí)行結(jié)束,該任務(wù)可以把所有結(jié)果合并到同一個(gè)結(jié)果。圖示如下:

Fork / Join 的使用

Fork / Join 的使用主要涉及 ForkJoinPool 和 ForkJoinTask。ForkJoinTask 類(lèi)定義了任務(wù),實(shí)現(xiàn)了 Fork 和 Join 操作;ForkJoinPool 管理線(xiàn)程與任務(wù)的執(zhí)行。

ForkJoinTask 類(lèi)是一個(gè)抽象類(lèi),要求子類(lèi)實(shí)現(xiàn)以下三個(gè)方法:

getRawResult :獲取 ForkJoinTask 的執(zhí)行結(jié)果;

setRawResult :設(shè)置 ForkJoinTask 的執(zhí)行結(jié)果;

exec :ForkJoinTask 的執(zhí)行邏輯,返回 true 表示正常返回;

為了方便開(kāi)發(fā),標(biāo)準(zhǔn)庫(kù)提供了 ForkJoinTask 的一個(gè)子類(lèi) RecursiveTask。RecursiveTask 類(lèi)也是一個(gè)抽象類(lèi),封裝了上述 3 個(gè)方法的實(shí)現(xiàn),要求子類(lèi)實(shí)現(xiàn)一個(gè)方法 compute。這樣一來(lái),我們只需要實(shí)現(xiàn) compute 一個(gè)方法就可以使用 ForkJoinTask 了。RecursiveTask 的定義如下:

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;
    V result;

    protected abstract V compute();

    public final V getRawResult() {
        return result;
    }

    protected final void setRawResult(V value) {
        result = value;
    }

    protected final boolean exec() {
        result = compute();
        return true;
    }
}

以計(jì)算數(shù)組所有數(shù)字的和為例,我們定義一個(gè) Task 類(lèi)繼承 RecursiveTask,在 compute 方法中把數(shù)組一分為二,創(chuàng)建兩個(gè) Task 實(shí)例,分別調(diào)用 fork 方法,再分別調(diào)用 join 方法獲取兩個(gè) Task 的計(jì)算結(jié)果,從而得到數(shù)組所有數(shù)字的和。

public class Task extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;
    long[] data;
    int start;
    int end;

    public Task(long[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        if(end - start < 1000){
            for(int i = start; i <= end; i++){
                sum += data[i];
            }
        }else {
            //分割任務(wù)
            int middle = (start + end) / 2;
            Task left = new Task(data,start,middle);
            Task right = new Task(data,middle + 1,end);
            left.fork();//fork 操作
            right.fork();//fork 操作
            sum = left.join() + right.join();//join操作
        }
        return sum;
    }
}

使用 ForkJoinPool 的方法很簡(jiǎn)單,創(chuàng)建 ForkJoinPool 實(shí)例,然后調(diào)用 invoke 方法執(zhí)行 ForkJoinTask 任務(wù)即可獲得計(jì)算結(jié)果。

long[] data = new long[1024*1024];
Arrays.setAll(data, i -> i);
long sum = new ForkJoinPool().invoke(
    new Task(data, 0, data.length - 1)
);
System.out.println(sum);

如果不需要獲取計(jì)算的結(jié)果,比如需要執(zhí)行一些沒(méi)有返回值的操作,也可以調(diào)用 execute 方法。

每周 3 篇學(xué)習(xí)筆記或技術(shù)總結(jié),內(nèi)容涉及 Java 進(jìn)階、虛擬機(jī)、MySQL、NoSQL、分布式計(jì)算、開(kāi)源框架等多個(gè)領(lǐng)域。關(guān)注作者或微信公眾號(hào) 后端開(kāi)發(fā)那點(diǎn)事兒 第一時(shí)間獲取最新內(nèi)容。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容