Java并發(fā)包中線程池ThreadPoolExecutor原理探究

一、線程池簡介

線程池的使用主要是解決兩個問題:①當執(zhí)行大量異步任務的時候線程池能夠提供更好的性能,在不使用線程池時候,每當需要執(zhí)行異步任務的時候直接new一個線程來運行的話,線程的創(chuàng)建和銷毀都是需要開銷的。而線程池中的線程是可復用的,不需要每次執(zhí)行異步任務的時候重新創(chuàng)建和銷毀線程;②線程池提供一種資源限制和管理的手段,比如可以限制線程的個數(shù),動態(tài)的新增線程等等。

在下面的分析中,我們可以看到,線程池使用一個Integer的原子類型變量來記錄線程池狀態(tài)和線程池中的線程數(shù)量,通過線程池狀態(tài)來控制任務的執(zhí)行,每個工作線程Worker線程可以處理多個任務。

二、ThreadPoolExecutor類

1、我們先簡單看一下關于ThreadPoolExecutor的一些成員變量以及其所表示的含義

ThreadPoolExecutor繼承了AbstractExecutorService,其中的成員變量ctl是一個Integer類型的原子變量,用來記錄線程池的狀態(tài)和線程池中的線程的個數(shù),類似于前面講到的讀寫鎖中使用一個變量保存兩種信息。這里(Integer看做32位)ctl高三位表示線程池的狀態(tài),后面的29位表示線程池中的線程個數(shù)。如下所示是ThreadPoolExecutor源碼中的成員變量

1 //(高3位)表示線程池狀態(tài),(低29位)表示線程池中線程的個數(shù);

2 // 默認狀態(tài)是RUNNING,線程池中線程個數(shù)為0

3 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

4

5 //表示具體平臺下Integer的二進制位數(shù)-3后的剩余位數(shù)表示的數(shù)才是線程的個數(shù);

6 //其中Integer.SIZE=32,-3之后的低29位表示的就是線程的個數(shù)了

7 private static final int COUNT_BITS = Integer.SIZE - 3;

8

9 //線程最大個數(shù)(低29位)00011111111111111111111111111111(1<<29-1)

10 private static final int CAPACITY? = (1 << COUNT_BITS) - 1;

11

12 //線程池狀態(tài)(高3位表示線程池狀態(tài))

13 //111 00000000000000000000000000000

14 private static final int RUNNING? ? = -1 << COUNT_BITS;

15

16 //000 00000000000000000000000000000

17 private static final int SHUTDOWN? =? 0 << COUNT_BITS;

18

19 //001 00000000000000000000000000000

20 private static final int STOP? ? ? =? 1 << COUNT_BITS;

21

22 //010 00000000000000000000000000000

23 private static final int TIDYING? ? =? 2 << COUNT_BITS;

24

25 //011 00000000000000000000000000000

26 private static final int TERMINATED =? 3 << COUNT_BITS;

27

28 //獲取高3位(運行狀態(tài))==> c & 11100000000000000000000000000000

29 private static int runStateOf(int c)? ? { return c & ~CAPACITY; }

30

31 //獲取低29位(線程個數(shù))==> c &? 00011111111111111111111111111111

32 private static int workerCountOf(int c)? { return c & CAPACITY; }

33

34 //計算原子變量ctl新值(運行狀態(tài)和線程個數(shù))

35 private static int ctlOf(int rs, int wc) { return rs | wc; }

下面我們簡單解釋一下上面的線程狀態(tài)的含義:

①RUNNING:接受新任務并處理阻塞隊列中的任務

②SHUTDOWN:拒絕新任務但是處理阻塞隊列中的任務

③STOP:拒絕新任務并拋棄阻塞隊列中的任務,同時會中斷當前正在執(zhí)行的任務

④TIDYING:所有任務執(zhí)行完之后(包含阻塞隊列中的任務)當前線程池中活躍的線程數(shù)量為0,將要調(diào)用terminated方法

⑥TERMINATED:終止狀態(tài)。terminated方法調(diào)用之后的狀態(tài)

2、下面初步了解一下ThreadPoolExecutor的參數(shù)以及實現(xiàn)原理

①corePoolSize:線程池核心現(xiàn)車個數(shù)

②workQueue:用于保存等待任務執(zhí)行的任務的阻塞隊列(比如基于數(shù)組的有界阻塞隊列ArrayBlockingQueue、基于鏈表的無界阻塞隊列LinkedBlockingQueue等等)

③maximumPoolSize:線程池最大線程數(shù)量

④ThreadFactory:創(chuàng)建線程的工廠

⑤RejectedExecutionHandler:拒絕策略,表示當隊列已滿并且線程數(shù)量達到線程池最大線程數(shù)量的時候?qū)π绿峤坏娜蝿账扇〉牟呗?,主要有四種策略:AbortPolicy(拋出異常)、CallerRunsPolicy(只用調(diào)用者所在線程來運行該任務)、DiscardOldestPolicy(丟掉阻塞隊列中最近的一個任務來處理當前提交的任務)、DiscardPolicy(不做處理,直接丟棄掉)

⑥keepAliveTime:存活時間,如果當前線程池中的數(shù)量比核心線程數(shù)量多,并且當前線程是閑置狀態(tài),該變量就是這些線程的最大生存時間

⑦TimeUnit:存活時間的時間單位。

根據(jù)上面的參數(shù)介紹,簡單了解一下線程池的實現(xiàn)原理,以提交一個新任務為開始點,分析線程池的主要處理流程

3、關于一些線程池的使用類型

①newFixedThreadPool:創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)均為nThreads的線程池,并且阻塞隊列長度為Integer.MAX_VALUE,keepAliveTime=0說明只要線程個數(shù)比核心線程個數(shù)多并且當前空閑即回收。

