一、相關概念
ThreadPoolExecutor是jdk內(nèi)置線程池的一個實現(xiàn),基本上大部分情況都會使用這個線程池完成并發(fā)操作,下面是涉及到ThreadPoolExecutor的相關概念。
ThreadPoolExecutor: 這是線程池實現(xiàn)類,會動態(tài)創(chuàng)建多個線程,并發(fā)執(zhí)行提交的多個任務;
Worker: 是個Runnable實現(xiàn)類來的,內(nèi)部會創(chuàng)建一個線程,一直循環(huán)不斷執(zhí)行任務,所以可以認為一個Worker就是一個工作線程;
corePoolSize: 當池中總線程數(shù)<corePoolSize時,提交一個任務創(chuàng)建一個新線程,而不管已經(jīng)存在的線程是不是閑著,通常情況下,一旦線程數(shù)達到了corePoolSize,那么池中總線程數(shù)是不會跌破到corePoolSize以下的。(除非 allowCoreThreadTimeOut=true并且keepAliveTime>0);
maximumPoolSize: 當池中線程數(shù)達到了corePoolSize,這時候新提交的任務就會放入等待隊列中,一般情況下,這些任務會被前面創(chuàng)建的 corePoolSize個線程執(zhí)行。當任務提交速度過快,隊列滿了,這時候,如果當前總線程數(shù)<maximumPoolSize,那么線程池會創(chuàng)建一個新的線程來執(zhí)行新提交的任務,否則根據(jù)策略放棄任務;
keepAliveTime:存活時間,分兩種情況: (1)allowCoreThreadTimeOut=true,所有線程,一旦創(chuàng)建后,在keepAliveTime時間內(nèi),如果沒有任務可以執(zhí)行,則該線程會退出并銷毀,這樣的好處是系統(tǒng)不忙時可以回收線程資源;(2)allowCoreThreadTimeOut=false,如果總線程數(shù)<=corePoolSize,那么這些線程是不會退出的,他們會一直不斷的等待任務并執(zhí)行,哪怕當前沒有任務,但如果線程數(shù)>corePoolSize,而且一旦一個線程閑的時間超過 keepAliveTime則會退出,但一旦降低到corePoolSize,則不會再退出了。
allowCoreThreadTimeOut: 用于決定是否在系統(tǒng)閑時可以逐步回收所有的線程,如果為allowCoreThreadTimeOut=true,必須結合keepAliveTime一起使用,用于決定當線程數(shù)<corePoolSize時,是否要回收這些線程。
workQueue:這是一個阻塞隊列,當線程數(shù)>=corePoolSize,這時候提交的任務將會放入阻塞隊列中,如果阻塞隊列是無界的,那么總的線程數(shù)是不可能>corePoolSize的,即maximumPoolSize屬性就是無用的;如果阻塞隊列是有界的,而且未滿,則任務入隊,否則根據(jù)maximumPoolSize的值判斷是要新建線程執(zhí)行新任務或者是根據(jù)策略丟棄任務。
通過下面演示,熟悉上面相關概念:

