面試官:小伙子,你給我說(shuō)一下線程池的線程復(fù)用原理吧

前言

前兩天和粉絲聊天的時(shí)候,粉絲問(wèn)了我一個(gè)挺有意思的問(wèn)題,說(shuō)他之前在面試的時(shí)候被問(wèn)到線程池的線程復(fù)用原理,當(dāng)時(shí)我跟他簡(jiǎn)單的說(shuō)了一下,沒(méi)想到過(guò)了幾天又來(lái)問(wèn)我這個(gè)問(wèn)題了,說(shuō)他最近又被問(wèn)到了這個(gè)問(wèn)題.......想了想,干脆寫篇文章把這個(gè)東西講清楚吧,滿滿的干貨都放在下面了

1.什么是線程復(fù)用?

在線程池中,通過(guò)同一個(gè)線程去執(zhí)行不同的任務(wù),這就是線程復(fù)用。

假設(shè)現(xiàn)在有 100 個(gè)任務(wù),我們創(chuàng)建一個(gè)固定線程的線程池(FixedThreadPool),核心線程數(shù)和最大線程數(shù)都是 3,那么當(dāng)這個(gè) 100 個(gè)任務(wù)執(zhí)行完,都只會(huì)使用三個(gè)線程。

示例:

public class FixedThreadPoolDemo {

    static ExecutorService executorService = Executors.newFixedThreadPool(3);

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "-> 執(zhí)行");
            });
        }
        // 關(guān)閉線程池
        executorService.shutdown();
    }

}

執(zhí)行結(jié)果:

pool-1-thread-1-> 執(zhí)行
pool-1-thread-2-> 執(zhí)行
pool-1-thread-3-> 執(zhí)行
pool-1-thread-1-> 執(zhí)行
pool-1-thread-3-> 執(zhí)行
pool-1-thread-2-> 執(zhí)行
pool-1-thread-3-> 執(zhí)行
pool-1-thread-1-> 執(zhí)行
...

2.線程復(fù)用的原理

線程池將線程和任務(wù)進(jìn)行解耦,線程是線程,任務(wù)是任務(wù),擺脫了之前通過(guò) Thread 創(chuàng)建線程時(shí)的一個(gè)線程必須對(duì)應(yīng)一個(gè)任務(wù)的限制。

在線程池中,同一個(gè)線程可以從阻塞隊(duì)列中不斷獲取新任務(wù)來(lái)執(zhí)行,其核心原理在于線程池對(duì) Thread 進(jìn)行了封裝,并不是每次執(zhí)行任務(wù)都會(huì)調(diào)用 Thread.start() 來(lái)創(chuàng)建新線程,而是讓每個(gè)線程去執(zhí)行一個(gè)“循環(huán)任務(wù)”,在這個(gè)“循環(huán)任務(wù)”中不停的檢查是否有任務(wù)需要被執(zhí)行,如果有則直接執(zhí)行,也就是調(diào)用任務(wù)中的 run 方法,將 run 方法當(dāng)成一個(gè)普通的方法執(zhí)行,通過(guò)這種方式將只使用固定的線程就將所有任務(wù)的 run 方法串聯(lián)起來(lái)。

3.線程池執(zhí)行流程

這部分內(nèi)容在 Java 線程池的各個(gè)參數(shù)的含義 討論過(guò),這里我們?cè)購(gòu)?fù)習(xí)一次,再?gòu)闹腥チ私饩€程復(fù)用。

3.1 流程圖

3.2 線程創(chuàng)建的流程

當(dāng)任務(wù)提交之后,線程池首先會(huì)檢查當(dāng)前線程數(shù),如果當(dāng)前的線程數(shù)小于核心線程數(shù)(corePoolSize),比如最開(kāi)始創(chuàng)建的時(shí)候線程數(shù)為 0,則新建線程并執(zhí)行任務(wù)。
當(dāng)提交的任務(wù)不斷增加,創(chuàng)建的線程數(shù)等于核心線程數(shù)(corePoolSize),新增的任務(wù)會(huì)被添加到 workQueue 任務(wù)隊(duì)列中,等待核心線程執(zhí)行完當(dāng)前任務(wù)后,重新從 workQueue 中獲取任務(wù)執(zhí)行。
假設(shè)任務(wù)非常多,達(dá)到了 workQueue 的最大容量,但是當(dāng)前線程數(shù)小于最大線程數(shù)(maximumPoolSize),線程池會(huì)在核心線程數(shù)(corePoolSize)的基礎(chǔ)上繼續(xù)創(chuàng)建線程來(lái)執(zhí)行任務(wù)。
假設(shè)任務(wù)繼續(xù)增加,線程池的線程數(shù)達(dá)到最大線程數(shù)(maximumPoolSize),如果任務(wù)繼續(xù)增加,這個(gè)時(shí)候線程池就會(huì)采用拒絕策略來(lái)拒絕這些任務(wù)。
在任務(wù)不斷增加的過(guò)程中,線程池會(huì)逐一進(jìn)行以下 4 個(gè)方面的判斷