public static ExecutorService newFixedThreadPool(int nThreads) {

? ? return new ThreadPoolExecutor(nThreads, nThreads,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>());

}

②newSingleThreadExecutor:創(chuàng)建一個核心線程個數(shù)和最大線程個數(shù)都為1 的線程池,并且阻塞隊列長度為Integer.MAX_VALUE,keepAliveTime=0說明只要線程個數(shù)比核心線程個數(shù)多并且當前線程空閑即回收該線程。

public static ExecutorService newSingleThreadExecutor() {

? ? return new FinalizableDelegatedExecutorService

? ? ? ? (new ThreadPoolExecutor(1, 1,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 0L, TimeUnit.MILLISECONDS,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new LinkedBlockingQueue<Runnable>()));

}

③newCachedThreadPoolExecutor:創(chuàng)建一個按需創(chuàng)建線程的線程池,初始線程個數(shù)為0,最多線程個數(shù)為Integer.MAX_VALUE,并且阻塞隊列為同步隊列(最多只有一個元素),keepAliveTime=60說明只要當前線程在60s內(nèi)空閑則回收。這個類型的線程池的特點就是:加入同步隊列的任務會被馬上執(zhí)行,同步隊列中最多只有一個任務

public static ExecutorService newCachedThreadPool() {

? ? return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 60L, TimeUnit.SECONDS,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? new SynchronousQueue<Runnable>());

}

4、ThreadPoolExecutor中的其他成員

其中的ReentrantLock可參考前面寫到的Java中的鎖——Lock和synchronized,其中降到了ReentrantLock的具體實現(xiàn)原理;

關于AQS部分可參考前面說到的Java中的隊列同步器AQS,也講到了關于AQS的具體實現(xiàn)原理分析;

關于條件隊列的相關知識可參考前面寫的Java中的線程協(xié)作之Condition,里面說到了關于Java中線程協(xié)作Condition的實現(xiàn)原理;

//獨占鎖,用來控制新增工作線程Worker操作的原子性

private final ReentrantLock mainLock = new ReentrantLock();

//工作線程集合,Worker繼承了AQS接口和Runnable接口,是具體處理任務的線程對象

//Worker實現(xiàn)AQS,并自己實現(xiàn)了簡單不可重入獨占鎖,其中state=0表示當前鎖未被獲取狀態(tài),state=1表示鎖被獲取,

//state=-1表示W(wǎng)ork創(chuàng)建時候的默認狀態(tài),創(chuàng)建時候設置state=-1是為了防止runWorker方法運行前被中斷

private final HashSet<Worker> workers = new HashSet<Worker>();

//termination是該鎖對應的條件隊列,在線程調(diào)用awaitTermination時候用來存放阻塞的線程

private final Condition termination = mainLock.newCondition();

三、execute(Runnable command)方法實現(xiàn)

executor方法的作用是提交任務command到線程池執(zhí)行,可以簡單的按照下面的圖進行理解,ThreadPoolExecutor的實現(xiàn)類似于一個生產(chǎn)者消費者模型,當用戶添加任務到線程池中相當于生產(chǎn)者生產(chǎn)元素,workers工作線程則直接執(zhí)行任務或者從任務隊列中獲取任務,相當于消費之消費元素。

1 public void execute(Runnable command) {

2? ? //(1)首先檢查任務是否為null,為null拋出異常,否則進行下面的步驟

3? ? if (command == null)

4? ? ? ? throw new NullPointerException();

5? ? //(2)ctl值中包含了當前線程池的狀態(tài)和線程池中的線程數(shù)量

6? ? int c = ctl.get();

7? ? //(3)workerCountOf方法是獲取低29位,即獲取當前線程池中的線程個數(shù),如果小于corePoolSize,就開啟新的線程運行

8? ? if (workerCountOf(c) < corePoolSize) {

9? ? ? ? if (addWorker(command, true))

10? ? ? ? ? ? return;

11? ? ? ? c = ctl.get();

12? ? }

13? ? //(4)如果線程池處理RUNNING狀態(tài),就添加任務到阻塞隊列中

14? ? if (isRunning(c) && workQueue.offer(command)) {

15? ? ? ? //(4-1)二次檢查,獲取ctl值

16? ? ? ? int recheck = ctl.get();

17? ? ? ? //(4-2)如果當前線程池不是出于RUNNING狀態(tài),就從隊列中刪除任務,并執(zhí)行拒絕策略

18? ? ? ? if (! isRunning(recheck) && remove(command))

19? ? ? ? ? ? reject(command);

20? ? ? ? //(4-3)否則,如果線程池為空,就添加一個線程

21? ? ? ? else if (workerCountOf(recheck) == 0)

22? ? ? ? ? ? addWorker(null, false);

23? ? }

24? ? //(5)如果隊列滿,則新增線程,如果新增線程失敗,就執(zhí)行拒絕策略

25? ? else if (!addWorker(command, false))

26? ? ? ? reject(command);

27 }

我們在看一下上面代碼的執(zhí)行流程,按照標記的數(shù)字進行分析:

①步驟(3)判斷當前線程池中的線程個數(shù)是否小于corePoolSize,如果小于核心線程數(shù),會向workers里面新增一個核心線程執(zhí)行任務。

②如果當前線程池中的線程數(shù)量大于核心線程數(shù),就執(zhí)行(4)。(4)首先判斷當前線程池是否處于RUNNING狀態(tài),如果處于該狀態(tài),就添加任務到任務隊列中,這里需要判斷線程池的狀態(tài)是因為線程池可能已經(jīng)處于非RUNNING狀態(tài),而在非RUNNING狀態(tài)下是需要拋棄新任務的。

