一 線程池中的一些重要概念
Worker / workers
Worker類(lèi)是ThreadPoolExecutor類(lèi)的一個(gè)內(nèi)部類(lèi),也是線程池管理操作線程的核心所在。每一個(gè)worker都對(duì)應(yīng)著一個(gè)thread,所以在不混淆的情況下,可以把worker理解為工作線程。
ThreadPoolExecutor有一個(gè)名為workers的成員變量,它保存了這個(gè)線程池所有存活的worker對(duì)象。
workQueue
workQueue是線程池內(nèi)部用來(lái)保存待執(zhí)行任務(wù)的隊(duì)列。它是一個(gè)BlockingQueue<Runnable>類(lèi)型的變量,在沒(méi)有任務(wù)的時(shí)候,它的poll()方法會(huì)阻塞。
在一個(gè)允許超時(shí)的worker執(zhí)行完任務(wù)之后,會(huì)調(diào)用workQueue.poll()取出下一個(gè)任務(wù)執(zhí)行。如果沒(méi)有任務(wù),則會(huì)在這里阻塞;當(dāng)阻塞時(shí)間達(dá)到超時(shí)時(shí)間后,這個(gè)工作線程會(huì)退出并銷(xiāo)毀。
一些簡(jiǎn)單的線程池使用方法,可以看看這篇文章Android 多線程之線程池(一)
二 源碼分析
ctl
ThreadPoolExecutor通過(guò)一個(gè)原子整型ctl來(lái)保存線程池的兩個(gè)重要字段,workerCount和runState。workerCount即線程池工作線程的數(shù)量,而runState代表了線程池當(dāng)前的狀態(tài)(如:運(yùn)行中、關(guān)閉、終止)。通過(guò)位運(yùn)算,可以從ctl得到workerCount和runState的值,反之也可以通過(guò)workerCount和runState組合得到ctl。
//ThreadPoolExecutor.java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
execute(runnable)
//ThreadPoolExecutor.java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
當(dāng)線程池執(zhí)行一個(gè)任務(wù)會(huì)調(diào)這個(gè)方法進(jìn)來(lái),回想下之前講過(guò)線程池的工作流程是怎么樣的Android 多線程之線程池(一),再來(lái)看看源碼會(huì)加深印象。
分析:
1、先判斷任務(wù)是否為空,如果為空直接拋異常。
2、如果當(dāng)前運(yùn)行的線程小于核心線程數(shù),那么就去addWorker(command, true),這個(gè)方式返回一個(gè)Boolen值,意思是創(chuàng)建一個(gè)worker是否成功,第一個(gè)參數(shù)是任務(wù),第二個(gè)參數(shù)是創(chuàng)建的worker的線程是否為核心線程。
3、如果當(dāng)前線程池是運(yùn)作的,并且這個(gè)任務(wù)workQueue.offer(command)添加進(jìn)隊(duì)列,那么繼續(xù)往下走,如果添加進(jìn)隊(duì)列失敗,則去addWorker(command, false)創(chuàng)建一個(gè)普通線程并把任務(wù)對(duì)給他執(zhí)行,如果創(chuàng)建失敗,則拒絕這個(gè)任務(wù)。
4、如果3的條件重新取到ctl,如果這個(gè)線程池不是運(yùn)作的,并且這個(gè)任務(wù)已經(jīng)在任務(wù)隊(duì)列移除了,那么直接拒絕;如果上述條件不成立則判斷workerCountOf(recheck) == 0 線程池沒(méi)有工作線程的時(shí)候,則去創(chuàng)建普通線程的worker。

addWorker(Runnable firstTask, boolean core)
//ThreadPoolExecutor.java
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
先去判斷線程池的狀態(tài)if (rs >= SHUTDOWN &&!(rs == SHUTDOWN &&firstTask == null && ! workQueue.isEmpty()))return false;
直接返回false有以下3種情況:
1、線程池狀態(tài)為STOP、TIDYING、TERMINATED
2、線程池狀態(tài)不是running狀態(tài),并且firstTask不為空
3、線程池狀態(tài)不是running狀態(tài),并且工作隊(duì)列為空
當(dāng)返回false時(shí)則進(jìn)入拒絕策略
之后去判斷
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
addWorker的core的值,如果是核心線程,當(dāng)前的workerCountOf大于corePoolSize則返回false;如果是非核心線程,當(dāng)前的workerCountOf大于maximumPoolSize則返回false,當(dāng)返回false時(shí)則進(jìn)入拒絕策略
如果上述條件滿(mǎn)足,compareAndIncrementWorkerCount(c)表示workerCountOf+1
接下來(lái)就是去創(chuàng)建一個(gè)worker了,相當(dāng)于創(chuàng)建一個(gè)工作線程,之后把work添加到隊(duì)列里面workers.add(w);如果workers.size() > largestPoolSize 當(dāng)前的工作線程(包含未啟動(dòng))大于最大工作線程,那么最大工作線程設(shè)置為隊(duì)列的工作線程
if (workerAdded) {
t.start();
workerStarted = true;
}
執(zhí)行work對(duì)象里面的Thread
ThreadPoolExecutor.Worker(Runnable firstTask)、runWorker(Worker)
//ThreadPoolExecutor.java
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
剛才有提及到worker可以理解為工作線程,原因是worker.thread是工作線程,而worker.firstTask為該線程處理的任務(wù)
當(dāng)Worker的線程開(kāi)始運(yùn)行之后,會(huì)調(diào)用其run()方法:runWorker(this)
// 省略一大部分
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
try {
task.run();
} catch (Exception x) {
}
}
}
// 省略一大部分
private Runnable getTask() {
for (;;) {
if (/*無(wú)法獲取任務(wù)*/) {
return null;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
} catch (InterruptedException retry) {
}
}
}
為了便于觀看,只留了核心的幾行,如果傳入的task不為null,則執(zhí)行相應(yīng)的run方法;當(dāng)傳入的task為null,則會(huì)從getTask()方法中獲取runnable對(duì)象
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
} catch (InterruptedException retry) { }
這里面通過(guò)timed變量判斷是使用poll還是task
poll:如果有值,則立馬返回。否則等待一段時(shí)間,該時(shí)間內(nèi)阻塞,時(shí)間結(jié)束后,隊(duì)列還是沒(méi)有元素則返回null。
task:如果有值,則立馬返回。否則一直阻塞。
什么時(shí)候返回null,不讓worker繼續(xù)存活了呢?
1、線程池被shutdown,并且任務(wù)隊(duì)列空了;
2、線程池超容量;
3、超時(shí);
用一張圖總結(jié)
