前言
如果有人問(wèn)我:“你了解Java線程池嗎”,我不打算回答Java中常用的幾種線程池,也記不住。從線程池的上層API來(lái)看,再多種的線程池,無(wú)非是參數(shù)的不同,讓它們呈現(xiàn)出了不同的特性,那么這些特性到底依賴什么樣的原理實(shí)現(xiàn),就更值得去深究,也是本文的目的。
試著回答以下幾個(gè)問(wèn)題:
- 線程池如何實(shí)現(xiàn)
- 非核心線程延遲死亡,如何做到
- 核心線程為什么不會(huì)死
- 如何釋放核心線程
- 非核心線程能成為核心線程嗎
- Runnable在線程池里如何執(zhí)行
- 線程數(shù)如何做選擇
- 常見(jiàn)的不同類型的線程池的功效如何做到
如果以上問(wèn)題回答不出一二三,可以借鑒本文。
基礎(chǔ)知識(shí)
要了解線程池,必然涉及到ThreadPoolExecutor。ThreadPoolExecutors實(shí)現(xiàn)了線程池所需的最小功能集,已能hold住很多場(chǎng)景。常見(jiàn)的線程池類型,通過(guò)Executors提供的API,屏蔽了構(gòu)造參數(shù)細(xì)節(jié)來(lái)創(chuàng)建ThreadPoolExecutors,因?yàn)椴涣私饩唧w參數(shù)含義的話,可能拿到的線程池與設(shè)想的會(huì)有偏差。
構(gòu)造參數(shù)與對(duì)象成員變量
- corePoolSize:核心線程數(shù),期望保持的并發(fā)狀態(tài)
- maximumPoolSize:最大線程數(shù),允許超載,雖然期望將并發(fā)狀態(tài)保持在一定范圍,但是在任務(wù)過(guò)多時(shí),增加非核心線程來(lái)處理任務(wù)。非核心線程數(shù) = maximumPoolSize - corePoolSize
- workQueue:阻塞隊(duì)列,存儲(chǔ)線程任務(wù)Runnable
- keepAliveTime:在沒(méi)有任務(wù)時(shí),線程存活時(shí)間
- threadFactory:用來(lái)構(gòu)建線程
- handler:當(dāng)任務(wù)已滿,并且無(wú)法再增加線程數(shù)時(shí),或拒絕添加任務(wù)時(shí),所執(zhí)行的策略
Worker
線程池中的工作線程以Worker作為體現(xiàn),真正工作的線程為Worker的成員變量,Worker即是Runnable,又是同步器。Worker從工作隊(duì)列中取出任務(wù)來(lái)執(zhí)行,并能通過(guò)Worker控制任務(wù)狀態(tài)。
ctl
ctl用來(lái)控制線程池的狀態(tài),并用來(lái)表示線程池線程數(shù)量。在線程池中,有以下五種狀態(tài)
- RUNNABLE:運(yùn)行狀態(tài),接受新任務(wù),持續(xù)處理任務(wù)隊(duì)列里的任務(wù)
- SHUTDOWN:不再接受新任務(wù),但要處理任務(wù)隊(duì)列里的任務(wù)
- STOP:不接受新任務(wù),不再處理任務(wù)隊(duì)列里的任務(wù),中斷正在進(jìn)行中的任務(wù)
- TIDYING:表示線程池正在停止運(yùn)作,中止所有任務(wù),銷毀所有工作線程
- TERMINATED:表示線程池已停止運(yùn)作,所有工作線程已被銷毀,所有任務(wù)已被清空或執(zhí)行完畢
狀態(tài)轉(zhuǎn)換關(guān)系如下圖

