
前提
很早之前就打算看一次JUC線程池ThreadPoolExecutor的源碼實現(xiàn),由于近段時間比較忙,一直沒有時間整理出源碼分析的文章。之前在分析擴展線程池實現(xiàn)可回調(diào)的Future時候曾經(jīng)提到并發(fā)大師Doug Lea在設(shè)計線程池ThreadPoolExecutor的提交任務(wù)的頂層接口Executor只有一個無狀態(tài)的執(zhí)行方法:
public interface Executor {
void execute(Runnable command);
}
而ExecutorService提供了很多擴展方法底層基本上是基于Executor#execute()方法進(jìn)行擴展。本文著重分析ThreadPoolExecutor#execute()的實現(xiàn),筆者會從實現(xiàn)原理、源碼實現(xiàn)等角度結(jié)合簡化例子進(jìn)行詳細(xì)的分析。ThreadPoolExecutor的源碼從JDK8到JDK11基本沒有變化,本文編寫的時候使用的是JDK11。
ThreadPoolExecutor的原理
ThreadPoolExecutor里面使用到JUC同步器框架AbstractQueuedSynchronizer(俗稱AQS)、大量的位操作、CAS操作。ThreadPoolExecutor提供了固定活躍線程(核心線程)、額外的線程(線程池容量 - 核心線程數(shù)這部分額外創(chuàng)建的線程,下面稱為非核心線程)、任務(wù)隊列以及拒絕策略這幾個重要的功能。
JUC同步器框架
ThreadPoolExecutor里面使用到JUC同步器框架,主要用于四個方面:
- 全局鎖
mainLock成員屬性,是可重入鎖ReentrantLock類型,主要是用于訪問工作線程Worker集合和進(jìn)行數(shù)據(jù)統(tǒng)計記錄時候的加鎖操作。 - 條件變量
termination,Condition類型,主要用于線程進(jìn)行等待終結(jié)awaitTermination()方法時的帶期限阻塞。 - 任務(wù)隊列
workQueue,BlockingQueue類型,任務(wù)隊列,用于存放待執(zhí)行的任務(wù)。 - 工作線程,內(nèi)部類
Worker類型,是線程池中真正的工作線程對象。
關(guān)于AQS筆者之前寫過一篇相關(guān)源碼分析的文章:JUC同步器框架AbstractQueuedSynchronizer源碼圖文分析。
核心線程
這里先參考ThreadPoolExecutor的實現(xiàn)并且進(jìn)行簡化,實現(xiàn)一個只有核心線程的線程池,要求如下:
- 暫時不考慮任務(wù)執(zhí)行異常情況下的處理。
- 任務(wù)隊列為無界隊列。
- 線程池容量固定為核心線程數(shù)量。
- 暫時不考慮拒絕策略。
public class CoreThreadPool implements Executor {
private BlockingQueue<Runnable> workQueue;
private static final AtomicInteger COUNTER = new AtomicInteger();
private int coreSize;
private int threadCount = 0;
public CoreThreadPool(int coreSize) {
this.coreSize = coreSize;
this.workQueue = new LinkedBlockingQueue<>();
}
@Override
public void execute(Runnable command) {
if (++threadCount <= coreSize) {
new Worker(command).start();
} else {
try {
workQueue.put(command);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
private class Worker extends Thread {
private Runnable firstTask;
public Worker(Runnable runnable) {
super(String.format("Worker-%d", COUNTER.getAndIncrement()));
this.firstTask = runnable;
}
@Override
public void run() {
Runnable task = this.firstTask;
while (null != task || null != (task = getTask())) {
try {
task.run();
} finally {
task = null;
}
}
}
}
private Runnable getTask() {
try {
return workQueue.take();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
public static void main(String[] args) throws Exception {
CoreThreadPool pool = new CoreThreadPool(5);
IntStream.range(0, 10)
.forEach(i -> pool.execute(() ->
System.out.println(String.format("Thread:%s,value:%d", Thread.currentThread().getName(), i))));
Thread.sleep(Integer.MAX_VALUE);
}
}
某次運行結(jié)果如下:
Thread:Worker-0,value:0
Thread:Worker-3,value:3
Thread:Worker-2,value:2
Thread:Worker-1,value:1
Thread:Worker-4,value:4
Thread:Worker-1,value:5
Thread:Worker-2,value:8
Thread:Worker-4,value:7
Thread:Worker-0,value:6
Thread:Worker-3,value:9
設(shè)計此線程池的時候,核心線程是懶創(chuàng)建的,如果線程空閑的時候則阻塞在任務(wù)隊列的take()方法,其實對于ThreadPoolExecutor也是類似這樣實現(xiàn),只是如果使用了keepAliveTime并且允許核心線程超時(allowCoreThreadTimeOut設(shè)置為true)則會使用BlockingQueue#poll(keepAliveTime)進(jìn)行輪詢代替永久阻塞。
其他附加功能
構(gòu)建ThreadPoolExecutor實例的時候,需要定義maximumPoolSize(線程池最大線程數(shù))和corePoolSize(核心線程數(shù))。當(dāng)任務(wù)隊列是有界的阻塞隊列,核心線程滿負(fù)載,任務(wù)隊列已經(jīng)滿的情況下,會嘗試創(chuàng)建額外的maximumPoolSize - corePoolSize個線程去執(zhí)行新提交的任務(wù)。當(dāng)ThreadPoolExecutor這里實現(xiàn)的兩個主要附加功能是:
- 一定條件下會創(chuàng)建非核心線程去執(zhí)行任務(wù),非核心線程的回收周期(線程生命周期終結(jié)時刻)是
keepAliveTime,線程生命周期終結(jié)的條件是:下一次通過任務(wù)隊列獲取任務(wù)的時候并且存活時間超過keepAliveTime。 - 提供拒絕策略,也就是在核心線程滿負(fù)載、任務(wù)隊列已滿、非核心線程滿負(fù)載的條件下會觸發(fā)拒絕策略。
源碼分析
先分析線程池的關(guān)鍵屬性,接著分析其狀態(tài)控制,最后重點分析ThreadPoolExecutor#execute()方法。
關(guān)鍵屬性
public class ThreadPoolExecutor extends AbstractExecutorService {
// 控制變量-存放狀態(tài)和線程數(shù)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 任務(wù)隊列,必須是阻塞隊列
private final BlockingQueue<Runnable> workQueue;
// 工作線程集合,存放線程池中所有的(活躍的)工作線程,只有在持有全局鎖mainLock的前提下才能訪問此集合
private final HashSet<Worker> workers = new HashSet<>();
// 全局鎖
private final ReentrantLock mainLock = new ReentrantLock();
// awaitTermination方法使用的等待條件變量
private final Condition termination = mainLock.newCondition();
// 記錄峰值線程數(shù)
private int largestPoolSize;
// 記錄已經(jīng)成功執(zhí)行完畢的任務(wù)數(shù)
private long completedTaskCount;
// 線程工廠,用于創(chuàng)建新的線程實例
private volatile ThreadFactory threadFactory;
// 拒絕執(zhí)行處理器,對應(yīng)不同的拒絕策略
private volatile RejectedExecutionHandler handler;
// 空閑線程等待任務(wù)的時間周期,單位是納秒
private volatile long keepAliveTime;
// 是否允許核心線程超時,如果為true則keepAliveTime對核心線程也生效
private volatile boolean allowCoreThreadTimeOut;
// 核心線程數(shù)
private volatile int corePoolSize;
// 線程池容量
private volatile int maximumPoolSize;
// 省略其他代碼
}
下面看參數(shù)列表最長的構(gòu)造函數(shù):
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
可以自定義核心線程數(shù)、線程池容量(最大線程數(shù))、空閑線程等待任務(wù)周期、任務(wù)隊列、線程工廠、拒絕策略。下面簡單分析一下每個參數(shù)的含義和作用:
corePoolSize:int類型,核心線程數(shù)量。maximumPoolSize:int類型,最大線程數(shù)量,也就是線程池的容量。keepAliveTime:long類型,線程空閑等待時間,也和工作線程的生命周期有關(guān),下文會分析。unit:TimeUnit類型,keepAliveTime參數(shù)的時間單位,實際上keepAliveTime最終會轉(zhuǎn)化為納秒。workQueue:BlockingQueue類型,等待隊列或者叫任務(wù)隊列。threadFactory:ThreadFactory類型,線程工廠,用于創(chuàng)建工作線程(包括核心線程和非核心線程),默認(rèn)使用Executors.defaultThreadFactory()作為內(nèi)建線程工廠實例,一般自定義線程工廠才能更好地跟蹤工作線程。handler:-
RejectedExecutionHandler類型,線程池的拒絕執(zhí)行處理器,更多時候稱為拒絕策略,拒絕策略執(zhí)行的時機是當(dāng)阻塞隊列已滿、沒有空閑的線程(包括核心線程和非核心線程)并且繼續(xù)提交任務(wù)。提供了4種內(nèi)建的拒絕策略實現(xiàn):
AbortPolicy:直接拒絕策略,也就是不會執(zhí)行任務(wù),直接拋出RejectedExecutionException,這是默認(rèn)的拒絕策略。DiscardPolicy:拋棄策略,也就是直接忽略提交的任務(wù)(通俗來說就是空實現(xiàn))。DiscardOldestPolicy:拋棄最老任務(wù)策略,也就是通過poll()方法取出任務(wù)隊列隊頭的任務(wù)拋棄,然后執(zhí)行當(dāng)前提交的任務(wù)。CallerRunsPolicy:調(diào)用者執(zhí)行策略,也就是當(dāng)前調(diào)用Executor#execute()的線程直接調(diào)用任務(wù)Runnable#run(),一般不希望任務(wù)丟失會選用這種策略,但從實際角度來看,原來的異步調(diào)用意圖會退化為同步調(diào)用。
狀態(tài)控制
狀態(tài)控制主要圍繞原子整型成員變量ctl:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 通過ctl值獲取運行狀態(tài)
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
// 通過ctl值獲取工作線程數(shù)
private static int workerCountOf(int c) { return c & COUNT_MASK; }
// 通過運行狀態(tài)和工作線程數(shù)計算ctl的值,或運算
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// CAS操作線程數(shù)增加1
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// CAS操作線程數(shù)減少1
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 線程數(shù)直接減少1
private void decrementWorkerCount() {
ctl.addAndGet(-1);
}
接下來分析一下線程池的狀態(tài)變量,工作線程上限數(shù)量位的長度是COUNT_BITS,它的值是Integer.SIZE - 3,也就是正整數(shù)29:
我們知道,整型包裝類型Integer實例的大小是4 byte,一共32 bit,也就是一共有32個位用于存放0或者1。在ThreadPoolExecutor實現(xiàn)中,使用32位的整型包裝類型存放工作線程數(shù)和線程池狀態(tài)。其中,低29位用于存放工作線程數(shù),而高3位用于存放線程池狀態(tài),所以線程池的狀態(tài)最多只能有23種。工作線程上限數(shù)量為229 - 1,超過5億,這個數(shù)量在短時間內(nèi)不用考慮會超限。
接著看工作線程上限數(shù)量掩碼COUNT_MASK,它的值是(1 < COUNT_BITS) - l,也就是1左移29位,再減去1,如果補全32位,它的位視圖如下:

然后就是線程池的狀態(tài)常量,這里只詳細(xì)分析其中一個,其他類同,這里看RUNNING狀態(tài):
// -1的補碼為:111-11111111111111111111111111111
// 左移29位后:111-00000000000000000000000000000
// 10進(jìn)制值為:-536870912
// 高3位111的值就是表示線程池正在處于運行狀態(tài)
private static final int RUNNING = -1 << COUNT_BITS;
控制變量ctl的組成就是通過線程池運行狀態(tài)rs和工作線程數(shù)wc通過或運算得到的:
// rs=RUNNING值為:111-00000000000000000000000000000
// wc的值為0:000-00000000000000000000000000000
// rs | wc的結(jié)果為:111-00000000000000000000000000000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
那么我們怎么從ctl中取出高3位的線程池狀態(tài)?上面源碼中提供的runStateOf()方法就是提取運行狀態(tài):
// 先把COUNT_MASK取反(~COUNT_MASK),
得到:111-00000000000000000000000000000
// ctl位圖特點是:xxx-yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
// 兩者做一次與運算即可得到高3位xxx
private static int runStateOf(int c){
return c & ~COUNT_MASK;
}
同理,取出低29位的工作線程數(shù)量只需要把ctl和COUNT_MASK(000-11111111111111111111111111111)做一次與運算即可。
工作線程數(shù)為0的前提下,小結(jié)一下線程池的運行狀態(tài)常量:

這里有一個比較特殊的技巧,由于運行狀態(tài)值存放在高3位,所以可以直接通過十進(jìn)制值(甚至可以忽略低29位,直接用ctl進(jìn)行比較,或者使用ctl和線程池狀態(tài)常量進(jìn)行比較)來比較和判斷線程池的狀態(tài):
工作線程數(shù)為0的前提下:RUNNING(-536870912) < SHUTDOWN(0) < STOP(536870912) < TIDYING(1073741824) < TERMINATED(1610612736)
下面這三個方法就是使用這種技巧:
// ctl和狀態(tài)常量比較,判斷是否小于
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// ctl和狀態(tài)常量比較,判斷是否小于或等于
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// ctl和狀態(tài)常量SHUTDOWN比較,判斷是否處于RUNNING狀態(tài)
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
最后是線程池狀態(tài)的躍遷圖:

PS:線程池源碼中有很多中間變量用了簡單的單字母表示,例如c就是表示ctl、wc就是表示worker count、rs就是表示running status。
execute方法源碼分析
線程池異步執(zhí)行任務(wù)的方法實現(xiàn)是ThreadPoolExecutor#execute(),源碼如下:
// 執(zhí)行命令,其中命令(下面稱任務(wù))對象是Runnable的實例
public void execute(Runnable command) {
// 判斷命令(任務(wù))對象非空
if (command == null)
throw new NullPointerException();
// 獲取ctl的值
int c = ctl.get();
// 判斷如果當(dāng)前工作線程數(shù)小于核心線程數(shù),則創(chuàng)建新的核心線程并且執(zhí)行傳入的任務(wù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
// 如果創(chuàng)建新的核心線程成功則直接返回
return;
// 這里說明創(chuàng)建核心線程失敗,需要更新ctl的臨時變量c
c = ctl.get();
}
// 走到這里說明創(chuàng)建新的核心線程失敗,也就是當(dāng)前工作線程數(shù)大于等于corePoolSize
// 判斷線程池是否處于運行中狀態(tài),同時嘗試用非阻塞方法向任務(wù)隊列放入任務(wù)(放入任務(wù)失敗返回false)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 這里是向任務(wù)隊列投放任務(wù)成功,對線程池的運行中狀態(tài)做二次檢查
// 如果線程池二次檢查狀態(tài)是非運行中狀態(tài),則從任務(wù)隊列移除當(dāng)前的任務(wù)調(diào)用拒絕策略處理之(也就是移除前面成功入隊的任務(wù)實例)
if (! isRunning(recheck) && remove(command))
// 調(diào)用拒絕策略處理任務(wù) - 返回
reject(command);
// 走到下面的else if分支,說明有以下的前提:
// 0、待執(zhí)行的任務(wù)已經(jīng)成功加入任務(wù)隊列
// 1、線程池可能是RUNNING狀態(tài)
// 2、傳入的任務(wù)可能從任務(wù)隊列中移除失?。ㄒ瞥〉奈ㄒ豢赡芫褪侨蝿?wù)已經(jīng)被執(zhí)行了)
// 如果當(dāng)前工作線程數(shù)量為0,則創(chuàng)建一個非核心線程并且傳入的任務(wù)對象為null - 返回
// 也就是創(chuàng)建的非核心線程不會馬上運行,而是等待獲取任務(wù)隊列的任務(wù)去執(zhí)行
// 如果前工作線程數(shù)量不為0,原來應(yīng)該是最后的else分支,但是可以什么也不做,因為任務(wù)已經(jīng)成功入隊列,總會有合適的時機分配其他空閑線程去執(zhí)行它
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 走到這里說明有以下的前提:
// 0、線程池中的工作線程總數(shù)已經(jīng)大于等于corePoolSize(簡單來說就是核心線程已經(jīng)全部懶創(chuàng)建完畢)
// 1、線程池可能不是RUNNING狀態(tài)
// 2、線程池可能是RUNNING狀態(tài)同時任務(wù)隊列已經(jīng)滿了
// 如果向任務(wù)隊列投放任務(wù)失敗,則會嘗試創(chuàng)建非核心線程傳入任務(wù)執(zhí)行
// 創(chuàng)建非核心線程失敗,此時需要拒絕執(zhí)行任務(wù)
else if (!addWorker(command, false))
// 調(diào)用拒絕策略處理任務(wù) - 返回
reject(command);
}
這里簡單分析一下整個流程:
- 如果當(dāng)前工作線程總數(shù)小于
corePoolSize,則直接創(chuàng)建核心線程執(zhí)行任務(wù)(任務(wù)實例會傳入直接用于構(gòu)造工作線程實例)。 - 如果當(dāng)前工作線程總數(shù)大于等于
corePoolSize,判斷線程池是否處于運行中狀態(tài),同時嘗試用非阻塞方法向任務(wù)隊列放入任務(wù),這里會二次檢查線程池運行狀態(tài),如果當(dāng)前工作線程數(shù)量為0,則創(chuàng)建一個非核心線程并且傳入的任務(wù)對象為null。 - 如果向任務(wù)隊列投放任務(wù)失?。ㄈ蝿?wù)隊列已經(jīng)滿了),則會嘗試創(chuàng)建非核心線程傳入任務(wù)實例執(zhí)行。
- 如果創(chuàng)建非核心線程失敗,此時需要拒絕執(zhí)行任務(wù),調(diào)用拒絕策略處理任務(wù)。
這里是一個疑惑點:為什么需要二次檢查線程池的運行狀態(tài),當(dāng)前工作線程數(shù)量為0,嘗試創(chuàng)建一個非核心線程并且傳入的任務(wù)對象為null?這個可以看API注釋:
如果一個任務(wù)成功加入任務(wù)隊列,我們依然需要二次檢查是否需要添加一個工作線程(因為所有存活的工作線程有可能在最后一次檢查之后已經(jīng)終結(jié))或者執(zhí)行當(dāng)前方法的時候線程池是否已經(jīng)shutdown了。所以我們需要二次檢查線程池的狀態(tài),必須時把任務(wù)從任務(wù)隊列中移除或者在沒有可用的工作線程的前提下新建一個工作線程。
任務(wù)提交流程從調(diào)用者的角度來看如下:

addWorker方法源碼分析
boolean addWorker(Runnable firstTask, boolean core)方法的第一的參數(shù)可以用于直接傳入任務(wù)實例,第二個參數(shù)用于標(biāo)識將要創(chuàng)建的工作線程是否核心線程。方法源碼如下:
// 添加工作線程,如果返回false說明沒有新創(chuàng)建工作線程,如果返回true說明創(chuàng)建和啟動工作線程成功
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 注意這是一個死循環(huán) - 最外層循環(huán)
for (int c = ctl.get();;) {
// 這個是十分復(fù)雜的條件,這里先拆分多個與(&&)條件:
// 1. 線程池狀態(tài)至少為SHUTDOWN狀態(tài),也就是rs >= SHUTDOWN(0)
// 2. 線程池狀態(tài)至少為STOP狀態(tài),也就是rs >= STOP(1),或者傳入的任務(wù)實例firstTask不為null,或者任務(wù)隊列為空
// 其實這個判斷的邊界是線程池狀態(tài)為shutdown狀態(tài)下,不會再接受新的任務(wù),在此前提下如果狀態(tài)已經(jīng)到了STOP、或者傳入任務(wù)不為空、或者任務(wù)隊列為空(已經(jīng)沒有積壓任務(wù))都不需要添加新的線程
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
// 注意這也是一個死循環(huán) - 二層循環(huán)
for (;;) {
// 這里每一輪循環(huán)都會重新獲取工作線程數(shù)wc
// 1. 如果傳入的core為true,表示將要創(chuàng)建核心線程,通過wc和corePoolSize判斷,如果wc >= corePoolSize,則返回false表示創(chuàng)建核心線程失敗
// 1. 如果傳入的core為false,表示將要創(chuàng)非建核心線程,通過wc和maximumPoolSize判斷,如果wc >= maximumPoolSize,則返回false表示創(chuàng)建非核心線程失敗
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 成功通過CAS更新工作線程數(shù)wc,則break到最外層的循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// 走到這里說明了通過CAS更新工作線程數(shù)wc失敗,這個時候需要重新判斷線程池的狀態(tài)是否由RUNNING已經(jīng)變?yōu)镾HUTDOWN
c = ctl.get(); // Re-read ctl
// 如果線程池狀態(tài)已經(jīng)由RUNNING已經(jīng)變?yōu)镾HUTDOWN,則重新跳出到外層循環(huán)繼續(xù)執(zhí)行
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// 如果線程池狀態(tài)依然是RUNNING,CAS更新工作線程數(shù)wc失敗說明有可能是并發(fā)更新導(dǎo)致的失敗,則在內(nèi)層循環(huán)重試即可
// else CAS failed due to workerCount change; retry inner loop
}
}
// 標(biāo)記工作線程是否啟動成功
boolean workerStarted = false;
// 標(biāo)記工作線程是否創(chuàng)建成功
boolean workerAdded = false;
Worker w = null;
try {
// 傳入任務(wù)實例firstTask創(chuàng)建Worker實例,Worker構(gòu)造里面會通過線程工廠創(chuàng)建新的Thread對象,所以下面可以直接操作Thread t = w.thread
// 這一步Worker實例已經(jīng)創(chuàng)建,但是沒有加入工作線程集合或者啟動它持有的線程Thread實例
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 這里需要全局加鎖,因為會改變一些指標(biāo)值和非線程安全的集合
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
// 這里主要在加鎖的前提下判斷ThreadFactory創(chuàng)建的線程是否存活或者判斷獲取鎖成功之后線程池狀態(tài)是否已經(jīng)更變?yōu)镾HUTDOWN
// 1. 如果線程池狀態(tài)依然為RUNNING,則只需要判斷線程實例是否存活,需要添加到工作線程集合和啟動新的Worker
// 2. 如果線程池狀態(tài)小于STOP,也就是RUNNING或者SHUTDOWN狀態(tài)下,同時傳入的任務(wù)實例firstTask為null,則需要添加到工作線程集合和啟動新的Worker
// 對于2,換言之,如果線程池處于SHUTDOWN狀態(tài)下,同時傳入的任務(wù)實例firstTask不為null,則不會添加到工作線程集合和啟動新的Worker
// 這一步其實有可能創(chuàng)建了新的Worker實例但是并不啟動(臨時對象,沒有任何強引用),這種Worker有可能成功下一輪GC被收集的垃圾對象
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 把創(chuàng)建的工作線程實例添加到工作線程集合
workers.add(w);
int s = workers.size();
// 嘗試更新歷史峰值工作線程數(shù),也就是線程池峰值容量
if (s > largestPoolSize)
largestPoolSize = s;
// 這里更新工作線程是否啟動成功標(biāo)識為true,后面才會調(diào)用Thread#start()方法啟動真實的線程實例
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果成功添加工作線程,則調(diào)用Worker內(nèi)部的線程實例t的Thread#start()方法啟動真實的線程實例
if (workerAdded) {
t.start();
// 標(biāo)記線程啟動成功
workerStarted = true;
}
}
} finally {
// 線程啟動失敗,需要從工作線程集合移除對應(yīng)的Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 添加Worker失敗
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 從工作線程集合移除之
if (w != null)
workers.remove(w);
// wc數(shù)量減1
decrementWorkerCount();
// 基于狀態(tài)判斷嘗試終結(jié)線程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
筆者發(fā)現(xiàn)了Doug Lea大神十分喜歡復(fù)雜的條件判斷,而且單行復(fù)雜判斷不喜歡加花括號,像下面這種代碼在他編寫的很多類庫中都比較常見:
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
// ....
// 代碼拆分一下如下
boolean atLeastShutdown = runStateAtLeast(c, SHUTDOWN); # rs >= SHUTDOWN(0)
boolean atLeastStop = runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty();
if (atLeastShutdown && atLeastStop){
return false;
}
上面的分析邏輯中需要注意一點,Worker實例創(chuàng)建的同時,在其構(gòu)造函數(shù)中會通過ThreadFactory創(chuàng)建一個Java線程Thread實例,后面會加鎖后二次檢查是否需要把Worker實例添加到工作線程集合workers中和是否需要啟動Worker中持有的Thread實例,只有啟動了Thread實例實例,Worker才真正開始運作,否則只是一個無用的臨時對象。Worker本身也實現(xiàn)了Runnable接口,它可以看成是一個Runnable的適配器。
工作線程內(nèi)部類Worker源碼分析
線程池中的每一個具體的工作線程被包裝為內(nèi)部類Worker實例,Worker繼承于AbstractQueuedSynchronizer(AQS),實現(xiàn)了Runnable接口:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
// 保存ThreadFactory創(chuàng)建的線程實例,如果ThreadFactory創(chuàng)建線程失敗則為null
final Thread thread;
// 保存?zhèn)魅氲腞unnable任務(wù)實例
Runnable firstTask;
// 記錄每個線程完成的任務(wù)總數(shù)
volatile long completedTasks;
// 唯一的構(gòu)造函數(shù),傳入任務(wù)實例firstTask,注意可以為null
Worker(Runnable firstTask) {
// 禁止線程中斷,直到runWorker()方法執(zhí)行
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 通過ThreadFactory創(chuàng)建線程實例,注意一下Worker實例自身作為Runnable用于創(chuàng)建新的線程實例
this.thread = getThreadFactory().newThread(this);
}
// 委托到外部的runWorker()方法,注意runWorker()方法是線程池的方法,而不是Worker的方法
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
// 是否持有獨占鎖,state值為1的時候表示持有鎖,state值為0的時候表示已經(jīng)釋放鎖
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 獨占模式下嘗試獲取資源,這里沒有判斷傳入的變量,直接CAS判斷0更新為1是否成功,成功則設(shè)置獨占線程為當(dāng)前線程
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 獨占模式下嘗試是否資源,這里沒有判斷傳入的變量,直接把state設(shè)置為0
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 加鎖
public void lock() { acquire(1); }
// 嘗試加鎖
public boolean tryLock() { return tryAcquire(1); }
// 解鎖
public void unlock() { release(1); }
// 是否鎖定
public boolean isLocked() { return isHeldExclusively(); }
// 啟動后進(jìn)行線程中斷,注意這里會判斷線程實例的中斷標(biāo)志位是否為false,只有中斷標(biāo)志位為false才會中斷
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker的構(gòu)造函數(shù)里面的邏輯十分重要,通過ThreadFactory創(chuàng)建的Thread實例同時傳入Worker實例,因為Worker本身實現(xiàn)了Runnable,所以可以作為任務(wù)提交到線程中執(zhí)行。只要Worker持有的線程實例w調(diào)用Thread#start()方法就能在合適時機執(zhí)行Worker#run()。簡化一下邏輯如下:
// addWorker()方法中構(gòu)造
Worker worker = createWorker();
// 通過線程池構(gòu)造時候傳入
ThreadFactory threadFactory = getThreadFactory();
// Worker構(gòu)造函數(shù)中
Thread thread = threadFactory.newThread(worker);
// addWorker()方法中啟動
thread.start();
Worker繼承自AQS,這里使用了AQS的獨占模式,有個技巧是構(gòu)造Worker的時候,把AQS的資源(狀態(tài))通過setState(-1)設(shè)置為-1,這是因為Worker實例剛創(chuàng)建時AQS中state的默認(rèn)值為0,此時線程尚未啟動,不能在這個時候進(jìn)行線程中斷,見Worker#interruptIfStarted()方法。Worker中兩個覆蓋AQS的方法tryAcquire()和tryRelease()都沒有判斷外部傳入的變量,前者直接CAS(0,1),后者直接setState(0)。接著看核心方法ThreadPoolExecutor#runWorker():
final void runWorker(Worker w) {
// 獲取當(dāng)前線程,實際上和Worker持有的線程實例是相同的
Thread wt = Thread.currentThread();
// 獲取Worker中持有的初始化時傳入的任務(wù)對象,這里注意存放在臨時變量task中
Runnable task = w.firstTask;
// 設(shè)置Worker中持有的初始化時傳入的任務(wù)對象為null
w.firstTask = null;
// 由于Worker初始化時AQS中state設(shè)置為-1,這里要先做一次解鎖把state更新為0,允許線程中斷
w.unlock(); // allow interrupts
// 記錄線程是否因為用戶異常終結(jié),默認(rèn)是true
boolean completedAbruptly = true;
try {
// 初始化任務(wù)對象不為null,或者從任務(wù)隊列獲取任務(wù)不為空(從任務(wù)隊列獲取到的任務(wù)會更新到臨時變量task中)
// getTask()由于使用了阻塞隊列,這個while循環(huán)如果命中后半段會處于阻塞或者超時阻塞狀態(tài),getTask()返回為null會導(dǎo)致線程跳出死循環(huán)使線程終結(jié)
while (task != null || (task = getTask()) != null) {
// Worker加鎖,本質(zhì)是AQS獲取資源并且嘗試CAS更新state由0更變?yōu)?
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果線程池正在停止(也就是由RUNNING或者SHUTDOWN狀態(tài)向STOP狀態(tài)變更),那么要確保當(dāng)前工作線程是中斷狀態(tài)
// 否則,要保證當(dāng)前線程不是中斷狀態(tài)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 鉤子方法,任務(wù)執(zhí)行前
beforeExecute(wt, task);
try {
task.run();
// 鉤子方法,任務(wù)執(zhí)行后 - 正常情況
afterExecute(task, null);
} catch (Throwable ex) {
// 鉤子方法,任務(wù)執(zhí)行后 - 異常情況
afterExecute(task, ex);
throw ex;
}
} finally {
// 清空task臨時變量,這個很重要,否則while會死循環(huán)執(zhí)行同一個task
task = null;
// 累加Worker完成的任務(wù)數(shù)
w.completedTasks++;
// Worker解鎖,本質(zhì)是AQS釋放資源,設(shè)置state為0
w.unlock();
}
}
// 走到這里說明某一次getTask()返回為null,線程正常退出
completedAbruptly = false;
} finally {
// 處理線程退出,completedAbruptly為true說明由于用戶異常導(dǎo)致線程非正常退出
processWorkerExit(w, completedAbruptly);
}
}
這里重點拆解分析一下判斷當(dāng)前工作線程中斷狀態(tài)的代碼:
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 先簡化一下判斷邏輯,如下
// 判斷線程池狀態(tài)是否至少為STOP,rs >= STOP(1)
boolean atLeastStop = runStateAtLeast(ctl.get(), STOP);
// 判斷線程池狀態(tài)是否至少為STOP,同時判斷當(dāng)前線程的中斷狀態(tài)并且清空當(dāng)前線程的中斷狀態(tài)
boolean interruptedAndAtLeastStop = Thread.interrupted() && runStateAtLeast(ctl.get(), STOP);
if (atLeastStop || interruptedAndAtLeastStop && !wt.isInterrupted()){
wt.interrupt();
}
Thread.interrupted()方法獲取線程的中斷狀態(tài)同時會清空該中斷狀態(tài),這里之所以會調(diào)用這個方法是因為在執(zhí)行上面這個if邏輯同時外部有可能調(diào)用shutdownNow()方法,shutdownNow()方法中也存在中斷所有Worker線程的邏輯,但是由于shutdownNow()方法中會遍歷所有Worker做線程中斷,有可能無法及時在任務(wù)提交到Worker執(zhí)行之前進(jìn)行中斷,所以這個中斷邏輯會在Worker內(nèi)部執(zhí)行,就是if代碼塊的邏輯。
這里還要注意的是:STOP狀態(tài)下會拒絕所有新提交的任務(wù),不會再執(zhí)行任務(wù)隊列中的任務(wù),同時會中斷所有Worker線程。
也就是,即使任務(wù)Runnable已經(jīng)runWorker()中前半段邏輯取出,只要還沒走到調(diào)用其Runnable#run(),都有可能被中斷。
假設(shè)剛好發(fā)生了進(jìn)入if代碼塊的邏輯同時外部調(diào)用了shutdownNow()方法,那么if邏輯內(nèi)會判斷線程中斷狀態(tài)并且重置,那么shutdownNow()方法中調(diào)用的interruptWorkers()就不會因為中斷狀態(tài)判斷出現(xiàn)問題導(dǎo)致二次中斷線程(會導(dǎo)致異常)。
小結(jié)一下上面runWorker()方法的核心流程:
-
Worker先執(zhí)行一次解鎖操作,用于解除不可中斷狀態(tài)。 - 通過
while循環(huán)調(diào)用getTask()方法從任務(wù)隊列中獲取任務(wù)(當(dāng)然,首輪循環(huán)也有可能是外部傳入的firstTask任務(wù)實例)。 - 如果線程池更變?yōu)?code>STOP狀態(tài),則需要確保工作線程是中斷狀態(tài)并且進(jìn)行中斷處理,否則要保證工作線程必須不是中斷狀態(tài)。
- 執(zhí)行任務(wù)實例
Runnale#run()方法,任務(wù)實例執(zhí)行之前和之后(包括正常執(zhí)行完畢和異常執(zhí)行情況)分別會調(diào)用鉤子方法beforeExecute()和afterExecute()。 -
while循環(huán)跳出意味著runWorker()方法結(jié)束和工作線程生命周期結(jié)束(Worker#run()生命周期完結(jié)),會調(diào)用processWorkerExit()處理工作線程退出的后續(xù)工作。

寫在最后
歡迎大家關(guān)注我的公眾號【風(fēng)平浪靜如碼】,海量Java相關(guān)文章,學(xué)習(xí)資料都會在里面更新,整理的資料也會放在里面。
覺得寫的還不錯的就點個贊,加個關(guān)注唄!點關(guān)注,不迷路,持續(xù)更新?。?!