前言
在前面一節(jié)JDK線程池(一):體系結(jié)構(gòu)中已經(jīng)分析了JDK的線程池核心接口的組成,通過那些接口的描述,我們知道線程池它所提供的功能,而本文將圍繞JDK中的線程池是如何具體實現(xiàn)這些功能展開分析。
(PS:由于ThreadPoolExecutor的具體實現(xiàn)比較復雜,如果分析存在有誤的地方,請拍磚!)
本文將以如下內(nèi)容作為大綱進行分析說明:
- ThreadPoolExcutor的核心組成
- 任務(wù)的提交的執(zhí)行流程分析
- RejectedExecutionHandler分析
- ThreadFactory分析
1.走進ThreadPoolExecutor的世界
首先讓我們看看ThreadPoolExecutor的繼承結(jié)構(gòu):

通過上圖我們我們可以看到ThreadPoolExecutor繼承了抽象類AbstractExecutorService,從而完成對ExecutorService接口的實現(xiàn),而AbstractExecutorService只是提供了一些模板方法的實現(xiàn),具體的處理細節(jié)都還是落實到ThreadPoolExecutor中。
1.1 核心參數(shù)
由于線程池要應對不同的負載情況,ThreadPoolExecutor為了更好的適配不同的場景,因此其提供了很多的可調(diào)節(jié)的參數(shù),讓用戶根據(jù)實際的負載情況進行調(diào)節(jié)。這些核心參數(shù)需要在創(chuàng)建ThreadPoolExecutor時通過構(gòu)造方法來進行指定,ThreadPoolExecutor中提供了4個重載的構(gòu)造方法,下面讓我們看看ThreadPoolExecutor中最復雜的一個的構(gòu)造方法的實現(xiàn)(其余的構(gòu)造方法底層都是調(diào)用下面的這個):
/*
(1).corePoolSize:設(shè)置一個線程池中的核心線程數(shù)
如果設(shè)置allowCoreThreadTimeOut為false的情況下:
即使當線程池中的線程處于空閑狀態(tài),這些線程也不會被線程池中移除。
如果設(shè)置了allowCoreThreadTimeOut為true,
那么當核心線程在空閑了一段時間后依舊沒有用于工作,那么將會從線程池中移除。
注意:(allowCoreThreadTimeOut默認為false,通常情況下也無需做修改)
(2).maximumPoolSize:線程池中所允許創(chuàng)建最大線程數(shù)量
(3).keepAliveTime:當線程池中的線程數(shù)量大于核心線程數(shù),
如果這些多出的線程在經(jīng)過了keepAliveTime時間后,
依然處于空閑狀態(tài),那么這些多出的空閑線程將會被結(jié)束其生命周期。
(4).unit:keepAliveTime的時間單位
(5).workQueue:用于存放任務(wù)的阻塞隊列,當線程池中的核心線程都處在執(zhí)行任務(wù)時,
提交的任務(wù)將被存儲在workQueue進行緩沖。
該隊列只能存放通過execute方法提交的Runnable任務(wù)。
(6).threadFactory:線程池中用于創(chuàng)建線程的工廠
在這里使用線程工廠的目的也是為了解耦,將創(chuàng)建的實現(xiàn)細節(jié)通過工廠進行封裝,
而不是直接將創(chuàng)建的方式固化在ThreadPoolExecutor本身的代碼中。
(7)handler:當線程池中的線程數(shù)量達到最大并且阻塞隊列也已經(jīng)滿了無法再添加任務(wù)時,
線程池所采取的處理策略。
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//對給定的核心參數(shù)進行合法性的校驗
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor根據(jù)上述給定的參數(shù),會根據(jù)實際的負載情況對一個線程池中的實際工作線程做出動態(tài)調(diào)正,我們可以通過getPoolSize()方法來查看當前線程池中實際的線程數(shù)量。
/**
* 返回當前線程池中實際線程的數(shù)量
*
* @return the number of threads
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/*根據(jù)當前線程池的狀態(tài)進行判斷,如果線程池已經(jīng)處
于terimated狀態(tài)時,則返回0,否則就通過worker集合(底層實際是一個HaseSet)中
返回當前線程的數(shù)量。
*/
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
上面的這些核心參數(shù)除了在ThreadPoolExecutor創(chuàng)建時指定,在后期運行的過程中也可以進行動態(tài)的修改,只是提供其對應的setXXX方法完成修改即可,從這點我們可以很好的感覺到ThreadPoolExecutor實現(xiàn)的靈活性。
1.2 線程池的狀態(tài)與工作線程數(shù)量
橫看成嶺側(cè)成峰,遠近高低各不同。
普通程序猿看整數(shù),它可能就是占4個字節(jié)的數(shù)字(Java中),而大神看整數(shù)卻能看出不同的風景。在ThreadPoolExecutor中使用了一種非常巧妙的方式對表示線程池的狀態(tài)和工作線程的數(shù)量。
在ThreadPoolExecutor中,使用了一個AtomicInteger對將當前線程的工作狀態(tài)和工作線程數(shù)量(有效線程數(shù))使用同一個整數(shù)進行包裝。
為了將兩個數(shù)值包裝在同一個整數(shù)中,它將32位的高3位表示線程的狀態(tài)值,而后29位來表示線程的數(shù)量。這也意味著,在ThreadPoolExecutor中最多可以存在線程數(shù)實際為2^29-1個,當然這個只是理論值,實際的應用根本不可能有這么多線程數(shù)量。
設(shè)計思想與目的:
也許很多人會覺得這樣的設(shè)計有點奇怪,因為不就是表示2個信息嘛,我的線程數(shù)量用個int來表示,而線程狀態(tài)用個byte來表示不就OK嘛,不就多浪費一個字節(jié)數(shù)量而已嘛。其原因在于,線程的狀態(tài)和數(shù)量往往需要同時更新,然而線程池天生處在一個并發(fā)的環(huán)境下,那么當對2個變量進行修改時,那么就勢必需要通過鎖來進行線程安全的處理,從而保證2個變量修改具備原子性;但是這種做法對于性能的影響是非常嚴重的,因此在ThreadPoolExecutor將兩個變量的分別包裝在一個變量中,最后的并發(fā)操作發(fā)生在AtomicInteger上,而AtomicInteger恰恰就是具有一個無鎖原子操作類,這樣既可以解決線程安全的問題,又可以規(guī)避避免所的使用,從而提供性能。
下面是ThreadPoolExecutor中對狀態(tài)和線程數(shù)量的源碼,這里使用的是JDK1.7,在第375行。
/*
使用AtomicInteger來對實際的線程數(shù)量(workCount)
以及這個線程池的狀態(tài)(runState),
該值默認為111...000(29個0),每增加一個線程,ctl值就會+1
使用后29位來保存線程數(shù)量。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/*
使用int的bit數(shù)量減去3,即32-3=29
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
/*
線程池中工作線程的最大數(shù)量為2^29-1
*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/*
使用如下整型值來表示線程池的狀態(tài)
RUNNING:當處于RUNNING狀態(tài)時,
線程池可以接受新的任務(wù)并且會執(zhí)行任務(wù)隊列中的任務(wù)。
*/
private static final int RUNNING = -1 << COUNT_BITS;
/*
SHUTDOWN:不再接受新的任務(wù),但是會繼續(xù)處理隊列中還沒有處理完成的任務(wù)
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/*
STOP:不再接受新的任務(wù),
并且不會繼續(xù)處理隊列中還沒有處理完成的任務(wù),
同時還會去中斷當前正在執(zhí)行的任務(wù)。
*/
private static final int STOP = 1 << COUNT_BITS;
/*
TIDYING:所有的任務(wù)都已經(jīng)結(jié)束,并且workCount的數(shù)量為0。
*/
private static final int TIDYING = 2 << COUNT_BITS;
/*
當線程池的狀態(tài)變到TERMINATED狀態(tài)后,ThreadPoolExecutor
提供了一個terminated()方法供用戶進行擴展實現(xiàn)。我們可以通過這個方法記錄線程池關(guān)閉等信息。
通過上面分析可以自己手動進行運算一下,會得到如下的結(jié)果:
(1).當線程池處于RUNNING時,ctl值小于0,
(2).而當線程池處于其他狀態(tài)時,則ctl將大于等于0。
*/
private static final int TERMINATED = 3 << COUNT_BITS;
/*
使用一個整數(shù)值進行兩個信息的包裝與拆解的過程
獲取線程的狀態(tài),取32位的前3位即可。
即就ctl與11100....(29個0)進行按位與。
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
/*
取線程的數(shù)量,即取后29位的值,即將ctl與
00011...(29個1)進行按位與運算。
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
/*
將兩個狀態(tài)值組裝成一個整數(shù)
通過rs(狀態(tài)值)與wc(workCount)值進行或運算,它們各自獨立
不會產(chǎn)生相互的響應。
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/*
對線程池狀態(tài)的判斷方法,此時無需對ctl進行拆分獲前3位進行比較。
因為SHUTDOWN的值為0,而只要ctl小于0,則說明線程池就處于運行狀態(tài)。
*/
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
2.看我如幫你完成任務(wù)
之前我們可能將一個任務(wù)丟給線程池就不管了,它的底層執(zhí)行對于我們來說可能是一個黑匣子;下面我們來深入的看看一個任務(wù)是如何被提交到線程池中,如何被線程池的線程所執(zhí)行,我們以execute(Runnable)方法作為例子來進行分析。
查看ThreadPoolExecutor的executor方法:
public void execute(Runnable command) {
//判斷提交的任務(wù)是否為null
if (command == null)
throw new NullPointerException();
//獲取到ctl值
int c = ctl.get();
/*
通過ctl值進行拆解,獲取到具體的線程池中實際的線程數(shù)量,
判斷其是否小于用戶所執(zhí)行的corePoolSize,如果小于則
直接創(chuàng)建一個線程進行執(zhí)行(通過addWorker(command,true)方法來處理)
否則就繼續(xù)執(zhí)行下面代碼
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/*
判斷線程池中的狀態(tài)是否為RUNNING,如果是
則將任務(wù)提交到任務(wù)隊列中,如果提交任務(wù)隊列成功,
則會對線程池的狀態(tài)進行一次重復檢查,再次檢查當前線程的狀態(tài)以及實際的線程數(shù)量。
提交失敗可能是由于任務(wù)隊列已經(jīng)滿了,從而無法提交。
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
當提交任務(wù)隊列失敗,則會去判斷當前線程池線程是否已經(jīng)到達最大值
如果未到達,則創(chuàng)建一個線程繼續(xù)執(zhí)行,否則則執(zhí)行拒絕處理。
*/
else if (!addWorker(command, false))
reject(command);
}
/*
根據(jù)給定的任務(wù)來創(chuàng)建線程,創(chuàng)建的過程中會根據(jù)實際的線程數(shù)量以及狀態(tài)來判斷是否去創(chuàng)建。
core:當core為true,判斷當前線程池中實際線程數(shù)是否大于corePoolSize,如果大于,則不執(zhí)行。
當core為false,判斷當前線程池中實際線程數(shù)是否大于maximumPoolSize,如果大于,則不執(zhí)行。
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取實際的線程數(shù)量
int wc = workerCountOf(c);
/*
判斷是否大于2^29-1,同時根據(jù)給定的core參數(shù),
來選擇到底是與corePoolSize還是maximumPoolSize進行比較
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//當比較成功,則對工作線程數(shù)+1,跳出循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
/*
完成對任務(wù)的真正執(zhí)行
*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
/*
將任務(wù)封裝在一個Worker中,這里的Worker是
ThreadPoolExecutor中所定義的一個Runnable實現(xiàn)類。
*/
w = new Worker(firstTask);
/*
worker中同時封裝了線程對象,該線程對象是從線程工廠中所獲取
因此Worker的數(shù)量和線程池中線程的數(shù)量是一一對應的。
*/
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
//對狀態(tài)進行重復檢測,這里具體的細節(jié)我們暫且忽略,只關(guān)注任務(wù)什么時候被執(zhí)行成功。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
/*
通過一個HashSet專門存儲Worker,本質(zhì)上是存儲線程,
因為每一個Worker底層都維護著一個線程
*/
workers.add(w);
/*
獲取當前的Set集合的大小,用戶統(tǒng)計線程池中線程數(shù)量達到最高峰時的線程數(shù),
使用largestPoolSize來存儲。
*/
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//設(shè)置任務(wù)添加成功,只有該值為true時,任務(wù)才會真正得到執(zhí)行
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//執(zhí)行任務(wù)
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
總結(jié)線程池任務(wù)提交的過程:
通過上面的代碼分析,我們大體上可以知道一個任務(wù)提交到線程池中會經(jīng)歷如下的3步過程:
- 如果線程池中實際的線程數(shù)量小于corePoolSize,那么就啟動一個新的線程進行任務(wù)的處理。
- 如果線程池中實際的線程數(shù)量大于等于corePoolSize,則將任務(wù)放置到任務(wù)隊列中進行處理。
- 如果由于任務(wù)隊列已經(jīng)滿了,無法再存放新的任務(wù),則判斷線程池中實際的線程數(shù)量是否大于maximumPoolSize,如果小于,則創(chuàng)建新的線程執(zhí)行,否則將拒絕執(zhí)行任務(wù)。
3.負載過高我該怎么辦?
上面我們已經(jīng)提到了當線程池實際的數(shù)據(jù)量到達最大值時,如果再次提交新的執(zhí)行,則會拒絕執(zhí)行。那么ThreadPoolExecutor是如何拒絕執(zhí)行的呢?
ThreadPool中默認的拒絕策略使用的是中斷策略,即當無法接哦受新的任務(wù)時,直接拋出RejectedExecutionException異常。
/**
* ThreadPool中默認定義的RejectedExecutionHandler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
AbortPolicy實現(xiàn)了java.util.concurrent.RejectedExecutionHandler接口。
public interface RejectedExecutionHandler {
//當ThreadPoolExecutor無法接受新的任務(wù)時,將會執(zhí)行該方法完成任務(wù)的拒絕執(zhí)行。
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/*
AbortPolicy的執(zhí)行拒絕策略非常簡單,當無法再次接受新的任務(wù)時,就拋出一個異常。
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
除了AbortPolicy,ThreadPoolExecutor中還定義3個RejectedExecutionHandler的實現(xiàn)類,它們分別是DiscardPolicy、DiscardOldestPolicy、CallerRunsPolicy,下面是這3個實現(xiàn)類所對應的源碼:
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
//DiscardPolicy的處理方式非常簡單,直接忽略提交的任務(wù)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
/*
DiscardOldestPolicy首先判斷線程池是否已經(jīng)關(guān)閉,
如果未關(guān)閉,則將任務(wù)隊列中之前提交的任務(wù)移除,將
新提交的任務(wù)加入到隊列中
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
/*
CallerRunsPolicy所采取的策略是不再啟動新的線程,
而是讓當前提交任務(wù)的線程直接自己去處理這個任務(wù)
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
在實際的應用中,我們還可以自己去擴展RejectedExecutionHandler,根據(jù)自己的業(yè)務(wù)需求來進行相應的處理,將拒絕的信息通過日志記錄從而方便后期進行參數(shù)調(diào)優(yōu)。
4.線程從何而來?
我們在分析ThreadPoolExecutor的參數(shù)時,提到了一個ThreadFactory的東東,它是一個用于創(chuàng)建線程的工廠,該接口的定義也非常的簡單,里面只有一個方法。
public interface ThreadFactory {
Thread newThread(Runnable r);
}
在ThreadPoolExecutor中,默認所使用的ThreadFactory是Executors中所定義的DefaultThreadFactory,該方法的具體實現(xiàn)如下:
public Thread newThread(Runnable r) {
//設(shè)置線程組以及線程的名字
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
//設(shè)置線程為前臺線程,同時設(shè)置線程的優(yōu)先級
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
ThreadFactory在ThreadPool中主要用于Worker中,在執(zhí)行任務(wù)的時候,用戶所提交的任務(wù)會被包裝成一個Worker來進行執(zhí)行。而Worker內(nèi)部維護著一個線程對象,這個線程對象就是從ThreadFactory中所得到的。
/*
Worker的構(gòu)造方法,將用戶提交的任務(wù)進行封裝
*/
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
//通過獲取線程工廠來獲取到線程
this.thread = getThreadFactory().newThread(this);
}
至此,ThreadPoolExecutor中比較核心的內(nèi)容就分析到這里,如果發(fā)現(xiàn)在分析的過程中存在問題,請及時指正!后面的內(nèi)容將分析一下線程池工具類--Executors以及JDK1.7推出的ForkJoinPool。