美團二面:聊聊線程池設(shè)計與原理,由表及里趣味解析

關(guān)于線程池,無論是在實際的項目開發(fā)還是面試,它都是并發(fā)編程中當之無愧的重中之重。因此,掌握線程池是每個Java開發(fā)者的必備技能。

本文將從線程池的應用場景和設(shè)計原理出發(fā),先帶大家手擼一個線程池,在理解線程池的內(nèi)部構(gòu)造后,再深入剖析Java中的線程池。全文大約2.5萬字,篇幅較長,在閱讀時建議先看目錄再看內(nèi)容

一、為什么要使用線程池

在前面系列文章的學習中,你已然知道多線程可以加速任務的處理、提高系統(tǒng)的吞吐量。那么,是否我們因此就可以頻繁地創(chuàng)建新的線程呢?答案是否定的。頻繁地繁創(chuàng)建和啟用新的線程不僅代價昂貴,而且無限增加的線程勢必也會造成管理成本的急劇上升。因此,為了平衡多線程的收益和成本,線程池誕生了。

1. 線程池的使用場景

生產(chǎn)者與消費者問題是線程池的典型應用場景。當你有源源不斷的任務需要處理時,為了提高任務的處理速度,你需要創(chuàng)建多個線程。那么,問題來了,如何管理這些任務和多線程呢?答案是:線程池。

線程池的池化(Pooling)原理的應用并不局限于Java中,在MySQL和諸多的分布式中間件系統(tǒng)中都有著廣泛的應用。當我們鏈接數(shù)據(jù)庫的時候,對鏈接的管理用的是線程池;當我們使用Tomcat時,對請求鏈接的管理用的也是線程池。所以,當你有批量的任務需要多線程處理時,那么基本上你就需要使用線程池。

2. 線程池的使用好處

線程池的好處主要體現(xiàn)在三個方面:系統(tǒng)資源、任務處理速度相關(guān)的復雜度管理,主要表現(xiàn)在:

  • 降低系統(tǒng)的資源開銷:通過復用線程池中的工作線程,避免頻繁創(chuàng)建新的線程,可以有效降低系統(tǒng)資源的開銷;
  • 提高任務的執(zhí)行速度:新任務達到時,無需創(chuàng)建新的線程,直接將任務交由已經(jīng)存在的線程進行處理,可以有效提高任務的執(zhí)行速度;
  • 有效管理任務和工作線程:線程池內(nèi)提供了任務管理和工作線程管理的機制。

為什么說創(chuàng)建線程是昂貴的

現(xiàn)在你已經(jīng)知道,頻繁地創(chuàng)建新線程需要付出額外的代價,所以我們使用了線程池。那么,創(chuàng)建一個新的線程的代價究竟是怎樣的呢?可以參考以下幾點:

  • 創(chuàng)建線程時,JVM必須為線程堆棧分配和初始化一大塊內(nèi)存。每個線程方法的調(diào)用棧幀都會存儲到這里,包括局部變量、返回值和常量池等;
  • 在創(chuàng)建和注冊本機線程時,需要和宿主機發(fā)生系統(tǒng)調(diào)用;
  • 需要創(chuàng)建、初始化描述符,并將其添加到 JVM 內(nèi)部數(shù)據(jù)結(jié)構(gòu)中。

另外,從某種意義上說,只要線程還活著,它就會占用資源,這不僅昂貴,而且浪費。 例如 ,線程堆棧、訪問堆棧的可達對象、JVM 線程描述符、操作系統(tǒng)本機線程描述符等等,在線程活著的時候,這些資源都會持續(xù)占據(jù)。

雖然不同的Java平臺在創(chuàng)建線程時的代價可能有所差異,但總體來說,都不便宜。

3. 線程池的核心組成

一個完整的線程池,應該包含以下幾個核心部分:

  • 任務提交:提供接口接收任務的提交;
  • 任務管理:選擇合適的隊列對提交的任務進行管理,包括對拒絕策略的設(shè)置;
  • 任務執(zhí)行:由工作線程來執(zhí)行提交的任務;
  • 線程池管理:包括基本參數(shù)設(shè)置、任務監(jiān)控、工作線程管理等。
image.png

二、如何手工制作線程池

通過第一部分的閱讀,現(xiàn)在你已經(jīng)了解了線程池的作用及它的核心組成。為了更深刻地理解線程池的組成,在這一部分我們通過簡單的四步來手工制作一個簡單的線程池。當然,麻雀雖小,五臟俱全。如果你能手工自制線程池之后,那么在理解后續(xù)的Java中的線程池時,將會易如反掌。

1. 線程池設(shè)計和制作

第一步:定義一個王者線程池:TheKingThreadPool,它是這次手工制作中名副其實的主角兒。在這個線程池中,包含了任務隊列管理、工作線程管理,并提供了可以指定隊列類型的構(gòu)造參數(shù),以及任務提交入口和線程池關(guān)閉接口。你看,雖然它看起來似乎很迷你,但是線程池的核心組件都已經(jīng)具備了,甚至在它的基礎(chǔ)上,你完全可以把它擴展成更為成熟的線程池。

