這可能是史上最全、最強(qiáng)的Java線(xiàn)程池學(xué)習(xí)總結(jié)

一、使用線(xiàn)程池的好處

池化技術(shù)相比大家已經(jīng)屢見(jiàn)不鮮了,線(xiàn)程池、數(shù)據(jù)庫(kù)連接池、Http 連接池等等都是對(duì)這個(gè)思想的應(yīng)用。池化技術(shù)的思想主要是為了減少每次獲取資源的消耗,提高對(duì)資源的利用率。

線(xiàn)程池提供了一種限制和管理資源(包括執(zhí)行一個(gè)任務(wù))。 每個(gè)線(xiàn)程池還維護(hù)一些基本統(tǒng)計(jì)信息,例如已完成任務(wù)的數(shù)量。

這里借用《Java 并發(fā)編程的藝術(shù)》提到的來(lái)說(shuō)一下使用線(xiàn)程池的好處

  • 降低資源消耗。通過(guò)重復(fù)利用已創(chuàng)建的線(xiàn)程降低線(xiàn)程創(chuàng)建和銷(xiāo)毀造成的消耗。
  • 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要的等到線(xiàn)程創(chuàng)建就能立即執(zhí)行。
  • 提高線(xiàn)程的可管理性。線(xiàn)程是稀缺資源,如果無(wú)限制的創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線(xiàn)程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。

二、Executor 框架

2.1 簡(jiǎn)介

Executor 框架是 Java5 之后引進(jìn)的,在 Java 5 之后,通過(guò) Executor 來(lái)啟動(dòng)線(xiàn)程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用線(xiàn)程池實(shí)現(xiàn),節(jié)約開(kāi)銷(xiāo))外,還有關(guān)鍵的一點(diǎn):有助于避免 this 逃逸問(wèn)題。

補(bǔ)充:this 逃逸是指在構(gòu)造函數(shù)返回之前其他線(xiàn)程就持有該對(duì)象的引用. 調(diào)用尚未構(gòu)造完全的對(duì)象的方法可能引發(fā)令人疑惑的錯(cuò)誤。

Executor 框架不僅包括了線(xiàn)程池的管理,還提供了線(xiàn)程工廠(chǎng)、隊(duì)列以及拒絕策略等,Executor 框架讓并發(fā)編程變得更加簡(jiǎn)單。

2.2 Executor 框架結(jié)構(gòu)(主要由三大部分組成)

①. 任務(wù)(Runnable /Callable)

執(zhí)行任務(wù)需要實(shí)現(xiàn)的 Runnable 接口Callable接口。Runnable 接口Callable 接口 實(shí)現(xiàn)類(lèi)都可以被 ThreadPoolExecutorScheduledThreadPoolExecutor 執(zhí)行。

②. 任務(wù)的執(zhí)行(Executor)

如下圖所示,包括任務(wù)執(zhí)行機(jī)制的核心接口 Executor ,以及繼承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutorScheduledThreadPoolExecutor 這兩個(gè)關(guān)鍵類(lèi)實(shí)現(xiàn)了 ExecutorService 接口

這里提了很多底層的類(lèi)關(guān)系,但是,實(shí)際上我們需要更多關(guān)注的是 ThreadPoolExecutor 這個(gè)類(lèi),這個(gè)類(lèi)在我們實(shí)際使用線(xiàn)程池的過(guò)程中,使用頻率還是非常高的。

注意: 通過(guò)查看 ScheduledThreadPoolExecutor 源代碼我們發(fā)現(xiàn) ScheduledThreadPoolExecutor 實(shí)際上是繼承了 ThreadPoolExecutor 并實(shí)現(xiàn)了 ScheduledExecutorService ,而 ScheduledExecutorService 又實(shí)現(xiàn)了 ExecutorService,正如我們下面給出的類(lèi)關(guān)系圖顯示的一樣。

ThreadPoolExecutor 類(lèi)描述:

//AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口
public class ThreadPoolExecutor extends AbstractExecutorService

ScheduledThreadPoolExecutor 類(lèi)描述:

//ScheduledExecutorService實(shí)現(xiàn)了ExecutorService接口
public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService

③. 異步計(jì)算的結(jié)果(Future)

Future 接口以及 Future 接口的實(shí)現(xiàn)類(lèi) FutureTask 類(lèi)都可以代表異步計(jì)算的結(jié)果。

當(dāng)我們把 Runnable接口Callable 接口 的實(shí)現(xiàn)類(lèi)提交給 ThreadPoolExecutorScheduledThreadPoolExecutor 執(zhí)行。(調(diào)用 submit() 方法時(shí)會(huì)返回一個(gè) FutureTask 對(duì)象)

