閱讀任何源碼,我們都應(yīng)該帶著幾個(gè)問(wèn)題去閱讀,從源碼中找出這些問(wèn)題的答案,這樣才能徹底搞明白某個(gè)知識(shí)點(diǎn)。
下面我們就帶著這樣幾個(gè)問(wèn)題,一起看一下ThreadPoolExecutor的源碼
- 為什么要用線程池
- 為什么不推薦使用juc直接創(chuàng)建的線程池
- 線程池的幾個(gè)核心參數(shù)
- 線程池是什么時(shí)候創(chuàng)建線程的?
- 線程池是如何重復(fù)利用線程的?
- 任務(wù)提交的順序和執(zhí)行的順序是一樣的嗎?
1、為什么要使用線程池
這個(gè)其實(shí)可以寫一個(gè)簡(jiǎn)單的程序去跑一下,比如使用線程池去跑1000個(gè)task和開(kāi)1000個(gè)線程去跑這1000個(gè)task,線程池的效率會(huì)高出很多倍,原因是線程池能夠重復(fù)利用線程,沒(méi)有創(chuàng)建和銷毀線程的開(kāi)銷。
其實(shí)池化的技術(shù)在很多地方都會(huì)用到比如數(shù)據(jù)庫(kù)的連接池,字符串常量池,netty的對(duì)象池等等
2、為什么不推薦使用juc直接創(chuàng)建線程池的方式
我們找兩個(gè)Exectors創(chuàng)建線程池的源碼
a、newCachedTreadPool的源碼
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可以看到這里的最大線程數(shù)是0xffff個(gè),這個(gè)最大線程數(shù)在大并發(fā)提交任務(wù)的情況下會(huì)創(chuàng)建大量線程,會(huì)導(dǎo)致CPU100%
b、newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
//看一下LinkedBlockingQueue的實(shí)現(xiàn)
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
會(huì)初始化一個(gè)容量為0xffff的隊(duì)列,由于這個(gè)隊(duì)列太大,如果我們提交的任務(wù)數(shù)很多并且自定義的線程里的對(duì)象又很大的話,就很容易發(fā)生oom的問(wèn)題。
從這個(gè)工具類創(chuàng)建線程的參數(shù)我們可以看到底層調(diào)用的都是ThreadPoolExecutor的構(gòu)造方法,所以我們建議根據(jù)具體業(yè)務(wù)的規(guī)模設(shè)置合適的線程池參數(shù)。
new ThreadPoolExecutor()
3、線程池的幾個(gè)核心參數(shù)
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
corePoolSize:最大線程數(shù)
maximumPoolSize:最大線程數(shù)
keepAiveTime:線程存活時(shí)間
TimeUnit:存活時(shí)間的參數(shù)
BlockingQueue:線程池的任務(wù)隊(duì)列
task投遞到線程池中的整個(gè)過(guò)程如下

