Java線程池實現(xiàn)原理及應(yīng)用

[toc]

概念

線程池是什么

線程池(Thread Pool)是一種基于池化思想管理線程的工具,經(jīng)常出現(xiàn)在多線程服務(wù)器中,如MySQL。

線程過多會帶來額外的開銷,其中包括創(chuàng)建銷毀線程的開銷,調(diào)度線程的開銷等等,同時也降低了計算機的整體性能,線程池維護多個線程,等待監(jiān)督管理者分配并發(fā)執(zhí)行的任務(wù),這種做法,一方面避免了處理任務(wù)時創(chuàng)建銷毀線程開銷的代價,另一方面避免了線程數(shù)量膨脹導致的過分調(diào)度問題,保證了對內(nèi)核的充分利用。

而本文描述的線程池是JDK中提供的ThreadPoolExecutor類。當然使用線程池可以帶來一系列好處:

  • 降低資源消耗:通過池化技術(shù)重復利用已創(chuàng)建的線程,減低線程創(chuàng)建和銷毀造成的損耗
  • 提高響應(yīng)速度:任務(wù)到達時無需等待線程創(chuàng)建即可立即執(zhí)行
  • 提高線程的可管理性:線程時稀缺資源,如果無限制創(chuàng)建,不僅會銷毀系統(tǒng)資源,還會因為線程的不合理分布導致資源調(diào)度不平衡,降低系統(tǒng)穩(wěn)定性,使用線程池可以進行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控
  • 提供更多強大的功能:線程池具備可擴展性,允許開發(fā)人員向其中增加更多的功能,比如延時定時線程池ScheduledThreadPoolExecutor,就允許任務(wù)延期執(zhí)行或定期執(zhí)行。

線程池解決的問題是什么

線程池解決的核心問題就是資源管理問題,在并發(fā)環(huán)境下,系統(tǒng)不能夠穩(wěn)定在任意時刻中,有多少任務(wù)需要執(zhí)行,有多少資源需要投入,這種不確定性將帶來以下若干問題:

  • 頻繁申請/銷毀資源和調(diào)度資源,將帶來額外的消耗,可能會非常巨大
  • 對資源無限申請缺少抑制手段,易引發(fā)系統(tǒng)資源耗盡的風險
  • 系統(tǒng)無法合理管理內(nèi)部的資源分布,會降低系統(tǒng)的穩(wěn)定性

為解決資源分配問題,線程池采用了池化思想,池化,顧名思義,是為了最大化收益并最小化風險,而將資源統(tǒng)一在一起管理的一種思想。

典型的池化策略包括:

  • 內(nèi)存池:預先申請內(nèi)存,提升申請內(nèi)存速度,減少內(nèi)存碎片
  • 連接池:預先申請數(shù)據(jù)庫連接,提升申請連接的速度,降低系統(tǒng)開銷
  • 實例池:循環(huán)使用對象,減少資源在初始化和釋放時的昂貴損耗

線程池核心設(shè)計與實現(xiàn)

總體設(shè)計

Java中的線程池核心實現(xiàn)類是ThreadPoolExecutor,本章基于JDK1.8的源碼分析Java線程池的核心設(shè)計與實現(xiàn),我們先看一下ThreadPoolExecutor的UML類圖,了解一下ThreadPoolExecutor的繼承關(guān)系。

image

ThreadPoolExecutor實現(xiàn)的頂層接口是Executor,頂層接口Executor提供了一種思想:將任務(wù)提交和任務(wù)執(zhí)行解耦,用戶無需關(guān)注如何創(chuàng)建線程,如果調(diào)度線程來執(zhí)行任務(wù),用戶只需要提供Runnable對象,將任務(wù)的運行邏輯提交到執(zhí)行器(Executor)中,由Executor框架完成線程的調(diào)度和任務(wù)執(zhí)行的部分,ExecutorService接口增加了一些能力:

  • 擴充執(zhí)行任務(wù)能力,補充可以為一個或一批異步任務(wù)生成Future的方法
  • 提供了管控線程池的方法,比如停止線程池的運行,AbstractExecutorService則是上層的抽象類,將執(zhí)行的任務(wù)的流程串聯(lián)起來,保證下層的實現(xiàn)只需要關(guān)注一個執(zhí)行任務(wù)方法即可,最下層實現(xiàn)類ThreadPoolExecutor實現(xiàn)最復雜的運行部分,ThreadPoolExecutor將會一方面維護自身的生命周期,另一方面同時管理線程和任務(wù),是兩者良好的結(jié)合從而執(zhí)行并行任務(wù)。

