高并發(fā)之——從源碼角度分析創(chuàng)建線程池究竟有哪些方式

前言

在Java的高并發(fā)領(lǐng)域,線程池一直是一個繞不開的話題。有些童鞋一直在使用線程池,但是,對于如何創(chuàng)建線程池僅僅停留在使用Executors工具類的方式,那么,創(chuàng)建線程池究竟存在哪幾種方式呢?就讓我們一起從創(chuàng)建線程池的源碼來深入分析究竟有哪些方式可以創(chuàng)建線程池。

使用Executors工具類創(chuàng)建線程池

在創(chuàng)建線程池時,初學(xué)者用的最多的就是Executors 這個工具類,而使用這個工具類創(chuàng)建線程池時非常簡單的,不需要關(guān)注太多的線程池細節(jié),只需要傳入必要的參數(shù)即可。Executors 工具類提供了幾種創(chuàng)建線程池的方法,如下所示。

  • Executors.newCachedThreadPool:創(chuàng)建一個可緩存的線程池,如果線程池的大小超過了需要,可以靈活回收空閑線程,如果沒有可回收線程,則新建線程
  • Executors.newFixedThreadPool:創(chuàng)建一個定長的線程池,可以控制線程的最大并發(fā)數(shù),超出的線程會在隊列中等待
  • Executors.newScheduledThreadPool:創(chuàng)建一個定長的線程池,支持定時、周期性的任務(wù)執(zhí)行
  • Executors.newSingleThreadExecutor: 創(chuàng)建一個單線程化的線程池,使用一個唯一的工作線程執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(先入先出或者優(yōu)先級)執(zhí)行
  • Executors.newSingleThreadScheduledExecutor:創(chuàng)建一個單線程化的線程池,支持定時、周期性的任務(wù)執(zhí)行
  • Executors.newWorkStealingPool:創(chuàng)建一個具有并行級別的work-stealing線程池

其中,Executors.newWorkStealingPool方法是Java 8中新增的創(chuàng)建線程池的方法,它能夠為線程池設(shè)置并行級別,具有更高的并發(fā)度和性能。除了此方法外,其他創(chuàng)建線程池的方法本質(zhì)上調(diào)用的是ThreadPoolExecutor類的構(gòu)造方法。

例如,我們可以使用如下代碼創(chuàng)建線程池。

Executors.newWorkStealingPool();
Executors.newCachedThreadPool();
Executors.newScheduledThreadPool(3);

使用ThreadPoolExecutor類創(chuàng)建線程池

從代碼結(jié)構(gòu)上看ThreadPoolExecutor類繼承自AbstractExecutorService,也就是說,ThreadPoolExecutor類具有AbstractExecutorService類的全部功能。

既然Executors工具類中創(chuàng)建線程池大部分調(diào)用的都是ThreadPoolExecutor類的構(gòu)造方法,所以,我們也可以直接調(diào)用ThreadPoolExecutor類的構(gòu)造方法來創(chuàng)建線程池,而不再使用Executors工具類。接下來,我們一起看下ThreadPoolExecutor類的構(gòu)造方法。

ThreadPoolExecutor類中的所有構(gòu)造方法如下所示。

public ThreadPoolExecutor(int corePoolSize,
                  int maximumPoolSize,
                  long keepAliveTime,
                  TimeUnit unit,
                 BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                    ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     threadFactory, defaultHandler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                    TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     Executors.defaultThreadFactory(), handler);
}
 