/** * 王者線程池 */public class TheKingThreadPool {    private final BlockingQueue<Task> taskQueue;    private final List<Worker> workers = new ArrayList<>();    private ThreadPoolStatus status;    /**     * 初始化構(gòu)建線程池     *     * @param worksNumber 線程池中的工作線程數(shù)量     * @param taskQueue   任務隊列     */    public TheKingThreadPool(int worksNumber, BlockingQueue<Task> taskQueue) {        this.taskQueue = taskQueue;        status = ThreadPoolStatus.RUNNING;        for (int i = 0; i < worksNumber; i++) {            workers.add(new Worker("Worker" + i, taskQueue));        }        for (Worker worker : workers) {            Thread workThread = new Thread(worker);            workThread.setName(worker.getName());            workThread.start();        }    }    /**     * 提交任務     *     * @param task 待執(zhí)行的任務     */    public synchronized void execute(Task task) {        if (!this.status.isRunning()) {            throw new IllegalStateException("線程池非運行狀態(tài),停止接單啦~");        }        this.taskQueue.offer(task);    }    /**     * 等待所有任務執(zhí)行結(jié)束     */    public synchronized void waitUntilAllTasksFinished() {        while (this.taskQueue.size() > 0) {            try {                Thread.sleep(1);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }    /**     * 關(guān)閉線程池     */    public synchronized void shutdown() {        this.status = ThreadPoolStatus.SHUTDOWN;    }    /**     * 停止線程池     */    public synchronized void stop() {        this.status = ThreadPoolStatus.SHUTDOWN;        for (Worker worker : workers) {            worker.doStop();        }    }}

第二步:設(shè)計并制作工作線程。工作線程是干活的線程,將負責處理提交到線程池中的任務,我們把它叫做Worker。其實,這里的Worker的定義和Java線程池中的Worker已經(jīng)很像了,它繼承了Runnable接口并封裝了Thread. 在構(gòu)造Worker時,可以設(shè)定它的名字,并傳入任務隊列。當Worker啟動后,它將會從任務隊列中獲取任務并執(zhí)行。此外,它還提供了Stop方法,用以響應線程池的狀態(tài)變化。

/** * 線程池中用于執(zhí)行任務的線程 */public class Worker implements Runnable {    private final String name;    private Thread thread = null;    private final BlockingQueue<Task> taskQueue;    private boolean isStopped = false;    private AtomicInteger counter = new AtomicInteger();    public Worker(String name, BlockingQueue<Task> queue) {        this.name = name;        taskQueue = queue;    }    public void run() {        this.thread = Thread.currentThread();        while (!isStopped()) {            try {                Task task = taskQueue.poll(5L, TimeUnit.SECONDS);                if (task != null) {                    note(this.thread.getName(), ":獲取到新的任務->", task.getTaskDesc());                    task.run();                    counter.getAndIncrement();                }            } catch (Exception ignored) {            }        }        note(this.thread.getName(), ":已結(jié)束工作,執(zhí)行任務數(shù)量:" + counter.get());    }    public synchronized void doStop() {        isStopped = true;        if (thread != null) {            this.thread.interrupt();        }    }    public synchronized boolean isStopped() {        return isStopped;    }    public String getName() {        return name;    }}

第三步:設(shè)計并制作任務。任務是可以可執(zhí)行的對象,因此我們直接繼承Runnable接口就行。其實,直接使用Runnable接口也是可以的,只不過為了讓示例更加清楚,我們給Task加了任務描述的方法。

/** * 任務 */public interface Task extends Runnable {    String getTaskDesc();}

第四步:設(shè)計線程池的狀態(tài)。線程池作為一個運行框架,它必然會有一系列的狀態(tài),比如運行中、停止、關(guān)閉等。

public enum ThreadPoolStatus {    RUNNING(),    SHUTDOWN(),    STOP(),    TIDYING(),    TERMINATED();    ThreadPoolStatus() {    }    public boolean isRunning() {        return ThreadPoolStatus.RUNNING.equals(this);    }}

以上四個步驟完成后,一個簡易的線程池就已經(jīng)制作完畢。你看,如果你從以上幾點入手來理解線程池的源碼的話,是不是要簡單多了?Java中的線程池的核心組成也是如此,只不過在細節(jié)處理等方面更多全面且豐富。

2. 運行線程池

現(xiàn)在,我們的王者線程池已經(jīng)制作好。接下來,我們通過一個場景來運行它,看看它的效果如何。

試驗場景:峽谷森林中,鎧、蘭陵王和典韋等負責打野,而安其拉、貂蟬和大喬等美女負責對狩獵到的野怪進行燒烤,一場歡快的峽谷燒烤節(jié)正在進行中

在這個場景中,鎧和蘭陵王他們負責提交任務,而貂蟬和大喬她們則負責處理任務。

image.png

在下面的實現(xiàn)代碼中,我們通過上述設(shè)計的TheKingThreadPool來定義個線程池,wildMonsters中的野怪表示待提交的任務,并安排3個工作線程來執(zhí)行任務。在示例代碼的末尾,當所有任務執(zhí)行結(jié)束后,關(guān)閉線程池。

 public static void main(String[] args) {        TheKingThreadPool theKingThreadPool = new TheKingThreadPool(3, new ArrayBlockingQueue<>(10));        String[] wildMonsters = {"棕熊", "野雞", "灰狼", "野兔", "狐貍", "小鹿", "小花豹", "野豬"};        for (String wildMonsterName : wildMonsters) {            theKingThreadPool.execute(new Task() {                public String getTaskDesc() {                    return wildMonsterName;                }                public void run() {                    System.out.println(Thread.currentThread().getName() + ":" + wildMonsterName + "已經(jīng)烤好");                }            });        }        theKingThreadPool.waitUntilAllTasksFinished();        theKingThreadPool.stop();    }

王者線程池運行結(jié)果如下:

Worker0:獲取到新的任務->灰狼Worker1:獲取到新的任務->野雞Worker1:野雞已經(jīng)烤好Worker2:獲取到新的任務->棕熊Worker2:棕熊已經(jīng)烤好Worker1:獲取到新的任務->野兔Worker1:野兔已經(jīng)烤好Worker0:灰狼已經(jīng)烤好Worker1:獲取到新的任務->小鹿Worker1:小鹿已經(jīng)烤好Worker2:獲取到新的任務->狐貍Worker2:狐貍已經(jīng)烤好Worker1:獲取到新的任務->野豬Worker1:野豬已經(jīng)烤好Worker0:獲取到新的任務->小花豹Worker0:小花豹已經(jīng)烤好Worker0:已結(jié)束工作,執(zhí)行任務數(shù)量:2Worker2:已結(jié)束工作,執(zhí)行任務數(shù)量:2Worker1:已結(jié)束工作,執(zhí)行任務數(shù)量:4Process finished with exit code 0

從結(jié)果中可以看到,效果完全符合預期。所有的任務都已經(jīng)提交完畢,并且都被正確執(zhí)行。此外,通過線程池的任務統(tǒng)計,可以看到任務并不是均勻分配,Worker1執(zhí)行了4個任務,而Worker0和Worker2均只執(zhí)行了2個任務,這也是線程池中的正?,F(xiàn)象。

三、透徹理解Java中的線程池

在手工制作線程線程池之后,再來理解Java中的線程池就相對要容易很多。當然,相比于王者線程池,Java中的線程池(ThreadPoolExecutor)的實現(xiàn)要復雜很多。所以,理解時應當遵循一定的結(jié)構(gòu)和脈絡,把握住線程池的核心要點,眉毛胡子一把抓、理不清層次會導致你無法有效理解它的設(shè)計內(nèi)涵,進而導致你無法正確掌握它。

總體來說,Java中的線程池的設(shè)計核心都是圍繞“任務”進行,可以通過一個框架、兩大核心、三大過程概括。理解了這三個重要概念,基本上你已經(jīng)能從相對抽象的層面理解了線程池。

  • 一個框架:即線程池的整體設(shè)計存在一個框架,而不是雜亂無章的組成。所以,在學習線程池時,首先要能從立體上感知到這個框架的存在,而不要陷于凌亂的細節(jié)中;
  • 兩大核心:在線程池的整個框架中,圍繞任務執(zhí)行這件事,存在兩大核心:任務的管理任務的執(zhí)行,對應的也就是任務隊列和用于執(zhí)行任務的工作線程。任務隊列工作線程是框架得以有效運轉(zhuǎn)的關(guān)鍵部件;
  • 三大過程:前面說過,線程池的整體設(shè)計都是圍繞任務展開,所以框架內(nèi)可以分為任務提交、任務管理任務執(zhí)行三大過程。

從類比的角度講,你可以把框架看作是一個生產(chǎn)車間。在這個車間里,有一條流水線,任務隊列工作線程是這條流水線的兩大關(guān)鍵組成。而在流水線運作的過程中,就會涉及任務提交、任務管理任務執(zhí)行等不同的過程。

下面這幅圖,將幫助你立體地感知線程池的整體設(shè)計,建議你收藏。在這幅圖中,清楚地展示了線程池整個框架的工作流程和核心部件,接下來的文章也將圍繞這幅圖展開。

image.png

1. 線程池框架設(shè)計概覽

從源碼層面看,理解Java中的線程池,要從下面這四兄弟的概念和關(guān)系入手,這四個概念務必了然于心。

image.png
  • Executor:作為線程池的最頂層接口,Executor的接口在設(shè)計上,實現(xiàn)了任務提交任務執(zhí)行之間的解耦,這是它存在的意義。在Executor中,只定義了一個方法void execute(Runnable command),用于執(zhí)行提交的可運行的任務。注意,你看它這個方法的參數(shù)干脆就叫command,也就是“命令”,意在表明所提交的不是一個靜止的對象,而是可運行的命令。并且,這個命令將在未來的某一時刻執(zhí)行,具體由哪個線程來執(zhí)行也是不確定的;
  • ExecutorService:繼承了Executor的接口,并在此基礎(chǔ)上提供可以管理服務執(zhí)行結(jié)果(Futrue) 的能力。ExecutorService所提供的submit方法可以返回任務的執(zhí)行結(jié)果,而shutdown方法則可以用于關(guān)閉服務。相比起來,Executor只具備單一的執(zhí)行能力,而ExecutorService則不僅具有執(zhí)行能力,還提供了簡單的服務管理能力
  • AbstractExecutorService:作為ExecutorService的簡單實現(xiàn),該類通過RunnableFuture和newTaskFor實現(xiàn)了submit、invokeAny和invokeAll等方法;
  • ThreadPoolExecutor:該類是線程池的最終實現(xiàn)類,實現(xiàn)了Executor和ExecutorService中定義的能力,并豐富了AbstractExecutorService中的實現(xiàn)。在ThreadPoolExecutor中,定義了任務管理策略和線程池管理能力,相關(guān)能力的實現(xiàn)細節(jié)將是我們下文所要講解的核心所在。

如果你覺得還是不太能直觀地感受四兄弟的差異,那么你可以放大查看下面這幅高清圖示。看的時候,要格外注意它們各自方法的不同,方法的不同意味著它們的能力不同。

image.png

而對于線程池總體的執(zhí)行過程,下面這幅圖也建議你收藏。這幅圖雖然簡明,但完整展示了從任務提交到任務執(zhí)行的整個過程。這個執(zhí)行過程往往也是面試中的高頻面試題,務必掌握。

image.png

(1)線程池的核心屬性

線程池中的一些核心屬性選取如下,對于其中個別屬性會做特別說明。

// 線程池控制相關(guān)的主要變量// 這個變量很神奇,下文后專門陳述,請?zhí)貏e留意private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 待處理的任務隊列private final BlockingQueue < Runnable > workQueue;// 工作線程集合private final HashSet < Worker > workers = new HashSet < Worker > ();// 創(chuàng)建線程所用到的線程工廠private volatile ThreadFactory threadFactory;// 拒絕策略private volatile RejectedExecutionHandler handler;// 核心線程數(shù)private volatile int corePoolSize;// 最大線程數(shù)private volatile int maximumPoolSize;// 空閑線程的保活時長private volatile long keepAliveTime;// 線程池變更的主要控制鎖,在工作線程數(shù)、變更線程池狀態(tài)等場景下都會用到private final ReentrantLock mainLock = new ReentrantLock();

關(guān)于ctl字段的特別說明

在ThreadPoolExecutor的多個核心字段中,其他字段可能都比較好理解,但是ctl要單獨拎出來做些解釋。

顧名思義,ctl這個字段用于對線程池的控制。它的設(shè)計比較有趣,用一個字段卻表示了兩層含義,也就是這個字段實際是兩個字段的合體:

  • runState:線程池的運行狀態(tài)(高3位);
  • workerCount:工作線程數(shù)量(第29位)。

這兩個字段的值相互獨立,互不影響。那為何要用這種設(shè)計呢?這是因為,在線程池中這兩個字段幾乎總是如影相隨,如果不用一個字段來表示的話,那么就需要通過鎖的機制來控制兩個字段的一致性。不得不說,這個字段設(shè)計上還是比較巧妙的。

在線程池中,也提供了一些方法可以方便地獲取線程池的狀態(tài)和工作線程數(shù)量,它們都是通過對ctl進行位運算得來。

/**    計算當前線程池的狀態(tài)*/private static int runStateOf(int c) {    return c & ~CAPACITY;}/**    計算當前工作線程數(shù)*/private static int workerCountOf(int c) {    return c & CAPACITY;}/**    初始化ctl變量*/private static int ctlOf(int rs, int wc) {    return rs | wc;}

關(guān)于位運算,這里補充一點說明,如果你對位運算有點迷糊的話可以看看,如果你對它比較熟悉則可以直接跳過。

假設(shè)A=15,二進制是1111;B=6,二進制是110.

運算符名稱描述示例&按位與如果相對應位都是1,則結(jié)果為1,否則為0(A&B),得到6,即110~按位非按位取反運算符翻轉(zhuǎn)操作數(shù)的每一位,即0變成1,1變成0。(?A)得到-16,即
11111111111111111111111111110000|按位或如果相對應位都是 0,則結(jié)果為 0,否則為 1(A | B)得到15,即 1111

(2)線程池的核心構(gòu)造器

ThreadPoolExecutor有四個構(gòu)造器,其中一個是核心構(gòu)造器。你可以根據(jù)需要,按需使用這些構(gòu)造器。

  • 核心構(gòu)造器之一:相對較為常用的一個構(gòu)造器,你可以指定核心線程數(shù)、最大線程數(shù)、線程保活時間和任務隊列類型。
public ThreadPoolExecutor(int corePoolSize,    int maximumPoolSize,    long keepAliveTime,    TimeUnit unit,    BlockingQueue < Runnable > workQueue) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,        Executors.defaultThreadFactory(), defaultHandler);}
  • 核心構(gòu)造器之二:相比于第一個構(gòu)造器,你可以在這個構(gòu)造器中指定ThreadFactory. 通過ThreadFactory,你可以指定線程名稱、分組等個性化信息。
  public ThreadPoolExecutor(int corePoolSize,    int maximumPoolSize,    long keepAliveTime,    TimeUnit unit,    BlockingQueue < Runnable > workQueue,    ThreadFactory threadFactory) {    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,        threadFactory, defaultHandler);}
  • 核心構(gòu)造器之三:這個構(gòu)造器的要點在于,你可以指定拒絕策略。關(guān)于任務隊列的拒絕策略,下文有詳細介紹。