ctl類型為AtomicInteger,那用一個(gè)基礎(chǔ)如何表示以上五種狀態(tài)以及線程池工作線程數(shù)量呢?int型變量占用4字節(jié),共32位,因此采用位表示,可以解決上述問(wèn)題。5種狀態(tài)使用5種數(shù)值進(jìn)行表示,需要占用3位,余下的29位就可以用來(lái)表示線程數(shù)。因此,高三位表示進(jìn)程狀態(tài),低29位為線程數(shù)量,代碼如下:
// 值為29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 高三位全為0,低29位全為1,因此線程數(shù)量的表示范圍為 0 ~ 2^29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
因?yàn)閏tl分位來(lái)表示狀態(tài)和數(shù)量,下面幾個(gè)狀態(tài)僅看有效位的值
*/
// 有效值為 111
private static final int RUNNING = -1 << COUNT_BITS;
// 有效值為 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 有效值為 001
private static final int STOP = 1 << COUNT_BITS;
// 有效值為 010
private static final int TIDYING = 2 << COUNT_BITS;
// 有效值為 011
private static final int TERMINATED = 3 << COUNT_BITS;
// 默認(rèn)狀態(tài)為RUNNING,線程數(shù)量為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
既然采用了int分位表示線程池狀態(tài)和線程數(shù)量,那么線程池自然提供了方法來(lái)獲取狀態(tài)與數(shù)量
- runStateOf(): 獲取線程池狀態(tài)
- workerCountOf(): 獲取工作線程數(shù)量
兩函數(shù)均為二進(jìn)制操作,代碼不貼,可用下圖說(shuō)明:

線程池實(shí)現(xiàn)
添加任務(wù)
線程池可以通過(guò)submit()、execute()提交線程任務(wù),其中,submit()可以通過(guò)Future拿到執(zhí)行結(jié)果,內(nèi)部也是通過(guò)execute()向線程池提交線程任務(wù).
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 獲取當(dāng)前ctl值
int c = ctl.get();
// 當(dāng)前線程數(shù)少于最大核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
// 添加核心線程,添加線程任務(wù)
if (addWorker(command, true))
return;
// 上面的過(guò)程期間,ctl可能已被更改,獲取最新值
c = ctl.get();
}
// 線程池狀態(tài)為RUNNABLE,向工作隊(duì)列添加任務(wù)
if (isRunning(c) && workQueue.offer(command)) {
// 再次檢查用
int recheck = ctl.get();
// 線程不處于RUNNABLE狀態(tài),移除任務(wù)
if (! isRunning(recheck) && remove(command))
// 執(zhí)行拒絕任務(wù)策略
reject(command);
else if (workerCountOf(recheck) == 0)
// 執(zhí)行到這里說(shuō)明已沒(méi)有可用的工作線程,創(chuàng)建新的工作現(xiàn)線程
// ,并從任務(wù)隊(duì)列里取任務(wù)。因?yàn)樵谶@個(gè)時(shí)刻,存在所有工作線程
// 都被釋放的可能,為了應(yīng)對(duì)這個(gè)線程池“假死”的情況,所以創(chuàng)建
// 了新的工作線程
addWorker(null, false);
}
// 添加非核心隊(duì)列來(lái)執(zhí)行線程任務(wù)
else if (!addWorker(command, false))
// 說(shuō)明線程池達(dá)到飽和,或者線程池shut down,執(zhí)行拒絕策略
reject(command);
}
當(dāng)有任務(wù)到來(lái)時(shí),按照如下策略進(jìn)行:
- 如果當(dāng)前核心線程數(shù)量沒(méi)達(dá)到最大值corePoolSize,創(chuàng)建新線程來(lái)執(zhí)行此任務(wù)
- 如果當(dāng)前核心線程到達(dá)最大,向阻塞隊(duì)列添加任務(wù)
- 如果核心線程已滿,阻塞隊(duì)列已滿,嘗試開(kāi)啟非核心線程來(lái)執(zhí)行任務(wù)
- 如果線程池不處于RUNNABLE狀態(tài),或者處于飽和狀態(tài),執(zhí)行任務(wù)拒絕策略
線程池是按照上面123的順序來(lái)處理新進(jìn)的任務(wù)的,并且在每一個(gè)過(guò)程中,會(huì)檢查ctl的最新值有效性,因?yàn)樵谔幚磉^(guò)程中線程池的各種狀態(tài)隨時(shí)可能發(fā)生了改變。
不過(guò)是通過(guò)添加核心或是通過(guò)添加非核心線程來(lái)執(zhí)行任務(wù),都是通過(guò)addWorker()來(lái)完成,下面是代碼
private boolean addWorker(Runnable firstTask, boolean core) {
// 這個(gè)是類似 goto 的語(yǔ)法,代碼有效片段是下面第一for循環(huán)
retry:
for (;;) {
int c = ctl.get();
// 獲取程序狀態(tài)
int rs = runStateOf(c);
/**
這一個(gè)條件需要仔細(xì)理解。
1. 當(dāng)線程處于STOP、TIDYING、TERMINATED時(shí),線程池是拒絕執(zhí)行任務(wù)的
因此不需要任務(wù),也不添加線程
2. 當(dāng)線程處于SHUTDOWN狀態(tài)時(shí),線程池需要把任務(wù)處理完,才會(huì)到達(dá)后面的
TIDYING、TERMINATED狀態(tài)。因此,如果阻塞隊(duì)列還有任務(wù)的話,繼續(xù)添加
線程來(lái)加快處理。
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取線程數(shù)
int wc = workerCountOf(c);
// 線程數(shù)超過(guò)或等于能表示的上限
// 或 比較 核心線程數(shù)達(dá)到上限,或比較線程池允許的最大線程數(shù),取決于core
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS操作增加線程數(shù),跳出循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 上面的CAS操作沒(méi)成功,檢查線程池狀態(tài)與開(kāi)始是否一致,
// 如果一致,繼續(xù)執(zhí)行此for循環(huán),否則重新執(zhí)行retry代碼塊,
// 自旋以期CAS成功,后續(xù)才能添加線程
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 將線程任務(wù)加入Worker,新增了Worker,就是新增了線程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 再次檢查線程池狀態(tài)
// 1. 處于RUNNABLE狀態(tài),繼續(xù)添加線程執(zhí)行任務(wù)
// 2. 處于SHUTDOWN狀態(tài),到這里說(shuō)明隊(duì)列里還有任務(wù)要執(zhí)行
// 增加線程期望讓任務(wù)執(zhí)行快一點(diǎn)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 這里說(shuō)明發(fā)生了意外狀況,新建的線程不可用
if (t.isAlive())
throw new IllegalThreadStateException();
// 添加worker進(jìn)集合
workers.add(w);
int s = workers.size();
// largestPoolSize可以表示線程池達(dá)到的最大并發(fā)
if (s > largestPoolSize)
largestPoolSize = s;
// 添加線程成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動(dòng)新添加的線程
t.start();
// 線程啟動(dòng)成功
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 線程啟動(dòng)失敗,移除work,銷毀線程
addWorkerFailed(w);
}
return workerStarted;
}
以上代碼做了如下幾件事:
- 線程池處于 RUNNBALE 或者處于 SHUTDOWN 并在阻塞隊(duì)列里還有任務(wù)時(shí),需要添加新線程。自旋確保 CAS 成功,然后添加新線程
- 線程存于Worker,線程池存有Worker信息,就能訪問(wèn)線程
- 線程啟動(dòng)失敗,則移除Worker,銷毀線程
addWorkerFailed()操作就不進(jìn)去看了,首先是將Worker移除,然后通過(guò)CAS操作更新ctl,最后調(diào)用tryTerminate()操作嘗試中止線程池。
執(zhí)行任務(wù)
之前的代碼開(kāi)啟了新線程并讓線程執(zhí)行,但是沒(méi)有看到有Runnable提交。之前說(shuō)過(guò)Worker本身為Runnable,并且存有為T(mén)hread類型的成員變量。線程池執(zhí)行的任務(wù)的線程,也就是Workder里的Thread。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
Worker(Runnable firstTask) {
setState(-1);
// firstTask就是addWorker()帶來(lái)的Runnable
this.firstTask = firstTask;
// 通過(guò)ThreadFactory創(chuàng)建線程,將自己作為Runnable提交
this.thread = getThreadFactory().newThread(this);
}
......
}
因此線程執(zhí)行后,執(zhí)行的是Worker.run(),run()則調(diào)用了ThreadPoolExecutor.runWorker()
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// task一開(kāi)始是firstTask, 后面就通過(guò)getTask()從阻塞隊(duì)列里拿任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// 線程池狀態(tài)檢查
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 這個(gè)方法子類可以重寫(xiě),在任務(wù)執(zhí)行前有回調(diào)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 這個(gè)方法子類可以重寫(xiě),在任務(wù)執(zhí)行后有回調(diào)
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 線程池已沒(méi)有任務(wù)了,工作線程達(dá)到了可退出的狀態(tài),Worker退出
processWorkerExit(w, completedAbruptly);
}
}
線程首個(gè)任務(wù)為firstTask,之后通過(guò)getTask()就從阻塞隊(duì)列里任務(wù)。線程池提供了beforeExecute()和afterExecute()通知子類任務(wù)執(zhí)行前后的回調(diào),讓子類有時(shí)機(jī)能執(zhí)行自己的事情。如果線程池已沒(méi)有任務(wù)了,工作線程達(dá)到了可退出的狀態(tài),則將線程退出。
主要看getTask() 和 processWorkerExit()
private Runnable getTask() {
// 超時(shí)標(biāo)志
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 檢查線程池和阻塞隊(duì)列狀態(tài)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 減少線程數(shù)
decrementWorkerCount();
return null;
}
// 獲取線程數(shù)
int wc = workerCountOf(c);
// 線程等待方式標(biāo)志位判斷依據(jù)
// allowCoreThreadTimeOut代表核心線程是不是能退出,如果核心線程能退出,就更別說(shuō)非核心線程了
// 另一個(gè)則是看是否存在非核心線程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 超時(shí),或者并且線程超標(biāo)超標(biāo),返回null,讓上一層函數(shù)退出線程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果timed為true,則使用poll等待最多keepAliveTime時(shí)間獲取任務(wù)
// 如果timed為false,使用take()獲取任務(wù),阻塞線程,直到可以從阻塞隊(duì)列拿到任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 超時(shí)
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
線程池里的線程從阻塞隊(duì)列里拿任務(wù),如果存在非核心線程,假設(shè)阻塞隊(duì)列里沒(méi)有任務(wù),那么非核心線程也要在等到keepAliveTime時(shí)間后才會(huì)釋放。如果當(dāng)前僅有核心線程存在,如果允許釋放核心線程的話,也就和非核線程的處理方式一樣,反之,則通過(guò)take()一直阻塞直到拿到任務(wù),這也就是線程池里的核心線程為什么不死的原因。
從之前的代碼一直看到這,并沒(méi)有發(fā)現(xiàn)有明顯的標(biāo)志來(lái)標(biāo)志核心線程與非核心線程,而是以線程數(shù)來(lái)表達(dá)線程身份。0 ~ corePoolSize 表示線程池里只有核心線程,corePoolSize ~ maximumPoolSize 表示線程池里核心線程滿,存在非核心線程。然后,根據(jù)區(qū)間狀態(tài)做有差異的處理??梢源竽懖聹y(cè),線程池實(shí)際并不區(qū)分核心線程與非核心線程,是根據(jù)當(dāng)前的總體并發(fā)狀態(tài)來(lái)決定怎樣處理線程任務(wù)。corePoolSize是線程池希望達(dá)到并保持的并發(fā)狀態(tài),而corePoolSize ~ maximumPoolSize則是線程池允許的并發(fā)的超載狀態(tài),不希望長(zhǎng)期保持。
釋放線程
在線程沒(méi)有拿到任務(wù)后,退出線程,通過(guò)processWorkerExit()可以證實(shí)上述所言。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 到這里說(shuō)明線程中斷,先通過(guò)decrementWorkerCount()減少線程數(shù)值
// 否則,說(shuō)明是線程沒(méi)有從阻塞隊(duì)列獲取到線程
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// completedTaskCount記錄線程池總共完成的任務(wù)
// w.completedTasks則是線程完成的任務(wù)數(shù)
completedTaskCount += w.completedTasks;
// 移除Worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 線程池狀態(tài)改變,嘗試中止線程池
tryTerminate();
int c = ctl.get();
// 檢查線程池狀態(tài),線程池處于RUNNABLE或者SHUTDOWN則進(jìn)入
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 線程池最小數(shù)量,取決于是否能釋放核心線程
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果任務(wù)隊(duì)列還有線程,最起碼都要有一個(gè)線程來(lái)處理任務(wù)
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
// 因?yàn)榫€程中斷,可能導(dǎo)致沒(méi)有線程來(lái)執(zhí)行阻塞隊(duì)列里的任務(wù)
// 因此嘗試創(chuàng)建線程去執(zhí)行任務(wù)
addWorker(null, false);
}
}
釋放工作線程也并沒(méi)有區(qū)分核心與非核心,也是隨機(jī)進(jìn)行的。所謂隨機(jī),就是在前面所說(shuō)的區(qū)間范圍內(nèi),根據(jù)釋放策略,哪個(gè)線程先達(dá)到獲取不到任務(wù)的狀態(tài),就釋放哪個(gè)線程。
文中多次出現(xiàn)tryTerminate(),但不深入去看了。里邊最主要的操作是,發(fā)現(xiàn)可以中止線程池時(shí),中止,并調(diào)用terminated()進(jìn)行通知。如果線程池處于RUNNABLE狀態(tài),什么也不做,否則嘗試中斷一個(gè)線程。 中斷線程則是通過(guò)interruptIdleWorker()操作,就不展開(kāi)了。
到這里就能能明白線程池的原理的,如下圖

線程池里有容納一定的Worker,Worker中的線程就是線程池中用來(lái)執(zhí)行任務(wù)的線程。當(dāng)有任務(wù)加入線程時(shí),根據(jù)線程池狀態(tài)的不同,有不同的步驟。當(dāng)核心線程未滿時(shí),創(chuàng)建新線程來(lái)執(zhí)行;否則將任務(wù)加入到阻塞隊(duì)列;否則創(chuàng)建非核心線程來(lái)執(zhí)行。而線程獲取任務(wù)的方式有兩種,根據(jù)線程池容量區(qū)間,以及是否可以釋放核心線程來(lái)使用take()或者poll()來(lái)獲取任務(wù),其中poll()在一定時(shí)間內(nèi)獲取不到任務(wù),則當(dāng)前線程會(huì)被釋放。
當(dāng)然,在addWorker()方法來(lái)有任務(wù)添加失敗的策略,也就是RejectedExecutionHandler。ThreadPoolExecutor實(shí)現(xiàn)了四種策略來(lái)進(jìn)行處理,簡(jiǎn)單了解即可:
- CallerRunsPolicy: 如果線程池沒(méi)有SHUTODOWN的話,直接執(zhí)行任務(wù)
- AbortPolicy: 拋出異常,說(shuō)明當(dāng)前情況的線程池不希望得到接收不了任務(wù)的狀態(tài)
- DiscardOldestPolicy: 丟棄阻塞隊(duì)列最舊的任務(wù)
- DiscardPolicy: 什么也不做
需要注意的是,默認(rèn)情況下策略為AbortPolicy。
總結(jié)
做個(gè)總結(jié):
- 線程池傾向于使用核心線程來(lái)處理任務(wù),從任務(wù)的添加策略可以看出,先考慮創(chuàng)建核心線程處理,再考慮放到阻塞隊(duì)列,再考慮創(chuàng)建非核心線程處理。以上都不行,則使用任務(wù)拒絕策略
- 通過(guò)向阻塞隊(duì)列取任務(wù)的不同操作,能確保線程的存活,take()保證核心線程不死,poll()保證非核心線程存活等待一定時(shí)間
- 線程池不區(qū)分核心線程和非核心線程,線程池是期望達(dá)到corePoolSize的并發(fā)狀態(tài),并允許在不得已情況下超載,達(dá)到corePoolSize ~ maximumPoolSize 的并發(fā)狀態(tài)
- 線程池狀態(tài)和線程數(shù)量用ctl表示,高三位為狀態(tài),低29位為當(dāng)前線程池?cái)?shù)量
- 線程池對(duì)狀態(tài)的檢測(cè)非常苛刻,幾乎在所有稍微耗時(shí)或影響下一步操作正確性的代碼前都校驗(yàn)ctl
線程池中有很多值得學(xué)習(xí)的東西,線程容量調(diào)整的設(shè)計(jì)、ctl的設(shè)計(jì)、任務(wù)調(diào)度的設(shè)計(jì)等。也有需要更深的儲(chǔ)備才能看懂的實(shí)現(xiàn),這里點(diǎn)出,以備近一步學(xué)習(xí),如同步器的使用,并發(fā)場(chǎng)景的考慮與應(yīng)用等。
下面回答開(kāi)篇提出的問(wèn)題。
問(wèn)答
線程池如何實(shí)現(xiàn)
總結(jié)就是這個(gè)問(wèn)題的答案
非核心線程延遲死亡,如何實(shí)現(xiàn)
通過(guò)阻塞隊(duì)列poll(),讓線程阻塞等待一段時(shí)間,如果沒(méi)有取到任務(wù),則線程死亡
核心線程為什么不死
通過(guò)阻塞隊(duì)列take(),讓線程一直等待,直到獲取到任務(wù)
如何釋放核心線程
將allowCoreThreadTimeOut設(shè)置為true。可用下面代碼實(shí)驗(yàn)
// 偽代碼
{
// 允許釋放核心線程,等待時(shí)間為100毫秒
es.allowCoreThreadTimeOut(true);
for(......){
// 向線程池里添加任務(wù),任務(wù)內(nèi)容為打印當(dāng)前線程池線程數(shù)
Thread.currentThread().sleep(200);
}
}
線程數(shù)會(huì)一直為1。 如果allowCoreThreadTimeOut為false,線程數(shù)會(huì)逐漸達(dá)到飽和,然后大家一起阻塞等待。
非核心線程能成為核心線程嗎
線程池不區(qū)分核心線程于非核心線程,只是根據(jù)當(dāng)前線程池容量狀態(tài)做不同的處理來(lái)進(jìn)行調(diào)整,因此看起來(lái)像是有核心線程于非核心線程,實(shí)際上是滿足線程池期望達(dá)到的并發(fā)狀態(tài)。
Runnable在線程池里如何執(zhí)行
線程執(zhí)行Worker,Worker不斷從阻塞隊(duì)列里獲取任務(wù)來(lái)執(zhí)行。如果任務(wù)加入線程池失敗,則在拒絕策略里,還有處理機(jī)會(huì)。
線程數(shù)如何做選擇
這就要看任務(wù)類型是計(jì)算密集型任務(wù)還是IO密集型任務(wù)了,區(qū)別在于CPU占用率。計(jì)算密集型任務(wù)涉及內(nèi)存數(shù)據(jù)的存取,CPU處于忙綠狀態(tài),因此并發(fā)數(shù)相應(yīng)要低一些。而IO密集型任務(wù),因?yàn)橥獠吭O(shè)備速度不匹配問(wèn)題,CPU更多是處于等待狀態(tài),因此可以把時(shí)間片分給其他線程,因此并發(fā)數(shù)可以高一些。
常見(jiàn)的不同類型的線程池的功效如何做到
常見(jiàn)的線程池有:
- CachedThreadPool:適合異步任務(wù)多,但周期短的場(chǎng)景
- FixedThreadPool: 適合有一定異步任務(wù),周期較長(zhǎng)的場(chǎng)景,能達(dá)到有效的并發(fā)狀態(tài)
- SingleThreadExecutor: 適合任務(wù)串行的場(chǎng)景
- ScheduledThreadPool: 適合周期性執(zhí)行任務(wù)的場(chǎng)景
對(duì)于如何選擇線程池就要看具體的場(chǎng)景,其中的差異通過(guò)構(gòu)造參數(shù)可以到達(dá)效果,通過(guò)之前的分析,就能知道參數(shù)的具體作用以及為什么能達(dá)到效果。取FixedThreadPool來(lái)看,拋磚引玉。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
nThreads個(gè)數(shù)量核心線程持續(xù)并發(fā)任務(wù),沒(méi)有非核心線程,如果沒(méi)有任務(wù),則通過(guò)take()阻塞等待,不允許核心線程死亡。并且阻塞隊(duì)列為L(zhǎng)inkedBlockingQueue,容量為Integer.MAX_VALUE,可以視為無(wú)界隊(duì)列,更難走到拒絕添加線程邏輯。
參考
線程池原理
徹底理解Java線程池原理篇
Java線程池---ThreadPoolExecutor中的ctl變量
JUC鎖框架_AbstractQueuedSynchronizer詳細(xì)分析