ThreadPoolExecutor是如何運行,如何同時維護線程和執(zhí)行任務(wù)呢?其運行機制如下圖所示:

image

線程池在內(nèi)部實際上構(gòu)建了一個生產(chǎn)者消費者模型,將線程和任務(wù)兩者解耦,并不直接關(guān)聯(lián),從而良好的緩沖任務(wù),復用線程。線程池的運行主要分為兩部分:任務(wù)管理、線程管理。任務(wù)管理部分充當了生產(chǎn)者的角色,當任務(wù)提交后,線程池會判斷該任務(wù)后續(xù)流轉(zhuǎn):

  • 直接申請線程執(zhí)行該任務(wù)
  • 緩沖到隊列中等待線程執(zhí)行
  • 拒絕該任務(wù)。

線程管理部分是消費者,他們被統(tǒng)一維護在線程池內(nèi),根據(jù)任務(wù)請求進行維護線程的分配,當線程執(zhí)行任務(wù)后則會繼續(xù)獲取新的任務(wù)去執(zhí)行,最終當線程獲取不到任務(wù)時候,線程就會被回收(核心線程除外)。

接下來我們按照以下三個部分去詳細講解線程池運行機制:

  • 線程池如何維護自身狀態(tài)
  • 線程池如何管理任務(wù)
  • 線程池如何管理線程

生命周期的管理

線程池運行的狀態(tài),并不是用戶顯式的設(shè)置的,而是伴隨著線程池的運行,由內(nèi)部來維護。線程池內(nèi)部使用一個變量維護兩個值:運行狀態(tài)(runState)和線程數(shù)量(workerCounnt)。具體實現(xiàn)中,線程池將運行狀態(tài)(runState)、線程數(shù)量(workerCount)兩個關(guān)鍵參數(shù)的維護放在了一起,如下代碼所示:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl這個AtomicInteger類型,是對線程池的運行狀態(tài)和線程池中有效線程的數(shù)量進行控制的一個字段,他同時包含兩部分的信息:線程池的運行狀態(tài)(runState)和線程池內(nèi)有效線程的數(shù)量(workerCount),高三位保存runState,低29位保存workerCount,兩個變量之間互相不干擾。用一個變量去存儲兩個值,可避免在做相關(guān)決策時,出現(xiàn)不一致的情況,不必為了維護兩者一致,而占用鎖資源,通過閱讀線程池源碼可以發(fā)現(xiàn),經(jīng)常出現(xiàn)同時判斷線程池運行狀態(tài)和線程數(shù)量的情況。

關(guān)于內(nèi)部封裝的獲取生命周期狀態(tài),獲取線程池線程數(shù)量的計算方法如下代碼所示:

private static int runStateOf(int c)     { return c & ~CAPACITY; } //計算當前運行狀態(tài)
private static int workerCountOf(int c)  { return c & CAPACITY; }  //計算當前線程數(shù)量
private static int ctlOf(int rs, int wc) { return rs | wc; }   //通過狀態(tài)和線程數(shù)生成ctl

ThreadPoolExecutor的運行狀態(tài)有五種分別為:

運行狀態(tài) 狀態(tài)描述
RUNNING 能接受新提交的任務(wù),并且也能處理阻塞隊列中的任務(wù)
SHUTDOWN 關(guān)閉狀態(tài),不能接受新提交的任務(wù),但可以繼續(xù)處理阻塞隊列中已保存的任務(wù)
STOP 不能接受新的任務(wù),也不處理隊列中任務(wù),會中斷正在處理任務(wù)的線程
TIDYING 所有任務(wù)都已終止了,workerCount(有效線程)為0
TERMINATED 在terminated()方法執(zhí)行完成后進入該狀態(tài)

其生命周期轉(zhuǎn)換如下所示:

image

任務(wù)執(zhí)行機制

任務(wù)調(diào)度

任務(wù)調(diào)度是線程池的主要入口,當用戶提交一個任務(wù),接下來這個任務(wù)將如何執(zhí)行都是由這個階段決定的。了解這部分就相當于了解線程池的核心運行機制。

