執(zhí)行器

在之前的例子中,線程執(zhí)行的任務(wù),在Runnable對象中定義,和線程,在Thread對象中定義,兩者之間總是有一種密切的關(guān)聯(lián)。這樣的機制對小型應(yīng)用管用,但是在大型的應(yīng)用中,把線程管理和應(yīng)用的其他部分分離會更好。負責封裝這些功能的對象被成為Executors執(zhí)行器。之后的子章節(jié)詳細介紹了執(zhí)行器的細節(jié)。

  • Executor接口定義了三種執(zhí)行器對象類型。
  • Thread Pools是最常見的執(zhí)行器實現(xiàn)。
  • Fork/Join是JAVA 7中新引入的一種利用多處理器優(yōu)勢的框架。

Executor Interfaces

java.util.concurrent包定義了三種執(zhí)行器接口:

  • Executor,一個支持發(fā)起新任務(wù)的簡單接口。
  • ExecutorService,Executor的一個子接口,它增加了幫助生命期管理的功能,可以用在單個的任務(wù)和執(zhí)行器自己之上。
  • ScheduledExecutorService,是ExecutorService的子接口,支持未來and/or階段性的任務(wù)執(zhí)行。

執(zhí)行器接口

執(zhí)行器接口提供了一種簡單的方法,execute,被設(shè)計成線程創(chuàng)造動作的一個簡單代替。如果r是一個Runnable對象,那么e是一個可以替換的Executor對象,(new Thread(r)).start(); 可以替換成e.execute(r);。然而,execute的定義沒有那么具體。底層次的做法是建立一個線程,然后立刻執(zhí)行。依賴于Executor的實現(xiàn),execute也會做同樣的事,但是更有可能會使用一個現(xiàn)有的工人線程來運行r,或者是把r放進一個隊列,等待工人線程可用。(我們將在線程池的章節(jié)講到工人線程。)

java.util.concurrent包中的執(zhí)行器實現(xiàn)是被設(shè)計成能充分利用更高級的ExecutorService和ScheduledExecutorService接口,盡管他們也是基于基本的Executor接口工作。

ExecutorService接口

ExecutorService接口使用一個更加全能的方法submit來支持execute方法。和execute方法一樣,submit方法接受Runnable對象,但也接受Callable對象,Callable對象允許任務(wù)擁有一個返回值。submit方法會返回一個Future對象,它可以獲取Callable的返回值,以及管理Callable和Runnable任務(wù)的狀態(tài)。

ExecutorService也為提交大規(guī)模Callable集合對象提供了方法。最后,ExecutorService提供了一系列的方法來管理執(zhí)行器的關(guān)閉。為了支持立刻關(guān)閉,任務(wù)需要正確處理中斷。

ScheduledExecutorService接口

與它的父類ExecutorService相比,ScheduledExecutorService接口提供了schedule方法,該方法可以在一段時間的延遲后執(zhí)行Runnable或Callable任務(wù)。此外,該接口定義了scheduleAtFixedRate方法和scheduleWithFixedDelay方法,他們可以以特定的間隔,反復(fù)執(zhí)行特定的任務(wù)。

線程池

java.util.concurrent包中大部分的執(zhí)行器實現(xiàn)使用了線程池。這些線程池和Runnable、Callable任務(wù)分別存在,并且總是被用來運行多任務(wù)。

使用工人線程最小程度地減少了由于線程創(chuàng)建帶來的開銷。新建線程會使用大量的內(nèi)存,在大規(guī)模的應(yīng)用中,分配和回收很多線程對象會帶來巨大的內(nèi)存管理開銷。

一個常見的線程池類型是固定線程池。這種類型的線程池擁有指定數(shù)量的正在執(zhí)行的線程;如果一個線程在運行時意外退出了,另一個新的線程會自動取代它。任務(wù)通過內(nèi)部的隊列分配給池子,隊列負責保存多余的任務(wù),當任務(wù)的數(shù)量超過線程數(shù)量時。

