這幾天秋招面試的時候問到了線程池原理,因為線程池這塊都是只了解API,當時沒能很好的回答面試官提出的問題,花了整整一晚上結(jié)合別人的博客看了下源碼,了解了線程池的大概執(zhí)行流程,寫一篇博客總結(jié)一下,這里我不細扣邏輯,網(wǎng)上的博客大部分都有對處理進行總結(jié),我在這里主要通過源碼來分析這些邏輯的實現(xiàn)。
網(wǎng)上隨便找了篇博客,也對關鍵參數(shù)和拒絕策略以及線程池的優(yōu)缺點有說明,這里不在復述:
https://www.cnblogs.com/spec-dog/p/11149741.html
通常我們創(chuàng)建線程池有兩種方式,一種是通過線程池工廠Executor創(chuàng)建線程,另一種是通過new自己傳參創(chuàng)建線程池
如下:

點開Executors的newCachedThreadPool()方法我們可以看到,其實本質(zhì)上他也是通過new的方式傳參創(chuàng)建線程池并返回,但是返回的引用類型為ExecutorService。

我們通過這個ExecutorService類型的引用就可以使用線程池的幾乎所有API,那么我們可以推測出這個ExecutorService就是線程池的規(guī)范接口。

ThreadPoolExecutor直接繼承自抽象類AbstractExecutorService

AbstractExecutorService實現(xiàn)了ExecutorService接口并實現(xiàn)了一些方法。

ExecutorService這個接口繼承自Executor接口,其中定義了包括shotdownNow(),isShutDown()等我們使用線程池中可能會直接使用到的方法,從這里可以看出ExecutorService確實是線程池的規(guī)范接口,并且這些線程池的關鍵方法沒有在AbstractExecutorService中實現(xiàn),而是由ThreadPoolExecutor直接實現(xiàn)。

ExecutorService繼承的Executor接口只定義了一個方法execute,也就是我們平時將任務添加到線程池中直接使用的方法。這個接口也是線程池的頂級接口。這里我們可以大致整理出ThreadPoolExecutor->AbstractExecutorService->ExecutorService->Executor
這里提一下,Executor和Executors雖然只差了一個s,卻是完全不同的兩個東西,Executor是線程池的頂級接口,Executors則是創(chuàng)建線程池的工廠類,是一個有很多靜態(tài)方法用于創(chuàng)建不同類型線程池的工具類。
在了解源碼之前我們要記住幾個關鍵的變量(這里說的不是常見的一些創(chuàng)建線程池使用到的參數(shù)):
1.ctl

這是一個很神奇的變量,它通過一個變量,同時存儲了當前線程數(shù)量以及線程池狀態(tài)。Doug
lea也定義了一些方法如runStateOf,這些不同的方法對同一個變量進行操作可以返回不同的數(shù)據(jù),如runStateOf返回的就是當前線程池的狀態(tài),workerCountOf返回的就是線程池的線程數(shù)量,而他們讀取的參數(shù)為同一個參數(shù)ctl。ctl是AtomicInteger類型的變量。

點開這個類我們發(fā)現(xiàn),其實這個類只有在一個int類型的參數(shù)value,只不過Doug lea在這里封裝了對這個Int的許多原子操作方法。這些原子操作的方法底層使用的是Unsafe的原子操作方法。因為ctl存儲的信息,可以說是線程池的每一個線程都一定會使用它來讀取線程池狀態(tài),所以這里保證ctl的原子性還是很有必要的。
2.workQueue

工作隊列,當核心線程數(shù)滿了后,用于存儲待執(zhí)行任務的隊列,這里要注意兩件事,
一:他是BlockingQueue,這是一個線程安全的隊列,也就是說不同的線程對這個隊列執(zhí)行添加或取出任務的時候不需要加鎖。
二:他的存儲類型為runable,說到這里,我們猜都可以猜到,如果需要執(zhí)行任務,那么一定是工作線程將隊列中的任務取出直接調(diào)用其run方法,也可以推測出我們需要添加的任務必須實現(xiàn)runable接口并把具體任務實現(xiàn)寫在run方法中。
3.mainLock