public ThreadPoolExecutor(int corePoolSize,
                int maximumPoolSize,
                long keepAliveTime,
                TimeUnit unit,
                    BlockingQueue<Runnable> workQueue,
                ThreadFactory threadFactory,
                RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

由ThreadPoolExecutor類的構(gòu)造方法的源代碼可知,創(chuàng)建線程池最終調(diào)用的構(gòu)造方法如下。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
              long keepAliveTime, TimeUnit unit,
              BlockingQueue<Runnable> workQueue,
              ThreadFactory threadFactory,
                  RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

關(guān)于此構(gòu)造方法中各參數(shù)的含義和作用,如下所示。
注意:為了更加深入的分析ThreadPoolExecutor類的構(gòu)造方法,會適當(dāng)調(diào)整參數(shù)的順序進行解析,以便于大家更能深入的理解ThreadPoolExecutor構(gòu)造方法中每個參數(shù)的作用。

上述構(gòu)造方法接收如下參數(shù)進行初始化:

(1)corePoolSize:核心線程數(shù)量。

(2)maximumPoolSize:最大線程數(shù)。

(3)workQueue:阻塞隊列,存儲等待執(zhí)行的任務(wù),很重要,會對線程池運行過程產(chǎn)生重大影響。

其中,上述三個參數(shù)的關(guān)系如下所示:

  • 如果運行的線程數(shù)小于corePoolSize,直接創(chuàng)建新線程處理任務(wù),即使線程池中的其他線程是空閑的。
  • 如果運行的線程數(shù)大于等于corePoolSize,并且小于maximumPoolSize,此時,只有當(dāng)workQueue滿時,才會創(chuàng)建新的線程處理任務(wù)。
  • 如果設(shè)置的corePoolSize與maximumPoolSize相同,那么創(chuàng)建的線程池大小是固定的,此時,如果有新任務(wù)提交,并且workQueue沒有滿時,就把請求放入到workQueue中,等待空閑的線程,從workQueue中取出任務(wù)進行處理。
  • 如果運行的線程數(shù)量大于maximumPoolSize,同時,workQueue已經(jīng)滿了,會通過拒絕策略參數(shù)rejectHandler來指定處理策略。

根據(jù)上述三個參數(shù)的配置,線程池會對任務(wù)進行如下處理方式:

當(dāng)提交一個新的任務(wù)到線程池時,線程池會根據(jù)當(dāng)前線程池中正在運行的線程數(shù)量來決定該任務(wù)的處理方式。處理方式總共有三種:直接切換、使用無限隊列、使用有界隊列。

  • 直接切換常用的隊列就是SynchronousQueue。
  • 使用無限隊列就是使用基于鏈表的隊列,比如:LinkedBlockingQueue,如果使用這種方式,線程池中創(chuàng)建的最大線程數(shù)就是corePoolSize,此時maximumPoolSize不會起作用。當(dāng)線程池中所有的核心線程都是運行狀態(tài)時,提交新任務(wù),就會放入等待隊列中。
  • 使用有界隊列使用的是ArrayBlockingQueue,使用這種方式可以將線程池的最大線程數(shù)量限制為maximumPoolSize,可以降低資源的消耗。但是,這種方式使得線程池對線程的調(diào)度更困難,因為線程池和隊列的容量都是有限的了。

根據(jù)上面三個參數(shù),我們可以簡單得出如何降低系統(tǒng)資源消耗的一些措施:

  • 如果想降低系統(tǒng)資源的消耗,包括CPU使用率,操作系統(tǒng)資源的消耗,上下文環(huán)境切換的開銷等,可以設(shè)置一個較大的隊列容量和較小的線程池容量。這樣,會降低線程處理任務(wù)的吞吐量。
  • 如果提交的任務(wù)經(jīng)常發(fā)生阻塞,可以考慮調(diào)用設(shè)置最大線程數(shù)的方法,重新設(shè)置線程池最大線程數(shù)。如果隊列的容量設(shè)置的較小,通常需要將線程池的容量設(shè)置的大一些,這樣,CPU的使用率會高些。如果線程池的容量設(shè)置的過大,并發(fā)量就會增加,則需要考慮線程調(diào)度的問題,反而可能會降低處理任務(wù)的吞吐量。

接下來,我們繼續(xù)看ThreadPoolExecutor的構(gòu)造方法的參數(shù)。

(4)keepAliveTime:線程沒有任務(wù)執(zhí)行時最多保持多久時間終止
當(dāng)線程池中的線程數(shù)量大于corePoolSize時,如果此時沒有新的任務(wù)提交,核心線程外的線程不會立即銷毀,需要等待,直到等待的時間超過了keepAliveTime就會終止。

(5)unit:keepAliveTime的時間單位

(6)threadFactory:線程工廠,用來創(chuàng)建線程
默認會提供一個默認的工廠來創(chuàng)建線程,當(dāng)使用默認的工廠來創(chuàng)建線程時,會使新創(chuàng)建的線程具有相同的優(yōu)先級,并且是非守護的線程,同時也設(shè)置了線程的名稱

(7)rejectHandler:拒絕處理任務(wù)時的策略

如果workQueue阻塞隊列滿了,并且沒有空閑的線程池,此時,繼續(xù)提交任務(wù),需要采取一種策略來處理這個任務(wù)。
線程池總共提供了四種策略:

  • 直接拋出異常,這也是默認的策略。實現(xiàn)類為AbortPolicy。
  • 用調(diào)用者所在的線程來執(zhí)行任務(wù)。實現(xiàn)類為CallerRunsPolicy。
  • 丟棄隊列中最靠前的任務(wù)并執(zhí)行當(dāng)前任務(wù)。實現(xiàn)類為DiscardOldestPolicy。
  • 直接丟棄當(dāng)前任務(wù)。實現(xiàn)類為DiscardPolicy。

大家可以自行調(diào)用ThreadPoolExecutor類的構(gòu)造方法來創(chuàng)建線程池。例如,我們可以使用如下形式創(chuàng)建線程池。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>());