2.3 Executor 框架的使用

  1. 主線(xiàn)程首先要?jiǎng)?chuàng)建實(shí)現(xiàn) Runnable 或者 Callable 接口的任務(wù)對(duì)象。
  2. 把創(chuàng)建完成的實(shí)現(xiàn) Runnable/Callable接口的 對(duì)象直接交給 ExecutorService 執(zhí)行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 對(duì)象或Callable 對(duì)象提交給 ExecutorService 執(zhí)行(ExecutorService.submit(Runnable task)ExecutorService.submit(Callable <T> task))。
  3. 如果執(zhí)行 ExecutorService.submit(…)ExecutorService 將返回一個(gè)實(shí)現(xiàn)Future接口的對(duì)象(我們剛剛也提到過(guò)了執(zhí)行 execute()方法和 submit()方法的區(qū)別,submit()會(huì)返回一個(gè) FutureTask 對(duì)象)。由于 FutureTask 實(shí)現(xiàn)了 Runnable,我們也可以創(chuàng)建 FutureTask,然后直接交給 ExecutorService 執(zhí)行。
  4. 最后,主線(xiàn)程可以執(zhí)行 FutureTask.get()方法來(lái)等待任務(wù)執(zhí)行完成。主線(xiàn)程也可以執(zhí)行 FutureTask.cancel(boolean mayInterruptIfRunning)來(lái)取消此任務(wù)的執(zhí)行。

三、(重要)ThreadPoolExecutor 類(lèi)簡(jiǎn)單介紹

線(xiàn)程池實(shí)現(xiàn)類(lèi) ThreadPoolExecutorExecutor 框架最核心的類(lèi)。

3.1 ThreadPoolExecutor 類(lèi)分析

ThreadPoolExecutor 類(lèi)中提供的四個(gè)構(gòu)造方法。我們來(lái)看最長(zhǎng)的那個(gè),其余三個(gè)都是在這個(gè)構(gòu)造方法的基礎(chǔ)上產(chǎn)生(其他幾個(gè)構(gòu)造方法說(shuō)白點(diǎn)都是給定某些默認(rèn)參數(shù)的構(gòu)造方法比如默認(rèn)制定拒絕策略是什么),這里就不貼代碼講了,比較簡(jiǎn)單。

    /**
     * 用給定的初始參數(shù)創(chuàng)建一個(gè)新的ThreadPoolExecutor。
     */
    public ThreadPoolExecutor(int corePoolSize,//線(xiàn)程池的核心線(xiàn)程數(shù)量
                              int maximumPoolSize,//線(xiàn)程池的最大線(xiàn)程數(shù)
                              long keepAliveTime,//當(dāng)線(xiàn)程數(shù)大于核心線(xiàn)程數(shù)時(shí),多余的空閑線(xiàn)程存活的最長(zhǎng)時(shí)間
                              TimeUnit unit,//時(shí)間單位
                              BlockingQueue<Runnable> workQueue,//任務(wù)隊(duì)列,用來(lái)儲(chǔ)存等待執(zhí)行任務(wù)的隊(duì)列
                              ThreadFactory threadFactory,//線(xiàn)程工廠(chǎng),用來(lái)創(chuàng)建線(xiàn)程,一般默認(rèn)即可
                              RejectedExecutionHandler handler//拒絕策略,當(dāng)提交的任務(wù)過(guò)多而不能及時(shí)處理時(shí),我們可以定制策略來(lái)處理任務(wù)
                               ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

下面這些對(duì)創(chuàng)建 非常重要,在后面使用線(xiàn)程池的過(guò)程中你一定會(huì)用到!所以,務(wù)必拿著小本本記清楚。

ThreadPoolExecutor 3 個(gè)最重要的參數(shù):

  • corePoolSize : 核心線(xiàn)程數(shù)線(xiàn)程數(shù)定義了最小可以同時(shí)運(yùn)行的線(xiàn)程數(shù)量。
  • maximumPoolSize : 當(dāng)隊(duì)列中存放的任務(wù)達(dá)到隊(duì)列容量的時(shí)候,當(dāng)前可以同時(shí)運(yùn)行的線(xiàn)程數(shù)量變?yōu)樽畲缶€(xiàn)程數(shù)。
  • workQueue: 當(dāng)新任務(wù)來(lái)的時(shí)候會(huì)先判斷當(dāng)前運(yùn)行的線(xiàn)程數(shù)量是否達(dá)到核心線(xiàn)程數(shù),如果達(dá)到的話(huà),信任就會(huì)被存放在隊(duì)列中。

ThreadPoolExecutor其他常見(jiàn)參數(shù):

  1. keepAliveTime:當(dāng)線(xiàn)程池中的線(xiàn)程數(shù)量大于 corePoolSize 的時(shí)候,如果這時(shí)沒(méi)有新的任務(wù)提交,核心線(xiàn)程外的線(xiàn)程不會(huì)立即銷(xiāo)毀,而是會(huì)等待,直到等待的時(shí)間超過(guò)了 keepAliveTime才會(huì)被回收銷(xiāo)毀;
  2. unit : keepAliveTime 參數(shù)的時(shí)間單位。
  3. threadFactory :executor 創(chuàng)建新線(xiàn)程的時(shí)候會(huì)用到。
  4. handler :飽和策略。關(guān)于飽和策略下面單獨(dú)介紹一下。

ThreadPoolExecutor 飽和策略定義:

如果當(dāng)前同時(shí)運(yùn)行的線(xiàn)程數(shù)量達(dá)到最大線(xiàn)程數(shù)量并且隊(duì)列也已經(jīng)被放滿(mǎn)了任時(shí),ThreadPoolTaskExecutor 定義一些策略:

  • ThreadPoolExecutor.AbortPolicy:拋出 RejectedExecutionException來(lái)拒絕新任務(wù)的處理。
  • ThreadPoolExecutor.CallerRunsPolicy:調(diào)用執(zhí)行自己的線(xiàn)程運(yùn)行任務(wù),也就是直接在調(diào)用execute方法的線(xiàn)程中運(yùn)行(run)被拒絕的任務(wù),如果執(zhí)行程序已關(guān)閉,則會(huì)丟棄該任務(wù)。因此這種策略會(huì)降低對(duì)于新任務(wù)提交速度,影響程序的整體性能。另外,這個(gè)策略喜歡增加隊(duì)列容量。如果您的應(yīng)用程序可以承受此延遲并且你不能任務(wù)丟棄任何一個(gè)任務(wù)請(qǐng)求的話(huà),你可以選擇這個(gè)策略。
  • ThreadPoolExecutor.DiscardPolicy 不處理新任務(wù),直接丟棄掉。
  • ThreadPoolExecutor.DiscardOldestPolicy 此策略將丟棄最早的未處理的任務(wù)請(qǐng)求。

舉個(gè)例子:

Spring 通過(guò) ThreadPoolTaskExecutor 或者我們直接通過(guò) ThreadPoolExecutor 的構(gòu)造函數(shù)創(chuàng)建線(xiàn)程池的時(shí)候,當(dāng)我們不指定 RejectedExecutionHandler 飽和策略的話(huà)來(lái)配置線(xiàn)程池的時(shí)候默認(rèn)使用的是 ThreadPoolExecutor.AbortPolicy。在默認(rèn)情況下,ThreadPoolExecutor 將拋出 RejectedExecutionException 來(lái)拒絕新來(lái)的任務(wù) ,這代表你將丟失對(duì)這個(gè)任務(wù)的處理。 對(duì)于可伸縮的應(yīng)用程序,建議使用 ThreadPoolExecutor.CallerRunsPolicy。當(dāng)最大池被填滿(mǎn)時(shí),此策略為我們提供可伸縮隊(duì)列。(這個(gè)直接查看 ThreadPoolExecutor 的構(gòu)造函數(shù)源碼就可以看出,比較簡(jiǎn)單的原因,這里就不貼代碼了。)

3.2 推薦使用 ThreadPoolExecutor 構(gòu)造函數(shù)創(chuàng)建線(xiàn)程池

在《阿里巴巴 Java 開(kāi)發(fā)手冊(cè)》“并發(fā)處理”這一章節(jié),明確指出線(xiàn)程資源必須通過(guò)線(xiàn)程池提供,不允許在應(yīng)用中自行顯示創(chuàng)建線(xiàn)程。

為什么呢?

使用線(xiàn)程池的好處是減少在創(chuàng)建和銷(xiāo)毀線(xiàn)程上所消耗的時(shí)間以及系統(tǒng)資源開(kāi)銷(xiāo),解決資源不足的問(wèn)題。如果不使用線(xiàn)程池,有可能會(huì)造成系統(tǒng)創(chuàng)建大量同類(lèi)線(xiàn)程而導(dǎo)致消耗完內(nèi)存或者“過(guò)度切換”的問(wèn)題。

另外《阿里巴巴 Java 開(kāi)發(fā)手冊(cè)》中強(qiáng)制線(xiàn)程池不允許使用 Executors 去創(chuàng)建,而是通過(guò) ThreadPoolExecutor 構(gòu)造函數(shù)的方式,這樣的處理方式讓寫(xiě)的同學(xué)更加明確線(xiàn)程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)

Executors 返回線(xiàn)程池對(duì)象的弊端如下:

  • FixedThreadPoolSingleThreadExecutor : 允許請(qǐng)求的隊(duì)列長(zhǎng)度為 Integer.MAX_VALUE,可能堆積大量的請(qǐng)求,從而導(dǎo)致 OOM。
  • CachedThreadPool 和 ScheduledThreadPool : 允許創(chuàng)建的線(xiàn)程數(shù)量為 Integer.MAX_VALUE ,可能會(huì)創(chuàng)建大量線(xiàn)程,從而導(dǎo)致 OOM。

方式一:通過(guò)ThreadPoolExecutor構(gòu)造函數(shù)實(shí)現(xiàn)(推薦)

方式二:通過(guò) Executor 框架的工具類(lèi) Executors 來(lái)實(shí)現(xiàn) 我們可以創(chuàng)建三種類(lèi)型的 ThreadPoolExecutor:

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool

四、(重要)ThreadPoolExecutor 使用示例

我們上面講解了 Executor框架以及 ThreadPoolExecutor 類(lèi),下面讓我們實(shí)戰(zhàn)一下,來(lái)通過(guò)寫(xiě)一個(gè) ThreadPoolExecutor 的小 Demo 來(lái)回顧上面的內(nèi)容。

4.1 示例代碼:Runnable+ThreadPoolExecutor

首先創(chuàng)建一個(gè) Runnable 接口的實(shí)現(xiàn)類(lèi)(當(dāng)然也可以是 Callable 接口,我們上面也說(shuō)了兩者的區(qū)別。)

MyRunnable.java

import java.util.Date;

