轉(zhuǎn)載請(qǐng)附原文鏈接:ThreadPoolExecutor源碼學(xué)習(xí)筆記
歡迎來我的個(gè)人主頁交流:http://extremej.itscoder.com/
大部分分析以注釋形式寫在源碼中
本篇筆記將從 ThreadPoolExecutor 的一次使用上來分析源碼,主要涉及線程池創(chuàng)建,execute 的步驟,任務(wù)添加到阻塞隊(duì)列,線程從阻塞隊(duì)列中拿取任務(wù)執(zhí)行,線程的回收,線程池的終止。
涉及到的類有
Executors — 獲取線程池
ThreadPoolExecutor — 線程池
Worker — 工作線程
LinkedBlockingQueue — 阻塞隊(duì)列
-
RejectedExecutionHandler — 任務(wù)拒絕處理器(實(shí)在不知道什么翻譯~)
?
線程池的獲取
我們知道可以通過 Executors 來獲取不同類型的線程池,那么就從 Executors 來開始看它是如何返回不同類型的線程池的,看看我們常用的一些方法
//獲取一個(gè)固定線程池
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
//獲取只有一個(gè)線程的池子
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//獲取一個(gè)緩存線程池,可變
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
從上面的三個(gè)方法可以發(fā)現(xiàn)其實(shí)都是 new 了一個(gè) ThreadPoolExecutor ,但是傳入的參數(shù)不同,我們進(jìn)到這個(gè)構(gòu)造方法中去一探究竟,看看不同的參數(shù)到底代表了什么
//構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
通過參數(shù)名稱以及注釋可以知道這幾個(gè)參數(shù)的作用分別是
- corePoolSize — 核心線程數(shù),即允許閑置的線程數(shù)目
- maximumPoolSize — 最大線程數(shù),即這個(gè)線程池的容量
- keepAliveTime — 非核心線程的閑置存活時(shí)間
- unit — 上一個(gè)參數(shù)的單位
- workQueue — 任務(wù)隊(duì)列(阻塞隊(duì)列)
- threadFacotry — 線程創(chuàng)建工廠
- handler — 當(dāng)線程池或者任務(wù)隊(duì)列容量已滿時(shí)用于reject
這里要明白一件事情,核心線程只是通過數(shù)目來判斷,而不是說先創(chuàng)建的線程就是核心線程
在構(gòu)造方法里面初始化了成員變量值,通過構(gòu)造方法應(yīng)該明白了不同類型的線程獲取的原理。
任務(wù)的執(zhí)行
A.狀態(tài)屬性
在看源碼之前先了解一下 ThreadPoolExecutor的幾個(gè)狀態(tài)屬性,這對(duì)后面的源碼閱讀有很重要的作用,ThreadPoolExecutor 有五種狀態(tài)
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
從上到下依次是
- RUNNING — 運(yùn)行狀態(tài),可以添加新任務(wù),也可以處理阻塞隊(duì)列中的任務(wù)
- SHUTDOWN — 待關(guān)閉狀態(tài),不再接受新的任務(wù),會(huì)繼續(xù)處理阻塞隊(duì)列中的任務(wù)
- STOP — 停止?fàn)顟B(tài),此時(shí)的線程池不處理任何任務(wù)
- TIDYING — 整理狀態(tài),也可以理解為預(yù)終結(jié)狀態(tài),這個(gè)時(shí)候任務(wù)都處理完畢,池中無有效線程
- TERMINATED — 終止?fàn)顟B(tài)
B.execute(Runnable command)
當(dāng)獲取到了一個(gè)線程池之后,需要它來執(zhí)行異步任務(wù),也就是 execute(Runnable) ,傳入一個(gè) runnable 對(duì)象,在 run 方法中執(zhí)行我們的代碼,那么來看一下 execute() 是怎么工作的,因?yàn)樵创a的注釋解釋得十分清楚,這里將注釋也貼出來。簡單翻譯一下,當(dāng) execute 被調(diào)用時(shí)總共有三種情況。
- 如果當(dāng)前的有效線程數(shù)小于核心線程數(shù),則試圖創(chuàng)建一個(gè)新的 worker 線程
- 如果上面一步失敗了,則試圖將任務(wù)添加到阻塞隊(duì)列中,并且要再一次判斷需要不需要回滾隊(duì)列,或者說創(chuàng)建線程(后面會(huì)詳細(xì)說明)
- 如果上面兩步都失敗了,則會(huì)試圖強(qiáng)行創(chuàng)建一個(gè)線程來執(zhí)行這個(gè)任務(wù),如果還是失敗,扔掉這個(gè)任務(wù)
了解了這三個(gè)步驟,來看看源碼,源碼中調(diào)用了 addworker 方法,這是創(chuàng)建線程的方法,會(huì)在后面講到
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//1.判斷有效線程數(shù)是否小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
//創(chuàng)建新線程
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.分開來看,首先判斷當(dāng)前的池子是否是處于 running 狀態(tài)
//因?yàn)橹挥?running 狀態(tài)才可以接收新任務(wù)
//接下來判斷能否成功添加到隊(duì)列中,如果隊(duì)列滿了或者其他情況則會(huì)跳到下一步
if (isRunning(c) && workQueue.offer(command)) {
//再次檢查池子的狀態(tài),如果進(jìn)入了非 running 狀態(tài),回滾隊(duì)列,扔掉這個(gè)任務(wù)
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
//如果處于 running 狀態(tài)則檢查當(dāng)前的有效線程,如果沒有則創(chuàng)建一個(gè)線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3.前兩步失敗了,就強(qiáng)行創(chuàng)建線程,成功會(huì)返回true,如果失敗扔掉這個(gè)任務(wù)
else if (!addWorker(command, false))
reject(command);
}
解釋一下第二步,為什么要recheck
當(dāng)這個(gè)任務(wù)被添加到了阻塞隊(duì)列前,池子處于 RUNNING 狀態(tài),但如果在添加到隊(duì)列成功后,池子進(jìn)入了 SHUTDOWN 狀態(tài)或者其他狀態(tài),這時(shí)候是不應(yīng)該再接收新的任務(wù)的,所以需要把這個(gè)任務(wù)從隊(duì)列中移除,并且 reject
同樣,在沒有添加到隊(duì)列前,可能有一個(gè)有效線程,但添加完任務(wù)后,這個(gè)線程閑置超時(shí)或者因?yàn)楫惓1桓傻袅?,這時(shí)候需要?jiǎng)?chuàng)建一個(gè)新的線程來執(zhí)行任務(wù)
C .addWorker()
前一步把 execute 的流程捋了一遍,里面多次出現(xiàn)了 addWorker() 方法,前文說到這是個(gè)創(chuàng)建線程的方法,來看看 addWorker 做了些什么,這個(gè)方法代碼比較長,我們拆開來一點(diǎn)一點(diǎn)看
- 第一部分 — 判斷各種基礎(chǔ)異常
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 檢查線程池狀態(tài),隊(duì)列狀態(tài),以及 firstask ,拆開來看
// 這段代碼看起來異常的蛋疼,轉(zhuǎn)換一下邏輯即
//rs>= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||workQueue.isEmpty())
// 總結(jié)起來就是 當(dāng)前處于非 Running 狀態(tài),并且這三種情況
// 1. 不是處于 SHUTDOWN 狀態(tài),不能再創(chuàng)建線程
// 2. 有新的任務(wù) (因?yàn)椴荒茉俳邮招碌娜蝿?wù))
// 3. 阻塞隊(duì)列中已經(jīng)沒有任務(wù) (不需要再創(chuàng)建線程)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);//當(dāng)前有效線程數(shù)目
// 根據(jù)傳入的參數(shù)確定以核心線程數(shù)還是最大線程數(shù)作為判斷條件
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 大于容量 或者指定的線程數(shù),不允許創(chuàng)建
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
}
- 第二部分 — 試圖創(chuàng)建線程
創(chuàng)建一個(gè)Worker(什么東西?下文會(huì)講解,這里把它就當(dāng)成是一個(gè)線程的容器)
boolean workerStarted = false;//標(biāo)記 worker 開啟狀態(tài)
boolean workerAdded = false;//標(biāo)記 worker 添加狀態(tài)
Worker w = null;
try {
w = new Worker(firstTask); //將這個(gè)任務(wù)作為 worker 的第一個(gè)任務(wù)傳入
final Thread t = w.thread;//通過 worker 獲取到一個(gè)線程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// running狀態(tài),或者 shutdown 狀態(tài)但是沒有新的任務(wù)
if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將這個(gè) worker 添加到線程池中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//標(biāo)記worker添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果 worker 創(chuàng)建成功,開啟線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
上面代碼從邏輯層面來看不算難懂,到這里一個(gè)任務(wù)到達(dá)后,ThreadPoolExecutor 的處理就結(jié)束了,那么任務(wù)又是怎么被添加到阻塞隊(duì)列中,線程是如何從隊(duì)列中取出任務(wù),上文中的 Worker 又是什么東西?
一個(gè)一個(gè)來,先來看看 Worker 到底是什么
D.Worker
Worker 是 ThreadPoolExecutor 的一個(gè)內(nèi)部類,實(shí)現(xiàn)了 Runnable 接口,繼承自 AbstractQueuedSynchronizer,這又是個(gè)什么鬼???我也不造~可以看看這篇文章
《Java并發(fā)包源碼學(xué)習(xí)之AQS框架(一)概述》
簡單來說,Worker實(shí)現(xiàn)了 lock 和 unLock 方法來標(biāo)示當(dāng)前線程的狀態(tài)是否為閑置
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
上一節(jié)創(chuàng)建線程成功后調(diào)用 t.start() 而這個(gè)線程又是 Worker 的成員變量
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
可以看到這里將 Worker 作為 Runnable 參數(shù)創(chuàng)建了一個(gè)新的線程,我們知道 Thread 接收一個(gè) Runnable 對(duì)象后 start 運(yùn)行的是 Runnable 的 run 方法,Worker 的 run 方法調(diào)用了 runWorker ,這個(gè)方法里面就是取出任務(wù)執(zhí)行的邏輯
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 獲取到 worker 的第一個(gè)任務(wù)
w.firstTask = null;
w.unlock(); // 標(biāo)記為閑置,還沒有開始任務(wù) 允許打斷
boolean completedAbruptly = true; // 異常退出標(biāo)記
try {
// 循環(huán)取出任務(wù),如果第一個(gè)任務(wù)不為空,或者從隊(duì)列中拿到了任務(wù)
// 只要這兩個(gè)條件滿足,會(huì)一直循環(huán),直到?jīng)]有任務(wù),正常退出,或者異常退出
while (task != null || (task = getTask()) != null) {
w.lock();// 該線程標(biāo)記為非閑置
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 翻譯注釋:1.如果線程池STOPPING狀態(tài),需要中斷線程
// 2.Thread.interrupted()是一個(gè)native方法,返回當(dāng)前線程是否有被等待中斷的請(qǐng)求
// 3.第二個(gè)條件成立時(shí),檢查線程池狀態(tài),如果為STOP,并且沒有被中斷,則中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 執(zhí)行任務(wù)
try {
beforeExecute(wt, task);// 執(zhí)行前
Throwable thrown = null;
try {
task.run(); // 執(zhí)行任務(wù)
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 執(zhí)行結(jié)束
}
} finally {
task = null; // 將 worker 的任務(wù)置空
w.completedTasks++;
w.unlock(); // 釋放鎖,進(jìn)入閑置狀態(tài)
}
}// 循環(huán)結(jié)束
completedAbruptly = false; // 標(biāo)記為正常退出
} finally {
// 干掉 worker
processWorkerExit(w, completedAbruptly);
}
}
這里弄清楚了一件事情,進(jìn)入循環(huán)準(zhǔn)備執(zhí)行任務(wù)時(shí),worker 加鎖標(biāo)記為非閑置,任務(wù)執(zhí)行完畢或者出現(xiàn)異常,worker 釋放鎖,進(jìn)入閑置狀態(tài)。
也就是當(dāng)一個(gè) worker 執(zhí)行任務(wù)前或者執(zhí)行完任務(wù),到取出下一個(gè)任務(wù)期間,都是閑置狀態(tài)可以被打斷
上面取出任務(wù)調(diào)用了 getTask() ,誒~為什么有一個(gè)死循環(huán),別著急,慢慢看來。上面的代碼可以知道如果 getTask 返回任務(wù)則執(zhí)行,如果返回為 null 則 worker 需要被回收
private Runnable getTask() {
// 標(biāo)記取任務(wù)是否超時(shí)
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果線程池狀態(tài)為 STOP 或者 SHUTDOWN 并且隊(duì)列已經(jīng)為空,回收 wroker
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//獲取當(dāng)前有效線程數(shù)
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed 用來標(biāo)記當(dāng)前的 worker 是否設(shè)置超時(shí)時(shí)間,
// 還記得獲取線程池的時(shí)候 可以設(shè)置核心線程超時(shí)時(shí)間
//1.允許核心線程超時(shí)回收(即所有線程) 2.當(dāng)前有效線程超過核心線程數(shù)(需要回收)
// 如果timed == false 則該worker不會(huì)被回收,如果沒有取到任務(wù) 會(huì)一直阻塞
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 回收線程條件
// 1. 有效線程數(shù)已經(jīng)大于了線程池的最大線程數(shù)或者設(shè)置了超時(shí)回收并且已經(jīng)超時(shí)
// 2. 有效線程數(shù)大于1或者隊(duì)列任務(wù)已經(jīng)為空
// 只有當(dāng)上面1和2 同時(shí)滿足時(shí) 則試圖回收線程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 如果減少workercount成功 直接回收
if (compareAndDecrementWorkerCount(c))
return null;
// 否則重走循環(huán),從第一個(gè)判斷條件處回收
continue;
}
// 取任務(wù)
try {
// 根據(jù)是否設(shè)置超時(shí)回收來選擇不同的取任務(wù)的方式
// poll 方法取任務(wù)會(huì)有超時(shí)時(shí)間,超過時(shí)間則返回null
// take 方法沒有超時(shí)時(shí)間,阻塞式方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果任務(wù)不為空返回任務(wù)
if (r != null)
return r;
// 否則標(biāo)記超時(shí) 進(jìn)入下一次循環(huán)等待回收
timedOut = true;
} catch (InterruptedException retry) {
// 如果出現(xiàn)異常,試圖重試
timedOut = false;
}
}
}
getTask() 方法邏輯也捋得差不多了,這里又出現(xiàn)了兩個(gè)新的方法,workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 和 workQueue.take() ,這兩個(gè)都是阻塞隊(duì)列的方法,來看看它們又各自是怎么實(shí)現(xiàn)的
E.LinkedBlockingQueue — 阻塞隊(duì)列
ThreadPoolExecutor 使用的是鏈表結(jié)構(gòu)的阻塞隊(duì)列,實(shí)現(xiàn)了 BlockingQueue 接口,而 BlockingQueue 則是繼承自 Queue 接口,再上層就是 Collection 接口。
因?yàn)楸酒P記主要是分析 ThreadPoolExecutor 的原理,所以不會(huì)詳細(xì)介紹 LinkedBlockingQueue 中的其它代碼,主要介紹這里所用的方法,首先來看一下上文所提到的 take()
public E take() throws InterruptedException {
E x; // 任務(wù)
int c = -1; // 取出任務(wù)后的剩余任務(wù)數(shù)量
final AtomicInteger count = this.count; // 當(dāng)前任務(wù)數(shù)量
final ReentrantLock takeLock = this.takeLock; // 加鎖防止并發(fā)
takeLock.lockInterruptibly();
try {
// 如果隊(duì)列數(shù)量為空,則一直循環(huán),阻塞線程
while (count.get() == 0) {
notEmpty.await();
}
// 取出任務(wù)
x = dequeue();
// 任務(wù)數(shù)量減一
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();// 標(biāo)記隊(duì)列非空
} finally {
takeLock.unlock(); // 釋放鎖
}
//
if (c == capacity)
signalNotFull();//標(biāo)記隊(duì)列已滿
return x;// 返回任務(wù)
}
上面的代碼可以知道 take 方法會(huì)一直阻塞直到隊(duì)列有新的任務(wù)為止
接下來是 poll 方法,可以看到幾乎與 take 方法相同,唯一的區(qū)別是在阻塞的循環(huán)代碼塊里面加了時(shí)間判斷,如果超時(shí)則直接返回為空,不會(huì)一直阻塞下去
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null; // 存放的任務(wù)
int c = -1;
long nanos = unit.toNanos(timeout); // 超時(shí)時(shí)間
final AtomicInteger count = this.count; // 隊(duì)列中的數(shù)量
final ReentrantLock takeLock = this.takeLock; // 加鎖防止并發(fā)
takeLock.lockInterruptibly();
try {
// 如果隊(duì)列為空,則不斷的循環(huán)
while (count.get() == 0) {
// 如果當(dāng)?shù)褂?jì)時(shí)小于0 即超時(shí)時(shí)間到 則返回空
if (nanos <= 0)
return null;
// 讓線程等待
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue(); // 取出一個(gè)任務(wù)
c = count.getAndDecrement(); // 取出后的隊(duì)列數(shù)量
if (c > 1)
notEmpty.signal(); // 標(biāo)記非空
} finally {
takeLock.unlock(); // 釋放鎖
}
if (c == capacity)
signalNotFull(); // 標(biāo)記隊(duì)列已滿
return x; // 返回任務(wù)
}
線程池的回收及終止
前一節(jié)分析了任務(wù)的執(zhí)行流程及原理,也留下了一個(gè)問題,worker 是如何被回收的呢?線程池該如何管理呢?回到上一節(jié)的 runWorker() 方法中,還記得最后調(diào)用了一個(gè)方法
processWorkerExit(w, completedAbruptly);
這個(gè)方法傳入了兩個(gè)參數(shù),第一個(gè)是當(dāng)前的 Woker ,第二個(gè)是標(biāo)記異常退出的標(biāo)識(shí)
首先判斷是否為異常退出,如果是異常退出的話需要手動(dòng)調(diào)整線程數(shù)量,如果是正?;厥盏模琯etTask 方法里面已經(jīng)手動(dòng)調(diào)整過了,不記得的小伙伴可以看看前文的代碼,找找 decrementWorkerCount(),
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
// 記錄線程池完成的任務(wù)總數(shù),從 workers 中移除該 worker
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();//嘗試關(guān)閉池子
int c = ctl.get();
// 以下的代碼是判斷需不需要給線程池創(chuàng)建一個(gè)新的線程
// 如果線程池的狀態(tài)是 RUNNING 或者 SHUTDOWN 進(jìn)一步判斷需不需要?jiǎng)?chuàng)建
if (runStateLessThan(c, STOP)) {
// 如果為異常退出直接創(chuàng)建,如果不是異常退出進(jìn)入判斷
if (!completedAbruptly) {
// 獲取線程池應(yīng)該存在的最小線程數(shù) 如果設(shè)置了超時(shí) 則是0,否則是核心線程數(shù)
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果 min 是0 但是隊(duì)列又不為空,則 min 應(yīng)該是1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果當(dāng)前池中的有效線程數(shù)大于等于最小線程數(shù) 則不需要?jiǎng)?chuàng)建
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 創(chuàng)建線程
addWorker(null, false);
}
}
上面的代碼中調(diào)用了 tryTerminate() 方法,這個(gè)方法是用于終止線程池的,又是一個(gè) for 循環(huán),從代碼結(jié)構(gòu)來看是異常情況的重試機(jī)制。還是老方法,慢慢來看總共做了幾件事情
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果處于這三種情況不需要關(guān)閉線程池
// 1. Running 狀態(tài)
// 2. SHUTDOWN 狀態(tài)并且任務(wù)隊(duì)列不為空,不能終止
// 3. TIDYING 或者 TERMINATE 狀態(tài),說明已經(jīng)在關(guān)閉了 不需要重復(fù)關(guān)閉
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 進(jìn)入到關(guān)閉線程池的代碼,如果線程池中還有線程,則需要打斷線程
if (workerCountOf(c) != 0) { // Eligible to terminate 可以關(guān)閉池子
// 打斷閑置線程,只打斷一個(gè)
interruptIdleWorkers(ONLY_ONE);
return;
// 如果有兩個(gè)以上怎么辦?只打斷一個(gè)?
// 這里只打斷一個(gè)是因?yàn)?worker 回收的時(shí)候都會(huì)進(jìn)入到該方法中來,可以回去再看看
// runWorker方法最后的代碼
}
// 線程已經(jīng)回收完畢,準(zhǔn)備關(guān)閉線程池
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();// 加鎖
try {
// 將狀態(tài)改變?yōu)?TIDYING 并且即將調(diào)用 terminated
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); // 終止線程池
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // 改變狀態(tài)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
// 如果終止失敗會(huì)重試
}
// else retry on failed CAS
}
}
嘗試終止線程池的代碼分析完了,好像就結(jié)束了~但作為好奇寶寶,我們是不是應(yīng)該看看如何打斷閑置線程,以及 terminated 中做了什么呢?來吧,繼續(xù)裝逼
先來看打斷線程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//加鎖~
try {
// 遍歷線程池中的 wroker
for (Worker w : workers) {
Thread t = w.thread;
// 如果線程沒有被中斷,并且能夠獲取到 worker的鎖(說明是閑置線程)
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();// 中斷線程
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 只中斷一個(gè) worker 跳出循環(huán),否則會(huì)將所有的閑置線程都中斷
if (onlyOne)
break;
}
} finally {
mainLock.unlock();// 釋放鎖
}
}
有同學(xué)開始裝逼了,說我們是好奇寶寶,t.interrupt() 方法也應(yīng)該看,嗯~沒錯(cuò),但這里是調(diào)用了 native 方法,會(huì) c 的可以去看看裝逼,我就算了~
好了,再來看看 terminate, 是不是很坑爹? terminated 里面神!馬!也!沒!干!。。。淡定,其實(shí)這個(gè)方法類似于 Activity 的生命周期方法,允許你在被終止時(shí)做一些事情,默認(rèn)的線程池沒有什么要做的事情,當(dāng)然什么也沒寫啦~
/**
* Method invoked when the Executor has terminated. Default
* implementation does nothing. Note: To properly nest multiple
* overridings, subclasses should generally invoke
* {@code super.terminated} within this method.
*/
protected void terminated() { }
異常處理
還記得前面講到,出現(xiàn)各種異常情況,添加隊(duì)列失敗等等,只是籠統(tǒng)的說了一句扔掉,當(dāng)然代碼實(shí)現(xiàn)不可能是簡單一句扔掉就完了?;氐?execute() 方法中找到 reject() 任務(wù),看看究竟是怎么處理的
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
還記得在創(chuàng)建線程池的時(shí)候,初始化了一個(gè) handler — RejectedExecutionHandler
這是一個(gè)接口,只有一個(gè)方法,接收兩個(gè)參數(shù)
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
既然是一個(gè)接口,那么肯定有他的實(shí)現(xiàn)類,我們先不急著看所有實(shí)現(xiàn)類,先來看看這里的 handler 可能是什么,記得在使用 Executors 獲取線程池調(diào)用構(gòu)造方法的時(shí)候并沒有傳入 handler 參數(shù),那么 ThreadPoolExecutor 應(yīng)該會(huì)有一個(gè)默認(rèn)的 handler
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
默認(rèn) handler 是 AbortPolicy ,這個(gè)類實(shí)現(xiàn)了 rejectedExecution() 方法,拋了一個(gè) Runtime 異常,也就是說當(dāng)任務(wù)添加失敗,就會(huì)拋出異常。這個(gè)類在 AsyncTask 引發(fā)了一場血案~所以在 API19 以后修改了 AsyncTask 的部分代碼邏輯,這里就不細(xì)說啦,會(huì)在下一篇 AsyncTask 的筆記中分析。
實(shí)際上,在 ThreadPoolExecutor 中除了 AbortPolicy 外還實(shí)現(xiàn)了三種不同類型的 handler
- CallerRunsPolicy — 在 線程池沒有 shutdown 的前提下,會(huì)直接在執(zhí)行 execute 方法的線程里執(zhí)行這個(gè)任務(wù)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
- DiscardPolicy — 啥也不干,默默地丟掉任務(wù)~不信你看
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
- DiscardOldestPolicy — 丟棄掉隊(duì)列中未執(zhí)行的,最老的任務(wù),也就是任務(wù)隊(duì)列排頭的任務(wù),然后再試圖在執(zhí)行一次
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
總結(jié)
其實(shí)我不想做任何概念性的總結(jié),原因是我之前沒有開始學(xué)習(xí)源碼的時(shí)候也看過很多源碼分析的文章,大部分文章都會(huì)總結(jié)一些概念,這些概念本身可能是沒有錯(cuò)的,起碼是作者自己對(duì)源碼的理解,但是文字所傳達(dá)的思想真的是有限的,有時(shí)候因?yàn)楦拍畹哪:?,反而?huì)被帶入一個(gè)誤區(qū),并且長時(shí)間的無法轉(zhuǎn)變。
我自己一開始對(duì)線程池的理解其實(shí)是有偏差的,宏觀上可能沒有大的問題,但在細(xì)節(jié)上有很大的誤區(qū),通過自己耐心的閱讀源碼分析后學(xué)習(xí)到了很多東西。
非要總結(jié)的話就給一點(diǎn)我閱讀源碼的小思路吧:
- 一定要使用過,起碼能完整的使用。如果沒有用過很難把流程捋清楚
- 從使用的角度作為突破口,一步步的去尋找線索
- 一開始看不需要每一句都弄得很清楚,比如一個(gè)方法,應(yīng)該先搞清楚這個(gè)方法里面做了幾件事,核心的邏輯是什么
- 在捋清了整體邏輯后,再去看細(xì)節(jié)上的實(shí)現(xiàn)
- 實(shí)在無法理解的內(nèi)容,再看看別人的文章,因?yàn)橛辛嗽创a的基礎(chǔ),再看別人的文章能夠有自己的思路
- 與你的好基友探討,你會(huì)發(fā)現(xiàn)每個(gè)人有不同的角度去理解源碼,找到最合適你的那一種
以上是我的一點(diǎn)拙見。
最后感謝我的好基友 — 用語,在與他的探討中我走出了誤區(qū)并有了很多新的理解。