線程池介紹
基本元素
線程池是一個(gè)創(chuàng)建使用線程并能保存使用過(guò)的線程以達(dá)到服用的對(duì)象,他使用workQueue當(dāng)作阻塞隊(duì)列,workers作為線程的封裝操作對(duì)象。
/**
* 用于保留任務(wù)并移交給工作線程(指允許不超過(guò)maximumPoolSize大小的線程)的隊(duì)列。
* 我們不要求workQueue.poll()返回null必然意味著workQueue.isEmpty(),因此僅依靠isEmpty來(lái)查看隊(duì)列是否為空(例如,在決定是否從SHUTDOWN過(guò)渡到TIDYING時(shí)必須這樣做)。
* 這可容納特殊用途的隊(duì)列,例如DelayQueues,允許poll()返回null,即使它在延遲到期后稍后可能返回non-null。
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 包含池中所有工作線程的集合。只有在持有mainLock鎖時(shí)才能訪問(wèn)。
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
保證線程安全的元素
/**
* 主池控制狀態(tài)變量ctl包含了兩個(gè)概念字段,workerCount表示有效線程數(shù),runState表示是否正在運(yùn)行,正在關(guān)閉等
* 為了將它們打包為一個(gè)int,我們將workerCount限制為(2 ^ 29)-1(約5億)個(gè)線程,而不是(2 ^ 31)-1(20億)可以表示的線程。如果將來(lái)有問(wèn)題,可以將該變量更改為AtomicLong,并調(diào)整以下移位掩碼常量。但是在需要之前,使用int可以使此代碼更快,更簡(jiǎn)單。
* workerCount是已被允許啟動(dòng)但不允許停止的工人數(shù)。該值可能與活動(dòng)線程的實(shí)際數(shù)量暫時(shí)不同,例如,當(dāng)ThreadFactory在被詢(xún)問(wèn)時(shí)創(chuàng)建線程失敗,并且退出線程仍在終止之前執(zhí)行簿記操作時(shí),該值會(huì)有所不同。用戶(hù)可見(jiàn)的線程池大小是工作集合的當(dāng)前大小。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* 鎖定時(shí)要鎖定工人集合和相關(guān)簿記。
* 雖然我們可以使用某種并發(fā)集合,但事實(shí)證明,通常最好使用鎖。 原因之一是,這可以序列化interruptIdleWorkers,從而避免了不必要的中斷風(fēng)暴,尤其是在關(guān)機(jī)期間。 否則,退出線程將同時(shí)中斷那些尚未中斷的線程。 它還簡(jiǎn)化了一些相關(guān)的統(tǒng)計(jì)數(shù)據(jù),如largePoolSize等。我們還在shutdown和shutdownNow上保留mainLock,以確保在單獨(dú)檢查中斷和實(shí)際中斷的權(quán)限時(shí),工人集合是穩(wěn)定的。
*/
private final ReentrantLock mainLock = new ReentrantLock();
execute(Runnable)執(zhí)行邏輯