/**
 * 這是一個(gè)簡(jiǎn)單的Runnable類(lèi),需要大約5秒鐘來(lái)執(zhí)行其任務(wù)。
 * @author shuang.kou
 */
public class MyRunnable implements Runnable {

    private String command;

    public MyRunnable(String s) {
        this.command = s;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return this.command;
    }
}

編寫(xiě)測(cè)試程序,我們這里以阿里巴巴推薦的使用 ThreadPoolExecutor 構(gòu)造函數(shù)自定義參數(shù)的方式來(lái)創(chuàng)建線(xiàn)程池。

ThreadPoolExecutorDemo.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;
    public static void main(String[] args) {

        //使用阿里巴巴推薦的創(chuàng)建線(xiàn)程池的方式
        //通過(guò)ThreadPoolExecutor構(gòu)造函數(shù)自定義參數(shù)創(chuàng)建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            //創(chuàng)建WorkerThread對(duì)象(WorkerThread類(lèi)實(shí)現(xiàn)了Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            //執(zhí)行Runnable
            executor.execute(worker);
        }
        //終止線(xiàn)程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}

可以看到我們上面的代碼指定了:

  1. corePoolSize: 核心線(xiàn)程數(shù)為 5。
  2. maximumPoolSize :最大線(xiàn)程數(shù) 10
  3. keepAliveTime : 等待時(shí)間為 1L。
  4. unit: 等待時(shí)間的單位為 TimeUnit.SECONDS。
  5. workQueue:任務(wù)隊(duì)列為 ArrayBlockingQueue,并且容量為 100;
  6. handler:飽和策略為 CallerRunsPolicy。

Output:

pool-1-thread-2 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-5 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-4 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-1 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-3 Start. Time = Tue Nov 12 20:59:44 CST 2019
pool-1-thread-5 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-3 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-2 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-4 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-1 End. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-2 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-1 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-4 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-3 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-5 Start. Time = Tue Nov 12 20:59:49 CST 2019
pool-1-thread-2 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-3 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-4 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-5 End. Time = Tue Nov 12 20:59:54 CST 2019
pool-1-thread-1 End. Time = Tue Nov 12 20:59:54 CST 2019

4.2 線(xiàn)程池原理分析

承接 4.1 節(jié),我們通過(guò)代碼輸出結(jié)果可以看出:線(xiàn)程池每次會(huì)同時(shí)執(zhí)行 5 個(gè)任務(wù),這 5 個(gè)任務(wù)執(zhí)行完之后,剩余的 5 個(gè)任務(wù)才會(huì)被執(zhí)行。 大家可以先通過(guò)上面講解的內(nèi)容,分析一下到底是咋回事?(自己獨(dú)立思考一會(huì))

現(xiàn)在,我們就分析上面的輸出內(nèi)容來(lái)簡(jiǎn)單分析一下線(xiàn)程池原理。

為了搞懂線(xiàn)程池的原理,我們需要首先分析一下 execute方法。在 4.1 節(jié)中的 Demo 中我們使用 executor.execute(worker)來(lái)提交一個(gè)任務(wù)到線(xiàn)程池中去,這個(gè)方法非常重要,下面我們來(lái)看看它的源碼:

   // 存放線(xiàn)程池的運(yùn)行狀態(tài) (runState) 和線(xiàn)程池內(nèi)有效線(xiàn)程的數(shù)量 (workerCount)
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }

    private final BlockingQueue<Runnable> workQueue;

    public void execute(Runnable command) {
        // 如果任務(wù)為null,則拋出異常。
        if (command == null)
            throw new NullPointerException();
        // ctl 中保存的線(xiàn)程池當(dāng)前的一些狀態(tài)信息
        int c = ctl.get();

        //  下面會(huì)涉及到 3 步 操作
        // 1.首先判斷當(dāng)前線(xiàn)程池中之行的任務(wù)數(shù)量是否小于 corePoolSize
        // 如果小于的話(huà),通過(guò)addWorker(command, true)新建一個(gè)線(xiàn)程,并將任務(wù)(command)添加到該線(xiàn)程中;然后,啟動(dòng)該線(xiàn)程從而執(zhí)行任務(wù)。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2.如果當(dāng)前之行的任務(wù)數(shù)量大于等于 corePoolSize 的時(shí)候就會(huì)走到這里
        // 通過(guò) isRunning 方法判斷線(xiàn)程池狀態(tài),線(xiàn)程池處于 RUNNING 狀態(tài)才會(huì)被并且隊(duì)列可以加入任務(wù),該任務(wù)才會(huì)被加入進(jìn)去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次獲取線(xiàn)程池狀態(tài),如果線(xiàn)程池狀態(tài)不是 RUNNING 狀態(tài)就需要從任務(wù)隊(duì)列中移除任務(wù),并嘗試判斷線(xiàn)程是否全部執(zhí)行完畢。同時(shí)執(zhí)行拒絕策略。
            if (!isRunning(recheck) && remove(command))
                reject(command);
                // 如果當(dāng)前線(xiàn)程池為空就新創(chuàng)建一個(gè)線(xiàn)程并執(zhí)行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3\. 通過(guò)addWorker(command, false)新建一個(gè)線(xiàn)程,并將任務(wù)(command)添加到該線(xiàn)程中;然后,啟動(dòng)該線(xiàn)程從而執(zhí)行任務(wù)。
        //如果addWorker(command, false)執(zhí)行失敗,則通過(guò)reject()執(zhí)行相應(yīng)的拒絕策略的內(nèi)容。
        else if (!addWorker(command, false))
            reject(command);
    }

