Fork/Join框架與ForkJoinPool

1. Fork/Join框架

fork操作的作用是把一個大的問題劃分成若干個較小的問題。在這個劃分過程一般是遞歸進行的。直到可以直接進行計算。需要恰當?shù)剡x取子問題的大小。太大的子問題不利于通過并行方式來提高性能,而太小的子問題則會帶來較大的額外開銷。每個子問題計算完成后,可以得到關于整個問題的部分解。join操作的作用是把這些分解手機組織起來,得到完整解。

簡單的說,F(xiàn)orkJoin其核心思想就是分治。Fork分解任務,Join收集數(shù)據(jù)。

在fork/join框架中,若某個子問題由于等待另一個子問題的完成而無法繼續(xù)執(zhí)行。那么處理該子問題的線程會主動尋找其他尚未運行完成的子問題來執(zhí)行。這種方式減少了線程的等待時間,提高了性能。子問題中應該避免使用synchronized關鍵詞或其他方式方式的同步。也不應該是一阻塞IO或過多的訪問共享變量。在理想情況下,每個子問題的實現(xiàn)中都應該只進行CPU相關的計算,并且只適用每個問題的內(nèi)部對象。唯一的同步應該只發(fā)生在子問題和創(chuàng)建它的父問題之間。

2. Fork/Join框架的主要類

一個fork/join框架之下的任務由ForkJoinTask類表示。ForkJoinTask實現(xiàn)了Future接口,可以按照Future接口的方式來使用。在ForkJoinTask類中之重要的兩個方法fork和join。fork方法用以一部方式啟動任務的執(zhí)行,join方法則等待任務完成并返回指向結(jié)果。在創(chuàng)建自己的任務是,最好不要直接繼承自ForkJoinTask類,而要繼承自ForkJoinTask類的子類RecurisiveTask或RecurisiveAction類。兩種的區(qū)別在于RecurisiveTask類表示的任務可以返回結(jié)果,而RecurisiveAction類不行。

簡單總結(jié):

ForkJoin主要提供了兩個主要的執(zhí)行任務的接口。RecurisiveAction與RecurisiveTask 。

  • RecurisiveAction :沒有返回值的接口。
  • RecurisiveTask :帶有返回值的接口。

fork/join框架任務的執(zhí)行由ForkJoinTask類的對象之外,還可以使用一般的Callable和Runnable接口來表示任務。

ForkJoin要利用線程池ForkJoinPool。每個線程池都有一個WorkQueue實例。ForkJoinPool推薦查看JDK8的源碼,比JDK7更利于理解。

在ForkJoinPool類的對象中執(zhí)行的任務大支可以分為兩類,一類通過execute、invoke或submit提交的任務;另一類是ForkJoinTask類的對象在執(zhí)行過程中產(chǎn)生的子任務,并通過fork方法來運行。一般的做法是表示整個問題的ForkJoinTask類的對象用第一類型是提交,而在執(zhí)行過程中產(chǎn)生的子任務并不需要進行處理,F(xiàn)orkJoinPool類對象會負責子任務的執(zhí)行。

ForkJoinPool是ExecutorService的實現(xiàn)類,因此是一種特殊的線程池。使用方法與Executor框架類似。ForkJoinPool提供如下兩個常用的構(gòu)造器:

ForkJoinPool(int parallelism) 創(chuàng)建一個包含parallelism個并行線程的ForkJoinPool。

ForkJoinPool() 以Runtime.availableProcessors()方法的返回值作為parallelism參數(shù)來創(chuàng)建ForkJoinPool。

ForkJoinPool有如下三個方法啟動線程:

使用ForkJoinPool的submit(ForkJoinTask task) 或 invoke(ForkJoinTask task) 方法來執(zhí)行指定任務。其中ForkJoinTask代表一個可以并行、合并的任務。

| | 客戶端非fork/join調(diào)用 | 內(nèi)部調(diào)用fork/join |
| 異步執(zhí)行 | execute(ForkJoinTask) | ForkJoinTask.fork |
| 等待獲取結(jié)果 | invoke(ForkJoinTask) | ForkJoinTask.invoke |
| 執(zhí)行,獲取Future |

submit(ForkJoinTask)

| ForkJoinTask.fork(ForkJoinTask are Futures) |

ForkJoinTask是分支合并的執(zhí)行任何,分支合并的業(yè)務邏輯使用者可以再繼承了這個抽先類之后,在抽象方法exec()中實現(xiàn)。其中exec()的返回結(jié)果和ForkJoinPool的執(zhí)行調(diào)用方(execute(...),invoke(...),submit(...)),共同決定著線程是否阻塞,具體請看下面的測試用例。

