Executor框架使用ThreadPoolExecutor

java線程池 - ThreadPoolExecutor

ThreadPoolExecutor是Executor框架的主要成員,也是最核心的類,是線程池的實(shí)現(xiàn)類。

通過(guò)Executor框架的Executors工具類,可以創(chuàng)建3種類型的ThreadPoolExecutor,如下

  • FixedThreadPool:適用于需要限制當(dāng)前線程數(shù)量的應(yīng)用場(chǎng)景
  • SingleThreadExecutor:適用于需要保證順序地執(zhí)行各個(gè)任務(wù)且在任意時(shí)間點(diǎn),不會(huì)有多個(gè)線程活動(dòng)的應(yīng)用場(chǎng)景
  • CachedThreadPool:適用于執(zhí)行很多的短期異步任務(wù)

FixedThreadPool

可重用固定線程數(shù)的線程池

源碼

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

FixedThreadPool的運(yùn)行示意圖

FixedThreadPool的運(yùn)行示意圖
  • 如果當(dāng)前運(yùn)行的線程數(shù)少于corePoolSize,則創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。
  • 當(dāng)前運(yùn)行的線程數(shù)等于或者大于corePoolSize,將任務(wù)加入到LinkedBlockingQueue。
  • 任務(wù)執(zhí)行完后會(huì)在循環(huán)中反復(fù)從LinkedBlockingQueue中獲取任務(wù)來(lái)執(zhí)行
    -因?yàn)槭褂玫氖菬o(wú)界隊(duì)列,所以運(yùn)行中的FixedThreadPool不會(huì)拒絕任務(wù),maximumPoolSizekeepAliveTime都是無(wú)效參數(shù)。

例子

package com.sy.thread.example;

import java.util.concurrent.*;

/**
 * Description: thread
 *
 * @author songyu
 */
public class FixedThreadPoolTest {

    public static void main(String[] args) {
        //因?yàn)閚ewFixedThreadPool創(chuàng)建的是使用的無(wú)界的隊(duì)列,如果IDEA里面安裝了阿里的java編碼規(guī)范會(huì)提示:
        //FixedThreadPool和SingleThreadPool: 允許的請(qǐng)求隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求,從而導(dǎo)致OOM
        //可以換一種寫法等同寫法,如下
        //int nThreads = 1;
        //ExecutorService service = new ThreadPoolExecutor(nThreads,nThreads,0L, TimeUnit.MILLISECONDS,
        //        new LinkedBlockingQueue<Runnable>());
        ExecutorService service = Executors.newFixedThreadPool(1);
        for(int i = 0; i < 5; i++) {
            int finalI = i;
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("第" + finalI +  "任務(wù)執(zhí)行");
                }
            });
            service.execute(thread);
        }
    }
}

SingleThreadExecutor

使用單個(gè)工作線程

源碼

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

SingleThreadExecutor的運(yùn)行示意圖

SingleThreadExecutor的運(yùn)行示意圖
  • 任務(wù)提交到線程池后,如果線程池中沒有運(yùn)行的線程,則創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。
  • 如果線程中有一個(gè)運(yùn)行的線程,則將任務(wù)加入到LinkedBlockingQueue。
  • 任務(wù)執(zhí)行完后會(huì)在循環(huán)中反復(fù)從LinkedBlockingQueue中獲取任務(wù)來(lái)執(zhí)行。

例子

package com.sy.thread.example;

import java.util.concurrent.*;

/**
 * Description: thread
 *
 * @author songyu
 */
public class SingleThreadExecutorTest {

    public static void main(String[] args) {
        //因?yàn)閚ewSingleThreadExecutor創(chuàng)建的是使用的無(wú)界的隊(duì)列,如果IDEA里面安裝了阿里的java編碼規(guī)范會(huì)提示:
        //FixedThreadPool和SingleThreadPool: 允許的請(qǐng)求隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求,從而導(dǎo)致OOM
        //可以換一種寫法等同寫法,如下
        //ExecutorService service = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        ExecutorService service = Executors.newSingleThreadExecutor();
        for(int i = 0; i < 5; i++) {
            int finalI = i;
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("第" + finalI +  "任務(wù)執(zhí)行");
                }
            });
            service.execute(thread);
        }
    }
}

CachedThreadPool

根據(jù)需要?jiǎng)?chuàng)建新線程

源碼

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

CachedThreadPoolcorePoolSize設(shè)置成0,即corePool為空,maximumPoolSize被設(shè)置為Integer.MAX_VALUE,即maximumPool是無(wú)界的,keepAliveTime設(shè)置為60L,即CachedThreadPool中的空閑線程等待新任務(wù)的最長(zhǎng)時(shí)間為60s,超過(guò)60s就會(huì)被終止。
CachedThreadPool使用的是沒有容量的SynchronousQueue作為工作隊(duì)列,而maximumPool也是無(wú)界的,所以,如果主線程提交任務(wù)的速度大于線程池中的線程處理速度,CachedThreadPool會(huì)不斷創(chuàng)建新線程,在極端情況下會(huì)因?yàn)閯?chuàng)建過(guò)多的線程而耗盡CPU和內(nèi)存資源。

CachedThreadPool運(yùn)行示意圖

CachedThreadPool運(yùn)行示意圖
  • 首先執(zhí)行SynchronousQueue.offer(Runnable task),如果當(dāng)前maximumPool中的空閑線程正在執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主線程執(zhí)行offer操作與空閑線程執(zhí)行的poll操作配對(duì)成功,主線程把任務(wù)交給空閑線程執(zhí)行(當(dāng)初始maximumPool為空或者maximumPool中當(dāng)前沒有空閑線程時(shí),將沒有線程執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),上述步驟將會(huì)失敗,這個(gè)時(shí)候CachedThreadPool會(huì)創(chuàng)建一個(gè)新線程執(zhí)行任務(wù)。
  • 新創(chuàng)建的線程將任務(wù)執(zhí)行完畢后,會(huì)執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)讓空閑的線程最多在SynchronousQueue中等待60s,如果60s內(nèi)主線程提交了新任務(wù),那么空線程將執(zhí)行新任務(wù),否則線程終止。

SynchronousQueue是一個(gè)沒有容量的阻塞隊(duì)列,CachedThreadPool使用SynchronousQueue把主線程提交的任務(wù)傳遞給空閑線程執(zhí)行。

CachedThreadPool中任務(wù)傳遞示意圖

CachedThreadPool中任務(wù)傳遞示意圖

例子

package com.sy.thread.example;

import java.util.concurrent.*;

/**
 * Description: thread
 *
 * @author songyu
 */
public class CachedThreadPoolTest {
    public static void main(String[] args) {
        //因?yàn)镃achedThreadPool允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE,如果IDEA里面安裝了阿里的java編碼規(guī)范會(huì)提示:
        //CachedThreadPool: 允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程,從而導(dǎo)致OOM
        //可以換一種寫法等同寫法,如下
        //ExecutorService service = new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
        ExecutorService service = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++) {
            int finalI = i;
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("第" + finalI +  "任務(wù)執(zhí)行");
                }
            });
            service.execute(thread);
        }
    }
}

參考書籍《java并發(fā)編程的藝術(shù)》推薦大家閱讀。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容