現(xiàn)在,讓我們?cè)诨氐?4.1 節(jié)我們寫(xiě)的 Demo, 現(xiàn)在應(yīng)該是不是很容易就可以搞懂它的原理了呢?

沒(méi)搞懂的話(huà),也沒(méi)關(guān)系,可以看看我的分析:

我們?cè)诖a中模擬了 10 個(gè)任務(wù),我們配置的核心線(xiàn)程數(shù)為 5 、等待隊(duì)列容量為 100 ,所以每次只可能存在 5 個(gè)任務(wù)同時(shí)執(zhí)行,剩下的 5 個(gè)任務(wù)會(huì)被放到等待隊(duì)列中去。當(dāng)前的 5 個(gè)任務(wù)之行完成后,才會(huì)之行剩下的 5 個(gè)任務(wù)。

4.3 幾個(gè)常見(jiàn)的對(duì)比

4.3.1 Runnable vs Callable

Runnable自 Java 1.0 以來(lái)一直存在,但Callable僅在 Java 1.5 中引入,目的就是為了來(lái)處理Runnable不支持的用例。Runnable 接口不會(huì)返回結(jié)果或拋出檢查異常,但是Callable 接口可以。所以,如果任務(wù)不需要返回結(jié)果或拋出異常推薦使用 Runnable 接口,這樣代碼看起來(lái)會(huì)更加簡(jiǎn)潔。

工具類(lèi) Executors 可以實(shí)現(xiàn) Runnable 對(duì)象和 Callable 對(duì)象之間的相互轉(zhuǎn)換。(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。

Runnable.java

@FunctionalInterface
public interface Runnable {
   /**
    * 被線(xiàn)程執(zhí)行,沒(méi)有返回值也無(wú)法拋出異常
    */
    public abstract void run();
}

Callable.java

@FunctionalInterface
public interface Callable<V> {
    /**
     * 計(jì)算結(jié)果,或在無(wú)法這樣做時(shí)拋出異常。
     * @return 計(jì)算得出的結(jié)果
     * @throws 如果無(wú)法計(jì)算結(jié)果,則拋出異常
     */
    V call() throws Exception;
}

4.3.2 execute() vs submit()

  • execute()方法用于提交不需要返回值的任務(wù),所以無(wú)法判斷任務(wù)是否被線(xiàn)程池執(zhí)行成功與否;
  • submit()方法用于提交需要返回值的任務(wù)。線(xiàn)程池會(huì)返回一個(gè) Future 類(lèi)型的對(duì)象,通過(guò)這個(gè) Future 對(duì)象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過(guò) Futureget()方法來(lái)獲取返回值,get()方法會(huì)阻塞當(dāng)前線(xiàn)程直到任務(wù)完成,而使用 get(long timeout,TimeUnit unit)方法則會(huì)阻塞當(dāng)前線(xiàn)程一段時(shí)間后立即返回,這時(shí)候有可能任務(wù)沒(méi)有執(zhí)行完。

我們以AbstractExecutorService接口中的一個(gè) submit 方法為例子來(lái)看看源代碼:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

上面方法調(diào)用的 newTaskFor 方法返回了一個(gè) FutureTask 對(duì)象。

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

我們?cè)賮?lái)看看execute()方法:

    public void execute(Runnable command) {
      ...
    }

4.3.3 shutdown()VSshutdownNow()

  • shutdown() :關(guān)閉線(xiàn)程池,線(xiàn)程池的狀態(tài)變?yōu)?SHUTDOWN。線(xiàn)程池不再接受新任務(wù)了,但是隊(duì)列里的任務(wù)得執(zhí)行完畢。
  • shutdownNow() :關(guān)閉線(xiàn)程池,線(xiàn)程的狀態(tài)變?yōu)?STOP。線(xiàn)程池會(huì)終止當(dāng)前正在運(yùn)行的任務(wù),并停止處理排隊(duì)的任務(wù)并返回正在等待執(zhí)行的 List。

4.3.2 isTerminated() VS isShutdown()

  • isShutDown 當(dāng)調(diào)用 shutdown() 方法后返回為 true。
  • isTerminated 當(dāng)調(diào)用 shutdown() 方法后,并且所有提交的任務(wù)完成后返回為 true

4.4 加餐:Callable+ThreadPoolExecutor示例代碼

MyCallable.java

import java.util.concurrent.Callable;

public class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        //返回執(zhí)行當(dāng)前 Callable 的線(xiàn)程名字
        return Thread.currentThread().getName();
    }
}

