十二、Java中的線程池

轉(zhuǎn)《Java并發(fā)編程的藝術(shù)》

1.線程池的實(shí)現(xiàn)原理

線程池的主要處理流程

ThreadPoolExecutor執(zhí)行execute方法分下面4種情況。

1)如果當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。

2)如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。

3)如果無法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來處理任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。

4)如果創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用
RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步驟的總體設(shè)計(jì)思路,是為了在執(zhí)行execute()方法時,盡可能 地避免獲取全局鎖(那將會是一個嚴(yán)重的可伸縮瓶頸)。在ThreadPoolExecutor完成預(yù)熱之后 (當(dāng)前運(yùn)行的線程數(shù)大于等于corePoolSize),幾乎所有的execute()方法調(diào)用都是執(zhí)行步驟2,而步驟2不需要獲取全局鎖。

2.源碼分析

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  // 如果線程數(shù)小于基本線程數(shù),則創(chuàng)建線程并執(zhí)行當(dāng)前任務(wù)
  if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
  // 如線程數(shù)大于等于基本線程數(shù)或線程創(chuàng)建失敗,則將當(dāng)前任務(wù)放到工作隊(duì)列中。
  if (runState == RUNNING && workQueue.offer(command)) {
    if (runState != RUNNING || poolSize == 0)
      ensureQueuedTaskHandled(command);
  } 
  // 如果線程池不處于運(yùn)行中或任務(wù)無法放入隊(duì)列,并且當(dāng)前線程數(shù)量小于最大允許的線程數(shù)量,
  // 則創(chuàng)建一個線程執(zhí)行任務(wù)。
  else if (!addIfUnderMaximumPoolSize(command))
    // 拋出RejectedExecutionException異常
    reject(command); // is shutdown or saturated
  }
}

工作線程:線程池創(chuàng)建線程時,會將線程封裝成工作線程Worker,Worker在執(zhí)行完任務(wù)
后,還會循環(huán)獲取工作隊(duì)列里的任務(wù)來執(zhí)行。我們可以從Worker類的run()方法里看到這點(diǎn)。

public void run() {
  try {
    Runnable task = firstTask;
    firstTask = null;
    while (task != null || (task = getTask()) != null) {
      runTask(task);
      task = null;
    }
  } finally {
    workerDone(this);
  }
}

ThreadPoolExecutor中線程執(zhí)行任務(wù)的示意圖如圖所示。


ThreadPoolExecutor執(zhí)行任務(wù)示意圖

線程池中的線程執(zhí)行任務(wù)分兩種情況,如下。

1)在execute()方法中創(chuàng)建一個線程時,會讓這個線程執(zhí)行當(dāng)前任務(wù)。

2)這個線程執(zhí)行完上圖中1的任務(wù)后,會反復(fù)從BlockingQueue獲取任務(wù)來執(zhí)行。

3.線程池的使用

3.1 線程池的創(chuàng)建

我們可以通過ThreadPoolExecutor來創(chuàng)建一個線程池。

new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,milliseconds,runnableTaskQueue, handler);

創(chuàng)建一個線程池時需要輸入幾個參數(shù),如下。

1)corePoolSize(線程池的基本大小):當(dāng)提交一個任務(wù)到線程池時,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù),即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有基本線程。

2)runnableTaskQueue(任務(wù)隊(duì)列):用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列??梢赃x擇以下幾個阻塞隊(duì)列。

  • ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。
  • LinkedBlockingQueue:一個基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊(duì)列。
  • SynchronousQueue:一個不存儲元素的阻塞隊(duì)列。每個插入操作必須等到另一個線程調(diào)用 移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊(duì)列。
  • PriorityBlockingQueue:一個具有優(yōu)先級的無限阻塞隊(duì)列。

3)maximumPoolSize(線程池最大數(shù)量):線程池允許創(chuàng)建的最大線程數(shù)。如果隊(duì)列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是,如果使用了無界的任務(wù)隊(duì)列這個參數(shù)就沒什么效果。

4)ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè) 置更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder可以快速給線程池里的線 程設(shè)置有意義的名字,代碼如下。new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();