③如果想任務隊列中添加任務成功,需要進行二次校驗,因為在添加任務到任務隊列后,可能線程池的狀態(tài)發(fā)生了變化,所以這里需要進行二次校驗,如果當前線程池已經(jīng)不是RUNNING狀態(tài)了,需要將任務從任務隊列中移除,然后執(zhí)行拒絕策略;如果二次校驗通過,則執(zhí)行4-3代碼重新判斷當前線程池是否為空,如果線程池為空沒有線程,那么就需要新創(chuàng)建一個線程。

④如果上面的步驟(4)創(chuàng)建添加任務失敗,說明隊列已滿,那么(5)會嘗試再開啟新的線程執(zhí)行任務(類比上圖中的thread3和thread4,即不是核心線程的那些線程),如果當前線程池中的線程個數(shù)已經(jīng)大于最大線程數(shù)maximumPoolSize,表示不能開啟新的線程。這就屬于線程池滿并且任務隊列滿,就需要執(zhí)行拒絕策略了。

下面我們在看看addWorker方法的實現(xiàn)

1 private boolean addWorker(Runnable firstTask, boolean core) {

2? ? retry:

3? ? for (;;) {

4? ? ? ? int c = ctl.get();

5? ? ? ? int rs = runStateOf(c);

6

7? ? ? ? //(6)檢查隊列是否只在必要時候為空

8? ? ? ? if (rs >= SHUTDOWN &&

9? ? ? ? ? ? ! (rs == SHUTDOWN &&

10? ? ? ? ? ? ? ? firstTask == null &&

11? ? ? ? ? ? ? ? ! workQueue.isEmpty()))

12? ? ? ? ? ? return false;

13

14? ? ? ? //(7)使用CAS增加線程個數(shù)

15? ? ? ? for (;;) {

16? ? ? ? ? ? //根據(jù)ctl值獲得當前線程池中的線程數(shù)量

17? ? ? ? ? ? int wc = workerCountOf(c);

18? ? ? ? ? ? //(7-1)如果線程數(shù)量超出限制,返回false

19? ? ? ? ? ? if (wc >= CAPACITY ||

20? ? ? ? ? ? ? ? wc >= (core ? corePoolSize : maximumPoolSize))

21? ? ? ? ? ? ? ? return false;

22? ? ? ? ? ? //(7-2)CAS增加線程數(shù)量,同時只有一個線程可以成功

23? ? ? ? ? ? if (compareAndIncrementWorkerCount(c))

24? ? ? ? ? ? ? ? break retry;

25? ? ? ? ? ? c = ctl.get();? // 重新讀取ctl值

26? ? ? ? ? ? //(7-3)CAS失敗了,需要查看當前線程池狀態(tài)是否發(fā)生變化,如果發(fā)生變化需要跳轉(zhuǎn)到外層循環(huán)嘗試重新獲取線程池狀態(tài),否則內(nèi)層循環(huán)重新進行CAS增加線程數(shù)量

27? ? ? ? ? ? if (runStateOf(c) != rs)

28? ? ? ? ? ? ? ? continue retry;

29? ? ? ? }

30? ? }

31

32? ? //(8)執(zhí)行到這里說明CAS增加新線程個數(shù)成功了,我們需要開始創(chuàng)建新的工作線程Worker

33? ? boolean workerStarted = false;

34? ? boolean workerAdded = false;

35? ? Worker w = null;

36? ? try {

37? ? ? ? //(8-1)創(chuàng)建新的worker

38? ? ? ? w = new Worker(firstTask);

39? ? ? ? final Thread t = w.thread;

40? ? ? ? if (t != null) {

41? ? ? ? ? ? final ReentrantLock mainLock = this.mainLock;

42? ? ? ? ? ? //(8-2)加獨占鎖,保證workers的同步,可能線程池中的多個線程調(diào)用了線程池的execute方法

43? ? ? ? ? ? mainLock.lock();

44? ? ? ? ? ? try {

45? ? ? ? ? ? ? ? // (8-3)重新檢查線程池狀態(tài),以免在獲取鎖之前調(diào)用shutdown方法改變線程池狀態(tài)

46? ? ? ? ? ? ? ? int rs = runStateOf(ctl.get());

47

48? ? ? ? ? ? ? ? if (rs < SHUTDOWN ||

49? ? ? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask == null)) {

50? ? ? ? ? ? ? ? ? ? if (t.isAlive()) // precheck that t is startable

51? ? ? ? ? ? ? ? ? ? ? ? throw new IllegalThreadStateException();

52? ? ? ? ? ? ? ? ? ? //(8-4)添加新任務

53? ? ? ? ? ? ? ? ? ? workers.add(w);

54? ? ? ? ? ? ? ? ? ? int s = workers.size();

55? ? ? ? ? ? ? ? ? ? if (s > largestPoolSize)

56? ? ? ? ? ? ? ? ? ? ? ? largestPoolSize = s;

57? ? ? ? ? ? ? ? ? ? workerAdded = true;

58? ? ? ? ? ? ? ? }

59? ? ? ? ? ? } finally {

60? ? ? ? ? ? ? ? mainLock.unlock();

61? ? ? ? ? ? }

62? ? ? ? ? ? //(8-6)添加新任務成功之后,啟動任務

63? ? ? ? ? ? if (workerAdded) {

64? ? ? ? ? ? ? ? t.start();

65? ? ? ? ? ? ? ? workerStarted = true;

66? ? ? ? ? ? }

67? ? ? ? }

68? ? } finally {

69? ? ? ? if (! workerStarted)

70? ? ? ? ? ? addWorkerFailed(w);

71? ? }

72? ? return workerStarted;

73 }

簡單再分析說明一下上面的代碼,addWorker方法主要分為兩部分,第一部分是使用CAS線程安全的添加線程數(shù)量,第二部分則是創(chuàng)建新的線程并且將任務并發(fā)安全的添加到新的workers之中,然后啟動線程執(zhí)行。

