@TOC
前言
上一篇我們介紹了線程池的使用,這一篇我們接著分析下線程池的實(shí)現(xiàn)原理。首先從創(chuàng)建線程池的核心類ThreadPoolExecutor類說起。
ThreadPoolExecutor類的常量
//用來存放工作線程數(shù)量和線程池狀態(tài)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //32-3=29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//運(yùn)行狀態(tài),可以執(zhí)行任務(wù)
private static final int RUNNING = -1 << COUNT_BITS;
//不能接受新任務(wù),但是可以執(zhí)行完正在執(zhí)行任務(wù)
private static final int SHUTDOWN = 0 << COUNT_BITS;
//不能接受新任務(wù),也不能執(zhí)行已有的任務(wù)
private static final int STOP = 1 << COUNT_BITS;
//所有任務(wù)都終止,工作線程數(shù)歸零
private static final int TIDYING = 2 << COUNT_BITS;
//終止?fàn)顟B(tài)執(zhí)行完成
private static final int TERMINATED = 3 << COUNT_BITS;
//獲取線程池的狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取工作線程的數(shù)量
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl 變量主要是為了把工作線程數(shù)量和線程池狀態(tài)放在一個(gè)整型變量存儲而設(shè)置的一個(gè)原子類型的變量。在ctl中,低位的29位表示工作線程的數(shù)量,高位用來表示RUNNING,SHUTDOWN,STOP等線程池狀態(tài)。上面定義的三個(gè)方法只是為了計(jì)算得到線程池的狀態(tài)和工作線程的數(shù)量以及得到ctl。
下面是一段線程池的測試代碼,定義線程池,并調(diào)用execute方法添加任務(wù),并執(zhí)行任務(wù)。
public class ExectorTest {
public static void main(String[] args) {
//給線程設(shè)置一個(gè)自定義名稱
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("測試線程-%d").build();
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
3,
6,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5),
threadFactory
// , new ThreadPoolExecutor.CallerRunsPolicy()
);
for (int i=0;i<20;i++) {
executorService.execute(()->{
//模擬耗時(shí)的任務(wù)
System.out.println(Thread.currentThread().getName()+" 開始執(zhí)行任務(wù)");
int j = 10000 * 10000;
while (j >0) {
j--;
}
System.out.println(Thread.currentThread().getName()+" 執(zhí)行結(jié)束");
});
}
}
}
利用debug模式得到的調(diào)試棧如下:

提交任務(wù)execute方法是整個(gè)線程池的執(zhí)行入口,下面我就從它開始分析。
execute方法
public void execute(Runnable command) {
//如果任務(wù)為空,則拋出NPE異常
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//獲取線程池狀態(tài)
int c = ctl.get();
//1.如果工作線程的數(shù)量小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
//調(diào)用addWorker增加一個(gè)新線程,并執(zhí)行一個(gè)任務(wù)
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果線程池的狀態(tài)是運(yùn)行狀態(tài),并且任務(wù)加入到了工作隊(duì)列成功
if (isRunning(c) && workQueue.offer(command)) {
//雙重檢查,再次檢查線程池的狀態(tài)。
int recheck = ctl.get();
//如果線程池的狀態(tài)不是運(yùn)行狀態(tài)并且移除任務(wù)成功則調(diào)用拒絕策略
if (! isRunning(recheck) && remove(command))
//調(diào)用RejectedExecutionHandler.rejectedExecution()方法。根據(jù)不同的拒絕策略去處理
reject(command);
//如果工作線程的數(shù)量為0,說明工作隊(duì)列中可能有任務(wù)沒有線程執(zhí)行,此時(shí)則新建一個(gè)線程來執(zhí)行任務(wù),由于執(zhí)行的是隊(duì)列中已經(jīng)堆積的任務(wù),所以沒有傳入具體的任務(wù)。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果前面的新增work,放入隊(duì)列都失敗,則會繼續(xù)新增worker,此時(shí)線程池中的工作線程數(shù)達(dá)到corePoolSize,阻塞隊(duì)列任務(wù)已滿,只能基于maximumPoolSize來繼續(xù)增加work,如果還是失敗
else if (!addWorker(command, false))
//如果還是失敗,則調(diào)用RejectedExecutionHandler.rejectedExecution()方法。根據(jù)不同的拒絕策略去處理
reject(command);
}
從上代碼中,我們可以總結(jié)出execute方法主要有如下三個(gè)流程
- 如果線程池中當(dāng)前工作線程數(shù)小于核心線程數(shù)(corePoolSize),則創(chuàng)建一個(gè)新線程來執(zhí)行傳入的任務(wù)(執(zhí)行這一步驟需要獲取全局鎖)
- 如果工作線程數(shù)大于等于核心線程數(shù),并且線程池是運(yùn)行狀態(tài),則將傳入的任務(wù)加入到工作隊(duì)列(BlockingQueue)中。
- 如果無法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來處理任務(wù)(執(zhí)行這一步驟需要獲取全局鎖)
- 如果創(chuàng)建新線程將使得當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。根據(jù)不同的拒絕策略去處理
運(yùn)行的流程圖如下:
在這里插入圖片描述
從execute()方法可以看到新增線程并且執(zhí)行任務(wù)核心邏輯在addWorker方法中。
addWorker的方法
首先第一段代碼,這段代碼有兩個(gè)死循環(huán),外層的死循環(huán)主要是檢查線程池的狀態(tài),更新線程池的狀態(tài)。內(nèi)層的死循環(huán),是檢查工作線程的數(shù)量,并且通過CAS的方式在ctl中更新工作線程的數(shù)量。
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//檢查線程池的狀態(tài)是否是運(yùn)行狀態(tài),并且隊(duì)列不為空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取工作線程數(shù)
int wc = workerCountOf(c);
//工作線程數(shù)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//通過CAS的方式來在ctl中增加工作線程的數(shù)量
if (compareAndIncrementWorkerCount(c))
break retry;
//再次獲取狀態(tài)
c = ctl.get(); // Re-read ctl
//如果狀態(tài)更新失敗,則循環(huán)更新
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
分析完了前置的一些檢查工作的代碼,接下來,來看下主流程的代碼:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//1. 新建一個(gè)工作線程,Work后面會說
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//再次獲取線程池的狀態(tài),在新建線程或者釋放鎖時(shí),都會重新檢查。
int rs = runStateOf(ctl.get());
//如果線程池的狀態(tài)不是關(guān)閉狀態(tài),則進(jìn)入下面的分支
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//檢查新建的線程是否是可運(yùn)行狀態(tài)
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//將工作線程添加到HashSet類型的集合中
workers.add(w);
int s = workers.size();
//如果工作線程的集合數(shù)大于largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
//新建工作線程成功之后,將操作標(biāo)志workerAdded設(shè)為true,表示新增工作線程成功,后續(xù)流程用
workerAdded = true;
}
} finally {
//釋放鎖
mainLock.unlock();
}
//如果新建工作線程成功,則調(diào)用start() 方法啟動(dòng)線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果workerStarted為false,表示新建工作線程失敗
if (! workerStarted)
//移除已經(jīng)創(chuàng)建的工作線程
addWorkerFailed(w);
}
return workerStarted;
如上,該主流程的代碼邏輯也是比較清晰的,首先是新建一個(gè)工作線程,然后就是在同步代碼塊中檢查線程池的狀態(tài),如果不是SHUTDOWN狀態(tài),則將新增的線程放在HashSet類型線程的集合中,放入成功之后,將創(chuàng)建work的標(biāo)識workerAdded改成true,然后釋放鎖。接著就是調(diào)用start()方法使得線程可以執(zhí)行任務(wù)。接下來就來看看Worker的結(jié)構(gòu)
Work 類
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask; //傳入的任務(wù)
this.thread = getThreadFactory().newThread(this); //創(chuàng)建一個(gè)新線程
}
public void run() {
runWorker(this);
}
}
Worker類是ThreadPoolExecutor類的一個(gè)私有內(nèi)部不變類,其實(shí)現(xiàn)了Runnable接口,內(nèi)部的run()方法里面調(diào)用的runWorker()方法。所以,任務(wù)的最終執(zhí)行時(shí)通過runWorker()方法的。 在介紹runWorker()之前,我們先看看創(chuàng)建線程的邏輯。
ThreadFactoryBuilder類
按照前面調(diào)用棧我們接著分析下ThreadFactoryBuilder。ThreadFactoryBuilder類用于生成ThreadFactory并且設(shè)置一些參數(shù),比如線程名,線程的等級,是否是后臺線程等信息。這里設(shè)置信息用到了建造者模式。代碼如下:
public ThreadFactory build() {
return build(this);
}
private static ThreadFactory build(ThreadFactoryBuilder builder) {
final String nameFormat = builder.nameFormat;
final Boolean daemon = builder.daemon;
final Integer priority = builder.priority;
final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler;
//沒有指定ThreadFactory實(shí)現(xiàn)類的話默認(rèn)就是Executors.defaultThreadFactory()
final ThreadFactory backingThreadFactory =
(builder.backingThreadFactory != null)
? builder.backingThreadFactory
: Executors.defaultThreadFactory();
final AtomicLong count = (nameFormat != null) ? new AtomicLong(0) : null;
//匿名內(nèi)部類
return new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
//調(diào)用 backingThreadFactory.newThread得到生成的工作線程
Thread thread = backingThreadFactory.newThread(runnable);
//重置線程名
if (nameFormat != null) {
thread.setName(format(nameFormat, count.getAndIncrement()));
}
//重置是否是后臺線程
if (daemon != null) {
thread.setDaemon(daemon);
}
//重置線程的等級
if (priority != null) {
thread.setPriority(priority);
}
if (uncaughtExceptionHandler != null) {
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
}
return thread;
}
};
}
這個(gè)類的邏輯比較簡單,主要是兩步
- 獲取ThreadFactory的具體實(shí)現(xiàn)類
- 調(diào)用ThreadFactory的newThread方法,并重置線程名信息。
接下來我們看看Executors類的內(nèi)部靜態(tài)類DefaultThreadFactory類。
DefaultThreadFactory類
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
//直接new一個(gè)工作線程
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
//是否是后臺線程
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
上面DefaultThreadFactory的代碼比較簡單,就是new一個(gè)工作線程并設(shè)置工作線程的默認(rèn)名。說完了創(chuàng)建工作線程的邏輯,接下來,我們來看看執(zhí)行任務(wù)的runWorker方法的邏輯。
runWorker
final void runWorker(Worker w) {
//獲取當(dāng)前線程
Thread wt = Thread.currentThread();
//獲取當(dāng)前的任務(wù)
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//一般情況下,task都不會為空(特殊情況上面注釋就是前面execute方法說的)或者可以從工作隊(duì)列中取到任務(wù),會直接進(jìn)入循環(huán)體中。
while (task != null || (task = getTask()) != null) {
//加鎖
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//如果線程池正在停止,確保線程是被中斷,
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//該方法是個(gè)空的實(shí)現(xiàn),如果有需要用戶可以自己繼承該類進(jìn)行實(shí)現(xiàn)
beforeExecute(wt, task);
Throwable thrown = null;
//調(diào)用任務(wù)的run方法,真正的任務(wù)執(zhí)行邏輯
task.run();
.....省略部分代碼
finally {
//該方法是個(gè)空的實(shí)現(xiàn),如果有需要用戶可以自己繼承該類進(jìn)行實(shí)現(xiàn)
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//當(dāng)指定任務(wù)執(zhí)行完成,阻塞隊(duì)列中也取不到可執(zhí)行任務(wù)時(shí),會進(jìn)入這里,做一些善后工作,比如在corePoolSize跟maximumPoolSize之間的woker會進(jìn)行回收
processWorkerExit(w, completedAbruptly);
}
}
work線程的執(zhí)行流程就是首先執(zhí)行初始化分配給的任務(wù),執(zhí)行完成之后會嘗試從阻塞中獲取可執(zhí)行的任務(wù),如果指定時(shí)間內(nèi)仍然沒有任務(wù)可以執(zhí)行,則進(jìn)入銷毀邏輯,這里只會回收corePoolSize與maxmumPoolSize之間的那部分worker。
getTask方法
這里getTask方法的實(shí)現(xiàn)更我們構(gòu)造參數(shù)設(shè)置存活時(shí)間有關(guān),我們都知道構(gòu)造參數(shù)設(shè)置的時(shí)間代表了線程池中的線程,即worker線程的存活時(shí)間,如果到期則回收worker線程,這個(gè)邏輯的實(shí)現(xiàn)就在getTask中。來不及執(zhí)行的任務(wù),線程池會放入一個(gè)阻塞隊(duì)列(工作隊(duì)列),getTask方法就是去工作隊(duì)列中取任務(wù),用戶設(shè)置的存活時(shí)間,就是從這個(gè)阻塞隊(duì)列中取任務(wù)等待的最大時(shí)間,如果getTask返回null,意思就是worker等待了指定時(shí)間仍然沒有取到任務(wù),此時(shí)就會跳過循環(huán)體,進(jìn)入worker線程銷毀邏輯。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//線程池如果是SHUTDOWN或者STOP狀態(tài),則將work移除。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 對于allowCoreThreadTimeOut為true(設(shè)置了核心線程的存活時(shí)間),或者是在corePoolSize與maxmumPoolSize之間的那部分worker
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果timed為true,則需要在keepAliveTime時(shí)間內(nèi)取任務(wù),否則沒有存活時(shí)間的限制
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
總結(jié)
本文對線程池添加任務(wù),執(zhí)行任務(wù)的源碼做了重點(diǎn)解析,內(nèi)部用到了很多設(shè)計(jì)模式,比如創(chuàng)建線程用到了工廠模式,設(shè)置線程的屬性用到了建造者模式。同時(shí)還用到了鎖等知識。了解其實(shí)現(xiàn)原理對我們更好的使用線程池大有好處。
參考
Java線程池總結(jié)
面試題|關(guān)于Java線程池一篇文章就夠了
全網(wǎng)同名【碼農(nóng)飛哥】。不積跬步,無以至千里,享受分享的快樂
我是碼農(nóng)飛哥,再次感謝您讀完本文。