public ThreadPoolExecutor(int corePoolSize,      int maximumPoolSize,      long keepAliveTime,      TimeUnit unit,      BlockingQueue < Runnable > workQueue,      RejectedExecutionHandler handler) {      this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,          Executors.defaultThreadFactory(), handler);}
  • 核心構(gòu)造器之四:這個構(gòu)造器是ThreadPoolExecutor的核心構(gòu)造器,提供了較為全面的參數(shù)設(shè)置,上述的三個構(gòu)造器都是基于它實現(xiàn)。
public ThreadPoolExecutor(int corePoolSize,      int maximumPoolSize,      long keepAliveTime,      TimeUnit unit,      BlockingQueue < Runnable > workQueue,      ThreadFactory threadFactory,      RejectedExecutionHandler handler) {      if (corePoolSize < 0 ||          maximumPoolSize <= 0 ||          maximumPoolSize < corePoolSize ||          keepAliveTime < 0)          throw new IllegalArgumentException();      if (workQueue == null || threadFactory == null || handler == null)          throw new NullPointerException();      this.acc = System.getSecurityManager() == null ?          null :          AccessController.getContext();      this.corePoolSize = corePoolSize;      this.maximumPoolSize = maximumPoolSize;      this.workQueue = workQueue;      this.keepAliveTime = unit.toNanos(keepAliveTime);      this.threadFactory = threadFactory;      this.handler = handler;}