固定線程池的一個顯著的優(yōu)點是應(yīng)用會非常小心地使用它。想要理解這一點,考慮一個網(wǎng)頁服務(wù)器應(yīng)用,其中的每個HTTP應(yīng)用都被一個不同的線程處理。如果這個應(yīng)用在每次獲得新線程的時候都創(chuàng)建一個新的線程,那么假如這個系統(tǒng)忽然收到超出它能力的請求數(shù)量,它可能會忽然停止響應(yīng)所有請求,因為這些操作的額外成本超出了系統(tǒng)的能力。

通過限制系統(tǒng)中能創(chuàng)建的線程數(shù)量,應(yīng)用沒辦法像請求發(fā)送速度一樣地處理請求,但它能夠盡它所能地來服務(wù)請求。

創(chuàng)建固定線程池的一個簡單方法是使用在java.util.concurrent包中的newFixedThreadPool工廠方法。這個類也提供了其他種類的工廠方法如下所示:

  • newCachedThreadPool方法,會創(chuàng)建一個能夠擴展線程池尺寸的執(zhí)行器。這種執(zhí)行器適合生命周期很短的應(yīng)用。
  • newSingleThreadExecutor方法,創(chuàng)建一次只會執(zhí)行一個任務(wù)的執(zhí)行器。
  • 還有幾個工廠方法是上面執(zhí)行器的ScheduledExecutorService版本。

如果上面提供的方法沒有能滿足你的要求,構(gòu)造java.util.concurrent.ThreadPoolExecutor或java.util.concurrent.ScheduledThreadPoolExecutor的實例會給你更多的選擇。

Fork/Join

Fork/Join框架是ExecutorService接口的一個實現(xiàn),它可以幫助你利用多處理器。它被設(shè)計成針對那些能夠被遞歸分解為更小任務(wù)的工作。目的是最大限度地使用可提供地處理器能力來提高應(yīng)用的性能。

和其他ExecutorService的實現(xiàn)一樣,fork/join框架把任務(wù)分發(fā)給線程池中的若干工人線程。讓Fork/Join與眾不同的是,它使用了工作偷竊算法。做完了事情的工人線程會從其他繁忙線程中偷任務(wù)。

Fork/Join框架的核心類是ForkJoinPool,它是AbstractExecutorService類的擴展。ForkJoinPool實現(xiàn)了核心的工作偷竊算法,并且會執(zhí)行ForkJoinTask進程。

基礎(chǔ)用法

使用fork/join框架的第一步,是寫能執(zhí)行一部分工作的代碼。你的代碼會看上去和下面的很像:

if (my portion of the work is small enough)
  do the work directly
else
  split my work into two pieces
  invoke the two pieces and wait for the results

把這些代碼包裹在ForkJoinTask子類中,或是使用它的更為具體的一個子類,比如RecursiveTask或RecursiveAction。

等你的ForkJoinTask子類完成之后,創(chuàng)建一個代表所有工作的對象,然后把它傳遞給ForkJoinPool實例的invoke方法。

為了清晰而模糊

為了幫助你理解fork/join框架的工作原理,考慮下面的案例。假設(shè)你想要涂掉一幅畫。原始的畫素材由一個整數(shù)數(shù)組代表,其中每個整數(shù)都包含著一個像素的顏色值?;煜蟮哪繕藞D也是由相同大小的整數(shù)數(shù)組代表。

實施模糊操作是由每次操作一個像素完成的。每個像素都被賦值為它周圍像素的平均值,結(jié)果存放在結(jié)果數(shù)組中。因為這個圖像是一個很大的數(shù)組,這個過程會花費很長的時間。你可以通過實現(xiàn)算法來使用fork/join,利用多核系統(tǒng)的并行處理能力。下面是一種可能的實現(xiàn):

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
  
    // Processing window size; should be odd.
    private int mBlurWidth = 15;
  
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                    mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }
          
            // Reassemble destination pixel.
            int dpixel = (0xff000000     ) |
                   (((int)rt) << 16) |
                   (((int)gt) <<  8) |
                   (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }

你現(xiàn)在實現(xiàn)了抽象的compute()方法,它要么直接進行模糊操作,要么把任務(wù)分成兩份。一個簡單的數(shù)組長度閾值決定了這個工作是直接運算,還是分割。

protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }
    
    int split = mLength / 2;
    
    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容