CallableDemo.java

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallableDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) {

        //使用阿里巴巴推薦的創(chuàng)建線(xiàn)程池的方式
        //通過(guò)ThreadPoolExecutor構(gòu)造函數(shù)自定義參數(shù)創(chuàng)建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        List<Future<String>> futureList = new ArrayList<>();
        Callable<String> callable = new MyCallable();
        for (int i = 0; i < 10; i++) {
            //提交任務(wù)到線(xiàn)程池
            Future<String> future = executor.submit(callable);
            //將返回值 future 添加到 list,我們可以通過(guò) future 獲得 執(zhí)行 Callable 得到的返回值
            futureList.add(future);
        }
        for (Future<String> fut : futureList) {
            try {
                System.out.println(new Date() + "::" + fut.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        //關(guān)閉線(xiàn)程池
        executor.shutdown();
    }
}

Output:

Wed Nov 13 13:40:41 CST 2019::pool-1-thread-1
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-2
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-4
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-5
Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-2
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-1
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-4
Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5

五、幾種常見(jiàn)的線(xiàn)程池詳解

5.1 FixedThreadPool

5.1.1 介紹

FixedThreadPool 被稱(chēng)為可重用固定線(xiàn)程數(shù)的線(xiàn)程池。通過(guò) Executors 類(lèi)中的相關(guān)源代碼來(lái)看一下相關(guān)實(shí)現(xiàn):

   /**
     * 創(chuàng)建一個(gè)可重用固定數(shù)量線(xiàn)程的線(xiàn)程池
     */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

另外還有一個(gè) FixedThreadPool 的實(shí)現(xiàn)方法,和上面的類(lèi)似,所以這里不多做闡述:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

從上面源代碼可以看出新創(chuàng)建的 FixedThreadPoolcorePoolSizemaximumPoolSize 都被設(shè)置為 nThreads,這個(gè) nThreads 參數(shù)是我們使用的時(shí)候自己傳遞的。

5.1.2 為什么不推薦使用FixedThreadPool?

FixedThreadPool 使用無(wú)界隊(duì)列 LinkedBlockingQueue(隊(duì)列的容量為 Intger.MAX_VALUE)作為線(xiàn)程池的工作隊(duì)列會(huì)對(duì)線(xiàn)程池帶來(lái)如下影響 :

  1. 當(dāng)線(xiàn)程池中的線(xiàn)程數(shù)達(dá)到 corePoolSize 后,新任務(wù)將在無(wú)界隊(duì)列中等待,因此線(xiàn)程池中的線(xiàn)程數(shù)不會(huì)超過(guò) corePoolSize;
  2. 由于使用無(wú)界隊(duì)列時(shí) maximumPoolSize 將是一個(gè)無(wú)效參數(shù),因?yàn)椴豢赡艽嬖谌蝿?wù)隊(duì)列滿(mǎn)的情況。所以,通過(guò)創(chuàng)建 FixedThreadPool的源碼可以看出創(chuàng)建的 FixedThreadPoolcorePoolSizemaximumPoolSize 被設(shè)置為同一個(gè)值。
  3. 由于 1 和 2,使用無(wú)界隊(duì)列時(shí) keepAliveTime 將是一個(gè)無(wú)效參數(shù);
  4. 運(yùn)行中的 FixedThreadPool(未執(zhí)行 shutdown()shutdownNow())不會(huì)拒絕任務(wù),在任務(wù)比較多的時(shí)候會(huì)導(dǎo)致 OOM(內(nèi)存溢出)。

5.2 SingleThreadExecutor 詳解

5.2.1 介紹

SingleThreadExecutor 是只有一個(gè)線(xiàn)程的線(xiàn)程池。下面看看SingleThreadExecutor 的實(shí)現(xiàn):

   /**
     *返回只有一個(gè)線(xiàn)程的線(xiàn)程池
     */
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
   public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

從上面源代碼可以看出新創(chuàng)建的 SingleThreadExecutorcorePoolSizemaximumPoolSize 都被設(shè)置為 1.其他參數(shù)和 FixedThreadPool 相同。

5.2.2 為什么不推薦使用SingleThreadExecutor

SingleThreadExecutor 使用無(wú)界隊(duì)列 LinkedBlockingQueue 作為線(xiàn)程池的工作隊(duì)列(隊(duì)列的容量為 Intger.MAX_VALUE)。SingleThreadExecutor 使用無(wú)界隊(duì)列作為線(xiàn)程池的工作隊(duì)列會(huì)對(duì)線(xiàn)程池帶來(lái)的影響與 FixedThreadPool 相同。說(shuō)簡(jiǎn)單點(diǎn)就是可能會(huì)導(dǎo)致 OOM,

5.3 CachedThreadPool 詳解

5.3.1 介紹

