《Java并發(fā)編程之美》讀書筆記
第8章 Java并發(fā)包中線程池ThreadPoolExceutor原理探究
介紹
線程池主要解決兩個問題:
- 一是當執(zhí)行大量異步任務(wù)時線程池能提供較好的性能,在不適用線程池時,每當需要執(zhí)行異步任務(wù)時直接new一個線程來運行,而線程的創(chuàng)建和銷毀都是需要開銷的。線程池里面的線程是可以復(fù)用的,不需要每次執(zhí)行異步任務(wù)的時候都重新創(chuàng)建和銷毀線程。
- 線程池提供了一種資源限制和管理的手段,比如可以限制線程的個數(shù),動態(tài)增減線程的等,ThreadPoolExceutor也保留了一些基本的統(tǒng)計參數(shù),比如當前線程池完成的任務(wù)數(shù)目等
另外,線程池也提供了許多可調(diào)的參數(shù)個可擴展性接口,以滿足不同情況的需要,程序員可以使用更方便的Executors的工程方法,比如newCachedThreadPool(線程池線程個數(shù)最多可達Integer.MAX_VALUE,線程自動回收)newFixedThreadPool(固定大小的線程池)和newSingleThreadExecutor(單個線程)等來創(chuàng)建線程池,當然用戶還可以自定義
類圖介紹
在類圖中,Executors其實是個工具類,里面提供了好多靜態(tài)方法,這些方法更具用戶選擇返回不同的線程池實例。ThreadPoolExceutor繼承了AbstractExecutorService,成員變量ctl是一個Integer的原子變量,用來記錄線程池狀態(tài)和線程池中的線程個數(shù),類似于ReentrantReadWriteLock使用一個變量來保存兩種信息。

這里假設(shè)Integer類型是32位二進制表示,則其中高3位用來表示線程池狀態(tài),后面29位用來記錄線程池的線程個數(shù)
//原子變量ctl高3位用愛表示線程池狀態(tài),低29位用來表示線程個數(shù)
//默認RUNNING狀態(tài),線程個數(shù)為0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//線程個數(shù)掩碼位數(shù),并不是所有平臺的int類型都是32位的,所有的來說,是具體平臺下的Integer的二進制位數(shù)-3的剩余位數(shù)所表示的數(shù)才是線程的個數(shù)
private static final int COUNT_BITS = Integer.SIZE - 3;
//線程的最大個數(shù)(低29位)00011111111111111111111;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 線程池的狀態(tài)
//高3位 運行態(tài)111000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//高3位 000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//高3位 001
private static final int STOP = 1 << COUNT_BITS;
//高3位 010
privatestatic final int TIDYING = 2 << COUNT_BITS;
//高3位 011
private static final int TERMINATED = 3 << COUNT_BITS;
// 獲取高3位
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
//獲取低29位(線程個數(shù))
private static int workerCountOf(int c) { return c & COUNT_MASK; }
//計算ctl新值(線程狀態(tài)與線程個數(shù))
private static int ctlOf(int rs, int wc) { return rs | wc; }
- RUNNING:接受新任務(wù)并且處理阻塞隊列里面的任務(wù)
- SHUTDOWN:拒絕新任務(wù)但是處理阻塞隊列里的任務(wù)
- STOP:拒絕新任務(wù)并且拋棄阻塞隊列里的任務(wù),同時會中斷正在處理的任務(wù)
- TIDYING:所有任務(wù)都執(zhí)行完(包含阻塞隊列里面的任務(wù))后當前線程池活動線程為0,將要調(diào)用terminated方法
- TERMINATED:終止狀態(tài),terminated方法調(diào)用完成以后的狀態(tài)
- RUNNING->SHUTDOWN:顯示調(diào)用shutdown()方法,或者隱士調(diào)用了finalize()方法里面的shutdown方法;
- RUNNING or SHUTDOWN ->STOP:顯示調(diào)用shutdownNow()方法時。
- SHUTDOWN->tIDYING:當線程池和任務(wù)隊列都為空的時候
- STOP->TIDYING:當線程池為空的時候
- TIDYING->TERMINATED:當terminated()hook方法完成時。
線程池的參數(shù)如下: - corePoolSize:線程池核心線程的個數(shù)
- workQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊列,比如基于數(shù)組的有界的ArrayBlockingQueue、基于鏈表的無界LinkedBlockingQueue、最多只有一個元素的同步隊列SynchronousQueue及優(yōu)先級隊列PriorityBlockingQueue等
- maximumPoolSize:線程池最大的線程數(shù)量
- ThreadFactory:創(chuàng)造線程的工廠
- RejectedExecutionHandler:飽和策略,當隊列滿并且線程個數(shù)達到maximunPollSize后采取的策略,比如AbortPolicy(拋出異常),CallerRunsPolicy(使用調(diào)用者線程來運行任務(wù))、DiscardOldestPolicy(調(diào)用poll丟棄一個任務(wù),執(zhí)行當前任務(wù))及DiscardPolicy(默默丟棄,不拋出異常)
- keeyAliveTime:存活時間。如果當前線程池中的線程數(shù)量比核心線程數(shù)量多,并且是閑置狀態(tài),則這些閑置的線程能存活的最大時間。
- TimeUnit:存活時間的時間單位
- newFixedThreadPool:創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)都為nThreads的線程池,并且阻塞隊列長度為Integer.MAX_VALUE。 keepAliveTime=0說明只要線程個數(shù)比核心線程多并且當前空閑則回收。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//使用自定義線程創(chuàng)建工廠
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- newSingleThreadExecutor:創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)都為1的線程池,并且阻塞隊列的長度為Integer.NAX_VALUE.keepAliveTime=0說明只要線程個數(shù)比核心個數(shù)多并且當前空閑則回收。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//使用自己的線程工廠
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
- newCachedTheadPool:創(chuàng)建一個按需創(chuàng)建線程的線程池,出事的線程個數(shù)為0,最多線程的個數(shù)為Integer.MAX_VALUE,并且阻塞隊列為同步隊列。KeepAliveTime=60,表示當前線程在60s內(nèi)空閑則回收。這個類型的特殊之處在于,加入同步隊列的任務(wù)會被馬上執(zhí)行,同步隊列里面最多只有一個任務(wù)。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//使用自定義的線程工廠
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
如上的TheadPoolExecutor類圖所示,其中mainLock是獨占鎖,用來控制新增Worker線程操作的原子性,termination是該鎖對應(yīng)的條件隊列,在線程調(diào)用awaitTermination時用來存放阻塞的線程。
Worker繼承AQS和Runnable接口,是具體承載任務(wù)的對象,worker繼承了AQS,自己實現(xiàn)了簡單不可重入獨占鎖,其中state=0表示鎖未被獲取狀態(tài),state=1表示所以就備貨區(qū),state=-1是創(chuàng)建worker默認的狀態(tài),創(chuàng)建時狀態(tài)設(shè)置為-1是為了避免該線程在運行runWorker方法前被中斷,其中變量firstTask記錄該工作線程執(zhí)行的第一個任務(wù),thread是具體執(zhí)行任務(wù)的線程。
DefaultThreadFactory是線程工廠,newThread方法時對線程的一個修飾,其中poolNumber是個靜態(tài)的原子變量,用來統(tǒng)計線程工程的個數(shù),threadNumber用來記錄每個線程工廠創(chuàng)建了多少的線程,這兩個值也作為線程池和線程的名稱的一部分。
源碼分析
public void execute(Runnable command)
execute方法的作用是提交任務(wù)command到線程池進行執(zhí)行,用戶線程提交任務(wù)到線程池的模型圖如下圖所示:

從該圖可以看出,ThreadPoolExecutor的實現(xiàn)實際上是一個生產(chǎn)消費模型,當用戶添加任務(wù)到線程池到相當于生產(chǎn)者生產(chǎn)元素,workers線程工作集中的線程直接執(zhí)行任務(wù)或者從任務(wù)隊列里面獲取任務(wù)時則相當于消費者消費元素。
public void execute(Runnable command) {
//1.如果任務(wù)為null,則拋出NPE異常
if (command == null)
throw new NullPointerException();
//2獲取當前線程池的狀態(tài)+線程個數(shù)變量的組合值
int c = ctl.get();
//3當前線程池中的線程個數(shù)是否小于corePoolSize,小于的話則開啟線程運行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//4如果線程池處于Running狀態(tài),則添加任務(wù)到阻塞隊列
if (isRunning(c) && workQueue.offer(command)) {
//4,1二次檢查
int recheck = ctl.get();
//4.2如果當前的線程池狀態(tài)不是RUNNING則從隊列中刪除任務(wù),并且執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
//4.3否則如果當前線程池為空,則添加一個線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//5如果隊列滿,則新增線程,新增失敗則執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
代碼3判斷如果當前線程池中線程個數(shù)小于corePoolSize,會向workers里面新增一個核心線程(core線程)執(zhí)行該任務(wù)
如果當前線程池中線程個數(shù)大于等于corePoolSize則執(zhí)行代碼4.如果當前線程池處于RUNNING狀態(tài)則添加當前任務(wù)到任務(wù)隊列。這里需要判斷線程池狀態(tài)是因為線程池已經(jīng)處于非RUNNING,而非RUNNING狀態(tài)下是要拋棄新任務(wù)的。
如果向任務(wù)隊列添加任務(wù)成功,則代碼4.2對線程池狀態(tài)進行二次校驗,這是因為添加任務(wù)到任務(wù)隊列后,執(zhí)行代碼4.2之前有可能線程池的狀態(tài)已經(jīng)發(fā)生了變化了,這里進行二次檢驗,如果當前線程池狀態(tài)不是RUNNINGLE則把任務(wù)從任務(wù)隊列里面移除,移除后執(zhí)行拒絕策略;如果二次校驗通過,則執(zhí)行待嗎4.3重新判斷當前線程池里面是否還有線程,如果沒有則新增一個線程。
如果代碼4添加任務(wù)失敗,則說明任務(wù)隊列已滿,那么執(zhí)行代碼5嘗試新開啟線程如圖中中thread3,thread4來執(zhí)行該任務(wù),如果當前線程池中線程個數(shù)>maximunPoolSize則執(zhí)行拒絕策略。
下面分析新增線程addWorker方法:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// 6檢查隊列是否只在必要時為空
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
//7 循環(huán)CAS增加線程個數(shù)
for (;;) {
//7.1如果線程個數(shù)則返回false
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
//7.2CAS增加線程個數(shù),同時只有一個線程成功
if (compareAndIncrementWorkerCount(c))
break retry;
//7.3到這里說明CAS失敗了,則看線程池狀態(tài)是否變化了,變化則調(diào)到外層循環(huán)重新嘗試獲取線程池狀態(tài),否則內(nèi)層循環(huán)重新CAS
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
//8到這里說明CAS成功了
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//8.1創(chuàng)建worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//8.2加獨占鎖,為了實現(xiàn)workers同步,因為可能多個線程調(diào)用了線程池的execute方法
mainLock.lock();
try {
// 重新檢查線程池狀態(tài),以避免在獲取鎖調(diào)用了shutdown接口
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加任務(wù)
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//添加成功后則啟動任務(wù)
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
代碼比較長主要分為兩個部分:
- 第一部分雙重循環(huán)的目的是通過CAS操作添加線程數(shù)
- 第二部分主要是把并發(fā)安全的任務(wù)添加到workers里面,并且啟動任務(wù)執(zhí)行**
首先來分析第一部分代碼
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)//1
|| firstTask != null//2
|| workQueue.isEmpty()))//3
return false;
也就是說代碼6在下面幾種情況下會返回false
- 當前線程池狀態(tài)為STOP TIDYING TERMINATED
- 當前線程池狀態(tài)為SHUTDOWN并且已經(jīng)有第一個任務(wù)
- 當前線程池狀態(tài)為SHUTDOWN并且任務(wù)隊列為空
內(nèi)層循環(huán)的作用是使用CAS操作增加線程數(shù),代碼7.1如果線程個數(shù)超限則返回false,否則執(zhí)行代碼7.2CAS操作設(shè)置線程個數(shù),CAS成功則退出雙循環(huán),CAS失敗則執(zhí)行代碼7.3看當前線程池的狀態(tài)是否變化了,如果變了,則再次進入外層循環(huán)重新獲取線程池狀態(tài),否則進入內(nèi)存循環(huán)繼續(xù)進行CAS嘗試。
執(zhí)行到了第二部分的代碼8是說明使用CAS成功的增加了線程個數(shù),但是現(xiàn)在任務(wù)還沒有開始執(zhí)行。這里要使用全局的獨占鎖把新增的Worker添加到工作集workers中。代碼8.1創(chuàng)建了一個工作線程Worker。
代碼8.2獲取了獨占鎖,代碼8.3重新檢查線程池狀態(tài),這是為了避免在獲取鎖之前其他線程調(diào)用了shutdown關(guān)閉了線程池,如果線程池已經(jīng)被關(guān)閉,則釋放鎖,新增線程失敗,否則執(zhí)行代碼8.4天假工作線程到線程工作集,然后釋放鎖,代碼8.5判斷如果新增工作線程成功,則啟動工作線程。
工作線程Worker的執(zhí)行
用戶線程提交任務(wù)到線程池后,由worker來執(zhí)行。先看下worker的構(gòu)造函數(shù)。
Worker(Runnable firstTask) {
setState(-1); // 在調(diào)用runworker前禁止中斷
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);//創(chuàng)建一個線程
}
在構(gòu)造函數(shù)內(nèi)首先設(shè)置worker的狀態(tài)為1,這是為了避免當前worker在調(diào)用runworker方法前被中斷(當其他線程調(diào)用了線程池的shutdownNow時候,如果worker代碼中狀態(tài)>=0則會中斷該線程。這里設(shè)置線程的狀態(tài)為-1,所以該線程就不會中斷了,咋子如下的runworker代碼中,運行代碼9時會調(diào)用unlock方法,該方法把status設(shè)置了為0,所以這時候調(diào)用shutDownNow會中斷worker線程。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts9將state設(shè)置為0,允許中斷
boolean completedAbruptly = true;
try {
//10
while (task != null || (task = getTask()) != null) { //10.1
w.lock();
...
try {
//10.2執(zhí)行任務(wù)前干一些事情
beforeExecute(wt, task);
try {
task.run();//10.3執(zhí)行任務(wù)
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
//10.5 統(tǒng)計當前worker完成了多少任務(wù)
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//11執(zhí)行清理工作
processWorkerExit(w, completedAbruptly);
}
}
在如上的代碼10中,如果當前task==null或者調(diào)用getTask()從任務(wù)隊列獲取的任務(wù)返回null,則跳轉(zhuǎn)到代碼11執(zhí)行,如果task不為null則執(zhí)行任務(wù)10.1獲取線程內(nèi)部持有的獨占鎖,然后執(zhí)行擴展接口代碼10.2在具體任務(wù)之前做些事情,代碼10.3具體執(zhí)行任務(wù),代碼10。5統(tǒng)計當前worker完成了多少個任務(wù),并釋放鎖。
這里在執(zhí)行具體的任務(wù)期間加鎖,是為了避免在任務(wù)運行的期間,其他線程調(diào)用了shutdown后正在執(zhí)行的任務(wù)被中斷(shutdown只會中斷當前被阻塞掛起的線程)
代碼11執(zhí)行清理任務(wù),其代碼如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//11.1統(tǒng)計整個線程池完成的任務(wù)個數(shù),并從工作集里面刪除當前的worker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
//11.2嘗試設(shè)置線程池的狀態(tài)為Terminated,如果當前是shutdown狀態(tài)并工作隊列為空
//或者當前是stop狀態(tài),當前線程池里沒有活動線程。
tryTerminate();
int c = ctl.get();
//11.3如果當前線程個數(shù)小于核心個數(shù),則增加
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
在如上的代碼中,代碼11.1統(tǒng)計線程池完成任務(wù)的個數(shù),并且在統(tǒng)計前加了全局所,把在當前工作線程中完成的任務(wù)累加到全局計數(shù)器,然后從工作集中刪除當前的worker。
代碼11.2判斷如果線程池的狀態(tài)是SHUTDOWN并且工作隊列為空,或者當前線程池狀態(tài)是STOP并且當前線程池里面沒有活動線程,則設(shè)置線程池狀態(tài)為TERMINATED。如果設(shè)置為了TERMINATED狀態(tài),則還需要調(diào)用條件變量termination的signalAll()方法激活所有因為調(diào)用線程池的awaitTermination方法而被阻塞的線程。
代碼11.3則判斷當前線程池個數(shù)是否小于核心線程個數(shù),如果是則在新增一個線程。
shutdown操作
調(diào)用shutdown方法之后,線程系就不會在接受新的任務(wù)了,但是工作隊列里面的任務(wù)還是需要執(zhí)行的。該方法會立刻返回,并不等待隊列任務(wù)完成在返回。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//12 權(quán)限檢查
checkShutdownAccess();
//13 設(shè)置當前線程池狀態(tài)為SHUTDOWN,如果已經(jīng)是了SHUTDOWN則直接返回。
advanceRunState(SHUTDOWN);
//設(shè)置中斷標志
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//15 嘗試將狀態(tài)變?yōu)門ERMINATED
tryTerminate();
}
在如上的代碼中,代碼12檢查看是否設(shè)置了安全管理器,是則看點前調(diào)用shutdown命令的線程是否有關(guān)閉線程的權(quán)限,如果有則還要看調(diào)用線程是否有中斷線程工作線程的權(quán)限,如果沒有權(quán)限則拋出SecurityException或者NullPointerException
其中代碼13的內(nèi)從如下,如果當前線程池狀態(tài)>=
SHUTDOWN則直接返回,否則是指為SHUTDOWN狀態(tài)。
private void advanceRunState(int targetState) {
// assert targetState == SHUTDOWN || targetState == STOP;
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
代碼14的內(nèi)容如下,其設(shè)置所有空閑線程的中斷標志。這里首先加了全局鎖,同時只有一個線程可以調(diào)用shutdown方法設(shè)置中斷標志,然后嘗試獲取worker自己的鎖,獲取成功則設(shè)置中斷標志。由于正在執(zhí)行的任務(wù)已經(jīng)獲取了鎖,所以正在執(zhí)行的任務(wù)沒有被中斷。這里中斷的是阻塞到getTask()方法并企圖從隊列里面獲取任務(wù)的線程,也就是空閑線程。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//如果工作線程沒有被中斷,并且沒有正在運行則設(shè)置中斷標志
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
final void tryTerminate() {
for (;;) {
..
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {//設(shè)置當前線程池狀態(tài)為TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//設(shè)置當前線程池狀態(tài)為TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//激活因調(diào)用條件變量termination的await方法而被阻塞的所有線程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
在如上代碼中,首先使用CAS設(shè)置當前線程池狀態(tài)為TIDYING,如果設(shè)置成功則執(zhí)行擴展接口terminated在線程池狀態(tài)變?yōu)門ERMINATED前做一些事情,然后設(shè)置當前線程值得狀態(tài)為TERMINATED。最后調(diào)用termination.signalAll激活因調(diào)用條件變量termination的await方法而被阻塞的所有線程。
shutdownNow操作
調(diào)用shutdownNow方法后,線程池就不會再接受新的任務(wù)了,并且會丟棄工作隊列里面的任務(wù),正在執(zhí)行的任務(wù)會被中斷,該方法會立刻返回,并不等待激活的任務(wù)執(zhí)行完成。返回值為這時候隊列里面被丟棄的任務(wù)列表。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();//16權(quán)限檢查
advanceRunState(STOP);//17設(shè)置線程池的狀態(tài)為stop
interruptWorkers();//18中斷所有線程
tasks = drainQueue();//19將隊列任務(wù)全部移動到tasks里面
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
在如上的代碼中,首先調(diào)用代碼16檢查權(quán)限,然后調(diào)用代碼17設(shè)置當前線程池的狀態(tài)為stop,隨后執(zhí)行代碼18中斷所有工作線程,這里需要注意的是,中斷的所有線程包含空閑線程和正在執(zhí)行任務(wù)的線程。
private void interruptWorkers() {
// assert mainLock.isHeldByCurrentThread();
for (Worker w : workers)
w.interruptIfStarted();
}
然后執(zhí)行代碼19將任務(wù)隊列里面的任務(wù)移動到tasks列表
awaitTermination操作
當線程調(diào)用awaitTermination方法后,當前線程會阻塞,直到線程池狀態(tài)變?yōu)門ermination才返回,或者等待時間超時才返回。整個過程中獨占鎖的代碼:
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
while (runStateLessThan(ctl.get(), TERMINATED)) {
if (nanos <= 0L)
return false;
nanos = termination.awaitNanos(nanos);
}
return true;
} finally {
mainLock.unlock();
}
}
如上代碼首先獲取獨占鎖,然后再無限循環(huán)內(nèi)部判斷當前狀態(tài)池是否至少是Termination狀態(tài),如果是則直接返回,否則說明當前線程池還有線程在執(zhí)行,則看設(shè)置的超時時間nanos是否小于0,小于0則說明不需要等待,那就直接返回,如果大于0則調(diào)用條件變量termination的awaitNanos方法等待nanos時間,期望在這段時間內(nèi)線程池的狀態(tài)變?yōu)門ERMINATED
在講shutdown方法提到過,當線程池狀態(tài)變?yōu)門ERMINATED的時候,會調(diào)用termination.signalAll激活因調(diào)用條件變量termination的await方法而被阻塞的所有線程,所以如果在調(diào)用awaitTermination方法之后調(diào)用shutdown方法,并且在shutdown內(nèi)部將線程池狀態(tài)設(shè)置為TERMINATED,則termination.awaitNanos方法會返回。
另外在工作線程中worker的runworker方法內(nèi),當工作線程運行結(jié)束后,會低啊用processWorkerExit方法,在processWorkerExit方法內(nèi)部也會調(diào)用trytREMINATE方法測試當前時候應(yīng)該把線程池狀態(tài)設(shè)置為TERMINATED,如果是,則也會調(diào)用termination.signalAll激活因調(diào)用條件變量termination的await方法而被阻塞的所有線程。
而且當?shù)却龝r間超時后,terminate.awaitNanos也會返回,這時候會重現(xiàn)檢查當前線程池狀態(tài)是否為TERMINATED;如果是則世界返回,否則繼續(xù)阻塞掛起自己。
總結(jié)
線程池巧妙的使用一個Integer類型的原子變量來記錄線程池狀態(tài)和線程池中線程個數(shù),通過線程池狀態(tài)來控制任務(wù)的執(zhí)行,每個Worker線程可以處理多個任務(wù),線程池通過線程的復(fù)用減少了線程的創(chuàng)建和銷毀的開銷。
Java并發(fā)包中ScheduledThreadPoolExecutor原理探究
ThreadPollExecutor只是Executors工具類的一部分功能,而ScheduledThreadPoolExecutor是一個可以在指定一定延遲時間后或者定時進行任務(wù)調(diào)度執(zhí)行的線程池。
類圖介紹
Executors其實是個工具類,它提供了好多靜態(tài)方法,可根據(jù)用戶的選擇返回不同的線程池實例。ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor并實現(xiàn)了ScheduledExecutorService接口,線程池隊列是DelayedWorkQueue,其和DelayedQueue類似,是一個延遲隊列。ScheduledFutureTask是具有返回值得任務(wù),繼承自FutureTask。FutureTask的內(nèi)部用一個變量state用來表示任務(wù)的狀態(tài),一開始狀態(tài)為new,所有狀態(tài)為
private volatile int state;
private static final int NEW = 0;//初始狀態(tài)
private static final int COMPLETING = 1;//執(zhí)行中狀態(tài)
private static final int NORMAL = 2;//正常運行結(jié)束狀態(tài)
private static final int EXCEPTIONAL = 3;//運行中異常
private static final int CANCELLED = 4;//任務(wù)被取消
private static final int INTERRUPTING = 5;//任務(wù)正在被中斷
private static final int INTERRUPTED = 6;//任務(wù)已經(jīng)被中斷