①代碼(6)中檢查隊列是否只在必要時候為空,只有線程池狀態(tài)符合條件才能夠進行下面的步驟,從(6)中的判斷條件來看,下面的集中情況addWorker會直接返回false

( I )當前線程池狀態(tài)為STOP,TIDYING或者TERMINATED ; (I I)當前線程池狀態(tài)為SHUTDOWN并且已經(jīng)有了第一個任務; (I I I)當前線程池狀態(tài)為SHUTDOWN并且任務隊列為空

②外層循環(huán)中判斷條件通過之后,在內(nèi)層循環(huán)中使用CAS增加線程數(shù),當CAS成功就退出雙重循環(huán)進行(8)步驟代碼的執(zhí)行,如果失敗需要查看當前線程池的狀態(tài)是否發(fā)生變化,如果發(fā)生變化需要進行外層循環(huán)重新判斷線程池狀態(tài)然后在進入內(nèi)層循環(huán)重新進行CAS增加線程數(shù),如果線程池狀態(tài)沒有發(fā)生變化但是上一次CAS失敗就繼續(xù)進行CAS嘗試。

③執(zhí)行到(8)代碼處,表明當前已經(jīng)成功增加 了線程數(shù),但是還沒有線程執(zhí)行任務。ThreadPoolExecutor中使用全局獨占鎖mainLock來控制將新增的工作線程Worker線程安全的添加到工作者線程集合workers中。

④(8-2)獲取了獨占鎖,但是在獲取到鎖之后,還需要進行重新檢查線程池的狀態(tài),這是為了避免在獲取全局獨占鎖之前其他線程調(diào)用了shutDown方法關閉了線程池。如果線程池已經(jīng)關閉需要釋放鎖。否則將新增的線程添加到工作集合中,釋放鎖啟動線程執(zhí)行任務。

上面的addWorker方法最后幾行中,會判斷添加工作線程是否成功,如果失敗,會執(zhí)行addWorkerFailed方法,將任務從workers中移除,并且workerCount做-1操作。

1 private void addWorkerFailed(Worker w) {

2? ? final ReentrantLock mainLock = this.mainLock;

3? ? //獲取鎖

4? ? mainLock.lock();

5? ? try {

6? ? ? //如果worker不為null

7? ? ? if (w != null)

8? ? ? ? ? //workers移除worker

9? ? ? ? ? workers.remove(w);

10? ? ? //通過CAS操作,workerCount-1

11? ? ? decrementWorkerCount();

12? ? ? tryTerminate();

13? ? } finally {

14? ? ? //釋放鎖

15? ? ? mainLock.unlock();

16? ? }

17 }

四、工作線程Worker的執(zhí)行

(1)工作線程Worker類源碼分析

上面查看addWorker方法在CAS更新線程數(shù)成功之后,下面就是創(chuàng)建新的Worker線程執(zhí)行任務,所以我們這里先查看Worker類,下面是Worker類的源碼,我們可以看出,Worker類繼承了AQS并實現(xiàn)了Runnable接口,所以他既是一個自定義的同步組件,也是一個執(zhí)行任務的線程類。下面我們分析Worker類的執(zhí)行

1 private final class Worker

2? ? extends AbstractQueuedSynchronizer

3? ? implements Runnable

4 {

5

6? ? /** 使用線程工廠創(chuàng)建的線程,執(zhí)行任務 */

7? ? final Thread thread;

8? ? /** 初始化執(zhí)行任務 */

9? ? Runnable firstTask;

10? ? /** 計數(shù) */

11? ? volatile long completedTasks;

12

13? ? /**

14? ? ? * 給出初始firstTask,線程創(chuàng)建工廠創(chuàng)建新的線程

15? ? ? */

16? ? Worker(Runnable firstTask) {

17? ? ? ? setState(-1); // 防止在調(diào)用runWorker之前被中斷

18? ? ? ? this.firstTask = firstTask;

19? ? ? ? this.thread = getThreadFactory().newThread(this); //使用threadFactory創(chuàng)建線程

20? ? }

21

22? ? /** run方法實際上執(zhí)行的是runWorker方法? */

23? ? public void run() {

24? ? ? ? runWorker(this);

25? ? }

26

27? ? // 關于同步狀態(tài)(鎖)

28? ? //

29? ? // 同步狀態(tài)state=0表示鎖未被獲取

30? ? // 同步狀態(tài)state=1表示鎖被獲取

31

32? ? protected boolean isHeldExclusively() {

33? ? ? ? return getState() != 0;

34? ? }

35

36? ? //下面都是重寫AQS的方法,Worker為自定義的同步組件

37? ? protected boolean tryAcquire(int unused) {

38? ? ? ? if (compareAndSetState(0, 1)) {

39? ? ? ? ? ? setExclusiveOwnerThread(Thread.currentThread());

40? ? ? ? ? ? return true;

41? ? ? ? }

42? ? ? ? return false;

43? ? }

44

45? ? protected boolean tryRelease(int unused) {

46? ? ? ? setExclusiveOwnerThread(null);

47? ? ? ? setState(0);

48? ? ? ? return true;

49? ? }

50

51? ? public void lock()? ? ? ? { acquire(1); }

52? ? public boolean tryLock()? { return tryAcquire(1); }

53? ? public void unlock()? ? ? { release(1); }

54? ? public boolean isLocked() { return isHeldExclusively(); }

55

56? ? void interruptIfStarted() {

57? ? ? ? Thread t;

58? ? ? ? if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

59? ? ? ? ? ? try {

60? ? ? ? ? ? ? ? t.interrupt();

61? ? ? ? ? ? } catch (SecurityException ignore) {

62? ? ? ? ? ? }

63? ? ? ? }

64? ? }