首先,所以任務(wù)的調(diào)度都是由execute方法(即使調(diào)用的是submit,最終也是通過execute執(zhí)行)完成的,這部分完成的工作是:檢查現(xiàn)在線程池中的運行狀態(tài)、運行線程數(shù),運行策略,決定接下來執(zhí)行的流程,是直接申請線程執(zhí)行,或是緩沖到隊列中執(zhí)行,亦或是直接拒絕任務(wù),其執(zhí)行過程如下:

  • 首先檢測線程池運行狀態(tài),如果不是RUNNING,則直接拒絕,線程池要保證在RUNNING的狀態(tài)下執(zhí)行任務(wù)
  • 如果workerCount<corePoolSize,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù)
  • 如果workerCount>=corePoolSize,且線程池內(nèi)的阻塞隊列未滿,則將任務(wù)添加到該阻塞隊列中
  • 如果workerCount>=corePoolSize&&workerCount<maxumumPoolSize,且線程池內(nèi)的阻塞隊列已滿,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù)
  • 如果workerCount>=maximumPoolSize,并且線程池內(nèi)的阻塞隊列已經(jīng)滿,則根據(jù)拒絕策略處理該任務(wù),默認的處理方式是直接拋異常。

流程圖如下所示:

image

任務(wù)緩沖

任務(wù)緩沖模塊是線程池能夠管理任務(wù)的核心部分,線程池的本質(zhì)是對任務(wù)和線程的管理,而做到這一點的關(guān)鍵的思想就是將任務(wù)和線程兩者解耦,不讓兩者直接關(guān)聯(lián),才可以做后續(xù)的分配工作。線程池中是以生產(chǎn)者和消費者模式,通過一個阻塞隊列來實現(xiàn)的。阻塞隊列中緩存任務(wù),工作線程從阻塞隊列中獲取任務(wù)。

阻塞隊列(BlockintQueue)是一個支持兩個附加操作的隊列,這兩個附加操作是:在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强?,當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產(chǎn)者和消費者的場景。生產(chǎn)者是往隊列里添加元素,下面展示線程1往阻塞隊列中添加元素,而線程2從阻塞隊列中移除元素:

image

使用不同的隊列可用實現(xiàn)不一樣的任務(wù)存儲策略,在這里我們可以在介紹一下阻塞隊列的成員:

名稱 描述
ArrayBlockingQueue 一個數(shù)組實現(xiàn)的有界的阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖
LinkedBlockingQueue 一個由鏈表結(jié)構(gòu)組成的有界隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序,此隊列默認長度為Integer.MAX_VALUE,所以默認創(chuàng)建的該隊列有容量危險
PriorityBlockingQueue 一個支持優(yōu)先級排序的無界隊列,默認自然序進行排序,也可以自定義實現(xiàn)compareTo()方法來指定元素排序規(guī)則,不能同保證優(yōu)先級元素的順序
DelayQueue 一個實現(xiàn)PriorityBlockingQueue實現(xiàn)的延遲獲取的無界隊列,在創(chuàng)建元素時,可以指定多久才能從隊列中獲取當前元素,只有延時期滿后才能從隊列中獲取元素
SynchronousQueue 一個不存儲元素的阻塞隊列,每一個put操作必須等待take操作,否則不能添加元素,支持公平鎖和非公平鎖,SynchronousQueue一個使用場景是在線程池里,Exectors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據(jù)需要(新任務(wù)到來時)創(chuàng)建新的線程,如果有空閑線程則會重復使用,線程空閑了60秒后就會被回收。
LinkedTransferQueue 一個由聯(lián)邦結(jié)構(gòu)組成的無界阻塞隊列,相當于其他隊列,LinkedTransferQueue對了transfer和tryTransfer方法
LinkedBlockingDeque 一個鏈表結(jié)構(gòu)組成的雙向阻塞隊列。隊列頭和尾部都可以添加和移除元素,多線程并發(fā)時,可以將鎖的競爭最多降到一半。

任務(wù)申請

