池化技術(shù)簡單來說就是提前準備資源,當需要的時候可以直接獲取,避免了在需要大量資源時因創(chuàng)建資源耗時而導致阻塞和過高的時延。常見的有線程池,連接池和內(nèi)存池。
Java的線程池就是提前創(chuàng)建一定數(shù)量的線程,當需要線程處理相應(yīng)工作的時候直接喚醒獲取即可,從而增加系統(tǒng)的處理速度。
1. Java 中ThreadPoolExecutor
1.1 ThreadPoolExecutor構(gòu)造參數(shù)說明
Java中常用的線程池類為ThreadPoolExecutor,其繼承關(guān)系如下:

Executor和ExecutorService接口的聲明確定了ThreadPoolExecutor有兩種方式提交task——execute和submit。二者的區(qū)別在于:
execute:通過execute提交的task之后不會有返回值,提交的task也沒有返回值;
submit:通過submit提交task之后會返回一個Future對象。獲取到這個對象之后,我們可以對提交task進行相應(yīng)的操作。例如,取消,判斷是否結(jié)束,阻塞獲取task返回的結(jié)果等等。
再看ThreadPoolExecutor,這個類有三個構(gòu)造方法,使用不同的構(gòu)造方法會對一些參數(shù)使用一些缺省設(shè)置。這里看一下參數(shù)最全的構(gòu)造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
下面對參數(shù)進行一一說明:
int coreSize: 線程池核心線程數(shù)量,這個數(shù)量的線程在線程池關(guān)閉之前會一直存活;
int maximumPoolSize: 線程最大線程數(shù)量,當核心線程被使用完之后,線程池中線程數(shù)量可以臨時增到這個數(shù)量;
long keepAliveTime: 除了核心線程之外,臨時增加線程的存活時間,超過這個時間線程就會結(jié)束;
TimeUnit unit: keepAliveTime的時間單位;
-
BlockingQueue<Runnable> workQueue: 線程池中的一個阻塞隊列,當線程池中線程被使用完之后,提交的task就會被放入這個隊列;JUC包中提供的隊列主要有以下幾種:
ArrayBlockingQueue: 一個FIFO隊列,新增加的元素會被追加到集合的尾部。這是一個有界的集合,一旦創(chuàng)建其大小就不能夠再改變了。往已滿隊列中追加元素會導致阻塞;
DelayQueue: 無界隊列,在放入元素的時候可以指定一個延遲時間,只有當延遲時間結(jié)束后,這個元素才能被取出;
LinkedBlockingQueue:通過不同的構(gòu)造器可以創(chuàng)建一個有界或者無界的隊列,隊列中的元素FIFO;
PriorityBlockingQueue:無界隊列,基于優(yōu)先級實現(xiàn)的隊列。集合中的元素按照優(yōu)先級排序。
ThreadFactory threadFactory: 線程工廠,用來創(chuàng)建線程池中的線程。
-
RejectedExecutionHandler handler: 線程池的飽和策略,如果線程池中線程被用完了,隊列也滿了,那么對于新提交的task就會使用這個策略。目前主要有以下幾種:
AbortPolicy:直接拋出異常(默認策略)
CallerRunsPolicy:調(diào)用線程池所在的線程去執(zhí)行被拒絕的task,會阻塞線程池所在的線程;
DiscardOldestPolicy:丟棄隊列里最久之前的一個任務(wù),并執(zhí)行當前任務(wù)。
DiscardPolicy:不進行任何處理,直接丟棄掉。
1.2 ThreadPoolExecutor 提交流程
1.2.1 ThreadPoolExecutor 中的ctl變量
在討論線程池的提交流程之前,我們需要先把注意力集中到一個特殊的變量上——ctl。 ctl是線程池中的控制狀態(tài),它是一個原子級別讀寫的integer,包含兩層含義:
workerCount: 有效線程數(shù);
runStat: 線程池的狀態(tài),有Running,Shutdown,Stop,Tidying,Terminate五種狀態(tài)。

因為5種狀態(tài)至少需要3位來表示,剩下的全部用來表示workerCount。所有COUNT_BITS為Interger.SIZE - 3 = 29位。CAPACITY表示的是最大容量,29表示的最大值為2^29 -1,即1 << 29 -1(左移一位相當于乘以2, 1*2^29 -1),二級制表示為
00011111111111111111111111111111。相當于低29為用來表是容量,高3位留下來表示狀態(tài)。
五種狀態(tài)對應(yīng)的表示分別如下:
-
RUNNING:
11100000000000000000000000000000running狀態(tài)下線程池接受新的task,并處理隊列中的task
-
SHUTDOWN:
00000000000000000000000000000000shutdown狀態(tài)下不會接受新的task,但是會處理隊列中的task
-
STOP:
00100000000000000000000000000000不接受新的task,處理隊列中的task,終止正在處理的task
-
TIDYNG:
01000000000000000000000000000000所有的task結(jié)束,workerCount為0,無活躍線程
-
TERMINATED:
011000000000000000000000000000000terminated()方法執(zhí)行結(jié)束
同時在線程池中還定義了三個方法來對ctl變量進行操作,其中兩個是從ctl中獲取workerCount和runStart,一個是通過指定的workerCount和runStat生成ctl:

runStateOf和workerCountOf都是簡單的通過&計算將低29為或者高3位置0。而ctlOf這是通過 |操作將高3位和低29位組合到一起。
ctl變量的初始化語句入如下,表示的是running狀態(tài),當前worker數(shù)為0:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
1.2.2 提交任務(wù)流程
通過ThreadPoolExecutor提交任務(wù)有兩種方式,一種是execute,由他自己實現(xiàn)。另一種是submit,由其父類AbstractExecutorService實現(xiàn)。
這里需要提前說明一些ThreadPoolExecutor類的一些屬性:
workers:HashSet 用來存放線程池worker資源(線程)
workQueue:BlockingQueue 用來存放用戶提交的task(Runnable)
1.2.2.1 通過execute提交task
execute(Runnable command)方法
這個方法體比較簡短,我就直接全部粘過來了。

通過代碼我們可以看到,最外圍的分支結(jié)構(gòu)有三個:
如果線程池的worker數(shù)量小于
corePoolSize,就會添加一個worker,執(zhí)行當前的task(這里我們先不探究addWorker執(zhí)行了什么操作);如果第一種情況沒有滿足(當前wroker數(shù)已經(jīng)達到了corePoolSize大小或者addWorker失敗),就把task添加到阻塞隊列中。這個過程成功之后,就會進行一次double-check。
如果當前worker數(shù)量大于coreSize,且隊列已滿,就會嘗試創(chuàng)建一個非核心線程來執(zhí)行當前task。創(chuàng)建失敗的話就會直接使用飽和策略處理task。
在double-check的過程中如果發(fā)現(xiàn)線程池已經(jīng)不在運行狀態(tài)就會把當前task移除,并使用飽和策略處理它。否則就會檢查是否有必要創(chuàng)建一些新的線程。

addWorker(Runnable firstTask, boolean core)方法
這里的代碼分解一下,首先看一下方法的參數(shù)列表:
private boolean addWorker(Runnable firstTask, boolean core)
這個方法需要傳遞兩個參數(shù),firstTask表示添加的新worker處理的第一個任務(wù),布爾型的core表示添加的是否為核心線程。
跳過線程狀態(tài)校驗的過程,我們直接看這一段:
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
這里是循環(huán)CAS來增加ctl的數(shù)值,一旦增加成功,就會正式地創(chuàng)建線程。
線面是創(chuàng)建線程的流程:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
這里是直接創(chuàng)建了一個worker對象,而Worker的構(gòu)造方法如下:
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
可以看到,在構(gòu)造方法中,Worker創(chuàng)建了一個新的線程作為成員變量。
當一個worker創(chuàng)建之后,還會進行重復校驗,已確定worker確實創(chuàng)建成功。
if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
線程創(chuàng)建成功之后,就會啟動worker。
if (workerAdded) {
t.start();
workerStarted = true;
}
Worker
那么線程池創(chuàng)建了worker線程之后都干些什么呢?如果隊列中沒有任務(wù)要做,線程如何?;钅??如果隊列中有worker,線程又會如何去執(zhí)行呢?為了了解這些,剖析一下Worker的代碼就很有必要了。
ThreadPoolExecutor類通過一個HashSet<Worker>來存放Worker對象:
private final HashSet<Worker> workers = new HashSet<Worker>();
通過Worker的構(gòu)造器我們看到Worker在構(gòu)造線程的時候是將自身作為參數(shù)傳到方法中的,因為其本身也實現(xiàn)了Runnable接口,所以當執(zhí)行t.start()的時候,實際上執(zhí)行的是Worker的run()方法。一下是wroker的runWorker方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
粗略地看一下代碼,我們知道worker首先會判斷自己是否有屬于自己的firstTask,如果有的話,就先執(zhí)行這個task,這里是task.run(),只是普通的方法調(diào)用,執(zhí)行了task的邏輯。當自己的第一個task執(zhí)行完之后,worker就會進行循環(huán),通過getTask()方法不停地從workerQueue中獲取task。這個getTask是個阻塞方法,會一直循環(huán)直到返回task或者線程池狀態(tài)不為running的時刻。
當然,這個getTask()的工作內(nèi)容不僅僅是返回隊列中的task,同時也管理著非核心線程的存活。我們通過參數(shù)指定了非核心線程的存活時間,當線程池中有非核心線程且線程空閑的時間超過了指定的時間,就會做掉這些線程。這里它是通過workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)這個方式實現(xiàn),當超過這個等待時間獲取到的結(jié)果依然為null,表示當前的線程已經(jīng)空閑了keepAliveTime這么長時間了,屬于超時的非核心線程。之后會return null,在worker的runWorker()方法中調(diào)用processWorkerExit()方法結(jié)束當前worker。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
1.2.2.2 通過submit提交task
這個方法是在ThreadPoolExecutor的父類AbstractExecutorService中定義的。AbstractExecutorService是一個抽象類。而submit的邏輯也比較簡單:

參數(shù)的主要區(qū)別在于Runnable和Callable,前者無返回值后者有返回值,可以通過Future的get方法阻塞獲取。
通過submit提交的task可以獲取一個future對象,可以對已提交的task進行相關(guān)操作。例如獲取返回值或者判斷運行狀態(tài)等等。
可以看到,submit只是將task封裝成了一個ftask,然后調(diào)用了execute方法調(diào)教了這個task到線程池中。這里使用了模板方法模式,submit調(diào)用的是由子類的線程池實現(xiàn)的execute,也就是上面的execute方法。
小結(jié):
關(guān)于線程池的構(gòu)造參數(shù)含義和參數(shù)的使用邏輯在日常使用過程中還是很值得關(guān)注的,對于日后線程池問題定位調(diào)優(yōu)都會有不少的幫助。理解線程池工作原理,線程池中worker的執(zhí)行流程可以讓我們對池化技術(shù)和資源利用的看法有更進一步的了解。