(3)線程池中的核心方法

/*** 提交Runnable類型的任務并執(zhí)行,但不返回結(jié)果*/public void execute(Runnable command){...}/*** 提交Runnable類型的任務,并返回結(jié)果*/public Future<?> submit(Runnable task){...}/*** 提交Runnable類型的任務,并返回結(jié)果,支持指定默認結(jié)果*/public <T> Future<T> submit(Runnable task, T result){...}/*** 提交Callable類型的任務并執(zhí)行*/public <T> Future<T> submit(Callable<T> task) {...}/*** 關(guān)閉線程池,繼續(xù)執(zhí)行隊列中未完成的任務,但不會接收新的任務*/public void shutdown() {...}/*** 立即關(guān)閉線程池,同時放棄未執(zhí)行的任務,并不再接收新的任務*/public List<Runnable> shutdownNow(){...}

(4)線程池的狀態(tài)與生命周期管理

前文說過,線程池恰似一個生產(chǎn)車間,而從生產(chǎn)車間的角度看,生產(chǎn)車間有運行、停產(chǎn)等不同狀態(tài),所以線程池也是有一定的狀態(tài)和使用周期的。

image.png
  • Running:運行中,該狀態(tài)下可以繼續(xù)向線程池中增加任務,并正常處理隊列中的任務;
  • Shutdown:關(guān)閉中,該狀態(tài)下線程池不會立即停止,但不能繼續(xù)向線程池中增加任務,直到任務執(zhí)行結(jié)束;
  • Stop:停止,該狀態(tài)下將不再接收新的任務,同時不再處理隊列中的任務,并中斷工作中的線程
  • Tidying:相對短暫的中間狀態(tài),所有任務都已經(jīng)結(jié)束,并且所有的工作線程都不再存在(workerCount==0),并運行terminated()鉤子方法;
  • Terminated:terminated()運行結(jié)束。

