20.執(zhí)行器

任務(wù)

任務(wù)通常是一些抽象且離散的邏輯工作單元。當(dāng)圍繞任務(wù)執(zhí)行來(lái)構(gòu)建并發(fā)程序時(shí),需要找到任務(wù)的邊界,使得每個(gè)任務(wù)盡可能與其他任務(wù)獨(dú)立開(kāi)來(lái),這樣能夠獨(dú)立地在單獨(dú)的線(xiàn)程中執(zhí)行,提高并發(fā)性。

線(xiàn)程池

線(xiàn)程池是指管理一組同構(gòu)工作線(xiàn)程的資源池,線(xiàn)程池與工作隊(duì)列是密切相關(guān)的,在工作隊(duì)列中保存了所有等待執(zhí)行的任務(wù)。工作者線(xiàn)程的任務(wù)就是從工作隊(duì)列中取出一個(gè)任務(wù)執(zhí)行,執(zhí)行結(jié)束返回線(xiàn)程池等待下一個(gè)任務(wù)。

在線(xiàn)程池中的線(xiàn)程不是根據(jù)任務(wù)臨時(shí)創(chuàng)建的,而是事先準(zhǔn)備好一組線(xiàn)程,等待任務(wù)的出現(xiàn),執(zhí)行完任務(wù)并不隨著任務(wù)結(jié)束而銷(xiāo)毀,而是返回線(xiàn)程池等待下次任務(wù)的復(fù)用。這減少了線(xiàn)程頻繁創(chuàng)建和銷(xiāo)毀的開(kāi)銷(xiāo),同時(shí)因?yàn)榫€(xiàn)程池的大小限制,也限制住系統(tǒng)中最大的并發(fā)量。

為什么要使用線(xiàn)程池

  1. 構(gòu)建和銷(xiāo)毀一個(gè)線(xiàn)程是要與操作系統(tǒng)交互的,這個(gè)成本是很高的;
  2. 活躍的線(xiàn)程會(huì)消耗系統(tǒng)資源,尤其是內(nèi)存。如果可運(yùn)行的線(xiàn)程對(duì)于處理器的數(shù)量,那么就會(huì)閑置一些線(xiàn)程。大量的線(xiàn)程會(huì)占用大量的內(nèi)存空間,也會(huì)給垃圾回收帶來(lái)壓力,因?yàn)榫€(xiàn)程往往生命周期很短。大量的線(xiàn)程競(jìng)爭(zhēng)CPU也會(huì)帶來(lái)性能的問(wèn)題;
  3. 可創(chuàng)建的線(xiàn)程數(shù)量在各個(gè)平臺(tái)上有不同的限制,如果破壞了限制,可能會(huì)拋出OOM異常而終止程序。在一定范圍內(nèi),增加線(xiàn)程可以提高系統(tǒng)的吞吐量,但是物極必反,再創(chuàng)建更多的線(xiàn)程只會(huì)降低程序的執(zhí)行速度,過(guò)多的創(chuàng)建線(xiàn)程可能會(huì)使系統(tǒng)崩潰。

任務(wù)執(zhí)行策略

任務(wù)是一組邏輯工作單元,需要依附于運(yùn)行線(xiàn)程被執(zhí)行。任務(wù)的執(zhí)行策略有簡(jiǎn)單粗暴的把所有任務(wù)放入單線(xiàn)程中順序執(zhí)行、為每個(gè)任務(wù)創(chuàng)建一個(gè)線(xiàn)程執(zhí)行、使用異步任務(wù)執(zhí)行框架來(lái)執(zhí)行。

單線(xiàn)程執(zhí)行簡(jiǎn)單、但非常不高效且可能會(huì)因?yàn)橐粋€(gè)任務(wù)的錯(cuò)誤導(dǎo)致整個(gè)任務(wù)阻塞。每個(gè)任務(wù)創(chuàng)建線(xiàn)程的缺點(diǎn)上節(jié)已經(jīng)闡述。最佳選擇是選擇異步執(zhí)行框架也就是基于線(xiàn)程池的任務(wù)執(zhí)行框架。