65 }

在構(gòu)造函數(shù)中我們可以看出,首先將同步狀態(tài)state置為-1,而Worker這個同步組件的state有三個值,其中state=-1表示W(wǎng)ork創(chuàng)建時候的默認狀態(tài),創(chuàng)建時候設置state=-1是為了防止runWorker方法運行前被中斷?前面說到過這個結(jié)論,這里置為-1是為了避免當前Worker在調(diào)用runWorker方法之前被中斷(當其他線程調(diào)用線程池的shutDownNow時候,如果Worker的state>=0則會中斷線程),設置為-1就不會被中斷了。而Worker實現(xiàn)Runnable接口,那么需要重寫run方法,在run方法中,我們可以看到,實際上執(zhí)行的是runWorker方法,在runWorker方法中,會首先調(diào)用unlock方法,該方法會將state置為0,所以這個時候調(diào)用shutDownNow方法就會中斷當前線程,而這個時候已經(jīng)進入了runWork方法了,就不會在還沒有執(zhí)行runWorker方法的時候就中斷線程。

(2)runWorker方法的源碼分析

1 final void runWorker(Worker w) {

2? ? Thread wt = Thread.currentThread();

3? ? Runnable task = w.firstTask;

4? ? w.firstTask = null;

5? ? w.unlock(); // 這個時候調(diào)用unlock方法,將state置為0,就可以被中斷了

6? ? boolean completedAbruptly = true;

7? ? try {

8? ? ? ? //(10)如果當前任務為null,或者從任務隊列中獲取到的任務為null,就跳轉(zhuǎn)到(11)處執(zhí)行清理工作

9? ? ? ? while (task != null || (task = getTask()) != null) {

10? ? ? ? ? ? //task不為null,就需要線程執(zhí)行任務,這個時候,需要獲取工作線程內(nèi)部持有的獨占鎖

11? ? ? ? ? ? w.lock();

12? ? ? ? ? ? /**如果線程池已被停止(STOP)(至少大于STOP狀態(tài)),要確保線程都被中斷

13? ? ? ? ? ? ? * 如果狀態(tài)不對,檢查當前線程是否中斷并清除中斷狀態(tài),并且再次檢查線程池狀態(tài)是否大于STOP

14? ? ? ? ? ? ? * 如果上述滿足,檢查該對象是否處于中斷狀態(tài),不清除中斷標記

15? ? ? ? ? ? ? */

16? ? ? ? ? ? if ((runStateAtLeast(ctl.get(), STOP) ||

17? ? ? ? ? ? ? ? ? (Thread.interrupted() &&

18? ? ? ? ? ? ? ? ? runStateAtLeast(ctl.get(), STOP))) &&

19? ? ? ? ? ? ? ? !wt.isInterrupted())

20? ? ? ? ? ? ? ? //中斷該對象

21? ? ? ? ? ? ? ? wt.interrupt();

22? ? ? ? ? ? try {

23? ? ? ? ? ? ? ? //執(zhí)行任務之前要做的事情

24? ? ? ? ? ? ? ? beforeExecute(wt, task);

25? ? ? ? ? ? ? ? Throwable thrown = null;

26? ? ? ? ? ? ? ? try {

27? ? ? ? ? ? ? ? ? ? task.run(); //執(zhí)行任務

28? ? ? ? ? ? ? ? } catch (RuntimeException x) {

29? ? ? ? ? ? ? ? ? ? thrown = x; throw x;

30? ? ? ? ? ? ? ? } catch (Error x) {

31? ? ? ? ? ? ? ? ? ? thrown = x; throw x;

32? ? ? ? ? ? ? ? } catch (Throwable x) {

33? ? ? ? ? ? ? ? ? ? thrown = x; throw new Error(x);

34? ? ? ? ? ? ? ? } finally {

35? ? ? ? ? ? ? ? ? ? //執(zhí)行任務之后的方法

36? ? ? ? ? ? ? ? ? ? afterExecute(task, thrown);

37? ? ? ? ? ? ? ? }

38? ? ? ? ? ? } finally {

39? ? ? ? ? ? ? ? task = null;

40? ? ? ? ? ? ? ? //更新當前已完成任務數(shù)量

41? ? ? ? ? ? ? ? w.completedTasks++;

42? ? ? ? ? ? ? ? //釋放鎖

43? ? ? ? ? ? ? ? w.unlock();

44? ? ? ? ? ? }

45? ? ? ? }

46? ? ? ? completedAbruptly = false;

47? ? } finally {

48? ? ? ? //執(zhí)行清理工作:處理并退出當前worker

49? ? ? ? processWorkerExit(w, completedAbruptly);

50? ? }

51 }

我們梳理一下runWorker方法的執(zhí)行流程

①首先先執(zhí)行unlock方法,將Worker的state置為0,這樣工作線程就可以被中斷了(后續(xù)的操作如果線程池關閉就需要線程被中斷)

②首先判斷判斷當前的任務(當前工作線程中的task,或者從任務隊列中取出的task)是否為null,如果不為null就往下執(zhí)行,為null就執(zhí)行processWorkerExit方法。

③獲取工作線程內(nèi)部持有的獨占鎖(避免在執(zhí)行任務期間,其他線程調(diào)用shutdown后正在執(zhí)行的任務被中斷,shutdown只會中斷當前被阻塞掛起的沒有執(zhí)行任務的線程)

④然后執(zhí)行beforeExecute()方法,該方法為擴展接口代碼,表示在具體執(zhí)行任務之前所做的一些事情,然后執(zhí)行task.run()方法執(zhí)行具體任務,執(zhí)行完之后會調(diào)用afterExecute()方法,用以處理任務執(zhí)行完畢之后的工作,也是一個擴展接口代碼。