使用ForkJoinPool類創(chuàng)建線程池

在Java8的Executors工具類中,新增了如下創(chuàng)建線程池的方式。

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}
 
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

從源代碼可以可以,本質(zhì)上調(diào)用的是ForkJoinPool類的構(gòu)造方法類創(chuàng)建線程池,而從代碼結(jié)構(gòu)上來看ForkJoinPool類繼承自AbstractExecutorService抽象類。接下來,我們看下ForkJoinPool類的構(gòu)造方法。

public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}
 public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
 
public ForkJoinPool(int parallelism,
                ForkJoinWorkerThreadFactory factory,
                UncaughtExceptionHandler handler,
                boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}
 
private ForkJoinPool(int parallelism,
                 ForkJoinWorkerThreadFactory factory,
                 UncaughtExceptionHandler handler,
                 int mode,
                 String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

通過查看源代碼得知,F(xiàn)orkJoinPool的構(gòu)造方法,最終調(diào)用的是如下私有構(gòu)造方法。

private ForkJoinPool(int parallelism,
                 ForkJoinWorkerThreadFactory factory,
                 UncaughtExceptionHandler handler,
                 int mode,
                 String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

其中,各參數(shù)的含義如下所示。

  • parallelism:并發(fā)級別。
  • factory:創(chuàng)建線程的工廠類對象。
  • handler:當(dāng)線程池中的線程拋出未捕獲的異常時,統(tǒng)一使用UncaughtExceptionHandler對象處理。
  • mode:取值為FIFO_QUEUE或者LIFO_QUEUE。
  • workerNamePrefix:執(zhí)行任務(wù)的線程名稱的前綴。

當(dāng)然,私有構(gòu)造方法雖然是參數(shù)最多的一個方法,但是其不會直接對外方法,我們可以使用如下方式創(chuàng)建線程池。

new ForkJoinPool();
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);

使用ScheduledThreadPoolExecutor類創(chuàng)建線程池

在Executors工具類中存在如下方法類創(chuàng)建線程池。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
 
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}
 
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
 
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

從源碼來看,這幾個方法本質(zhì)上調(diào)用的都是ScheduledThreadPoolExecutor類的構(gòu)造方法,ScheduledThreadPoolExecutor中存在的構(gòu)造方法如下所示。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
 
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}
 
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

而從代碼結(jié)構(gòu)上看,ScheduledThreadPoolExecutor類繼承自ThreadPoolExecutor類,本質(zhì)上還是調(diào)用ThreadPoolExecutor類的構(gòu)造方法,只不過此時傳遞的隊列為DelayedWorkQueue。我們可以直接調(diào)用ScheduledThreadPoolExecutor類的構(gòu)造方法來創(chuàng)建線程池,例如以如下形式創(chuàng)建線程池。

new ScheduledThreadPoolExecutor(3)

最后,需要注意的是:ScheduledThreadPoolExecutor主要用來創(chuàng)建執(zhí)行定時任務(wù)的線程池。

關(guān)注【冰河技術(shù)】微信公眾號,每天閱讀深度技術(shù)好文

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容