看一下具體的代碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//這里判斷線程數(shù)量是否小于corePoolSize,如過(guò)小于corePoolSize則直接創(chuàng)建worker線程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//到這個(gè)if說(shuō)明worker線程數(shù)量大于了corePoolSize了,這里直接添加到任務(wù)隊(duì)列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//到這個(gè)if說(shuō)明任務(wù)加入隊(duì)列失敗,隊(duì)列滿了,則再創(chuàng)建線程worker線程,如果創(chuàng)建失敗則執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
看到這里從宏觀上我們就能看到整個(gè)task投遞到線程池的一個(gè)過(guò)程,其實(shí)這里最主要的方法是addWorker,其實(shí)addworker里才是線程池的精華,里面有如何創(chuàng)建線程及start的邏輯,如何回收過(guò)期的線程,如何重復(fù)利用創(chuàng)建的線程去運(yùn)行task的邏輯
4、線程池是什么時(shí)候創(chuàng)建線程的
線程池的線程不是線程池創(chuàng)建的時(shí)候創(chuàng)建的,線程池的線程是在調(diào)用addWorker方法,并且addWorker執(zhí)行成功才會(huì)創(chuàng)建
下面我們一起分析一下addworker代碼,這個(gè)方法很長(zhǎng),其實(shí)要看明白這個(gè)方法我們要先看一下Worker這個(gè)類,可以看到這個(gè)類實(shí)際上實(shí)現(xiàn)了Runnable接口,其實(shí)線程池中運(yùn)行的runnable任務(wù)都會(huì)被包裝成Worker對(duì)象
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//當(dāng)前worker會(huì)綁定一個(gè)線程
final Thread thread;
//當(dāng)前worker處理的第一個(gè)任務(wù)
Runnable firstTask;
volatile long completedTasks;
//創(chuàng)建worker時(shí)就會(huì)生成 一個(gè)線程及賦值firstTask,
//注意線程池的線程就是在這里被創(chuàng)建的
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//注意看這里,這個(gè)線程創(chuàng)建的時(shí)候傳的是this,也就意味著一會(huì)this.thread.start時(shí),
//執(zhí)行的是worker對(duì)象的run方法
//這個(gè)大家想一下,回味一下
this.thread = getThreadFactory().newThread(this);
}
}
下面在具體分析一下addWorker是如何start線程及重復(fù)利用線程運(yùn)行任務(wù)的
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//獲取worker數(shù)量
int c = ctl.get();
//獲取線程池狀態(tài)
int rs = runStateOf(c);
// 如果線程池狀態(tài)為SHUTDOWN就不接收任務(wù)了直接returnfalse
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//這里會(huì)判斷一下worker數(shù)量
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//如果代碼運(yùn)行到了這里,就表示線程池狀態(tài)正常,任務(wù)達(dá)到了創(chuàng)建worker線程的條件
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//這里會(huì)創(chuàng)建一個(gè)worker對(duì)象,
//結(jié)合上面的代碼,worker對(duì)象中會(huì)包含一個(gè)線程和一個(gè)firstTask
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//下面這一部分是用來(lái)判斷需不需要將worker線程進(jìn)行緩存,給其他的任務(wù)使用
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//注意看這個(gè)workers對(duì)象,這是一個(gè)hashset,用來(lái)緩存創(chuàng)建好的worker對(duì)象
//注意在強(qiáng)調(diào)一下這個(gè)worker對(duì)象包含一個(gè)Thread引用及Runnable引用
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//緩存好worker線程后,這里會(huì)執(zhí)行線程的start邏輯
//注意線程池的任務(wù)就是在這里開(kāi)啟start使任務(wù)真正進(jìn)入runnable狀態(tài)的
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
下面我們看一下start的具體邏輯:
剛剛我們已經(jīng)看到了,thread是worker對(duì)象的一個(gè)屬性,實(shí)際上創(chuàng)建線程時(shí),傳的參數(shù)就是worker對(duì)象本身,所以線程start執(zhí)行的邏輯就是worker對(duì)象的run方法
//worker的run方法很簡(jiǎn)單,下面我們看一下runWorker方法
public void run() {
runWorker(this);
}
看完這個(gè)我們就基本上看完了線程池的核心邏輯了,但是還有一些細(xì)節(jié)沒(méi)有仔細(xì)看,后面我會(huì)提到,留給大家思考
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//獲取一下fisrtTask
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//這個(gè)task什么時(shí)候不為空?
//注意這里會(huì)有一個(gè)隱含的問(wèn)題,我們提交的任務(wù)存放的順序是 核心worker->隊(duì)列->最大線程worker
//這里的getTask是從隊(duì)列里獲取的任務(wù)
//這里task的執(zhí)行順序就變成了,核心worker->最大線程worker->隊(duì)列
//不知道大家有沒(méi)有g(shù)et到我的點(diǎn),可以做一個(gè)實(shí)驗(yàn)就是為task編一個(gè)號(hào),比如安1-10的順序提交任務(wù),最終執(zhí)行的結(jié)果卻是 1,2,3,4,8,9,10,5,6,7這種順序,這里就是出現(xiàn)這種提交順序和執(zhí)行順序不一樣的原理
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//這里會(huì)最終調(diào)用task的run方法,到此線程池的創(chuàng)建到運(yùn)行任務(wù)就看結(jié)束了
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//這里會(huì)銷毀線程,在隊(duì)列為空的時(shí)候,會(huì)銷毀線程,讓線程數(shù)量停留在corePoolsize范圍內(nèi),什么時(shí)候會(huì)執(zhí)行到這,當(dāng)task為空的時(shí)候,什么時(shí)候task為空,看getTask的邏輯,getTask會(huì)判斷隊(duì)列是否為空及活躍線程的時(shí)間來(lái)返回task的值,這里就不細(xì)講了
processWorkerExit(w, completedAbruptly);
}
}
5、線程池是如何重復(fù)利用線程的
看完上面的分析,其實(shí)就能回答這個(gè)問(wèn)題了,當(dāng)任務(wù)提交時(shí)會(huì)創(chuàng)建worker對(duì)象,這個(gè)對(duì)象里會(huì)有綁定一個(gè)線程,同時(shí)會(huì)將worker對(duì)象放入workers Set中,創(chuàng)建成功后,會(huì)立馬調(diào)用worker.thread.start方法啟動(dòng)線程,這個(gè)線程的run方法進(jìn)行了包裝,首先判斷fisrtTask是否為空如果不為空則直接運(yùn)行,否則會(huì)while循環(huán)拿緩存隊(duì)列中的任務(wù),知道緩存隊(duì)列為空,或者空閑線程超過(guò)了keepalive時(shí)間就會(huì)銷毀線程,以保證線程維持在corePoolSize的大小
6、任務(wù)提交的順序和執(zhí)行的順序是一樣的嗎?
不一樣,提交順序是 核心worker線程->隊(duì)列->非核心worker線程,
但是執(zhí)行的順序卻是核心worker任務(wù)->非核心worker任務(wù)->隊(duì)列任務(wù)
如果能理解這個(gè),線程池就真的理解的差不多了
7、其他
上面幾個(gè)問(wèn)題有些是我在看源碼時(shí)的困惑點(diǎn),有些是我看完源碼之后的一些想法,除了這些問(wèn)題外,線程池還有很多精妙的地方比如,
a、線程池的狀態(tài)和核心線程數(shù)其實(shí)是用一個(gè)4個(gè)字節(jié)int表示的,為什么要這么表示
b、線程池中使用到的設(shè)計(jì)模式有哪些
c、線程池本身就是并發(fā)場(chǎng)景下提交任務(wù)的,那它自己的安全性是如何保證的,execute方法是如何保證安全性的