線程池中有一定數(shù)量的工作線程,工作線程會循環(huán)從任務(wù)隊(duì)列中獲取任務(wù),并執(zhí)行這個(gè)任務(wù)。那么怎么去停止這些工作線程呢?
這里就涉及到線程池兩個(gè)重要概念:工作線程數(shù)量和線程池狀態(tài)。
一.線程池狀態(tài)和工作線程數(shù)量
這本來是兩個(gè)不同的概念,但是在ThreadPoolExecutor中我們使用一個(gè)變量ctl來存儲這兩個(gè)值,這樣我們只需要維護(hù)這一個(gè)變量的并發(fā)問題,提高運(yùn)行效率。
/**
* 記錄線程池中Worker工作線程數(shù)量和線程池的狀態(tài)
* int類型是32位,它的高3位,表示線程池的狀態(tài),低29位表示W(wǎng)orker的數(shù)量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS 29位,
private static final int COUNT_BITS = Integer.SIZE - 3;
// 表示線程池中創(chuàng)建Worker工作線程數(shù)量的最大值。即 0b0001.....1(29位1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
怎么使用一個(gè)變量ctl存儲兩個(gè)值呢?
就是利用int變量的高3位來儲存線程池狀態(tài),用int變量的低29位來儲存工作線程數(shù)量。
這樣就有兩個(gè)需要注意的地方:
- 工作線程數(shù)量最大值不能超過int類型29位的值CAPACITY 即0b0001.....1(29位1)
- 因?yàn)榫€程池狀態(tài)都是高3位儲存的,所以工作線程數(shù)量不會影響狀態(tài)值大小關(guān)系。
1.1 線程池狀態(tài)
// 高3位值是111
private static final int RUNNING = -1 << COUNT_BITS;
// 高3位值是000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 高3位值是001
private static final int STOP = 1 << COUNT_BITS;
// 高3位值是010
private static final int TIDYING = 2 << COUNT_BITS;
// 高3位值是011
private static final int TERMINATED = 3 << COUNT_BITS;
線程池狀態(tài)分析:
- RUNNING狀態(tài):線程池剛創(chuàng)建時(shí)的狀態(tài)。向任務(wù)隊(duì)列中添加任務(wù),并執(zhí)行任務(wù)隊(duì)列中的任務(wù)。因?yàn)楦?位值是111,即處于RUNNING狀態(tài)下的ctl值都是負(fù)數(shù)。
- SHUTDOWN狀態(tài): 調(diào)用shutdown方法,會將線程池設(shè)置成這個(gè)狀態(tài)。不能向任務(wù)隊(duì)列中添加任務(wù),但是可以執(zhí)行任務(wù)隊(duì)列中已添加的任務(wù)。并且處于SHUTDOWN狀態(tài)下正在運(yùn)行任務(wù)的工作線程不能中斷的,就是保證任務(wù)能夠執(zhí)行完成。
- STOP狀態(tài): 調(diào)用shutdownNow方法,會將線程池設(shè)置成這個(gè)狀態(tài)。不能向任務(wù)隊(duì)列中添加任務(wù),也不能再執(zhí)行任務(wù)隊(duì)列中已添加的任務(wù)。
- TIDYING狀態(tài): 調(diào)用tryTerminate方法,可能會將線程池設(shè)置成這個(gè)狀態(tài)。這個(gè)只是中斷過度狀態(tài),表示線程池即將變成TERMINATED狀態(tài)。
- TERMINATED狀態(tài): 調(diào)用tryTerminate方法,可能會將線程池設(shè)置成這個(gè)狀態(tài)。表示線程池已經(jīng)完全終止,即任務(wù)隊(duì)列為空,工作線程數(shù)量也是0.
線程池為什么要定義這么多狀態(tài)呢?按道理說線程池只應(yīng)該有運(yùn)行和終止這兩種狀態(tài)啊。
主要是因?yàn)榻K止線程池時(shí),要考慮正在執(zhí)行的任務(wù)和已經(jīng)添加到任務(wù)隊(duì)列中待執(zhí)行的任務(wù)該如何處理,否則的話,這些任務(wù)可能就會被丟失。
線程池提供了兩個(gè)方式處理:
- shutdown方法: 它會將線程池狀態(tài)變成SHUTDOWN 狀態(tài)。禁止向添加新的任務(wù),但是會讓任務(wù)隊(duì)列中的任務(wù)繼續(xù)執(zhí)行,最后釋放所有的工作線程,讓線程池狀態(tài)變成TERMINATED狀態(tài)。
- shutdownNow方法: 它會將線程池狀態(tài)變成STOP 狀態(tài)。禁止向添加新的任務(wù),也不會執(zhí)行任務(wù)隊(duì)列中的任務(wù),但是會返回這個(gè)任務(wù)集合,釋放所有的工作線程,讓線程池狀態(tài)變成TERMINATED狀態(tài)。
1.2 操作ctl的方法
1.2.1 獲取線程池的狀態(tài)
/**
* 獲取線程池的狀態(tài)。因?yàn)榫€程池的狀態(tài)是使用高3位儲存,所以屏蔽低29位就行了。
* 所以就c與~CAPACITY(0b1110..0)進(jìn)行&操作,屏蔽低29位的值了。
* 注意:這里是屏蔽低29位的值,而不是右移29位。
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
1.2.2 獲取工作線程數(shù)量
/**
* 獲取線程池中Worker工作線程的數(shù)量,
* 因?yàn)橹皇褂玫?9位保存Worker的數(shù)量,只要屏蔽高3位的值就行了
* 所以就c與CAPACITY(0b0001...1)進(jìn)行&操作,屏蔽高3位的值了。
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
1.2.3 合并ctl的值
/**
* 得到ctl的值。
* 接受兩個(gè)參數(shù)rs和wc。rs表示線程池的狀態(tài),wc表示W(wǎng)orker工作線程的數(shù)量。
* 對于rs來說我們只需要高3位的值,對于wc來說我們需要低29位的值。
* 所以我們將rs | wc就可以得到ctl的值了。
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
1.2.4 其他方法
// 因?yàn)镽UNNING狀態(tài)高三位是111,所以狀態(tài)值rs與工作線程數(shù)量ws相與的結(jié)果值c一定是個(gè)負(fù)數(shù),
// 而其他狀態(tài)值都是大于等于0的數(shù),所以c是負(fù)數(shù),那么表示當(dāng)前線程處于運(yùn)行狀態(tài)。
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 使用CAS函數(shù)將ctl值自增
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 使用CAS函數(shù)將ctl值自減
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 使用CAS函數(shù)加循環(huán)方法這種樂觀鎖的方式,解決并發(fā)問題。
* 保證使ctl值減一
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
二. 重要成員變量
// 記錄線程池中Worker工作線程數(shù)量和線程池的狀態(tài)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 任務(wù)線程的阻塞隊(duì)列,因?yàn)槭亲枞?duì)列,所以它是并發(fā)安全的
private final BlockingQueue<Runnable> workQueue;
// 獨(dú)占鎖,用來保證操作成員變量的并發(fā)安全問題
private final ReentrantLock mainLock = new ReentrantLock();
// 等待線程池完全終止的條件Condition,
private final Condition termination = mainLock.newCondition();
//----------------- 需要mainLock來保證并發(fā)安全-------------------------//
// 線程池中工作線程集合。Worker中持有線程thread變量
private final HashSet<Worker> workers = new HashSet<Worker>();
// 線程池中曾擁有過的最大工作線程個(gè)數(shù)
private int largestPoolSize;
// 線程池完成過任務(wù)的總個(gè)數(shù)
private long completedTaskCount;
//----------------- 需要mainLock來保證并發(fā)安全-------------------------//
// 創(chuàng)建線程的工廠類
private volatile ThreadFactory threadFactory;
// 當(dāng)任務(wù)被拒絕時(shí),用來處理這個(gè)被拒絕的任務(wù)
private volatile RejectedExecutionHandler handler;
// 工作線程空閑的超時(shí)時(shí)間keepAliveTime
private volatile long keepAliveTime;
// 是否允許核心池線程超時(shí)釋放
private volatile boolean allowCoreThreadTimeOut;
// 線程池核心池線程個(gè)數(shù)
private volatile int corePoolSize;
// 線程池最大的線程個(gè)數(shù)
private volatile int maximumPoolSize;
成員變量的含義已經(jīng)標(biāo)注了:
- mainLock:使用mainLock來保證會發(fā)生變化成員變量的并發(fā)安全問題。會發(fā)生的成員變量有5個(gè):ctl、workQueue、workers、largestPoolSize和completedTaskCount。但是其中ctl和workQueue的類型本身就是多線程安全的,所以不用mainLock鎖保護(hù)。
- termination:等待線程池完全終止的條件,如果線程池沒有完全終止,調(diào)用它的awaitNanos方法,讓線程等待。當(dāng)線程池完全終止后,調(diào)用它的signalAll方法,喚醒所有等待termination條件的線程。
- workers:記錄所有的工作線程Worker
- workQueue:記錄所有待執(zhí)行的任務(wù)。使用阻塞隊(duì)列BlockingQueue,可以在隊(duì)列為空時(shí),線程等待,隊(duì)列有值時(shí),喚醒等待的線程。
- largestPoolSize:線程池中曾擁有過的最大工作線程個(gè)數(shù)
- completedTaskCount:線程池完成過任務(wù)的總個(gè)數(shù)
- threadFactory:創(chuàng)建線程的工廠類
- handler:當(dāng)任務(wù)被拒絕時(shí),用來處理這個(gè)被拒絕的任務(wù)
- keepAliveTime:工作線程允許空閑的超時(shí)時(shí)間,一般都是針對超過核心池?cái)?shù)量的工作線程。
- allowCoreThreadTimeOut: 是否允許核心池的工作線程超時(shí)釋放。
- corePoolSize:線程池核心池線程個(gè)數(shù)。
- maximumPoolSize: 線程池最大的線程個(gè)數(shù)。
這里注意一下兩個(gè)概念核心池個(gè)數(shù)和最大線程池個(gè)數(shù):
- 核心池個(gè)數(shù)就是線程池能夠維持的常用工作線程個(gè)數(shù),當(dāng)工作線程沒有執(zhí)行任務(wù)空閑時(shí),它不會被銷毀,而是在等待。但是如果設(shè)置allowCoreThreadTimeOut為true,那么核心池工作線程也是會被銷毀。
- 最大線程池個(gè)數(shù)就是線程池允許開啟的最大工作線程個(gè)數(shù)。最大線程池的意義就是當(dāng)核心池的工作線程不夠用,且任務(wù)隊(duì)列也已經(jīng)滿了,不能添加新的任務(wù)了,那么就要開啟新的工作線程來執(zhí)行任務(wù)。
三. 執(zhí)行任務(wù)execute方法
在線程池中如何執(zhí)行一個(gè)任務(wù)command,要分三種情況:
- 線程池中工作線程的數(shù)量沒有達(dá)到核心池個(gè)數(shù),那么線程池就應(yīng)該開啟新的工作線程來執(zhí)行任務(wù)。
- 線程池中工作線程的數(shù)量達(dá)到核心池個(gè)數(shù),那么就應(yīng)該將任務(wù)添加到任務(wù)隊(duì)列中,等待著工作線程去任務(wù)隊(duì)列中獲取任務(wù)并執(zhí)行。
- 如果任務(wù)添加到任務(wù)隊(duì)列失敗,那么就要開啟新的工作線程來執(zhí)行任務(wù)。
public void execute(Runnable command) {
// 如果command為null,拋出異常
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 分為三個(gè)步驟:
* 1. 如果運(yùn)行的工作線程數(shù)量少于核心池?cái)?shù)量corePoolSize,
* 那么就調(diào)用addWorker方法開啟一個(gè)新的工作線程,運(yùn)行任務(wù)command。
* 2. 如果開啟新的工作線程失敗,就將任務(wù)添加到任務(wù)隊(duì)列中。
* 3. 添加到任務(wù)隊(duì)列失敗,
* 那么仍然addWorker方法在最大池中開啟一個(gè)新的工作線程,運(yùn)行任務(wù)command。
*/
int c = ctl.get();
// 運(yùn)行的工作線程數(shù)量少于核心池?cái)?shù)量corePoolSize
if (workerCountOf(c) < corePoolSize) {
/**
* 開啟一個(gè)新的工作線程,運(yùn)行任務(wù)command。
* 返回true,表示開啟工作線程成功,直接return。
* 返回false,表示沒有開啟新線程。那么任務(wù)command就沒有運(yùn)行,所以要執(zhí)行下面代碼。
*/
if (addWorker(command, true))
return;
c = ctl.get();
}
// 線程池處于運(yùn)行狀態(tài),
// 且任務(wù)添加到任務(wù)阻塞隊(duì)列workQueue中成功,即workQueue隊(duì)列有剩余空間。
if (isRunning(c) && workQueue.offer(command)) {
// 再次檢查線程池狀態(tài)和工作線程數(shù)量
int recheck = ctl.get();
/**
* 如果線程池不在運(yùn)行狀態(tài),那么就調(diào)用remove方法移除workQueue隊(duì)列這個(gè)任務(wù)command,
* 如果移除成功,那么調(diào)用reject(command)方法,進(jìn)行拒絕任務(wù)的處理。
* 如果移除失敗,那么這個(gè)任務(wù)還是會被執(zhí)行,那么就不用調(diào)用reject(command)方法
*/
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果工作線程數(shù)量為0,但是workQueue隊(duì)列中我們添加過任務(wù),
// 那么必須調(diào)用addWorker方法,開啟一個(gè)新的工作線程。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 調(diào)用addWorker方法,開啟一個(gè)新的工作線程,運(yùn)行任務(wù)command。
// 如果還是失敗,那么這個(gè)任務(wù)command就不會不可能執(zhí)行了,
// 那么調(diào)用reject(command)方法拒絕這個(gè)任務(wù)
else if (!addWorker(command, false))
reject(command);
}
方法流程上面已經(jīng)有標(biāo)注,注意有以下幾點(diǎn):
- addWorker(Runnable firstTask, boolean core):表示開啟一個(gè)新的工作線程執(zhí)行任務(wù)firstTask。core是用來判斷核心池還是最大池。返回false,表示開啟新線程失敗,即任務(wù)firstTask沒有機(jī)會執(zhí)行。
- isRunning(c)線程池處于RUNNING狀態(tài),只有處于RUNNING狀態(tài)下,才能將任務(wù)添加到任務(wù)隊(duì)列。
- reject(command) 當(dāng)任務(wù)command不能在線程池中執(zhí)行時(shí),就會調(diào)用這個(gè)方法,告訴調(diào)用值,線程池拒絕執(zhí)行這個(gè)任務(wù)。
四. 添加工作線程addWorker方法
就是利用任務(wù)task創(chuàng)建一個(gè)新的工作線程Work,然后將它添加到工作線程集合workers中。但是需要注意多線程并發(fā)問題。
private boolean addWorker(Runnable firstTask, boolean core) {
// 利用死循環(huán)和CAS函數(shù),實(shí)現(xiàn)樂觀鎖,來實(shí)現(xiàn)多線程改變ctl值的并發(fā)問題
// 因?yàn)閏tl值代表兩個(gè)東西,工作線程數(shù)量和線程池狀態(tài)。
// 這里就用了兩個(gè)for循環(huán),一個(gè)是線程池狀態(tài)的for循環(huán),一個(gè)是工作線程數(shù)量的for循環(huán)
retry:
for (;;) {
int c = ctl.get();
// 獲取線程池運(yùn)行狀態(tài)rs,
int rs = runStateOf(c);
// 首先判斷線程池狀態(tài)和任務(wù)隊(duì)列狀態(tài),
// 來判斷能否創(chuàng)建新的工作線程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 線程池中工作線程數(shù)量wc
int wc = workerCountOf(c);
// 當(dāng)線程池工作線程數(shù)量wc大于線程上限CAPACITY,
// 或者用戶規(guī)定核心池?cái)?shù)量corePoolSize或用戶規(guī)定最大線程池?cái)?shù)量maximumPoolSize
// 表示不能創(chuàng)建工作線程了,所以返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用CAS函數(shù),使工作線程數(shù)量wc加一
if (compareAndIncrementWorkerCount(c))
// 跳出retry循環(huán)
break retry;
// 來到這里表示CAS函數(shù)失敗,那么就要循環(huán)重新判斷
// 但是c還代表線程狀態(tài),如果線程狀態(tài)改變,那么就必須跳轉(zhuǎn)到retry循環(huán)
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 工作線程是否開始,即調(diào)用了線程的start方法
boolean workerStarted = false;
// 工作線程是否添加到工作線程隊(duì)列workers中
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建一個(gè)Worker對象
w = new Worker(firstTask);
// 得到Worker所擁有的線程thread
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 并發(fā)鎖
mainLock.lock();
try {
// 獲取線程池運(yùn)行狀態(tài)rs
int rs = runStateOf(ctl.get());
// 當(dāng)線程池是運(yùn)行狀態(tài),或者是SHUTDOWN狀態(tài)但firstTask為null,
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果線程t已經(jīng)被開啟,就拋出異常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將w添加到工作線程集合workers中
workers.add(w);
// 獲取工作線程集合workers的個(gè)數(shù)
int s = workers.size();
// 記錄線程池歷史最大的工作線程個(gè)數(shù)
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果已經(jīng)添加到工作線程隊(duì)列中,那么開啟線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果開啟工作線程失敗,那么這個(gè)任務(wù)也就沒有執(zhí)行
// 因此移除這個(gè)任務(wù)w(如果隊(duì)列中有),減少工作線程數(shù)量,因?yàn)檫@個(gè)數(shù)量在之前已經(jīng)增加了
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
添加一個(gè)新的工作線程,就涉及到兩個(gè)成員變量的改變,一個(gè)是工作線程數(shù)量ctl,一個(gè)是工作線程集合workers。而ctl的類型是AtomicInteger,所以它可以使用樂觀鎖解決并發(fā)問題,workers就只能使用mainLock互斥鎖來保證并發(fā)安全問題。
4.1 更改工作線程數(shù)量ctl
因?yàn)閏tl儲存了兩個(gè)值,工作線程數(shù)量和線程池狀態(tài)。所以使用了兩個(gè)for循環(huán)來監(jiān)控多線程對這兩個(gè)值的更改。
用線程池狀態(tài)來判斷是否允許添加新的工作線程:
// 是對addWorker中線程狀態(tài)if判斷的拆分
// 當(dāng)線程池不是處于運(yùn)行狀態(tài)
if (rs >= SHUTDOWN) {
/**
* 線程池狀態(tài)不是SHUTDOWN,或者firstTask不為null,或者任務(wù)隊(duì)列為空,
* 都直接返回false,表示開啟新工作線程失敗。
* 只有當(dāng)線程池狀態(tài)是SHUTDOWN,firstTask為null,任務(wù)隊(duì)列不為空時(shí),
* 需要?jiǎng)?chuàng)建新的工作線程。
* 從execute(Runnable command)方法中分析,firstTask參數(shù)為空只有一種情況,
* 此時(shí)線程池中工作線程數(shù)量是0,而任務(wù)隊(duì)列不為空,
* 那么就要開啟一個(gè)新工作線程去執(zhí)行任務(wù)隊(duì)列中的任務(wù),否則這些任務(wù)會被丟失。
*/
if (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()) {
return false;
}
}
由此可以得出,只有兩種情形允許添加新的工作線程:
- 線程池處于RUNNING狀態(tài)
- 線程池雖然處于SHUTDOWN狀態(tài),但是線程池工作線程個(gè)數(shù)是0(即這里的firstTask != null),且任務(wù)隊(duì)列workQueue不為空,那么就要開啟一個(gè)新工作線程去執(zhí)行任務(wù)隊(duì)列中的任務(wù)。
然后使用for循環(huán)和CAS函數(shù)方式,來給工作線程數(shù)量加一。注意此時(shí)工作線程還沒有創(chuàng)建,并添加到線程集合workers中,所以如果線程添加失敗,那么還要將工作線程數(shù)量減一。
4.2 添加工作線程集合workers
創(chuàng)建一個(gè)工作線程Worker,將它添加到線程集合workers中,然后開啟這個(gè)工作線程,使用mainLock獨(dú)占鎖保證成員變量workers的并發(fā)安全問題。
五. 內(nèi)部類Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 該Worker所擁有的工作線程 */
final Thread thread;
/** Worker擁有的第一個(gè)任務(wù),初始化的時(shí)候賦值 */
Runnable firstTask;
/** 該工作線程Worker完成任務(wù)的數(shù)量 */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 將state設(shè)置為-1,禁止發(fā)起中斷請求,
// 直到調(diào)用過runWorker方法,即線程已經(jīng)運(yùn)行時(shí)。
setState(-1);
// 第一個(gè)任務(wù)
this.firstTask = firstTask;
// 創(chuàng)建一個(gè)thread線程對象,它的run方法就是本W(wǎng)orker的run方法
// 這個(gè)thread就是Worker真正執(zhí)行任務(wù)的工作線程
this.thread = getThreadFactory().newThread(this);
}
/** 復(fù)寫的是Runnable中的run方法,所以當(dāng)工作線程開啟運(yùn)行后,會調(diào)用這個(gè)方法。 */
public void run() {
runWorker(this);
}
// 當(dāng)前獨(dú)占鎖是否空閑
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 嘗試獲取獨(dú)占鎖
protected boolean tryAcquire(int unused) {
// 如果通過CAS函數(shù),可以將state值從0改變成1,那么表示獲取獨(dú)占鎖成功。
// 否則獨(dú)占鎖被別的線程獲取了。
if (compareAndSetState(0, 1)) {
// 設(shè)置擁有獨(dú)占鎖的線程是當(dāng)前線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 釋放獨(dú)占鎖
protected boolean tryRelease(int unused) {
// 設(shè)置擁有獨(dú)占鎖的線程為null
setExclusiveOwnerThread(null);
// 設(shè)置獲取獨(dú)占鎖的次數(shù)是0,表示鎖是空閑狀態(tài)
setState(0);
return true;
}
// 獲取獨(dú)占鎖,如果鎖被別的獲取,就一直等待。
public void lock() { acquire(1); }
// 嘗試獲取獨(dú)占鎖,如果鎖被別的獲取,就直接返回false,表示獲取失敗。
public boolean tryLock() { return tryAcquire(1); }
// 釋放獨(dú)占鎖
public void unlock() { release(1); }
// 當(dāng)前獨(dú)占鎖是否空閑
public boolean isLocked() { return isHeldExclusively(); }
// 如果Worker的工作線程thread已經(jīng)開啟,那么發(fā)起中斷請求。
void interruptIfStarted() {
Thread t;
// getState() >= 0表示thread已經(jīng)開啟
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker實(shí)現(xiàn)了Runnable接口,那么就可以通過Worker對象創(chuàng)建一個(gè)新線程thread,這個(gè)thread就是Worker的工作線程,而任務(wù)都在run方法中執(zhí)行。
Worker還繼承自AbstractQueuedSynchronizer類。我們知道可以通過AQS類實(shí)現(xiàn)獨(dú)占鎖和共享鎖,而Worker中實(shí)現(xiàn)了tryAcquire和tryRelease方法,說明Worker對象也是個(gè)獨(dú)占鎖對象。我們可以考慮一下Worker這個(gè)獨(dú)占鎖的作用是什么?在后面會介紹到。
六. 工作線程運(yùn)行任務(wù)runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 將w的state狀態(tài)設(shè)置成0,這樣就允許對w的thread線程進(jìn)行中斷請求了。
w.unlock();
// completedAbruptly表示線程突然終結(jié)
boolean completedAbruptly = true;
try {
// 通過getTask從任務(wù)隊(duì)列中獲取任務(wù)task執(zhí)行,這個(gè)方法是個(gè)阻塞方法。
while (task != null || (task = getTask()) != null) {
// 獲取w獨(dú)占鎖,保證當(dāng)本工作線程運(yùn)行任務(wù)時(shí),
// 不能對該線程進(jìn)行中斷請求。
w.lock();
/**
* 如果線程池大于STOP狀態(tài),且Worker工作線程中斷標(biāo)志位是false,
* 那么就調(diào)用wt的interrupt方法發(fā)起中斷請求。
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// Worker工作線程發(fā)起中斷請求
wt.interrupt();
try {
// 鉤子方法,提供給子類。在執(zhí)行任務(wù)之前調(diào)用
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 調(diào)用run方法,執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 鉤子方法,提供給子類。在執(zhí)行任務(wù)完成后調(diào)用
afterExecute(task, thrown);
}
} finally {
// 將task設(shè)置為null,進(jìn)行下一次循環(huán)
task = null;
// 將work完成的任務(wù)數(shù)completedTasks加一
w.completedTasks++;
// 釋放w獨(dú)占鎖
w.unlock();
}
}
// completedAbruptly = false表示線程正常完成終結(jié)
completedAbruptly = false;
} finally {
// 進(jìn)行一個(gè)工作線程完結(jié)后的后續(xù)操作
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法是在每個(gè)工作線程的run方法中調(diào)用,通過getTask()方法從任務(wù)隊(duì)列中獲取任務(wù)task執(zhí)行,這個(gè)方法可以阻塞當(dāng)前工作線程,如果getTask()方法返回null,那么工作線程就會運(yùn)行結(jié)束,釋放線程。
雖然runWorker方法運(yùn)行在每個(gè)工作線程中,但是對于一個(gè)Worker來說,只會有它的工作線程能夠運(yùn)行runWorker方法,而且改變的也是這個(gè)Worker的成員變量,且這些成員變量也只能在runWorker方法改變,那么它沒有多線程并發(fā)問題啊,那么為什么在這里加鎖呢?
這是因?yàn)閃orker中有一個(gè)變量是可以被其他線程改變的,就是它的工作線程thread的中斷請求,所以Worker獨(dú)占鎖的作用就是控制別的線程對它的工作線程thread中斷請求的。
最后調(diào)用processWorkerExit方法,進(jìn)行一個(gè)工作線程完結(jié)后的后續(xù)操作。
七. 獲取任務(wù)getTask方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 獲取線程池狀態(tài)rs
int rs = runStateOf(c);
// 如果有需要檢查任務(wù)隊(duì)列workQueue是否為空
// 即rs >= STOP或者rs == SHUTDOWN且workQueue為空,那么返回null,停止工作線程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 將工作線程數(shù)量減一
decrementWorkerCount();
return null;
}
// 獲取工作線程數(shù)量wc
int wc = workerCountOf(c);
/**
* 如果allowCoreThreadTimeOut為true或者wc > corePoolSize時(shí),
* 就要減少工作線程數(shù)量了。
* 當(dāng)工作線程在keepAliveTime時(shí)間內(nèi),沒有獲取到可執(zhí)行的任務(wù),
* 那么該工作線程就要被銷毀。
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 工作線程數(shù)量減一,返回null,銷毀工作線程。
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 從任務(wù)隊(duì)列workQueue中獲取了任務(wù)r,會阻塞當(dāng)前線程。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果r不為null,返回這個(gè)任務(wù)r
if (r != null)
return r;
// r是null,表示獲取任務(wù)超時(shí)
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
從阻塞任務(wù)隊(duì)列workQueue中獲取任務(wù)返回,因?yàn)槭亲枞蝿?wù)隊(duì)列,所以可以阻塞當(dāng)前線程。如果返回null,那么會完結(jié)調(diào)用getTask方法的那個(gè)工作線程。那么getTask方法在什么情況下返回null呢?
- 線程池的狀態(tài)大于等于STOP,或者線程狀態(tài)是SHUTDOWN且當(dāng)前任務(wù)隊(duì)列為空,那么返回null,停止工作線程。
- 獲取任務(wù)時(shí)間超時(shí),那么也會返回null,停止工作線程。因?yàn)榫€程池一般只維護(hù)一定數(shù)量的工作線程,如果超過這個(gè)數(shù)量,那么超過數(shù)量的工作線程,在空閑一定時(shí)間后,應(yīng)該被釋放。
八. 終止線程池的方法
8.1 shutdown和shutdownNow方法
/**
* 終止線程池。不能在添加新任務(wù)了,但是已經(jīng)添加到任務(wù)隊(duì)列的任務(wù)還是會執(zhí)行。
* 且對所有不是正在執(zhí)行任務(wù)的工作線程都發(fā)起中斷請求
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 檢查是否擁有Shutdown的權(quán)限
checkShutdownAccess();
// 將線程池狀態(tài)變成SHUTDOWN狀態(tài)
advanceRunState(SHUTDOWN);
// 對所有不是正在執(zhí)行任務(wù)的工作線程都發(fā)起中斷請求
interruptIdleWorkers();
// 鉤子方法,提供給子類實(shí)現(xiàn)。表示線程池已經(jīng)shutdown了
onShutdown();
} finally {
mainLock.unlock();
}
// 嘗試去終結(jié)線程池
tryTerminate();
}
/**
* 終止線程池。不能在添加新任務(wù)了,也不會執(zhí)行已經(jīng)添加到任務(wù)隊(duì)列的任務(wù),只是將這些任務(wù)返回。
* 且對所有工作線程都發(fā)起中斷請求, 不管這個(gè)工作線程是否正在執(zhí)行任務(wù)
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 檢查是否擁有Shutdown的權(quán)限
checkShutdownAccess();
// 將線程池狀態(tài)變成STOP狀態(tài)
advanceRunState(STOP);
// 對所有工作線程都發(fā)起中斷請求, 不管這個(gè)工作線程是否正在執(zhí)行任務(wù)
interruptWorkers();
// 返回阻塞隊(duì)列workQueue中未執(zhí)行任務(wù)的集合
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試去終結(jié)線程池
tryTerminate();
return tasks;
}
shutdown和shutdownNow區(qū)別:
- shutdown方法將線程池設(shè)置成SHUTDOWN狀態(tài),shutdownNow將線程池設(shè)置成STOP狀態(tài)。
- shutdown方法調(diào)用之后不能在添加新任務(wù)了,但是已經(jīng)添加到任務(wù)隊(duì)列的任務(wù)還是會執(zhí)行。shutdownNow方法調(diào)用之后不能在添加新任務(wù)了,也不會執(zhí)行已經(jīng)添加到任務(wù)隊(duì)列的任務(wù),只是將這些任務(wù)返回。
- shutdown方法會對所有不是正在執(zhí)行任務(wù)的工作線程都發(fā)起中斷請求,shutdownNow方法會對所有工作線程都發(fā)起中斷請求, 不管這個(gè)工作線程是否正在執(zhí)行任務(wù)。
8.2 advanceRunState方法
private void advanceRunState(int targetState) {
// 采用樂觀鎖的方法,來并發(fā)更改線程池狀態(tài)。
for (;;) {
int c = ctl.get();
// 如果runStateAtLeast方法返回true,表示當(dāng)前線程池狀態(tài)已經(jīng)是目標(biāo)狀態(tài)targetState
// 采用CAS函數(shù)嘗試更改線程池狀態(tài),如果失敗就循環(huán)繼續(xù)。
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
這個(gè)方法來改變線程池狀態(tài),使用樂觀鎖的方式保證并發(fā)安全。
8.3 中斷空閑狀態(tài)下的工作線程
/**
* 對所有不是正在執(zhí)行任務(wù)的工作線程都發(fā)起中斷請求。
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍歷工作線程Worker集合
for (Worker w : workers) {
Thread t = w.thread;
// 如果工作線程中斷標(biāo)志位是false,
// 且能夠獲取鎖,即當(dāng)前工作線程沒有運(yùn)行任務(wù)
if (!t.isInterrupted() && w.tryLock()) {
try {
// 發(fā)起中斷請求。
// 因?yàn)楂@取了鎖,所以在進(jìn)入中斷請求時(shí),worker工作線程不會執(zhí)行任務(wù)
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// 釋放鎖
w.unlock();
}
}
// 是否只進(jìn)行一個(gè)工作線程的中斷請求。
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
遍歷工作線程Worker集合,如果工作線程出于空閑狀態(tài),且沒有被中斷,那么就發(fā)起中斷請求。通過獨(dú)占鎖Worker知道,當(dāng)前工作線程是否在執(zhí)行任務(wù)。
8.4 對所有已開啟的工作線程發(fā)起中斷請求
/**
* 對所有工作線程都發(fā)起中斷請求, 不管這個(gè)工作線程是否正在執(zhí)行任務(wù)
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 如果w的工作線程thread已經(jīng)開啟,那么發(fā)起中斷請求。
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
遍歷工作線程Worker集合,調(diào)用Worker的interruptIfStarted方法,如果工作線程已開啟,那么就會發(fā)起中斷。
8.5 嘗試完結(jié)線程池的方法
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/**
* 如果線程池是RUNNING狀態(tài),
* 或者線程池是TIDYING狀態(tài)(是因?yàn)橐呀?jīng)有別的線程在終止線程池了)
* 或者線程池是SHUTDOWN狀態(tài)且任務(wù)隊(duì)列不為空,
* 線程池不能被terminate終止,直接return返回
*
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 線程池中工作線程數(shù)量不是0,線程池不能被terminate終止,所以要return
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 鉤子方法,提供給子類實(shí)現(xiàn)。表示線程池已經(jīng)終止。
terminated();
} finally {
// 設(shè)置線程池狀態(tài)是TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
線程池在什么情況下算是完全停止了呢?有三個(gè)條件:
- 線程池不是RUNNING狀態(tài)。
- 線程池中工作線程數(shù)量是0。
- 線程池中任務(wù)隊(duì)列為空。
所以在看看tryTerminate()中,前面兩個(gè)if判斷條件,就可以理解了。
8.6 等待線程池完結(jié)的方法
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 如果是TERMINATED已終止?fàn)顟B(tài),那么就返回true
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
// 如果已經(jīng)超時(shí)就返回false
if (nanos <= 0)
return false;
// 讓當(dāng)前線程等待。并設(shè)置超時(shí)時(shí)間nanos
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
如果線程池不是TERMINATED狀態(tài),就讓當(dāng)前線程在termination條件上等待,直到線程池變成TERMINATED狀態(tài),或者等待時(shí)間超時(shí)才會被喚醒。
8.7 工作線程退出的方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果工作線程突然被終結(jié),那么工作線程的數(shù)量就沒有減一。
if (completedAbruptly)
// 將工作線程數(shù)量減一。
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 將工作線程的任務(wù)完成數(shù)添加到線程池完成任務(wù)總數(shù)中
completedTaskCount += w.completedTasks;
// 從工作線程集合中移除本工作線程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 因?yàn)橛幸粋€(gè)工作線程已經(jīng)完成被釋放,那么就去嘗試終結(jié)線程池。
tryTerminate();
int c = ctl.get();
// 如果線程池狀態(tài)小于STOP,
// 就要判斷終結(jié)了這個(gè)工作線程之后,線程池中工作線程數(shù)量是否滿足需求。
if (runStateLessThan(c, STOP)) {
// 如果工作線程正常終結(jié),
// 那么要看線程池中工作線程數(shù)量是否滿足需求。
if (!completedAbruptly) {
// 不允許核心池線程釋放,那么最小值是corePoolSize,否則就可以為0
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 但是如果任務(wù)隊(duì)列中還有任務(wù),那么工作線程數(shù)量最少為1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果工作線程數(shù)量小于min值,就要?jiǎng)?chuàng)建新的工作線程了。
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 開啟一個(gè)新的工作線程
addWorker(null, false);
}
}
工作線程被釋放,有兩種情況,一種是運(yùn)行完成正常結(jié)束,一種是發(fā)生異常意外終止。
當(dāng)工作線程被釋放時(shí),需要將它從工作線程集合workers中移除,將該工作線程任務(wù)完成數(shù)添加到線程池完成任務(wù)總數(shù)中。調(diào)用tryTerminate方法嘗試終結(jié)線程池。
另外因?yàn)橛幸粋€(gè)工作線程被釋放,那么就要考慮線程池中當(dāng)前工作線程數(shù)量是否符合要求,要不要添加新的工作線程。
九. 創(chuàng)建線程池的方法。
上面分析完線程池的功能方法后,再來說說怎樣創(chuàng)建一個(gè)線程池。
9.1 構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 判斷設(shè)置的核心池?cái)?shù)量corePoolSize、最大池?cái)?shù)量maximumPoolSize、
// 與線程空閑存活時(shí)間keepAliveTime的值,是否符合要求
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
// 賦值
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor類一共有四個(gè)構(gòu)造函數(shù),前面三個(gè)構(gòu)造函數(shù)都是調(diào)用后面那個(gè)構(gòu)造函數(shù)來實(shí)現(xiàn)的。參數(shù)意義:
- corePoolSize: 線程池核心池線程個(gè)數(shù)。
- maximumPoolSize: 線程池允許最大的線程個(gè)數(shù)。
- keepAliveTime: 線程空閑時(shí),允許存活的時(shí)間。
- unit:輔助變量,用來將keepAliveTime參數(shù),轉(zhuǎn)成對應(yīng)納秒值。
- workQueue:儲存所有待執(zhí)行任務(wù)的阻塞隊(duì)列
- threadFactory:用來創(chuàng)建線程的工廠類
- handler:通過它來通知調(diào)用值,線程池拒絕了任務(wù)。
注:有沒有注意到,沒有傳遞allowCoreThreadTimeOut這個(gè)參數(shù),那么怎么設(shè)置這個(gè)成語變量呢?通過allowCoreThreadTimeOut(boolean value)方法來設(shè)置。
一般我們不用自己來new ThreadPoolExecutor對象,而是通過Executors這個(gè)工具類來創(chuàng)建ThreadPoolExecutor實(shí)例。
9.2 創(chuàng)建固定數(shù)量的線程池
// 創(chuàng)建固定數(shù)量的線程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 創(chuàng)建固定數(shù)量的線程池
public static ExecutorService newFixedThreadPool(int nThreads,
ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
根據(jù)我們前面講解,要想線程池維持固定數(shù)量的工作線程,那么工作線程就不能被釋放,就要做到兩點(diǎn):
- allowCoreThreadTimeOut為false,這個(gè)是默認(rèn)的。keepAliveTime設(shè)置為0,這樣當(dāng)調(diào)用allowCoreThreadTimeOut(boolean value)方法修改allowCoreThreadTimeOut值時(shí),會拋出異常,不允許修改。
- 核心池?cái)?shù)量和最大池?cái)?shù)量一樣,防止添加新的工作線程池。任務(wù)隊(duì)列容量要足夠大,防止任務(wù)添加到任務(wù)隊(duì)列中失敗,不能執(zhí)行。
9.3 創(chuàng)建單個(gè)線程的線程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(
ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
與固定數(shù)量的線程池相比:
- 將固定數(shù)量nThreads變成了1
- 使用了FinalizableDelegatedExecutorService這個(gè)代理類,主要作用就是當(dāng)對象被銷毀時(shí),會調(diào)用shutdown方法,停止線程池。
9.4 創(chuàng)建緩存線程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
什么叫做緩存線程池,當(dāng)有任務(wù)執(zhí)行時(shí),會創(chuàng)建工作線程來執(zhí)行任務(wù),當(dāng)任務(wù)執(zhí)行完畢后,工作線程會等待一段時(shí)間,如果還是沒有任務(wù)需要執(zhí)行,那么就會釋放工作線程。
十. ThreadPoolExecutor 重要參數(shù)方法
10.1 getActiveCount 正在執(zhí)行任務(wù)線程數(shù)
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
// isLocked() 表示這個(gè) Worker 正在執(zhí)行任務(wù)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
這個(gè)方法獲取正在執(zhí)行任務(wù)的線程數(shù)量。w.isLocked() 表示正在執(zhí)行任務(wù)的 Worker 。
10.2 getCompletedTaskCount 返回完成任務(wù)大致數(shù)量
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// completedTaskCount 表示已完成 Worker 的completedTasks數(shù)量之和。
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
返回已完成執(zhí)行的任務(wù)的大致總數(shù)。由于任務(wù)和線程的狀態(tài)可能在計(jì)算過程中動(dòng)態(tài)變化,因此返回值只是一個(gè)近似值。
completedTaskCount表示已完成Worker的completedTasks數(shù)量之和。在Worker退出方法processWorkerExit()中進(jìn)行增加操作。
10.3 getTaskCount 返回已計(jì)劃執(zhí)行的任務(wù)的大致總數(shù)
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// completedTaskCount 表示已完成 Worker 的completedTasks數(shù)量之和。
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
// w.isLocked() 表示一個(gè)任務(wù)正在執(zhí)行
if (w.isLocked())
++n;
}
// 再加上待執(zhí)行的任務(wù)數(shù)量
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
返回已計(jì)劃執(zhí)行的任務(wù)的大致總數(shù)。由于任務(wù)和線程的狀態(tài)可能在計(jì)算過程中動(dòng)態(tài)變化,因此返回值只是一個(gè)近似值。
10.4 getCorePoolSize 方法
public int getCorePoolSize() {
return corePoolSize;
}
返回線程池核心線程數(shù)量。
10.5 getKeepAliveTime 方法
public long getKeepAliveTime(TimeUnit unit) {
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
返回線程保持活動(dòng)時(shí)間。一是當(dāng)線程超過核心線程數(shù)時(shí),超過的線程超過 keepAliveTime 時(shí)間沒有執(zhí)行任務(wù),就會關(guān)閉;二是 allowCoreThreadTimeOut 值為 true,那么核心線程空閑事件超過 keepAliveTime 也會關(guān)閉。
10.6 getLargestPoolSize 方法
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
獲取曾經(jīng)出現(xiàn)的最大線程數(shù)。
10.7 getMaximumPoolSize 方法
public int getMaximumPoolSize() {
return maximumPoolSize;
}
獲取最大線程數(shù)。
10.8 getPoolSize 方法
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
返回池中的當(dāng)前線程數(shù)。
10.9 getQueue 方法
public BlockingQueue<Runnable> getQueue() {
return workQueue;
}
返回待執(zhí)行任務(wù)隊(duì)列。
十一 總結(jié)
線程池有兩個(gè)概念核心池與最大池。
- 核心池:線程池應(yīng)該維持的工作線程數(shù)量,如果線程池中工作線程數(shù)量小于核心池?cái)?shù)量,就會創(chuàng)建新的工作線程添加到線程池中。
- 最大池: 線程池中臨時(shí)存在的工作線程,當(dāng)任務(wù)隊(duì)列不能添加新任務(wù)時(shí),就會創(chuàng)建新的工作線程添加到線程池中。執(zhí)行完任務(wù)后,超過一定時(shí)間沒有接受到新任務(wù),這個(gè)臨時(shí)工作線程就會被釋放。
兩者的區(qū)別:
- 線程釋放:最大池中的線程當(dāng)超過一定時(shí)間沒有接受到新任務(wù),就會被釋放,而核心池中的線程,一般不釋放,只有設(shè)置allowCoreThreadTimeOut為true,且超過一定時(shí)間沒有接受到新任務(wù),也會被釋放。
- 創(chuàng)建時(shí)機(jī):線程池中工作線程數(shù)量小于核心池?cái)?shù)量,就會創(chuàng)建核心池線程。但是對于最大池來說,只有任務(wù)隊(duì)列已滿,不能添加新任務(wù)時(shí),才會創(chuàng)建新線程,放入最大池中。
注:一般稱小于等于corePoolSize數(shù)量的工作線程池是核心池中的線程,大于corePoolSize數(shù)量的工作線程池就是最大池中的線程。
11.1 線程池執(zhí)行任務(wù)流程
通過execute方法執(zhí)行新任務(wù)command,分為三個(gè)步驟:
- 線程池中工作線程數(shù)量小于核心池?cái)?shù)量,那么就開啟新的工作線程來執(zhí)行任務(wù)。
- 線程池中工作線程數(shù)量達(dá)到核心池?cái)?shù)量,那么就將新任務(wù)添加到任務(wù)隊(duì)列中。
- 如果新任務(wù)添加到任務(wù)隊(duì)列失敗,那么就開啟新的工作線程來執(zhí)行任務(wù)(這個(gè)線程就在最大池中了)。
在每個(gè)工作線程,會通過循環(huán),調(diào)用getTask方法,不斷地從任務(wù)隊(duì)列中獲取任務(wù)來執(zhí)行。如果任務(wù)隊(duì)列中沒有任務(wù),那么getTask方法會阻塞當(dāng)前工作線程。
但是工作線程被喚醒后,getTask方法返回null,那么就會跳出循環(huán),該工作線程運(yùn)行結(jié)束,準(zhǔn)備釋放。
11.2 終止線程池
線程池不可能立即就終止,因?yàn)樯婕暗骄€程池正在執(zhí)行任務(wù)的線程和任務(wù)隊(duì)列中等待執(zhí)行的任務(wù)該如何處理問題,有兩個(gè)方式:
- shutdown方法:不能再向線程池中添加新任務(wù)了,但是已經(jīng)添加到任務(wù)隊(duì)列的任務(wù)還是會執(zhí)行,也不會對正在執(zhí)行任務(wù)的線程發(fā)起中斷請求。等待任務(wù)隊(duì)列任務(wù)執(zhí)行完成,釋放線程池中所有線程,線程池進(jìn)入完全終止?fàn)顟B(tài)。
- shutdownNow方法:不能再向線程池中添加新任務(wù)了,也不會執(zhí)行已經(jīng)添加到任務(wù)隊(duì)列的任務(wù),但是會返回未執(zhí)行的任務(wù)集合。而且對所有工作線程都發(fā)起中斷請求, 不管這個(gè)工作線程是否正在執(zhí)行任務(wù)。等待線程池中所有線程釋放,線程池進(jìn)入完全終止?fàn)顟B(tài)。
兩者的區(qū)別:
兩者都不能再向線程池中添加新任務(wù)了。shutdown方法還是會將已添加的任務(wù)都執(zhí)行完畢,而shutdownNow方法不會再執(zhí)行任何新任務(wù)了。
注:對于正在執(zhí)行的任務(wù)是可能執(zhí)行完成的,因?yàn)橹袛嗾埱笾荒苤袛嗵幱赪AITING與TIMED_WAITING狀態(tài)的線程,對于處于其他狀態(tài)的線程不起作用。
十二. 重要示例
12.1 正常運(yùn)行線程池
package com.zhang._22._5;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
class Run implements Runnable {
private int index;
public Run(int index) {
this.index = index+1;
}
@Override
public void run() {
System.out.println("--"+Thread.currentThread().getName()+"開始運(yùn)行 任務(wù)"+index);
try {
int waitTime = 100 + index * 10;
Thread.sleep(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("======="+Thread.currentThread().getName()+"結(jié)束 任務(wù)"+index);
}
}
class MyThreadFactory implements ThreadFactory {
private int sequenceNumber = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "線程"+(++sequenceNumber));
}
}
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadFactory threadFactory = new MyThreadFactory();
// 固定數(shù)量的線程池
ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);
// // 單個(gè)線程的線程池
// ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
// // 緩存線程池
// ExecutorService service = Executors.newCachedThreadPool(threadFactory);
for (int i = 0; i < 6; i++) {
service.execute(new Run(i));
}
}
}
運(yùn)行結(jié)果:
--線程1開始運(yùn)行 任務(wù)1
--線程2開始運(yùn)行 任務(wù)2
--線程3開始運(yùn)行 任務(wù)3
=======線程1結(jié)束 任務(wù)1
--線程1開始運(yùn)行 任務(wù)4
=======線程2結(jié)束 任務(wù)2
--線程2開始運(yùn)行 任務(wù)5
=======線程3結(jié)束 任務(wù)3
--線程3開始運(yùn)行 任務(wù)6
=======線程1結(jié)束 任務(wù)4
=======線程2結(jié)束 任務(wù)5
=======線程3結(jié)束 任務(wù)6
這里使用的是固定數(shù)量的線程池,所以只有三個(gè)線程來執(zhí)行任務(wù),未執(zhí)行到的任務(wù)只能等待。
如果換成單個(gè)線程的線程池,那么只有一個(gè)線程在執(zhí)行任務(wù)。
而緩存線程池呢?你就會發(fā)現(xiàn)居然有六個(gè)線程在執(zhí)行任務(wù),就是有多少任務(wù)創(chuàng)建多少個(gè)線程。
運(yùn)行完任務(wù)后,你會發(fā)現(xiàn)程序沒有結(jié)束,那是因?yàn)榫€程池沒有被終止。
12.2 終止線程池
package com.zhang._22._5;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
class Run implements Runnable {
private int index;
public Run(int index) {
this.index = index+1;
}
@Override
public void run() {
System.out.println("--"+Thread.currentThread().getName()+"開始運(yùn)行 任務(wù)"+index);
try {
int waitTime = 100 + index * 10;
Thread.sleep(waitTime);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+" 發(fā)生中斷異常 exception=="+e.getMessage());
}
System.out.println("======="+Thread.currentThread().getName()+"結(jié)束 任務(wù)"+index);
}
}
class MyThreadFactory implements ThreadFactory {
private int sequenceNumber = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "線程"+(++sequenceNumber));
}
}
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadFactory threadFactory = new MyThreadFactory();
// 固定數(shù)量的線程池
ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);
// // 單個(gè)線程的線程池
// ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
// // 緩存線程池
// ExecutorService service = Executors.newCachedThreadPool(threadFactory);
for (int i = 0; i < 6; i++) {
service.execute(new Run(i));
}
// 還是會執(zhí)行完已經(jīng)添加的任務(wù)
service.shutdown();
}
}
運(yùn)行結(jié)果:
--線程1開始運(yùn)行 任務(wù)1
--線程3開始運(yùn)行 任務(wù)3
--線程2開始運(yùn)行 任務(wù)2
=======線程1結(jié)束 任務(wù)1
--線程1開始運(yùn)行 任務(wù)4
=======線程2結(jié)束 任務(wù)2
--線程2開始運(yùn)行 任務(wù)5
=======線程3結(jié)束 任務(wù)3
--線程3開始運(yùn)行 任務(wù)6
=======線程1結(jié)束 任務(wù)4
=======線程2結(jié)束 任務(wù)5
=======線程3結(jié)束 任務(wù)6
Process finished with exit code 0
使用shutdown方法,還是會執(zhí)行完已經(jīng)添加的任務(wù)。最后程序退出。
package com.zhang._22._5;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
class Run implements Runnable {
private int index;
public Run(int index) {
this.index = index+1;
}
@Override
public void run() {
System.out.println("--"+Thread.currentThread().getName()+"開始運(yùn)行 任務(wù)"+index);
try {
int waitTime = 100 + index * 10;
Thread.sleep(waitTime);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+" 發(fā)生中斷異常 exception=="+e.getMessage());
}
System.out.println("======="+Thread.currentThread().getName()+"結(jié)束 任務(wù)"+index);
}
}
class MyThreadFactory implements ThreadFactory {
private int sequenceNumber = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "線程"+(++sequenceNumber));
}
}
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadFactory threadFactory = new MyThreadFactory();
// 固定數(shù)量的線程池
ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);
// // 單個(gè)線程的線程池
// ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
// // 緩存線程池
// ExecutorService service = Executors.newCachedThreadPool(threadFactory);
for (int i = 0; i < 6; i++) {
service.execute(new Run(i));
}
service.shutdownNow();
}
}
運(yùn)行結(jié)果:
--線程1開始運(yùn)行 任務(wù)1
--線程2開始運(yùn)行 任務(wù)2
--線程3開始運(yùn)行 任務(wù)3
線程2 發(fā)生中斷異常 exception==sleep interrupted
線程1 發(fā)生中斷異常 exception==sleep interrupted
=======線程1結(jié)束 任務(wù)1
=======線程2結(jié)束 任務(wù)2
線程3 發(fā)生中斷異常 exception==sleep interrupted
=======線程3結(jié)束 任務(wù)3
Process finished with exit code 0
使用shutdownNow方法,在任務(wù)隊(duì)列中等待的任務(wù)是不會執(zhí)行的,而且立即發(fā)起線程中斷請求。