ForkJoinTask 是一個抽象類,它還有兩個抽象子類:RecurisiveTask和RecurisiveAction。

RecurisiveTask代表有返回值的任務。RecursiveTask<T>是泛型類。T是返回值的類型。

RecurisiveAction代表沒有返回值的任務。

3. 異常處理

ForkJoinTask在執(zhí)行的時候可能會拋出異常,但是沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經(jīng)拋出異常或已經(jīng)被取消了,并且可以通過ForkJoinTask的getException方法獲取異常。使用如下代碼:

<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">if(task.isCompletedAbnormally()) {
System.out.println(task.getException());
}</pre>

getException方法返回Throwable對象,如果任務被取消了則返回CancellationException。如果任務沒有完成或者沒有拋出異常則返回null。

4. ForkJoinPool

ForkJoinPool是Java 1.7之后新添加的一個ExecutorService實現(xiàn),在java.util.concurrent中。和其他的ExecutorService一樣,F(xiàn)orkJoinPool在提供自身特殊優(yōu)勢的同時也可以作為一個普通的Executor框架來使用,通過submit等方法來提交Runnable任務。

ForkJoinPool最大的特殊之處就在于其實現(xiàn)了工作密?。?a target="_blank" rel="nofollow">work-stealing)。

工作竊取 (work-stealing)

所謂工作竊取,即當前線程的Task已經(jīng)全被執(zhí)行完畢,則自動取到其他線程的Task池中取出Task繼續(xù)執(zhí)行。

image

ForkJoinPool中維護著多個線程(一般為CPU核數(shù))在不斷地執(zhí)行Task,每個線程除了執(zhí)行自己職務內(nèi)的Task之外,還會根據(jù)自己工作線程的閑置情況去獲取其他繁忙的工作線程的Task,如此一來就能能夠減少線程阻塞或是閑置的時間,提高CPU利用率。

實現(xiàn)原理

ForkJoinPool的具體實現(xiàn)可參考Doug Lea的論文:A Java Fork/Join Framework

image

ForkJoinPool中的工作線程是由ForkJoinWorkerThread類實現(xiàn)的,其通過維護一個雙端隊列(ForkJoinPool.WorkQueue)來存放Task的,這里的Task一般是ForkJoinTask的子類。每一個工作線程簡單的通過以下兩條原則進行活動:

  • 若隊列非空,則代表自己線程的Task還沒執(zhí)行完畢,取出Task并執(zhí)行。
  • 若隊列為空,則隨機選取一個其他的工作線程的Task并執(zhí)行。

那么為了減少在對Task雙端隊列進行操作時的Race Condition,這里的雙端隊列通過維護一個top變量和一個base變量來解決這個問題。top變量類似于棧幀,當ForkJoinTask fork出新的Task或者Client從外部提交一個新的Task的ForkJoinPool時,工作線程將Task以LIFO的方式push到雙端隊列的隊頭,top維護隊頭的位置,可以簡單理解為雙端隊列push的部分為一個棧。而base維護隊列的隊尾,當別的線程需要從本工作線程密取任務時,是從雙端隊列的隊尾出取出任務。工作隊列基于以下幾個保證對隊列進行操作:

  • push和pop操作只會被owner線程調(diào)用。
  • 只有非owner線程會調(diào)用take操作。
  • pop和take操作只有在隊列將要變成空(當前只有一個元素)時才會需要處理同步問題。

也就是說這個實現(xiàn)的雙端隊列將整體的同步問題轉(zhuǎn)換為了一個two-party的同步問題,對于take而言我們只要提供一個簡單的entry lock來保證所以其他線程的take的一致性,而對于自己owner線程的pop和push幾乎不需要同步。

由于ForkJoinPool的這些特性,因此它除了適合用來實現(xiàn)分而治之的計算框架以外,還非常適合用來作為基于event的異步消息處理執(zhí)行框架,而Akka正是將ForkJoinPool作為默認的底層ExcutorService。事實證明,F(xiàn)orkJoinPool在Akka這種基于消息傳遞的異步執(zhí)行環(huán)境下能夠展現(xiàn)出非常高的性能優(yōu)勢,前提是盡量減少在處理過程中的線程阻塞(如IO等待等等)。

為了防止無良網(wǎng)站的爬蟲抓取文章,特此標識,轉(zhuǎn)載請注明文章出處。LaplaceDemon/ShiJia

http://www.cnblogs.com/shijiaqi1066/p/4631466.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容