在之前的例子中,線程執(zhí)行的任務(wù),在Runnable對象中定義,和線程,在Thread對象中定義,兩者之間總是有一種密切的關(guān)聯(lián)。這樣的機制對小型應(yīng)用管用,但是在大型的應(yīng)用中,把線程管理和應(yīng)用的其他部分分離會更好。負責封裝這些功能的對象被成為Executors執(zhí)行器。之后的子章節(jié)詳細介紹了執(zhí)行器的細節(jié)。
- Executor接口定義了三種執(zhí)行器對象類型。
- Thread Pools是最常見的執(zhí)行器實現(xiàn)。
- Fork/Join是JAVA 7中新引入的一種利用多處理器優(yōu)勢的框架。
Executor Interfaces
java.util.concurrent包定義了三種執(zhí)行器接口:
- Executor,一個支持發(fā)起新任務(wù)的簡單接口。
- ExecutorService,Executor的一個子接口,它增加了幫助生命期管理的功能,可以用在單個的任務(wù)和執(zhí)行器自己之上。
- ScheduledExecutorService,是ExecutorService的子接口,支持未來and/or階段性的任務(wù)執(zhí)行。
執(zhí)行器接口
執(zhí)行器接口提供了一種簡單的方法,execute,被設(shè)計成線程創(chuàng)造動作的一個簡單代替。如果r是一個Runnable對象,那么e是一個可以替換的Executor對象,(new Thread(r)).start(); 可以替換成e.execute(r);。然而,execute的定義沒有那么具體。底層次的做法是建立一個線程,然后立刻執(zhí)行。依賴于Executor的實現(xiàn),execute也會做同樣的事,但是更有可能會使用一個現(xiàn)有的工人線程來運行r,或者是把r放進一個隊列,等待工人線程可用。(我們將在線程池的章節(jié)講到工人線程。)
java.util.concurrent包中的執(zhí)行器實現(xiàn)是被設(shè)計成能充分利用更高級的ExecutorService和ScheduledExecutorService接口,盡管他們也是基于基本的Executor接口工作。
ExecutorService接口
ExecutorService接口使用一個更加全能的方法submit來支持execute方法。和execute方法一樣,submit方法接受Runnable對象,但也接受Callable對象,Callable對象允許任務(wù)擁有一個返回值。submit方法會返回一個Future對象,它可以獲取Callable的返回值,以及管理Callable和Runnable任務(wù)的狀態(tài)。
ExecutorService也為提交大規(guī)模Callable集合對象提供了方法。最后,ExecutorService提供了一系列的方法來管理執(zhí)行器的關(guān)閉。為了支持立刻關(guān)閉,任務(wù)需要正確處理中斷。
ScheduledExecutorService接口
與它的父類ExecutorService相比,ScheduledExecutorService接口提供了schedule方法,該方法可以在一段時間的延遲后執(zhí)行Runnable或Callable任務(wù)。此外,該接口定義了scheduleAtFixedRate方法和scheduleWithFixedDelay方法,他們可以以特定的間隔,反復(fù)執(zhí)行特定的任務(wù)。
線程池
java.util.concurrent包中大部分的執(zhí)行器實現(xiàn)使用了線程池。這些線程池和Runnable、Callable任務(wù)分別存在,并且總是被用來運行多任務(wù)。
使用工人線程最小程度地減少了由于線程創(chuàng)建帶來的開銷。新建線程會使用大量的內(nèi)存,在大規(guī)模的應(yīng)用中,分配和回收很多線程對象會帶來巨大的內(nèi)存管理開銷。
一個常見的線程池類型是固定線程池。這種類型的線程池擁有指定數(shù)量的正在執(zhí)行的線程;如果一個線程在運行時意外退出了,另一個新的線程會自動取代它。任務(wù)通過內(nèi)部的隊列分配給池子,隊列負責保存多余的任務(wù),當任務(wù)的數(shù)量超過線程數(shù)量時。
固定線程池的一個顯著的優(yōu)點是應(yīng)用會非常小心地使用它。想要理解這一點,考慮一個網(wǎng)頁服務(wù)器應(yīng)用,其中的每個HTTP應(yīng)用都被一個不同的線程處理。如果這個應(yīng)用在每次獲得新線程的時候都創(chuàng)建一個新的線程,那么假如這個系統(tǒng)忽然收到超出它能力的請求數(shù)量,它可能會忽然停止響應(yīng)所有請求,因為這些操作的額外成本超出了系統(tǒng)的能力。
通過限制系統(tǒng)中能創(chuàng)建的線程數(shù)量,應(yīng)用沒辦法像請求發(fā)送速度一樣地處理請求,但它能夠盡它所能地來服務(wù)請求。
創(chuàng)建固定線程池的一個簡單方法是使用在java.util.concurrent包中的newFixedThreadPool工廠方法。這個類也提供了其他種類的工廠方法如下所示:
- newCachedThreadPool方法,會創(chuàng)建一個能夠擴展線程池尺寸的執(zhí)行器。這種執(zhí)行器適合生命周期很短的應(yīng)用。
- newSingleThreadExecutor方法,創(chuàng)建一次只會執(zhí)行一個任務(wù)的執(zhí)行器。
- 還有幾個工廠方法是上面執(zhí)行器的ScheduledExecutorService版本。
如果上面提供的方法沒有能滿足你的要求,構(gòu)造java.util.concurrent.ThreadPoolExecutor或java.util.concurrent.ScheduledThreadPoolExecutor的實例會給你更多的選擇。
Fork/Join
Fork/Join框架是ExecutorService接口的一個實現(xiàn),它可以幫助你利用多處理器。它被設(shè)計成針對那些能夠被遞歸分解為更小任務(wù)的工作。目的是最大限度地使用可提供地處理器能力來提高應(yīng)用的性能。
和其他ExecutorService的實現(xiàn)一樣,fork/join框架把任務(wù)分發(fā)給線程池中的若干工人線程。讓Fork/Join與眾不同的是,它使用了工作偷竊算法。做完了事情的工人線程會從其他繁忙線程中偷任務(wù)。
Fork/Join框架的核心類是ForkJoinPool,它是AbstractExecutorService類的擴展。ForkJoinPool實現(xiàn)了核心的工作偷竊算法,并且會執(zhí)行ForkJoinTask進程。
基礎(chǔ)用法
使用fork/join框架的第一步,是寫能執(zhí)行一部分工作的代碼。你的代碼會看上去和下面的很像:
if (my portion of the work is small enough)
do the work directly
else
split my work into two pieces
invoke the two pieces and wait for the results
把這些代碼包裹在ForkJoinTask子類中,或是使用它的更為具體的一個子類,比如RecursiveTask或RecursiveAction。
等你的ForkJoinTask子類完成之后,創(chuàng)建一個代表所有工作的對象,然后把它傳遞給ForkJoinPool實例的invoke方法。
為了清晰而模糊
為了幫助你理解fork/join框架的工作原理,考慮下面的案例。假設(shè)你想要涂掉一幅畫。原始的畫素材由一個整數(shù)數(shù)組代表,其中每個整數(shù)都包含著一個像素的顏色值?;煜蟮哪繕藞D也是由相同大小的整數(shù)數(shù)組代表。
實施模糊操作是由每次操作一個像素完成的。每個像素都被賦值為它周圍像素的平均值,結(jié)果存放在結(jié)果數(shù)組中。因為這個圖像是一個很大的數(shù)組,這個過程會花費很長的時間。你可以通過實現(xiàn)算法來使用fork/join,利用多核系統(tǒng)的并行處理能力。下面是一種可能的實現(xiàn):
public class ForkBlur extends RecursiveAction {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;
// Processing window size; should be odd.
private int mBlurWidth = 15;
public ForkBlur(int[] src, int start, int length, int[] dst) {
mSource = src;
mStart = start;
mLength = length;
mDestination = dst;
}
protected void computeDirectly() {
int sidePixels = (mBlurWidth - 1) / 2;
for (int index = mStart; index < mStart + mLength; index++) {
// Calculate average.
float rt = 0, gt = 0, bt = 0;
for (int mi = -sidePixels; mi <= sidePixels; mi++) {
int mindex = Math.min(Math.max(mi + index, 0),
mSource.length - 1);
int pixel = mSource[mindex];
rt += (float)((pixel & 0x00ff0000) >> 16)
/ mBlurWidth;
gt += (float)((pixel & 0x0000ff00) >> 8)
/ mBlurWidth;
bt += (float)((pixel & 0x000000ff) >> 0)
/ mBlurWidth;
}
// Reassemble destination pixel.
int dpixel = (0xff000000 ) |
(((int)rt) << 16) |
(((int)gt) << 8) |
(((int)bt) << 0);
mDestination[index] = dpixel;
}
}
你現(xiàn)在實現(xiàn)了抽象的compute()方法,它要么直接進行模糊操作,要么把任務(wù)分成兩份。一個簡單的數(shù)組長度閾值決定了這個工作是直接運算,還是分割。
protected static int sThreshold = 100000;
protected void compute() {
if (mLength < sThreshold) {
computeDirectly();
return;
}
int split = mLength / 2;
invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
new ForkBlur(mSource, mStart + split, mLength - split,
mDestination));
}