CachedThreadPool 是一個(gè)會(huì)根據(jù)需要?jiǎng)?chuàng)建新線(xiàn)程的線(xiàn)程池。下面通過(guò)源碼來(lái)看看 CachedThreadPool 的實(shí)現(xiàn):

    /**
     * 創(chuàng)建一個(gè)線(xiàn)程池,根據(jù)需要?jiǎng)?chuàng)建新線(xiàn)程,但會(huì)在先前構(gòu)建的線(xiàn)程可用時(shí)重用它。
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPoolcorePoolSize 被設(shè)置為空(0),maximumPoolSize被設(shè)置為 Integer.MAX.VALUE,即它是無(wú)界的,這也就意味著如果主線(xiàn)程提交任務(wù)的速度高于 maximumPool 中線(xiàn)程處理任務(wù)的速度時(shí),CachedThreadPool 會(huì)不斷創(chuàng)建新的線(xiàn)程。極端情況下,這樣會(huì)導(dǎo)致耗盡 cpu 和內(nèi)存資源。

5.3.2 為什么不推薦使用CachedThreadPool?

CachedThreadPool允許創(chuàng)建的線(xiàn)程數(shù)量為 Integer.MAX_VALUE ,可能會(huì)創(chuàng)建大量線(xiàn)程,從而導(dǎo)致 OOM。

六、ScheduledThreadPoolExecutor 詳解

ScheduledThreadPoolExecutor 主要用來(lái)在給定的延遲后運(yùn)行任務(wù),或者定期執(zhí)行任務(wù)。 這個(gè)在實(shí)際項(xiàng)目中基本不會(huì)被用到,所以對(duì)這部分大家只需要簡(jiǎn)單了解一下它的思想。

6.1 簡(jiǎn)介

ScheduledThreadPoolExecutor 使用的任務(wù)隊(duì)列 DelayQueue 封裝了一個(gè) PriorityQueuePriorityQueue 會(huì)對(duì)隊(duì)列中的任務(wù)進(jìn)行排序,執(zhí)行所需時(shí)間短的放在前面先被執(zhí)行(ScheduledFutureTasktime 變量小的先執(zhí)行),如果執(zhí)行所需時(shí)間相同則先提交的任務(wù)將被先執(zhí)行(ScheduledFutureTasksquenceNumber 變量小的先執(zhí)行)。

ScheduledThreadPoolExecutorTimer 的比較:

  • Timer 對(duì)系統(tǒng)時(shí)鐘的變化敏感,ScheduledThreadPoolExecutor不是;
  • Timer 只有一個(gè)執(zhí)行線(xiàn)程,因此長(zhǎng)時(shí)間運(yùn)行的任務(wù)可以延遲其他任務(wù)。 ScheduledThreadPoolExecutor 可以配置任意數(shù)量的線(xiàn)程。 此外,如果你想(通過(guò)提供 ThreadFactory),你可以完全控制創(chuàng)建的線(xiàn)程;
  • TimerTask 中拋出的運(yùn)行時(shí)異常會(huì)殺死一個(gè)線(xiàn)程,從而導(dǎo)致 Timer 死機(jī):-( ...即計(jì)劃任務(wù)將不再運(yùn)行。ScheduledThreadExecutor 不僅捕獲運(yùn)行時(shí)異常,還允許您在需要時(shí)處理它們(通過(guò)重寫(xiě) afterExecute 方法ThreadPoolExecutor)。拋出異常的任務(wù)將被取消,但其他任務(wù)將繼續(xù)運(yùn)行。

綜上,在 JDK1.5 之后,你沒(méi)有理由再使用 Timer 進(jìn)行任務(wù)調(diào)度了。

備注: Quartz 是一個(gè)由 java 編寫(xiě)的任務(wù)調(diào)度庫(kù),由 OpenSymphony 組織開(kāi)源出來(lái)。在實(shí)際項(xiàng)目開(kāi)發(fā)中使用 Quartz 的還是居多,比較推薦使用 Quartz。因?yàn)?Quartz 理論上能夠同時(shí)對(duì)上萬(wàn)個(gè)任務(wù)進(jìn)行調(diào)度,擁有豐富的功能特性,包括任務(wù)調(diào)度、任務(wù)持久化、可集群化、插件等等。

6.2 運(yùn)行機(jī)制

ScheduledThreadPoolExecutor 的執(zhí)行主要分為兩大部分:

  1. 當(dāng)調(diào)用 ScheduledThreadPoolExecutorscheduleAtFixedRate() 方法或者scheduleWirhFixedDelay() 方法時(shí),會(huì)向 ScheduledThreadPoolExecutorDelayQueue 添加一個(gè)實(shí)現(xiàn)了 RunnableScheduledFuture 接口的 ScheduledFutureTask
  2. 線(xiàn)程池中的線(xiàn)程從 DelayQueue 中獲取 ScheduledFutureTask,然后執(zhí)行任務(wù)。

ScheduledThreadPoolExecutor 為了實(shí)現(xiàn)周期性的執(zhí)行任務(wù),對(duì) ThreadPoolExecutor做了如下修改:

  • 使用 DelayQueue 作為任務(wù)隊(duì)列;
  • 獲取任務(wù)的方不同
  • 執(zhí)行周期任務(wù)后,增加了額外的處理

6.3 ScheduledThreadPoolExecutor 執(zhí)行周期任務(wù)的步驟

  1. 線(xiàn)程 1 從 DelayQueue 中獲取已到期的 ScheduledFutureTask(DelayQueue.take())。到期任務(wù)是指 ScheduledFutureTask的 time 大于等于當(dāng)前系統(tǒng)的時(shí)間;
  2. 線(xiàn)程 1 執(zhí)行這個(gè) ScheduledFutureTask
  3. 線(xiàn)程 1 修改 ScheduledFutureTask 的 time 變量為下次將要被執(zhí)行的時(shí)間;
  4. 線(xiàn)程 1 把這個(gè)修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。

七、線(xiàn)程池大小確定

線(xiàn)程池?cái)?shù)量的確定一直是困擾著程序員的一個(gè)難題,大部分程序員在設(shè)定線(xiàn)程池大小的時(shí)候就是隨心而定。我們并沒(méi)有考慮過(guò)這樣大小的配置是否會(huì)帶來(lái)什么問(wèn)題,我自己就是這大部分程序員中的一個(gè)代表。

由于筆主對(duì)如何確定線(xiàn)程池大小也沒(méi)有什么實(shí)際經(jīng)驗(yàn),所以,這部分內(nèi)容參考了網(wǎng)上很多文章/書(shū)籍。

首先,可以肯定的一點(diǎn)是線(xiàn)程池大小設(shè)置過(guò)大或者過(guò)小都會(huì)有問(wèn)題。合適的才是最好,貌似在 95 % 的場(chǎng)景下都是合適的。

如果我們?cè)O(shè)置的線(xiàn)程池?cái)?shù)量太小的話(huà),如果同一時(shí)間有大量任務(wù)/請(qǐng)求需要處理,可能會(huì)導(dǎo)致大量的請(qǐng)求/任務(wù)在任務(wù)隊(duì)列中排隊(duì)等待執(zhí)行,甚至?xí)霈F(xiàn)任務(wù)隊(duì)列滿(mǎn)了之后任務(wù)/請(qǐng)求無(wú)法處理的情況,或者大量任務(wù)堆積在任務(wù)隊(duì)列導(dǎo)致 OOM。這樣很明顯是有問(wèn)題的! CPU 根本沒(méi)有得到充分利用。

但是,如果我們?cè)O(shè)置線(xiàn)程數(shù)量太大,大量線(xiàn)程可能會(huì)同時(shí)在爭(zhēng)取 CPU 資源,這樣會(huì)導(dǎo)致大量的上下文切換,從而增加線(xiàn)程的執(zhí)行時(shí)間,影響了整體執(zhí)行效率。

上下文切換:

多線(xiàn)程編程中一般線(xiàn)程的個(gè)數(shù)都大于 CPU 核心的個(gè)數(shù),而一個(gè) CPU 核心在任意時(shí)刻只能被一個(gè)線(xiàn)程使用,為了讓這些線(xiàn)程都能得到有效執(zhí)行,CPU 采取的策略是為每個(gè)線(xiàn)程分配時(shí)間片并輪轉(zhuǎn)的形式。當(dāng)一個(gè)線(xiàn)程的時(shí)間片用完的時(shí)候就會(huì)重新處于就緒狀態(tài)讓給其他線(xiàn)程使用,這個(gè)過(guò)程就屬于一次上下文切換。概括來(lái)說(shuō)就是:當(dāng)前任務(wù)在執(zhí)行完 CPU 時(shí)間片切換到另一個(gè)任務(wù)之前會(huì)先保存自己的狀態(tài),以便下次再切換回這個(gè)任務(wù)時(shí),可以再加載這個(gè)任務(wù)的狀態(tài)。任務(wù)從保存到再加載的過(guò)程就是一次上下文切換。

上下文切換通常是計(jì)算密集型的。也就是說(shuō),它需要相當(dāng)可觀(guān)的處理器時(shí)間,在每秒幾十上百次的切換中,每次切換都需要納秒量級(jí)的時(shí)間。所以,上下文切換對(duì)系統(tǒng)來(lái)說(shuō)意味著消耗大量的 CPU 時(shí)間,事實(shí)上,可能是操作系統(tǒng)中時(shí)間消耗最大的操作。

Linux 相比與其他操作系統(tǒng)(包括其他類(lèi) Unix 系統(tǒng))有很多的優(yōu)點(diǎn),其中有一項(xiàng)就是,其上下文切換和模式切換的時(shí)間消耗非常少。

有一個(gè)簡(jiǎn)單并且適用面比較廣的公式:

  • CPU 密集型任務(wù)(N+1): 這種任務(wù)消耗的主要是 CPU 資源,可以將線(xiàn)程數(shù)設(shè)置為 N(CPU 核心數(shù))+1,比 CPU 核心數(shù)多出來(lái)的一個(gè)線(xiàn)程是為了防止線(xiàn)程偶發(fā)的缺頁(yè)中斷,或者其它原因?qū)е碌娜蝿?wù)暫停而帶來(lái)的影響。一旦任務(wù)暫停,CPU 就會(huì)處于空閑狀態(tài),而在這種情況下多出來(lái)的一個(gè)線(xiàn)程就可以充分利用 CPU 的空閑時(shí)間。
  • I/O 密集型任務(wù)(2N): 這種任務(wù)應(yīng)用起來(lái),系統(tǒng)會(huì)用大部分的時(shí)間來(lái)處理 I/O 交互,而線(xiàn)程在處理 I/O 的時(shí)間段內(nèi)不會(huì)占用 CPU 來(lái)處理,這時(shí)就可以將 CPU 交出給其它線(xiàn)程使用。因此在 I/O 密集型任務(wù)的應(yīng)用中,我們可以多配置一些線(xiàn)程,具體的計(jì)算方法是 2N。

# 鏈接 Java程序員福利"常用資料分享"

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容