2. 如何向線程池中提交任務

向線程池提交任務有兩種比較常見的方式,一種是需要返回執(zhí)行結(jié)果的一種則是不需要返回結(jié)果的。

(1)不關(guān)注任務執(zhí)行結(jié)果:execute

通過execute()提交任務到線程池后,任務將在未來某個時刻執(zhí)行,執(zhí)行的任務的線程可能是當前線程池中的線程,也可能是新創(chuàng)建的線程。當然,如果此時線程池應關(guān)閉,或者任務隊列已滿,那么該任務將交由RejectedExecutionHandler處理。

(2)關(guān)注任務執(zhí)行結(jié)果:submit

通過submit()提交任務到線程池后,運行機制和execute類似,其核心不同在于,由submit()提交任務時將等待任務執(zhí)行結(jié)束并返回結(jié)果。

3. 如何管理提交的任務

(1)任務隊列選型策略

  • SynchronousQueue:無縫傳遞(Direct handoffs)。當新的任務到達時,將直接交由線程處理,而不是放入緩存隊列。因此,如果任務達到時卻沒有可用線程,那么將會創(chuàng)建新的線程。所以,為了避免任務丟失,在使用SynchronousQueue時,將會需要創(chuàng)建無數(shù)的線程,在使用時需要謹慎評估。
  • LinkedBlockingQueue:無界隊列,新提交的任務都會緩存到該隊列中。使用無界隊列時,只有corePoolSize中的線程來處理隊列中的任務,這時候和maximumPoolSize是沒有關(guān)系的,它不會創(chuàng)建新的線程。當然,你需要注意的是,如果任務的處理速度遠低于任務的產(chǎn)生速度,那么LinkedBlockingQueue的無限增長可能會導致內(nèi)存容量等問題。
  • ArrayBlockingQueue:有界隊列,可能會觸發(fā)創(chuàng)建新的工作線程,maximumPoolSize參數(shù)設(shè)置在有界隊列中將發(fā)揮作用。在使用有界隊列時,要特別注意任務隊列大小和工作線程數(shù)量之間的權(quán)衡。如果任務隊列大但是線程數(shù)量少,那么結(jié)果會是系統(tǒng)資源(主要是CPU)占用率較低,但同時系統(tǒng)的吞吐量也會降低。反之,如果縮小任務隊列并擴大工作線程數(shù)量,那么結(jié)果則是系統(tǒng)吞吐量增大,但同時系統(tǒng)資源占用也會增加。所以,使用有界隊列時,要考慮到平衡的藝術(shù),并配置相應的拒絕策略。

