ThreadPoolExecutor是jdk內(nèi)置線程池的一個(gè)實(shí)現(xiàn),基本上大部分情況都會使用這個(gè)線程池完成各項(xiàng)操作。
本文分析ThreadPoolExecutor的實(shí)現(xiàn)原理。
ThreadPoolExecutor的狀態(tài)和屬性
ThreadPoolExecutor的屬性在之前的一篇java內(nèi)置的線程池筆記文章中解釋過了,本文不再解釋。
ThreadPoolExecutor線程池有5個(gè)狀態(tài),分別是:
- RUNNING:可以接受新的任務(wù),也可以處理阻塞隊(duì)列里的任務(wù)
- SHUTDOWN:不接受新的任務(wù),但是可以處理阻塞隊(duì)列里的任務(wù)
- STOP:不接受新的任務(wù),不處理阻塞隊(duì)列里的任務(wù),中斷正在處理的任務(wù)
- TIDYING:過渡狀態(tài),也就是說所有的任務(wù)都執(zhí)行完了,當(dāng)前線程池已經(jīng)沒有有效的線程,這個(gè)時(shí)候線程池的狀態(tài)將會TIDYING,并且將要調(diào)用terminated方法
- TERMINATED:終止?fàn)顟B(tài)。terminated方法調(diào)用完成以后的狀態(tài)
狀態(tài)之間可以進(jìn)行轉(zhuǎn)換:
RUNNING -> SHUTDOWN:手動調(diào)用shutdown方法,或者ThreadPoolExecutor要被GC回收的時(shí)候調(diào)用finalize方法,finalize方法內(nèi)部也會調(diào)用shutdown方法
(RUNNING or SHUTDOWN) -> STOP:調(diào)用shutdownNow方法
SHUTDOWN -> TIDYING:當(dāng)隊(duì)列和線程池都為空的時(shí)候
STOP -> TIDYING:當(dāng)線程池為空的時(shí)候
TIDYING -> TERMINATED:terminated方法調(diào)用完成之后
ThreadPoolExecutor內(nèi)部還保存著線程池的有效線程個(gè)數(shù)。
狀態(tài)和線程數(shù)在ThreadPoolExecutor內(nèi)部使用一個(gè)整型變量保存,沒錯,一個(gè)變量表示兩種含義。
為什么一個(gè)整型變量既可以保存狀態(tài),又可以保存數(shù)量? 分析一下:
首先,我們知道java中1個(gè)整型占4個(gè)字節(jié),也就是32位,所以1個(gè)整型有32位。
所以整型1用二進(jìn)制表示就是:00000000000000000000000000000001
整型-1用二進(jìn)制表示就是:11111111111111111111111111111111(這個(gè)是補(bǔ)碼,不懂的同學(xué)可以看下原碼,反碼,補(bǔ)碼的知識)
在ThreadPoolExecutor,整型中32位的前3位用來表示線程池狀態(tài),后3位表示線程池中有效的線程數(shù)。
// 前3位表示狀態(tài),所有線程數(shù)占29位
private static final int COUNT_BITS = Integer.SIZE - 3;
線程池容量大小為 1 << 29 - 1 = 00011111111111111111111111111111(二進(jìn)制),代碼如下
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
RUNNING狀態(tài) -1 << 29 = 11111111111111111111111111111111 << 29 = 11100000000000000000000000000000(前3位為111):
private static final int RUNNING = -1 << COUNT_BITS;
SHUTDOWN狀態(tài) 0 << 29 = 00000000000000000000000000000000 << 29 = 00000000000000000000000000000000(前3位為000)
private static final int SHUTDOWN = 0 << COUNT_BITS;
STOP狀態(tài) 1 << 29 = 00000000000000000000000000000001 << 29 = 00100000000000000000000000000000(前3位為001):
private static final int STOP = 1 << COUNT_BITS;
TIDYING狀態(tài) 2 << 29 = 00000000000000000000000000000010 << 29 = 01000000000000000000000000000000(前3位為010):
private static final int TIDYING = 2 << COUNT_BITS;
TERMINATED狀態(tài) 3 << 29 = 00000000000000000000000000000011 << 29 = 01100000000000000000000000000000(前3位為011):
private static final int TERMINATED = 3 << COUNT_BITS;
清楚狀態(tài)位之后,下面是獲得狀態(tài)和線程數(shù)的內(nèi)部方法:
// 得到線程數(shù),也就是后29位的數(shù)字。 直接跟CAPACITY做一個(gè)與操作即可,CAPACITY就是的值就 1 << 29 - 1 = 00011111111111111111111111111111。 與操作的話前面3位肯定為0,相當(dāng)于直接取后29位的值
private static int workerCountOf(int c) { return c & CAPACITY; }
// 得到狀態(tài),CAPACITY的非操作得到的二進(jìn)制位11100000000000000000000000000000,然后做在一個(gè)與操作,相當(dāng)于直接取前3位的的值
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 或操作。相當(dāng)于更新數(shù)量和狀態(tài)兩個(gè)操作
private static int ctlOf(int rs, int wc) { return rs | wc; }
線程池初始化狀態(tài)線程數(shù)變量:
// 初始化狀態(tài)和數(shù)量,狀態(tài)為RUNNING,線程數(shù)為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ThreadPoolExecutor執(zhí)行任務(wù)
使用ThreadPoolExecutor執(zhí)行任務(wù)的時(shí)候,可以使用execute或submit方法,submit方法如下:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
很明顯地看到,submit方法內(nèi)部使用了execute方法,而且submit方法是有返回值的。在調(diào)用execute方法之前,使用FutureTask包裝一個(gè)Runnable,這個(gè)FutureTask就是返回值。
由于submit方法內(nèi)部調(diào)用execute方法,所以execute方法就是執(zhí)行任務(wù)的方法,來看一下execute方法,execute方法內(nèi)部分3個(gè)步驟進(jìn)行處理。
- 如果當(dāng)前正在執(zhí)行的Worker數(shù)量比corePoolSize(基本大小)要小。直接創(chuàng)建一個(gè)新的Worker執(zhí)行任務(wù),會調(diào)用addWorker方法
- 如果當(dāng)前正在執(zhí)行的Worker數(shù)量大于等于corePoolSize(基本大小)。將任務(wù)放到阻塞隊(duì)列里,如果阻塞隊(duì)列沒滿并且狀態(tài)是RUNNING的話,直接丟到阻塞隊(duì)列,否則執(zhí)行第3步。丟到阻塞隊(duì)列之后,還需要再做一次驗(yàn)證(丟到阻塞隊(duì)列之后可能另外一個(gè)線程關(guān)閉了線程池或者剛剛加入到隊(duì)列的線程死了)。如果這個(gè)時(shí)候線程池不在RUNNING狀態(tài),把剛剛丟入隊(duì)列的任務(wù)remove掉,調(diào)用reject方法,否則查看Worker數(shù)量,如果Worker數(shù)量為0,起一個(gè)新的Worker去阻塞隊(duì)列里拿任務(wù)執(zhí)行
- 丟到阻塞失敗的話,會調(diào)用addWorker方法嘗試起一個(gè)新的Worker去阻塞隊(duì)列拿任務(wù)并執(zhí)行任務(wù),如果這個(gè)新的Worker創(chuàng)建失敗,調(diào)用reject方法
上面說的Worker可以暫時(shí)理解為一個(gè)執(zhí)行任務(wù)的線程。
execute方法源碼如下,上面提到的3個(gè)步驟對應(yīng)源碼中的3個(gè)注釋:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 第一個(gè)步驟,滿足線程池中的線程大小比基本大小要小
if (addWorker(command, true)) // addWorker方法第二個(gè)參數(shù)true表示使用基本大小
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 第二個(gè)步驟,線程池的線程大小比基本大小要大,并且線程池還在RUNNING狀態(tài),阻塞隊(duì)列也沒滿的情況,加到阻塞隊(duì)列里
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 雖然滿足了第二個(gè)步驟,但是這個(gè)時(shí)候可能突然線程池關(guān)閉了,所以再做一層判斷
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 第三個(gè)步驟,直接使用線程池最大大小。addWorker方法第二個(gè)參數(shù)false表示使用最大大小
reject(command);
}
addWorker關(guān)系著如何起一個(gè)線程,再看addWorker方法之前,先看一下ThreadPoolExecutor的一個(gè)內(nèi)部類Worker, Worker是一個(gè)AQS的實(shí)現(xiàn)類(為何設(shè)計(jì)成一個(gè)AQS在閑置Worker里會說明),同時(shí)也是一個(gè)實(shí)現(xiàn)Runnable的類,使用獨(dú)占鎖,它的構(gòu)造函數(shù)只接受一個(gè)Runnable參數(shù),內(nèi)部保存著這個(gè)Runnable屬性,還有一個(gè)thread線程屬性用于包裝這個(gè)Runnable(這個(gè)thread屬性使用ThreadFactory構(gòu)造,在構(gòu)造函數(shù)內(nèi)完成thread線程的構(gòu)造),另外還有一個(gè)completedTasks計(jì)數(shù)器表示這個(gè)Worker完成的任務(wù)數(shù)。Worker類復(fù)寫了run方法,使用ThreadPoolExecutor的runWorker方法(在addWorker方法里調(diào)用),直接啟動Worker的話,會調(diào)用ThreadPoolExecutor的runWork方法。需要特別注意的是這個(gè)Worker是實(shí)現(xiàn)了Runnable接口的,thread線程屬性使用ThreadFactory構(gòu)造Thread的時(shí)候,構(gòu)造的Thread中使用的Runnable其實(shí)就是Worker。下面的Worker的源碼:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 使用ThreadFactory構(gòu)造Thread,這個(gè)構(gòu)造的Thread內(nèi)部的Runnable就是本身,也就是Worker。所以得到Worker的thread并start的時(shí)候,會執(zhí)行Worker的run方法,也就是執(zhí)行ThreadPoolExecutor的runWorker方法
setState(-1); 把狀態(tài)位設(shè)置成-1,這樣任何線程都不能得到Worker的鎖,除非調(diào)用了unlock方法。這個(gè)unlock方法會在runWorker方法中一開始就調(diào)用,這是為了確保Worker構(gòu)造出來之后,沒有任何線程能夠得到它的鎖,除非調(diào)用了runWorker之后,其他線程才能獲得Worker的鎖
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
接下來看一下addWorker源碼:
// 兩個(gè)參數(shù),firstTask表示需要跑的任務(wù)。boolean類型的core參數(shù)為true的話表示使用線程池的基本大小,為false使用線程池最大大小
// 返回值是boolean類型,true表示新任務(wù)被接收了,并且執(zhí)行了。否則是false
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 線程池當(dāng)前狀態(tài)
// 這個(gè)判斷轉(zhuǎn)換成 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty)。
// 概括為3個(gè)條件:
// 1\. 線程池不在RUNNING狀態(tài)并且狀態(tài)是STOP、TIDYING或TERMINATED中的任意一種狀態(tài)
// 2\. 線程池不在RUNNING狀態(tài),線程池接受了新的任務(wù)
// 3\. 線程池不在RUNNING狀態(tài),阻塞隊(duì)列為空。 滿足這3個(gè)條件中的任意一個(gè)的話,拒絕執(zhí)行任務(wù)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 線程池線程個(gè)數(shù)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 如果線程池線程數(shù)量超過線程池最大容量或者線程數(shù)量超過了基本大小(core參數(shù)為true,core參數(shù)為false的話判斷超過最大大小)
return false; // 超過直接返回false
if (compareAndIncrementWorkerCount(c)) // 沒有超過各種大小的話,cas操作線程池線程數(shù)量+1,cas成功的話跳出循環(huán)
break retry;
c = ctl.get(); // 重新檢查狀態(tài)
if (runStateOf(c) != rs) // 如果狀態(tài)改變了,重新循環(huán)操作
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 走到這一步說明cas操作成功了,線程池線程數(shù)量+1
boolean workerStarted = false; // 任務(wù)是否成功啟動標(biāo)識
boolean workerAdded = false; // 任務(wù)是否添加成功標(biāo)識
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock; // 得到線程池的可重入鎖
w = new Worker(firstTask); // 基于任務(wù)firstTask構(gòu)造worker
final Thread t = w.thread; // 使用Worker的屬性thread,這個(gè)thread是使用ThreadFactory構(gòu)造出來的
if (t != null) { // ThreadFactory構(gòu)造出的Thread有可能是null,做個(gè)判斷
mainLock.lock(); // 鎖住,防止并發(fā)
try {
// 在鎖住之后再重新檢測一下狀態(tài)
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // 如果線程池在RUNNING狀態(tài)或者線程池在SHUTDOWN狀態(tài)并且任務(wù)是個(gè)null
if (t.isAlive()) // 判斷線程是否還活著,也就是說線程已經(jīng)啟動并且還沒死掉
throw new IllegalThreadStateException(); // 如果存在已經(jīng)啟動并且還沒死的線程,拋出異常
workers.add(w); // worker添加到線程池的workers屬性中,是個(gè)HashSet
int s = workers.size(); // 得到目前線程池中的線程個(gè)數(shù)
if (s > largestPoolSize) // 如果線程池中的線程個(gè)數(shù)超過了線程池中的最大線程數(shù)時(shí),更新一下這個(gè)最大線程數(shù)
largestPoolSize = s;
workerAdded = true; // 標(biāo)識一下任務(wù)已經(jīng)添加成功
}
} finally {
mainLock.unlock(); // 解鎖
}
if (workerAdded) { // 如果任務(wù)添加成功,運(yùn)行任務(wù),改變一下任務(wù)成功啟動標(biāo)識
t.start(); // 啟動線程,這里的t是Worker中的thread屬性,所以相當(dāng)于就是調(diào)用了Worker的run方法
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 如果任務(wù)啟動失敗,調(diào)用addWorkerFailed方法
addWorkerFailed(w);
}
return workerStarted;
}
Worker中的線程start的時(shí)候,調(diào)用Worker本身run方法,這個(gè)run方法之前分析過,調(diào)用外部類ThreadPoolExecutor的runWorker方法,直接看runWorker方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); // 得到當(dāng)前線程
Runnable task = w.firstTask; // 得到Worker中的任務(wù)task,也就是用戶傳入的task
w.firstTask = null; // 將Worker中的任務(wù)置空
w.unlock(); // allow interrupts。
boolean completedAbruptly = true;
try {
// 如果worker中的任務(wù)不為空,繼續(xù)知否,否則使用getTask獲得任務(wù)。一直死循環(huán),除非得到的任務(wù)為空才退出
while (task != null || (task = getTask()) != null) {
w.lock(); // 如果拿到了任務(wù),給自己上鎖,表示當(dāng)前Worker已經(jīng)要開始執(zhí)行任務(wù)了,已經(jīng)不是閑置Worker(閑置Worker的解釋請看下面的線程池關(guān)閉)
// 在執(zhí)行任務(wù)之前先做一些處理。 1\. 如果線程池已經(jīng)處于STOP狀態(tài)并且當(dāng)前線程沒有被中斷,中斷線程 2\. 如果線程池還處于RUNNING或SHUTDOWN狀態(tài),并且當(dāng)前線程已經(jīng)被中斷了,重新檢查一下線程池狀態(tài),如果處于STOP狀態(tài)并且沒有被中斷,那么中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 任務(wù)執(zhí)行前需要做什么,ThreadPoolExecutor是個(gè)空實(shí)現(xiàn)
Throwable thrown = null;
try {
task.run(); // 真正的開始執(zhí)行任務(wù),調(diào)用的是run方法,而不是start方法。這里run的時(shí)候可能會被中斷,比如線程池調(diào)用了shutdownNow方法
} catch (RuntimeException x) { // 任務(wù)執(zhí)行發(fā)生的異常全部拋出,不在runWorker中處理
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 任務(wù)執(zhí)行結(jié)束需要做什么,ThreadPoolExecutor是個(gè)空實(shí)現(xiàn)
}
} finally {
task = null;
w.completedTasks++; // 記錄執(zhí)行任務(wù)的個(gè)數(shù)
w.unlock(); // 執(zhí)行完任務(wù)之后,解鎖,Worker變成閑置Worker
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 回收Worker方法
}
}
我們看一下getTask方法是如何獲得任務(wù)的:
// 如果發(fā)生了以下四件事中的任意一件,那么Worker需要被回收:
// 1\. Worker個(gè)數(shù)比線程池最大大小要大
// 2\. 線程池處于STOP狀態(tài)
// 3\. 線程池處于SHUTDOWN狀態(tài)并且阻塞隊(duì)列為空
// 4\. 使用超時(shí)時(shí)間從阻塞隊(duì)列里拿數(shù)據(jù),并且超時(shí)之后沒有拿到數(shù)據(jù)(allowCoreThreadTimeOut || workerCount > corePoolSize)
private Runnable getTask() {
boolean timedOut = false; // 如果使用超時(shí)時(shí)間并且也沒有拿到任務(wù)的標(biāo)識
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果線程池是SHUTDOWN狀態(tài)并且阻塞隊(duì)列為空的話,worker數(shù)量減一,直接返回null(SHUTDOWN狀態(tài)還會處理阻塞隊(duì)列任務(wù),但是阻塞隊(duì)列為空的話就結(jié)束了),如果線程池是STOP狀態(tài)的話,worker數(shù)量建議,直接返回null(STOP狀態(tài)不處理阻塞隊(duì)列任務(wù))[方法一開始注釋的2,3兩點(diǎn),返回null,開始Worker回收]
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // 標(biāo)記從隊(duì)列中取任務(wù)時(shí)是否設(shè)置超時(shí)時(shí)間,如果為true說明這個(gè)worker可能需要回收,為false的話這個(gè)worker會一直存在,并且阻塞當(dāng)前線程等待阻塞隊(duì)列中有數(shù)據(jù)
for (;;) {
int wc = workerCountOf(c); // 得到當(dāng)前線程池Worker個(gè)數(shù)
// allowCoreThreadTimeOut屬性默認(rèn)為false,表示線程池中的核心線程在閑置狀態(tài)下還保留在池中;如果是true表示核心線程使用keepAliveTime這個(gè)參數(shù)來作為超時(shí)時(shí)間
// 如果worker數(shù)量比基本大小要大的話,timed就為true,需要進(jìn)行回收worker
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed)) // 方法一開始注釋的1,4兩點(diǎn),會進(jìn)行下一步worker數(shù)量減一
break;
if (compareAndDecrementWorkerCount(c)) // worker數(shù)量減一,返回null,之后會進(jìn)行Worker回收工作
return null;
c = ctl.get(); // 重新檢查線程池狀態(tài)
if (runStateOf(c) != rs) // 線程池狀態(tài)改變的話重新開始外部循環(huán),否則繼續(xù)內(nèi)部循環(huán)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
// 如果需要設(shè)置超時(shí)時(shí)間,使用poll方法,否則使用take方法一直阻塞等待阻塞隊(duì)列新進(jìn)數(shù)據(jù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false; // 閑置Worker被中斷
}
}
}
如果getTask返回的是null,那說明阻塞隊(duì)列已經(jīng)沒有任務(wù)并且當(dāng)前調(diào)用getTask的Worker需要被回收,那么會調(diào)用processWorkerExit方法進(jìn)行回收:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果Worker沒有正常結(jié)束流程調(diào)用processWorkerExit方法,worker數(shù)量減一。如果是正常結(jié)束的話,在getTask方法里worker數(shù)量已經(jīng)減一了
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 加鎖,防止并發(fā)問題
try {
completedTaskCount += w.completedTasks; // 記錄總的完成任務(wù)數(shù)
workers.remove(w); // 線程池的worker集合刪除掉需要回收的Worker
} finally {
mainLock.unlock(); // 解鎖
}
tryTerminate(); // 嘗試結(jié)束線程池
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // 如果線程池還處于RUNNING或者SHUTDOWN狀態(tài)
if (!completedAbruptly) { // Worker是正常結(jié)束流程的話
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // 不需要新開一個(gè)Worker
}
// 新開一個(gè)Worker代替原先的Worker
// 新開一個(gè)Worker需要滿足以下3個(gè)條件中的任意一個(gè):
// 1\. 用戶執(zhí)行的任務(wù)發(fā)生了異常
// 2\. Worker數(shù)量比線程池基本大小要小
// 3\. 阻塞隊(duì)列不空但是沒有任何Worker在工作
addWorker(null, false);
}
}
在回收Worker的時(shí)候線程池會嘗試結(jié)束自己的運(yùn)行,tryTerminate方法:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 滿足3個(gè)條件中的任意一個(gè),不終止線程池
// 1\. 線程池還在運(yùn)行,不能終止
// 2\. 線程池處于TIDYING或TERMINATED狀態(tài),說明已經(jīng)在關(guān)閉了,不允許繼續(xù)處理
// 3\. 線程池處于SHUTDOWN狀態(tài)并且阻塞隊(duì)列不為空,這時(shí)候還需要處理阻塞隊(duì)列的任務(wù),不能終止線程池
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 走到這一步說明線程池已經(jīng)不在運(yùn)行,阻塞隊(duì)列已經(jīng)沒有任務(wù),但是還要回收正在工作的Worker
if (workerCountOf(c) != 0) {
// 由于線程池不運(yùn)行了,調(diào)用了線程池的關(guān)閉方法,在解釋線程池的關(guān)閉原理的時(shí)候會說道這個(gè)方法
interruptIdleWorkers(ONLY_ONE); // 中斷閑置Worker,直到回收全部的Worker。這里沒有那么暴力,只中斷一個(gè),中斷之后退出方法,中斷了Worker之后,Worker會回收,然后還是會調(diào)用tryTerminate方法,如果還有閑置線程,那么繼續(xù)中斷
return;
}
// 走到這里說明worker已經(jīng)全部回收了,并且線程池已經(jīng)不在運(yùn)行,阻塞隊(duì)列已經(jīng)沒有任務(wù)??梢詼?zhǔn)備結(jié)束線程池了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 加鎖,防止并發(fā)
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // cas操作,將線程池狀態(tài)改成TIDYING
try {
terminated(); // 調(diào)用terminated方法
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // terminated方法調(diào)用完畢之后,狀態(tài)變?yōu)門ERMINATED
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock(); // 解鎖
}
// else retry on failed CAS
}
}
解釋了這么多,對線程池的啟動并且執(zhí)行任務(wù)做一個(gè)總結(jié):
首先,構(gòu)造線程池的時(shí)候,需要一些參數(shù)。一些重要的參數(shù)解釋在
文章中的結(jié)尾已經(jīng)說明了一下重要參數(shù)的意義。
線程池構(gòu)造完畢之后,如果用戶調(diào)用了execute或者submit方法的時(shí)候,最后都會使用execute方法執(zhí)行。
execute方法內(nèi)部分3種情況處理任務(wù):
- 如果當(dāng)前正在執(zhí)行的Worker數(shù)量比corePoolSize(基本大小)要小。直接創(chuàng)建一個(gè)新的Worker執(zhí)行任務(wù),會調(diào)用addWorker方法
- 如果當(dāng)前正在執(zhí)行的Worker數(shù)量大于等于corePoolSize(基本大小)。將任務(wù)放到阻塞隊(duì)列里,如果阻塞隊(duì)列沒滿并且狀態(tài)是RUNNING的話,直接丟到阻塞隊(duì)列,否則執(zhí)行第3步
- 丟到阻塞失敗的話,會調(diào)用addWorker方法嘗試起一個(gè)新的Worker去阻塞隊(duì)列拿任務(wù)并執(zhí)行任務(wù),如果這個(gè)新的Worker創(chuàng)建失敗,調(diào)用reject方法
線程池中的這個(gè)基本大小指的是Worker的數(shù)量。一個(gè)Worker是一個(gè)Runnable的實(shí)現(xiàn)類,會被當(dāng)做一個(gè)線程進(jìn)行啟動。Worker內(nèi)部帶有一個(gè)Runnable屬性firstTask,這個(gè)firstTask可以為null,為null的話Worker會去阻塞隊(duì)列拿任務(wù)執(zhí)行,否則會先執(zhí)行這個(gè)任務(wù),執(zhí)行完畢之后再去阻塞隊(duì)列繼續(xù)拿任務(wù)執(zhí)行。
所以說如果Worker數(shù)量超過了基本大小,那么任務(wù)都會在阻塞隊(duì)列里,當(dāng)Worker執(zhí)行完了它的第一個(gè)任務(wù)之后,就會去阻塞隊(duì)列里拿其他任務(wù)繼續(xù)執(zhí)行。
Worker在執(zhí)行的時(shí)候會根據(jù)一些參數(shù)進(jìn)行調(diào)節(jié),比如Worker數(shù)量超過了線程池基本大小或者超時(shí)時(shí)間到了等因素,這個(gè)時(shí)候Worker會被線程池回收,線程池會盡量保持內(nèi)部的Worker數(shù)量不超過基本大小。
另外Worker執(zhí)行任務(wù)的時(shí)候調(diào)用的是Runnable的run方法,而不是start方法,調(diào)用了start方法就相當(dāng)于另外再起一個(gè)線程了。
Worker在回收的時(shí)候會嘗試終止線程池。嘗試關(guān)閉線程池的時(shí)候,會檢查是否還有Worker在工作,檢查線程池的狀態(tài),沒問題的話會將狀態(tài)過度到TIDYING狀態(tài),之后調(diào)用terminated方法,terminated方法調(diào)用完成之后將線程池狀態(tài)更新到TERMINATED。
ThreadPoolExecutor的關(guān)閉
線程池的啟動過程分析好了之后,接下來看線程池的關(guān)閉操作:
shutdown方法,關(guān)閉線程池,關(guān)閉之后阻塞隊(duì)列里的任務(wù)不受影響,會繼續(xù)被Worker處理,但是新的任務(wù)不會被接受:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 關(guān)閉的時(shí)候需要加鎖,防止并發(fā)
try {
checkShutdownAccess(); // 檢查關(guān)閉線程池的權(quán)限
advanceRunState(SHUTDOWN); // 把線程池狀態(tài)更新到SHUTDOWN
interruptIdleWorkers(); // 中斷閑置的Worker
onShutdown(); // 鉤子方法,默認(rèn)不處理。ScheduledThreadPoolExecutor會做一些處理
} finally {
mainLock.unlock(); // 解鎖
}
tryTerminate(); // 嘗試結(jié)束線程池,上面已經(jīng)分析過了
}
interruptIdleWorkers方法,注意,這個(gè)方法打斷的是閑置Worker,打斷閑置Worker之后,getTask方法會返回null,然后Worker會被回收。那什么是閑置Worker呢?
閑置Worker是這樣解釋的:Worker運(yùn)行的時(shí)候會去阻塞隊(duì)列拿數(shù)據(jù)(getTask方法),拿的時(shí)候如果沒有設(shè)置超時(shí)時(shí)間,那么會一直阻塞等待阻塞隊(duì)列進(jìn)數(shù)據(jù),這樣的Worker就被稱為閑置Worker。由于Worker也是一個(gè)AQS,在runWorker方法里會有一對lock和unlock操作,這對lock操作是為了確保Worker不是一個(gè)閑置Worker。
所以Worker被設(shè)計(jì)成一個(gè)AQS是為了根據(jù)Worker的鎖來判斷是否是閑置線程,是否可以被強(qiáng)制中斷。
下面我們看下interruptIdleWorkers方法:
// 調(diào)用他的一個(gè)重載方法,傳入了參數(shù)false,表示要中斷所有的正在運(yùn)行的閑置Worker,如果為true表示只打斷一個(gè)閑置Worker
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 中斷閑置Worker需要加鎖,防止并發(fā)
try {
for (Worker w : workers) {
Thread t = w.thread; // 拿到worker中的線程
if (!t.isInterrupted() && w.tryLock()) { // Worker中的線程沒有被打斷并且Worker可以獲取鎖,這里Worker能獲取鎖說明Worker是個(gè)閑置Worker,在阻塞隊(duì)列里拿數(shù)據(jù)一直被阻塞,沒有數(shù)據(jù)進(jìn)來。如果沒有獲取到Worker鎖,說明Worker還在執(zhí)行任務(wù),不進(jìn)行中斷(shutdown方法不會中斷正在執(zhí)行的任務(wù))
try {
t.interrupt(); // 中斷Worker線程
} catch (SecurityException ignore) {
} finally {
w.unlock(); // 釋放Worker鎖
}
}
if (onlyOne) // 如果只打斷1個(gè)Worker的話,直接break退出,否則,遍歷所有的Worker
break;
}
} finally {
mainLock.unlock(); // 解鎖
}
}
shutdown方法將線程池狀態(tài)改成SHUTDOWN,線程池還能繼續(xù)處理阻塞隊(duì)列里的任務(wù),并且會回收一些閑置的Worker。但是shutdownNow方法不一樣,它會把線程池狀態(tài)改成STOP狀態(tài),這樣不會處理阻塞隊(duì)列里的任務(wù),也不會處理新的任務(wù):
// shutdownNow方法會有返回值的,返回的是一個(gè)任務(wù)列表,而shutdown方法沒有返回值
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // shutdownNow操作也需要加鎖,防止并發(fā)
try {
checkShutdownAccess(); // 檢查關(guān)閉線程池的權(quán)限
advanceRunState(STOP); // 把線程池狀態(tài)更新到STOP
interruptWorkers(); // 中斷Worker的運(yùn)行
tasks = drainQueue();
} finally {
mainLock.unlock(); // 解鎖
}
tryTerminate(); // 嘗試結(jié)束線程池,上面已經(jīng)分析過了
return tasks;
}
shutdownNow的中斷和shutdown方法不一樣,調(diào)用的是interruptWorkers方法:
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 中斷Worker需要加鎖,防止并發(fā)
try {
for (Worker w : workers)
w.interruptIfStarted(); // 中斷Worker的執(zhí)行
} finally {
mainLock.unlock(); // 解鎖
}
}
Worker的interruptIfStarted方法中斷Worker的執(zhí)行:
void interruptIfStarted() {
Thread t;
// Worker無論是否被持有鎖,只要還沒被中斷,那就中斷Worker
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt(); // 強(qiáng)行中斷Worker的執(zhí)行
} catch (SecurityException ignore) {
}
}
}
線程池關(guān)閉總結(jié):
線程池的關(guān)閉主要是兩個(gè)方法,shutdown和shutdownNow方法。
shutdown方法會更新狀態(tài)到SHUTDOWN,不會影響阻塞隊(duì)列里任務(wù)的執(zhí)行,但是不會執(zhí)行新進(jìn)來的任務(wù)。同時(shí)也會回收閑置的Worker,閑置Worker的定義上面已經(jīng)說過了。
shutdownNow方法會更新狀態(tài)到STOP,會影響阻塞隊(duì)列的任務(wù)執(zhí)行,也不會執(zhí)行新進(jìn)來的任務(wù)。同時(shí)會回收所有的Worker。
ps:這是轉(zhuǎn)載他人的文章
https://fangjian0423.github.io/2016/03/22/java-threadpool-analysis/