一、線程池簡介
線程池的使用主要是解決兩個問題:①當執(zhí)行大量異步任務的時候線程池能夠提供更好的性能,在不使用線程池時候,每當需要執(zhí)行異步任務的時候直接new一個線程來運行的話,線程的創(chuàng)建和銷毀都是需要開銷的。而線程池中的線程是可復用的,不需要每次執(zhí)行異步任務的時候重新創(chuàng)建和銷毀線程;②線程池提供一種資源限制和管理的手段,比如可以限制線程的個數(shù),動態(tài)的新增線程等等。
在下面的分析中,我們可以看到,線程池使用一個Integer的原子類型變量來記錄線程池狀態(tài)和線程池中的線程數(shù)量,通過線程池狀態(tài)來控制任務的執(zhí)行,每個工作線程Worker線程可以處理多個任務。
二、ThreadPoolExecutor類
1、我們先簡單看一下關于ThreadPoolExecutor的一些成員變量以及其所表示的含義
ThreadPoolExecutor繼承了AbstractExecutorService,其中的成員變量ctl是一個Integer類型的原子變量,用來記錄線程池的狀態(tài)和線程池中的線程的個數(shù),類似于前面講到的讀寫鎖中使用一個變量保存兩種信息。這里(Integer看做32位)ctl高三位表示線程池的狀態(tài),后面的29位表示線程池中的線程個數(shù)。如下所示是ThreadPoolExecutor源碼中的成員變量
1 //(高3位)表示線程池狀態(tài),(低29位)表示線程池中線程的個數(shù);
2 // 默認狀態(tài)是RUNNING,線程池中線程個數(shù)為0
3 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
4
5 //表示具體平臺下Integer的二進制位數(shù)-3后的剩余位數(shù)表示的數(shù)才是線程的個數(shù);
6 //其中Integer.SIZE=32,-3之后的低29位表示的就是線程的個數(shù)了
7 private static final int COUNT_BITS = Integer.SIZE - 3;
8
9 //線程最大個數(shù)(低29位)00011111111111111111111111111111(1<<29-1)
10 private static final int CAPACITY? = (1 << COUNT_BITS) - 1;
11
12 //線程池狀態(tài)(高3位表示線程池狀態(tài))
13 //111 00000000000000000000000000000
14 private static final int RUNNING? ? = -1 << COUNT_BITS;
15
16 //000 00000000000000000000000000000
17 private static final int SHUTDOWN? =? 0 << COUNT_BITS;
18
19 //001 00000000000000000000000000000
20 private static final int STOP? ? ? =? 1 << COUNT_BITS;
21
22 //010 00000000000000000000000000000
23 private static final int TIDYING? ? =? 2 << COUNT_BITS;
24
25 //011 00000000000000000000000000000
26 private static final int TERMINATED =? 3 << COUNT_BITS;
27
28 //獲取高3位(運行狀態(tài))==> c & 11100000000000000000000000000000
29 private static int runStateOf(int c)? ? { return c & ~CAPACITY; }
30
31 //獲取低29位(線程個數(shù))==> c &? 00011111111111111111111111111111
32 private static int workerCountOf(int c)? { return c & CAPACITY; }
33
34 //計算原子變量ctl新值(運行狀態(tài)和線程個數(shù))
35 private static int ctlOf(int rs, int wc) { return rs | wc; }
下面我們簡單解釋一下上面的線程狀態(tài)的含義:
①RUNNING:接受新任務并處理阻塞隊列中的任務
②SHUTDOWN:拒絕新任務但是處理阻塞隊列中的任務
③STOP:拒絕新任務并拋棄阻塞隊列中的任務,同時會中斷當前正在執(zhí)行的任務
④TIDYING:所有任務執(zhí)行完之后(包含阻塞隊列中的任務)當前線程池中活躍的線程數(shù)量為0,將要調(diào)用terminated方法
⑥TERMINATED:終止狀態(tài)。terminated方法調(diào)用之后的狀態(tài)
2、下面初步了解一下ThreadPoolExecutor的參數(shù)以及實現(xiàn)原理
①corePoolSize:線程池核心現(xiàn)車個數(shù)
②workQueue:用于保存等待任務執(zhí)行的任務的阻塞隊列(比如基于數(shù)組的有界阻塞隊列ArrayBlockingQueue、基于鏈表的無界阻塞隊列LinkedBlockingQueue等等)
③maximumPoolSize:線程池最大線程數(shù)量
④ThreadFactory:創(chuàng)建線程的工廠
⑤RejectedExecutionHandler:拒絕策略,表示當隊列已滿并且線程數(shù)量達到線程池最大線程數(shù)量的時候?qū)π绿峤坏娜蝿账扇〉牟呗?,主要有四種策略:AbortPolicy(拋出異常)、CallerRunsPolicy(只用調(diào)用者所在線程來運行該任務)、DiscardOldestPolicy(丟掉阻塞隊列中最近的一個任務來處理當前提交的任務)、DiscardPolicy(不做處理,直接丟棄掉)
⑥keepAliveTime:存活時間,如果當前線程池中的數(shù)量比核心線程數(shù)量多,并且當前線程是閑置狀態(tài),該變量就是這些線程的最大生存時間
⑦TimeUnit:存活時間的時間單位。
根據(jù)上面的參數(shù)介紹,簡單了解一下線程池的實現(xiàn)原理,以提交一個新任務為開始點,分析線程池的主要處理流程
3、關于一些線程池的使用類型
①newFixedThreadPool:創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)均為nThreads的線程池,并且阻塞隊列長度為Integer.MAX_VALUE,keepAliveTime=0說明只要線程個數(shù)比核心線程個數(shù)多并且當前空閑即回收。
public static ExecutorService newFixedThreadPool(int nThreads) {
? ? return new ThreadPoolExecutor(nThreads, nThreads,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>());
}
②newSingleThreadExecutor:創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)都為1 的線程池,并且阻塞隊列長度為Integer.MAX_VALUE,keepAliveTime=0說明只要線程個數(shù)比核心線程個數(shù)多并且當前線程空閑即回收該線程。
public static ExecutorService newSingleThreadExecutor() {
? ? return new FinalizableDelegatedExecutorService
? ? ? ? (new ThreadPoolExecutor(1, 1,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>()));
}
③newCachedThreadPoolExecutor:創(chuàng)建一個按需創(chuàng)建線程的線程池,初始線程個數(shù)為0,最多線程個數(shù)為Integer.MAX_VALUE,并且阻塞隊列為同步隊列(最多只有一個元素),keepAliveTime=60說明只要當前線程在60s內(nèi)空閑則回收。這個類型的線程池的特點就是:加入同步隊列的任務會被馬上執(zhí)行,同步隊列中最多只有一個任務
public static ExecutorService newCachedThreadPool() {
? ? return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 60L, TimeUnit.SECONDS,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new SynchronousQueue<Runnable>());
}
4、ThreadPoolExecutor中的其他成員
其中的ReentrantLock可參考前面寫到的Java中的鎖——Lock和synchronized,其中降到了ReentrantLock的具體實現(xiàn)原理;
關于AQS部分可參考前面說到的Java中的隊列同步器AQS,也講到了關于AQS的具體實現(xiàn)原理分析;
關于條件隊列的相關知識可參考前面寫的Java中的線程協(xié)作之Condition,里面說到了關于Java中線程協(xié)作Condition的實現(xiàn)原理;
//獨占鎖,用來控制新增工作線程Worker操作的原子性
private final ReentrantLock mainLock = new ReentrantLock();
//工作線程集合,Worker繼承了AQS接口和Runnable接口,是具體處理任務的線程對象
//Worker實現(xiàn)AQS,并自己實現(xiàn)了簡單不可重入獨占鎖,其中state=0表示當前鎖未被獲取狀態(tài),state=1表示鎖被獲取,
//state=-1表示W(wǎng)ork創(chuàng)建時候的默認狀態(tài),創(chuàng)建時候設置state=-1是為了防止runWorker方法運行前被中斷
private final HashSet<Worker> workers = new HashSet<Worker>();
//termination是該鎖對應的條件隊列,在線程調(diào)用awaitTermination時候用來存放阻塞的線程
private final Condition termination = mainLock.newCondition();
三、execute(Runnable command)方法實現(xiàn)
executor方法的作用是提交任務command到線程池執(zhí)行,可以簡單的按照下面的圖進行理解,ThreadPoolExecutor的實現(xiàn)類似于一個生產(chǎn)者消費者模型,當用戶添加任務到線程池中相當于生產(chǎn)者生產(chǎn)元素,workers工作線程則直接執(zhí)行任務或者從任務隊列中獲取任務,相當于消費之消費元素。
1 public void execute(Runnable command) {
2? ? //(1)首先檢查任務是否為null,為null拋出異常,否則進行下面的步驟
3? ? if (command == null)
4? ? ? ? throw new NullPointerException();
5? ? //(2)ctl值中包含了當前線程池的狀態(tài)和線程池中的線程數(shù)量
6? ? int c = ctl.get();
7? ? //(3)workerCountOf方法是獲取低29位,即獲取當前線程池中的線程個數(shù),如果小于corePoolSize,就開啟新的線程運行
8? ? if (workerCountOf(c) < corePoolSize) {
9? ? ? ? if (addWorker(command, true))
10? ? ? ? ? ? return;
11? ? ? ? c = ctl.get();
12? ? }
13? ? //(4)如果線程池處理RUNNING狀態(tài),就添加任務到阻塞隊列中
14? ? if (isRunning(c) && workQueue.offer(command)) {
15? ? ? ? //(4-1)二次檢查,獲取ctl值
16? ? ? ? int recheck = ctl.get();
17? ? ? ? //(4-2)如果當前線程池不是出于RUNNING狀態(tài),就從隊列中刪除任務,并執(zhí)行拒絕策略
18? ? ? ? if (! isRunning(recheck) && remove(command))
19? ? ? ? ? ? reject(command);
20? ? ? ? //(4-3)否則,如果線程池為空,就添加一個線程
21? ? ? ? else if (workerCountOf(recheck) == 0)
22? ? ? ? ? ? addWorker(null, false);
23? ? }
24? ? //(5)如果隊列滿,則新增線程,如果新增線程失敗,就執(zhí)行拒絕策略
25? ? else if (!addWorker(command, false))
26? ? ? ? reject(command);
27 }
我們在看一下上面代碼的執(zhí)行流程,按照標記的數(shù)字進行分析:
①步驟(3)判斷當前線程池中的線程個數(shù)是否小于corePoolSize,如果小于核心線程數(shù),會向workers里面新增一個核心線程執(zhí)行任務。
②如果當前線程池中的線程數(shù)量大于核心線程數(shù),就執(zhí)行(4)。(4)首先判斷當前線程池是否處于RUNNING狀態(tài),如果處于該狀態(tài),就添加任務到任務隊列中,這里需要判斷線程池的狀態(tài)是因為線程池可能已經(jīng)處于非RUNNING狀態(tài),而在非RUNNING狀態(tài)下是需要拋棄新任務的。
③如果想任務隊列中添加任務成功,需要進行二次校驗,因為在添加任務到任務隊列后,可能線程池的狀態(tài)發(fā)生了變化,所以這里需要進行二次校驗,如果當前線程池已經(jīng)不是RUNNING狀態(tài)了,需要將任務從任務隊列中移除,然后執(zhí)行拒絕策略;如果二次校驗通過,則執(zhí)行4-3代碼重新判斷當前線程池是否為空,如果線程池為空沒有線程,那么就需要新創(chuàng)建一個線程。
④如果上面的步驟(4)創(chuàng)建添加任務失敗,說明隊列已滿,那么(5)會嘗試再開啟新的線程執(zhí)行任務(類比上圖中的thread3和thread4,即不是核心線程的那些線程),如果當前線程池中的線程個數(shù)已經(jīng)大于最大線程數(shù)maximumPoolSize,表示不能開啟新的線程。這就屬于線程池滿并且任務隊列滿,就需要執(zhí)行拒絕策略了。
下面我們在看看addWorker方法的實現(xiàn)
1 private boolean addWorker(Runnable firstTask, boolean core) {
2? ? retry:
3? ? for (;;) {
4? ? ? ? int c = ctl.get();
5? ? ? ? int rs = runStateOf(c);
6
7? ? ? ? //(6)檢查隊列是否只在必要時候為空
8? ? ? ? if (rs >= SHUTDOWN &&
9? ? ? ? ? ? ! (rs == SHUTDOWN &&
10? ? ? ? ? ? ? ? firstTask == null &&
11? ? ? ? ? ? ? ? ! workQueue.isEmpty()))
12? ? ? ? ? ? return false;
13
14? ? ? ? //(7)使用CAS增加線程個數(shù)
15? ? ? ? for (;;) {
16? ? ? ? ? ? //根據(jù)ctl值獲得當前線程池中的線程數(shù)量
17? ? ? ? ? ? int wc = workerCountOf(c);
18? ? ? ? ? ? //(7-1)如果線程數(shù)量超出限制,返回false
19? ? ? ? ? ? if (wc >= CAPACITY ||
20? ? ? ? ? ? ? ? wc >= (core ? corePoolSize : maximumPoolSize))
21? ? ? ? ? ? ? ? return false;
22? ? ? ? ? ? //(7-2)CAS增加線程數(shù)量,同時只有一個線程可以成功
23? ? ? ? ? ? if (compareAndIncrementWorkerCount(c))
24? ? ? ? ? ? ? ? break retry;
25? ? ? ? ? ? c = ctl.get();? // 重新讀取ctl值
26? ? ? ? ? ? //(7-3)CAS失敗了,需要查看當前線程池狀態(tài)是否發(fā)生變化,如果發(fā)生變化需要跳轉(zhuǎn)到外層循環(huán)嘗試重新獲取線程池狀態(tài),否則內(nèi)層循環(huán)重新進行CAS增加線程數(shù)量
27? ? ? ? ? ? if (runStateOf(c) != rs)
28? ? ? ? ? ? ? ? continue retry;
29? ? ? ? }
30? ? }
31
32? ? //(8)執(zhí)行到這里說明CAS增加新線程個數(shù)成功了,我們需要開始創(chuàng)建新的工作線程Worker
33? ? boolean workerStarted = false;
34? ? boolean workerAdded = false;
35? ? Worker w = null;
36? ? try {
37? ? ? ? //(8-1)創(chuàng)建新的worker
38? ? ? ? w = new Worker(firstTask);
39? ? ? ? final Thread t = w.thread;
40? ? ? ? if (t != null) {
41? ? ? ? ? ? final ReentrantLock mainLock = this.mainLock;
42? ? ? ? ? ? //(8-2)加獨占鎖,保證workers的同步,可能線程池中的多個線程調(diào)用了線程池的execute方法
43? ? ? ? ? ? mainLock.lock();
44? ? ? ? ? ? try {
45? ? ? ? ? ? ? ? // (8-3)重新檢查線程池狀態(tài),以免在獲取鎖之前調(diào)用shutdown方法改變線程池狀態(tài)
46? ? ? ? ? ? ? ? int rs = runStateOf(ctl.get());
47
48? ? ? ? ? ? ? ? if (rs < SHUTDOWN ||
49? ? ? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask == null)) {
50? ? ? ? ? ? ? ? ? ? if (t.isAlive()) // precheck that t is startable
51? ? ? ? ? ? ? ? ? ? ? ? throw new IllegalThreadStateException();
52? ? ? ? ? ? ? ? ? ? //(8-4)添加新任務
53? ? ? ? ? ? ? ? ? ? workers.add(w);
54? ? ? ? ? ? ? ? ? ? int s = workers.size();
55? ? ? ? ? ? ? ? ? ? if (s > largestPoolSize)
56? ? ? ? ? ? ? ? ? ? ? ? largestPoolSize = s;
57? ? ? ? ? ? ? ? ? ? workerAdded = true;
58? ? ? ? ? ? ? ? }
59? ? ? ? ? ? } finally {
60? ? ? ? ? ? ? ? mainLock.unlock();
61? ? ? ? ? ? }
62? ? ? ? ? ? //(8-6)添加新任務成功之后,啟動任務
63? ? ? ? ? ? if (workerAdded) {
64? ? ? ? ? ? ? ? t.start();
65? ? ? ? ? ? ? ? workerStarted = true;
66? ? ? ? ? ? }
67? ? ? ? }
68? ? } finally {
69? ? ? ? if (! workerStarted)
70? ? ? ? ? ? addWorkerFailed(w);
71? ? }
72? ? return workerStarted;
73 }
簡單再分析說明一下上面的代碼,addWorker方法主要分為兩部分,第一部分是使用CAS線程安全的添加線程數(shù)量,第二部分則是創(chuàng)建新的線程并且將任務并發(fā)安全的添加到新的workers之中,然后啟動線程執(zhí)行。
①代碼(6)中檢查隊列是否只在必要時候為空,只有線程池狀態(tài)符合條件才能夠進行下面的步驟,從(6)中的判斷條件來看,下面的集中情況addWorker會直接返回false
( I )當前線程池狀態(tài)為STOP,TIDYING或者TERMINATED ; (I I)當前線程池狀態(tài)為SHUTDOWN并且已經(jīng)有了第一個任務; (I I I)當前線程池狀態(tài)為SHUTDOWN并且任務隊列為空
②外層循環(huán)中判斷條件通過之后,在內(nèi)層循環(huán)中使用CAS增加線程數(shù),當CAS成功就退出雙重循環(huán)進行(8)步驟代碼的執(zhí)行,如果失敗需要查看當前線程池的狀態(tài)是否發(fā)生變化,如果發(fā)生變化需要進行外層循環(huán)重新判斷線程池狀態(tài)然后在進入內(nèi)層循環(huán)重新進行CAS增加線程數(shù),如果線程池狀態(tài)沒有發(fā)生變化但是上一次CAS失敗就繼續(xù)進行CAS嘗試。
③執(zhí)行到(8)代碼處,表明當前已經(jīng)成功增加 了線程數(shù),但是還沒有線程執(zhí)行任務。ThreadPoolExecutor中使用全局獨占鎖mainLock來控制將新增的工作線程Worker線程安全的添加到工作者線程集合workers中。
④(8-2)獲取了獨占鎖,但是在獲取到鎖之后,還需要進行重新檢查線程池的狀態(tài),這是為了避免在獲取全局獨占鎖之前其他線程調(diào)用了shutDown方法關閉了線程池。如果線程池已經(jīng)關閉需要釋放鎖。否則將新增的線程添加到工作集合中,釋放鎖啟動線程執(zhí)行任務。
上面的addWorker方法最后幾行中,會判斷添加工作線程是否成功,如果失敗,會執(zhí)行addWorkerFailed方法,將任務從workers中移除,并且workerCount做-1操作。
1 private void addWorkerFailed(Worker w) {
2? ? final ReentrantLock mainLock = this.mainLock;
3? ? //獲取鎖
4? ? mainLock.lock();
5? ? try {
6? ? ? //如果worker不為null
7? ? ? if (w != null)
8? ? ? ? ? //workers移除worker
9? ? ? ? ? workers.remove(w);
10? ? ? //通過CAS操作,workerCount-1
11? ? ? decrementWorkerCount();
12? ? ? tryTerminate();
13? ? } finally {
14? ? ? //釋放鎖
15? ? ? mainLock.unlock();
16? ? }
17 }
四、工作線程Worker的執(zhí)行
(1)工作線程Worker類源碼分析
上面查看addWorker方法在CAS更新線程數(shù)成功之后,下面就是創(chuàng)建新的Worker線程執(zhí)行任務,所以我們這里先查看Worker類,下面是Worker類的源碼,我們可以看出,Worker類繼承了AQS并實現(xiàn)了Runnable接口,所以他既是一個自定義的同步組件,也是一個執(zhí)行任務的線程類。下面我們分析Worker類的執(zhí)行
1 private final class Worker
2? ? extends AbstractQueuedSynchronizer
3? ? implements Runnable
4 {
5
6? ? /** 使用線程工廠創(chuàng)建的線程,執(zhí)行任務 */
7? ? final Thread thread;
8? ? /** 初始化執(zhí)行任務 */
9? ? Runnable firstTask;
10? ? /** 計數(shù) */
11? ? volatile long completedTasks;
12
13? ? /**
14? ? ? * 給出初始firstTask,線程創(chuàng)建工廠創(chuàng)建新的線程
15? ? ? */
16? ? Worker(Runnable firstTask) {
17? ? ? ? setState(-1); // 防止在調(diào)用runWorker之前被中斷
18? ? ? ? this.firstTask = firstTask;
19? ? ? ? this.thread = getThreadFactory().newThread(this); //使用threadFactory創(chuàng)建線程
20? ? }
21
22? ? /** run方法實際上執(zhí)行的是runWorker方法? */
23? ? public void run() {
24? ? ? ? runWorker(this);
25? ? }
26
27? ? // 關于同步狀態(tài)(鎖)
28? ? //
29? ? // 同步狀態(tài)state=0表示鎖未被獲取
30? ? // 同步狀態(tài)state=1表示鎖被獲取
31
32? ? protected boolean isHeldExclusively() {
33? ? ? ? return getState() != 0;
34? ? }
35
36? ? //下面都是重寫AQS的方法,Worker為自定義的同步組件
37? ? protected boolean tryAcquire(int unused) {
38? ? ? ? if (compareAndSetState(0, 1)) {
39? ? ? ? ? ? setExclusiveOwnerThread(Thread.currentThread());
40? ? ? ? ? ? return true;
41? ? ? ? }
42? ? ? ? return false;
43? ? }
44
45? ? protected boolean tryRelease(int unused) {
46? ? ? ? setExclusiveOwnerThread(null);
47? ? ? ? setState(0);
48? ? ? ? return true;
49? ? }
50
51? ? public void lock()? ? ? ? { acquire(1); }
52? ? public boolean tryLock()? { return tryAcquire(1); }
53? ? public void unlock()? ? ? { release(1); }
54? ? public boolean isLocked() { return isHeldExclusively(); }
55
56? ? void interruptIfStarted() {
57? ? ? ? Thread t;
58? ? ? ? if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
59? ? ? ? ? ? try {
60? ? ? ? ? ? ? ? t.interrupt();
61? ? ? ? ? ? } catch (SecurityException ignore) {
62? ? ? ? ? ? }
63? ? ? ? }
64? ? }
65 }
在構(gòu)造函數(shù)中我們可以看出,首先將同步狀態(tài)state置為-1,而Worker這個同步組件的state有三個值,其中state=-1表示W(wǎng)ork創(chuàng)建時候的默認狀態(tài),創(chuàng)建時候設置state=-1是為了防止runWorker方法運行前被中斷?前面說到過這個結(jié)論,這里置為-1是為了避免當前Worker在調(diào)用runWorker方法之前被中斷(當其他線程調(diào)用線程池的shutDownNow時候,如果Worker的state>=0則會中斷線程),設置為-1就不會被中斷了。而Worker實現(xiàn)Runnable接口,那么需要重寫run方法,在run方法中,我們可以看到,實際上執(zhí)行的是runWorker方法,在runWorker方法中,會首先調(diào)用unlock方法,該方法會將state置為0,所以這個時候調(diào)用shutDownNow方法就會中斷當前線程,而這個時候已經(jīng)進入了runWork方法了,就不會在還沒有執(zhí)行runWorker方法的時候就中斷線程。
(2)runWorker方法的源碼分析
1 final void runWorker(Worker w) {
2? ? Thread wt = Thread.currentThread();
3? ? Runnable task = w.firstTask;
4? ? w.firstTask = null;
5? ? w.unlock(); // 這個時候調(diào)用unlock方法,將state置為0,就可以被中斷了
6? ? boolean completedAbruptly = true;
7? ? try {
8? ? ? ? //(10)如果當前任務為null,或者從任務隊列中獲取到的任務為null,就跳轉(zhuǎn)到(11)處執(zhí)行清理工作
9? ? ? ? while (task != null || (task = getTask()) != null) {
10? ? ? ? ? ? //task不為null,就需要線程執(zhí)行任務,這個時候,需要獲取工作線程內(nèi)部持有的獨占鎖
11? ? ? ? ? ? w.lock();
12? ? ? ? ? ? /**如果線程池已被停止(STOP)(至少大于STOP狀態(tài)),要確保線程都被中斷
13? ? ? ? ? ? ? * 如果狀態(tài)不對,檢查當前線程是否中斷并清除中斷狀態(tài),并且再次檢查線程池狀態(tài)是否大于STOP
14? ? ? ? ? ? ? * 如果上述滿足,檢查該對象是否處于中斷狀態(tài),不清除中斷標記
15? ? ? ? ? ? ? */
16? ? ? ? ? ? if ((runStateAtLeast(ctl.get(), STOP) ||
17? ? ? ? ? ? ? ? ? (Thread.interrupted() &&
18? ? ? ? ? ? ? ? ? runStateAtLeast(ctl.get(), STOP))) &&
19? ? ? ? ? ? ? ? !wt.isInterrupted())
20? ? ? ? ? ? ? ? //中斷該對象
21? ? ? ? ? ? ? ? wt.interrupt();
22? ? ? ? ? ? try {
23? ? ? ? ? ? ? ? //執(zhí)行任務之前要做的事情
24? ? ? ? ? ? ? ? beforeExecute(wt, task);
25? ? ? ? ? ? ? ? Throwable thrown = null;
26? ? ? ? ? ? ? ? try {
27? ? ? ? ? ? ? ? ? ? task.run(); //執(zhí)行任務
28? ? ? ? ? ? ? ? } catch (RuntimeException x) {
29? ? ? ? ? ? ? ? ? ? thrown = x; throw x;
30? ? ? ? ? ? ? ? } catch (Error x) {
31? ? ? ? ? ? ? ? ? ? thrown = x; throw x;
32? ? ? ? ? ? ? ? } catch (Throwable x) {
33? ? ? ? ? ? ? ? ? ? thrown = x; throw new Error(x);
34? ? ? ? ? ? ? ? } finally {
35? ? ? ? ? ? ? ? ? ? //執(zhí)行任務之后的方法
36? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);
37? ? ? ? ? ? ? ? }
38? ? ? ? ? ? } finally {
39? ? ? ? ? ? ? ? task = null;
40? ? ? ? ? ? ? ? //更新當前已完成任務數(shù)量
41? ? ? ? ? ? ? ? w.completedTasks++;
42? ? ? ? ? ? ? ? //釋放鎖
43? ? ? ? ? ? ? ? w.unlock();
44? ? ? ? ? ? }
45? ? ? ? }
46? ? ? ? completedAbruptly = false;
47? ? } finally {
48? ? ? ? //執(zhí)行清理工作:處理并退出當前worker
49? ? ? ? processWorkerExit(w, completedAbruptly);
50? ? }
51 }
我們梳理一下runWorker方法的執(zhí)行流程
①首先先執(zhí)行unlock方法,將Worker的state置為0,這樣工作線程就可以被中斷了(后續(xù)的操作如果線程池關閉就需要線程被中斷)
②首先判斷判斷當前的任務(當前工作線程中的task,或者從任務隊列中取出的task)是否為null,如果不為null就往下執(zhí)行,為null就執(zhí)行processWorkerExit方法。
③獲取工作線程內(nèi)部持有的獨占鎖(避免在執(zhí)行任務期間,其他線程調(diào)用shutdown后正在執(zhí)行的任務被中斷,shutdown只會中斷當前被阻塞掛起的沒有執(zhí)行任務的線程)
④然后執(zhí)行beforeExecute()方法,該方法為擴展接口代碼,表示在具體執(zhí)行任務之前所做的一些事情,然后執(zhí)行task.run()方法執(zhí)行具體任務,執(zhí)行完之后會調(diào)用afterExecute()方法,用以處理任務執(zhí)行完畢之后的工作,也是一個擴展接口代碼。
⑤更新當前線程池完成的任務數(shù),并釋放鎖
(3)執(zhí)行清理工作的方法processWorkerExit
下面是方法processWorkerExit的源碼,在下面的代碼中
①首先(1-1)處統(tǒng)計線程池完成的任務個數(shù),并且在此之前獲取全局鎖,然后更新當前的全局計數(shù)器,然后從工作線程集合中移除當前工作線程,完成清理工作。
②代碼(1-2)調(diào)用了tryTerminate 方法,在該方法中,判斷了當前線程池狀態(tài)是SHUTDOWN并且隊列不為空或者當前線程池狀態(tài)為STOP并且當前線程池中沒有活動線程,則置線程池狀態(tài)為TERMINATED。如果設置稱為了TERMINATED狀態(tài),還需要調(diào)用全局條件變量termination的signalAll方法喚醒所有因為調(diào)用線程池的awaitTermination方法而被阻塞住的線程,使得線程池中的所有線程都停止,從而使得線程池為TERMINATED狀態(tài)。
③代碼(1-3)處判斷當前線程池中的線程個數(shù)是否小于核心線程數(shù),如果是,需要新增一個線程保證有足夠的線程可以執(zhí)行任務隊列中的任務或者提交的任務。
1 private void processWorkerExit(Worker w, boolean completedAbruptly) {
2? ? /*
3? ? *completedAbruptly:是由runWorker傳過來的參數(shù),表示是否突然完成的意思
4? ? *當在就是在執(zhí)行任務過程當中出現(xiàn)異常,就會突然完成,傳true
5? ? *
6? ? *如果是突然完成,需要通過CAS操作,更新workerCount(-1操作)
7? ? *不是突然完成,則不需要-1,因為getTask方法當中已經(jīng)-1(getTask方法中執(zhí)行了decrementWorkerCount()方法)
8? ? */
9? ? if (completedAbruptly)
10? ? ? ? decrementWorkerCount();
11? ? //(1-1)在統(tǒng)計完成任務個數(shù)之前加上全局鎖,然后統(tǒng)計線程池中完成的任務個數(shù)并更新全局計數(shù)器,并從工作集中刪除當前worker
12? ? final ReentrantLock mainLock = this.mainLock;
13? ? mainLock.lock(); //獲得全局鎖
14? ? try {
15? ? ? ? completedTaskCount += w.completedTasks; //更新已完成的任務數(shù)量
16? ? ? ? workers.remove(w); //將完成該任務的線程worker從工作線程集合中移除
17? ? } finally {
18? ? ? ? mainLock.unlock(); //釋放鎖
19? ? }
20? ? /**(1-2)
21? ? ? * 這一個方法調(diào)用完成了下面的事情:
22? ? ? * 判斷如果當前線程池狀態(tài)是SHUTDOWN并且工作隊列為空,
23? ? ? * 或者當前線程池狀態(tài)是STOP并且當前線程池里面沒有活動線程,
24? ? ? * 則設置當前線程池狀態(tài)為TERMINATED,如果設置成了TERMINATED狀態(tài),
25? ? ? * 還需要調(diào)用條件變量termination的signAll方法激活所有因為調(diào)用線程池的awaitTermination方法而被阻塞的線程
26? ? ? */
27? ? tryTerminate();
28
29? ? //(1-3)如果當前線程池中線程數(shù)小于核心線程,則增加核心線程數(shù)
30? ? int c = ctl.get();
31? ? //判斷當前線程池的狀態(tài)是否小于STOP(RUNNING或者SHUTDOWN)
32? ? if (runStateLessThan(c, STOP)) {
33? ? ? ? //如果任務忽然完成,執(zhí)行后續(xù)的代碼
34? ? ? ? if (!completedAbruptly) {
35? ? ? ? ? ? //allowCoreThreadTimeOut表示是否允許核心線程超時,默認為false
36? ? ? ? ? ? //min這里當默認為allowCoreThreadTimeOut默認為false的時候,min置為coorPoolSize
37? ? ? ? ? ? int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
38? ? ? ? ? ? //這里說明:如果允許核心線程超時,那么allowCoreThreadTimeOut可為true,那么min值為0,不需要維護核心線程了
39? ? ? ? ? ? //如果min為0并且任務隊列不為空
40? ? ? ? ? ? if (min == 0 && ! workQueue.isEmpty())
41? ? ? ? ? ? ? ? min = 1; //這里表示如果min為0,且隊列不為空,那么至少需要一個核心線程存活來保證任務的執(zhí)行
42? ? ? ? ? ? //如果工作線程數(shù)大于min,表示當前線程數(shù)滿足,直接返回
43? ? ? ? ? ? if (workerCountOf(c) >= min)
44? ? ? ? ? ? ? ? return; // replacement not needed
45? ? ? ? }
46? ? ? ? addWorker(null, false);
47? ? }
48 }
在tryTerminate 方法中,我們簡單說明了該方法的作用,下面是該方法的源碼,可以看出源碼實現(xiàn)上和上面所總結(jié)的功能是差不多的
1 final void tryTerminate() {
2? ? for (;;) {
3? ? ? ? //獲取線程池狀態(tài)
4? ? ? ? int c = ctl.get();
5? ? ? ? //如果線程池狀態(tài)為RUNNING
6? ? ? ? //或者狀態(tài)大于TIDYING
7? ? ? ? //或者狀態(tài)==SHUTDOWN并未任務隊列不為空
8? ? ? ? //直接返回,不能調(diào)用terminated方法
9? ? ? ? if (isRunning(c) ||
10? ? ? ? ? ? runStateAtLeast(c, TIDYING) ||
11? ? ? ? ? ? (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
12? ? ? ? ? ? return;
13? ? ? ? //如果線程池中工作線程數(shù)不為0,需要中斷線程
14? ? ? ? if (workerCountOf(c) != 0) { // Eligible to terminate
15? ? ? ? ? ? interruptIdleWorkers(ONLY_ONE);
16? ? ? ? ? ? return;
17? ? ? ? }
18? ? ? ? //獲得線程池的全局鎖
19? ? ? ? final ReentrantLock mainLock = this.mainLock;
20? ? ? ? mainLock.lock();
21? ? ? ? try {
22? ? ? ? ? ? //通過CAS操作,將線程池狀態(tài)設置為TIDYING
23? ? ? ? ? ? if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //private static int ctlOf(int rs, int wc) { return rs | wc; }
24? ? ? ? ? ? ? ? try {
25? ? ? ? ? ? ? ? ? ? //調(diào)用terminated方法
26? ? ? ? ? ? ? ? ? ? terminated();
27? ? ? ? ? ? ? ? } finally {
28? ? ? ? ? ? ? ? ? ? //最終將線程狀態(tài)設置為TERMINATED
29? ? ? ? ? ? ? ? ? ? ctl.set(ctlOf(TERMINATED, 0));
30? ? ? ? ? ? ? ? ? ? //調(diào)用條件變量termination的signaAll方法喚醒所有因為
31? ? ? ? ? ? ? ? ? ? //調(diào)用線程池的awaitTermination方法而被阻塞的線程
32? ? ? ? ? ? ? ? ? ? //private final Condition termination = mainLock.newCondition();
33? ? ? ? ? ? ? ? ? ? termination.signalAll();
34? ? ? ? ? ? ? ? }
35? ? ? ? ? ? ? ? return;
36? ? ? ? ? ? }
37? ? ? ? } finally {
38? ? ? ? ? ? mainLock.unlock();
39? ? ? ? }
40? ? ? ? // else retry on failed CAS
41? ? }
42 }
五、補充(shutdown、shutdownNow、awaitTermination方法)
(1)shutdown操作
我們在使用線程池的時候知道,調(diào)用shutdown方法之后線程池就不會再接受新的任務了,但是任務隊列中的任務還是需要執(zhí)行完的。調(diào)用該方法會立刻返回,并不是等到線程池的任務隊列中的所有任務執(zhí)行完畢在返回的。
1 public void shutdown() {
2? ? //獲得線程池的全局鎖
3? ? final ReentrantLock mainLock = this.mainLock;
4? ? mainLock.lock();
5? ? try {
6? ? ? ? //進行權(quán)限檢查
7? ? ? ? checkShutdownAccess();
8? ? ? ?
9? ? ? ? //設置當前線程池的狀態(tài)的SHUTDOWN,如果線程池狀態(tài)已經(jīng)是該狀態(tài)就會直接返回,下面我們會分析這個方法的源碼
10? ? ? ? advanceRunState(SHUTDOWN);
11? ? ? ?
12? ? ? ? //設置中斷 標志
13? ? ? ? interruptIdleWorkers();
14? ? ? ? onShutdown(); // hook for ScheduledThreadPoolExecutor
15? ? } finally {
16? ? ? ? mainLock.unlock();
17? ? }
18? ? //嘗試將狀態(tài)變?yōu)門ERMINATED,上面已經(jīng)分析過該方法的源碼
19? ? tryTerminate();
20 }
該方法的源碼比較簡短,首先檢查了安全管理器,是查看當前調(diào)用shutdown命令的線程是否有關閉線程的權(quán)限,如果有權(quán)限還需要看調(diào)用線程是否有中斷工作線程的權(quán)限,如果沒有權(quán)限將會拋出SecurityException異?;蛘呖罩羔槷惓!O旅嫖覀儾榭匆幌耡dvanceRunState 方法的源碼。
1 private void advanceRunState(int targetState) {
2? ? for (;;) {
3? ? ? ? //下面的方法執(zhí)行的就是:
4? ? ? ? //首先獲取線程的ctl值,然后判斷當前線程池的狀態(tài)如果已經(jīng)是SHUTDOWN,那么if條件第一個為真就直接返回
5? ? ? ? //如果不是SHUTDOWN狀態(tài),就需要CAS的設置當前狀態(tài)為SHUTDOWN
6? ? ? ? int c = ctl.get();
7? ? ? ? if (runStateAtLeast(c, targetState) ||
8? ? ? ? ? ? //private static int ctlOf(int rs, int wc) { return rs | wc; }
9? ? ? ? ? ? ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
10? ? ? ? ? ? break;
11? ? }
12 }
我們可以看出advanceRunState 方法實際上就是判斷當前線程池的狀態(tài)是否為SHUTDWON,如果是那么就返回,否則就需要設置當前狀態(tài)為SHUTDOWN。
我們再來看看shutdown方法中調(diào)用線程中斷的方法interruptIdleWorkers源碼
1 private void interruptIdleWorkers() {
2? ? interruptIdleWorkers(false);
3 }
4 private void interruptIdleWorkers(boolean onlyOne) {
5? ? final ReentrantLock mainLock = this.mainLock;
6? ? mainLock.lock();
7? ? try {
8? ? ? ? for (Worker w : workers) {
9? ? ? ? ? ? Thread t = w.thread;
10? ? ? ? ? ? //如果工作線程沒有被中斷,并且沒有正在運行設置中斷標志
11? ? ? ? ? ? if (!t.isInterrupted() && w.tryLock()) {
12? ? ? ? ? ? ? ? try {
13? ? ? ? ? ? ? ? ? ? //需要中斷當前線程
14? ? ? ? ? ? ? ? ? ? t.interrupt();
15? ? ? ? ? ? ? ? } catch (SecurityException ignore) {
16? ? ? ? ? ? ? ? } finally {
17? ? ? ? ? ? ? ? ? ? w.unlock();
18? ? ? ? ? ? ? ? }
19? ? ? ? ? ? }
20? ? ? ? ? ? if (onlyOne)
21? ? ? ? ? ? ? ? break;
22? ? ? ? }
23? ? } finally {
24? ? ? ? mainLock.unlock();
25? ? }
26 }
上面的代碼中,需要設置所有空閑線程的中斷標志。首先獲取線程池的全局鎖,同時只有一個線程可以調(diào)用shutdown方法設置中斷標志。然后嘗試獲取工作線程Worker自己的鎖,獲取成功則可以設置中斷標志(這是由于正在執(zhí)行任務的線程需要獲取自己的鎖,并且不可重入,所以正在執(zhí)行的任務沒有被中斷),這里要中斷的那些線程是阻塞到getTask()方法并嘗試從任務隊列中獲取任務的線程即空閑線程。
(2)shutdownNow操作
在使用線程池的時候,如果我們調(diào)用了shutdownNow方法,線程池不僅不會再接受新的任務,還會將任務隊列中的任務丟棄,正在執(zhí)行的任務也會被中斷,然后立刻返回該方法,不會等待激活的任務完成,返回值為當前任務隊列中被丟棄的任務列表
1 public List<Runnable> shutdownNow() {
2? ? List<Runnable> tasks;
3? ? final ReentrantLock mainLock = this.mainLock;
4? ? mainLock.lock();
5? ? try {
6? ? ? ? checkShutdownAccess(); //還是進行權(quán)限檢查
7? ? ? ? advanceRunState(STOP); //設置線程池狀態(tài)臺STOP
8? ? ? ? interruptWorkers(); //中斷所有線程
9? ? ? ? tasks = drainQueue(); //將任務隊列中的任務移動到task中
10? ? } finally {
11? ? ? ? mainLock.unlock();
12? ? }
13? ? tryTerminate();
14? ? return tasks; //返回tasks
15 }
從上面的代碼中,我們可以可以發(fā)現(xiàn),shutdownNow方法也是首先需要檢查調(diào)用該方法的線程的權(quán)限,之后不同于shutdown方法之處在于需要即刻設置當前線程池狀態(tài)為STOP,然后中斷所有線程(空閑線程+正在執(zhí)行任務的線程),移除任務隊列中的任務
1 private void interruptWorkers() {
2? ? final ReentrantLock mainLock = this.mainLock;
3? ? mainLock.lock();
4? ? try {
5? ? ? ? for (Worker w : workers) //不需要判斷當前線程是否在執(zhí)行任務(即不需要調(diào)用w.tryLock方法),中斷所有線程
6? ? ? ? ? ? w.interruptIfStarted();
7? ? } finally {
8? ? ? ? mainLock.unlock();
9? ? }
10 }
(3)awaitTermination操作
當線程調(diào)用該方法之后,會阻塞調(diào)用者線程,直到線程池狀態(tài)為TERMINATED狀態(tài)才會返回,或者等到超時時間到之后會返回,下面是該方法的源碼。
1 //調(diào)用該方法之后,會阻塞調(diào)用者線程,直到線程池狀態(tài)為TERMINATED狀態(tài)才會返回,或者等到超時時間到之后會返回
2 public boolean awaitTermination(long timeout, TimeUnit unit)
3? ? throws InterruptedException {
4? ? long nanos = unit.toNanos(timeout);
5? ? final ReentrantLock mainLock = this.mainLock;
6? ? mainLock.lock();
7? ? try {
8? ? ? ? //阻塞當前線程,(獲取了Worker自己的鎖),那么當前線程就不會再執(zhí)行任務(因為獲取不到鎖)
9? ? ? ? for (;;) {
10? ? ? ? ? ? //當前線程池狀態(tài)為TERMINATED狀態(tài),會返回true
11? ? ? ? ? ? if (runStateAtLeast(ctl.get(), TERMINATED))
12? ? ? ? ? ? ? ? return true;
13? ? ? ? ? ? //超時時間到返回false
14? ? ? ? ? ? if (nanos <= 0)
15? ? ? ? ? ? ? ? return false;
16? ? ? ? ? ? nanos = termination.awaitNanos(nanos);
17? ? ? ? }
18? ? } finally {
19? ? ? ? mainLock.unlock();
20? ? }
21 }
在上面的代碼中,調(diào)用者線程需要首先獲取線程Worker 自己的獨占鎖,然后在循環(huán)判斷當前線程池是否已經(jīng)是TERMINATED狀態(tài),如果是則直接返回,否則說明當前線程池中還有線程正在執(zhí)行任務,這時候需要查看當前設置的超時時間是否小于0,小于0說明不需要等待就直接返回,如果大于0就需要調(diào)用條件變量termination的awaitNanos方法等待設置的時間,并在這段時間之內(nèi)等待線程池的狀態(tài)變?yōu)門ERMINATED。
我們在前面說到清理線程池的方法processWorkerExit的時候,需要調(diào)用tryTerminated方法,在該方法中會查看當前線程池狀態(tài)是否為TERMINATED,如果是該狀態(tài)也會調(diào)用termination.signalAll()方法喚醒所有線程池中因調(diào)用awaitTermination而被阻塞住的線程。
如果是設置了超時時間,那么termination的awaitNanos方法也會返回,這時候需要重新檢查線程池狀態(tài)是否為TERMINATED,如果是則返回,不是就繼續(xù)阻塞自己。
歡迎工作一到五年的Java工程師朋友們加入Java程序員開發(fā): 721575865
群內(nèi)提供免費的Java架構(gòu)學習資料(里面有高可用、高并發(fā)、高性能及分布式、Jvm性能調(diào)優(yōu)、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構(gòu)資料)合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個交代!