Executor線程池源碼分析

日期:2020/7/5??

最近看并發(fā)源碼,把自個(gè)理解整理記錄,說(shuō)實(shí)話......一天不看就得重頭看,腦子可能比較笨,要反復(fù)琢磨理解,進(jìn)度比較慢,還沒(méi)整理研究完.......最終目的能自己手寫一個(gè)線程池

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 線程池? ? ? ? ? ? ? ?

一.線程池使用場(chǎng)景

? ? ? 1.單個(gè)任務(wù)處理時(shí)間比較短

? ? ? 2.需要處理的任務(wù)量很大


二.線程池優(yōu)勢(shì)

? ? ? ? 1.重用存在的線程,減少線程創(chuàng)建,消亡的開銷,提高性能

? ? ? ? 2.提高響應(yīng)速度,當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不要等到線程創(chuàng)建就立即執(zhí)行????

? ? ? ? 3.提高線程的可管理性,線程是稀缺資源,如果無(wú)線創(chuàng)建,會(huì)消耗系統(tǒng)資源,降低系統(tǒng)穩(wěn)定性,線程池可以進(jìn)行統(tǒng)一分配,調(diào)優(yōu)和調(diào)度


三.線程池分析

? ? ? ? 1.線程池的創(chuàng)建

? ? ? ? //利用Executors工具類去創(chuàng)建線程池

? ? ? ??ExecutorService executor = Executors.newFixedThreadPool(5);

????????????????????????????????????\Downarrow ? ??????????

? ??????public static ExecutorService? newFixedThreadPool(int nThreads) {

//參數(shù):線程池?cái)?shù)量,最大線程池?cái)?shù)量,線程最大空閑存活時(shí)間,空閑時(shí)間計(jì)量單位,無(wú)空閑線程則存放到阻塞隊(duì)列中

return new ThreadPoolExecutor(nThreads, nThreads,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue());

}



????????2.線程池的工作流程

? ? ? ? 線程池的大小 maximumPoolSize = 核心線程5+非核心線程5 = 10

? ? ? ? 當(dāng)有任務(wù)進(jìn)入的時(shí)候,會(huì)優(yōu)先將任務(wù)分配到核心線程,其他剩余的任務(wù)放入到阻塞隊(duì)列中去等待,如果此隊(duì)列是一個(gè)有界隊(duì)列,則直到放滿為止,如果還有任務(wù)則創(chuàng)建非核心線程,如還有剩余任務(wù)沒(méi)有分分配,則執(zhí)行RejectedExecutionHandler拒絕策略
? ??????


? ? ? ? 3.線程池的重點(diǎn)屬性

public class ThreadPoolExecutor extends AbstractExecutorService {

? ??????????private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,? 0))? ????????????private?static?final?int?COUNT_BITS?=?Integer.SIZE?-?3? ????????????private?static?final?int?CAPACITY???=?(1?<<?COUNT_BITS)?-?1?

}

屬性解析:? ? ctl?是對(duì)線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個(gè)字段,?它包含兩 部分的信息:?線程池的運(yùn)行狀態(tài)?(runState)?和線程池內(nèi)有效線程的數(shù)量?(workerCount),這 里可以看到,使用了Integer類型來(lái)保存,高3位保存runState,低29位保存 workerCount。COUNT_BITS?就是29,CAPACITY就是1左移29位減1(29個(gè)1),這個(gè)常 量表示workerCount的上限值,大約是5億。

ctl相關(guān)方法 :

? ??????private?static?int?runStateOf(int?c)?????{?return?c?&?~CAPACITY??} ????????private?static?int?workerCountOf(int?c)??{?return?c?&?CAPACITY??} ????????private?static?int?ctlOf(int?rs,?int?wc)?{?return?rs?|?wc??}?

runStateOf:獲取運(yùn)行狀態(tài); ????????workerCountOf:獲取活動(dòng)線程數(shù); ctlOf:獲取運(yùn)行狀態(tài)和活動(dòng)線程數(shù)的?

??????


??4.線程池存在的5種狀態(tài)

RUNNING????=?-1?<<?COUNT_BITS??//高3位為111?

SHUTDOWN???=??0?<<?COUNT_BITS??//高3位為000?

