什么是線程池
在 Java 中,如果每個(gè)請(qǐng)求到達(dá)就創(chuàng)建一個(gè)新線程, 創(chuàng)建和銷毀線程花費(fèi)的時(shí)間和消耗的系統(tǒng)資源都相當(dāng)大,甚至可能要比在處理實(shí)際的用戶請(qǐng)求的時(shí)間和資源要多的多。如果在一個(gè) Jvm 里創(chuàng)建太多的線程,可能會(huì)使系統(tǒng)由于過度消耗內(nèi)存或“切換過度”而導(dǎo)致系統(tǒng)資源不足。
為了解決這個(gè)問題,就有了線程池的概念,線程池的核心邏輯是提前創(chuàng)建好若干個(gè)線程放在一個(gè)容器中。如果有任務(wù)需要處理,則將任務(wù)直接分配給線程池中的線程來執(zhí)行就行,任務(wù)處理完以后這個(gè)線程不會(huì)被銷毀,而是等待后續(xù)分配任務(wù)。同時(shí)通過線程池來重復(fù)管理線程還可以避免創(chuàng)建大量線程增加開銷。
線程池的優(yōu)勢(shì)
合理的使用線程池,可以帶來一些好處
- 降低創(chuàng)建線程和銷毀線程的性能開銷
- 提高響應(yīng)速度,當(dāng)有新任務(wù)需要執(zhí)行是不需要等待線程創(chuàng)建就可以立馬執(zhí)行
- 合理的設(shè)置線程池大小可以避免因?yàn)榫€程數(shù)超過硬件資源瓶頸帶來的問題
Java 中提供的線程池 API
線程池的使用
要了解一個(gè)技術(shù),我們?nèi)匀皇菑氖褂瞄_始。 JDK 為我們提供了幾種不同的線程池實(shí)現(xiàn)。我們先來通過一個(gè)簡(jiǎn)單的案例來引入線程池的基本使用在 Java 中怎么創(chuàng)建線程池呢?下面這段代碼演示了創(chuàng)建三個(gè)固定線程數(shù)的線程池
public class ThreadPoolTest implements Runnable {
@Override
public void run() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
static ExecutorService service = Executors.newFixedThreadPool(3);
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
service.execute(new ThreadPoolTest());
}
service.shutdown();
}
}
Java中提供的線程池Api
在Executors里面提供了幾個(gè)線程池的工廠方法,這樣,很多新手就不需要了解太多關(guān)于ThreadPoolExecutor的知識(shí)了,他們只需要直接使用Executors 的工廠方法,就可以使用線程池:
newFixedThreadPool: 該方法返回一個(gè)固定數(shù)量的線程池,線程數(shù)不變,當(dāng)有一個(gè)任務(wù)提交時(shí),若線程池中空閑,則立即執(zhí)行,若沒有,則會(huì)被暫緩在一個(gè)任務(wù)隊(duì)列中,等待有空閑的線程去執(zhí)行。
newSingleThreadExecutor: 創(chuàng)建一個(gè)線程的線程池,若空閑則執(zhí)行,若沒有空閑線程則暫緩在任務(wù)隊(duì)列中。
newCachedThreadPool: 返回一個(gè)可根據(jù)實(shí)際情況調(diào)整線程個(gè)數(shù)的線程池,不限制最大線程數(shù)量,若用空閑的線程則執(zhí)行任務(wù),若無任務(wù)則不創(chuàng)建線程。并且每一個(gè)空閑線程會(huì)在 60 秒后自動(dòng)回收。
newScheduledThreadPool: 創(chuàng)建一個(gè)可以指定線程的數(shù)量的線程池,但是這個(gè)線程池還帶有 延遲和周期性執(zhí)行任務(wù)的功能,類似定時(shí)器。
ThreadpoolExecutor
上面提到的四種線程池的構(gòu)建,都是基于 ThreadpoolExecutor 來構(gòu)建的,ThreadPoolThread 有哪些構(gòu)造參數(shù)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
ThreadpoolExecutor有多個(gè)重載的構(gòu)造方法,我們可以基于它最完整的構(gòu)造方法來分析先來解釋一下每個(gè)參數(shù)的作用,稍后我們?cè)诜治鲈创a的過程中再來詳細(xì)了解參數(shù)的意義。
public ThreadPoolExecutor(int corePoolSize, //核心線程數(shù)量
int maximumPoolSize, //最大線程數(shù)
long keepAliveTime, //超時(shí)時(shí)間,超出核心線程數(shù)量以外的線程空余存活時(shí)間
TimeUnit unit, //存活時(shí)間單位
BlockingQueue<Runnable> workQueue, //保存執(zhí)行任務(wù)的隊(duì)列
ThreadFactory threadFactory,//創(chuàng)建新線程使用的工廠
RejectedExecutionHandler handler //當(dāng)任務(wù)無法執(zhí)行的時(shí)候的處理方式
)
線程池初始化以后做了什么事情
線程池初始化時(shí)是沒有創(chuàng)建線程的, 線程池里的線程的初始化與其他線程一樣,但是在完成任務(wù)以后,該線程不會(huì)自行銷毀,而是以掛起的狀態(tài)返回到線程池。直到應(yīng)用程序再次向線程池發(fā)出請(qǐng)求時(shí),線程池里掛起的線程就會(huì)再度激活執(zhí)行任務(wù)。這樣既節(jié)省了建立線程所造
成的性能損耗,也可以讓多個(gè)任務(wù)反復(fù)重用同一線程,從而在應(yīng)用程序生存期內(nèi)節(jié)約大量開銷。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool 的核心線程數(shù)和最大線程數(shù)都是指定值,也就是說當(dāng)線程池中的線程數(shù)超過核心線程數(shù)后,任務(wù)都會(huì)被放到阻塞隊(duì)列中。 另外keepAliveTime為 0,也就是超出核心線程數(shù)量以外的線程空余存活時(shí)間,而這里選用的阻塞隊(duì)列是 LinkedBlockingQueue,使用的是默認(rèn)容量 Integer.MAX_VALUE,相當(dāng)于沒有上限,這樣會(huì)有問題后面在具體分析。
這個(gè)線程池執(zhí)行任務(wù)的流程如下:
- 線程數(shù)少于核心線程數(shù),也就是設(shè)置的線程數(shù)時(shí),新建線程執(zhí)行任務(wù)。
- 線程數(shù)等于核心線程數(shù)后,將任務(wù)加入阻塞隊(duì)列。
- 由于隊(duì)列容量非常大, 可以一直添加。
- 執(zhí)行完任務(wù)的線程反復(fù)去隊(duì)列中取任務(wù)執(zhí)行。
用途:FixedThreadPool用于負(fù)載比較大的服務(wù)器,為了資源的合理利用,需要限制當(dāng)前線程數(shù)量。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程; 并且沒有核心線程,非核心線程數(shù)無上限,但是每個(gè)空閑的時(shí)間只有 60 秒,超過后就會(huì)被回收。
它的執(zhí)行流程如下:
- 沒有核心線程,直接向
SynchronousQueue中提交任務(wù) - 如果有空閑線程,就去取出任務(wù)執(zhí)行;如果沒有空閑線程,就新建一個(gè)
- 執(zhí)行完任務(wù)的線程有 60 秒生存時(shí)間,如果在這個(gè)時(shí)間內(nèi)可以接到新任務(wù),就可以繼續(xù)活下去,否則就被回收。
newSingleThreadExecutor
創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。
線程池的實(shí)現(xiàn)原理分析
線程池的基本使用我們都清楚了,接下來我們來了解一下線程池的實(shí)現(xiàn)原理ThreadPoolExecutor 是線程池的核心,提供了線程池的實(shí)現(xiàn)。
ScheduledThreadPoolExecutor 繼承了 ThreadPoolExecutor,并另外提供一些調(diào)度方法以支持定時(shí)和周期任務(wù)。Executers 是工具類,主要用來創(chuàng)建線程池對(duì)象我們把一個(gè)任務(wù)提交給線程池去處理的時(shí)候,線程池的處理過程是什么樣的呢?首先直接來看看定義。
線程池原理分析(FixedThreadPool)

