1 線程池原理

所謂線程池,就是有一個池子,里面存放著已經(jīng)創(chuàng)建好的線程,當有任務(wù)提交個線程池執(zhí)行時,池子中的某個線程會主動執(zhí)行該任務(wù).如果池子中的線程不夠應(yīng)付數(shù)量眾多的任務(wù)時,則需要自動擴充新的線程到池子中,但是該數(shù)量是有限的;當任務(wù)較少時,池子中的線程又會自動回收,釋放資源。
一個完整的線程池應(yīng)該具備如下要素:
- 任務(wù)隊列:用于緩存提交的任務(wù)。
- 線程數(shù)量管理功能:一個線程池必須能夠很好地管理和控制線程數(shù)量,可通過如下三個參數(shù)實現(xiàn):創(chuàng)建線程池時初始地線程數(shù)量init;線程池自動填充時最大地線程數(shù)量max;在線程池空閑時需要釋放線程但是也要維護一定數(shù)量地活躍數(shù)量或者核心數(shù)量core。
- 任務(wù)拒絕策略:如果線程數(shù)量已達到上線且任務(wù)隊列已滿,則需要有相應(yīng)地拒絕策略來通知任務(wù)提交者。
- 線程工廠:主要用于個性化定制線程,比如將線程設(shè)置為守護線程以及設(shè)置線程名稱等。
- QueueSize:任務(wù)隊列主要存放提交地Runnable,但是為了防止內(nèi)存溢出,需要有l(wèi)imit數(shù)量對其進行控制。
- KeepAlive時間:該時間主要決定線程各個重要參數(shù)自動維護地時間間隔。
2 線程池具體實現(xiàn)
想要實現(xiàn)線程池就需要實現(xiàn)以下幾個要素功能:
- 線程池狀態(tài)
- 任務(wù)的執(zhí)行
- 線程池中的線程初始化
- 任務(wù)緩存隊列及排隊策略
- 任務(wù)拒絕策略
- 線程池的關(guān)閉
- 線程池容量的動態(tài)調(diào)整
因為java5引入了concurrent包,里面就包含一些線程池的實現(xiàn)類,我們就不必再自己手動實現(xiàn)一遍了。
大體類關(guān)系:

2.1 線程池狀態(tài)
在ThreadPoolExecutor中定義了一個ctl變量用于保存線程池狀態(tài)。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
不過這個ctl不只是保存線程池狀態(tài)那么簡單。根據(jù)注釋:
ctl是一個原子整型變量,它是由worker數(shù)量(有效的線程數(shù)量)和runState(是否處于運行,關(guān)閉等狀態(tài))兩個概念組成的。
為了將兩個概念打包成一個整數(shù),作者限制workerCount到2^29-1(約等于500,000,000)而不是2^31-1(約為二十億)。
COUNT_BITS表示移位個數(shù),Integer.SIZE的值為32,所以29就可以表示為Integer.SIZE-3
CAPACITY表示容量,那么2^29-1就可以表示為(c << COUNT_BITS)-1
相當于就是用整型的0到28位表示線程數(shù)量,用29到31位表示線程池狀態(tài)。
運行狀態(tài)表示:
- RUNNING:
- 可接受新任務(wù)且可以處理已添加的任務(wù)
- 線程池的初始化狀態(tài)是RUNNING。換句話說,線程池被一旦被創(chuàng)建,就處于RUNNING狀態(tài),并且線程池中的任務(wù)數(shù)為0!
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- SHUTDOWN
- 不能再接受新任務(wù),但是可以繼續(xù)執(zhí)行已添加的任務(wù)
- 調(diào)用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN
- STOP:
- 不能再接受新任務(wù),也不處理已添加的任務(wù),并且會中斷執(zhí)行中的任務(wù)。
- 調(diào)用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。
- TIDYING:
- 當前所有任務(wù)已終止,ctr記錄的workCount為0,線程池會變?yōu)門IDYING狀態(tài)。當線程池變?yōu)門IDYING狀態(tài)時,會執(zhí)行鉤子函數(shù)terminated()。terminated()在ThreadPoolExecutor類中實現(xiàn)是空的,若用戶想在線程池變?yōu)門IDYING時,進行相應(yīng)的處理;可以通過重載terminated()函數(shù)來實現(xiàn)。
- 當線程池在STOP狀態(tài)下,線程池中執(zhí)行的任務(wù)為空時,就會由STOP -> TIDYING
- TERMINATED
- 線程池徹底終止,就變成TERMINATED狀態(tài)
- 線程池處在TIDYING狀態(tài)時,執(zhí)行完terminated()之后,就會由 TIDYING -> TERMINATED。

