Java線(xiàn)程池的實(shí)現(xiàn)原理

前言

線(xiàn)程是稀缺資源,如果被無(wú)限制的創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,合理的使用線(xiàn)程池對(duì)線(xiàn)程進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控,有以下好處:
1、降低資源消耗;
2、提高響應(yīng)速度;
3、提高線(xiàn)程的可管理性。

Java1.5中引入的Executor框架把任務(wù)的提交和執(zhí)行進(jìn)行解耦,只需要定義好任務(wù),然后提交給線(xiàn)程池,而不用關(guān)心該任務(wù)是如何執(zhí)行、被哪個(gè)線(xiàn)程執(zhí)行,以及什么時(shí)候執(zhí)行。

demo

image

1、Executors.newFixedThreadPool(10)初始化一個(gè)包含10個(gè)線(xiàn)程的線(xiàn)程池executor;
2、通過(guò)executor.execute方法提交20個(gè)任務(wù),每個(gè)任務(wù)打印當(dāng)前的線(xiàn)程名;
3、負(fù)責(zé)執(zhí)行任務(wù)的線(xiàn)程的生命周期都由Executor框架進(jìn)行管理;

ThreadPoolExecutor

Executors是java線(xiàn)程池的工廠類(lèi),通過(guò)它可以快速初始化一個(gè)符合業(yè)務(wù)需求的線(xiàn)程池,如Executors.newFixedThreadPool方法可以生成一個(gè)擁有固定線(xiàn)程數(shù)的線(xiàn)程池。

image

其本質(zhì)是通過(guò)不同的參數(shù)初始化一個(gè)ThreadPoolExecutor對(duì)象,具體參數(shù)描述如下:

corePoolSize

線(xiàn)程池中的核心線(xiàn)程數(shù),當(dāng)提交一個(gè)任務(wù)時(shí),線(xiàn)程池創(chuàng)建一個(gè)新線(xiàn)程執(zhí)行任務(wù),直到當(dāng)前線(xiàn)程數(shù)等于corePoolSize;如果當(dāng)前線(xiàn)程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行;如果執(zhí)行了線(xiàn)程池的prestartAllCoreThreads()方法,線(xiàn)程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線(xiàn)程。

maximumPoolSize

線(xiàn)程池中允許的最大線(xiàn)程數(shù)。如果當(dāng)前阻塞隊(duì)列滿(mǎn)了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線(xiàn)程執(zhí)行任務(wù),前提是當(dāng)前線(xiàn)程數(shù)小于maximumPoolSize;

keepAliveTime

線(xiàn)程空閑時(shí)的存活時(shí)間,即當(dāng)線(xiàn)程沒(méi)有任務(wù)執(zhí)行時(shí),繼續(xù)存活的時(shí)間;默認(rèn)情況下,該參數(shù)只在線(xiàn)程數(shù)大于corePoolSize時(shí)才有用;

unit

keepAliveTime的單位;

workQueue

用來(lái)保存等待被執(zhí)行的任務(wù)的阻塞隊(duì)列,且任務(wù)必須實(shí)現(xiàn)Runable接口,在JDK中提供了如下阻塞隊(duì)列:
1、ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,按FIFO排序任務(wù);
2、LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線(xiàn)程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列;

threadFactory

創(chuàng)建線(xiàn)程的工廠,通過(guò)自定義的線(xiàn)程工廠可以給每個(gè)新建的線(xiàn)程設(shè)置一個(gè)具有識(shí)別度的線(xiàn)程名。

image

handler

線(xiàn)程池的飽和策略,當(dāng)阻塞隊(duì)列滿(mǎn)了,且沒(méi)有空閑的工作線(xiàn)程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線(xiàn)程池提供了4種策略:
1、AbortPolicy:直接拋出異常,默認(rèn)策略;
2、CallerRunsPolicy:用調(diào)用者所在的線(xiàn)程來(lái)執(zhí)行任務(wù);
3、DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
4、DiscardPolicy:直接丟棄任務(wù);
當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲(chǔ)不能處理的任務(wù)。

Exectors

Exectors工廠類(lèi)提供了線(xiàn)程池的初始化接口,主要有如下幾種:

newFixedThreadPool
image

