Java 線程池原理分析

池化技術(shù)簡單來說就是提前準備資源,當需要的時候可以直接獲取,避免了在需要大量資源時因創(chuàng)建資源耗時而導致阻塞和過高的時延。常見的有線程池,連接池和內(nèi)存池。

Java的線程池就是提前創(chuàng)建一定數(shù)量的線程,當需要線程處理相應(yīng)工作的時候直接喚醒獲取即可,從而增加系統(tǒng)的處理速度。

1. Java 中ThreadPoolExecutor

1.1 ThreadPoolExecutor構(gòu)造參數(shù)說明

Java中常用的線程池類為ThreadPoolExecutor,其繼承關(guān)系如下:

01.png

Executor和ExecutorService接口的聲明確定了ThreadPoolExecutor有兩種方式提交task——execute和submit。二者的區(qū)別在于:

  • execute:通過execute提交的task之后不會有返回值,提交的task也沒有返回值;

  • submit:通過submit提交task之后會返回一個Future對象。獲取到這個對象之后,我們可以對提交task進行相應(yīng)的操作。例如,取消,判斷是否結(jié)束,阻塞獲取task返回的結(jié)果等等。

再看ThreadPoolExecutor,這個類有三個構(gòu)造方法,使用不同的構(gòu)造方法會對一些參數(shù)使用一些缺省設(shè)置。這里看一下參數(shù)最全的構(gòu)造方法:

public ThreadPoolExecutor(int corePoolSize,                         
                          int maximumPoolSize,      
                          long keepAliveTime, 
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue, 
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

下面對參數(shù)進行一一說明:

  • int coreSize: 線程池核心線程數(shù)量,這個數(shù)量的線程在線程池關(guān)閉之前會一直存活;

  • int maximumPoolSize: 線程最大線程數(shù)量,當核心線程被使用完之后,線程池中線程數(shù)量可以臨時增到這個數(shù)量;

  • long keepAliveTime: 除了核心線程之外,臨時增加線程的存活時間,超過這個時間線程就會結(jié)束;

  • TimeUnit unit: keepAliveTime的時間單位;

  • BlockingQueue<Runnable> workQueue: 線程池中的一個阻塞隊列,當線程池中線程被使用完之后,提交的task就會被放入這個隊列;JUC包中提供的隊列主要有以下幾種:

    1. ArrayBlockingQueue: 一個FIFO隊列,新增加的元素會被追加到集合的尾部。這是一個有界的集合,一旦創(chuàng)建其大小就不能夠再改變了。往已滿隊列中追加元素會導致阻塞;

    2. DelayQueue: 無界隊列,在放入元素的時候可以指定一個延遲時間,只有當延遲時間結(jié)束后,這個元素才能被取出;

    3. LinkedBlockingQueue:通過不同的構(gòu)造器可以創(chuàng)建一個有界或者無界的隊列,隊列中的元素FIFO

    4. PriorityBlockingQueue:無界隊列,基于優(yōu)先級實現(xiàn)的隊列。集合中的元素按照優(yōu)先級排序。

  • ThreadFactory threadFactory: 線程工廠,用來創(chuàng)建線程池中的線程。

  • RejectedExecutionHandler handler: 線程池的飽和策略,如果線程池中線程被用完了,隊列也滿了,那么對于新提交的task就會使用這個策略。目前主要有以下幾種:

    1. AbortPolicy:直接拋出異常(默認策略)

    2. CallerRunsPolicy:調(diào)用線程池所在的線程去執(zhí)行被拒絕的task,會阻塞線程池所在的線程;

    3. DiscardOldestPolicy:丟棄隊列里最久之前的一個任務(wù),并執(zhí)行當前任務(wù)。

    4. DiscardPolicy:不進行任何處理,直接丟棄掉。

1.2 ThreadPoolExecutor 提交流程

1.2.1 ThreadPoolExecutor 中的ctl變量

在討論線程池的提交流程之前,我們需要先把注意力集中到一個特殊的變量上——ctl。 ctl是線程池中的控制狀態(tài),它是一個原子級別讀寫的integer,包含兩層含義:

  • workerCount: 有效線程數(shù);

  • runStat: 線程池的狀態(tài),有Running,Shutdown,Stop,TidyingTerminate五種狀態(tài)。

02.png

因為5種狀態(tài)至少需要3位來表示,剩下的全部用來表示workerCount。所有COUNT_BITS為Interger.SIZE - 3 = 29位。CAPACITY表示的是最大容量,29表示的最大值為2^29 -1,即1 << 29 -1(左移一位相當于乘以2, 1*2^29 -1),二級制表示為00011111111111111111111111111111。相當于低29為用來表是容量,高3位留下來表示狀態(tài)。

五種狀態(tài)對應(yīng)的表示分別如下:

  • RUNNING11100000000000000000000000000000

    running狀態(tài)下線程池接受新的task,并處理隊列中的task

  • SHUTDOWN00000000000000000000000000000000

    shutdown狀態(tài)下不會接受新的task,但是會處理隊列中的task

  • STOP00100000000000000000000000000000

    不接受新的task,處理隊列中的task,終止正在處理的task

  • TIDYNG01000000000000000000000000000000

    所有的task結(jié)束,workerCount為0,無活躍線程

  • TERMINATED011000000000000000000000000000000

    terminated()方法執(zhí)行結(jié)束

同時在線程池中還定義了三個方法來對ctl變量進行操作,其中兩個是從ctl中獲取workerCount和runStart,一個是通過指定的workerCount和runStat生成ctl:

03.png

runStateOfworkerCountOf都是簡單的通過&計算將低29為或者高3位置0。而ctlOf這是通過 |操作將高3位和低29位組合到一起。

ctl變量的初始化語句入如下,表示的是running狀態(tài),當前worker數(shù)為0:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

1.2.2 提交任務(wù)流程

通過ThreadPoolExecutor提交任務(wù)有兩種方式,一種是execute,由他自己實現(xiàn)。另一種是submit,由其父類AbstractExecutorService實現(xiàn)。

這里需要提前說明一些ThreadPoolExecutor類的一些屬性:

  • workers:HashSet 用來存放線程池worker資源(線程)

  • workQueue:BlockingQueue 用來存放用戶提交的task(Runnable)

1.2.2.1 通過execute提交task

execute(Runnable command)方法

這個方法體比較簡短,我就直接全部粘過來了。

04.png

通過代碼我們可以看到,最外圍的分支結(jié)構(gòu)有三個:

  1. 如果線程池的worker數(shù)量小于corePoolSize,就會添加一個worker,執(zhí)行當前的task(這里我們先不探究addWorker執(zhí)行了什么操作);

  2. 如果第一種情況沒有滿足(當前wroker數(shù)已經(jīng)達到了corePoolSize大小或者addWorker失敗),就把task添加到阻塞隊列中。這個過程成功之后,就會進行一次double-check。

  3. 如果當前worker數(shù)量大于coreSize,且隊列已滿,就會嘗試創(chuàng)建一個非核心線程來執(zhí)行當前task。創(chuàng)建失敗的話就會直接使用飽和策略處理task。

在double-check的過程中如果發(fā)現(xiàn)線程池已經(jīng)不在運行狀態(tài)就會把當前task移除,并使用飽和策略處理它。否則就會檢查是否有必要創(chuàng)建一些新的線程。

05.png

addWorker(Runnable firstTask, boolean core)方法

這里的代碼分解一下,首先看一下方法的參數(shù)列表:

    private boolean addWorker(Runnable firstTask, boolean core) 

這個方法需要傳遞兩個參數(shù),firstTask表示添加的新worker處理的第一個任務(wù),布爾型的core表示添加的是否為核心線程。

跳過線程狀態(tài)校驗的過程,我們直接看這一段:

for (;;) {
   int wc = workerCountOf(c);
   if (wc >= CAPACITY ||
   wc >= (core ? corePoolSize : maximumPoolSize))
   return false;
   if (compareAndIncrementWorkerCount(c))
   break retry;
   c = ctl.get();  // Re-read ctl
   if (runStateOf(c) != rs)
   continue retry;
}

這里是循環(huán)CAS來增加ctl的數(shù)值,一旦增加成功,就會正式地創(chuàng)建線程。

線面是創(chuàng)建線程的流程:

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
   w = new Worker(firstTask);
   final Thread t = w.thread;
   if (t != null) {
   final ReentrantLock mainLock = this.mainLock;
   mainLock.lock();
   try {
     int rs = runStateOf(ctl.get());
     if (rs < SHUTDOWN ||
     (rs == SHUTDOWN && firstTask == null)) {
     if (t.isAlive()) // precheck that t is startable
       throw new IllegalThreadStateException();
     workers.add(w);
     int s = workers.size();
     if (s > largestPoolSize)
       largestPoolSize = s;
     workerAdded = true;
   }
 } finally {
     mainLock.unlock();
 }
 if (workerAdded) {
     t.start();
     workerStarted = true;
 }
 }
} finally {
 if (! workerStarted)
 addWorkerFailed(w);
}
return workerStarted;