一個ReentrantLock類型的鎖,是線程池的內(nèi)置鎖,那么我們可以推測出可能線程池的許多方法都可能會使用到這把鎖,雖然存儲任務的隊列workQueue是線程安全的,但是還是有一些變量,如下面所說的workers,HashSet類型線程不安全的集合,對這種非線程安全的集合進行操作,肯定會存在競爭情況(如多個線程嘗試同時取任務),那么就需要mainlock進行加鎖,以保證線程安全性。
4.workers

這里很關鍵,有些博客也會提到這個變量,它是存儲worker的HashSet類型集合,作用是存儲那些工作線程。但是那些博客寫到這里就戛然而止了,我想看的東西全都沒有!worker是怎么工作的呢?worker是怎么被回收的呢?worker是怎么創(chuàng)建的呢?
我們點開這個Worker

它實現(xiàn)了AbstractQueueSynchronizer接口,也就是我們常說的AQS,這里就不細扣AQS了,有時間的話我再花時間總結(jié)一個Reentrantlock的博客再具體分析。
這里我們關鍵看三個地方
一:他實現(xiàn)了runable接口
實現(xiàn)runable一定就有run方法,看這個類的名字Worker,我們推測他是以start的方式執(zhí)行run方法,而不會是通過直接調(diào)用run方法的形式運行run方法(要說為什么這么推測,任務的run方法肯定是直接調(diào)用,工人(worker)的run方法肯定是需要啟動)。那么就一定會有去start他的線程。
二:Thread thread
根據(jù)Worker的構造方法

我們可以發(fā)現(xiàn),這個Thread類型的參數(shù)確實就是運行Worker run方法的啟動線程。
三:firstTask
這是一個runable類型的變量,正好對應上我們之前看到的存儲待執(zhí)行工作workQueue的存儲類型,再根據(jù)變量名,我們可以推測出他是Worker啟動后第一個執(zhí)行的任務,由上面的構造函數(shù)我們也可以看到他是在Worker創(chuàng)建時就被添加的。
最關鍵的run方法我們后面再說,了解了worker的基本實現(xiàn),我們可以開始講流程實現(xiàn)不帶停了!
由上面的接口關系我們就可以大概了解executor這個方法的重要性,那么它又是怎么實現(xiàn)的呢?
我們直接點進源碼:

這段代碼其實網(wǎng)上的大部分博客都能看的到,這里也簡單分析一下,
1.首先判斷當前線程數(shù)是否小于核心線程數(shù),小于的話addWorker創(chuàng)建核心線程執(zhí)行任務。
2.不小于的話嘗試將任務添加到任務隊列。
3.如果線程池不處于運行狀態(tài),執(zhí)行拒絕策略。否則通過addWorker創(chuàng)建非核心線程執(zhí)行任務。
4.如果達到線程池最大容量即非核心線程也無法創(chuàng)建,執(zhí)行拒絕策略。
除了第二種情況,和第四種情況,創(chuàng)建核心線程和非核心線程都需要調(diào)用addWorker方法。我們點開這個方法。
這個方法主要分為兩個部分,
我們先看第一部分:

這里主要是對線程池狀態(tài)進行判斷,在兩個死循環(huán)中。
先看外面的循環(huán):外層循環(huán)首先會通過ctl獲取當前線程的數(shù)量,程池被shutdownNow,那么return false,即添加失敗,這里還有一個判斷條件firstTask==null&&!workQueue.isEmpty()用于判斷線程池被shotdown但不是shutdownNow的情況,但是怎么會有提交的任務為null呢?這里就涉及到了線程池后面的線程回收機制。我們后面再提。
再到內(nèi)層循環(huán):這時候傳入的參數(shù)boolean類型的core就發(fā)揮作用了,根據(jù)添加的是否為核心線程,這里的判斷條件也不一樣,若為核心線程,那么會根據(jù)創(chuàng)建線程池設置的核心線程數(shù)量比較,若大于設置的核心線程數(shù)量,則創(chuàng)建失敗。如果添加的不是核心線程,那么就會和創(chuàng)建線程池設置的最大線程數(shù)量比較,如果當前線程數(shù)已經(jīng)不小于最大線程數(shù)量,那么即使你是非核心線程,也無法成功被添加了。
接著通過cas操作嘗試增加線程池的當前線程數(shù),如果成功添加,則通過goto語句跳出當前的兩個死循環(huán),執(zhí)行之后的語句,如果cas操作失敗,先會判斷當前線程數(shù)量有沒有被修改,如果已經(jīng)被修改,那么通過continue跳出二重循環(huán),并重新從最外層的循環(huán)開始執(zhí)行,重新通過ctl讀取線程池的最新狀態(tài)。如果沒有被修改,那么正常執(zhí)行,即繼續(xù)內(nèi)層循環(huán),繼續(xù)嘗試通過cas添加當前線程數(shù)。
再看第二部分:

如果第一部分我們執(zhí)行成功,那么已經(jīng)修改ctl,對線程池的當前線程數(shù)進行了更新,第二部分就是對worker具體添加過程的實現(xiàn)。
一上來就通過new Worker(firstTask)的方式創(chuàng)建了Worker,接著加鎖,加鎖是為了保證對workers操作的安全性,接著更新如largestPoolSize線程池最大線程數(shù)量、workerAdded是否添加成功這樣的一些參數(shù)狀態(tài)。
通過if(workerAdded)判斷線程是否添加成功
添加成功后直接執(zhí)行worker的內(nèi)置線程,即開始運行firstTask。
這里提一下,finally下面有一個:

若添加失敗,則執(zhí)行addWorkerFailed

這個方法就是在addWorker添加失敗后對之前的操作執(zhí)行撤回的一個方法。
首先加鎖對workers進行操作,remove掉之前添加的worker,接著調(diào)用decrementWorkerCount()

這個方法底層是一個CAS操作,對ctl里的線程數(shù)量進行-1操作,可以理解為撤銷我們addWorker這個方法第一部分的操作。
接著調(diào)用tryTerminate()方法,這個方法會判斷當前線程池的狀態(tài),并可能對線程池的狀態(tài)進行修改。因為addWorker失敗可能是遇到shutDownNow等修改線程池狀態(tài)的操作,這里會判斷當前線程池的狀態(tài)是否是遇到了那樣的情況,是否需要進行對應操作。
由上面的描述我們可以知道,addWorker添加完worker后會直接啟動worker,那么worker的run方法又是什么樣的呢。worker是怎么執(zhí)行任務的呢?
上代碼:

run方法直接調(diào)用了runWorker方法,我們點開runWorker:
這里代碼比較長我分兩次截圖。