二、ThreadPoolExecutor狀態(tài)
ThreadPoolExecutor的五個狀態(tài)
線程池有5個狀態(tài),分別是:
- RUNNING:可以接受新的任務,也可以處理阻塞隊列里的任務
- SHUTDOWN:不接受新的任務,但是可以處理阻塞隊列里的任務
- STOP:不接受新的任務,不處理阻塞隊列里的任務,中斷正在處理的任務
- TIDYING:過渡狀態(tài),也就是說所有的任務都執(zhí)行完了,當前線程池已經(jīng)沒有有效的線程,這個時候線程池的狀態(tài)將會TIDYING,并且將要調(diào)用terminated方法
- TERMINATED:終止狀態(tài)。terminated方法調(diào)用完成以后的狀態(tài)
狀態(tài)的轉換
狀態(tài)之間可以進行轉換:
RUNNING -> SHUTDOWN:手動調(diào)用shutdown方法,或者ThreadPoolExecutor要被GC回收的時候調(diào)用finalize方法,finalize方法內(nèi)部也會調(diào)用shutdown方法
(RUNNING or SHUTDOWN) -> STOP:調(diào)用shutdownNow方法
SHUTDOWN -> TIDYING:當隊列和線程池都為空的時候
STOP -> TIDYING:當線程池為空的時候
TIDYING -> TERMINATED:terminated方法調(diào)用完成之后
狀態(tài)的表示
在ThreadPoolExecutor,利用AtomicInteger(一個提供原子操作的Integer的類)保存狀態(tài)和線程數(shù),整型中32位的前3位用來表示線程池狀態(tài),后3位表示線程池中有效的線程數(shù)。
// 前3位表示狀態(tài),所有線程數(shù)占29位
private static final int COUNT_BITS = Integer.SIZE - 3;
線程池容量大小為 1 << 29 - 1 = 00011111111111111111111111111111(二進制),代碼如下
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
RUNNING狀態(tài) -1 << 29 = 11111111111111111111111111111111 << 29 = 11100000000000000000000000000000(前3位為111):
private static final int RUNNING = -1 << COUNT_BITS;
SHUTDOWN狀態(tài) 0 << 29 = 00000000000000000000000000000000 << 29 = 00000000000000000000000000000000(前3位為000):
private static final int SHUTDOWN = 0 << COUNT_BITS;
STOP狀態(tài) 1 << 29 = 00000000000000000000000000000001 << 29 = 00100000000000000000000000000000(前3位為001):
private static final int STOP = 1 << COUNT_BITS;
TIDYING狀態(tài) 2 << 29 = 00000000000000000000000000000010 << 29 = 01000000000000000000000000000000(前3位為010):
private static final int TIDYING = 2 << COUNT_BITS;
TERMINATED狀態(tài) 3 << 29 = 00000000000000000000000000000011 << 29 = 01100000000000000000000000000000(前3位為011):
private static final int TERMINATED = 3 << COUNT_BITS;
清楚狀態(tài)位之后,下面是獲得狀態(tài)和線程數(shù)的內(nèi)部方法:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 得到線程數(shù),也就是后29位的數(shù)字。 直接跟CAPACITY做一個與操作即可,CAPACITY就是的值就 1 << 29 - 1 = 00011111111111111111111111111111。 與操作的話前面3位肯定為0,相當于直接取后29位的值
private static int workerCountOf(int c) { return c & CAPACITY; }
// 得到狀態(tài),CAPACITY的非操作得到的二進制位11100000000000000000000000000000,然后做在一個與操作,相當于直接取前3位的的值
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 或操作。相當于更新數(shù)量和狀態(tài)兩個操作
private static int ctlOf(int rs, int wc) { return rs | wc; }
三、源碼分析
根據(jù)javadoc中關于ThreadPoolExecutor類的描述可知。ThreadPoolExecutor的實現(xiàn)主要依靠兩個數(shù)據(jù)結構:
- 線程池
- 任務隊列
任務隊列使用的數(shù)據(jù)結構比較容易想到,可以采用實現(xiàn)了java.util.concurrent.BlockingQueue接口的類。
線程池該怎么實現(xiàn)才能讓線程池里的任務持續(xù)執(zhí)行一個接一個的任務呢?
ThreadPoolExecutor類
public class ThreadPoolExecutor extends AbstractExecutorService {
...
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
...
/**
* The queue used for holding tasks and handing off to worker
* threads. We do not require that workQueue.poll() returning
* null necessarily means that workQueue.isEmpty(), so rely
* solely on isEmpty to see if the queue is empty (which we must
* do for example when deciding whether to transition from
* SHUTDOWN to TIDYING). This accommodates special-purpose
* queues such as DelayQueues for which poll() is allowed to
* return null even if it may later return non-null when delays
* expire.
*/
private final BlockingQueue<Runnable> workQueue;
...
}
如代碼中的注釋所說,workers就是存放工作線程的線程池,就是一個簡單的HashSet。那么,關鍵信息一定是藏在這個Worker類里了。
Worker類
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 使用ThreadFactory構造Thread,這個構造的Thread內(nèi)部的Runnable就是本身,也就是Worker。所以得到Worker的thread并start的時候,會執(zhí)行Worker的run方法,也就是執(zhí)行ThreadPoolExecutor的runWorker方法
setState(-1); //把狀態(tài)位設置成-1,這樣任何線程都不能得到Worker的鎖,除非調(diào)用了unlock方法。這個unlock方法會在runWorker方法中一開始就調(diào)用,這是為了確保Worker構造出來之后,沒有任何線程能夠得到它的鎖,除非調(diào)用了runWorker之后,其他線程才能獲得Worker的鎖
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker是ThreadPoolExecutor的內(nèi)部類,成員變量thread就是實際執(zhí)行任務的線程。這個thread不直接執(zhí)行用戶提交的任務,它執(zhí)行的任務就是它所在的Worker對象。 Worker對象的run()方法調(diào)用了ThreadPoolExecutor.runWorker(Worker w)方法。
ThreadPoolExecutor.runWorker(Worker w)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); // 得到當前線程
Runnable task = w.firstTask; // 得到Worker中的任務task,也就是用戶傳入的task
w.firstTask = null; // 將Worker中的任務置空
w.unlock(); // allow interrupts。
boolean completedAbruptly = true;
try {
// 如果worker中的任務不為空,繼續(xù)知否,否則使用getTask獲得任務。一直死循環(huán),除非得到的任務為空才退出
while (task != null || (task = getTask()) != null) {
w.lock(); // 如果拿到了任務,給自己上鎖,表示當前Worker已經(jīng)要開始執(zhí)行任務了,已經(jīng)不是閑置Worker(閑置Worker的解釋請看下面的線程池關閉)
// 在執(zhí)行任務之前先做一些處理。 1. 如果線程池已經(jīng)處于STOP狀態(tài)并且當前線程沒有被中斷,中斷線程 2. 如果線程池還處于RUNNING或SHUTDOWN狀態(tài),并且當前線程已經(jīng)被中斷了,重新檢查一下線程池狀態(tài),如果處于STOP狀態(tài)并且沒有被中斷,那么中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 任務執(zhí)行前需要做什么,ThreadPoolExecutor是個空實現(xiàn)
Throwable thrown = null;
try {
task.run(); // 真正的開始執(zhí)行任務,調(diào)用的是run方法,而不是start方法。這里run的時候可能會被中斷,比如線程池調(diào)用了shutdownNow方法
} catch (RuntimeException x) { // 任務執(zhí)行發(fā)生的異常全部拋出,不在runWorker中處理
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 任務執(zhí)行結束需要做什么,ThreadPoolExecutor是個空實現(xiàn)
}
} finally {
task = null;
w.completedTasks++; // 記錄執(zhí)行任務的個數(shù)
w.unlock(); // 執(zhí)行完任務之后,解鎖,Worker變成閑置Worker
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 回收Worker方法
}
}
程序的大致邏輯就是在firstTask或getTask()返回方法不為空的情況下執(zhí)行task.run()。這里的getTask()方法就是從用戶任務隊列workQueue獲取任務的那個方法。
我們看一下getTask方法是如何獲得任務的:
ThreadPoolExecutor.getTask()
// 如果發(fā)生了以下四件事中的任意一件,那么Worker需要被回收:
// 1. Worker個數(shù)比線程池最大大小要大
// 2. 線程池處于STOP狀態(tài)
// 3. 線程池處于SHUTDOWN狀態(tài)并且阻塞隊列為空
// 4. 使用超時時間從阻塞隊列里拿數(shù)據(jù),并且超時之后沒有拿到數(shù)據(jù)(allowCoreThreadTimeOut || workerCount > corePoolSize)
private Runnable getTask() {
boolean timedOut = false; // 如果使用超時時間并且也沒有拿到任務的標識
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果線程池是SHUTDOWN狀態(tài)并且阻塞隊列為空的話,worker數(shù)量減一,直接返回null(SHUTDOWN狀態(tài)還會處理阻塞隊列任務,但是阻塞隊列為空的話就結束了),如果線程池是STOP狀態(tài)的話,worker數(shù)量建議,直接返回null(STOP狀態(tài)不處理阻塞隊列任務)[方法一開始注釋的2,3兩點,返回null,開始Worker回收]
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // 標記從隊列中取任務時是否設置超時時間,如果為true說明這個worker可能需要回收,為false的話這個worker會一直存在,并且阻塞當前線程等待阻塞隊列中有數(shù)據(jù)
for (;;) {
int wc = workerCountOf(c); // 得到當前線程池Worker個數(shù)
// allowCoreThreadTimeOut屬性默認為false,表示線程池中的核心線程在閑置狀態(tài)下還保留在池中;如果是true表示核心線程使用keepAliveTime這個參數(shù)來作為超時時間
// 如果worker數(shù)量比基本大小要大的話,timed就為true,需要進行回收worker
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed)) // 方法一開始注釋的1,4兩點,會進行下一步worker數(shù)量減一
break;
if (compareAndDecrementWorkerCount(c)) // worker數(shù)量減一,返回null,之后會進行Worker回收工作
return null;
c = ctl.get(); // 重新檢查線程池狀態(tài)
if (runStateOf(c) != rs) // 線程池狀態(tài)改變的話重新開始外部循環(huán),否則繼續(xù)內(nèi)部循環(huán)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
// 如果需要設置超時時間,使用poll方法,否則使用take方法一直阻塞等待阻塞隊列新進數(shù)據(jù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false; // 閑置Worker被中斷
}
}
}
Worker類的執(zhí)行邏輯大致就是這樣了。那么ThreadPoolExecutor是如何新建和啟動這些Worker類的呢?
來看一下我們提交任務時使用的ThreadPoolExecutor.execute(Runnable command)方法。
ThreadPoolExecutor.execute(Runable command)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 第一個步驟,滿足線程池中的線程大小比基本大小要小
if (addWorker(command, true)) // addWorker方法第二個參數(shù)true表示使用基本大小
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 第二個步驟,線程池的線程大小比基本大小要大,并且線程池還在RUNNING狀態(tài),阻塞隊列也沒滿的情況,加到阻塞隊列里
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 雖然滿足了第二個步驟,但是這個時候可能突然線程池關閉了,所以再做一層判斷
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 第三個步驟,直接使用線程池最大大小。addWorker方法第二個參數(shù)false表示使用最大大小
reject(command);
}
除去處理ThreadPoolExecutor對象狀態(tài)的代碼,最關鍵的兩段代碼就是workQueue.offer(command)和addWorker(command, true)。
workQueue.offer(command)是將任務加入隊列;新建和啟動Worker對象的代碼就是在addWorker(command, true)里了。
addWorker(Runnable firstTask, boolean core)
// 兩個參數(shù),firstTask表示需要跑的任務。boolean類型的core參數(shù)為true的話表示使用線程池的基本大小,為false使用線程池最大大小
// 返回值是boolean類型,true表示新任務被接收了,并且執(zhí)行了。否則是false
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 線程池當前狀態(tài)
// 這個判斷轉換成 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty)。
// 概括為3個條件:
// 1. 線程池不在RUNNING狀態(tài)并且狀態(tài)是STOP、TIDYING或TERMINATED中的任意一種狀態(tài)
// 2. 線程池不在RUNNING狀態(tài),線程池接受了新的任務
// 3. 線程池不在RUNNING狀態(tài),阻塞隊列為空。 滿足這3個條件中的任意一個的話,拒絕執(zhí)行任務
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 線程池線程個數(shù)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 如果線程池線程數(shù)量超過線程池最大容量或者線程數(shù)量超過了基本大小(core參數(shù)為true,core參數(shù)為false的話判斷超過最大大小)
return false; // 超過直接返回false
if (compareAndIncrementWorkerCount(c)) // 沒有超過各種大小的話,cas操作線程池線程數(shù)量+1,cas成功的話跳出循環(huán)
break retry;
c = ctl.get(); // 重新檢查狀態(tài)
if (runStateOf(c) != rs) // 如果狀態(tài)改變了,重新循環(huán)操作
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 走到這一步說明cas操作成功了,線程池線程數(shù)量+1
boolean workerStarted = false; // 任務是否成功啟動標識
boolean workerAdded = false; // 任務是否添加成功標識
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock; // 得到線程池的可重入鎖
w = new Worker(firstTask); // 基于任務firstTask構造worker
final Thread t = w.thread; // 使用Worker的屬性thread,這個thread是使用ThreadFactory構造出來的
if (t != null) { // ThreadFactory構造出的Thread有可能是null,做個判斷
mainLock.lock(); // 鎖住,防止并發(fā)
try {
// 在鎖住之后再重新檢測一下狀態(tài)
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // 如果線程池在RUNNING狀態(tài)或者線程池在SHUTDOWN狀態(tài)并且任務是個null
if (t.isAlive()) // 判斷線程是否還活著,也就是說線程已經(jīng)啟動并且還沒死掉
throw new IllegalThreadStateException(); // 如果存在已經(jīng)啟動并且還沒死的線程,拋出異常
workers.add(w); // worker添加到線程池的workers屬性中,是個HashSet
int s = workers.size(); // 得到目前線程池中的線程個數(shù)
if (s > largestPoolSize) // 如果線程池中的線程個數(shù)超過了線程池中的最大線程數(shù)時,更新一下這個最大線程數(shù)
largestPoolSize = s;
workerAdded = true; // 標識一下任務已經(jīng)添加成功
}
} finally {
mainLock.unlock(); // 解鎖
}
if (workerAdded) { // 如果任務添加成功,運行任務,改變一下任務成功啟動標識
t.start(); // 啟動線程,這里的t是Worker中的thread屬性,所以相當于就是調(diào)用了Worker的run方法
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 如果任務啟動失敗,調(diào)用addWorkerFailed方法
addWorkerFailed(w);
}
return workerStarted;
}
這個方法里執(zhí)行了三個我們關注的操作:
- 新建Worker對象——w = new Worker(firstTask);
- 將Worker對象加入workers集合——workers.add(w);
- 啟動Worker對象里的thread——t.start();
總結
簡單概括一下ThreadPoolExecutor的運行過程(不包括線程池大小控制、線程池關閉等邏輯):
- ThreadPoolExecutor.execute(Runnable command)提交任務
- 如果Worker數(shù)量未達到上限,新建一個Worker并將command作為Worker的firstTask
- 如果Worker數(shù)量已達到上限,則將command放入workQueue
- 每個啟動了的worker先執(zhí)行firstTask,然后繼續(xù)從workQueue獲取task來執(zhí)行