(2)如何選擇合適的拒絕策略

在使用線程池時,拒絕策略是必須要確認的地方,因為它可能會造成任務丟失。

線程池已經(jīng)關(guān)閉任務隊列已滿且無法再創(chuàng)建新的工作線程時,那么新提交的任務將會被拒絕,拒絕時將調(diào)用RejectedExecutionHandler中的rejectedExecution(Runnable r, ThreadPoolExecutor executor)來執(zhí)行具體的拒絕動作。

final void reject(Runnable command) {    handler.rejectedExecution(command, this);}

以execute方法為例,當線程池狀態(tài)異?;驘o法新增工作線程時,將會執(zhí)行任務拒絕策略。

public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();               int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);}

ThreadPoolExecutor的默認拒絕策略是AbortPolicy,這一點在屬性定義中已經(jīng)確定。在大部分場景中,直接拒絕任務都是不合適的。

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
  • AbortPolicy:默認策略,直接拋出RejectedExecutionException異常;
  • CallerRunsPolicy:交由當前線程自己來執(zhí)行。這種策略這提供了一個簡單的反饋控制機制,可以減慢提交新任務的速度;
  • DiscardPolicy:直接丟棄任務,不會拋出異常;
  • DiscardOldestPolicy:如果此時線程池沒有關(guān)閉,將從隊列的頭部取出第一個任務并丟棄,并再次嘗試執(zhí)行。如果執(zhí)行失敗,那么將重復這個過程。

如果上述四種策略均不滿足,你也可以通過RejectedExecutionHandler接口定制個性化的拒絕策略。事實上,為了兼顧任務不丟失和系統(tǒng)負載,建議你自己實現(xiàn)拒絕策略

(3)隊列維護

對于任務隊列的維護,線程池也提供了一些方法。

  • 獲取當前任務隊列
public BlockingQueue<Runnable> getQueue() {    return workQueue;}
  • 從隊列中移除任務
