轉(zhuǎn)載請附原文鏈接:ThreadPoolExecutor源碼學習筆記
歡迎來我的個人主頁交流:http://extremej.itscoder.com/
大部分分析以注釋形式寫在源碼中
本篇筆記將從 ThreadPoolExecutor 的一次使用上來分析源碼,主要涉及線程池創(chuàng)建,execute 的步驟,任務添加到阻塞隊列,線程從阻塞隊列中拿取任務執(zhí)行,線程的回收,線程池的終止。
涉及到的類有
Executors — 獲取線程池
ThreadPoolExecutor — 線程池
Worker — 工作線程
LinkedBlockingQueue — 阻塞隊列
-
RejectedExecutionHandler — 任務拒絕處理器(實在不知道什么翻譯~)
?
線程池的獲取
我們知道可以通過 Executors 來獲取不同類型的線程池,那么就從 Executors 來開始看它是如何返回不同類型的線程池的,看看我們常用的一些方法
//獲取一個固定線程池
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
//獲取只有一個線程的池子
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//獲取一個緩存線程池,可變
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
從上面的三個方法可以發(fā)現(xiàn)其實都是 new 了一個 ThreadPoolExecutor ,但是傳入的參數(shù)不同,我們進到這個構(gòu)造方法中去一探究竟,看看不同的參數(shù)到底代表了什么
//構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
通過參數(shù)名稱以及注釋可以知道這幾個參數(shù)的作用分別是
- corePoolSize — 核心線程數(shù),即允許閑置的線程數(shù)目
- maximumPoolSize — 最大線程數(shù),即這個線程池的容量
- keepAliveTime — 非核心線程的閑置存活時間
- unit — 上一個參數(shù)的單位
- workQueue — 任務隊列(阻塞隊列)
- threadFacotry — 線程創(chuàng)建工廠
- handler — 當線程池或者任務隊列容量已滿時用于reject
這里要明白一件事情,核心線程只是通過數(shù)目來判斷,而不是說先創(chuàng)建的線程就是核心線程
在構(gòu)造方法里面初始化了成員變量值,通過構(gòu)造方法應該明白了不同類型的線程獲取的原理。
任務的執(zhí)行
A.狀態(tài)屬性
在看源碼之前先了解一下 ThreadPoolExecutor的幾個狀態(tài)屬性,這對后面的源碼閱讀有很重要的作用,ThreadPoolExecutor 有五種狀態(tài)
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
從上到下依次是
- RUNNING — 運行狀態(tài),可以添加新任務,也可以處理阻塞隊列中的任務
- SHUTDOWN — 待關閉狀態(tài),不再接受新的任務,會繼續(xù)處理阻塞隊列中的任務
- STOP — 停止狀態(tài),此時的線程池不處理任何任務
- TIDYING — 整理狀態(tài),也可以理解為預終結(jié)狀態(tài),這個時候任務都處理完畢,池中無有效線程
- TERMINATED — 終止狀態(tài)
B.execute(Runnable command)
當獲取到了一個線程池之后,需要它來執(zhí)行異步任務,也就是 execute(Runnable) ,傳入一個 runnable 對象,在 run 方法中執(zhí)行我們的代碼,那么來看一下 execute() 是怎么工作的,因為源碼的注釋解釋得十分清楚,這里將注釋也貼出來。簡單翻譯一下,當 execute 被調(diào)用時總共有三種情況。
- 如果當前的有效線程數(shù)小于核心線程數(shù),則試圖創(chuàng)建一個新的 worker 線程
- 如果上面一步失敗了,則試圖將任務添加到阻塞隊列中,并且要再一次判斷需要不需要回滾隊列,或者說創(chuàng)建線程(后面會詳細說明)
- 如果上面兩步都失敗了,則會試圖強行創(chuàng)建一個線程來執(zhí)行這個任務,如果還是失敗,扔掉這個任務
了解了這三個步驟,來看看源碼,源碼中調(diào)用了 addworker 方法,這是創(chuàng)建線程的方法,會在后面講到
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//1.判斷有效線程數(shù)是否小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
//創(chuàng)建新線程
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.分開來看,首先判斷當前的池子是否是處于 running 狀態(tài)
//因為只有 running 狀態(tài)才可以接收新任務
//接下來判斷能否成功添加到隊列中,如果隊列滿了或者其他情況則會跳到下一步
if (isRunning(c) && workQueue.offer(command)) {
//再次檢查池子的狀態(tài),如果進入了非 running 狀態(tài),回滾隊列,扔掉這個任務
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
//如果處于 running 狀態(tài)則檢查當前的有效線程,如果沒有則創(chuàng)建一個線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3.前兩步失敗了,就強行創(chuàng)建線程,成功會返回true,如果失敗扔掉這個任務
else if (!addWorker(command, false))
reject(command);
}
解釋一下第二步,為什么要recheck
當這個任務被添加到了阻塞隊列前,池子處于 RUNNING 狀態(tài),但如果在添加到隊列成功后,池子進入了 SHUTDOWN 狀態(tài)或者其他狀態(tài),這時候是不應該再接收新的任務的,所以需要把這個任務從隊列中移除,并且 reject
同樣,在沒有添加到隊列前,可能有一個有效線程,但添加完任務后,這個線程閑置超時或者因為異常被干掉了,這時候需要創(chuàng)建一個新的線程來執(zhí)行任務
C .addWorker()
前一步把 execute 的流程捋了一遍,里面多次出現(xiàn)了 addWorker() 方法,前文說到這是個創(chuàng)建線程的方法,來看看 addWorker 做了些什么,這個方法代碼比較長,我們拆開來一點一點看
- 第一部分 — 判斷各種基礎異常
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 檢查線程池狀態(tài),隊列狀態(tài),以及 firstask ,拆開來看
// 這段代碼看起來異常的蛋疼,轉(zhuǎn)換一下邏輯即
//rs>= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||workQueue.isEmpty())
// 總結(jié)起來就是 當前處于非 Running 狀態(tài),并且這三種情況
// 1. 不是處于 SHUTDOWN 狀態(tài),不能再創(chuàng)建線程
// 2. 有新的任務 (因為不能再接收新的任務)
// 3. 阻塞隊列中已經(jīng)沒有任務 (不需要再創(chuàng)建線程)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);//當前有效線程數(shù)目
// 根據(jù)傳入的參數(shù)確定以核心線程數(shù)還是最大線程數(shù)作為判斷條件
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 大于容量 或者指定的線程數(shù),不允許創(chuàng)建
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
}
- 第二部分 — 試圖創(chuàng)建線程
創(chuàng)建一個Worker(什么東西?下文會講解,這里把它就當成是一個線程的容器)
boolean workerStarted = false;//標記 worker 開啟狀態(tài)
boolean workerAdded = false;//標記 worker 添加狀態(tài)
Worker w = null;
try {
w = new Worker(firstTask); //將這個任務作為 worker 的第一個任務傳入
final Thread t = w.thread;//通過 worker 獲取到一個線程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// running狀態(tài),或者 shutdown 狀態(tài)但是沒有新的任務
if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將這個 worker 添加到線程池中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//標記worker添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果 worker 創(chuàng)建成功,開啟線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
上面代碼從邏輯層面來看不算難懂,到這里一個任務到達后,ThreadPoolExecutor 的處理就結(jié)束了,那么任務又是怎么被添加到阻塞隊列中,線程是如何從隊列中取出任務,上文中的 Worker 又是什么東西?
一個一個來,先來看看 Worker 到底是什么
D.Worker
Worker 是 ThreadPoolExecutor 的一個內(nèi)部類,實現(xiàn)了 Runnable 接口,繼承自 AbstractQueuedSynchronizer,這又是個什么鬼???我也不造~可以看看這篇文章
簡單來說,Worker實現(xiàn)了 lock 和 unLock 方法來標示當前線程的狀態(tài)是否為閑置
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
上一節(jié)創(chuàng)建線程成功后調(diào)用 t.start() 而這個線程又是 Worker 的成員變量
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
可以看到這里將 Worker 作為 Runnable 參數(shù)創(chuàng)建了一個新的線程,我們知道 Thread 接收一個 Runnable 對象后 start 運行的是 Runnable 的 run 方法,Worker 的 run 方法調(diào)用了 runWorker ,這個方法里面就是取出任務執(zhí)行的邏輯
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 獲取到 worker 的第一個任務
w.firstTask = null;
w.unlock(); // 標記為閑置,還沒有開始任務 允許打斷
boolean completedAbruptly = true; // 異常退出標記
try {
// 循環(huán)取出任務,如果第一個任務不為空,或者從隊列中拿到了任務
// 只要這兩個條件滿足,會一直循環(huán),直到?jīng)]有任務,正常退出,或者異常退出
while (task != null || (task = getTask()) != null) {
w.lock();// 該線程標記為非閑置
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 翻譯注釋:1.如果線程池STOPPING狀態(tài),需要中斷線程
// 2.Thread.interrupted()是一個native方法,返回當前線程是否有被等待中斷的請求
// 3.第二個條件成立時,檢查線程池狀態(tài),如果為STOP,并且沒有被中斷,則中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 執(zhí)行任務
try {
beforeExecute(wt, task);// 執(zhí)行前
Throwable thrown = null;
try {
task.run(); // 執(zhí)行任務
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 執(zhí)行結(jié)束
}
} finally {
task = null; // 將 worker 的任務置空
w.completedTasks++;
w.unlock(); // 釋放鎖,進入閑置狀態(tài)
}
}// 循環(huán)結(jié)束
completedAbruptly = false; // 標記為正常退出
} finally {
// 干掉 worker
processWorkerExit(w, completedAbruptly);
}
}
這里弄清楚了一件事情,進入循環(huán)準備執(zhí)行任務時,worker 加鎖標記為非閑置,任務執(zhí)行完畢或者出現(xiàn)異常,worker 釋放鎖,進入閑置狀態(tài)。
也就是當一個 worker 執(zhí)行任務前或者執(zhí)行完任務,到取出下一個任務期間,都是閑置狀態(tài)可以被打斷
上面取出任務調(diào)用了 getTask() ,誒~為什么有一個死循環(huán),別著急,慢慢看來。上面的代碼可以知道如果 getTask 返回任務則執(zhí)行,如果返回為 null 則 worker 需要被回收
private Runnable getTask() {
// 標記取任務是否超時
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果線程池狀態(tài)為 STOP 或者 SHUTDOWN 并且隊列已經(jīng)為空,回收 wroker
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//獲取當前有效線程數(shù)
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed 用來標記當前的 worker 是否設置超時時間,
// 還記得獲取線程池的時候 可以設置核心線程超時時間
//1.允許核心線程超時回收(即所有線程) 2.當前有效線程超過核心線程數(shù)(需要回收)
// 如果timed == false 則該worker不會被回收,如果沒有取到任務 會一直阻塞
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 回收線程條件
// 1. 有效線程數(shù)已經(jīng)大于了線程池的最大線程數(shù)或者設置了超時回收并且已經(jīng)超時
// 2. 有效線程數(shù)大于1或者隊列任務已經(jīng)為空
// 只有當上面1和2 同時滿足時 則試圖回收線程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 如果減少workercount成功 直接回收
if (compareAndDecrementWorkerCount(c))
return null;
// 否則重走循環(huán),從第一個判斷條件處回收
continue;
}
// 取任務
try {
// 根據(jù)是否設置超時回收來選擇不同的取任務的方式
// poll 方法取任務會有超時時間,超過時間則返回null
// take 方法沒有超時時間,阻塞式方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果任務不為空返回任務
if (r != null)
return r;
// 否則標記超時 進入下一次循環(huán)等待回收
timedOut = true;
} catch (InterruptedException retry) {
// 如果出現(xiàn)異常,試圖重試
timedOut = false;
}
}
}
getTask() 方法邏輯也捋得差不多了,這里又出現(xiàn)了兩個新的方法,workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 和 workQueue.take() ,這兩個都是阻塞隊列的方法,來看看它們又各自是怎么實現(xiàn)的
E.LinkedBlockingQueue — 阻塞隊列
ThreadPoolExecutor 使用的是鏈表結(jié)構(gòu)的阻塞隊列,實現(xiàn)了 BlockingQueue 接口,而 BlockingQueue 則是繼承自 Queue 接口,再上層就是 Collection 接口。
因為本篇筆記主要是分析 ThreadPoolExecutor 的原理,所以不會詳細介紹 LinkedBlockingQueue 中的其它代碼,主要介紹這里所用的方法,首先來看一下上文所提到的 take()
public E take() throws InterruptedException {
E x; // 任務
int c = -1; // 取出任務后的剩余任務數(shù)量
final AtomicInteger count = this.count; // 當前任務數(shù)量
final ReentrantLock takeLock = this.takeLock; // 加鎖防止并發(fā)
takeLock.lockInterruptibly();
try {
// 如果隊列數(shù)量為空,則一直循環(huán),阻塞線程
while (count.get() == 0) {
notEmpty.await();
}
// 取出任務
x = dequeue();
// 任務數(shù)量減一
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();// 標記隊列非空
} finally {
takeLock.unlock(); // 釋放鎖
}
//
if (c == capacity)
signalNotFull();//標記隊列已滿
return x;// 返回任務
}
上面的代碼可以知道 take 方法會一直阻塞直到隊列有新的任務為止
接下來是 poll 方法,可以看到幾乎與 take 方法相同,唯一的區(qū)別是在阻塞的循環(huán)代碼塊里面加了時間判斷,如果超時則直接返回為空,不會一直阻塞下去
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null; // 存放的任務
int c = -1;
long nanos = unit.toNanos(timeout); // 超時時間
final AtomicInteger count = this.count; // 隊列中的數(shù)量
final ReentrantLock takeLock = this.takeLock; // 加鎖防止并發(fā)
takeLock.lockInterruptibly();
try {
// 如果隊列為空,則不斷的循環(huán)
while (count.get() == 0) {
// 如果當?shù)褂嫊r小于0 即超時時間到 則返回空
if (nanos <= 0)
return null;
// 讓線程等待
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue(); // 取出一個任務
c = count.getAndDecrement(); // 取出后的隊列數(shù)量
if (c > 1)
notEmpty.signal(); // 標記非空
} finally {
takeLock.unlock(); // 釋放鎖
}
if (c == capacity)
signalNotFull(); // 標記隊列已滿
return x; // 返回任務
}
線程池的回收及終止
前一節(jié)分析了任務的執(zhí)行流程及原理,也留下了一個問題,worker 是如何被回收的呢?線程池該如何管理呢?回到上一節(jié)的 runWorker() 方法中,還記得最后調(diào)用了一個方法
processWorkerExit(w, completedAbruptly);
這個方法傳入了兩個參數(shù),第一個是當前的 Woker ,第二個是標記異常退出的標識
首先判斷是否為異常退出,如果是異常退出的話需要手動調(diào)整線程數(shù)量,如果是正?;厥盏模琯etTask 方法里面已經(jīng)手動調(diào)整過了,不記得的小伙伴可以看看前文的代碼,找找 decrementWorkerCount(),
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// 加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
// 記錄線程池完成的任務總數(shù),從 workers 中移除該 worker
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();//嘗試關閉池子
int c = ctl.get();
// 以下的代碼是判斷需不需要給線程池創(chuàng)建一個新的線程
// 如果線程池的狀態(tài)是 RUNNING 或者 SHUTDOWN 進一步判斷需不需要創(chuàng)建
if (runStateLessThan(c, STOP)) {
// 如果為異常退出直接創(chuàng)建,如果不是異常退出進入判斷
if (!completedAbruptly) {
// 獲取線程池應該存在的最小線程數(shù) 如果設置了超時 則是0,否則是核心線程數(shù)
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果 min 是0 但是隊列又不為空,則 min 應該是1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果當前池中的有效線程數(shù)大于等于最小線程數(shù) 則不需要創(chuàng)建
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 創(chuàng)建線程
addWorker(null, false);
}
}
上面的代碼中調(diào)用了 tryTerminate() 方法,這個方法是用于終止線程池的,又是一個 for 循環(huán),從代碼結(jié)構(gòu)來看是異常情況的重試機制。還是老方法,慢慢來看總共做了幾件事情
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果處于這三種情況不需要關閉線程池
// 1. Running 狀態(tài)
// 2. SHUTDOWN 狀態(tài)并且任務隊列不為空,不能終止
// 3. TIDYING 或者 TERMINATE 狀態(tài),說明已經(jīng)在關閉了 不需要重復關閉
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 進入到關閉線程池的代碼,如果線程池中還有線程,則需要打斷線程
if (workerCountOf(c) != 0) { // Eligible to terminate 可以關閉池子
// 打斷閑置線程,只打斷一個
interruptIdleWorkers(ONLY_ONE);
return;
// 如果有兩個以上怎么辦?只打斷一個?
// 這里只打斷一個是因為 worker 回收的時候都會進入到該方法中來,可以回去再看看
// runWorker方法最后的代碼
}
// 線程已經(jīng)回收完畢,準備關閉線程池
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();// 加鎖
try {
// 將狀態(tài)改變?yōu)?TIDYING 并且即將調(diào)用 terminated
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); // 終止線程池
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // 改變狀態(tài)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
// 如果終止失敗會重試
}
// else retry on failed CAS
}
}
嘗試終止線程池的代碼分析完了,好像就結(jié)束了~但作為好奇寶寶,我們是不是應該看看如何打斷閑置線程,以及 terminated 中做了什么呢?來吧,繼續(xù)裝逼
先來看打斷線程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//加鎖~
try {
// 遍歷線程池中的 wroker
for (Worker w : workers) {
Thread t = w.thread;
// 如果線程沒有被中斷,并且能夠獲取到 worker的鎖(說明是閑置線程)
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();// 中斷線程
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 只中斷一個 worker 跳出循環(huán),否則會將所有的閑置線程都中斷
if (onlyOne)
break;
}
} finally {
mainLock.unlock();// 釋放鎖
}
}
有同學開始裝逼了,說我們是好奇寶寶,t.interrupt() 方法也應該看,嗯~沒錯,但這里是調(diào)用了 native 方法,會 c 的可以去看看裝逼,我就算了~
好了,再來看看 terminate, 是不是很坑爹? terminated 里面神!馬!也!沒!干!。。。淡定,其實這個方法類似于 Activity 的生命周期方法,允許你在被終止時做一些事情,默認的線程池沒有什么要做的事情,當然什么也沒寫啦~
/**
* Method invoked when the Executor has terminated. Default
* implementation does nothing. Note: To properly nest multiple
* overridings, subclasses should generally invoke
* {@code super.terminated} within this method.
*/
protected void terminated() { }
異常處理
還記得前面講到,出現(xiàn)各種異常情況,添加隊列失敗等等,只是籠統(tǒng)的說了一句扔掉,當然代碼實現(xiàn)不可能是簡單一句扔掉就完了?;氐?execute() 方法中找到 reject() 任務,看看究竟是怎么處理的
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
還記得在創(chuàng)建線程池的時候,初始化了一個 handler — RejectedExecutionHandler
這是一個接口,只有一個方法,接收兩個參數(shù)
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
既然是一個接口,那么肯定有他的實現(xiàn)類,我們先不急著看所有實現(xiàn)類,先來看看這里的 handler 可能是什么,記得在使用 Executors 獲取線程池調(diào)用構(gòu)造方法的時候并沒有傳入 handler 參數(shù),那么 ThreadPoolExecutor 應該會有一個默認的 handler
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
默認 handler 是 AbortPolicy ,這個類實現(xiàn)了 rejectedExecution() 方法,拋了一個 Runtime 異常,也就是說當任務添加失敗,就會拋出異常。這個類在 AsyncTask 引發(fā)了一場血案~所以在 API19 以后修改了 AsyncTask 的部分代碼邏輯,這里就不細說啦,會在下一篇 AsyncTask 的筆記中分析。
實際上,在 ThreadPoolExecutor 中除了 AbortPolicy 外還實現(xiàn)了三種不同類型的 handler
- CallerRunsPolicy — 在 線程池沒有 shutdown 的前提下,會直接在執(zhí)行 execute 方法的線程里執(zhí)行這個任務
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
- DiscardPolicy — 啥也不干,默默地丟掉任務~不信你看
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
- DiscardOldestPolicy — 丟棄掉隊列中未執(zhí)行的,最老的任務,也就是任務隊列排頭的任務,然后再試圖在執(zhí)行一次
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
總結(jié)
其實我不想做任何概念性的總結(jié),原因是我之前沒有開始學習源碼的時候也看過很多源碼分析的文章,大部分文章都會總結(jié)一些概念,這些概念本身可能是沒有錯的,起碼是作者自己對源碼的理解,但是文字所傳達的思想真的是有限的,有時候因為概念的模糊,反而會被帶入一個誤區(qū),并且長時間的無法轉(zhuǎn)變。
我自己一開始對線程池的理解其實是有偏差的,宏觀上可能沒有大的問題,但在細節(jié)上有很大的誤區(qū),通過自己耐心的閱讀源碼分析后學習到了很多東西。
非要總結(jié)的話就給一點我閱讀源碼的小思路吧:
- 一定要使用過,起碼能完整的使用。如果沒有用過很難把流程捋清楚
- 從使用的角度作為突破口,一步步的去尋找線索
- 一開始看不需要每一句都弄得很清楚,比如一個方法,應該先搞清楚這個方法里面做了幾件事,核心的邏輯是什么
- 在捋清了整體邏輯后,再去看細節(jié)上的實現(xiàn)
- 實在無法理解的內(nèi)容,再看看別人的文章,因為有了源碼的基礎,再看別人的文章能夠有自己的思路
- 與你的好基友探討,你會發(fā)現(xiàn)每個人有不同的角度去理解源碼,找到最合適你的那一種
以上是我的一點拙見。
最后感謝我的好基友 — 用語,在與他的探討中我走出了誤區(qū)并有了很多新的理解。