Executor框架

Executor是一個(gè)簡(jiǎn)單的函數(shù)式接口:

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

它提供一種標(biāo)準(zhǔn)的方法將任務(wù)的提交和執(zhí)行過(guò)程解耦,使用Runnalbe作為任務(wù)的抽象。

Executor還提供了對(duì)周期性任務(wù)的支持,以及統(tǒng)計(jì)信息收集、應(yīng)用程序管理機(jī)制和性能監(jiān)視等機(jī)制。

Executor基于生產(chǎn)者-消費(fèi)者模式,提交任務(wù)相當(dāng)于生產(chǎn)者,執(zhí)行任務(wù)相當(dāng)于消費(fèi)者。

Executor有許多靜態(tài)工廠(chǎng)方法用來(lái)構(gòu)建線(xiàn)程池(返回的是ExecutorService):

方法 描述
newCachedThreadPool 必要時(shí)創(chuàng)建新線(xiàn)程,空閑線(xiàn)程會(huì)被保留60秒
newFixedThreadPool 創(chuàng)建指定數(shù)量線(xiàn)程的線(xiàn)程池,且線(xiàn)程一直保留
newSingleTheadPool 只有一個(gè)線(xiàn)程的線(xiàn)程池,改線(xiàn)程順序執(zhí)行提交的每一個(gè)任務(wù)
newScheduledThreadPool 用于預(yù)定執(zhí)行而構(gòu)建的線(xiàn)程池,用于替代java.util.Timer
newSingleThreadScheduledExecutor 用于執(zhí)行預(yù)定任務(wù)的單線(xiàn)程池。

線(xiàn)程池是執(zhí)行框架的實(shí)現(xiàn)的一部分,作為消費(fèi)者角色,執(zhí)行器將任務(wù)分發(fā)給線(xiàn)程池執(zhí)行。

當(dāng)用完一個(gè)線(xiàn)程池時(shí),需要將線(xiàn)程池關(guān)閉,否則JVM將無(wú)法退出。關(guān)閉線(xiàn)程池有兩種方法:

  • shutdown

此方法是平緩的關(guān)閉方式,不再接受新的任務(wù),等待以已經(jīng)啟動(dòng)的任務(wù)結(jié)束,當(dāng)所有的任務(wù)完成,線(xiàn)程池中的線(xiàn)程死亡。

  • shutdownNow

暴力關(guān)閉方式,取消尚未開(kāi)始的任務(wù)并試圖中斷正在運(yùn)行的線(xiàn)程。

ExecutorService的生命周期有三種:運(yùn)行、關(guān)閉、終止。Executor初始創(chuàng)建時(shí)處于運(yùn)行狀態(tài),執(zhí)行shutdown之后進(jìn)入關(guān)閉狀態(tài),等所有任務(wù)都完成后進(jìn)入終止?fàn)顟B(tài)??梢哉{(diào)用awaitTermination等待ExecutorService到達(dá)終止?fàn)顟B(tài),或者使用isTerminated輪詢(xún)狀態(tài)。

使用線(xiàn)程池的一般邏輯:

  1. 調(diào)用Executors類(lèi)的靜態(tài)方法newCachedThreadPool或者newFixedThreadPoo
  2. 調(diào)用submit提交任務(wù)(Runnable或Callable對(duì)象)
  3. 如果想要取消一個(gè)任務(wù),或者提交Callable對(duì)象,要保存好返回的Future對(duì)象
  4. 當(dāng)不再提交新任務(wù)時(shí),調(diào)用shutdown。

ExecutorService

public interface ExecutorService extends Executor {

    void shutdown();
    
    List<Runnable> shutdownNow();
    
    boolean isShutdown();
    
    boolean isTerminated();
    
    boolean awaitTermination(long timeout, TimeUnit unit)    throws InterruptedException;
    
    <T> Future<T> submit(Callable<T> task);
    