execute()執(zhí)行邏輯,來(lái)自下方參考鏈接
/*
* 1.如果正在運(yùn)行的線程少于corePoolSize線程,會(huì)創(chuàng)建新線程。對(duì)addWorker方法的調(diào)用從原子上檢查runState和workerCount,并通過(guò)返回false來(lái)表示添加工作線程失敗了。
* 2.如果一個(gè)任務(wù)可以成功排隊(duì),那么我們?nèi)匀恍枰屑?xì)檢查是否應(yīng)該添加一個(gè)線程(因?yàn)楝F(xiàn)有線程自上次檢查后就死掉了)或自從進(jìn)入該方法以來(lái)該池已關(guān)閉。因此,我們重新檢查狀態(tài),并在必要時(shí)回滾排隊(duì)(如果已停止),或者在沒(méi)有線程的情況下啟動(dòng)新線程。
* 3.如果我們無(wú)法將任務(wù)排隊(duì),則嘗試添加一個(gè)新線程。如果失敗,線程池可能已關(guān)閉或已飽和,因此拒絕該任務(wù)。
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
////獲取線程池控制狀態(tài)
int c = ctl.get();
//通過(guò)workerCountOf計(jì)算出實(shí)際線程數(shù)
if (workerCountOf(c) < corePoolSize) {
//未超過(guò)核心線程數(shù),則新增 Worker 對(duì)象,true表示核心線程,false表示最大線程
if (addWorker(command, true))
return;
c = ctl.get();
}
//核心線程滿(mǎn)了,如果線程池處于運(yùn)行狀態(tài)則往隊(duì)列中添加任務(wù),
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果不在運(yùn)行狀態(tài)則刪除阻塞隊(duì)列中的任務(wù)并執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
//如果當(dāng)前線程池沒(méi)有任何工作線程在運(yùn)行,再次嘗試添加新的工作線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//嘗試調(diào)用最非核心線程,失敗則執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
其中主要方法是addWorker()。
private boolean addWorker(Runnable firstTask, boolean core) {
//這部分主要是對(duì)運(yùn)行狀態(tài)的操作,嘗試通過(guò)原子操作增加工作線程數(shù),如果成功則跳出循環(huán),否則重新獲取ctl的值并重新檢查運(yùn)行狀,略過(guò)。。。
boolean workerStarted = false;//線程是否啟動(dòng)
boolean workerAdded = false;//線程是否添加
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 在鎖的保護(hù)下重新檢查運(yùn)行狀態(tài)rs。
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 線程是否正在執(zhí)行任務(wù)
throw new IllegalThreadStateException();
workers.add(w);
//更新最大線程池大小largestPoolSize。
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//線程啟動(dòng)!
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
經(jīng)過(guò)一系列邏輯運(yùn)行后,終于 t.start() 了!,然后他會(huì)調(diào)用Worker類(lèi)重寫(xiě)的run()方法,而里面只有runWorker()方法調(diào)用。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//獲取要執(zhí)行的任務(wù)
Runnable task = w.firstTask;
w.firstTask = null;
//釋放鎖以允許中斷。
w.unlock();
//設(shè)置一個(gè)布爾變量completedAbruptly,用于標(biāo)記任務(wù)是否突然完成。
boolean completedAbruptly = true;
try {
//完成初始任務(wù)后,繼續(xù)等待獲取新任務(wù)執(zhí)行
while (task != null || (task = getTask()) != null) {
w.lock();
//檢查線程池是否正在停止,如果是,則確保線程被中斷;如果不是,則確保線程沒(méi)有被中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//方法里面是空的,需要自定義實(shí)現(xiàn)
Throwable thrown = null;
try {
//執(zhí)行我們傳入的代碼
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//方法里面是空的,需要自定義實(shí)現(xiàn)
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//最后執(zhí)行線程退出前的操作
processWorkerExit(w, completedAbruptly);
}
}
這個(gè)方法的主要邏輯是通過(guò)循環(huán)不停調(diào)用getTask()獲取任務(wù)并執(zhí)行,直到getTask()返回為空。
/**
* 根據(jù)當(dāng)前配置設(shè)置執(zhí)行阻塞或定時(shí)等待任務(wù),或者線程由于以下任何原因而必須退出,則返回null:
* 1. 超過(guò)最大線程數(shù)。
* 2.線程池已停止。
* 3.線程池被關(guān)閉并且隊(duì)列已經(jīng)空了。
* 4.線程超時(shí)等待任務(wù),并且在定時(shí)等待之前和之后都將終止線程(即{@code allowCoreThreadTimeOut || workerCount> corePoolSize}),并且如果隊(duì)列為非空,此工作程序不是線程池中的最后一個(gè)線程。
*/
private Runnable getTask() {
//聲明一個(gè)布爾變量timedOut,用于記錄上一次poll()是否超時(shí)。
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 檢查線程池狀態(tài)以及阻塞隊(duì)列是否為空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 允許核心線程超時(shí)或者實(shí)際線程數(shù)大于核心線程則為true,否則為false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//poll():使當(dāng)前線程等待,直到發(fā)出信號(hào)或中斷它,或者經(jīng)過(guò)指定的等待時(shí)間。
//take():使當(dāng)前線程等待,直到發(fā)出信號(hào)或被中斷為止。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
核心線程和非核心線程的邏輯區(qū)分
特別說(shuō)明下核心線程和非核心線程的區(qū)別,面試容易問(wèn)到。
核心線程如果不設(shè)置屬性 allowCoreThreadTimeOut 為 true,那么創(chuàng)建后永遠(yuǎn)不會(huì)被關(guān)閉中斷,會(huì)在 getTask() 方法中的 workQueue.take() 處 阻塞等待任務(wù);
如果核心線程設(shè)置屬性 allowCoreThreadTimeOut 為 true或者是非核心線程,那么就會(huì)調(diào)用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方法,指定時(shí)間內(nèi)獲取不到任務(wù),就會(huì)跳出 runWorker(Worker w) 方法中的while循環(huán)而關(guān)閉線程。