初始化一個(gè)指定線(xiàn)程數(shù)的線(xiàn)程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作為阻塞隊(duì)列,不過(guò)當(dāng)線(xiàn)程池沒(méi)有可執(zhí)行任務(wù)時(shí),也不會(huì)釋放線(xiàn)程。

newCachedThreadPool
image

1、初始化一個(gè)可以緩存線(xiàn)程的線(xiàn)程池,默認(rèn)緩存60s,線(xiàn)程池的線(xiàn)程數(shù)可達(dá)到Integer.MAX_VALUE,即2147483647,內(nèi)部使用SynchronousQueue作為阻塞隊(duì)列;
2、和newFixedThreadPool創(chuàng)建的線(xiàn)程池不同,newCachedThreadPool在沒(méi)有任務(wù)執(zhí)行時(shí),當(dāng)線(xiàn)程的空閑時(shí)間超過(guò)keepAliveTime,會(huì)自動(dòng)釋放線(xiàn)程資源,當(dāng)提交新任務(wù)時(shí),如果沒(méi)有空閑線(xiàn)程,則創(chuàng)建新線(xiàn)程執(zhí)行任務(wù),會(huì)導(dǎo)致一定的系統(tǒng)開(kāi)銷(xiāo);

所以,使用該線(xiàn)程池時(shí),一定要注意控制并發(fā)的任務(wù)數(shù),否則創(chuàng)建大量的線(xiàn)程可能導(dǎo)致嚴(yán)重的性能問(wèn)題。

newSingleThreadExecutor
image

初始化的線(xiàn)程池中只有一個(gè)線(xiàn)程,如果該線(xiàn)程異常結(jié)束,會(huì)重新創(chuàng)建一個(gè)新的線(xiàn)程繼續(xù)執(zhí)行任務(wù),唯一的線(xiàn)程可以保證所提交任務(wù)的順序執(zhí)行,內(nèi)部使用LinkedBlockingQueue作為阻塞隊(duì)列。

newScheduledThreadPool
image

初始化的線(xiàn)程池可以在指定的時(shí)間內(nèi)周期性的執(zhí)行所提交的任務(wù),在實(shí)際的業(yè)務(wù)場(chǎng)景中可以使用該線(xiàn)程池定期的同步數(shù)據(jù)。

實(shí)現(xiàn)原理

除了newScheduledThreadPool的內(nèi)部實(shí)現(xiàn)特殊一點(diǎn)之外,其它幾個(gè)線(xiàn)程池都是基于ThreadPoolExecutor類(lèi)實(shí)現(xiàn)的。

線(xiàn)程池內(nèi)部狀態(tài)

image

其中AtomicInteger變量ctl的功能非常強(qiáng)大:利用低29位表示線(xiàn)程池中線(xiàn)程數(shù),通過(guò)高3位表示線(xiàn)程池的運(yùn)行狀態(tài):
1、RUNNING:-1 << COUNT_BITS,即高3位為111,該狀態(tài)的線(xiàn)程池會(huì)接收新任務(wù),并處理阻塞隊(duì)列中的任務(wù);
2、SHUTDOWN: 0 << COUNT_BITS,即高3位為000,該狀態(tài)的線(xiàn)程池不會(huì)接收新任務(wù),但會(huì)處理阻塞隊(duì)列中的任務(wù);
3、STOP : 1 << COUNT_BITS,即高3位為001,該狀態(tài)的線(xiàn)程不會(huì)接收新任務(wù),也不會(huì)處理阻塞隊(duì)列中的任務(wù),而且會(huì)中斷正在運(yùn)行的任務(wù);
4、TIDYING : 2 << COUNT_BITS,即高3位為010;
5、TERMINATED: 3 << COUNT_BITS,即高3位為011;

任務(wù)提交

線(xiàn)程池框架提供了兩種方式提交任務(wù),根據(jù)不同的業(yè)務(wù)需求選擇不同的方式。

Executor.execute()
image

通過(guò)Executor.execute()方法提交的任務(wù),必須實(shí)現(xiàn)Runnable接口,該方式提交的任務(wù)不能獲取返回值,因此無(wú)法判斷任務(wù)是否執(zhí)行成功。

ExecutorService.submit()
image

通過(guò)ExecutorService.submit()方法提交的任務(wù),可以獲取任務(wù)執(zhí)行完的返回值。

任務(wù)執(zhí)行