核心線程數(shù)(corePoolSize)
任務(wù)隊(duì)列(workQueue)
最大線程數(shù)(maximumPoolSize)
拒絕策略

3.3 ThreadPoolExecutor#execute 源碼分析

java.util.concurrent.ThreadPoolExecutor#execute

 public void execute(Runnable command) {
     // 如果傳入的Runnable的空,就拋出異常
     if (command == null)
         throw new NullPointerException();
     int c = ctl.get();
     // 線程池中的線程比核心線程數(shù)少 
     if (workerCountOf(c) < corePoolSize) {
         // 新建一個(gè)核心線程執(zhí)行任務(wù)
         if (addWorker(command, true))
             return;
         c = ctl.get();
     }
     // 核心線程已滿,但是任務(wù)隊(duì)列未滿,添加到隊(duì)列中
     if (isRunning(c) && workQueue.offer(command)) {
         int recheck = ctl.get();
         // 任務(wù)成功添加到隊(duì)列以后,再次檢查是否需要添加新的線程,因?yàn)橐汛嬖诘木€程可能被銷毀了
         if (! isRunning(recheck) && remove(command))
             // 如果線程池處于非運(yùn)行狀態(tài),并且把當(dāng)前的任務(wù)從任務(wù)隊(duì)列中移除成功,則拒絕該任務(wù)
             reject(command);
         else if (workerCountOf(recheck) == 0)
             // 如果之前的線程已經(jīng)被銷毀完,新建一個(gè)非核心線程
             addWorker(null, false);
     }
     // 核心線程池已滿,隊(duì)列已滿,嘗試創(chuàng)建一個(gè)非核心新的線程
     else if (!addWorker(command, false))
         // 如果創(chuàng)建新線程失敗,說(shuō)明線程池關(guān)閉或者線程池滿了,拒絕任務(wù)
         reject(command);
 }

3.4 逐行分析

//如果傳入的Runnable的空,就拋出異常        
if (command == null)
   throw new NullPointerException();

execute 方法中通過(guò) if 語(yǔ)句判斷 command ,也就是 Runnable 任務(wù)是否等于 null,如果為 null 就拋出異常。

if (workerCountOf(c) < corePoolSize) { 
    if (addWorker(command, true)) 
        return;
        c = ctl.get();
}

判斷當(dāng)前線程數(shù)是否小于核心線程數(shù),如果小于核心線程數(shù)就調(diào)用 addWorker() 方法增加一個(gè) Worker,這里的 Worker 就可以理解為一個(gè)線程。

addWorker 方法的主要作用是在線程池中創(chuàng)建一個(gè)線程并執(zhí)行傳入的任務(wù),如果返回 true 代表添加成功,如果返回 false 代表添加失敗。

第一個(gè)參數(shù)表示傳入的任務(wù)

第二個(gè)參數(shù)是個(gè)布爾值,如果布爾值傳入 true 代表增加線程時(shí)判斷當(dāng)前線程是否少于 corePoolSize,小于則增加新線程(核心線程),大于等于則不增加;同理,如果傳入 false 代表增加線程時(shí)判斷當(dāng)前線程是否少于 maximumPoolSize,小于則增加新線程(非核心線程),大于等于則不增加,所以這里的布爾值的含義是以核心線程數(shù)為界限還是以最大線程數(shù)為界限進(jìn)行是否新增非核心線程的判斷

這一段判斷相關(guān)源碼如下

    private boolean addWorker(Runnable firstTask, boolean core) {     
                ...
                int wc = workerCountOf(c);//當(dāng)前工作線程數(shù)
                //判斷當(dāng)前工作線程數(shù)>=最大線程數(shù) 或者 >=核心線程數(shù)(當(dāng)core = true)
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                ...

最核心的就是 core ? corePoolSize : maximumPoolSize 這個(gè)三目運(yùn)算。

      // 核心線程已滿,但是任務(wù)隊(duì)列未滿,添加到隊(duì)列中
      if (isRunning(c) && workQueue.offer(command)) {
          int recheck = ctl.get();
          // 任務(wù)成功添加到隊(duì)列以后,再次檢查是否需要添加新的線程,因?yàn)橐汛嬖诘木€程可能被銷毀了
          if (! isRunning(recheck) && remove(command))
              // 如果線程池處于非運(yùn)行狀態(tài),并且把當(dāng)前的任務(wù)從任務(wù)隊(duì)列中移除成功,則拒絕該任務(wù)
              reject(command);
          else if (workerCountOf(recheck) == 0)
              // 如果之前的線程已經(jīng)被銷毀完,新建一個(gè)非核心線程
              addWorker(null, false);
      }

如果代碼執(zhí)行到這里,說(shuō)明當(dāng)前線程數(shù)大于或等于核心線程數(shù)或者 addWorker 失敗了,那么就需要通過(guò)

if (isRunning(c) && workQueue.offer(command)) 檢查線程池狀態(tài)是否為 Running,如果線程池狀態(tài)是 Running 就通過(guò) workQueue.offer(command) 將任務(wù)放入任務(wù)隊(duì)列中,

任務(wù)成功添加到隊(duì)列以后,再次檢查線程池狀態(tài),如果線程池不處于 Running 狀態(tài),說(shuō)明線程池被關(guān)閉,那么就移除剛剛添加到任務(wù)隊(duì)列中的任務(wù),并執(zhí)行拒絕策略,代碼如下:

            if (! isRunning(recheck) && remove(command))
                // 如果線程池處于非運(yùn)行狀態(tài),并且把當(dāng)前的任務(wù)從任務(wù)隊(duì)列中移除成功,則拒絕該任務(wù)
                reject(command);

下面我們?cè)賮?lái)看后一個(gè) else 分支:

            else if (workerCountOf(recheck) == 0)
                // 如果之前的線程已經(jīng)被銷毀完,新建一個(gè)非核心線程
                addWorker(null, false);

