10、Executor框架

Java并發(fā)編程的藝術(shù)筆記


前言

在Java中,使用線程來異步執(zhí)行任務(wù)。
Java線程的創(chuàng)建與銷毀需要一定的開銷,如果我們?yōu)槊恳粋€任務(wù)創(chuàng)建一個新線程來執(zhí)行,這些線程的創(chuàng)建與銷毀將消耗大量的計(jì)算資源。
同時,為每一個任務(wù)創(chuàng)建一個新線程來執(zhí)行,這種策略可能會使處于高負(fù)荷狀態(tài)的應(yīng)用最終崩潰。

Java的線程既是工作單元,也是執(zhí)行機(jī)制。

JDK 5 開始,把 工作單元執(zhí)行機(jī)制 分離開來。

  • 工作單元 :
    • Runnable
    • Callable
  • 執(zhí)行機(jī)制
    • Executor框架

Executor框架簡介

Executor框架的兩級調(diào)度模型

HotSpot VM的線程模型中,Java線程(java.lang.Thread)被 一對一映射為本地操作系統(tǒng)線程。Java線程啟動時會創(chuàng)建一個本地操作系統(tǒng)線程;當(dāng)該Java線程終止時,這個操作系統(tǒng)線程也會被回收。
操作系統(tǒng)會調(diào)度所有線程并將它們分配給可用的CPU。
在上層,Java多線程程序通常把應(yīng)用分解為若干個任務(wù),然后使用用戶級的調(diào)度器(Executor框架)將這些任務(wù)映射為固定數(shù)量的線程;在底層,操作系統(tǒng)內(nèi)核將這些線程映射到硬件處理器上。這種兩級調(diào)度模型的示意圖下面有介紹。
從下圖中可以看出,應(yīng)用程序通過Executor框架控制上層的調(diào)度;而下層的調(diào)度由操作系統(tǒng)內(nèi)核控制,下層的調(diào)度不受應(yīng)用程序的控制。

Executor框架的結(jié)構(gòu)與成員
Executor框架的兩級調(diào)度模型
Executor框架的結(jié)構(gòu)
  • 任務(wù)。包括被執(zhí)行任務(wù)需要實(shí)現(xiàn)的接口:Runnable接口Callable接口。

  • 任務(wù)的執(zhí)行。包括任務(wù)執(zhí)行機(jī)制的核心接口Executor,以及繼承自ExecutorExecutorService接口。Executor框架有兩個關(guān)鍵類實(shí)現(xiàn)了ExecutorService接口(ThreadPoolExecutorScheduledThreadPoolExecutor)。

  • 異步計(jì)算的結(jié)果。包括接口Future和實(shí)現(xiàn)Future接口的FutureTask類。

  • Executor框架包含的主要的類與接口如下圖所示:

    Executor框架的類與接口

    下面是這些類和接口的簡介:

    • Executor是一個接口,它是Executor框架的基礎(chǔ),它將任務(wù)的提交與任務(wù)的執(zhí)行分離開來。
    • ThreadPoolExecutor 是線程池的核心實(shí)現(xiàn)類,用來執(zhí)行被提交的任務(wù)。
    • ScheduledThreadPoolExecutor 是一個實(shí)現(xiàn)類,可以在給定的延遲后運(yùn)行命令,或者定期執(zhí)行命令。ScheduledThreadPoolExecutorTimer更靈活,功能更強(qiáng)大。
    • Future接口和實(shí)現(xiàn)Future接口的FutureTask類,代表異步計(jì)算的結(jié)果。
    • Runnable接口和Callable接口的實(shí)現(xiàn)類,都可以被ThreadPoolExecutorScheduledThreadPoolExecutor執(zhí)行。