    <T> Future<T> submit(Runnable task, T result);
 
    Future<?> submit(Runnable task);
    /*
    * 提交所有對(duì)象到一個(gè)Callable對(duì)象的集合中,并返回一個(gè)Future對(duì)象列表,代表所有任務(wù)的Future對(duì)象。
    * 可以使用ExecutorCompletionService對(duì)結(jié)果按可獲得的順序排序。
    */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)         throws InterruptedException;
     
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)  throws InterruptedException;
    /*
    *提交所有對(duì)象到一個(gè)Callable對(duì)象的集合中,并返回某個(gè)已經(jīng)完成的任務(wù)的結(jié)果;
    * 這個(gè)結(jié)果無(wú)法知道是哪個(gè)任務(wù)返回的,返回值之后,這個(gè)任務(wù)組就結(jié)束了。
    */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Callable和Future

Executor以Runnable為任務(wù)抽象,但是Runnable方法不能拋出異常且沒(méi)有返回值。Callable是一種更好的任務(wù)抽象,它認(rèn)為主入口點(diǎn)將返回一個(gè)值并可能拋出一個(gè)異常。

Future表示一個(gè)任務(wù)的生命周期,并提供了方法來(lái)判斷是否已經(jīng)完成或取消,以及獲取任務(wù)的結(jié)果或者取消任務(wù)等。Future需要和Callable合作使用才能獲取返回結(jié)果。

CompletionService控制任務(wù)組

如果向Executor提交了一組任務(wù),并且希望在任務(wù)執(zhí)行完成后獲得結(jié)果,那么可以保留每個(gè)任務(wù)的Future對(duì)象,然后通過(guò)調(diào)用get方法獲取執(zhí)行結(jié)果。CompletionService提供了更好的方法來(lái)完成此需求。

CompletionService將Executor和BlockingQueue的功能融合在一起??梢詫allable任務(wù)提交給它執(zhí)行,然后使用類(lèi)似隊(duì)列操作的take和poll方法獲取已完成的結(jié)果,而且這些結(jié)果在完成時(shí)會(huì)被封裝成Future對(duì)象。

ExecutorCompletionService實(shí)現(xiàn)了CompletionService,并將計(jì)算部分委托給Executor。

CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor);
completionServie.submit(()->{//...});

Future<ImageInfo> f = completionService.take();

為任務(wù)設(shè)置時(shí)限

利用Future的限時(shí)get方法。

異構(gòu)任務(wù)的并行問(wèn)題

各個(gè)任務(wù)執(zhí)行時(shí)間可能差距較大,導(dǎo)致并行任務(wù)的時(shí)間依賴(lài)于最久執(zhí)行時(shí)間的任務(wù)。

Fork-join框架

并行任務(wù)執(zhí)行框架。
要采用框架可用的一種方式完成遞歸計(jì)算,需要通過(guò)一個(gè)擴(kuò)展RecursiveTasK<T>的類(lèi)或者提供一個(gè)擴(kuò)展RecursiveAction的類(lèi),再覆蓋compute方法來(lái)生成和調(diào)用子任務(wù),然后合并結(jié)果。

可完成Future

處理非阻塞調(diào)用的傳統(tǒng)方法是使用事件監(jiān)聽(tīng)器,為任務(wù)完成之后要出現(xiàn)的動(dòng)作注冊(cè)一個(gè)處理器,如果下一個(gè)動(dòng)作也是異步的,在它之后的下一個(gè)動(dòng)作會(huì)在一個(gè)不同的事件處理器中。這樣雖然在功能上不會(huì)有什么問(wèn)題,但是這個(gè)流程的代碼可能分散到各處。
Java8的CompletableFuture類(lèi)提供了一種候選方法,可完成Future可以組合。

    @Test
    public void testCompletableFuture() {

        String res = CompletableFuture.supplyAsync(() -> "hello").thenApplyAsync((a) -> a + "world").join();
        System.out.println(res);
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容