源碼分析
execute
基于源碼入口進(jìn)行分析,先看 execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//1.當(dāng)前池中線程比核心數(shù)少,新建一個(gè)線程執(zhí)行任務(wù)
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//2.核心池已滿,但任務(wù)隊(duì)列未滿,添加到隊(duì)列中
int recheck = ctl.get();//任務(wù)成功添加到隊(duì)列以后,再次檢查是否需要添加新的線程,因?yàn)橐汛嬖诘木€程可能被銷毀了
if (!isRunning(recheck) && remove(command))
reject(command);//如果線程池處于非運(yùn)行狀態(tài),并且把當(dāng)前的任務(wù)從任務(wù)隊(duì)列中移除成功,則拒絕該任務(wù)
else if (workerCountOf(recheck) == 0)//如果之前的線程已被銷毀完,新建一個(gè) 線程
addWorker(null, false);
} else if (!addWorker(command, false)) //3.核心池已滿,隊(duì)列已滿,試著創(chuàng)建一個(gè)新 線程
reject(command); //如果創(chuàng)建新線程失敗了,說明線程池被關(guān)閉或者線程池完全滿了, 拒絕任務(wù)
}
ctl 的作用
在線程池中, ctl貫穿在線程池的整個(gè)生命周期中
ctl:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,
0));
它是一個(gè)原子類,主要作用是用來保存線程數(shù)量和線程池的狀態(tài)。我們來分析一下這段代碼,其實(shí)比較有意思,他用到了位運(yùn)算一個(gè) int 數(shù)值是 32 個(gè) bit 位,這里采用高 3 位來保存運(yùn)行狀態(tài),低 29 位來保存線程數(shù)量。我們來分析默認(rèn)情況下,也就是 ctlOf(RUNNING)運(yùn)行狀態(tài),調(diào)用了 ctlOf(int rs,int wc)方法;
private static int ctlOf(int rs, int wc) { return rs | wc; }
其中 RUNNING =-1 << COUNT_BITS ; -1 左移 29 位. -1 的二進(jìn)制是 32 個(gè) 1(1111 1111 11111111 1111 1111 1111 1111)
-1 的二進(jìn)制計(jì)算方法
原碼是 1000…001 . 高位 1 表示符號(hào)位。
然后對(duì)原碼取反,高位不變得到 1111…110
然后對(duì)反碼進(jìn)行+1 ,也就是補(bǔ)碼操作, 最后得到 1111…1111
那么-1 <<左移 29 位, 也就是 【111】 表示; rs | wc 。二進(jìn)制的 111 | 000 。得到的結(jié)果仍然是 111
& 運(yùn)算規(guī)則:0&0=0; 0&1=0; 1&0=0; 1&1=1;
| 運(yùn)算規(guī)則:0|0=0; 0|1=1; 1|0=1; 1|1=1;
^ 運(yùn)算規(guī)則:0^0=0; 0^1=1; 1^0=1; 1^1=0;
~ 運(yùn)算規(guī)則:~1=0; ~0=1;
那么同理可得其他的狀態(tài)的 bit 位表示
private static final int COUNT_BITS = Integer.SIZE - 3; //32-3
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //將 1 的二進(jìn)制向左位移 29 位,再減 1 表示最大線程容量
//運(yùn)行狀態(tài)保存在 int 值的高 3 位 (所有數(shù)值左移 29 位)
private static final int RUNNING = -1 << COUNT_BITS;// 接收新任務(wù),并執(zhí)行隊(duì)列中的任務(wù)
private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接收新任務(wù),但是執(zhí)行隊(duì)列中的任務(wù)
private static final int STOP = 1 << COUNT_BITS;// 不接收新任務(wù),不執(zhí)行隊(duì)列中的任務(wù),中斷正在執(zhí)行中的任務(wù)
private static final int TIDYING = 2 << COUNT_BITS; //所有的任務(wù)都已結(jié)束,線程數(shù)量為 0,處于該狀態(tài)的線程池即將調(diào)用 terminated()方法
private static final int TERMINATED = 3 << COUNT_BITS;// terminated()方法執(zhí)行完成
更多關(guān)于ctl
https://www.dazhuanlan.com/2019/12/25/5e0296f9dcf20/
狀態(tài)轉(zhuǎn)化