public boolean remove(Runnable task) {    boolean removed = workQueue.remove(task);    tryTerminate(); // In case SHUTDOWN and now empty    return removed;}

4. 如何管理執(zhí)行任務的工作線程

(1)核心工作線程

核心線程(corePoolSize)是指最小數(shù)量的工作線程,此類線程不允許超時回收。當然,如果你設(shè)置了allowCoreThreadTimeOut,那么核心線程也是會超時的,這可能會導致核心線程數(shù)為零。核心線程的數(shù)量可以通過線程池的構(gòu)造參數(shù)指定。

(2)最大工作線程

最大工作線程指的是線程池為了處理現(xiàn)有任務,所能創(chuàng)建的最大工作線程數(shù)量。

最大工作線程可以通過構(gòu)造函數(shù)的maximumPoolSize變量設(shè)定。當然,如果你所使用的任務隊列是無界隊列,那么這個參數(shù)將形同虛設(shè)。

(3)如何創(chuàng)建新的工作線程

在線程池中,新線程的創(chuàng)建是通過ThreadFactory完成。你可以通過線程池的構(gòu)造函數(shù)指定特定的ThreadFactory,如未指定將使用默認的Executors.defaultThreadFactory(),該工廠所創(chuàng)建的線程具有相同的ThreadGroup和優(yōu)先級(NORM_PRIORITY),并且都不是守護( Non-Daemon)線程。

通過設(shè)定ThreadFactory,你可以自定義線程的名字、線程組以及守護狀態(tài)等。

在Java的線程池ThreadPoolExecutor中,addWorker方法負責新線程的具體創(chuàng)建工作。

  private boolean addWorker(Runnable firstTask, boolean core) {...}

(4)保活時間

?;顣r間指的是非核心線程在空閑時所能存活的時間。

如果線程池中的線程數(shù)量超過了corePoolSize中的設(shè)定,那么空閑線程的空閑時間在超過keepAliveTime中設(shè)定的時間后,線程將被回收終止。在線程被回收后,如果需要新的線程時,將繼續(xù)創(chuàng)建新的線程。

需要注意的是,keepAliveTime僅對非核心線程有效,如果需要設(shè)置核心線程的?;顣r間,需要使用allowCoreThreadTimeOut參數(shù)。

(5)鉤子方法

  • 設(shè)定任務執(zhí)行前動作:beforeExecute

如果你希望提交的任務在執(zhí)行前執(zhí)行特定的動作,比如寫入日志或設(shè)定ThreadLocal等。那么,你可以通過重寫beforeExecute來實現(xiàn)這一目的。

protected void beforeExecute(Thread t, Runnable r) { }
  • 設(shè)定任務執(zhí)行后動作:beforeExecute 如果你希望提交的任務在執(zhí)行后執(zhí)行特定的動作,比如寫入日志或捕獲異常等。那么,你可以通過重寫afterExecute來實現(xiàn)這一目的。
protected void afterExecute(Runnable r, Throwable t) { }
  • 設(shè)定線程池終止動作:terminated
protected void terminated() { }

(6)線程池的預熱

默認情況下,在設(shè)置核心線程數(shù)之后,也不會立即創(chuàng)建相關(guān)線程,而是任務到達后再創(chuàng)建。

如果你需要預先就啟動核心線程,那么你可以通過調(diào)用prestartCoreThread或prestartAllCoreThreads來提前啟動,以達到線程池預熱目的,并且可以通過ensurePrestart方法來驗證效果。

(7)線程回收機制

當線程池中的工作線程數(shù)量大于corePoolSize設(shè)置的數(shù)量時,并且存在空閑線程,并且這個空閑線程的空閑時長超過了keepAliveTime所設(shè)置的時長,那么這樣的空閑線程將會被回收,以降低不必要的資源浪費。

final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {           ...        } finally {            processWorkerExit(w, completedAbruptly); // 主動回收自己        }    }

(8)線程數(shù)調(diào)整策略

線程池的工作線程的設(shè)置是否合理,關(guān)系到系統(tǒng)負載和任務處理速度之間的平衡。這里要明確的是,如何設(shè)置核心線程并沒有放之四海而皆準的公式。每個業(yè)務場景都有著它獨特的地方,CPU密集型和IO密集型任務存在較大差異。因此,在使用線程池的時候,要具體問題具體分析,但是你可以運行結(jié)果持續(xù)調(diào)整來優(yōu)化線程池。

5. 線程池使用示例

我們?nèi)砸允止ぶ谱骶€程池部分的場景為例,通過ThreadPoolExecutor實現(xiàn)來展示線程池的使用示例。從代碼中看,ThreadPoolExecutor的使用和王者線程池TheKingThreadPool的用法基本一致。

public static void main(String[] args) {    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 20, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue < > (10));    String[] wildMonsters = {"棕熊", "野雞", "灰狼", "野兔", "狐貍", "小鹿", "小花豹", "野豬"};    for (String wildMonsterName: wildMonsters) {        threadPoolExecutor.execute(new RunnableTask() {            public String getTaskDesc() {                return wildMonsterName;            }            public void run() {                System.out.println(Thread.currentThread().getName() + ":" + wildMonsterName + "已經(jīng)烤好");            }        });    }    threadPoolExecutor.shutdown();}

6. Executors類

Executors是JUC中一個針對ThreadPoolExecutor和ThreadFactory等設(shè)計的一個工具類。通過Executors,可以方便地創(chuàng)建不同類型的線程池。當然,其內(nèi)部主要是通過給ThreadPoolExecutor的構(gòu)造傳遞特定的參數(shù)實現(xiàn),并無玄機可言。常用的幾個工具如下所示:

  • 創(chuàng)建固定線程數(shù)的線程池