Executor`框架的使用示意圖如下:
Executor框架的使用示意圖

主線程首先要創(chuàng)建實(shí)現(xiàn)Runnable或者Callable接口的任務(wù)對象。
工具類Executors可以通過以下兩個方法把一個Runnable對象封裝為一個Callable對象:

  • Executors.callable(Runnable task)
  • Executors.callable(Runnable task, Object resule)。

然后可以把Runnable對象直接交給ExecutorService執(zhí)行ExecutorService.execute(Runnable command);或者也可以把Runnable對象或Callable對象提交給ExecutorService 執(zhí)行 ExecutorService.submit(Runnable task)ExecutorService.submit(Callable<T>task)

如果執(zhí)行ExecutorService.submit(…),ExecutorService 將返回一個實(shí)現(xiàn) Future 接口的對象(到目前為止的JDK中,返回的是FutureTask對象)。由于FutureTask實(shí)現(xiàn)了Runnable,程序員也可以創(chuàng)建FutureTask,然后直接交給ExecutorService執(zhí)行。

最后,主線程可以執(zhí)行 FutureTask.get() 方法來等待任務(wù)執(zhí)行完成。主線程也可以執(zhí)行
FutureTask.cancel(boolean mayInterruptIfRunning)來取消此任務(wù)的執(zhí)行。

Executor框架的成員

這里將介紹Executor框架的主要成員:

  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor
  • Future接口
  • Runnable接口
  • Callable接口
  • Executors
ThreadPoolExecutor

通常使用工廠類Executors來創(chuàng)建。Executors可以創(chuàng)建3種類型的ThreadPoolExecutor

  • SingleThreadExecutor:適用于需要保證順序地執(zhí)行各個任務(wù);并且在任意時間點(diǎn),不會有多個線程是活動的應(yīng)用場景。
    下面是Executors提供的,創(chuàng)建使用單個線程的SingleThreadExecutor的API。

    public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService(
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())
        );
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
        return new Executors.FinalizableDelegatedExecutorService(
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0)
        );
    }
    
    
  • FixedThreadPool:適用于為了滿足資源管理的需求,而需要限制當(dāng)前線程數(shù)量的應(yīng)用場景,它適用于負(fù)載比較重的服務(wù)器。
    下面是Executors提供的,創(chuàng)建使用固定線程數(shù)的FixedThreadPool的API:

    public static ExecutorService newFixedThreadPool(int var0) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
    }
    public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
        return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1);
    }
    
    
  • CachedThreadPool:是大小無界的線程池,適用于執(zhí)行很多的短期異步任務(wù)的小程序,或者是負(fù)載較輕的服務(wù)器。
    下面是Executors提供的,創(chuàng)建一個會根據(jù)需要創(chuàng)建新線程的CachedThreadPool的API。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
        return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0);
    }
    
    
  • ScheduledThreadPoolExecutor
    ScheduledThreadPoolExecutor通常使用工廠類Executors來創(chuàng)建。
    Executors可以創(chuàng)建2種類型的ScheduledThreadPoolExecutor,如下:

    • ScheduledThreadPoolExecutor。包含若干個線程的ScheduledThreadPoolExecutor。
      適用于需要多個后臺線程執(zhí)行周期任務(wù),同時為了滿足資源管理的需求而需要限制后臺線程的數(shù)量的應(yīng)用場景。
      下面是工廠類Executors提供的,創(chuàng)建 固定個數(shù) 線程的ScheduledThreadPoolExecutor的API。

      public static ScheduledExecutorService newScheduledThreadPool(int var0) {
          return new ScheduledThreadPoolExecutor(var0);
      }
      public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
          return new ScheduledThreadPoolExecutor(var0, var1);
      }
      
      
    • SingleThreadScheduledExecutor。只包含一個線程的ScheduledThreadPoolExecutor
      適用于需要單個后臺線程執(zhí)行周期任務(wù),同時需要保證順序地執(zhí)行各個任務(wù)的應(yīng)用場景。
      下面是Executors提供的,創(chuàng)建單個線程的SingleThreadScheduledExecutor的API。

      public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
          return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
      }
      public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory var0) {
          return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, var0));
      }
      
      
Future接口

Future接口和實(shí)現(xiàn)Future接口的FutureTask類用來表示異步計(jì)算的結(jié)果。
當(dāng)我們把Runnable接口或Callable接口的實(shí)現(xiàn)類提交(submit)給ThreadPoolExecutorScheduledThreadPoolExecutor時,ThreadPoolExecutorScheduledThreadPoolExecutor會向我們返回一個FutureTask對象。
下面是對應(yīng)的API:

<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<> submit(Runnable task)

有一點(diǎn)需要讀者注意,到目前最新的JDK 8為止,Java通過上述API返回的是一個FutureTask對象。但從API可以看到,Java僅僅保證返回的是一個實(shí)現(xiàn)了Future接口的對象。在將來的JDK實(shí)現(xiàn)中,返回的可能不一定是FutureTask。

Runnable接口和 Callable接口

Runnable接口和Callable接口的實(shí)現(xiàn)類,都可以被ThreadPoolExecutorScheduledThreadPoolExecutor執(zhí)行。
它們之間的區(qū)別是Runnable不會返回結(jié)果,而Callable可以返回結(jié)果。
除了可以自己創(chuàng)建實(shí)現(xiàn)Callable接口的對象外,還可以使用工廠類Executors來把一個Runnable包裝成一個Callable

下面是Executors提供的,把一個Runnable包裝成一個Callable的API。

public static Callable<Object> callable(Runnable task)  // 假設(shè)返回對象Callable1

下面是Executors提供的,把一個Runnable和一個待返回的結(jié)果包裝成一個Callable的API。

public static <T> Callable<T> callable(Runnable task, T result) // 假設(shè)返回對象Callable2

前面講過,當(dāng)我們把一個Callable對象(比如上面的Callable1Callable2)提交給ThreadPoolExecutorScheduledThreadPoolExecutor執(zhí)行時,submit(…)會向我們返回一個FutureTask對象。我們可以執(zhí)行FutureTask.get()方法來 等待任務(wù)執(zhí)行完成。當(dāng)任務(wù)成功完成后FutureTask.get()將返回該任務(wù)的結(jié)果。

例如,如果提交的是對象Callable1FutureTask.get()方法將返回null;如果提交的是對象Callable2FutureTask.get()方法將返回result對象。

ThreadPoolExecutor詳解

Executor框架最核心的類是ThreadPoolExecutor,它是線程池的實(shí)現(xiàn)類,主要由下列4個組件構(gòu)成。

  • corePool:核心線程池的大小。
  • maximumPool:最大線程池的大小。
  • BlockingQueue:用來暫時保存任務(wù)的工作隊(duì)列。
  • RejectedExecutionHandler:當(dāng)ThreadPoolExecutor已經(jīng)關(guān)閉或ThreadPoolExecutor已經(jīng)飽和時(達(dá)到了最大線程池大小且工作隊(duì)列已滿),execute()方法將要調(diào)用的Handler。

通過Executor框架的工具類Executors,可以創(chuàng)建3種類型的ThreadPoolExecutor

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool
FixedThreadPool詳解

FixedThreadPool被稱為可重用固定線程數(shù)的線程池。下面是FixedThreadPool的源代碼實(shí)現(xiàn)。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

FixedThreadPoolcorePoolSizemaximumPoolSize都被設(shè)置為創(chuàng)建FixedThreadPool時指定的參數(shù)nThreads。

當(dāng)線程池中的線程數(shù)大于corePoolSize時,keepAliveTime為多余的空閑線程等待新任務(wù)的最長時間,超過這個時間后多余的線程將被終止。這里把keepAliveTime設(shè)置為0L,意味著多余的空閑線程會被立即終止。

FixedThreadPoolexecute()方法的運(yùn)行示意圖如下圖所示。

FixedThreadPool的execute()的運(yùn)行示意圖

  • 圖中1:如果當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)。
  • 圖中2:在線程池完成預(yù)熱之后(當(dāng)前運(yùn)行的線程數(shù)等于corePoolSize),將任務(wù)加入LinkedBlockingQueue
  • 圖中3:線程執(zhí)行完1中的任務(wù)后,會在循環(huán)中反復(fù)從LinkedBlockingQueue獲取任務(wù)來執(zhí)行。

FixedThreadPool使用無界隊(duì)列LinkedBlockingQueue作為線程池的工作隊(duì)列(隊(duì)列的容量為Integer.MAX_VALUE)。
使用無界隊(duì)列作為工作隊(duì)列會對線程池帶來如下影響:

  • 當(dāng)線程池中的線程數(shù)達(dá)到corePoolSize后,新任務(wù)將在無界隊(duì)列中等待,因此線程池中的線程數(shù)不會超過corePoolSize。
  • 由于上一點(diǎn),使用無界隊(duì)列時maximumPoolSize將是一個無效參數(shù)。
  • 由于前面兩點(diǎn),使用無界隊(duì)列時keepAliveTime將是一個無效參數(shù)。
  • 由于使用無界隊(duì)列,運(yùn)行中的FixedThreadPool(未執(zhí)行方法shutdown()shutdownNow())不會拒絕任務(wù)(不會調(diào)用RejectedExecutionHandler.rejectedExecution方法)。

SingleThreadExecutor詳解

SingleThreadExecutor是使用單個worker線程的Executor。
下面是SingleThreadExecutor的源代碼實(shí)現(xiàn):

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
    );
}

SingleThreadExecutorcorePoolSizemaximumPoolSize被設(shè)置為1。其他參數(shù)與FixedThreadPool相同。SingleThreadExecutor使用無界隊(duì)列LinkedBlockingQueue作為線程池的工作隊(duì)列(隊(duì)列的容量為Integer.MAX_VALUE)。SingleThreadExecutor使用無界隊(duì)列作為工作隊(duì)列對線程池帶來的影響與FixedThreadPool相同,這里就不贅述了。

SingleThreadExecutor的運(yùn)行示意圖如下圖所示:

SingleThreadExecutor的execute()的運(yùn)行示意圖

  • 上圖1:如果當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize(即線程池中無運(yùn)行的線程),則創(chuàng)建一個新線程來執(zhí)行任務(wù)。
  • 上圖2:在線程池完成預(yù)熱之后(當(dāng)前線程池中有一個運(yùn)行的線程),將任務(wù)加入LinkedBlockingQueue。
  • 上圖3:線程執(zhí)行完上圖1中的任務(wù)后,會在一個無限循環(huán)中反復(fù)從LinkedBlockingQueue獲取任務(wù)來執(zhí)行。

CachedThreadPool詳解

CachedThreadPool是一個會根據(jù)需要創(chuàng)建新線程的線程池。
下面是創(chuàng)建CachedThreadPool的源代碼。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
}

CachedThreadPoolcorePoolSize被設(shè)置為0,即corePool為空;maximumPoolSize被設(shè)置為Integer.MAX_VALUE,即maximumPool是無界的。這里把keepAliveTime設(shè)置為60L,意味著CachedThreadPool中的空閑線程等待新任務(wù)的最長時間為60秒,空閑線程超過60秒后將會被終止。

FixedThreadPoolSingleThreadExecutor使用無界隊(duì)列LinkedBlockingQueue作為線程池的工作隊(duì)列。
CachedThreadPool使用沒有容量的SynchronousQueue作為線程池的工作隊(duì)列,但CachedThreadPoolmaximumPool是無界的。
這意味著,如果主線程提交任務(wù)的速度高于maximumPool中線程處理任務(wù)的速度時,CachedThreadPool會不斷創(chuàng)建新線程。
極端情況下,CachedThreadPool會因?yàn)閯?chuàng)建過多線程而耗盡CPU和內(nèi)存資源。

CachedThreadPoolexecute()方法的執(zhí)行示意圖如下圖所示:

CachedThreadPool的execute()的運(yùn)行示意圖

  • 上圖1: 首先執(zhí)行SynchronousQueue.offer(Runnable task)。如果當(dāng)前maximumPool中有空閑線程正在執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主線程執(zhí)行offer操作與空閑線程執(zhí)行的poll操作配對成功,主線程把任務(wù)交給空閑線程執(zhí)行,execute()方法執(zhí)行完成;否則執(zhí)行下面的步驟2
  • 上圖2:當(dāng)初始maximumPool為空,或者maximumPool中當(dāng)前沒有空閑線程時,將沒有線程執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這種情況下,步驟1將失敗。此時CachedThreadPool會創(chuàng)建一個新線程執(zhí)行任務(wù),execute()方法執(zhí)行完成。
  • 上圖3:.在步驟2中新創(chuàng)建的線程將任務(wù)執(zhí)行完后,會執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。這個poll操作會讓空閑線程最多在SynchronousQueue中等待60秒鐘。如果60秒鐘內(nèi)主線程提交了一個新任務(wù)(主線程執(zhí)行步驟1),那么這個空閑線程將執(zhí)行主線程提交的新任務(wù);否則,這個空閑線程將終止。由于空閑60秒的空閑線程會被終止,因此長時間保持空閑的CachedThreadPool不會使用任何資源。

前面提到過,SynchronousQueue是一個沒有容量的阻塞隊(duì)列。每個插入操作必須等待另一個線程的對應(yīng)移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主線程提交的任務(wù)傳遞給空閑線程執(zhí)行。
CachedThreadPool中任務(wù)傳遞的示意圖如下圖所示:

CachedThreadPool的任務(wù)傳遞示意圖

ScheduledThreadPoolExecutor詳解

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor。它主要用來在給定的延遲之后運(yùn)行任務(wù),或者定期執(zhí)行任務(wù)。ScheduledThreadPoolExecutor的功能與Timer類似,但ScheduledThreadPoolExecutor功能更強(qiáng)大、更靈活。Timer對應(yīng)的是單個后臺線程,而ScheduledThreadPoolExecutor可以在構(gòu)造函數(shù)中指定多個對應(yīng)的后臺線程數(shù)。

  • ScheduledThreadPoolExecutor的運(yùn)行機(jī)制
    ScheduledThreadPoolExecutor的執(zhí)行示意圖(基于JDK 6)如下圖所示:

    ScheduledThreadPoolExecutor的任務(wù)傳遞示意圖

    DelayQueue是一個無界隊(duì)列,所以ThreadPoolExecutormaximumPoolSizeScheduledThreadPoolExecutor中沒有什么意義(設(shè)置maximumPoolSize的大小沒有什么效果)。

ScheduledThreadPoolExecutor的執(zhí)行主要分為兩大部分。

  • 當(dāng)調(diào)用ScheduledThreadPoolExecutorscheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法時,會向ScheduledThreadPoolExecutor的DelayQueue添加一個實(shí)現(xiàn)了RunnableScheduledFutur接口的ScheduledFutureTask。
  • 線程池中的線程從DelayQueue中獲取ScheduledFutureTask,然后執(zhí)行任務(wù)。

ScheduledThreadPoolExecutor為了實(shí)現(xiàn)周期性的執(zhí)行任務(wù),對ThreadPoolExecutor做了如下的修改。

  • 使用DelayQueue作為任務(wù)隊(duì)列。
  • 獲取任務(wù)的方式不同(后文會說明)。
  • 執(zhí)行周期任務(wù)后,增加了額外的處理(后文會說明)。

ScheduledThreadPoolExecutor的實(shí)現(xiàn)

前面我們提到過,ScheduledThreadPoolExecutor會把待調(diào)度的任務(wù)(ScheduledFutureTask)放到一個DelayQueue中。

ScheduledFutureTask主要包含3個成員變量,如下。

  • long型成員變量time,表示這個任務(wù)將要被執(zhí)行的具體時間。
  • long型成員變量sequenceNumber,表示這個任務(wù)被添加到ScheduledThreadPoolExecutor中的序號。
  • long型成員變量period,表示任務(wù)執(zhí)行的間隔周期。

DelayQueue封裝了一個PriorityQueue,這個PriorityQueue會對隊(duì)列中的ScheduledFutureTask進(jìn)行排序。排序時,time小的排在前面(時間早的任務(wù)將被先執(zhí)行)。如果兩個ScheduledFutureTasktime相同,就比較sequenceNumbersequenceNumber小的排在前面(也就是說,如果兩個任務(wù)的執(zhí)行時間相同,那么先提交的任務(wù)將被先執(zhí)行)。

首先,讓我們看看ScheduledThreadPoolExecutor中的線程執(zhí)行周期任務(wù)的過程。下圖是ScheduledThreadPoolExecutor中的線程1執(zhí)行某個周期任務(wù)的4個步驟。

ScheduledThreadPoolExecutor的任務(wù)執(zhí)行步驟

下面是對這4個步驟的說明:

  • 上圖1: 線程·1從DelayQueue中獲取已到期的ScheduledFutureTaskDelayQueue.take()。到期任務(wù)是指ScheduledFutureTasktime大于等于當(dāng)前時間。
  • 上圖2: 線程1執(zhí)行這個ScheduledFutureTask
  • 上圖3: 線程1修改ScheduledFutureTasktime變量為下次將要被執(zhí)行的時間。
  • 上圖4: 線程1把這個修改time之后的ScheduledFutureTask放回DelayQueueDelayQueue.add()

接下來,讓我們看看上面的步驟1獲取任務(wù)的過程。下面是DelayQueue.take()方法的源代碼實(shí)現(xiàn):

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 1
    try {
        for (; ; ) {
            E first = q.peek();
            if (first == null) {
                available.await(); // 2.1
            } else {
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay > 0) {
                    long tl = available.awaitNanos(delay); // 2.2
                } else {
                    E x = q.poll(); // 2.3.1
                    assert x != null;
                    if (q.size() != 0)
                        available.signalAll(); // 2.3.2
                    return x;
                }
            }
        }
    } finally {
        lock.unlock(); // 3
    }
}

下圖是DelayQueue.take()的執(zhí)行示意圖:

ScheduledThreadPoolExecutor獲取任務(wù)的過程

如圖所示,獲取任務(wù)分為3大步驟。

  • 獲取Lock。
  • 獲取周期任務(wù)。
    • 如果PriorityQueue為空,當(dāng)前線程到Condition中等待;否則執(zhí)行下面的2.2
    • 如果PriorityQueue的頭元素的time時間比當(dāng)前時間大,到Condition中等待到time時間;否則執(zhí)行下面的2.3。
    • 獲取PriorityQueue的頭元素(2.3.1);如果PriorityQueue不為空,則喚醒在Condition中等待的所有線程(2.3.2)。
  • 釋放Lock。

ScheduledThreadPoolExecutor在一個循環(huán)中執(zhí)行步驟2,直到線程從PriorityQueue獲取到一個元素之后(執(zhí)行2.3.1之后),才會退出無限循環(huán)(結(jié)束步驟2)。

最后,讓我們看看ScheduledThreadPoolExecutor中的線程執(zhí)行任務(wù)的步驟4,把ScheduledFutureTask放入DelayQueue中的過程。

下面是DelayQueue.add()的源代碼實(shí)現(xiàn):

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 1
    try {
        E first = q.peek();
        q.offer(e); // 2.1
        if (first == null || e.compareTo(first) < 0) {
            available.signalAll(); // 2.2
        }
        return true;
    } finally {
        lock.unlock(); // 3
    }
}

下圖是DelayQueue.add()的執(zhí)行示意圖:

ScheduledThreadPoolExecutor添加任務(wù)的過程

如圖所示,添加任務(wù)分為3大步驟。

  • 獲取Lock。
  • 添加任務(wù)。
    • PriorityQueue添加任務(wù)。
    • 如果在上面2.1中添加的任務(wù)是PriorityQueue的頭元素,喚醒在Condition中等待的所有線程。
  • 釋放Lock

FutureTask詳解

Future接口和實(shí)現(xiàn)Future接口的FutureTask類,代表 異步計(jì)算的結(jié)果

FutureTask簡介

FutureTask除了實(shí)現(xiàn)Future接口外,還實(shí)現(xiàn)了Runnable接口。因此,FutureTask可以交給Executor執(zhí)行,也可以由調(diào)用線程直接執(zhí)行FutureTask.run()。根據(jù)FutureTask.run()方法被執(zhí)行的時機(jī),FutureTask可以處于下面3種狀態(tài)。

  • 未啟動FutureTask.run()方法還沒有被執(zhí)行之前,FutureTask處于未啟動狀態(tài)。當(dāng)創(chuàng)建一個FutureTask,且沒有執(zhí)行FutureTask.run()方法之前,這個FutureTask處于未啟動狀態(tài)。
  • 已啟動。FutureTask.run()方法被執(zhí)行的過程中,FutureTask處于已啟動狀態(tài)。
  • 已完成。FutureTask.run()方法執(zhí)行完后正常結(jié)束,或被取消FutureTask.cancel(…),或執(zhí)行FutureTask.run()方法時拋出異常而異常結(jié)束,FutureTask處于已完成狀態(tài)。

下圖是FutureTask狀態(tài)遷移 的示意圖。

FutureTask的狀態(tài)遷移示意圖

當(dāng)FutureTask處于未啟動或已啟動狀態(tài)時,執(zhí)行FutureTask.get()方法將導(dǎo)致調(diào)用線程阻塞;當(dāng)FutureTask處于已完成狀態(tài)時,執(zhí)行FutureTask.get()方法將導(dǎo)致調(diào)用線程立即返回結(jié)果或拋出異常。

當(dāng)FutureTask處于未啟動狀態(tài)時,執(zhí)行FutureTask.cancel()方法將導(dǎo)致此任務(wù)永遠(yuǎn)不會被執(zhí)行;當(dāng)FutureTask處于已啟動狀態(tài)時,執(zhí)行FutureTask.cancel(true)方法將以中斷執(zhí)行此任務(wù)線程的方式來試圖停止任務(wù);當(dāng)FutureTask處于已啟動狀態(tài)時,執(zhí)行FutureTask.cancel(false)方法將不會對正在執(zhí)行此任務(wù)的線程產(chǎn)生影響(讓正在執(zhí)行的任務(wù)運(yùn)行完成);當(dāng)FutureTask處于已完成狀態(tài)時,執(zhí)行FutureTask.cancel(…)方法將返回false。

下圖是get方法和cancel方法的執(zhí)行示意圖:


get方法和cancel方法的執(zhí)行示意圖
FutureTask的使用

可以把FutureTask交給Executor執(zhí)行;也可以通過ExecutorService.submit(...)方法返回一個FutureTask,然后執(zhí)行FutureTask.get()方法或FutureTask.cancel(...)方法。除此以外,還可以單獨(dú)使用FutureTask

當(dāng)一個線程需要等待另一個線程把某個任務(wù)執(zhí)行完后它才能繼續(xù)執(zhí)行,此時可以使用FutureTask。假設(shè)有多個線程執(zhí)行若干任務(wù),每個任務(wù)最多只能被執(zhí)行一次。當(dāng)多個線程試圖同時執(zhí)行同一個任務(wù)時,只允許一個線程執(zhí)行任務(wù),其他線程需要等待這個任務(wù)執(zhí)行完后才能繼續(xù)執(zhí)行。

下面是對應(yīng)的示例代碼:

private final ConcurrentMap<Object, Future<String>> taskCache = new ConcurrentHashMap<>();

private String executionTask(final String taskName)
        throws ExecutionException, InterruptedException {
    while (true) {
        Future<String> future = taskCache.get(taskName); // 1.1, 2.1
        if (future == null) {
            Callable<String> task = new Callable<String>() {
                @Override
                public String call() throws InterruptedException {
                    return taskName;
                }
            };
            FutureTask<String> futureTask = new FutureTask<String>(task);
            future = taskCache.putIfAbsent(taskName, futureTask); // 1.3
            if (future == null) {
                future = futureTask;
                futureTask.run(); // 1.4執(zhí)行任務(wù)
            }
        }
        try {
            return future.get(); // 1.5, 2.2
        } catch (CancellationException e) {
            taskCache.remove(taskName, future);
        }
    }
}

上述代碼的執(zhí)行示意圖如下圖所示:


FutureTask的使用示例代碼的執(zhí)行示意圖

當(dāng)兩個線程試圖同時執(zhí)行同一個任務(wù)時,如果線程1執(zhí)行1.3線程2執(zhí)行2.1,那么接下來線程2將在2.2等待,直到線程1執(zhí)行完1.4線程2才能2.2(FutureTask.get())返回。

FutureTask的實(shí)現(xiàn)

FutureTask的實(shí)現(xiàn)基于AbstractQueuedSynchronizer(以下簡稱為 AQS)。java.util.concurrent中的很多可阻塞類(比如ReentrantLock)都是基于AQS來實(shí)現(xiàn)的。AQS是一個同步框架,它提供通用機(jī)制來原子性管理同步狀態(tài)、阻塞和喚醒線程,以及維護(hù)被阻塞線程的隊(duì)列。
JDK 6AQS被廣泛使用,基于AQS實(shí)現(xiàn)的同步器包括:

  • ReentrantLock
  • Semaphore
  • ReentrantReadWriteLock
  • CountDownLatch
  • FutureTask

每一個基于AQS實(shí)現(xiàn)的同步器都會包含兩種類型的操作,如下:

  • 至少一個acquire操作。這個操作阻塞調(diào)用線程,除非/直到AQS的狀態(tài)允許這個線程繼續(xù)執(zhí)行。FutureTaskacquire操作為get() / get(long timeout,TimeUnit unit)方法調(diào)用。
  • 至少一個release操作。這個操作改變AQS的狀態(tài),改變后的狀態(tài)可允許一個或多個阻塞線程被解除阻塞。FutureTaskrelease操作包括run()方法和cancel(...)方法。

基于“ 復(fù)合優(yōu)先于繼承 ”的原則,FutureTask聲明了一個內(nèi)部私有的繼承于AQS的子類Sync,對FutureTask所有公有方法的調(diào)用都會委托給這個內(nèi)部子類。

AQS被作為“模板方法模式”的基礎(chǔ)類提供給FutureTask的內(nèi)部子類Sync,這個內(nèi)部子類只需要實(shí)現(xiàn)狀態(tài)檢查和狀態(tài)更新的方法即可,這些方法將控制FutureTask的獲取和釋放操作。具體來說,Sync實(shí)現(xiàn)了AQStryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通過這兩個方法來檢查和更新同步狀態(tài)。

FutureTask的設(shè)計(jì)示意圖如下圖所示:

FutureTask的設(shè)計(jì)示意圖

如圖所示,SyncFutureTask的內(nèi)部私有類,它繼承自AQS。創(chuàng)建FutureTask時會創(chuàng)建內(nèi)部私有的成員對象Sync,FutureTask所有的的公有方法都直接委托給了內(nèi)部私有的Sync。

FutureTask.get()方法會調(diào)用AQS.acquireSharedInterruptibly(int arg)方法,這個方法的執(zhí)行過程如下:

  • 調(diào)用AQS.acquireSharedInterruptibly(int arg)方法,這個方法首先會回調(diào)在子類Sync中實(shí)現(xiàn)的tryAcquireShared()方法來判斷acquire操作是否可以成功。acquire操作可以成功的條件為:state為執(zhí)行完成狀態(tài)RAN已取消狀態(tài)CANCELLED,且runner不為null。
  • 如果成功則get()方法立即返回。如果失敗則到線程等待隊(duì)列中去等待其他線程執(zhí)行
    release操作。
  • 當(dāng)其他線程執(zhí)行release操作(比如FutureTask.run()FutureTask.cancel(…))喚醒當(dāng)前線程后,當(dāng)前線程再次執(zhí)行tryAcquireShared()將返回正值1,當(dāng)前線程將離開線程等待隊(duì)列并喚醒它的后繼線程(這里會產(chǎn)生級聯(lián)喚醒的效果,后面會介紹)。
  • 最后返回計(jì)算的結(jié)果或拋出異常。

FutureTask.run()的執(zhí)行過程如下:

  • 執(zhí)行在構(gòu)造函數(shù)中指定的任務(wù)(Callable.call())。
  • 以原子方式來更新同步狀態(tài)(調(diào)用AQS.compareAndSetState(int expect,int update),設(shè)置state為執(zhí)行完成狀態(tài)RAN)。如果這個原子操作成功,就設(shè)置代表計(jì)算結(jié)果的變量result的值為Callable.call()的返回值,然后調(diào)用AQS.releaseShared(int arg)。
  • AQS.releaseShared(int arg)首先會回調(diào)在子類Sync中實(shí)現(xiàn)的tryReleaseShared(arg)來執(zhí)行release操作(設(shè)置運(yùn)行任務(wù)的線程runnernull,然會返回true);AQS.releaseShared(int arg),然后喚醒線程等待隊(duì)列中的第一個線程。
  • 調(diào)用FutureTask.done()。

當(dāng)執(zhí)行FutureTask.get()方法時,如果FutureTask不是處于執(zhí)行完成狀態(tài)RAN已取消狀態(tài)CANCELLED,當(dāng)前執(zhí)行線程將到AQS的線程等待隊(duì)列中等待(見下圖的線程A、B、C和D)。當(dāng)某個線程執(zhí)行FutureTask.run()方法或FutureTask.cancel(...)方法時,會喚醒線程等待隊(duì)列的第一個線程(見下圖所示的線程E喚醒線程A)。

FutureTask的級聯(lián)喚醒示意圖

假設(shè)開始時FutureTask處于未啟動狀態(tài)或已啟動狀態(tài),等待隊(duì)列中已經(jīng)有3個線程(A、BC)在等待。此時,線程D執(zhí)行get()方法將導(dǎo)致線程D也到等待隊(duì)列中去等待。

當(dāng)線程E執(zhí)行run()方法時,會喚醒隊(duì)列中的第一個線程A。線程A被喚醒后,首先把自己從隊(duì)列中刪除,然后喚醒它的后繼線程B,最后線程Aget()方法返回。線程B、CD重復(fù)A線程的處理流程。最終,在隊(duì)列中等待的所有線程都被級聯(lián)喚醒并從get()方法返回。

小結(jié)

本文介紹了Executor框架的整體結(jié)構(gòu)和成員組件。
希望讀者閱讀本章之后,能夠?qū)?code>Executor框架有一個比較深入的理解,同時也希望本章內(nèi)容有助于讀者更熟練地使用Executor框架。

</article>

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Executor框架Executor框架簡介Executor框架的兩級調(diào)度模型Executor框架的結(jié)構(gòu)與成員Ex...
    叫我胖虎大人閱讀 398評論 0 6
  • Java的線程既是工作單元,也是執(zhí)行機(jī)制。JDK 5開始,把工作單元與執(zhí)行機(jī)制分離開來。工作單元包括Runnabl...
    加夕閱讀 746評論 0 1
  • Executor框架簡介 JDK將工作單元和執(zhí)行機(jī)制分離,工作單元包括Runnable和Callable,執(zhí)行機(jī)制...
    星冉子閱讀 126評論 0 0
  • 在Java中,使用線程來執(zhí)行異步任務(wù)。Java線程的創(chuàng)建于銷毀需要一定的開銷,如果我們?yōu)槊恳粋€任務(wù)創(chuàng)建一個新線程來...
    伊凡的一天閱讀 397評論 0 1
  • Executor框架的整體架構(gòu)和成員組件。Executor的結(jié)構(gòu)和Executor框架包含的成員組件。 在Java...
    巴巴11閱讀 279評論 0 0

友情鏈接更多精彩內(nèi)容