當(dāng)向線(xiàn)程池中提交一個(gè)任務(wù),線(xiàn)程池會(huì)如何處理該任務(wù)?

execute實(shí)現(xiàn)
image

具體的執(zhí)行流程如下:
1、workerCountOf方法根據(jù)ctl的低29位,得到線(xiàn)程池的當(dāng)前線(xiàn)程數(shù),如果線(xiàn)程數(shù)小于corePoolSize,則執(zhí)行addWorker方法創(chuàng)建新的線(xiàn)程執(zhí)行任務(wù);否則執(zhí)行步驟(2);
2、如果線(xiàn)程池處于RUNNING狀態(tài),且把提交的任務(wù)成功放入阻塞隊(duì)列中,則執(zhí)行步驟(3),否則執(zhí)行步驟(4);
3、再次檢查線(xiàn)程池的狀態(tài),如果線(xiàn)程池沒(méi)有RUNNING,且成功從阻塞隊(duì)列中刪除任務(wù),則執(zhí)行reject方法處理任務(wù);
4、執(zhí)行addWorker方法創(chuàng)建新的線(xiàn)程執(zhí)行任務(wù),如果addWoker執(zhí)行失敗,則執(zhí)行reject方法處理任務(wù);

image
addWorker實(shí)現(xiàn)

從方法execute的實(shí)現(xiàn)可以看出:addWorker主要負(fù)責(zé)創(chuàng)建新的線(xiàn)程并執(zhí)行任務(wù),代碼實(shí)現(xiàn)如下:

image

這只是addWoker方法實(shí)現(xiàn)的前半部分:
1、判斷線(xiàn)程池的狀態(tài),如果線(xiàn)程池的狀態(tài)值大于或等SHUTDOWN,則不處理提交的任務(wù),直接返回;
2、通過(guò)參數(shù)core判斷當(dāng)前需要?jiǎng)?chuàng)建的線(xiàn)程是否為核心線(xiàn)程,如果core為true,且當(dāng)前線(xiàn)程數(shù)小于corePoolSize,則跳出循環(huán),開(kāi)始創(chuàng)建新的線(xiàn)程,具體實(shí)現(xiàn)如下:

image

線(xiàn)程池的工作線(xiàn)程通過(guò)Woker類(lèi)實(shí)現(xiàn),在ReentrantLock鎖的保證下,把Woker實(shí)例插入到HashSet后,并啟動(dòng)Woker中的線(xiàn)程,其中Worker類(lèi)設(shè)計(jì)如下:
1、繼承了AQS類(lèi),可以方便的實(shí)現(xiàn)工作線(xiàn)程的中止操作;
2、實(shí)現(xiàn)了Runnable接口,可以將自身作為一個(gè)任務(wù)在工作線(xiàn)程中執(zhí)行;
3、當(dāng)前提交的任務(wù)firstTask作為參數(shù)傳入Worker的構(gòu)造方法;

image

從Woker類(lèi)的構(gòu)造方法實(shí)現(xiàn)可以發(fā)現(xiàn):線(xiàn)程工廠在創(chuàng)建線(xiàn)程thread時(shí),將Woker實(shí)例本身this作為參數(shù)傳入,當(dāng)執(zhí)行start方法啟動(dòng)線(xiàn)程thread時(shí),本質(zhì)是執(zhí)行了Worker的runWorker方法。

runWorker實(shí)現(xiàn)
image

runWorker方法是線(xiàn)程池的核心:
1、線(xiàn)程啟動(dòng)之后,通過(guò)unlock方法釋放鎖,設(shè)置AQS的state為0,表示運(yùn)行中斷;
2、獲取第一個(gè)任務(wù)firstTask,執(zhí)行任務(wù)的run方法,不過(guò)在執(zhí)行任務(wù)之前,會(huì)進(jìn)行加鎖操作,任務(wù)執(zhí)行完會(huì)釋放鎖;
3、在執(zhí)行任務(wù)的前后,可以根據(jù)業(yè)務(wù)場(chǎng)景自定義beforeExecute和afterExecute方法;
4、firstTask執(zhí)行完成之后,通過(guò)getTask方法從阻塞隊(duì)列中獲取等待的任務(wù),如果隊(duì)列中沒(méi)有任務(wù),getTask方法會(huì)被阻塞并掛起,不會(huì)占用cpu資源;