可能的任務(wù)狀態(tài)的轉(zhuǎn)換路徑為
new->completing-normal 初始態(tài)->執(zhí)行中-正常結(jié)束
new->completing->exception 初始態(tài)->執(zhí)行中->執(zhí)行異常
new->cancelled 初始態(tài)->任務(wù)取消
new->interrupting-interrupted初始狀態(tài)->被中斷中->被中斷
ScheduledFutureTask內(nèi)部還有一個變量period用來表示任務(wù)的類型,任務(wù)的類型如下:
- period=0說明當前任務(wù)時一次性的,執(zhí)行完畢后就退出了
- period為附屬,說明當前任務(wù)為fixed-delay任務(wù),是固定延遲的定時可重復(fù)執(zhí)行的任務(wù)
- period為正數(shù),說明當前任務(wù)為fixed-rate任務(wù),是固定頻率的定時可重復(fù)執(zhí)行任務(wù)
ScheduledThreadPoolExecutor的一個構(gòu)造函數(shù)如下,由構(gòu)造函數(shù)可知線程池隊列是DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
原理剖析
- schedule(Runnable command,long delay,TimeUnit unit)
- scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
- scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
schedule(Runnable command,long delay,TimeUnit unit)
這份方法的作用是提交一個延遲執(zhí)行的任務(wù),任務(wù)從提交時間算起延遲單位為unit的delay的時間開始執(zhí)行。提交的任務(wù)不是周期性任務(wù),任務(wù)只會執(zhí)行一次
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
//1參數(shù)校驗
if (command == null || unit == null)
throw new NullPointerException();
//2任務(wù)轉(zhuǎn)換
RunnableScheduledFuture<Void> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit),
sequencer.getAndIncrement()));
//3添加任務(wù)到延遲隊列
delayedExecute(t);
return t;
}
1.如上代碼1進行參數(shù)校驗,如果command或者unit為null,則拋出空指針異常
2。代碼2執(zhí)行裝飾任務(wù),把提交的command任務(wù)轉(zhuǎn)換為ScheduledFutureTask。ScheduledFutureTask是具體放入延遲隊列的東西,由于是延遲任務(wù),所以ScheduledFutureTask實現(xiàn)了long getDelay(TimeUnit unit)和int compareTo(Dealyed other)方法。triggerTime方法將延遲時間轉(zhuǎn)為絕對時間,也就是把當前時間的那描述加上延遲的納秒數(shù)的long值,ScheduledFutureTask的構(gòu)造函數(shù)如下。
ScheduledFutureTask(Runnable r, V result, long triggerTime,
long sequenceNumber) {
//調(diào)用父類FutureTask構(gòu)造函數(shù)
super(r, result);
this.time = triggerTime;
this.period = 0;//period為0,說明為一次性任務(wù)
this.sequenceNumber = sequenceNumber;
}
在構(gòu)造函數(shù)內(nèi)部首先調(diào)用了父類FutureTask的構(gòu)造函數(shù),父類FutureTask的構(gòu)造函數(shù)的代碼如下
//通過適配器將runnable轉(zhuǎn)換為callable
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // 設(shè)置當前任務(wù)狀態(tài)為new
}
FutureTask中的任務(wù)被轉(zhuǎn)換為Callable類型后,被保存到了變量this.callable里面,并設(shè)置FutureTask的任務(wù)為NEW
然后再ScheduledFutureTask構(gòu)造函數(shù)內(nèi)部設(shè)置time為上面說的絕對時間,需要注意,這里的period的值為0,這說明當前任務(wù)為一次性的任務(wù),不是定時反復(fù)執(zhí)行任務(wù)。其中 long getDealy(TimeUnit unit)方法的代碼如下(剛方法是用來計算當前任務(wù)還有多少時間就過期了)
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.nanoTime(), NANOSECONDS);
}
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
compareTo的作用是假如元素到延遲隊列后,在內(nèi)部建立或者調(diào)整堆的時候回使用鈣元素的compareTo方法與隊列里面其他元素進行比較,讓最快要過期的元素放到隊首。所以無論什么時候往隊列里面添加元素,隊首的元素都是最快要過期的元素
private void delayedExecute(RunnableScheduledFuture<?> task) {
//4 如果線程池關(guān)閉了,則執(zhí)行線程池拒絕策略
if (isShutdown())
reject(task);
else {
//5添加任務(wù)到延遲隊列
super.getQueue().add(task);
//6再次檢查線程池狀態(tài)
if (!canRunInCurrentRunState(task) && remove(task))
task.cancel(false);
else
//7確保至少一個線程處理任務(wù)
ensurePrestart();
}
}
4.代碼4首先判斷當前線程池是否已經(jīng)關(guān)閉了,如果已經(jīng)關(guān)閉則中線程池的拒絕策略沒否則實行代碼5將任務(wù)添加到延遲隊列。添加完畢后還要重新檢查線程池是否被關(guān)閉了,如果已經(jīng)關(guān)閉了則從延遲隊列里面刪除剛才添加的任務(wù),但是此時有可能線程池中的線程已經(jīng)從任務(wù)隊列里面移除了該任務(wù),也就是該任務(wù)已經(jīng)在執(zhí)行了,所以還需要調(diào)用任務(wù)的cancel方法取消任務(wù)
5如果代碼6判斷的結(jié)果為false,則會執(zhí)行代碼7確保至少有一個線程在處理任務(wù),即使核心線程數(shù)corePoolSize被設(shè)置為0,ensureOrestart的代碼如下
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
//增加核心線程數(shù)
addWorker(null, true);
//如果初始化corePoolsize==0,則也增加一個線程
else if (wc == 0)
addWorker(null, false);
}
如上代碼首先獲取線程池中的線程個數(shù),如果線程個數(shù)小于核心線程數(shù)則核心線程數(shù)新增一個線程,否則如果當前線程數(shù)為0則新增一個線程。
上面我們分析了如何向延遲隊列里面添加任務(wù),接下來我們來見線程池里面的線程如何獲取并執(zhí)行任務(wù),在前面講解ThreadPoolExecutor時有提及,具體執(zhí)行任務(wù)的線程是worker線程,worker線程調(diào)用具體任務(wù)的run方法來執(zhí)行,由于這里的任務(wù)是ScheduledFutureTask,所以我們具體來看看ScheduledFutureTask的run方法
public void run() {
//8是否只執(zhí)行一次
boolean periodic=isPeriodic()
//9取消任務(wù)
if (!canRunInCurrentRunState(this))
cancel(false);
//10 只執(zhí)行一次,調(diào)用schedule方法時候
else if (!isPeriodic())
super.run();
//11定時執(zhí)行
else if (super.runAndReset()) {
//11.1設(shè)置time=time+period
setNextRunTime();
//11.2重新加入該任務(wù)到delay隊列
reExecutePeriodic(outerTask);
}
}
代碼8中isPeriodic的作用是判斷當前任務(wù)是一次性任務(wù)還是可重復(fù)執(zhí)行的任務(wù)
public boolean isPeriodic() {
return period != 0;
}
其內(nèi)部是通過period的值來判斷,由于轉(zhuǎn)換任務(wù)在創(chuàng)建ScheduledFutureTask時傳遞的period的值為0,所以這里isPeriodic
返回false.
6.代碼9判斷當前任務(wù)是否應(yīng)該被取消,
boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
return isRunningOrShutdown(preiodic?continueExistingPeriodicTasksAfterShutdown:
executeExistingDelayedTasksAfterShutdown);
}
這里傳遞的preiodic的值為false,所以isRunningOrShutdown的參數(shù)為executeExistingDelayedTasksAfterShutdown。executeExistingDelayedTasksAfterShutdown默認為true,表示當其他線程調(diào)用了shutdown命令關(guān)閉了線程后,當前任務(wù)還是要執(zhí)行,否則如果為false,則當前任務(wù)要取消。
7.由于periodic的值為false,所以執(zhí)行代碼10父類FutureTask的方法執(zhí)行具體執(zhí)行任務(wù),F(xiàn)utureTask的run方法代碼如下。
public void run() {
//12
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
//13
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//13.1
setException(ex);
}
//13.2
if (ran)
set(result);
}
} finally {
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
代碼12判斷如果任務(wù)狀態(tài)不是new則直接返回,或者如果當前任務(wù)狀態(tài)為new但是使用CAS設(shè)置當前任務(wù)的持有者為當前線程失敗則直接返回,代碼13具體調(diào)用callable的call方法執(zhí)行任務(wù)。這里在調(diào)用前又判斷了任務(wù)的狀態(tài)是否為new,是為了避免在執(zhí)行代碼12后其他線程不該了任務(wù)的狀態(tài)(比如取消了該任務(wù))
如果任務(wù)執(zhí)行成功則執(zhí)行代碼13.2修改任務(wù)的狀態(tài),set方法的代碼如下
protected void set(V v) {
//如果當前任務(wù)的狀態(tài)為new,則設(shè)置為COMPLETING
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = v;
//設(shè)置當前任務(wù)的狀態(tài)為normal,也就是任務(wù)正常結(jié)束
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}
如上的代碼首先使用CAS將當前任務(wù)的狀態(tài)從NEW轉(zhuǎn)換為COMPLETING。這里當有多個線程調(diào)用時只有一個線程會成功。成功的線程在此通過unsafe.putOrderInt設(shè)置任務(wù)的狀態(tài)為正常結(jié)束狀態(tài),這里沒有使用CAS時因為對于同一個任務(wù)只可能有一個線程運行到這里。在這里使用putOrderInt比使用CAS或者putLongvolatile效率更高,并且這里的場景不要求其他線程馬上對設(shè)置的狀態(tài)值可見。
思考個問題,在什么時候多個線程會同時執(zhí)行CAS將當前任務(wù)的狀態(tài)從NEW轉(zhuǎn)換到COMPLETING?其實當同一個command被多次提交到線程池就會存在這樣的情況,因為痛一個任務(wù)共享一個狀態(tài)值state。
如果任務(wù)執(zhí)行失敗,則執(zhí)行代碼13.1,setException的代碼如下,可見與set函數(shù)類似。
protected void setException(Throwable t) {
//如果當前任務(wù)的狀態(tài)為new,則設(shè)置為COMPLETING
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = t;
//設(shè)置當前任務(wù)的狀態(tài)為EXCEPTIONAL,也就是任務(wù)非正常結(jié)束
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
到這里代碼10的邏輯執(zhí)行完畢,一次性任務(wù)也就執(zhí)行完畢了
scheduleWithFixedDelay
**** 該方法的作用是,當任務(wù)執(zhí)行完畢后,讓其延遲固定時間后再次運行(fixed-delay)。其中initialDelay表示提交任務(wù)后延遲多少時間可以執(zhí)行command任務(wù),delay表示當任務(wù)執(zhí)行完畢后延長多少時間后再次運行command任務(wù),unit是delay和initialDelay時間單位,任務(wù)會一直重復(fù)運行中直到運行中拋出異常,被取消了,或者關(guān)閉了線程池。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
//14參數(shù)校驗 TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0L)
throw new IllegalArgumentException();
//15任務(wù)轉(zhuǎn)換,注意這里是period=-delay<0
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
-unit.toNanos(delay),
sequencer.getAndIncrement());
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//16 添加任務(wù)到隊列
delayedExecute(t);
return t;
}
代碼14進行參數(shù)校驗,校驗失敗則拋出異常,代碼15將command任務(wù)轉(zhuǎn)化為ScheduledFutureTask,這里需要注意的是,傳遞給ScheduledFutureTask的period變量的值為-delay,period<0說明該任務(wù)時可重復(fù)執(zhí)行的任務(wù)。然后代碼16添加任務(wù)到延遲隊列后返回。
將任務(wù)添加到延遲隊列后線程池線程會從隊列里面獲取任務(wù),然后調(diào)用ScheduledFutureTask的run方法執(zhí)行,優(yōu)質(zhì)這里的period<0,所以isPeriodic返回true,所以執(zhí)行代碼11。
protected boolean runAndReset() {
//17
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return false;
//18
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;//19
}
這個代碼和FutureTask的run方法類似,只是任務(wù)正常執(zhí)行完畢后不會設(shè)置任務(wù)的狀態(tài),這樣做是為了讓任務(wù)成為可重復(fù)執(zhí)行的任務(wù),這里多了代碼19,這段代碼判斷當前任務(wù)正常執(zhí)行完畢并且任務(wù)狀態(tài)為NEW則返回true,否則返回false。如果true則執(zhí)行代碼11.1的setNextRunTime方法設(shè)置該任務(wù)下次一的執(zhí)行時間。setNextRunTime的代碼如下
private void setNextRunTime() {
long p = period;
if (p > 0)//fixed-rate類型任務(wù)
time += p;
else//fixed-delay類型任務(wù)
time = triggerTime(-p);
}
這里p<0說明當前任務(wù)為fixed-delay類型任務(wù)。然后設(shè)置time為當前時間加上-p的時間,也就是延遲-p時間后再次執(zhí)行
總結(jié):fixed-delay類型的任務(wù)的執(zhí)行原理為:當添加一個人任務(wù)到延遲隊列后,等待initialDelay時間,任務(wù)就會過期,過去的任務(wù)就會被從隊列移除,并執(zhí)行,執(zhí)行完畢后,會重新設(shè)置任務(wù)的延遲時間,然后再把任務(wù)放入延遲隊列,循環(huán)往復(fù),需要注意的是,如果一個任務(wù)在執(zhí)行中拋出了異常,那么這個任務(wù)就結(jié)束了,但是不影響其他任務(wù)的執(zhí)行
scheduleAtFixedRate
這個方法相對其實時間點以固定頻率調(diào)用指定的任務(wù)(fixed-rate)。當把任務(wù)提交到線程池并延遲initialDelay時間,時間單位為(unit)后開始執(zhí)行任務(wù)command。然后從initialDelay+period時間點再次執(zhí)行,而后在initialDelay+2*period時間點再次執(zhí)行,循環(huán)往復(fù),直到拋出異?;蛘哒{(diào)用了任務(wù)的cancel取消了任務(wù),或者關(guān)閉了線程池,scheduleAtFixedRate和scheduleWithFixedDelay類似。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
...
//裝飾任務(wù)類,注意period=period》0,不是負的
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period),
return t;
}
在如上代碼中,在fixed-rate類型的任務(wù)command轉(zhuǎn)換為ScheduledFutureTask是設(shè)置period=period,不再是-period
所以當前任務(wù)執(zhí)行完畢后,調(diào)用setNextRunTime設(shè)置任務(wù)下次執(zhí)行的時間時執(zhí)行的是time+=p而不再是time=triggerTime(-p)
總結(jié):相對于fixed-delay任務(wù)來說,fixed-rate方法執(zhí)行規(guī)則為,時間為initdelday+nperiod時啟動任務(wù),但是如果當前任務(wù)還沒有執(zhí)行完,下一次要執(zhí)行任務(wù)的時間到了,則不會并發(fā)執(zhí)行,下次要執(zhí)行的任務(wù)會延遲執(zhí)行,要等到當前任務(wù)執(zhí)行完畢后在執(zhí)行*
總結(jié)
ScheduledThreadPoolExecutor的內(nèi)部使用了DelayedQueue來存放具體任務(wù)。任務(wù)分為三種,其中一次性執(zhí)行任務(wù)執(zhí)行完畢后就結(jié)束了,fixed-delay任務(wù)保證同一個任務(wù)在多次執(zhí)行期間間隔固定時間,fixed-rate任務(wù)保證按照固定的頻率執(zhí)行。任務(wù)類型使用period的值來劃分。

參考資料:
《Java并發(fā)編程之美》