addWorker
如果工作線程數(shù)小于核心線程數(shù)的話,會(huì)調(diào)用 addWorker,顧名思義,其實(shí)就是要?jiǎng)?chuàng)建一個(gè)工作線程。我們來看看源碼的實(shí)現(xiàn)源碼比較長(zhǎng),看起來比較唬人,其實(shí)就做了兩件事。
1)才用循環(huán) CAS 操作來將線程數(shù)加 1;
2)新建一個(gè)線程并啟用。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//goto 語句,避免死循環(huán)
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果線程處于非運(yùn)行狀態(tài),并且 rs 不等于 SHUTDOWN 且 firstTask 不等于空且且
// workQueue 為空,直接返回 false(表示不可添加 work 狀態(tài))
// 1. 線程池已經(jīng) shutdown 后,還要添加新的任務(wù),拒絕
// 2. (第二個(gè)判斷)SHUTDOWN 狀態(tài)不接受新任務(wù),但仍然會(huì)執(zhí)行已經(jīng)加入任務(wù)隊(duì)列的任
// 務(wù),所以當(dāng)進(jìn)入 SHUTDOWN 狀態(tài),而傳進(jìn)來的任務(wù)為空,并且任務(wù)隊(duì)列不為空的時(shí)候,是允許添加
// 新線程的, 如果把這個(gè)條件取反,就表示不允許添加 worker
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (; ; ) { //自旋
int wc = workerCountOf(c);//獲得 Worker 工作線程數(shù)
//如果工作線程數(shù)大于默認(rèn)容量大小或者大于核心線程數(shù)大小,則直接返回 false 表示不能再添加 worker。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//通過 cas 來增加工作線程數(shù),
如果 cas 失敗,則直接重試
break retry;
c = ctl.get(); // Re-read ctl //再次獲取 ctl 的值
if (runStateOf(c) != rs) //這里如果不想等,說明線程的狀態(tài)發(fā)生了變化, 繼續(xù)重試
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}//上面這段代碼主要是對(duì) worker 數(shù)量做原子+1 操作,下面的邏輯才是正式構(gòu)建一個(gè) worker
boolean workerStarted = false; //工作線程是否啟動(dòng)的標(biāo)識(shí)
boolean workerAdded = false; //工作線程是否已經(jīng)添加成功的標(biāo)識(shí)
Worker w = null;
try {
w = new Worker(firstTask); //構(gòu)建一個(gè) Worker,這個(gè) worker 是什么呢?我們可以看到構(gòu)造方法里面?zhèn)魅肓艘粋€(gè) Runnable 對(duì)象
final Thread t = w.thread; //從 worker 對(duì)象中取出線程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //這里有個(gè)重入鎖,避免并發(fā)問題
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//只有當(dāng)前線程池是正在運(yùn)行狀態(tài), [或是 SHUTDOWN 且 firstTask 為空],才
能添加到 workers 集合中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//任務(wù)剛封裝到 work 里面,還沒 start,你封裝的線程就是 alive,幾個(gè)意思?肯定是要拋異常出去的
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); //將新創(chuàng)建的 Worker 添加到 workers 集合中
int s = workers.size();
//如果集合中的工作線程數(shù)大于最大線程數(shù),這個(gè)最大線程數(shù)表示線程池曾經(jīng)出現(xiàn)過的最大線程數(shù)
if (s > largestPoolSize)
largestPoolSize = s; //更新線程池出現(xiàn)過的最大線程數(shù)
workerAdded = true;//表示工作線程創(chuàng)建成功了
}
} finally {
mainLock.unlock(); //釋放鎖}
if (workerAdded) {//如果 worker 添加成功
t.start();//啟動(dòng)線程
workerStarted = true;
}
}
} finally{
if (!workerStarted)
addWorkerFailed(w); //如果添加失敗,就需要做一件事,就是遞減實(shí)際工作線程數(shù)(還記得我們最開始的時(shí)候增加了工作線程數(shù)嗎)
}
return workerStarted;//返回結(jié)果
}
}
Worker 類說明
我們發(fā)現(xiàn) addWorker方法只是構(gòu)造了一個(gè) Worker,并且把 firstTask封裝到 worker 中, 它是做什么的呢?我們來看看
- 每個(gè)
worker,都是一條線程,同時(shí)里面包含了一個(gè)firstTask,即初始化時(shí)要被首先執(zhí)行的任務(wù). - 最終執(zhí)行任務(wù)的,是
runWorker()方法,為什么這樣說,我們可以看到
this.thread = getThreadFactory().newThread(this);
在調(diào)用構(gòu)造方法時(shí),需要傳入任務(wù),這里通過getThreadFactory().newThread(this);來新建一個(gè)線程, newThread方法傳入的參數(shù)是this,因?yàn)?Worker本身繼承了Runnable接口,也就是一個(gè)線程,所以一個(gè) Worker對(duì)象在啟動(dòng)的時(shí)候會(huì)調(diào)用 Worker 類中的 run方法。Worker 類繼承了 AQS,并實(shí)現(xiàn)了 Runnable接口,注意其中的 firstTask 和 thread 屬性:firstTask用它來保存?zhèn)魅氲娜蝿?wù);thread是在調(diào)用構(gòu)造方法時(shí)通過ThreadFactory來創(chuàng)建的線程,是用來處理任務(wù)的線程。Worker 繼承了 AQS,使用 AQS 來實(shí)現(xiàn)獨(dú)占鎖的功能。為什么不使用 ReentrantLock來實(shí)現(xiàn)呢?可以看到 tryAcquire方法,它是不允許重入的,而 ReentrantLock是允許重入的:
lock 方法一旦獲取了獨(dú)占鎖,表示當(dāng)前線程正在執(zhí)行任務(wù)中; 那么它會(huì)有以下幾個(gè)作用
- 如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程;
- 如果該線程現(xiàn)在不是獨(dú)占鎖的狀態(tài),也就是空閑的狀態(tài),說明它沒有在處理任務(wù),這時(shí)可以對(duì)該線程進(jìn)行中斷;
- 線程池在執(zhí)行
shutdown方法或tryTerminate方法時(shí)會(huì)調(diào)用interruptIdleWorkers方法來中斷空閑的線程,interruptIdleWorkers方法會(huì)使用tryLock方法來判斷線程池中的線程是否是空閑狀態(tài) - 之所以設(shè)置為不可重入,是因?yàn)槲覀儾幌M蝿?wù)在調(diào)用像
setCorePoolSize這樣的線程池控制方法時(shí)重新獲取鎖,這樣會(huì)中斷正在運(yùn)行的線程
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
/**
* Thread this worker is running in. Null if factory fails.
*/
final Thread thread; //注意了,這才是真正執(zhí)行 task 的線程,從構(gòu)造函數(shù)可知是由ThreadFactury 創(chuàng)建的
/**
* Initial task to run. Possibly null.
*/
Runnable firstTask; //這就是需要執(zhí)行的 task
/**
* Per-thread task counter
*/
volatile long completedTasks; //完成的任務(wù)數(shù),用于線程池統(tǒng)計(jì)
Worker(Runnable firstTask) {
setState(-1); //初始狀態(tài) -1,防止在調(diào)用 runWorker(),也就是真正執(zhí)行 task前中斷 thread。
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}
public void unlock() {
release(1);
}
public boolean isLocked() {
return isHeldExclusively();
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null
&& !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
addWorkerFailed
addWorker 方法中,如果添加 Worker并且啟動(dòng)線程失敗,則會(huì)做失敗后的處理。這個(gè)方法主要做兩件事
- 如果
worker已經(jīng)構(gòu)造好了,則從workers集合中移除這個(gè)worker - 原子遞減核心線程數(shù)(因?yàn)樵?
addWorker方法中先做了原子增加) - 嘗試結(jié)束線程池
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
runWorker 方法
前面已經(jīng)了解了 ThreadPoolExecutor 的核心方法 addWorker,主要作用是增加工作線程,而Worker 簡(jiǎn)單理解其實(shí)就是一個(gè)線程,里面重新了 run方法,這塊是線程池中執(zhí)行任務(wù)的真正處理邏輯,也就是runWorker方法,這個(gè)方法主要做幾件事
- 如果
task不為空,則開始執(zhí)行task - 如果
task為空,則通過getTask()再去取任務(wù),并賦值給 task,如果取到的Runnable不為空,則執(zhí)行該任務(wù) - 執(zhí)行完畢后,通過
while循環(huán)繼續(xù)getTask()取任務(wù) - 如果
getTask()取到的任務(wù)依然是空,那么整個(gè)runWorker()方法執(zhí)行完畢
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// unlock,表示當(dāng)前 worker 線程允許中斷,因?yàn)?new Worker 默認(rèn)的 state = -1, 此處是調(diào)用 Worker 類的 tryRelease() 方法,將 state 置為 0,
// 而 interruptIfStarted () 中只有 state>=0 才允許調(diào)用中斷
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//注意這個(gè) while 循環(huán),在這里實(shí)現(xiàn)了 [線程復(fù)用] // 如果 task 為空,則通過getTask 來獲取任務(wù)
while (task != null || (task = getTask()) != null) {
//上鎖,不是為了防止并發(fā)執(zhí)行任務(wù),為了在 shutdown()時(shí)不終止正在運(yùn)行的 worker線程池為 stop 狀態(tài)時(shí)不接受新任務(wù),不執(zhí)行已經(jīng)加入任務(wù)隊(duì)列的任務(wù),還中斷正在執(zhí)行的任務(wù)
w.lock();
//所以對(duì)于 stop 狀態(tài)以上是要中斷線程的
//(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)確保線程中斷標(biāo)志位為 true 且是 stop 狀態(tài)以上,接著清除了中斷標(biāo)志
//!wt.isInterrupted()則再一次檢查保證線程需要設(shè)置中斷標(biāo)志位
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
//這里默認(rèn)是沒有實(shí)現(xiàn)的,在一些特定的場(chǎng)景中我們可以自己繼承 ThreadpoolExecutor 自己重寫
Throwable thrown = null;
try {
task.run(); //執(zhí)行任務(wù)中的 run 方法
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown); //這里默認(rèn)默認(rèn)而也是沒有實(shí)現(xiàn)
}
} finally {
//置空任務(wù)(這樣下次循環(huán)開始時(shí),task 依然為 null,需要再通過 getTask() +記錄該 Worker 完成任務(wù)數(shù)量 +解鎖
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
//1.將入?yún)?worker 從數(shù)組 workers 里刪除掉;
//2.根據(jù)布爾值 allowCoreThreadTimeOut 來決定是否補(bǔ)充新的 Worker 進(jìn)數(shù)組workers
}
}
getTask
worker線程會(huì)從阻塞隊(duì)列中獲取需要執(zhí)行的任務(wù),這個(gè)方法不是簡(jiǎn)單的take 數(shù)據(jù),我們來分析下他的源碼實(shí)現(xiàn)你也許好奇是怎樣判斷線程有多久沒有活動(dòng)了,是不是以為線程池會(huì)啟動(dòng)一個(gè)監(jiān)控線程,專門監(jiān)控哪個(gè)線程正在偷懶?想太多,其實(shí)只是在線程從工作隊(duì)列 poll任務(wù)時(shí),加上了超時(shí)限制,如果線程在keepAliveTime 的時(shí)間內(nèi) poll不到任務(wù),那我就認(rèn)為這條線程沒事做,可以干掉了,看看這個(gè)代碼片段你就清楚了.
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (; ; ) {//自旋
int c = ctl.get();
int rs = runStateOf(c);
/*對(duì)線程池狀態(tài)的判斷,兩種情況會(huì) workerCount -1,并且返回 null
1. 線程池狀態(tài)為 shutdown,且 workQueue 為空(反映了 shutdown 狀態(tài)的線程池還是
要執(zhí)行 workQueue 中剩余的任務(wù)的)
2. 線程池狀態(tài)為 stop(shutdownNow() 會(huì)導(dǎo)致變成 STOP)(此時(shí)不用考慮 workQueue
的情況)*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;//返回 null,則當(dāng)前 worker 線程會(huì)退出
}
int wc = workerCountOf(c);
// timed 變量用于判斷是否需要進(jìn)行超時(shí)控制。
// allowCoreThreadTimeOut 默認(rèn)是 false,也就是核心線程不允許進(jìn)行超時(shí);
// wc > corePoolSize,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量;
// 對(duì)于超過核心線程數(shù)量的這些線程,需要進(jìn)行超時(shí)控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*1. 線程數(shù)量超過 maximumPoolSize 可能是線程池在運(yùn)行時(shí)被調(diào)用了 setMaximumPoolSize ()
被改變了大小,否則已經(jīng) addWorker () 成功不會(huì)超過 maximumPoolSize
2. timed && timedOut 如果為 true,表示當(dāng)前操作需要進(jìn)行超時(shí)控制,并且上次從阻塞隊(duì)列中
獲取任務(wù)發(fā)生了超時(shí).其實(shí)就是體現(xiàn)了空閑線程的存活時(shí)間*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*根據(jù) timed 來判斷,如果為 true,則通過阻塞隊(duì)列 poll 方法進(jìn)行超時(shí)控制,如果在
keepaliveTime 時(shí)間內(nèi)沒有獲取到任務(wù),則返回 null.
否則通過 take 方法阻塞式獲取隊(duì)列中的任務(wù)*/
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)//如果拿到的任務(wù)不為空,則直接返回給 worker 進(jìn)行處理
return r;
timedOut = true;//如果 r==null,說明已經(jīng)超時(shí)了,設(shè)置 timedOut=true,在下次自旋的時(shí)候進(jìn)行回收
} catch (InterruptedException retry) {
timedOut = false;// 如果獲取任務(wù)時(shí)當(dāng)前線程發(fā)生了中斷,則設(shè)置 timedOut 為false 并返回循環(huán)重試
}
}
}
這里重要的地方是第二個(gè)if 判斷,目的是控制線程池的有效線程數(shù)量。由上文中的分析可以知道,在執(zhí)行execute方法時(shí),如果當(dāng)前線程池的線程數(shù)量超過了corePoolSize 且小于maximumPoolSize,并且 workQueue已滿時(shí),則可以增加工作線程,但這時(shí)如果超時(shí)沒有獲取到任務(wù),也就是 timedOut 為true的情況,說明workQueue 已經(jīng)為空了,也就說明了當(dāng)前線程池中不需要那么多線程來執(zhí)行任務(wù)了,可以把多于 corePoolSize數(shù)量的線程銷毀掉,保持線程數(shù)量在corePoolSize即可。什么時(shí)候會(huì)銷毀?當(dāng)然是 runWorker 方法執(zhí)行完之后,也就是Worker 中的 run方法執(zhí)行完,由 JVM 自動(dòng)回收。
getTask方法返回 null 時(shí),在 runWorker方法中會(huì)跳出while 循環(huán),然后會(huì)執(zhí)行processWorkerExit 方法。
processWorkerExit
runWorker 的while 循環(huán)執(zhí)行完畢以后,在finally中會(huì)調(diào)用processWorkerExit, 來銷毀工作線程。
到目前為止,我們已經(jīng)從 execute方法中輸入了 worker 線程的創(chuàng)建到執(zhí)行以及最后到銷毀的全部過程。那么我們繼續(xù)回到 execute 方法.我們只分析完addWorker這段邏輯,繼續(xù)來看后面的判斷
execute 后續(xù)邏輯分析
如果核心線程數(shù)已滿,說明這個(gè)時(shí)候不能再創(chuàng)建核心線程了,于是走第二個(gè)判斷,第二個(gè)判斷邏輯比較簡(jiǎn)單,如果線程池處于運(yùn)行狀態(tài)并且任務(wù)隊(duì)列沒有滿,則將任務(wù)添加到隊(duì)列中
第三個(gè)判斷,核心線程數(shù)滿了,隊(duì)列也滿了,那么這個(gè)時(shí)候創(chuàng)建新的線程也就是(非核心線程)如果非核心線程數(shù)也達(dá)到了最大線程數(shù)大小,則直接拒絕任務(wù)
if (isRunning(c) && workQueue.offer(command)) {//2.核心池已滿,但任務(wù)隊(duì)列未滿,添加到隊(duì)列中
int recheck = ctl.get();
//任務(wù)成功添加到隊(duì)列以后,再次檢查是否需要添加新的線程,因?yàn)橐汛嬖诘木€程可能被銷毀了
if (!isRunning(recheck) && remove(command))
reject(command);//如果線程池處于非運(yùn)行狀態(tài),并且把當(dāng)前的任務(wù)從任務(wù)隊(duì)列中移除成功,則拒絕該任務(wù)
else if (workerCountOf(recheck) == 0)//如果之前的線程已被銷毀完,新建一個(gè)線程
addWorker(null, false);
} else if (!addWorker(command, false)) //3.核心池已滿,隊(duì)列已滿,試著創(chuàng)建一個(gè)新線程
reject(command); //如果創(chuàng)建新線程失敗了,說明線程池被關(guān)閉或者線程池完全滿了,拒絕任務(wù)
拒絕策略
1、 AbortPolicy:直接拋出異常,默認(rèn)策略;
2、 CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
3、 DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
4、 DiscardPolicy:直接丟棄任務(wù);當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn)RejectedExecutionHandler 接口,自定義飽和策略,如記錄日志或持久化存儲(chǔ)不能處理的任務(wù).
線程池的注意事項(xiàng)
分析完線程池以后,我們?cè)賮砹私庖幌戮€程池的注意事項(xiàng)
阿里開發(fā)手冊(cè)不建議使用線程池
線程池的構(gòu)建不允許使用Executors去創(chuàng)建,而是通過 ThreadPoolExecutor 的方式。分析完原理以后,大家自己一定要有一個(gè)答案。我來簡(jiǎn)單分析下,用Executors 使得用戶不需要關(guān)心線程池的參數(shù)配置,意味著大家對(duì)于線程池的運(yùn)行規(guī)則也會(huì)慢慢的忽略。這會(huì)導(dǎo)致一個(gè)問題,比如我們用 newFixdThreadPool 或者singleThreadPool.允許的隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,如果使用不當(dāng)會(huì)導(dǎo)致大量請(qǐng)求堆積到隊(duì)列中導(dǎo)致 OOM 的風(fēng)險(xiǎn),而 newCachedThreadPool,允許創(chuàng)建線程數(shù)量為 Integer.MAX_VALUE,也可能會(huì)導(dǎo)致大量線程的創(chuàng)建出現(xiàn) CPU 使用過高或者 OOM 的問題
而如果我們通過 ThreadPoolExecutor 來構(gòu)造線程池的話,我們勢(shì)必要了解線程池構(gòu)造中每個(gè)參數(shù)的具體含義,使得開發(fā)者在配置參數(shù)的時(shí)候能夠更加謹(jǐn)慎。
如何合理配置線程池的大小
如何合理配置線程池大小。線程池大小不是靠猜,也不是說越多越好。
在遇到這類問題時(shí),先冷靜下來分析
- 需要分析線程池執(zhí)行的任務(wù)的特性: CPU 密集型還是 IO 密集型
- 每個(gè)任務(wù)執(zhí)行的平均時(shí)長(zhǎng)大概是多少,這個(gè)任務(wù)的執(zhí)行時(shí)長(zhǎng)可能還跟任務(wù)處理邏輯是否涉及到網(wǎng)絡(luò)傳輸以及底層系統(tǒng)資源依賴有關(guān)系
如果是 CPU 密集型, 主要是執(zhí)行計(jì)算任務(wù),響應(yīng)時(shí)間很快, cpu 一直在運(yùn)行,這種任務(wù) cpu的利用率很高,那么線程數(shù)的配置應(yīng)該根據(jù) CPU 核心數(shù)來決定, CPU 核心數(shù)=最大同時(shí)執(zhí)行線程數(shù),加入 CPU 核心數(shù)為 4,那么服務(wù)器最多能同時(shí)執(zhí)行 4 個(gè)線程。過多的線程會(huì)導(dǎo)致上下文切換反而使得效率降低。那線程池的最大線程數(shù)可以配置為 cpu 核心數(shù)+1
如果是 IO 密集型, 主要是進(jìn)行 IO 操作,執(zhí)行 IO 操作的時(shí)間較長(zhǎng),這是 cpu 出于空閑狀態(tài),導(dǎo)致 cpu 的利用率不高,這種情況下可以增加線程池的大小。這種情況下可以結(jié)合線程的等待時(shí)長(zhǎng)來做判斷,等待時(shí)間越高,那么線程數(shù)也相對(duì)越多。一般可以配置 cpu 核心數(shù)的 2 倍。
一個(gè)公式:線程池設(shè)定最佳線程數(shù)目 = ((線程池設(shè)定的線程等待時(shí)間+線程 CPU 時(shí)間) / 線程 CPU 時(shí)間 ) * CPU 數(shù)目
這個(gè)公式的線程 cpu 時(shí)間是預(yù)估的程序單個(gè)線程在 cpu 上運(yùn)行的時(shí)間(通常使用 loadrunner測(cè)試大量運(yùn)行次數(shù)求出平均值)
線程池中的線程初始化
默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒有線程的,需要提交任務(wù)之后才會(huì)創(chuàng)建線程。在實(shí) 際中如果需要 線程池創(chuàng)建之 后立即創(chuàng)建線 程,可以通過 以下兩個(gè)方法 辦到:
prestartCoreThread():初始化一個(gè)核心線程;
prestartAllCoreThreads():初始化所有核心線程
ThreadPoolExecutor tpe = (ThreadPoolExecutor)service;
tpe.prestartAllCoreThreads();
線程池的關(guān)閉
ThreadPoolExecutor提 供 了 兩 個(gè) 方 法 , 用 于 線 程 池 的 關(guān) 閉 , 分 別 是 shutdown()和shutdownNow(),其中:
shutdown():不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中
的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù)
shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù).
線程池容量的動(dòng)態(tài)調(diào)整
ThreadPoolExecutor提 供 了 動(dòng) 態(tài) 調(diào) 整 線 程 池 容 量 大 小 的 方 法 :setCorePoolSize()和setMaximumPoolSize(),
1.setCorePoolSize:設(shè)置核心池大小
2.setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小
任務(wù)緩存隊(duì)列及排隊(duì)策略
在前面我們多次提到了任務(wù)緩存隊(duì)列,即 workQueue,它用來存放等待執(zhí)行的任務(wù)。workQueue 的類型為 BlockingQueue,通常可以取下面三種類型:
1.ArrayBlockingQueue:基于數(shù)組的先進(jìn)先出隊(duì)列,此隊(duì)列創(chuàng)建必須指定大小
2.LinkedBlockingQueue:基于鏈表的先進(jìn)先出隊(duì)列,如果創(chuàng)建時(shí)沒有指定此隊(duì)列大小,則默認(rèn)為Integer.MAX_VALUE;
3.SynchronousQueue:這個(gè)隊(duì)列比較特殊,它不會(huì)保存提交的任務(wù),而是將直接新建一個(gè)線程來執(zhí)行新來的任務(wù)
線程池的監(jiān)控
如果在項(xiàng)目中大規(guī)模的使用了線程池,那么必須要有一套監(jiān)控體系,來指導(dǎo)當(dāng)前線程池的狀態(tài),當(dāng)出現(xiàn)問題的時(shí)候可以快速定位到問題。而線程池提供了相應(yīng)的擴(kuò)展方法,我們通過重寫線程池的beforeExecute、 afterExecute 和 shutdown還有線程的異常處理 等方式就可以實(shí)現(xiàn)對(duì)線程的監(jiān)控,簡(jiǎn)單給大家演示一個(gè)案例
public class ThreadPoolMonitor extends ThreadPoolExecutor {
// 保存任務(wù)開始執(zhí)行的時(shí)間,當(dāng)任務(wù)結(jié)束時(shí),用任務(wù)結(jié)束時(shí)間減去開始時(shí)間計(jì)算任務(wù)執(zhí)行時(shí)間
private ConcurrentHashMap<String, Date> startTimes;
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
this.startTimes = new ConcurrentHashMap<>();
}
@Override
public void shutdown() {
System.out.println("已經(jīng)執(zhí)行的任務(wù)數(shù):" + this.getCompletedTaskCount() +
", 當(dāng)前活動(dòng)線程數(shù):" + this.getActiveCount() +
", 當(dāng)前排隊(duì)線程數(shù):" + this.getQueue().size());
System.out.println();
super.shutdown();
}
//任務(wù)開始之前記錄任務(wù)開始時(shí)間
@Override
protected void beforeExecute(Thread t, Runnable r) {
startTimes.put(String.valueOf(r.hashCode()), new Date());
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
Date finishDate = new Date();
long diff = finishDate.getTime() - startDate.getTime();
// 統(tǒng)計(jì)任務(wù)耗時(shí)、初始線程數(shù)、核心線程數(shù)、正在執(zhí)行的任務(wù)數(shù)量、
// 已完成任務(wù)數(shù)量、任務(wù)總數(shù)、隊(duì)列里緩存的任務(wù)數(shù)量、
// 池中存在的最大線程數(shù)、最大允許的線程數(shù)、線程空閑時(shí)間、線程池是否關(guān)閉、線程池是否終止
System.out.print("任務(wù)耗時(shí):" + diff + "\n");
System.out.print("初始線程數(shù):" + this.getPoolSize() + "\n");
System.out.print("核心線程數(shù):" + this.getCorePoolSize() + "\n");
System.out.print("正在執(zhí)行的任務(wù)數(shù)量:" + this.getActiveCount() + "\n");
System.out.print("已經(jīng)執(zhí)行的任務(wù)數(shù):" + this.getCompletedTaskCount() + "\n ");
System.out.print("任務(wù)總數(shù):" + this.getTaskCount() + "\n");
System.out.print("最大允許的線程數(shù):" + this.getMaximumPoolSize() + "\n");
System.out.print("線程空閑時(shí)間:" + this.getKeepAliveTime(TimeUnit.MILLISECONDS) + "\n ");
System.out.println();
super.afterExecute(r, t);
}
public static ExecutorService newCachedThreadPool() {
//1.實(shí)現(xiàn)一個(gè)自己的線程池工廠
ThreadFactory factory = (Runnable r) -> {
//創(chuàng)建一個(gè)線程
Thread t = new Thread(r);
//給創(chuàng)建的線程設(shè)置UncaughtExceptionHandler對(duì)象 里面實(shí)現(xiàn)異常的默認(rèn)邏輯
Thread.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> {
System.out.println("線程工廠設(shè)置的exceptionHandler" + e.getMessage());
});
return t;
};
return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new
SynchronousQueue<>(), factory);
}
}
public class Task implements Runnable {
private static ExecutorService es = ThreadPoolMonitor.newCachedThreadPool();
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
es.execute(new Task());
}
es.shutdown();
}
}
Callable/Future 使用及原理分析
線程池的執(zhí)行任務(wù)有兩種方法,一種是 submit、一種是 execute;
這兩個(gè)方法是有區(qū)別的,那么基于這個(gè)區(qū)別我們?cè)賮砜纯础?/p>
execute 和 submit 區(qū)別
- execute 只可以接收一個(gè) Runnable 的參數(shù)
- execute 如果出現(xiàn)異常會(huì)拋出
- execute 沒有返回值
- submit 可以接收 Runable 和 Callable 這兩種類型的參數(shù),
- 對(duì)于 submit 方法,如果傳入一個(gè) Callable,可以得到一個(gè) Future 的返回值
- submit 方法調(diào)用不會(huì)拋異常,除非調(diào)用 Future.get,這里,我們重點(diǎn)了解一下 Callable/Future,可能很多人知道它是一個(gè)帶返回值的線程,但是具體的實(shí)現(xiàn)可能不清楚。
Callable/Future 案例演示
Callable/Future 和 Thread 之類的線程構(gòu)建最大的區(qū)別在于,能夠很方便的獲取線程執(zhí)行完以后的結(jié)果。首先來看一個(gè)簡(jiǎn)單的例子
public class CallableDemo implements Callable<String> {
@Override
public String call() throws Exception {
//Thread.sleep(3000);//阻塞案例演示
return "hello world";
}
public static void main(String[] args) throws ExecutionException,
InterruptedException {
CallableDemo callableDemo = new CallableDemo();
FutureTask futureTask = new FutureTask(callableDemo);
new Thread(futureTask).start();
System.out.println(futureTask.get());
}
}
想一想我們?yōu)槭裁葱枰褂没卣{(diào)呢?那是因?yàn)榻Y(jié)果值是由另一線程計(jì)算的,當(dāng)前線程是不知道結(jié)果值什么時(shí)候計(jì)算完成,所以它傳遞一個(gè)回調(diào)接口給計(jì)算線程,當(dāng)計(jì)算完成時(shí),調(diào)用這個(gè)回調(diào)接口,回傳結(jié)果值。這個(gè)在很多地方有用到,比如 Dubbo 的異步調(diào)用,比如消息中間件的異步通信等等…利用 FutureTask、 Callable、 Thread 對(duì)耗時(shí)任務(wù)(如查詢數(shù)據(jù)庫(kù))做預(yù)處理,在需要計(jì)算結(jié)果之前就啟動(dòng)計(jì)算。
所以我們來看一下 Future/Callable 是如何實(shí)現(xiàn)的.
Callable/Future 原理分析
在剛剛實(shí)現(xiàn)的 demo 中,我們用到了兩個(gè) api,分別是 Callable 和 FutureTask。
Callable 是一個(gè)函數(shù)式接口,里面就只有一個(gè) call 方法。子類可以重寫這個(gè)方法,并且這個(gè)方法會(huì)有一個(gè)返回值
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
FutureTask
FutureTask 的類關(guān)系圖如下,它實(shí)現(xiàn) RunnableFuture 接口,那么這個(gè) RunnableFuture 接口的作用是什么呢。
在講解 FutureTask 之前,先看看 Callable, Future, FutureTask 它們之間的關(guān)系圖,如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture 是一個(gè)接口,它繼承了 Runnable 和Future這兩個(gè)接口, Runnable 太熟悉了, 那么 Future是什么呢?
Future表示一個(gè)任務(wù)的生命周期,并提供了相應(yīng)的方法來判斷是否已經(jīng)完成或取消,以及獲取任務(wù)的結(jié)果和取消任務(wù)等。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
// 當(dāng)前的 Future 是否被取消,返回 true 表示已取消
boolean isCancelled();
// 當(dāng)前 Future 是否已結(jié)束。包括運(yùn)行完成、拋出異常以及取消,都表示當(dāng)前 Future 已結(jié)束
boolean isDone();
// 獲取 Future 的結(jié)果值。如果當(dāng)前 Future 還沒有結(jié)束,那么當(dāng)前線程就等待,
// 直到 Future 運(yùn)行結(jié)束,那么會(huì)喚醒等待結(jié)果值的線程的。
V get() throws InterruptedException, ExecutionException;// 獲取 Future 的結(jié)果值。與 get()相比較多了允許設(shè)置超時(shí)時(shí)間
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
分析到這里我們其實(shí)有一些初步的頭緒了, FutureTask是Runnable 和Future 的結(jié)合,如果我們把 Runnable比作是生產(chǎn)者,Future 比作是消費(fèi)者,那么 FutureTask是被這兩者共享的,生產(chǎn)者運(yùn)行 run方法計(jì)算結(jié)果,消費(fèi)者通過 get方法獲取結(jié)果。
作為生產(chǎn)者消費(fèi)者模式,有一個(gè)很重要的機(jī)制,就是如果生產(chǎn)者數(shù)據(jù)還沒準(zhǔn)備的時(shí)候,消費(fèi)者會(huì)被阻塞。當(dāng)生產(chǎn)者數(shù)據(jù)準(zhǔn)備好了以后會(huì)喚醒消費(fèi)者繼續(xù)執(zhí)行。
這個(gè)有點(diǎn)像我們上次可分析的阻塞隊(duì)列,那么在 FutureTask里面是基于什么方式實(shí)現(xiàn)的呢?
state 的含義
表示 FutureTask 當(dāng)前的狀態(tài),分為七種狀態(tài)
private static final int NEW = 0; // NEW 新建狀態(tài),表示這個(gè) FutureTask還沒有開始運(yùn)行
// COMPLETING 完成狀態(tài), 表示 FutureTask 任務(wù)已經(jīng)計(jì)算完畢了
// 但是還有一些后續(xù)操作,例如喚醒等待線程操作,還沒有完成。
private static final int COMPLETING = 1;
// FutureTask 任務(wù)完結(jié),正常完成,沒有發(fā)生異常
private static final int NORMAL = 2;
// FutureTask 任務(wù)完結(jié),因?yàn)榘l(fā)生異常。
private static final int EXCEPTIONAL = 3;
// FutureTask 任務(wù)完結(jié),因?yàn)槿∠蝿?wù)
private static final int CANCELLED = 4;
// FutureTask 任務(wù)完結(jié),也是取消任務(wù),不過發(fā)起了中斷運(yùn)行任務(wù)線程的中斷請(qǐng)求
private static final int INTERRUPTING = 5;
// FutureTask 任務(wù)完結(jié),也是取消任務(wù),已經(jīng)完成了中斷運(yùn)行任務(wù)線程的中斷請(qǐng)求
private static final int INTERRUPTED = 6;
run 方法
public void run() {
// 如果狀態(tài) state 不是 NEW,或者設(shè)置 runner 值失敗// 表示有別的線程在此之前調(diào)用 run 方法,并成功設(shè)置了 runner 值
// 保證了只有一個(gè)線程可以運(yùn)行 try 代碼塊中的代碼。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {/只有 c 不為 null 且狀態(tài) state 為 NEW 的情
況
V result;
boolean ran;
try {
result = c.call(); //調(diào)用 callable 的 call 方法,并獲得返回結(jié)果
ran = true;//運(yùn)行成功
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); //設(shè)置異常結(jié)果,
}
if (ran)
set(result);//設(shè)置結(jié)果
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
其實(shí)run方法作用非常簡(jiǎn)單,就是調(diào)用 callable 的 call方法返回結(jié)果值result,根據(jù)是否發(fā)生異常,調(diào)用 set(result)或 setException(ex)方法表示 FutureTask 任務(wù)完結(jié)。
不過因?yàn)?FutureTask任務(wù)都是在多線程環(huán)境中使用,所以要注意并發(fā)沖突問題。注意在 run方法中,我們沒有使用synchronized 代碼塊或者 Lock 來解決并發(fā)問題,而是使用了 CAS 這個(gè)樂觀鎖來實(shí)現(xiàn)并發(fā)安全,保證只有一個(gè)線程能運(yùn)行FutureTask 任務(wù)
get方法
get 方法就是阻塞獲取線程執(zhí)行結(jié)果,這里主要做了兩個(gè)事情
- 判斷當(dāng)前的狀態(tài),如果狀態(tài)小于等于 COMPLETING,表示 FutureTask 任務(wù)還沒有完結(jié),所以調(diào)用 awaitDone 方法,讓當(dāng)前線程等待。
- report 返回結(jié)果值或者拋出異常
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
awaitDone
如果當(dāng)前的結(jié)果還沒有被執(zhí)行完,把當(dāng)前線程插入到等待隊(duì)列
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() +
nanos : 0L;
WaitNode q = null;
boolean queued = false; // 節(jié)點(diǎn)是否已添加
for (; ; ) {
// 如果當(dāng)前線程中斷標(biāo)志位是 true,
// 那么從列表中移除節(jié)點(diǎn) q,并拋出 InterruptedException 異常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) { // 當(dāng)狀態(tài)大于 COMPLETING 時(shí),表示 FutureTask 任務(wù)已結(jié)束。
if (q != null)
q.thread = null; // 將節(jié)點(diǎn) q 線程設(shè)置為 null,因?yàn)榫€程沒有阻塞等待
return s;
}// 表示還有一些后序操作沒有完成,那么當(dāng)前線程讓出執(zhí)行權(quán)
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//表示狀態(tài)是 NEW,那么就需要將當(dāng)前線程阻塞等待。
// 就是將它插入等待線程鏈表中,
else if (q == null)
q = new WaitNode();
else if (!queued)
// 使用 CAS 函數(shù)將新節(jié)點(diǎn)添加到鏈表中,如果添加失敗,那么 queued 為 false,
// 下次循環(huán)時(shí),會(huì)繼續(xù)添加,直到成功。
queued = UNSAFE.compareAndSwapObject(this,
waitersOffset,
q.next =
waiters, q);
else if (timed) {// timed 為 true 表示需要設(shè)置超時(shí)
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos); // 讓當(dāng)前線程等待 nanos 時(shí)間
} else
LockSupport.park(this);
}
}
被阻塞的線程,會(huì)等到 run 方法執(zhí)行結(jié)束之后被喚醒
report
report方法就是根據(jù)傳入的狀態(tài)值 s,來決定是拋出異常,還是返回結(jié)果值。 這個(gè)兩種情況都表示 FutureTask完結(jié)了.
private V report(int s) throws ExecutionException {
Object x = outcome;//表示 call 的返回值
if (s == NORMAL) // 表示正常完結(jié)狀態(tài),所以返回結(jié)果值
return (V) x;
// 大于或等于 CANCELLED,都表示手動(dòng)取消 FutureTask 任務(wù),
// 所以拋出 CancellationException 異常
if (s >= CANCELLED)
throw new CancellationException();
// 否則就是運(yùn)行過程中,發(fā)生了異常,這里就拋出這個(gè)異常
throw new ExecutionException((Throwable) x);
}
線程池對(duì)于 Future/Callable 的執(zhí)行
我們現(xiàn)在再來看線程池里面的 submit 方法,就會(huì)很清楚了。
public class CallableDemo implements Callable<String> {
@Override
public String call() throws Exception {
//Thread.sleep(3000);//阻塞案例演示
return "hello world";
}
public static void main(String[] args) throws ExecutionException,
InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(1);
CallableDemo callableDemo = new CallableDemo();
Future future = es.submit(callableDemo);
System.out.println(future.get());
}
}
AbstractExecutorService.submit
調(diào)用抽象類中的 submit 方法,這里其實(shí)相對(duì)于 execute 方法來說,只多做了一步操作,就是封裝了一個(gè) RunnableFuture.
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
ThreadpoolExecutor.execute
然后調(diào)用 execute 方法,這里面的邏輯前面分析過了,會(huì)通過 worker 線程來調(diào)用過 ftask 的run 方法。而這個(gè) ftask 其實(shí)就是 FutureTask 里面最終實(shí)現(xiàn)的邏輯.