這里是直接創(chuàng)建了一個worker對象,而Worker的構(gòu)造方法如下:

 Worker(Runnable firstTask) {
   setState(-1); 
   this.firstTask = firstTask;
   this.thread = getThreadFactory().newThread(this);
 }

可以看到,在構(gòu)造方法中,Worker創(chuàng)建了一個新的線程作為成員變量。

當一個worker創(chuàng)建之后,還會進行重復校驗,已確定worker確實創(chuàng)建成功。

if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
 if (t.isAlive()) 
   throw new IllegalThreadStateException();
 workers.add(w);
 int s = workers.size();
 if (s > largestPoolSize)
   largestPoolSize = s;
 workerAdded = true;
 }

線程創(chuàng)建成功之后,就會啟動worker。

if (workerAdded) {
   t.start();
   workerStarted = true;
}

Worker

那么線程池創(chuàng)建了worker線程之后都干些什么呢?如果隊列中沒有任務(wù)要做,線程如何?;钅??如果隊列中有worker,線程又會如何去執(zhí)行呢?為了了解這些,剖析一下Worker的代碼就很有必要了。

ThreadPoolExecutor類通過一個HashSet<Worker>來存放Worker對象:

private final HashSet<Worker> workers = new HashSet<Worker>();

通過Worker的構(gòu)造器我們看到Worker在構(gòu)造線程的時候是將自身作為參數(shù)傳到方法中的,因為其本身也實現(xiàn)了Runnable接口,所以當執(zhí)行t.start()的時候,實際上執(zhí)行的是Worker的run()方法。一下是wroker的runWorker方法:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

粗略地看一下代碼,我們知道worker首先會判斷自己是否有屬于自己的firstTask,如果有的話,就先執(zhí)行這個task,這里是task.run(),只是普通的方法調(diào)用,執(zhí)行了task的邏輯。當自己的第一個task執(zhí)行完之后,worker就會進行循環(huán),通過getTask()方法不停地從workerQueue中獲取task。這個getTask是個阻塞方法,會一直循環(huán)直到返回task或者線程池狀態(tài)不為running的時刻。

當然,這個getTask()的工作內(nèi)容不僅僅是返回隊列中的task,同時也管理著非核心線程的存活。我們通過參數(shù)指定了非核心線程的存活時間,當線程池中有非核心線程且線程空閑的時間超過了指定的時間,就會做掉這些線程。這里它是通過workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)這個方式實現(xiàn),當超過這個等待時間獲取到的結(jié)果依然為null,表示當前的線程已經(jīng)空閑了keepAliveTime這么長時間了,屬于超時的非核心線程。之后會return null,在worker的runWorker()方法中調(diào)用processWorkerExit()方法結(jié)束當前worker。

if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }

1.2.2.2 通過submit提交task

這個方法是在ThreadPoolExecutor的父類AbstractExecutorService中定義的。AbstractExecutorService是一個抽象類。而submit的邏輯也比較簡單:


06.png

參數(shù)的主要區(qū)別在于Runnable和Callable,前者無返回值后者有返回值,可以通過Future的get方法阻塞獲取。

通過submit提交的task可以獲取一個future對象,可以對已提交的task進行相關(guān)操作。例如獲取返回值或者判斷運行狀態(tài)等等。

可以看到,submit只是將task封裝成了一個ftask,然后調(diào)用了execute方法調(diào)教了這個task到線程池中。這里使用了模板方法模式,submit調(diào)用的是由子類的線程池實現(xiàn)的execute,也就是上面的execute方法。

小結(jié):

關(guān)于線程池的構(gòu)造參數(shù)含義和參數(shù)的使用邏輯在日常使用過程中還是很值得關(guān)注的,對于日后線程池問題定位調(diào)優(yōu)都會有不少的幫助。理解線程池工作原理,線程池中worker的執(zhí)行流程可以讓我們對池化技術(shù)和資源利用的看法有更進一步的了解。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容