STOP???????=??1?<<?COUNT_BITS??//高3位為001?

TIDYING????=??2?<<?COUNT_BITS??//高3位為010?

TERMINATED?=??3?<<?COUNT_BITS??//高3位為011

1.RUNNING:

? ? ? ? (1)狀態(tài)說(shuō)明:線程池處于RUNNING狀態(tài)時(shí),能夠接收新任務(wù),以及對(duì)已經(jīng)添加的任務(wù)進(jìn)行處理

? ? ? ? (2)狀態(tài)切換:線程池的初始化狀態(tài)是RUNNING,話句話說(shuō),線程池一但被創(chuàng)建,就處于RUNNING狀態(tài),并且線程池種的任務(wù)書為0

2.SHUTDOWN:

? ? ? ? (1)狀態(tài)說(shuō)明:線程池處在SHUTDOWN狀態(tài)時(shí),不接受新任務(wù),但能處理已經(jīng)添加的任務(wù)

? ? ? ? (2)狀態(tài)切換:調(diào)用線程池的shutdown()接口時(shí),線程池由RUNNING -> SHUTDOWN

3.STOP:

? ? ? ? (1):狀態(tài)說(shuō)明:線程池處在STOP狀態(tài)時(shí),不接受新任務(wù),不處理已添加的任務(wù),并且會(huì)中斷正在處理的任務(wù)

? ? ? ? (2)狀態(tài)切換:調(diào)用線程池的shutdownNow()接口時(shí),線程池由(RUNNING or SHUTDOWN)-->STOP

4.TIDYING:

????????(1) 狀態(tài)說(shuō)明:當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,線程池會(huì)變?yōu)門IDYING

狀態(tài)。當(dāng)線程池變?yōu)門IDYING狀態(tài)時(shí),會(huì)執(zhí)行鉤子函數(shù)terminated()。terminated()在

ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門IDYING時(shí),進(jìn)行相應(yīng)的處理;

可以通過(guò)重載terminated()函數(shù)來(lái)實(shí)現(xiàn)。

????????(2) 狀態(tài)切換:當(dāng)線程池在SHUTDOWN狀態(tài)下,阻塞隊(duì)列為空并且線程池中執(zhí)行的任務(wù)也

為空時(shí),就會(huì)由 SHUTDOWN -> TIDYING。 當(dāng)線程池在STOP狀態(tài)下,線程池中執(zhí)行的

任務(wù)為空時(shí),就會(huì)由STOP -> TIDYING。

5.TERMINARTED:

????????(1) 狀態(tài)說(shuō)明:線程池徹底終止,就變成TERMINATED狀態(tài)。

????????(2) 狀態(tài)切換:線程池處在TIDYING狀態(tài)時(shí),執(zhí)行完terminated()之后,就會(huì)由 TIDYING -

> TERMINATED。


狀態(tài)轉(zhuǎn)換流程圖



? ? ? ? ? ? ? ? ? ? ? ? ? ? ThreadPoolExecutor

1.線程池的創(chuàng)建:

ThreadPoolExecutor(int corePoolSize,? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

????????????????????????????????????????int maximumPoolSize,? ? ? ? ? ? ? ? ? ? ?

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?long keepAliveTime,? ? ? ? ? ? ? ? ? ? ? ??

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?TimeUnit unit,?

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?BlockingQueue<Runnable> workQueue,?

????????????????????????????????????ThreadFactory threadFactory,?

????????????????????????????????????RejectedExecutionHandler handler)? ?

參數(shù)解釋:

????????corePoolSize?:? ? ? ?

線程池中的核心線程數(shù),當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),直到當(dāng)

前線程數(shù)等于corePoolSize;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到

阻塞隊(duì)列中,等待被執(zhí)行;

? ? ? ?maximumPoolSize:

線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊(duì)列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線

程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize;

? ??????keepAliveTime:

線程池維護(hù)線程所允許的空閑時(shí)間,當(dāng)線程池中的線程數(shù)量大于corePoolSize的時(shí)候,如果這時(shí)沒(méi)有新的任務(wù)提交,核心線程外的線程不會(huì)立刻銷毀,而是會(huì)等待,知道等待的時(shí)間超過(guò)了keepAliveTime

