實(shí)際編程中,頻繁創(chuàng)建和銷毀線程開銷很大,所以一般使用線程的方式是線程池。
很方便的,java給我們提供了現(xiàn)成的線程池創(chuàng)建函數(shù)ThreadPoolExecutor,這個(gè)創(chuàng)建函數(shù)也成了不少公司面試必考題,當(dāng)然,要想徹底理清線程池執(zhí)行過程,需要剖析源碼,這里我們就來仔細(xì)分析分析。
首先是線程池創(chuàng)建函數(shù)。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
可以把線程池類比成一個(gè)小公司,公司有少量正式員工,執(zhí)行平時(shí)的一些工作量。如果工作量太大,正式員工忙不過來,會(huì)雇傭部分外包人員。
原則上,如果任務(wù)量突然變多,先把任務(wù)臨時(shí)緩存起來,等正式員工有空閑時(shí),交由正式員工由處理(節(jié)約成本嘛)。如果緩存隊(duì)列滿了,這時(shí)候就要考慮找外包人員了。
公司總預(yù)算有限,所以正式員工數(shù)量是固定的,且雇傭的外包人員也有最大人數(shù)限制。
如果工作量變少,為了節(jié)約成本,就要釋放部分外包人員。
如果工作量實(shí)在太大了,正式員工、外包人員也達(dá)到最大預(yù)算人數(shù),且所有人都在拼命完成工作任務(wù),這時(shí)候,就要拒絕一部分任務(wù)了。
接下來這幾個(gè)參數(shù)就好理解了。
- corePoolSize: 核心線程數(shù)量,可以類比正式員工數(shù)量,常駐線程數(shù)量。
- maximumPoolSize: 最大的線程數(shù)量,公司最多雇傭員工數(shù)量(包含外包人員)。常駐+臨時(shí)線程數(shù)量。
- workQueue:任務(wù)等待隊(duì)列,所有的正式員工都在處理任務(wù),再來任務(wù)就先放到隊(duì)列吧,隊(duì)列如果也滿了,那就要找外包了。
- keepAliveTime:非核心線程空閑時(shí)間,就是外包人員等了多久,如果還沒有活干,就被解雇了。
- threadFactory: 創(chuàng)建線程的工廠,在這個(gè)地方可以統(tǒng)一處理創(chuàng)建的線程的屬性。比如每個(gè)員工的名字,工號(hào)不一致,方便區(qū)分和安排任務(wù)。
- handler:線程池拒絕策,什么意思呢?就是當(dāng)任務(wù)實(shí)在是太多,人也不夠,需求池也排滿了,還有任務(wù)咋辦?默認(rèn)是不處理,拋出異常告訴任務(wù)提交者,我這忙不過來了。
我們使用線程池的時(shí)候,一般是直接調(diào)用execute,提交一個(gè)任務(wù),對(duì)應(yīng)到線程池里是怎么處理的呢?
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //關(guān)鍵步驟1
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //關(guān)鍵步驟2
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //關(guān)鍵步驟3
reject(command);
}
首先得理解這個(gè)ctl是什么意思?
線程池里為了充分利用int型的每一位,使用一個(gè)AtomicInteger的ctl來記錄線程池中線程的數(shù)量及當(dāng)前線程池的狀態(tài)。低29bit位表示線程池中線程的數(shù)量,高3bit位用來記錄線程池的狀態(tài)是RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED中的一種。
添加任務(wù)步驟
獲取線程池中線程數(shù)量。
-
線程數(shù)量小于核心線程數(shù)嗎。
- 小于核心線程數(shù),添加一個(gè)核心線程,添加成功的話直接返回。
- 如果添加核心線程失敗,因?yàn)槭嵌嗑€程同時(shí)執(zhí)行,再獲取一遍線程池中線程數(shù)量,繼續(xù)下一步。
-
線程數(shù)量大于等于核心線程數(shù)或上面添加核心線程失敗。
- 線程池還在運(yùn)行且添加任務(wù)到任務(wù)隊(duì)列成功。
- 重新檢查線程池是否還在運(yùn)行
- 線程池不在運(yùn)行,且從任務(wù)隊(duì)列刪除任務(wù)成功,拒絕該任務(wù)。
- 線程池在運(yùn)行,但線程池中沒有線程(核心線程數(shù)也可以設(shè)置成0),添加一個(gè)非核心線程。
- 線程池不在運(yùn)行或添加任務(wù)到任務(wù)隊(duì)列失敗
- 嘗試添加非核心線程去處理該任務(wù)。
- 如果添加非核心線程失敗,拒絕該任務(wù)。
- 重新檢查線程池是否還在運(yùn)行
- 線程池還在運(yùn)行且添加任務(wù)到任務(wù)隊(duì)列成功。
整個(gè)過程有點(diǎn)繞,可以對(duì)比圖形,再理解一遍。
添加任務(wù)最關(guān)鍵的函數(shù)就是addWorker。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
要理解這個(gè)問題,首先要知道java里的標(biāo)簽是怎么用的,有點(diǎn)類似goto的意思,可以參考:https://blog.csdn.net/chanllenge/article/details/90266538
addWorker的核心思想是
- 添加線程時(shí)時(shí),區(qū)分核心和非核心線程,并可指定該線程的第一個(gè)處理任務(wù)。
- 判斷線程池狀態(tài)及工作隊(duì)列,線程數(shù)量等參數(shù)的合法性。
- 通過CAS自旋方式,增加線程數(shù)量。
- 加悲觀鎖,并結(jié)合參數(shù)合法性,添加一個(gè)線程worker,到線程隊(duì)列workers。
- 線程添加成功且正常啟動(dòng),返回true,其他情況,添加線程失敗,移除該線程,并線程數(shù)量減1,返回false。
線程池中,通過一個(gè)set來存儲(chǔ)所有的線程。
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
線程添加成功之后,該線程首先會(huì)執(zhí)行指定的第一個(gè)處理任務(wù),然后從工作隊(duì)列的隊(duì)首依次取任務(wù)去執(zhí)行。
源代碼:
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
看到這里,線程池的創(chuàng)建和執(zhí)行你理解了嗎?歡迎留言討論!