引言
最近看了一個(gè)開(kāi)源庫(kù)的改動(dòng),其中里面的一個(gè)代碼改動(dòng)引起了我的好奇

,可以看到作者將阻塞隊(duì)列從LinkedBlockingQueue換成了SynchronousQueue。那么問(wèn)題來(lái)了,作者為什么要進(jìn)行這種更改。對(duì)于線(xiàn)程池的使用,相信大家即使沒(méi)看過(guò)源碼,面試中也會(huì)不可避免的總會(huì)被問(wèn)到一些問(wèn)題,背都背會(huì)了。但是欠下的技術(shù)債遲早哪天是要還的,不明白原理,用起來(lái)心里沒(méi)底。
再比如這個(gè)問(wèn)題
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
for (int i = 0; i < 10; i++) {
int finalI = i;
poolExecutor.execute(new Runnable() {
@Override
public void run() {
Log.e("mandy", "run" + finalI);
}
});
}
打印的結(jié)果是什么,如果你以為是隨機(jī)打印,說(shuō)明你對(duì)線(xiàn)程池的理解還沒(méi)到位,運(yùn)行下代碼就知道結(jié)果。
現(xiàn)在網(wǎng)上都這么多優(yōu)秀的線(xiàn)程池源碼分析文章了,為什么還要去寫(xiě),其實(shí)之所以寫(xiě)這篇關(guān)于線(xiàn)程池的源碼分析,主要有以下兩個(gè)原因:
(1) 證明我看過(guò)線(xiàn)程池的源碼,沒(méi)錯(cuò)就是裝逼用,不是那種背兩個(gè)面試題就說(shuō)自己會(huì)線(xiàn)程池的老鐵
(2) 看別人的文章總是會(huì)有一些理解不了地方,還需要自己去翻源碼進(jìn)一步消化,不如寫(xiě)一篇給自己看的文章,好記性不如爛筆頭。其實(shí)這不是我第一次看線(xiàn)程池的源碼,一段時(shí)間后有些關(guān)鍵實(shí)現(xiàn)總會(huì)想不起來(lái),把自己疑惑的點(diǎn)寫(xiě)出來(lái),將來(lái)直接看自己的文章做到查漏補(bǔ)缺的作用,這才是寫(xiě)這篇文章的主要目的。
線(xiàn)程池類(lèi)之間的關(guān)系
下面進(jìn)入正題,來(lái)看下和線(xiàn)程池有關(guān)的類(lèi)
Executor:最底層的接口,內(nèi)部只包含一個(gè)execute方法
public interface Executor {
void execute(Runnable command);
}
ExecutorService:接口,繼承自Executor接口,內(nèi)部包含了shutDown,submit等線(xiàn)程池使用的方法,看下它的繼承關(guān)系
public interface ExecutorService extends Executor
AbstractExecutorService:實(shí)現(xiàn)了ExecutorService大部分方法,但沒(méi)有實(shí)現(xiàn)execute方法,該類(lèi)中提供了一系列的submit方法,線(xiàn)程池要提交任務(wù)最終都會(huì)調(diào)用這個(gè)方法,來(lái)隨便看一個(gè)submit方法
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
通過(guò)newTaskFor生成RunnableFuture,然后調(diào)用execute執(zhí)行任務(wù)。newTaskFor最終會(huì)生成一個(gè)FutureTask類(lèi),關(guān)于FutureTask不就詳細(xì)說(shuō)了,之前寫(xiě)過(guò)一篇文章分析過(guò)Asynctask源碼分析,不止于此,感興趣的可以看看,你可以將FutrueTask當(dāng)成一個(gè)類(lèi)似Runnable的類(lèi)可以執(zhí)行任務(wù)。
所以最核心的還是execute方法的實(shí)現(xiàn),最終的實(shí)現(xiàn)類(lèi)就是ThreadPoolExecutor了。而另一個(gè)和ThreadPoolExecutor相關(guān)的類(lèi)就是Executors,內(nèi)部提供了各種現(xiàn)成的線(xiàn)程池使用,本質(zhì)上就是一個(gè)工具類(lèi)。到此就將Executor,Executors,ExecutorService,AbstractExecutorService,ThreadPoolExecutor之間的關(guān)系表述清楚了。核心還是execute方法。
ThreadPoolExecutor成員字段
要想看明白execute方法的源碼,就得先來(lái)理解下ThreadPoolExecutor中的一些成員變量以及成員字段的含義,等理解之后再看execute方法。看下關(guān)鍵的成員字段
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
線(xiàn)程池有5種狀態(tài),分別為RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED,所以需要3個(gè)比特位才能表示。一個(gè)Integer32個(gè)比特長(zhǎng)度,線(xiàn)程池中通過(guò)一個(gè)Integer長(zhǎng)度來(lái)表示工作線(xiàn)程數(shù)和線(xiàn)程池的狀態(tài),高3位表示狀態(tài)后29位表示工作線(xiàn)程數(shù)量,這種方式類(lèi)似于android中的MeasureSpec使用方式。
runStateOf和workerCountOf就是通過(guò)位運(yùn)算得到線(xiàn)程池狀態(tài)和工作線(xiàn)程數(shù)量。ctlOf就是將狀態(tài)和數(shù)量整合成一個(gè)Integer??梢钥吹匠跏蓟臅r(shí)候,ctl為AtomicInteger(ctlOf(RUNNING, 0));,表示線(xiàn)程池處于運(yùn)行狀態(tài),此時(shí)工作線(xiàn)程為0個(gè)。
Worker
上面介紹了線(xiàn)程池中有關(guān)的成員,接下來(lái)介紹下線(xiàn)程池中一個(gè)很重要的成員變量Worker,來(lái)看下源碼
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
......
}
thread成員變量:每一個(gè)Worker都可以看成一個(gè)thread,當(dāng)Worker被創(chuàng)建時(shí)就會(huì)生成一個(gè)對(duì)應(yīng)的thread保存到這個(gè)變量中。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
從Worker的繼承關(guān)系可以看出它是一個(gè)AQS的子類(lèi),關(guān)于AQS可以說(shuō)是實(shí)現(xiàn)并發(fā)的核心基礎(chǔ),這里不展開(kāi)說(shuō),網(wǎng)上有很多優(yōu)秀的分析文章,這里只需要知道AQS可以實(shí)現(xiàn)同步操作即可。Worker構(gòu)造函數(shù)通過(guò)調(diào)用setState(-1)將標(biāo)志位state設(shè)置為-1,在runWorker方法中又通過(guò)unlock操作重新將state設(shè)置為0。這么做的目的和shutDownNow有一定的關(guān)系。
來(lái)看下shutDownNow源碼
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
里面調(diào)用了 interruptWorkers();關(guān)閉所有的Worker,而interruptWorkers源碼就是通過(guò)worker的state來(lái)決定是否調(diào)用中斷操作。在worker還沒(méi)真正運(yùn)行起來(lái)之前state為-1,那么interruptWorkers也就不需要中斷worker,而如果運(yùn)行起來(lái)后state設(shè)置為0滿(mǎn)足關(guān)閉條件。
Worker中的另一個(gè)成員變量firstTask,從命名可以看出表示是否屬于Worker的第一個(gè)task。Worker運(yùn)行起來(lái)后獲取的task有兩個(gè)途徑,其一就是在Worker創(chuàng)建的時(shí)候通過(guò)構(gòu)造函數(shù)自帶過(guò)來(lái),另一個(gè)途徑就是從阻塞隊(duì)列中去取。如果從第二種途徑獲取到那么firstTask即為null。
execute源碼
搞明白線(xiàn)程池的成員字段和Worker的作用之后,再來(lái)看execute源碼就比較清楚了。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以看出道格.利大佬在寫(xiě)代碼的時(shí)候真的就是言簡(jiǎn)意賅,每個(gè)函數(shù)都顯得十分的短小精悍,這點(diǎn)在看ReentrantLock和AQS源碼的時(shí)候也體現(xiàn)的非常明顯?;氐缴鲜鲈创a中,首先通過(guò)ctl獲取到當(dāng)前線(xiàn)程池的狀態(tài),然后通過(guò)workerCountOf判斷worker的工作數(shù)量,如果小于corePoolSize則說(shuō)明worker數(shù)量少于指定核心線(xiàn)程數(shù),通過(guò)addWorker再去啟動(dòng)線(xiàn)程。
關(guān)于addWorker的大致邏輯就是生成一個(gè)Woker對(duì)象,然后將worker內(nèi)部的thread啟動(dòng)起來(lái)去處理task,詳細(xì)源碼等下分析,這里知道個(gè)大概即可。如果worker數(shù)量大于corePoolSize那么直接進(jìn)入
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
之前說(shuō)過(guò)線(xiàn)程池有5種狀態(tài),只有在running狀態(tài)并且阻塞隊(duì)列可以存儲(chǔ)元素的時(shí)候才會(huì)執(zhí)行內(nèi)部邏輯,在內(nèi)部邏輯當(dāng)中又會(huì)進(jìn)行一次isRunning判斷,這種雙重判斷和單例的doublecheck是不是有些類(lèi)似,如果發(fā)現(xiàn)線(xiàn)程池不是running狀態(tài)則直接調(diào)用拒絕策略對(duì)該command進(jìn)行處理。否則進(jìn)行workerCountOf(recheck) == 0的判斷。這里可以看出只有在worker一個(gè)都沒(méi)啟動(dòng)的時(shí)候才會(huì)執(zhí)行addWorker操作,否則就是塞到阻塞隊(duì)列就完事了。
這段代碼比較關(guān)鍵,可以看出只要worker啟動(dòng)了一個(gè)那么addWorker就不會(huì)被執(zhí)行到。換句話(huà)說(shuō)核心線(xiàn)程都已經(jīng)啟動(dòng)的情況下,只要阻塞隊(duì)列還能容納command,那么永遠(yuǎn)不會(huì)addWorker去啟動(dòng)一個(gè)臨時(shí)線(xiàn)程,或者說(shuō)線(xiàn)程池不允許有核心線(xiàn)程,那么只會(huì)啟動(dòng)一個(gè)臨時(shí)線(xiàn)程。直到阻塞隊(duì)列offer失敗才會(huì)進(jìn)入到最后的那段邏輯,而阻塞隊(duì)列是否能offer成功就和具體的阻塞隊(duì)列實(shí)現(xiàn)有關(guān)系了。
else if (!addWorker(command, false))
reject(command);
分析完execute之后就會(huì)發(fā)現(xiàn)addWorker才是問(wèn)題關(guān)鍵,execute的三個(gè)分支都調(diào)用到了該方法,如果留心的話(huà)會(huì)發(fā)現(xiàn)一些地方調(diào)用的是addWorker(null)而另一些地方調(diào)用的是addWorker(command)那么接下來(lái)看下addWorker是如何啟動(dòng)一個(gè)線(xiàn)程,并源源不斷的去執(zhí)行task。
addWorker
先說(shuō)下addWorker(null)和addWorker(command)的區(qū)別,如果阻塞隊(duì)列還能容納下的話(huà),那么worker從隊(duì)列取task執(zhí)行即可,此時(shí)調(diào)用addWorker(null),如果隊(duì)列滿(mǎn)了或者說(shuō)直接啟動(dòng)核心線(xiàn)程那么首個(gè)task就不會(huì)從隊(duì)列去取,此時(shí)就需要調(diào)用addWorker(command)傳入需要被執(zhí)行的task。這也是Worker中firstTask的含義。搞明白這些區(qū)別后再看addWorker的源碼
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
源碼中會(huì)首先對(duì)線(xiàn)程池的狀態(tài)進(jìn)行一些判斷,如果線(xiàn)程池調(diào)用了shutDown關(guān)閉了,那么直接返回,shutdown之后submit的任何task都不會(huì)被執(zhí)行。如果沒(méi)有shutdown那么繼續(xù)下面的邏輯判斷確保wc是正確的范圍之內(nèi),然后通過(guò)CAS操作將worker的數(shù)量加1,表示將有一個(gè)新的線(xiàn)程馬上要被啟動(dòng)了。最后判斷下線(xiàn)程池的狀態(tài)是否正確。一切正常之后接下來(lái)進(jìn)行啟動(dòng)線(xiàn)程的操作。
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
就不一行行分析了,都能看懂,核心就是t.start的調(diào)用,啟動(dòng)了一個(gè)新的線(xiàn)程。新啟動(dòng)的線(xiàn)程會(huì)執(zhí)行runnable中的run方法,而worker就是一個(gè)runnable實(shí)現(xiàn),所以最終調(diào)用到了worker的runworker方法。
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
首先將firstTask復(fù)制給task,關(guān)于firstTask前面都已經(jīng)分析了,不再累述,unlock方法將原本的-1重置回了0,已經(jīng)在文章分析過(guò)了。核心就是啟動(dòng)的這個(gè)線(xiàn)程通過(guò)一個(gè)while循環(huán)去不斷的執(zhí)行task,task來(lái)源有兩個(gè)地方,firstTask或者getTask方法。getTask等下分析,如果task為null則直接調(diào)用processWorkerExit結(jié)束該線(xiàn)程,內(nèi)部將worker數(shù)量減1。
重點(diǎn)看下task不為null的情況,如果線(xiàn)程池調(diào)用了shutDownNow方法,那么會(huì)將該線(xiàn)程設(shè)置為中斷狀態(tài),所以接下來(lái)的task.run被執(zhí)行時(shí)養(yǎng)成良好的習(xí)慣,判斷下線(xiàn)程是否被中斷才是正確的處理方式。每次while循環(huán)執(zhí)行一遍后都會(huì)重新通過(guò)getTask去獲取新的task,這也是線(xiàn)程池為什么能高效利用線(xiàn)程的關(guān)鍵。
getTask
getTask實(shí)際上就是一個(gè)可以阻塞的方法,有task直接返回,沒(méi)有的話(huà)就會(huì)被阻塞住。阻塞的方法有兩種分別為阻塞隊(duì)列的take和poll,如果是核心線(xiàn)程阻塞那么調(diào)用take,該方法會(huì)一直阻塞,這也是核心線(xiàn)程為什么不會(huì)消亡的原因,而如果是臨時(shí)線(xiàn)程則調(diào)用poll方法阻塞,該方法傳入阻塞時(shí)間的參數(shù),一定時(shí)間后沒(méi)有獲取到task就返回null,這也是臨時(shí)線(xiàn)程為什么有存活時(shí)間的原理。j然后你再去看getTask的源碼是不是就豁然開(kāi)朗了。
到此線(xiàn)程池的主要源碼就分析完畢了,現(xiàn)在再去看創(chuàng)建線(xiàn)程池的那幾個(gè)參數(shù)是不是印象更加深刻了,最后再回到文章開(kāi)頭的兩段代碼中,去好好理解下CC作者為什么會(huì)將LinkedBlockingQueue改成SynchronousQueue,以及下面那段代碼的打印順序?yàn)槭裁词琼樞虻摹H绻€不能解釋的話(huà),只能說(shuō)明你在看源碼的時(shí)候沒(méi)有好好思考了。
shutDown和shutDownNow
最后說(shuō)下shutDown和shutDonwNow的區(qū)別,前者是調(diào)用之后不再接受新的task,而原有線(xiàn)程池中的task還會(huì)繼續(xù)被執(zhí)行。而如果調(diào)用的是shutDownNow方法,不僅不會(huì)接受新的task,包括線(xiàn)程池原有的task執(zhí)行都會(huì)被中斷掉。這些不難理解,結(jié)合上述分析再看一次源碼就明白了。
最后再說(shuō)一點(diǎn),在我們通過(guò)線(xiàn)程池submit任務(wù)的時(shí)候,去判斷下線(xiàn)程的isInterrupted,盡可能的避免被中斷的線(xiàn)程執(zhí)行多余的邏輯不失為一個(gè)好的習(xí)慣。AQS對(duì)于這塊的處理就相當(dāng)?shù)挠眯摹,F(xiàn)在自己也寫(xiě)了一篇關(guān)于線(xiàn)程池的分析文章,以后想回顧下線(xiàn)程池的原理,再也不用到處去找別人的文章慢慢啃了。