????????unit? :

keepAliveTime的單位

? ? ? ? workQueue:

用來(lái)保存等待被執(zhí)行的任務(wù)的阻塞隊(duì)列,且任務(wù)必須實(shí)現(xiàn)Runable接口,在JDK中提供

了如下阻塞隊(duì)列:

1、ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,按FIFO排序任務(wù);

2、LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,按FIFO排序任務(wù),吞

吐量通常要高于ArrayBlockingQuene;

3、SynchronousQuene:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到

另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于

LinkedBlockingQuene;

4、priorityBlockingQuene:具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列;

? ? ? ? threadFactory:

它是ThreadFactory類型的變量,用來(lái)創(chuàng)建新線程。默認(rèn)使用Executors.defaultThreadFactory() 來(lái)創(chuàng)建線程。使用默認(rèn)的ThreadFactory來(lái)創(chuàng)建線程

時(shí),會(huì)使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級(jí)并且是非守護(hù)線程,同時(shí)也設(shè)

置了線程的名稱。

? ? ? ? handler:

線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了,且沒(méi)有空閑的工作線程,如果繼續(xù)提交任務(wù),必須提供一中策略處理該任務(wù),一般提供了4中策略

1、AbortPolicy:直接拋出異常,默認(rèn)策略;

2、CallerRunsPolicy:用調(diào)用者所在的線程來(lái)執(zhí)行任務(wù);

3、DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);

4、DiscardPolicy:直接丟棄任務(wù);

當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如

記錄日志或持久化存儲(chǔ)不能處理的任務(wù)。



1.任務(wù)提交(execute方法源碼分析)

????????public void execute() //提交任務(wù)無(wú)返回值