⑤更新當前線程池完成的任務數(shù),并釋放鎖

(3)執(zhí)行清理工作的方法processWorkerExit

下面是方法processWorkerExit的源碼,在下面的代碼中

①首先(1-1)處統(tǒng)計線程池完成的任務個數(shù),并且在此之前獲取全局鎖,然后更新當前的全局計數(shù)器,然后從工作線程集合中移除當前工作線程,完成清理工作。

②代碼(1-2)調(diào)用了tryTerminate 方法,在該方法中,判斷了當前線程池狀態(tài)是SHUTDOWN并且隊列不為空或者當前線程池狀態(tài)為STOP并且當前線程池中沒有活動線程,則置線程池狀態(tài)為TERMINATED。如果設置稱為了TERMINATED狀態(tài),還需要調(diào)用全局條件變量termination的signalAll方法喚醒所有因為調(diào)用線程池的awaitTermination方法而被阻塞住的線程,使得線程池中的所有線程都停止,從而使得線程池為TERMINATED狀態(tài)。

③代碼(1-3)處判斷當前線程池中的線程個數(shù)是否小于核心線程數(shù),如果是,需要新增一個線程保證有足夠的線程可以執(zhí)行任務隊列中的任務或者提交的任務。

1 private void processWorkerExit(Worker w, boolean completedAbruptly) {

2? ? /*

3? ? *completedAbruptly:是由runWorker傳過來的參數(shù),表示是否突然完成的意思

4? ? *當在就是在執(zhí)行任務過程當中出現(xiàn)異常,就會突然完成,傳true

5? ? *

6? ? *如果是突然完成,需要通過CAS操作,更新workerCount(-1操作)

7? ? *不是突然完成,則不需要-1,因為getTask方法當中已經(jīng)-1(getTask方法中執(zhí)行了decrementWorkerCount()方法)

8? ? */

9? ? if (completedAbruptly)

10? ? ? ? decrementWorkerCount();

11? ? //(1-1)在統(tǒng)計完成任務個數(shù)之前加上全局鎖,然后統(tǒng)計線程池中完成的任務個數(shù)并更新全局計數(shù)器,并從工作集中刪除當前worker

12? ? final ReentrantLock mainLock = this.mainLock;

13? ? mainLock.lock(); //獲得全局鎖

14? ? try {

15? ? ? ? completedTaskCount += w.completedTasks; //更新已完成的任務數(shù)量

16? ? ? ? workers.remove(w); //將完成該任務的線程worker從工作線程集合中移除

17? ? } finally {

18? ? ? ? mainLock.unlock(); //釋放鎖

19? ? }

20? ? /**(1-2)

21? ? ? * 這一個方法調(diào)用完成了下面的事情:

22? ? ? * 判斷如果當前線程池狀態(tài)是SHUTDOWN并且工作隊列為空,

23? ? ? * 或者當前線程池狀態(tài)是STOP并且當前線程池里面沒有活動線程,

24? ? ? * 則設置當前線程池狀態(tài)為TERMINATED,如果設置成了TERMINATED狀態(tài),

25? ? ? * 還需要調(diào)用條件變量termination的signAll方法激活所有因為調(diào)用線程池的awaitTermination方法而被阻塞的線程

26? ? ? */

27? ? tryTerminate();

28

29? ? //(1-3)如果當前線程池中線程數(shù)小于核心線程,則增加核心線程數(shù)

30? ? int c = ctl.get();

31? ? //判斷當前線程池的狀態(tài)是否小于STOP(RUNNING或者SHUTDOWN)

32? ? if (runStateLessThan(c, STOP)) {

33? ? ? ? //如果任務忽然完成,執(zhí)行后續(xù)的代碼

34? ? ? ? if (!completedAbruptly) {

35? ? ? ? ? ? //allowCoreThreadTimeOut表示是否允許核心線程超時,默認為false

36? ? ? ? ? ? //min這里當默認為allowCoreThreadTimeOut默認為false的時候,min置為coorPoolSize

37? ? ? ? ? ? int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

38? ? ? ? ? ? //這里說明:如果允許核心線程超時,那么allowCoreThreadTimeOut可為true,那么min值為0,不需要維護核心線程了

39? ? ? ? ? ? //如果min為0并且任務隊列不為空

40? ? ? ? ? ? if (min == 0 && ! workQueue.isEmpty())

41? ? ? ? ? ? ? ? min = 1; //這里表示如果min為0,且隊列不為空,那么至少需要一個核心線程存活來保證任務的執(zhí)行

42? ? ? ? ? ? //如果工作線程數(shù)大于min,表示當前線程數(shù)滿足,直接返回

43? ? ? ? ? ? if (workerCountOf(c) >= min)

44? ? ? ? ? ? ? ? return; // replacement not needed

45? ? ? ? }

46? ? ? ? addWorker(null, false);

47? ? }

48 }

在tryTerminate 方法中,我們簡單說明了該方法的作用,下面是該方法的源碼,可以看出源碼實現(xiàn)上和上面所總結(jié)的功能是差不多的