核心部分了:
while (task != null || (task = getTask()) != null)
首先會通過run方法執(zhí)行firstTask,執(zhí)行完畢后會將task置為null,那么task!=null的判斷條件肯定不通過,它就會嘗試通過getTask(),從任務隊列中獲取任務。
但這個getTask()可不是想拿就拿,它的功能很簡單,從任務隊列中取任務,但是判斷條件卻非常繁瑣。各種特殊情況,它都會返回null。
這里的判斷比較繞,建議反復理解
(如果對細節(jié)不感興趣可以直接跳過getTask該部分)。

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
首先:
會判斷線程池狀態(tài),如果線程池狀態(tài)為shutdown等特殊狀態(tài),直接返回null。如果線程池處于正常狀態(tài),通過ctl拿到當前線程數(shù)量。
接著:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
他通過||來進行判斷,線程池是否允許核心線程被回收,線程池當前線程數(shù)量是否大于核心線程最大數(shù)量。只有當線程池不允許核心線程被回收且當前線程數(shù)量小于核心線程最大數(shù)時,Timed才會為false。
然后:
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount?)
return null;
continue;
}
我們來分析這段判斷條件,首先判斷當前線程是否大于線程池最大線程數(shù),是的話不用看后面的直接返回null。
如果不大于,那么接著判斷timed(上面已經(jīng)講過)和timedOut,timeOut在進入getTask默認為false,如果嘗試取任務時隊列里已經(jīng)沒有任務了,那么timeOut會改為true,第二次循環(huán)判斷同樣的條件可能出現(xiàn)不同的結(jié)果。
(wc > 1 || workQueue.isEmpty())
最后判斷線程數(shù)是否>1或任務隊列已經(jīng)為空
(這個條件用于判斷當前線程池是否還有線程或者任務隊列已無任務)
若當前線程池沒有線程并且隊列中仍有任務,那么即使可能存在超時情況,也任然會繼續(xù)跳過該判斷執(zhí)行后面的語句。
(從判斷條件來看,若當前線程數(shù)已經(jīng)大于核心線程數(shù),那么就一定會返回null,可以理解為非核心線程執(zhí)行完自己的firsttask后,是無法通過gettask拿到隊列中的任務的。但是若設置了允許核心線程超時回收那么即使當前線程數(shù)沒有達到核心線程數(shù),gettask依舊會返回null,保底只維持一個線程執(zhí)行任務)
走到這里終于通過了以上兩個可能返回null的判斷
第一個判斷當前線程池狀態(tài)。
第二個判斷當前線程池是否滿足創(chuàng)建線程池的參數(shù)設置。
走到這里如果都沒問題,那么則拿任務,如果任務不為null,返回任務,如果任務為null,那么設置timeOut為true(這時候其實就是線程拿不到線程進入超時狀態(tài)了),因為是在死循環(huán)中,那么會在getTask方法中再次嘗試取任務,但是這次timeOut已經(jīng)為true,如果timed也為true,那么就會出現(xiàn)timed&&timeOut的情況,根據(jù)上面第二個返回為null的判斷我們可以推測出,該線程getTask會返回null。
(這里的判斷條件非常繁瑣,各種判斷的組合太過復雜,我沒辦法描述出各種具體情況,有點只可意會不可言傳的味道,想要理解的老哥需要自己結(jié)合代碼理解情況。)
回到runWorker,我們知道在這里線程通過循環(huán)嘗試從任務隊列取任務,但是當getTask取任務失敗之后呢?
會執(zhí)行finllay里的processWorkerExit()方法,該方法可以說是線程池中worker的回收機制實現(xiàn)。
上代碼:

我們看到它首先加鎖,不管三七二十一先把當前worker從workers中remove,接著又是一頓判斷
先判斷線程池狀態(tài)。
接著給min賦值,根據(jù)是否允許核心線程超時不被回收,min會被賦0或corePoolSize(允許的核心線程最大數(shù)量)。
如果任務隊列不為空,則設置min=1
(這里我的理解是,當線程進入processWorkerExit,那么最可能的情況就是任務隊列已經(jīng)空了跳出了runWorker的循環(huán),才會進入到processWorkerExit,那么在processWrokerExit中判斷的workQueue.isEmpty()添加進來的任務最大的可能是剛剛添加的新任務,此時queue大概率沒有滿且只有一點點剛添加進來的新任務,那么線程池就會留一個線程去執(zhí)行這個任務)
最關鍵的地方來了:
若allowCoreThreadTimeOut==true,且當前線程數(shù)不大于corePoolSize,就會執(zhí)行 addWorker(null, false);
怎么樣,是不是很熟悉,沒有錯,線程池線程回收機制就是通過processWorkerExit這樣維持線程池核心線程數(shù)量? 還記得addWorker的第一部分兩個循環(huán)的外層循環(huán)嗎?!(rs==shutdown&&first==nul&&!workqueue.isEmpty),當時判斷的條件我們現(xiàn)在可以看出,即使線程池已經(jīng)被shutdown,只要當前隊列不為空,那么通過ProcessWrokerExit創(chuàng)建用于維持核心線程數(shù)的addWorker就不會創(chuàng)建失敗。
但是還有一個讓人疑惑的地方,既然已經(jīng)知道了當線程不允許核心線程超時被回收(超時即任務隊列里沒任務了),且當前線程數(shù)小于創(chuàng)建線程時所規(guī)定的核心線程數(shù)時,就會以addWorker(null, false)的方式創(chuàng)建新的核心線程,但是到這里肯定有人會疑惑,如果沒有達到核心線程的大小且不允許核心線程超時被回收,那么這里創(chuàng)建的應該是核心線程才對啊,應該為addWorker(null,true)。
其實這里true和false的差別不大,根據(jù)上面的代碼分析我們可以看到,即使是核心線程,且線程池設置不允許回收超時的核心線程,執(zhí)行完任務后processWorkerExit還是不管三七二十一先把worker給回收了,接著再判斷是否需要添加新的worker以維持核心線程數(shù)量。
那么addWorker(null,true)和addWorker(null,false)的區(qū)別在哪呢?我們回到addWorker第一部分兩個循環(huán)判斷里的內(nèi)循環(huán),找到這么一段。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
在這里根據(jù)core為true或false對線程池的線程數(shù)又進行了一次判斷,但這里源碼使用的時addWorker(null,false),所以哪怕超過了核心線程數(shù),只要不大于線程池最大線程容量,也依舊會執(zhí)行addWorker的后續(xù)代碼。
所以我們可以得出區(qū)別:
在這么一種情況下:若兩個線程同時執(zhí)行processWorkerExit,若源碼為addWorker(null,true),則當?shù)谝粋€線程addWorker成功后,第二個線程會因為第一個線程addWorker成功而導致在addWorker的時候無法通過內(nèi)層循環(huán)的判斷,因為為core為true,所以判斷當前線程數(shù)已經(jīng)等于coolPoolSize,所以添加失敗,若源碼為addWorker(null,false),則會因為當前線程數(shù)雖然大于coolPoolSize卻小于maximumPoolSize而添加成功。
(Doug lea的設計具體為什么要這樣我只能理解皮毛,但目前這么設計只能理解為任務隊列里存在任務時,比coolPoolSize數(shù)多一點線程去執(zhí)行任務也不是什么壞事。如果任務隊列已經(jīng)為空,那么下一次執(zhí)行到processWorkerExit,多出來的線程一樣會因為線程數(shù)大于coolPoolSize而被回收,無傷大雅)
線程池中線程在回收線程的時候首先通過getTask方法建議回收,如果允許核心線程超時被回收那么每個在任務隊列里已經(jīng)沒有任務,即線程進入超時狀態(tài)后,核心線程執(zhí)行完任務,getTask只會保留一個線程工作,其他worker都會被返回null,在runWorker方法中跳出while循環(huán),執(zhí)行后面的procrsworkerexit回收機制,processworkerexit則會具體執(zhí)行回收機制,再次通過是否允許回收超時核心線程這一參數(shù)來決定通過addworker維持的線程池線程數(shù)量,如果允許被回收,那么僅維持一個,如果不允許被回收,那么維持corepoolsize個。
可以看到在核心線程不允許被超時回收且當前線程數(shù)量沒有大于corePoolSize時,線程池隊列中沒有任務(即進入超時狀態(tài))線程池的核心線程會卡在getTask中,不停的嘗試獲取任務,但如果設置了允許核心線程被超時回收,那么僅僅會維持一個線程卡在循環(huán)中,其余的都會通過processWorkekExit進行回收。
發(fā)現(xiàn)當corePoolSize設置的比較大時,其實哪怕即使不使用線程池,線程池中的線程也不是被掛起,而是卡在死循環(huán)中不斷嘗試獲取任務,這樣是十分損耗性能的!
所以初始化線程池設置參數(shù),其實也非常講究??!