concurrency-threadpoolexecutor
Java 中的線程池是運用場景最多的并發(fā)框架,幾乎所有需要異步或并發(fā)執(zhí)行任務(wù)的程序都可以使用線程池。在開發(fā)過程中,合理地使用線程池能夠帶來 3 個好處。
- 降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
- 提高響應(yīng)速度。當(dāng)任務(wù)到達時,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
- 提高線程的可管理性。線程是稀缺資源,如果無限制地創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。
ThreadPoolExecutor 執(zhí)行流程
ThreadPoolExecutor 執(zhí)行 execute 方法分下面 4 種情況。
- 如果當(dāng)前運行的線程少于 corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。
- 如果運行的線程等于或多于 corePoolSize,則將任務(wù)加入 BlockingQueue。
- 如果無法將任務(wù)加入 BlockingQueue(隊列已滿),則創(chuàng)建新的線程來處理任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。
- 如果創(chuàng)建新線程將使當(dāng)前運行的線程超出 maximumPoolSize,任務(wù)將被拒絕,并調(diào)用 RejectedExecutionHandler.rejectedExecution() 方法。
ThreadPoolExecutor 采取上述步驟的總體設(shè)計思路,是為了在執(zhí)行 execute() 方法時,盡可能地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在 ThreadPoolExecutor 完成預(yù)熱之后(當(dāng)前運行的線程數(shù)大于等于 corePoolSize),幾乎所有的 execute() 方法調(diào)用都是執(zhí)行步驟2,而步驟2不需要獲取全局鎖。
ThreadPoolExecutor 源碼分析
線程池生命周期和線程狀態(tài)標(biāo)識ctl
線程池用 ctl 的低 29 位表示線程池中的線程數(shù),高 3 位表示當(dāng)前線程狀態(tài)。
高3位表示狀態(tài)
- RUNNING:運行狀態(tài),高3位為111;
- SHUTDOWN:關(guān)閉狀態(tài),高3位為000,在此狀態(tài)下,線程池不再接受新任務(wù),但是仍然處理阻塞隊列中的任務(wù);
- STOP:停止?fàn)顟B(tài),高3位為001,在此狀態(tài)下,線程池不再接受新任務(wù),也不會處理阻塞隊列中的任務(wù),正在運行的任務(wù)也會停止;
- TIDYING:高3位為010;
- TERMINATED:終止?fàn)顟B(tài),高3位為011。
線程狀態(tài)標(biāo)識ctl
// ctl 高3位表示線程池狀態(tài),低29位表示當(dāng)前工作線程數(shù)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 低29位表示工作線程數(shù)
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 最大線程數(shù) 0x1fffffff
// 獲取線程池狀態(tài)、線程總數(shù)、構(gòu)造 ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
全局鎖
// 全局鎖,創(chuàng)建工作線程等操作時需要獲取全局鎖
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
工作線程
// 工作線程
private final HashSet<Worker> workers = new HashSet<Worker>();
private int largestPoolSize;
private volatile int corePoolSize;
任務(wù)提交execute
// ThreadPoolExecutor 的任務(wù)提交過程
// java.util.concurrent.ThreadPoolExecutor#execute
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// ctl 是一個重要的控制全局狀態(tài)的數(shù)據(jù)結(jié)構(gòu),定義為一個線程安全的 AtomicInteger
// ctl = new AtomicInteger(ctlOf(RUNNING, 0));
int c = ctl.get();
/**
* workerCountOf方法取出低29位的值,表示當(dāng)前活動的線程數(shù);
* 如果當(dāng)前活動的線程數(shù)小于corePoolSize,則新建一個線程放入線程池中,并把該任務(wù)放到線程中
*/
if (workerCountOf(c) < corePoolSize) {
/**
* addWorker中的第二個參數(shù)表示限制添加線程的數(shù)量 是根據(jù)據(jù)corePoolSize 來判斷還是maximumPoolSize來判斷;
* 如果是ture,根據(jù)corePoolSize判斷
* 如果是false,根據(jù)maximumPoolSize判斷
*/
if (addWorker(command, true))
return;
/**
* 如果添加失敗,則重新獲取ctl值
*/
c = ctl.get();
}
/**
* 如果線程池是Running狀態(tài),并且任務(wù)添加到隊列中
*/
if (isRunning(c) && workQueue.offer(command)) {
//double-check,重新獲取ctl的值
int recheck = ctl.get();
/**
* 再次判斷線程池的狀態(tài),如果不是運行狀態(tài),由于之前已經(jīng)把command添加到阻塞隊列中,這時候需要從隊列中移除command;
* 通過handler使用拒絕策略對該任務(wù)進行處理,整個方法返回
*/
if (!isRunning(recheck) && remove(command))
reject(command);
/**
* 獲取線程池中的有效線程數(shù),如果數(shù)量是0,則執(zhí)行addWorker方法;
* 第一個參數(shù)為null,表示在線程池中創(chuàng)建一個線程,但不去啟動
* 第二個參數(shù)為false,將線程池的線程數(shù)量的上限設(shè)置為maximumPoolSize,添加線程時根據(jù)maximumPoolSize來判斷
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
/**
* 執(zhí)行到這里,有兩種情況:
* 1、線程池的狀態(tài)不是RUNNING;
* 2、線程池狀態(tài)是RUNNING,但是workerCount >= corePoolSize, workerQueue已滿
* 這個時候,再次調(diào)用addWorker方法,第二個參數(shù)傳false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize;
* 如果失敗則執(zhí)行拒絕策略;
*/
} else if (!addWorker(command, false))
reject(command);
}
execute總結(jié)
通過上面這一小段代碼,我們就已經(jīng)完整地看到了。通過一個 ctl 變量進行全局狀態(tài)控制,從而保證了線程安全性。整個框架并沒有使用鎖,但是卻是線程安全的。
整段代碼剛好完整描述了線程池的執(zhí)行流程:
- 如果workerCount < corePoolSize,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù);
- 如果workerCount >= corePoolSize,且線程池內(nèi)的阻塞隊列未滿,則將任務(wù)添加到該阻塞隊列中;
- 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內(nèi)的阻塞隊列已滿,則創(chuàng)建并啟動一個線程來執(zhí)行新提交的任務(wù);
- 如果workerCount >= maximumPoolSize,并且線程池內(nèi)的阻塞隊列已滿, 則根據(jù)拒絕策略來處理該任務(wù), 默認的處理方式是直接拋異常。
這里要注意一下addWorker(null, false);,也就是創(chuàng)建一個線程,但并沒有傳入任務(wù),因為任務(wù)已經(jīng)被添加到workQueue中了,所以worker在執(zhí)行的時候,會直接從workQueue中獲取任務(wù)。所以,在workerCountOf(recheck) == 0時執(zhí)行addWorker(null, false);也是為了保證線程池在RUNNING狀態(tài)下必須要有一個線程來執(zhí)行任務(wù)。
工作線程worker
線程池中的每一個對象被封裝成一個Worker對象,ThreadPool維護的就是一組Worker對象。Worker類繼承了AQS,并實現(xiàn)了Runnable接口,其中包含了兩個重要屬性:firstTask用來保存?zhèn)魅氲娜蝿?wù),thread是在調(diào)用構(gòu)造方法是通過ThreadFactory來創(chuàng)建的線程,是用來處理任務(wù)的線程。
// Worker 是對線程 Thread 的包裝,實現(xiàn)了 AbstractQueuedSynchronizer
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
/**
* 把state設(shè)置為-1,,阻止中斷直到調(diào)用runWorker方法;
* 因為AQS默認state是0,如果剛創(chuàng)建一個Worker對象,還沒有執(zhí)行任務(wù)時,這時候不應(yīng)該被中斷
*/
setState(-1);
this.firstTask = firstTask;
/**
* 創(chuàng)建一個線程,newThread方法傳入的參數(shù)是this,因為Worker本身繼承了Runnable接口,也就是一個線程;
* 所以一個Worker對象在啟動的時候會調(diào)用Worker類中run方法
*/
this.thread = getThreadFactory().newThread(this);
}
}
Worker 為什么要繼承 AbstractQueuedSynchronizer 實現(xiàn)自己的鎖,而不使用 ReentrantLock 呢?
- lock方法一旦獲取獨占鎖,表示當(dāng)前線程正在執(zhí)行任務(wù)中;
- 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程;
- 如果該線程現(xiàn)在不是獨占鎖的狀態(tài),也就是空閑狀態(tài),說明它沒有處理任務(wù),這時可以對該線程進行中斷;
- 線程池中執(zhí)行shutdown方法或tryTerminate方法時會調(diào)用interruptIdleWorkers方法來中斷空閑線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程池中的線程是否是空閑狀態(tài);
- 之所以設(shè)置為不可重入的,是因為在任務(wù)調(diào)用setCorePoolSize這類線程池控制的方法時,不會中斷正在運行的線程所以,Worker繼承自AQS,用于判斷線程是否空閑以及是否處于被中斷。
創(chuàng)建工作線程 addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
/**
* 由于線程執(zhí)行過程中,各種情況都有可能處于,通過自旋的方式來保證worker的增加;
*/
for (; ; ) {
int c = ctl.get();
//獲取線程池運行狀態(tài)
int rs = runStateOf(c);
/**
*
* 如果rs >= SHUTDOWN, 則表示此時不再接收新任務(wù);
* 接下來是三個條件 通過 && 連接,只要有一個任務(wù)不滿足,就返回false;
* 1.rs == SHUTDOWN,表示關(guān)閉狀態(tài),不再接收提交的任務(wù),但卻可以繼續(xù)處理阻塞隊列中已經(jīng)保存的任務(wù);
* 2.fisrtTask為空
* 3.Check if queue empty only if necessary.
*/
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (; ; ) {
//獲取線程池的線程數(shù)
int wc = workerCountOf(c);
/**
* 如果線程數(shù) >= CAPACITY, 也就是ctl的低29位的最大值,則返回false;
* 這里的core用來判斷 限制線程數(shù)量的上限是corePoolSize還是maximumPoolSize;
* 如果core是ture表示根據(jù)corePoolSize來比較;
* 如果core是false表示根據(jù)maximumPoolSize來比較;
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/**
* 通過CAS原子的方式來增加線程數(shù)量;
* 如果成功,則跳出第一個for循環(huán);
*/
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//如果當(dāng)前運行的狀態(tài)不等于rs,說明線程池的狀態(tài)已經(jīng)改變了,則返回第一個for循環(huán)繼續(xù)執(zhí)行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//根據(jù)firstTask來創(chuàng)建Worker對象
w = new Worker(firstTask);
//每一個Worker對象都會創(chuàng)建一個線程
final Thread t = w.thread;
if (t != null) {
//創(chuàng)建可重入鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 獲取線程池的狀態(tài)
int rs = runStateOf(ctl.get());
/**
* 線程池的狀態(tài)小于SHUTDOWN,表示線程池處于RUNNING狀態(tài);
* 如果rs是RUNNING狀態(tài)或rs是SHUTDOWN狀態(tài)并且firstTask為null,向線程池中添加線程;
* 因為在SHUTDOWN狀態(tài)時不會再添加新的任務(wù),但還是處理workQueue中的任務(wù);
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//workers是一個hashSet
workers.add(w);
int s = workers.size();
//largestPoolSize記錄線程池中出現(xiàn)的最大的線程數(shù)量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//啟動線程,Worker實現(xiàn)了Running方法,此時會調(diào)用Worker的run方法
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- addWorker 前半部分主要是判斷能否新建工作線程,如果允許則執(zhí)行 compareAndIncrementWorkerCount(c),利用 CAS 原則,將線程數(shù)量+1。
- addWorker 后半部分則是真正創(chuàng)建工作線程并啟動,這個過程需要獲取全局鎖。創(chuàng)建失敗則需要回滾 addWorkerFailed。
addWorker 的 4 種調(diào)用方式:
- addWorker(command, true) 線程數(shù) < coreSize 時,則創(chuàng)建新線程
- addWorker(command, false) 當(dāng)①阻塞隊列已滿,②線程數(shù) < maximumPoolSize 時,則創(chuàng)建新線程
- addWorker(null, true) 同 1。只是線程初始化任務(wù)為 null,相當(dāng)于創(chuàng)建一個新的線程。實際的使用是在 prestartCoreThread() 等方法。
- addWorker(null, false) 同 2。只是線程初始化任務(wù)為 null,相當(dāng)于創(chuàng)建一個新的線程,沒立馬分配任務(wù);
線程執(zhí)行 runWorker
worker類中的runworker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//獲取第一個任務(wù)
Runnable task = w.firstTask;
w.firstTask = null;
//允許中斷
w.unlock();
//是否因異常退出循環(huán)
boolean completedAbruptly = true;
try {
//如果task為空,則通過getTask來獲取任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
/**
* 如果線程池正在停止,那么要保證當(dāng)前線程時中斷狀態(tài);
* 如果不是的話,則要保證當(dāng)前線程不是中斷狀態(tài)
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//beforeExecute和afterExecute是留給子類來實現(xiàn)的
beforeExecute(wt, task);
Throwable thrown = null;
try {
//通過任務(wù)方式執(zhí)行,不是線程方式
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//processWorkerExit會對completedAbruptly進行判斷,表示在執(zhí)行過程中是否出現(xiàn)異常
processWorkerExit(w, completedAbruptly);
}
}
總結(jié) runworker
while循環(huán)不斷地通過getTask方法來獲取任務(wù);
getTask方法從阻塞隊列中獲取任務(wù);
如果線程池正在停止,那么要保證當(dāng)前線程處于中斷狀態(tài), 否則要保證當(dāng)前線程不是中斷狀態(tài);
調(diào)用task.run()執(zhí)行任務(wù);
如果task為null則會跳出循環(huán),執(zhí)行processWorkerExit方法;
runWorker方法執(zhí)行完畢,也代表著Worker中的run方法執(zhí)行完畢,銷毀線程。
線程啟動后,釋放鎖,設(shè) AQS 狀態(tài)為 0,釋放鎖。此時其它線程才可以獲取鎖,中斷線程 interrupt;
獲取 firstTask 任務(wù)并執(zhí)行,執(zhí)行任務(wù)前后可定制 beforeExecute 和 afterExecute;
如果 getTask 從阻塞隊列獲取等待任務(wù)執(zhí)行,如果獲取的任務(wù)為 null,while 則退出循環(huán),線程關(guān)閉。
如果線程已經(jīng)STOP,則一定要將線程 interrupt。如果線程處于運行狀態(tài)(包括SHUTDOWN),則一定不能 interrupt。但實際上 interrupt() 方法并不一定能中斷正在運行的線程,它只能喚醒 wait 阻塞的線程或給線程設(shè)置一個標(biāo)記位。業(yè)務(wù)線程必須對 interrupt 做出響應(yīng)才能中斷線程,否則會一直等線程執(zhí)行結(jié)束才會銷毀。
獲取任務(wù) getTask
getTask方法用于從阻塞隊列中獲取任務(wù)
private Runnable getTask() {
//timeout變量的值表示上次從阻塞隊列中獲取任務(wù)是否超時
boolean timedOut = false;
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 如果rs >= SHUTDOWN,表示線程池非RUNNING狀態(tài),需要再次判斷:
* 1、rs >= STOP ,線程池是否正在STOP
* 2、阻塞隊列是否為空
* 滿足上述條件之一,則將workCount減一,并返回null;
* 因為如果當(dāng)前線程池的狀態(tài)處于STOP及以上或隊列為空,不能從阻塞隊列中獲取任務(wù);
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/**
* timed變量用于判斷是否需要進行超時控制;
* allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時;
* wc > corePoolSize,表示當(dāng)前線程數(shù)大于核心線程數(shù)量;
* 對于超過核心線程數(shù)量的這些線程,需要進行超時控制;
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* wc > maximumPoolSize的情況是因為可能在此方法執(zhí)行階段同時執(zhí)行了 setMaximumPoolSize方法;
* timed && timedOut 如果為true,表示當(dāng)前操作需要進行超時控制,并且上次從阻塞隊列中獲取任務(wù)發(fā)生了超時;
* 接下來判斷,如果有效咸亨數(shù)量大于1,或者workQueue為空,那么將嘗試workCount減1;
* 如果減1失敗,則返回重試;
* 如果wc==1時,也就說明當(dāng)前線程是線程池中的唯一線程了;
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
/**
* timed為trure,則通過workQueue的poll方法進行超時控制,如果在keepAliveTime時間內(nèi)沒有獲取任務(wù),則返回null;
* 否則通過take方法,如果隊列為空,則take方法會阻塞直到隊列中不為空;
*/
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//如果r==null,說明已經(jīng)超時了,timedOut = true;
timedOut = true;
} catch (InterruptedException retry) {
//如果獲取任務(wù)時當(dāng)前線程發(fā)生了中斷,則將timedOut = false;
timedOut = false;
}
}
}
getTask 總結(jié)
第二個if判斷,目的是為了控制線程池的有效線程數(shù)量。有上文分析得到,在execute方法時,如果當(dāng)前線程池的線程數(shù)量超過coolPoolSize且小于maxmumPoolSize,并且阻塞隊列已滿時,則可以通過增加工作線程。但是如果工作線程在超時時間內(nèi)沒有獲取到任務(wù),timeOut=true,說明workQueue為空,也就說當(dāng)前線程池不需要那么多線程來執(zhí)行任務(wù)了,可以把多于的corePoolSize數(shù)量的線程銷毀掉,保證線程數(shù)量在corePoolSize即可。
- getTask 時,worker 已經(jīng)釋放了鎖,也就是說其它線程可以調(diào)用 wt.interrupt() 喚醒等待的線程。
- 如果當(dāng)前線程數(shù)大于最大線程數(shù),或允許核心線程銷毀時,如果獲取任務(wù)超時則返回 null,即銷毀線程。
processWorkerExit方法
processWorkerExit執(zhí)行完之后,工作線程被銷毀。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/**
* 如果completedAbruptly為true,則說明線程執(zhí)行時出現(xiàn)異常,需要將workerCount數(shù)量減一
* 如果completedAbruptly為false,說明在getTask方法中已經(jīng)對workerCount進行減一,這里不用再減
*/
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//統(tǒng)計完成的任務(wù)數(shù)
completedTaskCount += w.completedTasks;
//從workers中移除,也就表示從線程池中移除一個工作線程
workers.remove(w);
} finally {
mainLock.unlock();
}
//鉤子函數(shù),根據(jù)線程池的狀態(tài)來判斷是否結(jié)束線程池
tryTerminate();
int c = ctl.get();
/**
* 當(dāng)前線程是RUNNING或SHUTDOWN時,如果worker是異常結(jié)束,那么會直接addWorker;
* 如果allowCoreThreadTimeOut=true,那么等待隊列有任務(wù),至少保留一個worker;
* 如果allowCoreThreadTimeOut=false,workerCount少于coolPoolSize
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
線程關(guān)閉
shutdown
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
// 為保證線程安全,使用 mainLock
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// SecurityManager 檢查
checkShutdownAccess();
// 設(shè)置狀態(tài)為 SHUTDOWN
advanceRunState(SHUTDOWN);
// 中斷空閑的 Worker, 即相當(dāng)于依次關(guān)閉每個空閑線程
interruptIdleWorkers();
// 關(guān)閉鉤子,默認實現(xiàn)為空操作,為方便子類實現(xiàn)自定義清理功能
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 再
tryTerminate();
}
/**
* Transitions runState to given target, or leaves it alone if
* already at least the given target.
*
* @param targetState the desired state, either SHUTDOWN or STOP
* (but not TIDYING or TERMINATED -- use tryTerminate for that)
*/
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
// 自身CAS更新成功或者被其他線程更新成功
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
// 關(guān)閉空閑線程(非 running 狀態(tài))
/**
* Common form of interruptIdleWorkers, to avoid having to
* remember what the boolean argument means.
*/
private void interruptIdleWorkers() {
// 上文已介紹, 此處 ONLY_ONE 為 false, 即是最大可能地中斷所有 Worker
interruptIdleWorkers(false);
}
shutdownNow
與 shutdown 對應(yīng)的,有一個 shutdownNow, 其語義是 立即停止所有任務(wù)。
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 與 shutdown 的差別,設(shè)置的狀態(tài)不一樣
advanceRunState(STOP);
// 強行中斷線程
interruptWorkers();
// 將未完成的任務(wù)返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 調(diào)用 worker 的提供的中斷方法
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
// ThreadPoolExecutor.Worker#interruptIfStarted
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
// 直接調(diào)用任務(wù)的 interrupt
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
shutdown 和 shutdownNow 區(qū)別:
- shutdown 會執(zhí)行完成已提交的任務(wù)后關(guān)閉線程池,而 shutdownNow 則會踢除已提交的任務(wù)。
- shutdown 調(diào)用 interruptIdleWorkers 關(guān)閉空閑的線程,而 shutdownNow 調(diào)用 interruptWorkers 強行中斷所有的線程。
interruptIdleWorkers 和 interruptWorkers
- interruptIdleWorkers 只會嘗試獲取鎖,因此只會中斷空閑線程。而 interruptWorkers 不需要獲取鎖,強行中斷線程。實際上業(yè)務(wù)線程必須對 interrupt 做出響應(yīng)才能中斷線程,否則會一直等線程執(zhí)行結(jié)束才會銷毀。
- 而 interruptIdleWorkers 和 interruptWorkers 都是 interrupt 所有線程, 因此大部分線程將立刻被中斷。之所以是大部分,而不是全部,是因為 interrupt() 方法能力有限。 如果線程中沒有 sleep 、wait、Condition、定時鎖等應(yīng)用, interrupt() 方法是無法中斷當(dāng)前的線程的。所以,ShutdownNow() 并不代表線程池就一定立即就能退出,它可能必須要等待所有正在執(zhí)行的任務(wù)都執(zhí)行完成了才能退出。 如下面這個線程永遠不會中斷,因為該線程沒有響應(yīng) Thread.interrupted() 或者是直接將 InterruptedException 異常 catch 了。
參考