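ThreadPoolExecutor Source Code Analysis
Prerequisites:
- 1. ReentrantLock, and how it is used together with Condition
- Non-reentrant mutual exclusion locks; AQS (AbstractQueuedSynchronizer)
- How AQS is used to implement a non-reentrant mutex
- 2. AtomicInteger, a thread-safe int
- 3. The volatile keyword (memory visibility across threads)
- 4. BlockingQueue, the blocking queue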
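Walking through execute() in the source
As usual, let's start with the overall flow:

1. If the pool has fewer than corePoolSize threads, create a new thread to run the newly submitted task.
2. If the pool has corePoolSize or more threads and workQueue is not full, put the new task into workQueue.
3. If the pool has corePoolSize or more threads and workQueue is full, but there are fewer than maximumPoolSize threads, create a new thread to handle the task.
4. If the pool has reached maximumPoolSize threads (and the queue is full), reject the task through the RejectedExecutionHandler.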
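The four rules reduce to a fixed decision order. The sketch below models only that order as a pure function. `Decision` and `ExecuteRules.decide` are hypothetical names. The real execute() also rechecks the run state and races with other threads, and this model ignores both.

```java
// Hypothetical model of the four rules above (not JDK code).
// It decides only WHAT execute() would do; pool state and concurrency are ignored.
enum Decision { NEW_CORE_THREAD, ENQUEUE, NEW_NON_CORE_THREAD, REJECT }

final class ExecuteRules {
    static Decision decide(int workerCount, int corePoolSize,
                           boolean queueFull, int maximumPoolSize) {
        if (workerCount < corePoolSize) {
            return Decision.NEW_CORE_THREAD;      // rule 1
        }
        if (!queueFull) {
            return Decision.ENQUEUE;              // rule 2
        }
        if (workerCount < maximumPoolSize) {
            return Decision.NEW_NON_CORE_THREAD;  // rule 3
        }
        return Decision.REJECT;                   // rule 4
    }
}
```

Rule 1 applies even when existing threads are idle. A new core thread is started as long as the count is below corePoolSize.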
- 1. ThreadPoolExecutor.execute()
- 2. ThreadPoolExecutor.addWorker()
- 3. The Worker inner class
- 4. ThreadPoolExecutor.runWorker()
- 5. ThreadPoolExecutor.getTask()
- 6. ThreadPoolExecutor.processWorkerExit()
- 7. ThreadPoolExecutor.tryTerminate()
- 8. ThreadPoolExecutor.interruptIdleWorkers()
<span id="jump1">1. ThreadPoolExecutor.execute()</span>

```java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * (In short: if fewer than corePoolSize threads are running, try to
     * start a new thread with command as its first task.)
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * (In short: even after a successful enqueue, recheck. Threads may have
     * died since the last check, or the pool may have been shut down after we
     * entered this method. Roll back the enqueue if the pool has stopped, or
     * start a new thread if none are left.)
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     *
     * (In short: if the task cannot be queued, probably because the queue is
     * full, try to start a new thread, i.e. grow toward maximumPoolSize. If
     * that fails, the pool is shut down or saturated, so reject the task.)
     */
    // Read the current pool control state (runState + workerCount)
    int c = ctl.get();
    /**
     * 1. If the current number of workers is below corePoolSize, create a new Worker.
     */
    if (workerCountOf(c) < corePoolSize) {
        // Create a new Worker and start its thread; core = true bounds it by corePoolSize.
        // Return immediately if it was added successfully.
        if (addWorker(command, true))
            return;
        /**
         * addWorker() failed, so re-read c (whenever ctl is needed again for a
         * decision, ctl.get() is called again).
         * Possible reasons for the failure:
         * 1. The pool has been shut down, and a shut-down pool accepts no new tasks.
         * 2. After the workerCountOf(c) < corePoolSize check, another thread
         *    concurrently created a worker first, so workerCount >= corePoolSize.
         */
        c = ctl.get();
    }
    /**
     * 2. Reaching here means the core pool is full, or the pool has been shut down.
     * Continue if the pool is RUNNING and the task was inserted into workQueue.
     * (BlockingQueue#offer() returns false instead of blocking when it cannot insert.)
     */
    if (isRunning(c) && workQueue.offer(command)) {
        // Re-read the state for the double check
        int recheck = ctl.get();
        /**
         * Recheck whether the task just put into workQueue can still be executed:
         * 1. If the pool is no longer RUNNING, it must not accept new tasks, so
         *    remove the task from workQueue.
         * 2. If the pool is still RUNNING, or removing the task failed (a worker
         *    just finished and already took this task), addWorker(null, false)
         *    makes sure a thread is left to run queued tasks (one is enough).
         */
        // Pool no longer RUNNING and the task was removed from workQueue: call the rejection handler.
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If no workers are left, create one with addWorker(null, false), with a null task.
        // Why compare with 0 rather than corePoolSize? The check only has to guarantee
        // that some worker exists: as long as one worker is alive, it keeps
        // consuming the tasks in workQueue.
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); // firstTask == null: just start a worker that pulls from the queue
                                    // second argument: true bounds by corePoolSize, false by maximumPoolSize
    }
    /**
     * 3. If the task could not be queued, try to add a non-core thread.
     * Once the pool has reached maximumPoolSize, addWorker() fails and the
     * rejection handler is called.
     */
    else if (!addWorker(command, false))
        reject(command);
}
```
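Summary:
execute(Runnable command)
Parameters:
command: the task submitted for execution; must not be null
Execution flow:
1. If the pool currently has fewer than corePoolSize threads, addWorker(command, true) creates a new worker thread. If that succeeds, return; otherwise continue with the next steps.
addWorker(command, true) may fail because:
A. The pool has been shut down, and a shut-down pool accepts no new tasks.
B. After the workerCountOf(c) < corePoolSize check, another thread concurrently created a worker first, so workerCount >= corePoolSize.
2. If the pool is still RUNNING, add the task to the workQueue blocking queue. If that succeeds, do a double check. If it fails (probably because the queue is full), continue with the next step.
The double check mainly decides whether the task just added to workQueue can still be executed:
A. If the pool is no longer RUNNING, it must not accept new tasks, so the task is removed from workQueue and rejected.
B. If the pool is still RUNNING, or removing the task from workQueue failed (a worker just finished and already took this task), make sure a thread is left to run tasks. If workerCount is 0, addWorker(null, false) starts one, and one is enough.
3. If the pool is not RUNNING or the task cannot be queued, try to start a new thread, growing toward maximumPoolSize. If addWorker(command, false) fails, reject the current command.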
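To watch these three steps on a real pool, here is a small experiment (Java 8+). `ExecuteOrderDemo` is a hypothetical helper. It assumes corePoolSize = 1, maximumPoolSize = 2, an ArrayBlockingQueue of capacity 1 and the default AbortPolicy. Every task blocks on a latch, so no worker frees up while the tasks are being submitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecuteOrderDemo {
    /** Returns {poolSize, queueSize, rejectedCount} after submitting four blocking tasks. */
    static int[] run() {
        // corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1, default AbortPolicy
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();      // keep the worker busy until we let it go
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            pool.execute(blocking);   // step 1: workerCount 0 < core 1 -> addWorker(command, true)
            pool.execute(blocking);   // step 2: core pool full -> workQueue.offer() succeeds
            pool.execute(blocking);   // step 3: queue full -> addWorker(command, false), pool grows to 2
            try {
                pool.execute(blocking); // queue full and pool at max -> reject(command)
            } catch (RejectedExecutionException expected) {
                rejected++;
            }
            // Evaluated before the finally block releases the tasks
            return new int[] {pool.getPoolSize(), pool.getQueue().size(), rejected};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

With these settings, the first task gets a core thread and the second waits in the queue. The third gets a non-core thread, and the fourth is rejected with RejectedExecutionException.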
<span id="jump2">2. ThreadPoolExecutor.addWorker()</span>

```java
/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked. If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 *
 * (In short: use the current pool state and the given bound (core or maximum)
 * to decide whether a new worker may be created. If so, adjust the worker
 * count, then create and start the worker with firstTask as its first task.
 * Returns false if the pool is stopped or has been shut down, or if the
 * thread factory fails to create a thread, either by returning null or
 * because of an OutOfMemoryError.)
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    // Outer loop: checks the run state of the pool
    retry:
    for (;;) {
        int c = ctl.get();
        // Run state of the pool
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        /**
         * The smaller the state value, the more "running" the pool is (relative order):
         * RUNNING = -1, SHUTDOWN = 0, STOP = 1, TIDYING = 2, TERMINATED = 3 (each shifted by COUNT_BITS).
         * Return false (addWorker() is not allowed) when
         * 1. the state is at least SHUTDOWN, and
         * 2. any one of the following three conditions is false:
         *    rs == SHUTDOWN (given rs >= SHUTDOWN). False means the state is past
         *        SHUTDOWN (STOP, TIDYING or TERMINATED), i.e. the pool is terminating.
         *    firstTask == null (given rs == SHUTDOWN). False means a new task is being
         *        added after shutdown, which must be refused.
         *    ! workQueue.isEmpty() (given rs == SHUTDOWN and firstTask == null). False
         *        means workQueue is empty. A worker with a null firstTask exists only to
         *        take tasks from workQueue, so with an empty queue there is no point
         *        in adding one.
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;
        // Inner loop: increments the worker count
        for (;;) {
            // Current number of workers
            int wc = workerCountOf(c);
            // Fail if the worker count is >= CAPACITY (the largest value the low 29 bits
            // of the int can hold), or >= the given bound (corePoolSize or maximumPoolSize)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS the worker count up by 1; on success, break out of the outer retry loop.
            // compareAndIncrementWorkerCount() calls AtomicInteger.compareAndSet(c, c + 1),
            // which returns false if ctl was changed concurrently.
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // Re-read ctl to recheck the run state
            c = ctl.get();  // Re-read ctl
            // If the run state differs from the start of the outer loop, restart the outer loop.
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    /**
     * The worker count was incremented: now add the worker
     * to the workers set and start its thread.
     */
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 1. set the state of the worker's AQS lock to -1
        // 2. store firstTask in the worker's firstTask field
        // 3. pass the worker itself (a Runnable) to the ThreadFactory to create a thread,
        //    stored in the worker's thread field
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //-------------------- the following code runs while holding mainLock
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // Having acquired the lock, recheck the pool's run state
                int rs = runStateOf(ctl.get());
                // Proceed if the pool is RUNNING (RUNNING < SHUTDOWN), or it is SHUTDOWN and
                // firstTask == null (a worker without a first task may still be needed to
                // drain tasks left in workQueue).
                // Otherwise the worker count is decremented again in addWorkerFailed().
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable: an already started thread is illegal here
                        throw new IllegalThreadStateException();
                    // workers is a HashSet<Worker>; add the new worker
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // Mark the worker as added
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // Added successfully: start the thread
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the thread was not started, roll back
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

/**
 * Rolls back the worker creation: removes w from the workers set (if non-null),
 * decrements the worker count and calls tryTerminate().
 * @param w the worker, possibly null
 */
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        // Loops on AtomicInteger#compareAndSet(expect, expect - 1) until the decrement succeeds
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
```
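addWorker(Runnable firstTask, boolean core)
Parameters:
firstTask: the worker thread's initial task; may be null
core: true uses corePoolSize as the bound, false uses maximumPoolSize
addWorker() is called with four combinations of arguments:
1. addWorker(command, true)
2. addWorker(command, false)
3. addWorker(null, false)
4. addWorker(null, true)
execute() uses the first three. Here is what each one does, based on the method above:
First: when the thread count is below corePoolSize, add a worker carrying a task to the workers set. If the worker count has already reached corePoolSize, return false.
Second: when the queue is full, try to hand the new task directly to a new worker in the workers set. The bound is now maximumPoolSize. If the pool is full as well, return false.
Third: add a worker with a null task to the workers set, bounded by maximumPoolSize. When its thread runs, it takes tasks from the work queue. In effect this creates a new thread without assigning it a task right away.
Fourth: add a worker with a null task to the workers set while below corePoolSize. If the count has already reached corePoolSize, return false and do nothing. It is used, for example, by prestartAllCoreThreads(), which pre-starts corePoolSize workers that wait to take tasks from workQueue.
Execution flow:
1. Check whether the pool is in a state that allows adding a worker thread. If so, go on; otherwise return false:
A. State > SHUTDOWN (STOP, TIDYING or TERMINATED): no worker may be added.
B. State == SHUTDOWN and firstTask != null: no worker may be added, because a shut-down pool accepts no new tasks.
C. State == SHUTDOWN, firstTask == null and workQueue is empty: no worker may be added. A worker with a null firstTask exists only to
take tasks from workQueue, and with workQueue empty, such a worker would be pointless.
2. Check whether the current thread count has reached the bound (corePoolSize or maximumPoolSize). If so, return false; otherwise increment workerCount and continue.
3. While holding the pool's ReentrantLock (mainLock), add the newly created worker to the workers set, unlock, then start the worker thread.
If all of this succeeds, return true. If adding the worker to the set or starting it fails, call addWorkerFailed().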
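The state checks and the "workerCount + 1" step both operate on a single AtomicInteger, ctl. It packs the run state into the high 3 bits and the worker count into the low 29 bits. The sketch below reproduces those helpers (COUNT_BITS, CAPACITY, runStateOf, workerCountOf, ctlOf, compareAndIncrementWorkerCount) as they appear in the JDK 8 source. The wrapper class `CtlSketch` is hypothetical; in the JDK these members are private to ThreadPoolExecutor.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper around the ctl helpers of the JDK 8 ThreadPoolExecutor
final class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;   // low 29 bits: max worker count

    // runState is stored in the high 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // Succeeds only if ctl still equals the snapshot (neither state nor count changed)
    boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
}
```

Both values live in one int, so a single compareAndSet fails if either the state or the count changed. addWorker() then re-reads ctl and restarts the outer loop only when the run state is what changed.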
<span id="jump3">3. The Worker inner class</span>

```java
/**
 * Class Worker mainly maintains interrupt control state for
 * threads running tasks, along with other minor bookkeeping.
 * This class opportunistically extends AbstractQueuedSynchronizer
 * to simplify acquiring and releasing a lock surrounding each
 * task execution.This protects against interrupts that are
 * intended to wake up a worker thread waiting for a task from
 * instead interrupting a task being run.
 *
 * We implement a simple non-reentrant mutual exclusion lock rather than use
 * ReentrantLock because we do not want worker tasks to be able to
 * reacquire the lock when they invoke pool control methods like
 * setCorePoolSize.
 *
 * Additionally, to suppress interrupts until
 * the thread actually starts running tasks, we initialize lock
 * state to a negative value, and clear it upon start (in
 * runWorker).
 *
 * (In short: Worker mainly manages the interrupt state of the thread running
 * tasks, plus some bookkeeping. It opportunistically extends
 * AbstractQueuedSynchronizer to simplify acquiring and releasing a lock around
 * each task. Interrupts then only wake threads waiting in workQueue for a
 * task, and never interrupt a task that is running.)
 * Explanation:
 * Why wrap the command in a Worker instead of running it directly in execute(Runnable command)?
 * 1. Mainly to control thread interruption: the non-reentrant AQS mutex keeps other
 *    operations, even ones called from the same thread, from interrupting a running task.
 * 2. A normal shutdown() calls interruptIdleWorkers(), which must succeed with
 *    w.tryLock() before it calls t.interrupt(). The non-reentrant lock therefore
 *    blocks interrupts aimed at running workers, so shutdown() does not
 *    interrupt a worker that is executing a task.
 * 3. shutdownNow(), however, calls interruptWorkers(), which takes no Worker lock
 *    and iterates over all workers calling t.interrupt() (via interruptIfStarted()).
 *    So shutdownNow() does interrupt workers that are executing tasks.
 *
 * (In short: Worker implements a simple non-reentrant mutex instead of using
 * ReentrantLock, because a task must not be able to re-acquire the lock when it
 * calls pool control methods such as setCorePoolSize().)
 * Explanation:
 * 1. setCorePoolSize() may call interruptIdleWorkers(), where w.tryLock() acts as the gate.
 *    Because the lock is non-reentrant, tryLock() fails even when called from the
 *    worker's own thread, so the running worker is not interrupted.
 * 2. Other methods that do the same (every caller of interruptIdleWorkers()):
 *    shutdown()
 *    setMaximumPoolSize()
 *    setKeepAliveTime()
 *    allowCoreThreadTimeOut()
 *
 * (In short: so that a worker can be interrupted only once its thread is
 * actually running, state is initialized to -1 and set to 0 at the start of runWorker().)
 * Explanation:
 * 1. Creating a Worker does not start its thread. addWorker() calls t.start() after
 *    adding the worker to workers; the new thread then enters runWorker(), whose
 *    first w.unlock() sets state to 0.
 * 2. There is no point interrupting a thread that has not started; only state >= 0
 *    means there is a started thread that may be interrupted.
 */
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;
    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter (number of completed tasks) */
    volatile long completedTasks;
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        // Initialize state to -1; the started thread sets it to 0 via w.unlock() at the top of runWorker()
        setState(-1); // inhibit interrupts until runWorker
        // The task may be null; runWorker() then loops on getTask() to take tasks from workQueue.
        this.firstTask = firstTask;
        // Create the thread through the thread factory, with this Worker as its Runnable
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }
    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    //
    // Lock state: 0 means unlocked; any non-zero value (1, or -1 before start) counts as locked
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    /**
     * Tries to acquire the lock. AQS calls this when acquiring; subclasses must implement it.
     *
     * It relies on the atomicity of CAS (unsafe.compareAndSwapInt) to compare and set the state:
     * only if state == 0 is it set to 1, and true is returned.
     * true: the current thread is recorded as the owner; the lock was acquired.
     * false: the lock could not be acquired.
     */
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    /**
     * Releases the lock by setting state to 0.
     */
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    // lock() calls acquire(1), which also uses tryAcquire(); on failure it queues
    // the thread and blocks it until the lock can be acquired.
    public void lock()        { acquire(1); }
    // Tries to acquire the lock once, without blocking
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    /**
     * Interrupts thread t if it has started (state >= 0), is non-null and is not already interrupted.
     * Used by shutdownNow() (through interruptWorkers()), which interrupts without taking the Worker lock.
     */
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
```
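Why Worker exists:
- 1. To control interruption, so that a task can run to completion and is not accidentally interrupted by shutdown() and similar calls.
- 2. To drive the loop that keeps fetching tasks through getTask().
- 3. To control when the worker itself exits.
How Worker implements a non-reentrant lock: state (-1, 0, 1)
The main point of controlling interruption is to let the task currently running in a worker finish whenever possible,
instead of being interrupted midway by other methods calling interrupt().
- 1. It extends AQS (AbstractQueuedSynchronizer) to implement a non-reentrant lock.
- 2. In new Worker(), before the thread has started, state is set to -1; interrupting the thread at that point would be meaningless.
Once the started thread enters runWorker(), w.unlock() sets state to 0.
- 3. tryLock() -> tryAcquire() uses CAS to move state from 0 to 1, which makes the lock non-reentrant,
because the CAS succeeds only when state is 0; with state -1 or 1 it always fails, even for the thread that already owns the lock.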
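Here is a minimal sketch of the same idea. The lock methods are modeled on the Worker class above. `NonReentrantMutex` is a hypothetical stand-alone class with no thread or firstTask, and its `state()` accessor exists only for the demonstration. It shows that tryLock() fails while state is -1, and fails again when the owning thread tries to re-acquire. A ReentrantLock, by contrast, lets its owner re-acquire.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-alone version of Worker's lock (state: -1 = not started, 0 = unlocked, 1 = locked)
final class NonReentrantMutex extends AbstractQueuedSynchronizer {
    NonReentrantMutex() {
        setState(-1);                          // like Worker(): inhibit locking until "started"
    }

    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {        // only 0 -> 1 succeeds, so re-entry always fails
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    void lock()        { acquire(1); }
    boolean tryLock()  { return tryAcquire(1); }
    void unlock()      { release(1); }
    int state()        { return getState(); }  // for the demonstration only

    // For contrast: a ReentrantLock lets the owning thread acquire it again
    static boolean reentrantLockAllowsReentry() {
        ReentrantLock rl = new ReentrantLock();
        rl.lock();
        try {
            boolean again = rl.tryLock();
            if (again) {
                rl.unlock();
            }
            return again;
        } finally {
            rl.unlock();
        }
    }
}
```

The unlock() call that moves state from -1 to 0 mirrors the `w.unlock(); // allow interrupts` line at the top of runWorker().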
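How Worker controls interruption
- 1. The AQS state starts at -1, and interrupting is not allowed yet. Only after runWorker() sets state to 0 may the thread be interrupted.
- 2. shutdown(), setMaximumPoolSize() and the other methods that shut the pool down gracefully or change its configuration all call interruptIdleWorkers(), which interrupts idle workers.
It iterates over all workers and calls tryLock() on each. If tryLock() succeeds, the worker is idle.
A worker calls lock() as soon as it gets a task and releases the lock after the task, so it is unlocked only between tasks, for example while blocked in getTask(). Therefore, if w.tryLock() succeeds,
the worker is idle. This is exactly what the non-reentrant lock is used for.
- 3. shutdownNow() calls interruptWorkers(), which iterates over the workers and calls interrupt() directly, without acquiring any Worker lock.
It does check that state >= 0, though: with state == -1 the thread has not even started, so there is nothing to interrupt.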
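The difference between shutdown() and shutdownNow() can be observed directly. In this sketch (hypothetical class `ShutdownInterruptDemo`, Java 8+), a single worker sleeps for 300 ms inside a task. The task counts down the latch only after runWorker() has locked the worker, so the shutdown call always hits a busy worker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ShutdownInterruptDemo {
    /** Returns true if the running task observed an interrupt. */
    static boolean taskInterrupted(boolean useShutdownNow) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        pool.execute(() -> {
            started.countDown();          // the worker already holds its Worker lock here
            try {
                Thread.sleep(300);        // simulated work
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        try {
            started.await();
            if (useShutdownNow) {
                pool.shutdownNow();       // interruptWorkers(): no tryLock(), interrupts started workers
            } else {
                pool.shutdown();          // interruptIdleWorkers(): tryLock() fails on the busy worker
            }
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }
}
```

shutdown() leaves the sleeping task alone because interruptIdleWorkers() cannot tryLock() the busy worker. shutdownNow() interrupts it through interruptIfStarted().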
<span id="jump4">4. ThreadPoolExecutor.runWorker()</span>
```java
/**
 * Main worker run loop. Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters. Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 * (In short: we may start with an initial task (firstTask != null); after that,
 * as long as the pool is running, tasks come from getTask(). If getTask()
 * returns null, the worker exits because the pool state or configuration
 * changed. Any other exit comes from an exception thrown by external code;
 * completedAbruptly is then true, which makes processWorkerExit() replace
 * this thread.)
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and
 * clearInterruptsForTaskRun called to ensure that unless pool is
 * stopping, this thread does not have its interrupt set.
 * (In short: before each task the worker locks itself so that other pool
 * interrupts cannot hit the running task, and unless the pool is stopping,
 * the thread's interrupt flag is cleared.)
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 * (In short: beforeExecute() runs before each task and may throw. The exception
 * is not caught, so it escapes the loop, the thread dies with
 * completedAbruptly == true, and the task is not run.)
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to
 * afterExecute. We separately handle RuntimeException, Error
 * (both of which the specs guarantee that we trap) and arbitrary
 * Throwables. Because we cannot rethrow Throwables within
 * Runnable.run, we wrap them within Errors on the way out (to the
 * thread's UncaughtExceptionHandler). Any thrown exception also
 * conservatively causes thread to die.
 * (In short: if beforeExecute() completes normally, the task runs, and any
 * exception it throws is passed to afterExecute(task, thrown). Since
 * Runnable.run() cannot rethrow an arbitrary Throwable, such Throwables are
 * wrapped in an Error and rethrown, ending up in the thread's
 * UncaughtExceptionHandler. Any thrown exception makes the thread die.)
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 * (In short: afterExecute() runs after the task and may also throw, which also
 * kills the thread. Per JLS 14.20, an exception thrown from the finally block
 * is the one that takes effect, even if task.run() threw.)
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    // runWorker() is called from Worker.run(), and run() executes in the thread started
    // by t.start() in addWorker(). So when we get here, the thread has already started.
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Calls the Worker's tryRelease() to set state to 0: the thread is now running,
    // so anyone who needs to interrupt() it may do so.
    w.unlock(); // allow interrupts
    // Whether the worker exited abruptly; set to false only if the loop ends without an unexpected exception.
    // It is passed to processWorkerExit(), which, on an abrupt exit, starts a replacement worker to keep
    // draining workQueue (the failed task itself is not retried).
    boolean completedAbruptly = true;
    try {
        // Loop while there is a firstTask or getTask() returns a task from workQueue
        while (task != null || (task = getTask()) != null) {
            // Lock before running the task; the lock is a non-reentrant mutex (see Worker for why).
            // It is not there to prevent concurrent execution; it keeps shutdown() from interrupting a running worker.
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /**
             * What clearInterruptsForTaskRun refers to:
             * ensure the interrupt flag is set only while the pool is stopping; otherwise clear it.
             * 1. If the pool state is >= STOP and the thread is not interrupted, call wt.interrupt().
             * 2. If the state was < STOP at first but Thread.interrupted() returns true (the thread had
             *    been interrupted, and the call clears the flag), check again whether the state is >= STOP:
             *    yes: set the interrupt flag again with wt.interrupt();
             *    no: do nothing; the flag stays cleared and the task runs.
             */
            // RUNNING = -1
            // SHUTDOWN = 0
            // STOP = 1
            // TIDYING = 2
            // TERMINATED = 3
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                && !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook before the task (subclasses may override it)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Run the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook after the task (subclasses may override it)
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++; // one more completed task
                // Unlock
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Handle the worker's exit
        processWorkerExit(w, completedAbruptly);
    }
}
```
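Execution flow:
1. After the worker thread starts, Worker.run() calls runWorker(this).
2. Before running any task, w.unlock() sets the AQS state to 0, which allows the worker thread to be interrupted.
3. Run firstTask by calling task.run(). The worker locks itself (w.lock()) before each task and unlocks afterwards, so that the pool's interrupt operations cannot interrupt a running task.
4. Around each task, beforeExecute() and afterExecute() can be overridden to fit the application.
5. An exception thrown from beforeExecute(), task.run() or afterExecute() terminates the worker thread, which then goes through processWorkerExit().
6. After the current task completes normally, the worker takes the next task from the blocking queue via getTask(). If getTask() returns null (for example, the queue stays empty until the keepAliveTime timeout expires), the worker also goes through the exit process.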
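Points 4 and 5 can be checked with a subclass that overrides the two hooks. `HookedPool` is a hypothetical single-thread pool. Its ThreadFactory installs a no-op UncaughtExceptionHandler only to keep the rethrown exception from being printed. The first task throws, so afterExecute() receives the exception and that worker dies. A new worker then runs the second task; normally this is the replacement added by processWorkerExit().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class HookedPool extends ThreadPoolExecutor {
    final List<String> events = Collections.synchronizedList(new ArrayList<>());
    final CountDownLatch afterCalls;

    HookedPool(int expectedTasks) {
        // A single worker, so tasks run one after another
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
              r -> {
                  Thread t = new Thread(r);
                  // Silence the default handler that would print the rethrown exception
                  t.setUncaughtExceptionHandler((thread, ex) -> { });
                  return t;
              });
        afterCalls = new CountDownLatch(expectedTasks);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        events.add("before");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // t is the exception thrown by the task, or null if it completed normally
        events.add(t == null ? "after:ok" : "after:" + t.getMessage());
        afterCalls.countDown();
    }

    static List<String> demo() {
        HookedPool pool = new HookedPool(2);
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.execute(() -> { });   // runs on a new worker after the first one died
        try {
            pool.afterCalls.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return new ArrayList<>(pool.events);
    }
}
```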
<span id="jump5">5. ThreadPoolExecutor.getTask()</span>
```java
/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 * a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 * workers are subject to termination (that is,
 * {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 * both before and after the timed wait, and if the queue is
 * non-empty, this worker is not the last thread in the pool.
 *
 * (In short: performs a blocking take() or a timed poll(timeout) on the queue,
 * depending on the configuration. It returns null if any of the following holds:
 * 1. there are more than maximumPoolSize workers (because setMaximumPoolSize() was called);
 * 2. the pool is stopped;
 * 3. the pool is shut down and the queue is empty;
 * 4. this worker is subject to timeout (it is not a core thread, or core threads
 *    are allowed to time out), it waited longer than keepAliveTime without
 *    getting a task, and either the queue is empty or it is not the last thread.
 * Returning null means the worker is about to exit, so the worker count is decremented.)
 * @return task, or null if the worker must exit, in which case
 * workerCount is decremented
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        // Read the pool state
        int c = ctl.get();
        int rs = runStateOf(c);
        // State is at least SHUTDOWN && (state is at least STOP || workQueue is empty):
        // return null so the thread exits, after decrementWorkerCount() lowers the count via CAS.
        // RUNNING = -1 << COUNT_BITS;
        // SHUTDOWN = 0 << COUNT_BITS;
        // STOP = 1 << COUNT_BITS;
        // TIDYING = 2 << COUNT_BITS;
        // TERMINATED = 3 << COUNT_BITS;
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // Decrement the worker count via CAS:
            // loops on ctl.compareAndSet(expect, expect - 1) until it succeeds
            decrementWorkerCount();
            return null;
        }
        // Current number of workers
        int wc = workerCountOf(c);
        // Is this worker subject to timeout-based culling?
        // allowCoreThreadTimeOut is set || current count > corePoolSize
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // (count > maximumPoolSize || (timeout applies && the last poll timed out with no task))
        // && (count > 1 || workQueue is empty)
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // Decrement the worker count via CAS; return null on success, otherwise loop again.
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // timed ? poll(keepAliveTime) : take()
            // poll(keepAliveTime) returns null once keepAliveTime elapses without a task.
            // take() blocks until an element is available.
            // As seen when analyzing the queue source (e.g. ArrayBlockingQueue, LinkedBlockingQueue):
            //  1. take() parks the thread on a Condition of the queue's lock:
            //     Condition notEmpty = lock.newCondition();
            //     notEmpty.await() waits.
            //  2. An insertion calls notEmpty.signal() to wake it up.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // The poll timed out, so this worker may exit; the next iteration decides,
            // together with the other conditions, whether to return null
            timedOut = true;
        } catch (InterruptedException retry) {
            // Interrupted (e.g. by interruptIdleWorkers()): reset timedOut and loop again
            timedOut = false;
        }
    }
}
```
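The core idea is to rely on the blocking behavior of workQueue:
A. workQueue.poll(keepAliveTime, unit): if the queue still has no task after keepAliveTime, return null.
B. workQueue.take(): if the queue is empty, the current thread is suspended (await()). When a task is added, the thread is woken (signal()) and take() returns the task.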
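The take()/poll(timeout) behavior rests on a ReentrantLock plus a Condition, as in the JDK's array- and linked-list-based blocking queues. `MiniBlockingQueue` below is a hypothetical, simplified unbounded queue with no capacity limit and no notFull condition. It shows only the mechanism: await()/awaitNanos() release the lock while waiting, and put() calls signal().

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical unbounded queue, just enough to show take() and poll(timeout)
final class MiniBlockingQueue<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    void put(E e) {
        lock.lock();
        try {
            items.addLast(e);
            notEmpty.signal();            // wake one thread waiting in take()/poll()
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                notEmpty.await();         // releases the lock and parks until signalled
            }
            return items.removeFirst();
        } finally {
            lock.unlock();
        }
    }

    E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                if (nanos <= 0L) {
                    return null;          // timed out: getTask() would set timedOut = true
                }
                nanos = notEmpty.awaitNanos(nanos);  // remaining wait time
            }
            return items.removeFirst();
        } finally {
            lock.unlock();
        }
    }
}
```

Both waits throw InterruptedException when the waiting thread is interrupted. That is the path getTask() takes when interruptIdleWorkers() wakes an idle worker.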
<span id="jump6">6. ThreadPoolExecutor.processWorkerExit()</span>
```java
/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit. This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * (In short: cleanup and bookkeeping for an exiting worker; called only from worker threads.)
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
    /**
     * 1. Decrement the worker count
     * An abrupt exit means an exception occurred while running a task (in run() or a hook),
     * so the count of working threads must be decremented here.
     * Otherwise the worker simply had no more tasks; no decrement is needed here,
     * because getTask() has already done it.
     */
    // true only if runWorker() threw; then decrement the worker count via CAS
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    /**
     * 2. Remove the worker from the workers set
     */
    // Acquire mainLock
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Add this worker's completed tasks to the pool-wide total
        completedTaskCount += w.completedTasks;
        // Remove the worker from the HashSet
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    /**
     * 3. After any action that could make termination possible, "try to terminate" the pool:
     * check whether the pool meets the conditions for termination.
     * If it does but threads remain, interrupt an idle one so that it can go through its exit path.
     * If no threads remain, move the state to TIDYING and then TERMINATED.
     */
    tryTerminate();
    /**
     * 4. Decide whether to add a worker:
     * 1. the pool state is RUNNING or SHUTDOWN, and
     * 2. the current worker exited abruptly: addWorker(), or
     * 3. it did not exit abruptly, but the worker count is below the number to maintain: addWorker().
     * So after shutdown() is called, until workQueue is empty the pool keeps
     * corePoolSize threads (or 1), and only then retires them one by one.
     */
    int c = ctl.get();
    // If the state is RUNNING or SHUTDOWN (tryTerminate() did not terminate the pool), consider adding a worker
    if (runStateLessThan(c, STOP)) {
        // Not abrupt, i.e. the worker exited because it got no task:
        // compute min and decide from the current worker count whether addWorker() is needed
        if (!completedAbruptly) {
            // min is 0 if core threads may time out, otherwise corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // If min is 0 but workQueue is not empty, tasks remain, so at least one worker is needed
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // If the worker count is already >= min, no replacement is needed;
            // otherwise fall through to addWorker(null, false)
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Add a worker with no firstTask.
        // This happens whenever the exit was abrupt or the worker count is below the number to maintain:
        // after an exception, tasks may still be waiting in workQueue, and a worker
        // with no initial task will drain them.
        addWorker(null, false);
    }
}
```
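Step 4 and the timeout branch of getTask() together decide how far a pool shrinks when it goes idle. The following experiment (hypothetical class `KeepAliveDemo`) assumes corePoolSize = 1, maximumPoolSize = 3, keepAliveTime = 50 ms and a SynchronousQueue, so three blocking tasks force three workers. After the tasks finish, it polls getPoolSize() for up to 5 seconds, an arbitrary safety margin.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class KeepAliveDemo {
    /** Runs 3 blocking tasks on a pool with core=1, max=3, then waits for idle workers to time out. */
    static int poolSizeAfterIdle(boolean allowCoreThreadTimeOut) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 50, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        pool.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                // SynchronousQueue.offer() fails while all workers are busy, so each task adds a worker
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            release.countDown();
            int expected = allowCoreThreadTimeOut ? 0 : 1;
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            // getTask() returns null for workers whose poll(keepAliveTime) timed out;
            // processWorkerExit() replaces them only while workerCount < min
            while (pool.getPoolSize() > expected && System.nanoTime() < deadline) {
                Thread.sleep(20);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }
}
```

Without allowCoreThreadTimeOut, the surplus workers time out and the pool settles at corePoolSize. With it, min is 0 and a SynchronousQueue always reports itself empty, so the last worker exits too.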
<span id="jump7">7. ThreadPoolExecutor.tryTerminate()</span>
```java
/**
 * Transitions to TERMINATED state if either (SHUTDOWN and pool
 * and queue empty) or (STOP and pool empty). If otherwise
 * eligible to terminate but workerCount is nonzero, interrupts an
 * idle worker to ensure that shutdown signals propagate. This
 * method must be called following any action that might make
 * termination possible -- reducing worker count or removing tasks
 * from the queue during shutdown. The method is non-private to
 * allow access from ScheduledThreadPoolExecutor.
 *
 * (In short: the pool moves to TERMINATED when either
 *   SHUTDOWN and both the workers and workQueue are empty, or
 *   STOP and there are no workers.
 * It must be called after any action that might make termination possible, such as
 *   reducing the worker count, or
 *   removing tasks from the queue during shutdown.
 * It is not private so that the subclass ScheduledThreadPoolExecutor can call it.)
 */
final void tryTerminate() {
    // This loop works together with the CAS on ctl below: if the CAS fails, try again
    for (;;) {
        int c = ctl.get();
        /**
         * Does the pool need to terminate?
         * If any of the following 3 conditions holds, return without terminating:
         * 1. the pool is still RUNNING;
         * 2. the state is already TIDYING or TERMINATED (termination has already happened);
         * 3. the state is SHUTDOWN and workQueue is not empty.
         */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        /**
         * Only SHUTDOWN with an empty workQueue, or STOP, gets this far.
         * If the pool still has threads (running tasks or waiting for tasks),
         * interrupt one idle worker that is waiting for a task.
         * Once woken, it rechecks the pool state, getTask() returns null,
         * and it goes through processWorkerExit().
         */
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE); // interrupt idle workers; ONLY_ONE is true, so at most one
            return;
        }
        /**
         * SHUTDOWN with an empty workQueue, or STOP, and no workers left: start terminating.
         */
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS ctl to TIDYING (all tasks have terminated, workerCount is 0, and terminated() is
            // called in this state). If ctl changed in the meantime, the CAS fails and the loop retries.
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated(); // hook: does nothing by default; subclasses may override it
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0)); // set ctl to TERMINATED
                    termination.signalAll(); // wake threads waiting for termination in awaitTermination()
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
        // The CAS above failed: loop again
    }
}
```
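The terminated() hook and the termination.signalAll() call can be observed from outside the pool. In this sketch, `TerminatedHookPool` is a hypothetical subclass that only counts terminated() calls. awaitTermination() returns once tryTerminate() has set the state to TERMINATED.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class TerminatedHookPool extends ThreadPoolExecutor {
    final AtomicInteger terminatedCalls = new AtomicInteger();

    TerminatedHookPool() {
        super(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void terminated() {
        // Called by tryTerminate() while ctl is TIDYING, before it is set to TERMINATED
        terminatedCalls.incrementAndGet();
    }

    static boolean demo() {
        TerminatedHookPool pool = new TerminatedHookPool();
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();   // SHUTDOWN: queued tasks still run, then the workers exit
        try {
            // awaitTermination() waits on the termination condition that tryTerminate() signals
            boolean done = pool.awaitTermination(5, TimeUnit.SECONDS);
            return done && pool.isTerminated() && pool.terminatedCalls.get() == 1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The CAS from the current ctl value to TIDYING can succeed only once, so terminated() runs exactly once even if several exiting workers call tryTerminate().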
<span id="jump8">8. ThreadPoolExecutor.interruptIdleWorkers()</span>
```java
/**
 * Interrupts threads that might be waiting for tasks (as
 * indicated by not being locked) so they can check for
 * termination or configuration changes. Ignores
 * SecurityExceptions (in which case some threads may remain
 * uninterrupted).
 * (In short: interrupts threads that are waiting for tasks, i.e. not locked.
 * Once woken, they can check whether the pool state or configuration changed
 * and decide whether to continue.)
 *
 * @param onlyOne If true, interrupt at most one worker. This is
 * called only from tryTerminate when termination is otherwise
 * enabled but there are still other workers. In this case, at
 * most one waiting worker is interrupted to propagate shutdown
 * signals in case all threads are currently waiting.
 * Interrupting any arbitrary thread ensures that newly arriving
 * workers since shutdown began will also eventually exit.
 * To guarantee eventual termination, it suffices to always
 * interrupt only one idle worker, but shutdown() interrupts all
 * idle workers so that redundant workers exit promptly, not
 * waiting for a straggler task to finish.
 *
 * (In short: if onlyOne is true, at most one worker is interrupted.
 * tryTerminate() makes this call only when termination has otherwise begun
 * (SHUTDOWN with an empty workQueue, or STOP) but worker threads remain.
 * At most one worker is then interrupted, to propagate the shutdown signal
 * in case all threads are waiting. Interrupting a single idle worker each
 * time is enough to guarantee eventual termination, whereas shutdown()
 * interrupts all idle workers so that idle threads exit promptly.)
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); // acquire mainLock
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // w.tryLock() succeeds only for a worker that is not running a task, e.g. one blocked
            // in getTask(), because runWorker() calls lock() as soon as it gets a task
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock(); // release mainLock
    }
}
```
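Finally, a sketch of what interruptIdleWorkers() does during shutdown(). There are two pre-started core workers, one busy and one idle. Only the idle one is interrupted and exits, while the busy task finishes normally. `IdleInterruptDemo` is a hypothetical name, and the 5-second polling deadline is an arbitrary safety margin.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class IdleInterruptDemo {
    static final class Result {
        int poolSizeWhileBusy;
        boolean busyTaskInterrupted;
        boolean terminated;
    }

    static Result run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        pool.prestartAllCoreThreads();          // two idle workers, both blocked in getTask() -> take()
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        pool.execute(() -> {
            started.countDown();                // this worker now holds its Worker lock
            try {
                release.await();
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        Result result = new Result();
        try {
            started.await();
            pool.shutdown();                    // interruptIdleWorkers(): only the idle worker is interrupted
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                Thread.sleep(10);               // the idle worker exits once getTask() returns null
            }
            result.poolSizeWhileBusy = pool.getPoolSize();
            release.countDown();                // let the busy task finish normally
            result.terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        result.busyTaskInterrupted = interrupted.get();
        return result;
    }
}
```

After the interrupt, the idle worker's getTask() sees SHUTDOWN with an empty queue and returns null. addWorker() refuses a replacement for the same reason, so the pool runs with a single worker until the busy task completes.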