還有幾個對ctl進行計算的方法:
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
- runStateOf:獲取運行狀態(tài)
- workerCountOf:獲取活動線程數(shù)
- ctlOf:獲取運行狀態(tài)和活動線程數(shù)的值
另外還有6個方法與ctr相關(guān):
// 比較大小
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 比較大小
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 判斷state是否在運行狀態(tài)
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* Attempts to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
2.2 任務(wù)的執(zhí)行
首先要了解下線程池的一些成員:
// 這個隊列用于緩存任務(wù),將任務(wù)轉(zhuǎn)交給線程。
private final BlockingQueue<Runnable> workQueue;
// 線程池的主要狀態(tài)鎖,對線程池大小,runState都使用這個鎖
private final ReentrantLock mainLock = new ReentrantLock();
// 存放線程池所有工作線程的集合,當擁有mainLock時才能訪問
private final HashSet<Worker> workers = new HashSet<Worker>();
// 支持等待終止的等待條件
private final Condition termination = mainLock.newCondition();
// 記錄有過的最大的線程數(shù)
private int largestPoolSize;
// 完成的任務(wù)個數(shù),只能由工作線程更新
private long completedTaskCount;
// 線程工廠
private volatile ThreadFactory threadFactory;
// 拒絕策略執(zhí)行器:當執(zhí)行時線程池飽和、shutdown會用到
private volatile RejectedExecutionHandler handler;
// ?;顣r間:即空閑線程最長的等待時間
private volatile long keepAliveTime;
// 允許核心線程在空閑時用此時間來等待
private volatile boolean allowCoreThreadTimeOut;
// 核心線程數(shù)
private volatile int corePoolSize;
// 最大線程數(shù)
private volatile int maximumPoolSize;
然后還要了解下worker:因為 Doug Lea 把線程池中的線程包裝成了一個個 Worker,翻譯成工人,就是線程池中做任務(wù)的線程。所以到這里,我們知道任務(wù)是 Runnable(內(nèi)部叫 task 或 command),線程是 Worker。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** 真正執(zhí)行任務(wù)的thread */
final Thread thread;
/** 為什么有firstTask?因為新建線程時可能會有一個現(xiàn)成的task可以執(zhí)行,它就可以當作該新建線程執(zhí)行的第一個任務(wù),如果沒有,可以后面到任務(wù)隊列里去取 */
Runnable firstTask;
/** 用于存放此線程完全的任務(wù)數(shù),注意了,這里用了 volatile,保證可見性*/
volatile long completedTasks;
// Worker 只有這一個構(gòu)造方法,傳入 firstTask,也可以傳 null
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 調(diào)用 ThreadFactory 來創(chuàng)建一個新的線程,這里創(chuàng)建的線程到時候用來執(zhí)行任務(wù)
this.thread = getThreadFactory().newThread(this);
}
/** 代理Thread的run方法,這里調(diào)用了外部類的 runWorker 方法 */
public void run() {
runWorker(this);
}
...
然后直接看execute()實現(xiàn):
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 如果當前線程數(shù)少于核心線程數(shù),那么直接添加一個 worker 來執(zhí)行任務(wù),
// 創(chuàng)建一個新的線程,并把當前任務(wù) command 作為這個線程的第一個任務(wù)(firstTask)
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
// 添加任務(wù)成功,那么就結(jié)束
return;
c = ctl.get();
}
// 如果到這里,要么當前線程數(shù)大于等于核心線程數(shù),要么剛剛 addWorker 失敗了
// 如果線程池處于 RUNNING 狀態(tài),把這個任務(wù)添加到任務(wù)隊列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次檢查,如果線程池已不處于 RUNNING 狀態(tài),那么移除已經(jīng)入隊的這個任務(wù),并且執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果當前線程數(shù)為0,則新建worker線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果線程池添加worker失敗,一般是線程數(shù)達到極限值,這樣的話作拒絕處理
else if (!addWorker(command, false))
reject(command);
}
總的邏輯注釋也有,主要有3步:
- 1 如果運行的線程數(shù)少于核心線程數(shù),就會新建一個線程并且將該任務(wù)作為該線程的第一個任務(wù)。用addWorker()來添加新線程,它會自動檢查runState和workerCount,如果它返回false,可以防止在不應(yīng)該添加線程時發(fā)生錯誤。
- 2 如果一個task任務(wù)被成功加入隊列,我們?nèi)匀恍枰俅螜z查我們是否成功添加work線程或者線程池已經(jīng)執(zhí)行了shutDown(),如果線程池已經(jīng)停止則需要回滾任務(wù)隊列事務(wù),或者是之前添加work線程失敗則重新嘗試新添加一個worker線程。
- 3 如果我們不能入隊一個任務(wù),則嘗試添加一個新線程。如果此操作失敗,那么應(yīng)該是線程池被shutDown或者飽和了,所以應(yīng)處理為拒絕
addWorker如何實現(xiàn)?
// 檢查一個worker在當前線程池狀態(tài)和給定的邊界(核心值或最大值)是否能被添加。如果是這樣,worker個數(shù)將得到相應(yīng)地調(diào)整,如果有可能,將創(chuàng)建并啟動一個新worker,并將運行firstTask作為其第一個任務(wù)。
// 如果線程池已停止或者shutdown,本方法會返回false;如果線程創(chuàng)建失敗也會返回false,原因要么是線程工廠返回null,要么是其它什么異常,這會方便回滾。
// firstTask 新線程應(yīng)該首先運行該任務(wù)(如果沒有該任務(wù),則為null)。當線程數(shù)少于corePoolSize線程時(在方法execute()中),使用初始化的第一個任務(wù)創(chuàng)建worker(在方法execute()中)來繞過隊列。最初,空閑線程通常是通過prestartCoreThread創(chuàng)建的,或者用來替換其他垂死的worker。
// core ture是設(shè)置corePoolSize作為邊界,否則用maximunPoolSize作為邊界
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
首先死循環(huán)檢查,注意這里有個retry用法:
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果狀態(tài)是stop或者是處于shutdown,隊列為空,返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果線程數(shù)超過最大極限值或者設(shè)定的邊界值,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// workCount自增1結(jié)束循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// 否則,再次循環(huán)檢查,知道滿足上述條件
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
然后是新建一個worker,加入到集合,并且執(zhí)行worker,返回worker添加結(jié)果:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 如果線程池狀態(tài)是running或者是shutdown,但是當前task不為空,滿足條件可以添加并執(zhí)行worker
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// task對應(yīng)的Thread處于活躍狀態(tài),則拋出狀態(tài)異常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 更新最大線程數(shù)
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// worker成功添加,則執(zhí)行worker
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;