1 final void tryTerminate() {

2? ? for (;;) {

3? ? ? ? //獲取線程池狀態(tài)

4? ? ? ? int c = ctl.get();

5? ? ? ? //如果線程池狀態(tài)為RUNNING

6? ? ? ? //或者狀態(tài)大于TIDYING

7? ? ? ? //或者狀態(tài)==SHUTDOWN并未任務隊列不為空

8? ? ? ? //直接返回,不能調(diào)用terminated方法

9? ? ? ? if (isRunning(c) ||

10? ? ? ? ? ? runStateAtLeast(c, TIDYING) ||

11? ? ? ? ? ? (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))

12? ? ? ? ? ? return;

13? ? ? ? //如果線程池中工作線程數(shù)不為0,需要中斷線程

14? ? ? ? if (workerCountOf(c) != 0) { // Eligible to terminate

15? ? ? ? ? ? interruptIdleWorkers(ONLY_ONE);

16? ? ? ? ? ? return;

17? ? ? ? }

18? ? ? ? //獲得線程池的全局鎖

19? ? ? ? final ReentrantLock mainLock = this.mainLock;

20? ? ? ? mainLock.lock();

21? ? ? ? try {

22? ? ? ? ? ? //通過CAS操作,將線程池狀態(tài)設置為TIDYING

23? ? ? ? ? ? if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //private static int ctlOf(int rs, int wc) { return rs | wc; }

24? ? ? ? ? ? ? ? try {

25? ? ? ? ? ? ? ? ? ? //調(diào)用terminated方法

26? ? ? ? ? ? ? ? ? ? terminated();

27? ? ? ? ? ? ? ? } finally {

28? ? ? ? ? ? ? ? ? ? //最終將線程狀態(tài)設置為TERMINATED

29? ? ? ? ? ? ? ? ? ? ctl.set(ctlOf(TERMINATED, 0));

30? ? ? ? ? ? ? ? ? ? //調(diào)用條件變量termination的signaAll方法喚醒所有因為

31? ? ? ? ? ? ? ? ? ? //調(diào)用線程池的awaitTermination方法而被阻塞的線程

32? ? ? ? ? ? ? ? ? ? //private final Condition termination = mainLock.newCondition();

33? ? ? ? ? ? ? ? ? ? termination.signalAll();

34? ? ? ? ? ? ? ? }

35? ? ? ? ? ? ? ? return;

36? ? ? ? ? ? }

37? ? ? ? } finally {

38? ? ? ? ? ? mainLock.unlock();

39? ? ? ? }

40? ? ? ? // else retry on failed CAS

41? ? }

42 }

五、補充(shutdown、shutdownNow、awaitTermination方法)

(1)shutdown操作

我們在使用線程池的時候知道,調(diào)用shutdown方法之后線程池就不會再接受新的任務了,但是任務隊列中的任務還是需要執(zhí)行完的。調(diào)用該方法會立刻返回,并不是等到線程池的任務隊列中的所有任務執(zhí)行完畢在返回的。

1 public void shutdown() {

2? ? //獲得線程池的全局鎖

3? ? final ReentrantLock mainLock = this.mainLock;

4? ? mainLock.lock();

5? ? try {

6? ? ? ? //進行權(quán)限檢查

7? ? ? ? checkShutdownAccess();

8? ? ? ?

9? ? ? ? //設置當前線程池的狀態(tài)的SHUTDOWN,如果線程池狀態(tài)已經(jīng)是該狀態(tài)就會直接返回,下面我們會分析這個方法的源碼

10? ? ? ? advanceRunState(SHUTDOWN);

11? ? ? ?

12? ? ? ? //設置中斷 標志

13? ? ? ? interruptIdleWorkers();

14? ? ? ? onShutdown(); // hook for ScheduledThreadPoolExecutor

15? ? } finally {

16? ? ? ? mainLock.unlock();

17? ? }

18? ? //嘗試將狀態(tài)變?yōu)門ERMINATED,上面已經(jīng)分析過該方法的源碼

19? ? tryTerminate();

20 }

該方法的源碼比較簡短,首先檢查了安全管理器,是查看當前調(diào)用shutdown命令的線程是否有關閉線程的權(quán)限,如果有權(quán)限還需要看調(diào)用線程是否有中斷工作線程的權(quán)限,如果沒有權(quán)限將會拋出SecurityException異?;蛘呖罩羔槷惓!O旅嫖覀儾榭匆幌耡dvanceRunState 方法的源碼。

1 private void advanceRunState(int targetState) {

2? ? for (;;) {

3? ? ? ? //下面的方法執(zhí)行的就是:

4? ? ? ? //首先獲取線程的ctl值,然后判斷當前線程池的狀態(tài)如果已經(jīng)是SHUTDOWN,那么if條件第一個為真就直接返回

5? ? ? ? //如果不是SHUTDOWN狀態(tài),就需要CAS的設置當前狀態(tài)為SHUTDOWN

6? ? ? ? int c = ctl.get();

7? ? ? ? if (runStateAtLeast(c, targetState) ||

8? ? ? ? ? ? //private static int ctlOf(int rs, int wc) { return rs | wc; }

9? ? ? ? ? ? ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))

10? ? ? ? ? ? break;

11? ? }

12 }

我們可以看出advanceRunState 方法實際上就是判斷當前線程池的狀態(tài)是否為SHUTDWON,如果是那么就返回,否則就需要設置當前狀態(tài)為SHUTDOWN。

我們再來看看shutdown方法中調(diào)用線程中斷的方法interruptIdleWorkers源碼

1 private void interruptIdleWorkers() {

2? ? interruptIdleWorkers(false);

3 }

4 private void interruptIdleWorkers(boolean onlyOne) {

5? ? final ReentrantLock mainLock = this.mainLock;

6? ? mainLock.lock();

7? ? try {

8? ? ? ? for (Worker w : workers) {

9? ? ? ? ? ? Thread t = w.thread;

10? ? ? ? ? ? //如果工作線程沒有被中斷,并且沒有正在運行設置中斷標志

11? ? ? ? ? ? if (!t.isInterrupted() && w.tryLock()) {

12? ? ? ? ? ? ? ? try {

13? ? ? ? ? ? ? ? ? ? //需要中斷當前線程

14? ? ? ? ? ? ? ? ? ? t.interrupt();

15? ? ? ? ? ? ? ? } catch (SecurityException ignore) {

16? ? ? ? ? ? ? ? } finally {

17? ? ? ? ? ? ? ? ? ? w.unlock();

18? ? ? ? ? ? ? ? }

19? ? ? ? ? ? }

20? ? ? ? ? ? if (onlyOne)

21? ? ? ? ? ? ? ? break;

22? ? ? ? }

23? ? } finally {

24? ? ? ? mainLock.unlock();

25? ? }

26 }