由上文的任務(wù)分配部分可知,任務(wù)的執(zhí)行有兩種可能:一種是任務(wù)直接由新創(chuàng)建的線程執(zhí)行。另一種是線程從任務(wù)隊列中獲取任務(wù)然后執(zhí)行,執(zhí)行完任務(wù)的空閑線程會字詞從隊列中申請任務(wù)去執(zhí)行。第一種情況僅僅出現(xiàn)在線程初始創(chuàng)建的時候,第二種情況是線程獲取任務(wù)的絕大數(shù)情況。

線程需要從任務(wù)緩存模塊中不斷獲取任務(wù)執(zhí)行,幫助線程從阻塞隊列中獲取任務(wù),實現(xiàn)線程管理模塊和任務(wù)管理模塊之間的通信,這部分策略由getTask方法實現(xiàn),其執(zhí)行流程如下圖所示:

image

getTask這部分進行了多次判斷,為的是控制線程的數(shù)量,使其符合線程池的狀態(tài),如果線程池現(xiàn)在不應(yīng)該持有那么多線程,則會返回null值,工作線程Worker會不斷接受新任務(wù)去執(zhí)行,而當工作線程Worker接受不到任務(wù)的時候,就會開始被回收。

任務(wù)的拒絕

任務(wù)拒絕模塊是線程池的保護部分,線程池有一個最大容量,當線程池的任務(wù)緩存隊列已滿,并且線程池中的線程數(shù)目達到maximumPoolSize時,就需要拒絕掉任務(wù),采取任務(wù)的拒絕策略,保護線程池。

拒絕策略是一個接口,其設(shè)計如下:

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

用戶可以通過實現(xiàn)這個接口去定值拒絕策略,也可以選擇JDK提供的四種已有的拒絕策略,其特點如下:

名稱 描述
AbortPolicy 丟棄任務(wù)并拋出RejectedExecution異常。這是線程初次默認的拒絕策略,在任務(wù)不能再提交的時候,拋出異常,及時反饋程序運行狀態(tài),如果是比較關(guān)鍵的業(yè)務(wù),推薦使用此拒絕策略,這樣在系統(tǒng)不能承受更大并發(fā)量的時候,能夠及時的通過異常發(fā)現(xiàn)
DiscardPolicy 丟棄任務(wù),但是不拋出異常,可能會使我們無法大仙系統(tǒng)的異常狀態(tài),建議一些無關(guān)緊要的業(yè)務(wù)采取此策略
DiscardOldPolicy 丟棄隊列最前面的任務(wù),然后重新提交被拒絕的任務(wù),是否采用此種拒絕策略,還得根據(jù)實際業(yè)務(wù)是否允許丟棄老任務(wù)來認真衡量
CallerRunsPolicy 由調(diào)用線程(提交任務(wù)的線程)處理該任務(wù),這種情況需要讓所有的任務(wù)都執(zhí)行完畢,那么就適合大量計算的任務(wù)類型去執(zhí)行,多線程僅僅增大了吞吐量的手段,最終必須要讓每個任務(wù)都執(zhí)行完畢

Worker線程管理

Worker線程

線程池為了掌握線程的狀態(tài)并維護線程的生命周期,設(shè)計了線程池內(nèi)的工作線程Worker。我們看一下他的部分代碼:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;//Worker持有的線程
    Runnable firstTask;//初始化的任務(wù),可以為null
}

Worker這個工作線程,實現(xiàn)了Runnable接口,并持有一個線程thread,一個初始化的任務(wù)firstTask,thread是在調(diào)用構(gòu)造方法時通過ThreadFactory創(chuàng)建的線程,可以用了執(zhí)行任務(wù),firstTask用他保存?zhèn)魅氲牡谝粋€任務(wù),這個任務(wù)可以有也可以為null,如果這個值是非null,那么線程就會在啟動初期立即執(zhí)行這個任務(wù),也就是核心線程創(chuàng)建時的情況,如果這個值是null,那么就需要創(chuàng)建一個線程取執(zhí)行任務(wù)列表(workerQueue)中的任務(wù),也就是非核心線程的創(chuàng)建,Workeer執(zhí)行任務(wù)的模型如下圖所示:

image

線程池需要管理線程的生命周期,需要在線程長時間不運行的時候進行回收,線程池使用一張Hash表去持有線程的引用,這樣就可以通過添加引用,移除引用這樣的操作來控制線程的生命周期,這個時候需要的就是如何判斷線程是否在運行。