進(jìn)入這個(gè) else 說(shuō)明前面判斷到線程池狀態(tài)為 Running,那么當(dāng)任務(wù)被添加進(jìn)來(lái)之后就需要防止沒(méi)有可執(zhí)行線程的情況發(fā)生(比如之前的線程被回收了或意外終止了),所以此時(shí)如果檢查當(dāng)前線程數(shù)為 0,也就是 workerCountOf(recheck) == 0,那就執(zhí)行 addWorker() 方法新建一個(gè)非核心線程。

我們?cè)賮?lái)看最后一部分代碼:

        // 核心線程池已滿,隊(duì)列已滿,嘗試創(chuàng)建一個(gè)非核心新的線程
        else if (!addWorker(command, false))
            // 如果創(chuàng)建新線程失敗,說(shuō)明線程池關(guān)閉或者線程池滿了,拒絕任務(wù)
            reject(command);

執(zhí)行到這里,說(shuō)明線程池不是 Running 狀態(tài),又或者線程數(shù) >= 核心線程數(shù)并且任務(wù)隊(duì)列已經(jīng)滿了,根據(jù)規(guī)則,此時(shí)需要添加新線程,直到線程數(shù)達(dá)到“最大線程數(shù)”,所以此時(shí)就會(huì)再次調(diào)用 addWorker 方法并將第二個(gè)參數(shù)傳入 false,傳入 false 代表增加非核心線程。

addWorker 方法如果返回 true 代表添加成功,如果返回 false 代表任務(wù)添加失敗,說(shuō)明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize,然后執(zhí)行拒絕策略 reject 方法。

如果執(zhí)行到這里線程池的狀態(tài)不是 Running,那么 addWorker 會(huì)失敗并返回 false,所以也會(huì)執(zhí)行拒絕策略 reject 方法。

4.線程復(fù)用源碼分析

java.util.concurrent.ThreadPoolExecutor#runWorker
省略掉部分和復(fù)用無(wú)關(guān)的代碼之后,代碼如下:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 釋放鎖 設(shè)置work的state=0 允許中斷
        boolean completedAbruptly = true;
        try {
            //一直執(zhí)行 如果task不為空 或者 從隊(duì)列中獲取的task不為空
            while (task != null || (task = getTask()) != null) {
                    task.run();//執(zhí)行task中的run方法
                }
            }
            completedAbruptly = false;
        } finally {
            //1.將 worker 從數(shù)組 workers 里刪除掉
            //2.根據(jù)布爾值 allowCoreThreadTimeOut 來(lái)決定是否補(bǔ)充新的 Worker 進(jìn)數(shù)組 workers
            processWorkerExit(w, completedAbruptly);
        }
    }

可以看到,實(shí)現(xiàn)線程復(fù)用的邏輯主要在一個(gè)不停循環(huán)的 while 循環(huán)體中。

通過(guò)獲取 Worker 的 firstTask 或者通過(guò) getTask 方法從 workQueue 中獲取待執(zhí)行的任務(wù)

直接通過(guò) task.run() 來(lái)執(zhí)行具體的任務(wù)(而不是新建線程)

在這里,我們找到了線程復(fù)用最終的實(shí)現(xiàn),通過(guò)取 Worker 的 firstTask 或者 getTask 方法從 workQueue 中取出了新任務(wù),并直接調(diào)用 Runnable 的 run 方法來(lái)執(zhí)行任務(wù),也就是如之前所說(shuō)的,每個(gè)線程都始終在一個(gè)大循環(huán)中,反復(fù)獲取任務(wù),然后執(zhí)行任務(wù),從而實(shí)現(xiàn)了線程的復(fù)用。

總結(jié)

這篇關(guān)于線程池的線程復(fù)用原理的文章就到這里了,大家看完有什么不懂的歡迎在下方留言評(píng)論,也可以私信問(wèn)我,我看到了一般都會(huì)回復(fù)的!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容