上面的代碼中,需要設置所有空閑線程的中斷標志。首先獲取線程池的全局鎖,同時只有一個線程可以調(diào)用shutdown方法設置中斷標志。然后嘗試獲取工作線程Worker自己的鎖,獲取成功則可以設置中斷標志(這是由于正在執(zhí)行任務的線程需要獲取自己的鎖,并且不可重入,所以正在執(zhí)行的任務沒有被中斷),這里要中斷的那些線程是阻塞到getTask()方法并嘗試從任務隊列中獲取任務的線程即空閑線程。

(2)shutdownNow操作

在使用線程池的時候,如果我們調(diào)用了shutdownNow方法,線程池不僅不會再接受新的任務,還會將任務隊列中的任務丟棄,正在執(zhí)行的任務也會被中斷,然后立刻返回該方法,不會等待激活的任務完成,返回值為當前任務隊列中被丟棄的任務列表

1 public List<Runnable> shutdownNow() {

2? ? List<Runnable> tasks;

3? ? final ReentrantLock mainLock = this.mainLock;

4? ? mainLock.lock();

5? ? try {

6? ? ? ? checkShutdownAccess(); //還是進行權(quán)限檢查

7? ? ? ? advanceRunState(STOP); //設置線程池狀態(tài)臺STOP

8? ? ? ? interruptWorkers(); //中斷所有線程

9? ? ? ? tasks = drainQueue(); //將任務隊列中的任務移動到task中

10? ? } finally {

11? ? ? ? mainLock.unlock();

12? ? }

13? ? tryTerminate();

14? ? return tasks; //返回tasks

15 }

從上面的代碼中,我們可以可以發(fā)現(xiàn),shutdownNow方法也是首先需要檢查調(diào)用該方法的線程的權(quán)限,之后不同于shutdown方法之處在于需要即刻設置當前線程池狀態(tài)為STOP,然后中斷所有線程(空閑線程+正在執(zhí)行任務的線程),移除任務隊列中的任務

1 private void interruptWorkers() {

2? ? final ReentrantLock mainLock = this.mainLock;

3? ? mainLock.lock();

4? ? try {

5? ? ? ? for (Worker w : workers) //不需要判斷當前線程是否在執(zhí)行任務(即不需要調(diào)用w.tryLock方法),中斷所有線程

6? ? ? ? ? ? w.interruptIfStarted();

7? ? } finally {

8? ? ? ? mainLock.unlock();

9? ? }

10 }

(3)awaitTermination操作

當線程調(diào)用該方法之后,會阻塞調(diào)用者線程,直到線程池狀態(tài)為TERMINATED狀態(tài)才會返回,或者等到超時時間到之后會返回,下面是該方法的源碼。

1 //調(diào)用該方法之后,會阻塞調(diào)用者線程,直到線程池狀態(tài)為TERMINATED狀態(tài)才會返回,或者等到超時時間到之后會返回

2 public boolean awaitTermination(long timeout, TimeUnit unit)

3? ? throws InterruptedException {

4? ? long nanos = unit.toNanos(timeout);

5? ? final ReentrantLock mainLock = this.mainLock;

6? ? mainLock.lock();

7? ? try {

8? ? ? ? //阻塞當前線程,(獲取了Worker自己的鎖),那么當前線程就不會再執(zhí)行任務(因為獲取不到鎖)

9? ? ? ? for (;;) {

10? ? ? ? ? ? //當前線程池狀態(tài)為TERMINATED狀態(tài),會返回true

11? ? ? ? ? ? if (runStateAtLeast(ctl.get(), TERMINATED))

12? ? ? ? ? ? ? ? return true;

13? ? ? ? ? ? //超時時間到返回false

14? ? ? ? ? ? if (nanos <= 0)

15? ? ? ? ? ? ? ? return false;

16? ? ? ? ? ? nanos = termination.awaitNanos(nanos);

17? ? ? ? }

18? ? } finally {

19? ? ? ? mainLock.unlock();

20? ? }

21 }

在上面的代碼中,調(diào)用者線程需要首先獲取線程Worker 自己的獨占鎖,然后在循環(huán)判斷當前線程池是否已經(jīng)是TERMINATED狀態(tài),如果是則直接返回,否則說明當前線程池中還有線程正在執(zhí)行任務,這時候需要查看當前設置的超時時間是否小于0,小于0說明不需要等待就直接返回,如果大于0就需要調(diào)用條件變量termination的awaitNanos方法等待設置的時間,并在這段時間之內(nèi)等待線程池的狀態(tài)變?yōu)門ERMINATED。

我們在前面說到清理線程池的方法processWorkerExit的時候,需要調(diào)用tryTerminated方法,在該方法中會查看當前線程池狀態(tài)是否為TERMINATED,如果是該狀態(tài)也會調(diào)用termination.signalAll()方法喚醒所有線程池中因調(diào)用awaitTermination而被阻塞住的線程。

如果是設置了超時時間,那么termination的awaitNanos方法也會返回,這時候需要重新檢查線程池狀態(tài)是否為TERMINATED,如果是則返回,不是就繼續(xù)阻塞自己。

歡迎工作一到五年的Java工程師朋友們加入Java程序員開發(fā): 721575865

群內(nèi)提供免費的Java架構(gòu)學習資料(里面有高可用、高并發(fā)、高性能及分布式、Jvm性能調(diào)優(yōu)、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構(gòu)資料)合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個交代!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容