public static ExecutorService newFixedThreadPool(int nThreads) {        return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue<Runnable>());    }
  • 創(chuàng)建只有1個線程的線程池
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {        return new FinalizableDelegatedExecutorService            (new ThreadPoolExecutor(1, 1,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue<Runnable>(),                                    threadFactory));    }
  • 創(chuàng)建緩存線程池:這種線程池不設(shè)定核心線程數(shù),根據(jù)任務的數(shù)據(jù)動態(tài)創(chuàng)建線程。當任務執(zhí)行結(jié)束后,線程會被逐步回收,也就是所有的線程都是臨時的。
public static ExecutorService newCachedThreadPool() {        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                      60L, TimeUnit.SECONDS,                                      new SynchronousQueue<Runnable>());    }

7. 線程池監(jiān)控

作為一個運行框架,ThreadPoolExecutor既簡單也復雜。因此,對其內(nèi)部的監(jiān)控和管理是十分必要的。ThreadPoolExecutor也提供了一些方法,通過這些方法,我們可以獲取到線程池的一些重要狀態(tài)和數(shù)據(jù)。

  • 獲取線程池大小
 public int getPoolSize() {     final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {         // Remove rare and surprising possibility of         // isTerminated() && getPoolSize() > 0         return runStateAtLeast(ctl.get(), TIDYING) ? 0 :             workers.size();     } finally {         mainLock.unlock();     } }
  • 獲取活躍工作線程數(shù)量
 public int getActiveCount() {     final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {         int n = 0;         for (Worker w: workers)             if (w.isLocked())                 ++n;         return n;     } finally {         mainLock.unlock();     } }
  • 獲取最大線程池
 public int getLargestPoolSize() {     final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {         return largestPoolSize;     } finally {         mainLock.unlock();     } }
  • 獲取線程池中的任務總數(shù)
 public long getTaskCount() {     final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {         long n = completedTaskCount;         for (Worker w: workers) {             n += w.completedTasks;             if (w.isLocked())                 ++n;         }         return n + workQueue.size();     } finally {         mainLock.unlock();     } }
  • 獲取線程池中已完成的任務總數(shù)
public long getCompletedTaskCount() {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        long n = completedTaskCount;        for (Worker w: workers)            n += w.completedTasks;        return n;    } finally {        mainLock.unlock();    }}

四、如何養(yǎng)成正確使用線程池的良好習慣

1. 線程池的使用風險提示

雖然線程池的使用有諸多的好處,然而天下沒有免費的午餐,線程池在給我們帶來便利的同時,也有一些避免踩坑的注意事項:

  • 線程池設(shè)置過大或過小都不合適。如果線程池的線程數(shù)量過多,雖然局部處理速度增加,但將會影響應用系統(tǒng)的整體性能。而如果線程池的線程數(shù)量過少,線程池可能無法帶來預期的性能的提升;
  • 和其他多線程類似,線程池中也可能會發(fā)生死鎖。比如,某個任務等待另外一個任務結(jié)束,但卻沒有線程來執(zhí)行等待的那個任務,這也是為什么要避免任務間存在依賴;
  • 添加任務到隊列時耗時過長。如果任務隊列已滿,外部線程向隊列添加任務將會受阻。所以,為了避免外部線程阻塞時間過長,你可以設(shè)定最大等待時間;

為了降低這些風險的發(fā)生,你在設(shè)置線程池的類型和參數(shù)時,應當格外小心。在正式上線前,最好能做一次壓力測試。

2. 創(chuàng)建線程池的推薦姿勢

雖然通過Executors創(chuàng)建線程比較方便,但是Executors的封裝屏蔽了一些重要的參數(shù)細節(jié),而這些參數(shù)對于線程池至關(guān)重要,所以為了避免因?qū)xecutors不了解而錯誤地使用線程池,建議還是通過ThreadPoolExecutor的構(gòu)造參數(shù)直接創(chuàng)建

3. 盡量避免使用無界隊列

如果再認真點說的話,你應該在任何時候都避免使用無界隊列來管理任務。注意,Executors的newFixedThreadPool所使用的是LinkedBlockingQueue,上文有它的源碼。

小結(jié)

以上就是關(guān)于Java線程池的全部內(nèi)容。在這篇文章中,我們講解了線程池的應用場景、核心組成及原理,并手工制作了一個線程池,而且在此基礎(chǔ)上深入講解了Java中的線程池ThreadPoolExecutor的實現(xiàn)。雖然文章整體篇幅較大,但是由于線程池涉及的內(nèi)容十分廣泛,難以在一篇文章中全部提及,仍有部分重要內(nèi)容未能覆蓋,比如如何處理線程池中的異常、如何優(yōu)雅關(guān)閉線程池等。

熟練掌握線程池并不是一件容易的事,建議按照本文開篇的建議,先理解其要解決的問題,再理解其核心組成原理,最后再深入到Java中的源碼中。如此一來,帶著已知的概念去看源碼,會更容易理解源碼的設(shè)計之道。

轉(zhuǎn)載于:
https://www.cnblogs.com/time-as-a-friend/p/15060244.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容