Worker是通過繼承了AQS,使用AQS來實現(xiàn)獨占鎖這個功能,沒有使用可重入鎖ReentrantLock,而是使用AQS,為的是實現(xiàn)不可重入的特性去反應(yīng)線程現(xiàn)在的執(zhí)行狀態(tài)。

  • lock方法一旦獲取了獨占鎖,表示當前線程正在執(zhí)行任務(wù)中。
  • 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程
  • 如果該線程現(xiàn)在不是獨占鎖,也就是空閑狀態(tài),說, 他沒有在處理任務(wù),這是可以對線程進行中斷
  • 線程池在執(zhí)行shutdown方法或tryTerminate方法時會調(diào)用interruptIdleWorkers方法,中斷空閑的線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程池中的線程是否是空閑狀態(tài),如果線程時空閑狀態(tài)則回收。

在線程回收過程中就是用了這種特性,回收過程如下圖所示:

image

Worker線程增加

增加線程時通過線程池中的addWorker方法,該方法的功能就是增加一個線程,該方法不考慮線程池是在哪個階段增加的該線程,這個分配線程的策略是在上一個步驟完成的,該步驟僅僅完成增加線程,并使用他運行,最后返回是否成功這個結(jié)果,addWorker方法有兩個參數(shù):firstTask、core。

  • firstTask:參數(shù)用于指定新增的線程執(zhí)行的第一個任務(wù),該參數(shù)可以為null
  • core參數(shù)為true表示新增的線程時會判斷當前活動線程是否少于corePoolSize,false表示新增線程前需要判斷當前活動線程是否少于maximumPoolSize,其執(zhí)行流程如下圖所示:
image

Worker線程回收

線程池中線程的銷毀依賴于JVM自動的回收,線程池做的工作是根據(jù)當前線程池的狀態(tài)維護一定數(shù)量的線程引用,防止這部分線程被JVM回收,當線程池覺得哪些線程需要回收時,只需要將其引用消除即可,Worker被創(chuàng)建出來后,就會不斷進行輪詢,然后獲取任務(wù)執(zhí)行,核心線程可以無限等待獲取任務(wù),非核心線程需要限時獲取任務(wù),當Worker無法獲取到任務(wù),也就是獲取任務(wù)為空時,循環(huán)結(jié)束,Worker會主動消除自身在線程池內(nèi)的引用

try {
  while (task != null || (task = getTask()) != null) {
    //執(zhí)行任務(wù)
  }
} finally {
  processWorkerExit(w, completedAbruptly);//獲取不到任務(wù)時,主動回收自己
}
//getTask輪詢獲取任務(wù),直到獲取成功,核心線程可以一直阻塞,但是非核心線程不能一直阻塞
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //線程是否需要回收,如果核心線程也可以被回收那么就是true,如果wc>corePoolSize說明線程池內(nèi)的線程大于核心線程數(shù),可以被回收的,如果allowCoreThreadTimeOut不能被回收,并且線程池的線程數(shù)小于等于核心線程,那么不能回收可以一直阻塞等待
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //如果需要回收,那么調(diào)用poll方法設(shè)置獲取任務(wù)阻塞時長,如果不能回收調(diào)用take,可以一直阻塞
                Runnable r = timed ?
                   workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

線程回收的工作是在processWorkerExit方法完成的。

image

事實上,這個方法中,將線程引用移出線程池就已經(jīng)結(jié)束蘇了線程銷毀的部分,但由于引起線程銷毀的可能性很多,線程池,還要判斷是什么引發(fā)了這次銷毀,是否要改變線程池現(xiàn)階段狀態(tài),是否會根據(jù)新的狀態(tài),重新分配線程。

Worker線程執(zhí)行任務(wù)

在Worker類中的run方法調(diào)用了runWork方法來執(zhí)行任務(wù)的,runWork方法的執(zhí)行過程下:

  • while循環(huán)不斷通過getTask()方法獲取任務(wù)
  • getTask()方法從阻塞隊列中取任務(wù)
  • 如果線程池正在停止,那么要保證當前線程是中斷狀態(tài),要么保證當前線程不是中斷狀態(tài)
  • 執(zhí)行任務(wù)
  • 如果getTask結(jié)果為null,則跳出循環(huán),執(zhí)行processWorkerExit()方法,銷毀線程。

流程圖如下:


image
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容