5)RejectedExecutionHandler(飽和策略):當(dāng)隊(duì)列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。這個策略默認(rèn)情況下是AbortPolicy,表示無法處理新任務(wù)時拋出異常。在JDK 1.5中Java線程池框架提供了以下4種策略?!bortPolicy:直接拋出異常?!allerRunsPolicy:只用調(diào)用者所在線程來運(yùn)行任務(wù)?!iscardOldestPolicy:丟棄隊(duì)列里最近的一個任務(wù),并執(zhí)行當(dāng)前任務(wù)?!iscardPolicy:不處理,丟棄掉。 當(dāng)然,也可以根據(jù)應(yīng)用場景需要來實(shí)現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化存儲不能處理的任務(wù)。

6)AliveTime(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間。所以, 如果任務(wù)很多,并且每個任務(wù)執(zhí)行的時間比較短,可以調(diào)大時間,提高線程的利用率。

7)TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS),千分之一微秒。

3.2 向線程池提交任務(wù)

可以使用兩個方法向線程池提交任務(wù),分別為execute()和submit()方法。

execute()方法用于提交不需要返回值的任務(wù),所以無法判斷任務(wù)是否被線程池執(zhí)行成功。 通過以下代碼可知execute()方法輸入的任務(wù)是一個Runnable類的實(shí)例。

threadsPool.execute(new Runnable() {
  @Override
  public void run() {
      // TODO Auto-generated method stub
  }
});

submit()方法用于提交需要返回值的任務(wù)。線程池會返回一個future類型的對象,通過這個 future對象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過future的get()方法來獲取返回值,get()方法會阻塞當(dāng)前線程直到任務(wù)完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞當(dāng)前線程一段時間后立即返回,這時候有可能任務(wù)沒有執(zhí)行完。

Future<Object> future = executor.submit(harReturnValuetask);
try {
  Object s = future.get();
} catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理無法執(zhí)行任務(wù)異常
} finally {
// 關(guān)閉線程池
  executor.shutdown();
}

3.3 關(guān)閉線程池

可以通過調(diào)用線程池的shutdown或shutdownNow方法來關(guān)閉線程池。它們的原理是遍歷線 程池中的工作線程,然后逐個調(diào)用線程的interrupt方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無法終止。但是它們存在一定的區(qū)別,shutdownNow首先將線程池的狀態(tài)設(shè)置成 STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表,而 shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。

只要調(diào)用了這兩個關(guān)閉方法中的任意一個,isShutdown方法就會返回true。當(dāng)所有的任務(wù) 都已關(guān)閉后,才表示線程池關(guān)閉成功,這時調(diào)用isTerminaed方法會返回true。至于應(yīng)該調(diào)用哪一種方法來關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用shutdown方法來關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow方法。

4. 合理地配置線程池

要想合理地配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個角度來分析。

  • 任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。

CPU密集型任務(wù): 主要是執(zhí)行計(jì)算任務(wù),響應(yīng)時間很快,cpu一直在運(yùn)行,這種任務(wù)cpu的利用率很高
IO密集型任務(wù):主要是進(jìn)行IO操作,執(zhí)行IO操作的時間較長,這是cpu出于空閑狀態(tài),導(dǎo)致cpu的利用率不高

CPU密集型:線程個數(shù)為CPU核數(shù)。這幾個線程可以并行執(zhí)行,不存在線程切換到開銷,提高了cpu的利用率的同時也減少了切換線程導(dǎo)致的性能損耗
IO密集型:線程個數(shù)為CPU核數(shù)的兩倍。到其中的線程在IO操作的時候,其他線程可以繼續(xù)用cpu,提高了cpu的利用率

  • 任務(wù)的優(yōu)先級:高、中和低。

  • 任務(wù)的執(zhí)行時間:長、中和短。

  • 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。

性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。CPU密集型任務(wù)應(yīng)配置盡可能少的 線程,如配置Ncpu+1個線程的線程池。由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如2*Ncpu?;旌闲偷娜蝿?wù),如果可以拆分,將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量。如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進(jìn)行分解。
可以通過Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個數(shù)。

優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊(duì)列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高
的任務(wù)先執(zhí)行。

執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者可以使用優(yōu)先級隊(duì)列,讓
執(zhí)行時間短的任務(wù)先執(zhí)行。

依賴數(shù)據(jù)庫連接池的任務(wù),因?yàn)榫€程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,等待的時間越
長,則CPU空閑時間就越長,那么線程數(shù)應(yīng)該設(shè)置得越大,這樣才能更好地利用CPU。

建議使用有界隊(duì)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容