public void execute(Runnable command) {

????????if (command ==null)

????????????????????throw new NullPointerException();


? ????? int c =ctl.get();

?????????/*\color{green}{workerCountOf方法取出低29位的值,表示當(dāng)前的線程數(shù);}

? ?\color{green}{如果當(dāng)前活動(dòng)線程數(shù)小于corePoolSize,則新建一個(gè)線程放入線程池中;}

? ? ? ?\color{green}{ 并把任務(wù)添加到該線程中}*/

? ? ????if (workerCountOf(c)<corePoolSize){

? ? ? ? ? ? ? ??\color{green}{*斷還是maximumPoolSize}

? ? ? ? ? ? ? ? \color{green}{*如果為true,根據(jù)corePoolSize來(lái)判斷;}

? ? ? ? ? ? ? \color{green}{  *如果為false,則根據(jù)maximumPoolSize來(lái)判斷}

????????????????if (addWorker(command, true))

????????????????????????return;

? ? ? ? ? ? ? ??\color{green}{ //如果添加失敗,則重新獲取ctl值}

? ? ? ? ? ? ? ? c =ctl.get();

? ? }

? ? ? ? ? ? ?//? ?\color{green}{如果當(dāng)前線程是運(yùn)行狀態(tài),并且任務(wù)添加到隊(duì)列成功}

if (isRunning(c) &&workQueue.offer(command)) {

? ? ? ? ? ? int recheck =ctl.get();

? ? ? ??\color{green}{//再次判斷線程的運(yùn)行狀態(tài),如果不是運(yùn)行狀態(tài),由于之前已經(jīng)把command添加到workQueue中了,}

\color{green}{這時(shí)需要移除該command,執(zhí)行通過(guò)后執(zhí)行handler使用拒絕策略,進(jìn)行處理}

? ? ? ? ? if (!isRunning(recheck) && remove(command))

? ? ? ? ? ? ? ? ? ?reject(command);

\color{green}{//獲取線程池中有效的線程數(shù),如果是0,則執(zhí)行addworker()方法}

\color{green}{//第一參數(shù)為null,表示在線程池中創(chuàng)建一個(gè)線程,但不去啟動(dòng),第二個(gè)參數(shù)為false,表示添加時(shí)根據(jù)maximumPoolSize來(lái)判斷}

? ? ? ? ? else if (workerCountOf(recheck) ==0)

? ? ? ? ? ? ? ? ? addWorker(null, false);

? ? }

\color{green}{//執(zhí)行到這里,有兩種情況:}

? ??????\color{green}{1.線程池已經(jīng)不是RUNNING狀態(tài),                         }

? ? ? ? ? ?\color{green}{   2.線程池是RUNNING狀態(tài),但workerCount >= corePoolSize并且 workQueue已滿} ? ? ? ? ?\color{green}{//這時(shí)再次調(diào)用addWorker方法,但第二個(gè)參數(shù)是false,將線程池的有限線程數(shù)量上線設(shè)置為maximumPoolSize}

else if (!addWorker(command, false))

????????????reject(command);

}

\color{red}{個(gè)人總結(jié):}?

?簡(jiǎn)單來(lái)說(shuō),在執(zhí)行execute()方法時(shí)如果狀態(tài)一直是RUNNING,執(zhí)行過(guò)程如下:? ? ? ? ? ? ? ??

????????1. 如果workerCount < corePoolSize,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新提交的任

務(wù);

????????2. 如果workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊(duì)列未滿,則將任務(wù)添

加到該阻塞隊(duì)列中;

????????3. 如 果 workerCount >= corePoolSize && workerCount <

????????????maximumPoolSize,且線程池內(nèi)的阻塞隊(duì)列已滿,則創(chuàng)建并啟動(dòng)一個(gè)線程來(lái)執(zhí)行新

????????????提交的任務(wù);

????????4. 如果workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊(duì)列已滿, 則根

????????????據(jù)拒絕策略來(lái)處理該任務(wù), 默認(rèn)的處理方式是直接拋異常。

????????????這里要注意一下addWorker(null, false);,也就是創(chuàng)建一個(gè)線程,但并沒(méi)有傳入任務(wù),因?yàn)?/p>

????????????任務(wù)已經(jīng)被添加到workQueue中了,所以worker在執(zhí)行的時(shí)候,會(huì)直接從workQueue中

????????????獲取任務(wù)。所以,在workerCountOf(recheck) == 0時(shí)執(zhí)行addWorker(null, false);也是

????????????為了保證線程池在RUNNING狀態(tài)下必須要有一個(gè)線程來(lái)執(zhí)行任務(wù)。


?execute方法執(zhí)行流程



2.addWorker方法

? ? addWorker方法的主要工作是在線程池中創(chuàng)建一個(gè)新的線程并執(zhí)行,代碼如下:

private boolean addWorker(Runnable firstTask, boolean core) {

????retry:

????for (;;) {

????????????int c =ctl.get();

? ? ? ? ? ??\color{green}{//獲取運(yùn)行狀態(tài)}

? ? ? ? ????int rs =runStateOf(c);

? ??????????????\color{green}{//這個(gè)if判斷,如果rs>=SHUTDOWN,則表示不能再接收新任務(wù)}

\color{green}{//接著判斷一下3個(gè)條件,只要有1個(gè)不滿足,則返回false}
\color{green}{1.rs==SHUTDOWN,這是表示關(guān)閉狀態(tài),不再接受新任務(wù)}
\color{green}{2.frisTask為空,3.阻塞隊(duì)列不為空}
\color{green}{//因?yàn)殛?duì)列中已經(jīng)沒(méi)有任務(wù)了,不需要再添加線程了}

? ? ? ? ? ? if (rs >=SHUTDOWN &&

????????????! (rs ==SHUTDOWN &&

????????????????firstTask ==null &&

????????????????????!workQueue.isEmpty()))

????????????????????????return false;

? ? ? ? for (;;) {//這個(gè)自旋是來(lái)判斷我們這個(gè)線程池是否可以創(chuàng)建我們的任務(wù)

? ??????????\color{green}{//獲取線程數(shù)}

????????????????????int wc =workerCountOf(c);??\color{green}{//如果wc超過(guò)CAPACITY(workCountd的上限值)}??\color{green}{//這里的core是addworker方法的第二個(gè)參數(shù)}? ? ? ? ? ? ? ??\color{green}{//如果為true則根據(jù)corePoolSize比較,否則根據(jù)maximumPoolSize比較}

? ? ? ? ? ????????? if (wc >=CAPACITY ||

????????????????????????wc >= (core ?corePoolSize :maximumPoolSize))

????????????????????????return false;

????????????????????\color{green}{//嘗試增加workerCount,如果成功則跳出第一個(gè)循環(huán)}? ? ? ? ???????
? ? ? ? ? ? ? ? ? ?if (compareAndIncrementWorkerCount(c))

????????????????????????????????????????break retry;

? ??????????????????\color{green}{//如果新增失敗,則獲取ctl的值}

? ? ? ? ? ? ? ? ? ? ? ? ?c =ctl.get();??

? ? ? ? ? ? ? ? ? ? ? ??\color{green}{//如果當(dāng)前運(yùn)行狀態(tài)不等于re,說(shuō)明狀態(tài)已經(jīng)改變,翻譯第一個(gè)for循環(huán)繼續(xù)執(zhí)行}

? ? ? ? ? ? ? ? ? ? ? ? ? if (runStateOf(c) != rs)

????????????????????????????????continue retry;

? ? ? ? }

}

? ? boolean workerStarted =false;

? ? boolean workerAdded =false;

? ? Worker w =null;

? ? try {

? ??????\color{green}{//根據(jù)firstTask對(duì)象來(lái)創(chuàng)建Worker對(duì)象,此類的分析在后面分析}

? ? ? ? w =new Worker(firstTask);

? ? ? ? final Thread t = w.thread;

? ? ? ? if (t !=null) {

? ? ? ? ? ? final ReentrantLock mainLock =this.mainLock;

? ? ? ? ? ? mainLock.lock();

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? int rs =runStateOf(ctl.get());

\color{green}{//rs<SHUTDOWN表示是RUNNING狀態(tài)}

\color{green}{/或者是SHUTDOWN狀態(tài),并且firstTask
是null,向線程池中添加線程}

? ? ? ? ? ? ? ? if (rs<SHUTDOWN||

????????????????????????(rs ==SHUTDOWN && firstTask ==null)) {

????????????????????????if (t.isAlive())

? ? ? ? ? ? ? ? ? ? ? ? ????????????throw new IllegalThreadStateException();
? ??????????????????????\color{green}{//workers是一個(gè)HashSet}

? ? ? ? ? ? ? ? ? ? ? ? workers.add(w);

? ? ? ? ? ? ? ? ? ? int s =workers.size();
? ??????????????\color{green}{//記錄這線程池中出現(xiàn)過(guò)的最大線程數(shù)量}

? ? ? ? ? ? ? ? ? ? if (s >largestPoolSize)

? ? ? ? ? ? ? ? ? ? ? ? largestPoolSize = s;

? ? ? ? ? ? ? ? ? ? workerAdded =true;

? ? ? ? ? ? ? ? }

}finally {

mainLock.unlock();

? ? ? ? ? ? }

if (workerAdded) {

? ??????????\color{green}{//啟動(dòng)線程,這里執(zhí)行了Worker類中的run方法}

? ? ? ? ? ? ? ?t.start();

? ? ? ? ? ? ? ? workerStarted =true;

? ? ? ? ? ? }

}

}finally {

????????????if (! workerStarted)

????????????addWorkerFailed(w);

? ? }

????????????return workerStarted;

}



3.Worker類

? ? ? ? 線程池中的每一個(gè)線程被封裝成一個(gè)Worker對(duì)象,? ? ? ? ThreadPool維護(hù)的其實(shí)就是一組Worker對(duì)象

? ? ? ? Worker繼承了AQS,實(shí)現(xiàn)了Runnable接口,其中的firstTask和thread屬性,firstTask用他來(lái)保存?zhèn)魅氲娜蝿?wù),thread是在調(diào)用構(gòu)造方法時(shí)通過(guò)ThreadFactory來(lái)創(chuàng)建的線程,用來(lái)處理任務(wù)的線程

? ? ? ? 在調(diào)用構(gòu)造方法時(shí),需要把任務(wù)傳入,這里通過(guò)getThreadFactory().newThread(this);來(lái)新建一個(gè)線程,newThread方法傳入的參數(shù)是this,因?yàn)閃orker本身繼承了Runable接口,也就是一個(gè)線程,所以Worker對(duì)象在啟動(dòng)的時(shí)候回調(diào)用Worker類中的run方法

Worker類



4.RunWorker方法

? ? ? ? 在Worker類中的run方法調(diào)用了runWorker方法來(lái)執(zhí)行任務(wù),runWorker方法的代碼如下:

final void runWorker(Worker w) {? ? ? ? ? ??

????????Thread wt = Thread.currentThread();

? ? ????Runnable task = w.firstTask;

? ? ? ?w.firstTask =null;

\color{green}{//   允許中斷,因?yàn)樵趧?chuàng)建Worker的時(shí)候setStatus(-1),是不允許中斷的,執(zhí)行到這里可以中斷了}

? ????? w.unlock();?

\color{green}{//是否因?yàn)楫惓M顺鲅h(huán)}

????? ? boolean completedAbruptly =true;

\color{green}{//線程池一直保持存活的原因}

? ????????????? try {

\color{green}{//如果task為空,則通過(guò)getTask來(lái)獲取任務(wù)(阻塞隊(duì)列中)}

\color{red}{//getTask()方法在下面解析}

? ? ? ? ? ? ? ?while (task !=null || (task = getTask()) !=null) {

????????????????????w.lock();

? ? ? ? ? ????????? if ((runStateAtLeast(ctl.get(), STOP) ||

????????????????????(Thread.interrupted() &&

? ? ? ? ? ????????? runStateAtLeast(ctl.get(), STOP))) &&

????????????????????!wt.isInterrupted())

????????????????????????????????wt.interrupt();

? ? ? ? ? ? try {

????????????????beforeExecute(wt, task);

? ? ? ? ? ? ? ? Throwable thrown =null;

? ? ? ? ? ? ? ? try {

????????????????????????????task.run();

? ? ? ? ? ? ? ? }catch (RuntimeException x) {

????????????????thrown = x; throw x;

? ? ? ? ? ? ? ? }catch (Error x) {

????????????????????????????thrown = x; throw x;

? ? ? ? ? ? ? ? }catch (Throwable x) {

????????????????????????????thrown = x; throw new Error(x);

? ? ? ? ? ? ? ? }finally {

????????????????????????????afterExecute(task, thrown);

? ? ? ? ? ? ? ? }

????????????}finally {

????????????????????????????task =null;

? ? ? ? ? ? ? ? w.completedTasks++;

? ? ? ? ? ? ? ? w.unlock();

? ? ? ? ? ? }

}

completedAbruptly =false;

? ? }finally {

\color{red}{//下面講解此方法,進(jìn)程退出的邏輯}

processWorkerExit(w, completedAbruptly);

? ? }

}

\color{red}{總結(jié) runWorker方法的執(zhí)行過(guò)程}:

1. while循環(huán)不斷地通過(guò)getTask()方法獲取任務(wù);

2. getTask()方法從阻塞隊(duì)列中取任務(wù);

3. 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài),否則要保證當(dāng)前線程不是

中斷狀態(tài);

4. 調(diào)用task.run()執(zhí)行任務(wù);

5. 如果task為null則跳出循環(huán),執(zhí)行processWorkerExit()方法;

6. runWorker方法執(zhí)行完畢,也代表著Worker中的run方法執(zhí)行完畢,銷毀線程。

這里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的,留給

子類來(lái)實(shí)現(xiàn)。

completedAbruptly 變 量 來(lái) 表 示 在 執(zhí) 行 任 務(wù) 過(guò) 程 中 是 否 出 現(xiàn) 了 異 常 , 在

processWorkerExit方法中會(huì)對(duì)該變量的值進(jìn)行判斷。




5.getTask方法

getTask方法用來(lái)從阻塞隊(duì)列中獲取任務(wù)

private RunnablegetTask() {? ? ? ??

\color{green}{// timeOut變量的值表示上次從阻塞隊(duì)列中取任務(wù)時(shí)是否超時(shí)}

????????boolean timedOut =false;?

?????????????? for (;;) {

????????????????????int c =ctl.get();

? ? ? ????????????? int rs =runStateOf(c);

? ? ? ? ????????????if (rs >=SHUTDOWN && (rs >=STOP ||workQueue.isEmpty())) {

????????????????????????????decrementWorkerCount();

????????????????????????????????return null;

? ? ????????????? ? }

????????????????????int wc =workerCountOf(c);

? ??????????????\color{green}{//timed變量用于判斷是否需要進(jìn)行超時(shí)控制,allowCoreThreadTimeOut默認(rèn)是false}

? ??????????????\color{green}{//allowCoreThreadTimeOut 是否允許核心線程超時(shí),如果允許,一旦超時(shí),核心線程和非核心線程都會(huì)結(jié)束生命周期}

\color{green}{//wc>corePoolSize,說(shuō)明有非核心線程,time為true,允許停掉}

\color{green}{//對(duì)于超過(guò)核心線程數(shù)量的這些線程,需要進(jìn)行超時(shí)控制}

? ? ? ????????????? boolean timed =allowCoreThreadTimeOut || wc >corePoolSize;

? ? ? ? ????????????if ((wc >maximumPoolSize || (timed && timedOut))

????????????????????????&& (wc >1 ||workQueue.isEmpty())) {

? ??????????????????\color{green}{//判斷有效線程數(shù)>1或者阻塞隊(duì)列是空的,那么嘗試將workerCount減1}

????????????????????????????????????if (compareAndDecrementWorkerCount(c))

????????????????????????????????????????????????return null;

????????????????????????????????????????????????continue;

? ? ? ????????????????????????????????? }

try {

????????????Runnable r = timed ?

\color{green}{//timed為true,要進(jìn)行超時(shí)判斷,如果在keepAliveTime時(shí)間內(nèi)沒(méi)有獲取到任務(wù),則返回null}

? ? ? ? ? ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

? ? ? ? ? ? workQueue.take();

? ? ? ? ? ? if (r !=null)

????????????????return r;

//這里設(shè)置timedOut=true,會(huì)在第二次for循環(huán)中返回null,workerCount數(shù)量減1

? ? ? ? ? ? timedOut =true;

? ? ? ? }catch (InterruptedException retry) {

timedOut =false;

? ? ? ? }

}

}



? 6.processWorkerExit方法

? ??????????private void processWorkerExit(Worker w, boolean completedAbruptly) {

\color{green}{//如果completedAbruptly為true,說(shuō)明線程在執(zhí)行過(guò)程中出現(xiàn)了異常,需要將workerCount減1}

\color{green}{//如果線程執(zhí)行過(guò)程中沒(méi)有異常,說(shuō)明在getTask()中已經(jīng)對(duì)workerCount進(jìn)行了減1操作,這里不需要再減}

????????????????????if (completedAbruptly)

? ? ? ? ????????????????????????decrementWorkerCount();

? ? ????????????????final ReentrantLock mainLock =this.mainLock;

? ????????????????? mainLock.lock();

? ? ? ? ? ? ? ? ? ? try {

? ??????????????????????\color{green}{//統(tǒng)計(jì)完成的任務(wù)數(shù)}

????????????????????????????completedTaskCount += w.completedTasks;

? ??????????????????????\color{green}{//從workers中移除,也就表示著從線程池中移除了一個(gè)工作線程}

? ? ? ? ????????????????????workers.remove(w);

? ????????????????? }finally {

????????????????????????????mainLock.unlock();

? ????????????????? }

????????????????tryTerminate();

? ????????????? int c =ctl.get();

? ? ? ? ? ? ? ?if (runStateLessThan(c, STOP)) {

? ? ? ? ? ? ? ? ? ? ? ?if (!completedAbruptly) {

????????????????????????????????int min =allowCoreThreadTimeOut ?0 :corePoolSize;

? ? ? ? ? ????????????????????? if (min ==0 && !workQueue.isEmpty())

????????????????????????????????????????min =1;

? ? ? ? ? ????????????????????? if (workerCountOf(c) >= min)

????????????????????????????????????????????return;?

? ? ? ????????? }

????????????????addWorker(null, false);

? ????? }

}



至此,processWorkerExit執(zhí)行完之后,工作線程被銷毀,以上就是整個(gè)工作線程的生

命周期,從execute方法開始,Worker使用ThreadFactory創(chuàng)建新的工作線程,

runWorker通過(guò)getTask獲取任務(wù),然后執(zhí)行任務(wù),如果getTask返回null,進(jìn)入

processWorkerExit方法,整個(gè)線程結(jié)束,如圖所示:


線程池的生命周期
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容