getTask實(shí)現(xiàn)
image

整個(gè)getTask操作在自旋下完成:
1、workQueue.take:如果阻塞隊(duì)列為空,當(dāng)前線(xiàn)程會(huì)被掛起等待;當(dāng)隊(duì)列中有任務(wù)加入時(shí),線(xiàn)程被喚醒,take方法返回任務(wù),并執(zhí)行;
2、workQueue.poll:如果在keepAliveTime時(shí)間內(nèi),阻塞隊(duì)列還是沒(méi)有任務(wù),則返回null;

所以,線(xiàn)程池中實(shí)現(xiàn)的線(xiàn)程可以一直執(zhí)行由用戶(hù)提交的任務(wù)。

Future和Callable實(shí)現(xiàn)

通過(guò)ExecutorService.submit()方法提交的任務(wù),可以獲取任務(wù)執(zhí)行完的返回值。

image

在實(shí)際業(yè)務(wù)場(chǎng)景中,F(xiàn)uture和Callable基本是成對(duì)出現(xiàn)的,Callable負(fù)責(zé)產(chǎn)生結(jié)果,F(xiàn)uture負(fù)責(zé)獲取結(jié)果。
1、Callable接口類(lèi)似于Runnable,只是Runnable沒(méi)有返回值。
2、Callable任務(wù)除了返回正常結(jié)果之外,如果發(fā)生異常,該異常也會(huì)被返回,即Future可以拿到異步執(zhí)行任務(wù)各種結(jié)果;
3、Future.get方法會(huì)導(dǎo)致主線(xiàn)程阻塞,直到Callable任務(wù)執(zhí)行完成;

submit實(shí)現(xiàn)
image

通過(guò)submit方法提交的Callable任務(wù)會(huì)被封裝成了一個(gè)FutureTask對(duì)象。

FutureTask

image

1、FutureTask在不同階段擁有不同的狀態(tài)state,初始化為NEW;
2、FutureTask類(lèi)實(shí)現(xiàn)了Runnable接口,這樣就可以通過(guò)Executor.execute方法提交FutureTask到線(xiàn)程池中等待被執(zhí)行,最終執(zhí)行的是FutureTask的run方法;

FutureTask.get實(shí)現(xiàn)

image

內(nèi)部通過(guò)awaitDone方法對(duì)主線(xiàn)程進(jìn)行阻塞,具體實(shí)現(xiàn)如下:

image

1、如果主線(xiàn)程被中斷,則拋出中斷異常;
2、判斷FutureTask當(dāng)前的state,如果大于COMPLETING,說(shuō)明任務(wù)已經(jīng)執(zhí)行完成,則直接返回;
3、如果當(dāng)前state等于COMPLETING,說(shuō)明任務(wù)已經(jīng)執(zhí)行完,這時(shí)主線(xiàn)程只需通過(guò)yield方法讓出cpu資源,等待state變成NORMAL;
4、通過(guò)WaitNode類(lèi)封裝當(dāng)前線(xiàn)程,并通過(guò)UNSAFE添加到waiters鏈表;
5、最終通過(guò)LockSupport的park或parkNanos掛起線(xiàn)程;

FutureTask.run實(shí)現(xiàn)

image

FutureTask.run方法是在線(xiàn)程池中被執(zhí)行的,而非主線(xiàn)程
1、通過(guò)執(zhí)行Callable任務(wù)的call方法;
2、如果call執(zhí)行成功,則通過(guò)set方法保存結(jié)果;
3、如果call執(zhí)行有異常,則通過(guò)setException保存異常;

set
image
setException
image

set和setException方法中,都會(huì)通過(guò)UnSAFE修改FutureTask的狀態(tài),并執(zhí)行finishCompletion方法通知主線(xiàn)程任務(wù)已經(jīng)執(zhí)行完成;

finishCompletion
image

1、執(zhí)行FutureTask類(lèi)的get方法時(shí),會(huì)把主線(xiàn)程封裝成WaitNode節(jié)點(diǎn)并保存在waiters鏈表中;
2、FutureTask任務(wù)執(zhí)行完成后,通過(guò)UNSAFE設(shè)置waiters的值,并通過(guò)LockSupport類(lèi)unpark方法喚醒主線(xiàn)程;

鏈接:http://www.itdecent.cn/p/87bff5cc8d8c

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容