線程池第一印象~

進(jìn)階線程池啦~.png
ThreadPoolExecutor 繼承關(guān)系.png
ThreadPoolExecutor 方法Structure.png
AbstractExecutorService 方法Structure.png
ExecutorService 方法Structure.png
最頂層接口 Executor.png

ThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
...
}
ThreadPoolExecutor 四個(gè)構(gòu)造器.png

看源碼可知前三個(gè)構(gòu)造器最終調(diào)用的都是第四個(gè)進(jìn)行初始化工作。

workQueue 等待隊(duì)列

  • ArrayBlockingQueue 使用較少。必須指定 capacity,即有界隊(duì)列
  • PriorityBlockingQueue 使用較少。默認(rèn)大小 DEFAULT_INITIAL_CAPACITY = 11,最大MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8,即無界有序隊(duì)列
  • LinkedBlockingQueue 默認(rèn)大小 Integer.MAX_VALUE,即無界隊(duì)列
  • SynchronousQueue 內(nèi)部并沒有數(shù)據(jù)緩存空間,一旦有了插入線程和移除線程,元素很快就從插入線程移交給移除線程(快速傳遞元素的方式),在多任務(wù)隊(duì)列中是最快處理任務(wù)的方式,元素總是以最快的方式從生產(chǎn)者傳遞給消費(fèi)者。典型應(yīng)用是Executors.newCachedThreadPool(),這個(gè)線程池根據(jù)需要(新任務(wù)到來時(shí))創(chuàng)建新的線程,如果有空閑線程則會重復(fù)使用,線程空閑了60秒后會被回收。
  • 線程池的排隊(duì)策略與所選的 BlockingQueue 有關(guān)。

handler 拒絕處理任務(wù)時(shí)使用的策略

  • ThreadPoolExecutor.AbortPolicy 丟棄任務(wù)并拋出RejectedExecutionException異常,會阻止正常工作。
  • ThreadPoolExecutor.DiscardPolicy 丟棄任務(wù),但是不拋出異常,系統(tǒng)正常工作。
  • ThreadPoolExecutor.DiscardOldestPolicy 丟棄隊(duì)列最前面的任務(wù)(即將被執(zhí)行的),然后重新嘗試提交當(dāng)前任務(wù)(重復(fù)此過程)
  • ThreadPoolExecutor.CallerRunsPolicy 由調(diào)用線程處理該任務(wù)

核心方法
execute() 是在Executor接口中的聲明,通過這個(gè)方法可以向線程池提交一個(gè)任務(wù),交由線程池去執(zhí)行
submit() 是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實(shí)現(xiàn),在ThreadPoolExecutor中并沒有對其進(jìn)行重寫,這個(gè)方法也是用來向線程池提交任務(wù)的,但是它和execute()方法不同,它能夠返回任務(wù)執(zhí)行的結(jié)果,去看submit()方法的實(shí)現(xiàn),會發(fā)現(xiàn)它實(shí)際上還是調(diào)用的execute()方法,只不過它利用了Future來獲取任務(wù)執(zhí)行結(jié)果
shutdown() 不會立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會接受新的任務(wù)
shutdownNow() 立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù)

線程池狀態(tài)

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    /**
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *   
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
    **/
  • 創(chuàng)建線程池后,初始時(shí),線程池處于RUNNING狀態(tài);
  • 調(diào)用 shutdown(),則線程池處于SHUTDOWN狀態(tài),此時(shí)線程池不能夠接受新的任務(wù),它會等待所有任務(wù)執(zhí)行完畢;
  • 調(diào)用 shutdownNow(),則線程池處于STOP狀態(tài),此時(shí)線程池不能接受新的任務(wù),并且嘗試終止正在執(zhí)行的任務(wù);
  • 當(dāng)線程池處于SHUTDOWN或STOP狀態(tài),隊(duì)列和線程池中都為空的情況,即所有任務(wù)都已被終止,workerCount 標(biāo)記為 0,則處于TIDYING狀態(tài)
  • 當(dāng)線程池處于SHUTDOWN或STOP狀態(tài),并且所有工作線程已經(jīng)銷毀(處于TIDYING狀態(tài)),任務(wù)緩存隊(duì)列已經(jīng)清空或執(zhí)行結(jié)束后,線程池被設(shè)置為TERMINATED狀態(tài)。

任務(wù)的執(zhí)行相關(guān)重要參數(shù)

 // 任務(wù)緩存隊(duì)列,用來存放等待執(zhí)行的任務(wù)
private final BlockingQueue<Runnable> workQueue;
// 線程池的主要狀態(tài)鎖,對線程池狀態(tài)(比如線程池大小、runState等)的改變都要使用這個(gè)鎖
private final ReentrantLock mainLock = new ReentrantLock();

// Accessed only under mainLock.
// 用來存放工作集   
private final HashSet<Worker> workers = new HashSet<Worker>();
// 用來記錄線程池中曾經(jīng)出現(xiàn)過的最大線程數(shù)
private int largestPoolSize;
// 用來記錄已經(jīng)執(zhí)行完畢的任務(wù)個(gè)數(shù)
private long completedTaskCount;

// 線程工廠類,用來創(chuàng)建線程
private volatile ThreadFactory threadFactory;
// 任務(wù)拒絕策略
private volatile RejectedExecutionHandler handler;
// 線程存活時(shí)間
private volatile long keepAliveTime;
// 是否允許為核心線程設(shè)置存活時(shí)間
private volatile boolean allowCoreThreadTimeOut;
// 核心池的大?。淳€程池中的線程數(shù)目大于這個(gè)參數(shù)時(shí),提交的任務(wù)會被放進(jìn)任務(wù)緩存隊(duì)列)
private volatile int corePoolSize;

// //線程池最大能容忍的線程數(shù). Note that the actual maximum is internally bounded by CAPACITY.
private volatile int maximumPoolSize;

// 默認(rèn)的任務(wù)拒絕策略:丟棄任務(wù)并拋出RejectedExecutionException異常
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

corePoolSize是正常情況下線程池大小,maximumPoolSize是線程池的一種額外Support,即任務(wù)量突然過大時(shí)的額外可支持的開銷
largestPoolSize只是用來記錄線程池中曾經(jīng)有過的最大線程數(shù)目,跟線程池的容量沒有任何關(guān)系。

execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
   
    /** 1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first  task.  
      * The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add  threads when it shouldn't, by returning false.
      */
    // 線程池中正在作業(yè)的線程數(shù) < corePoolSize數(shù)
    if (workerCountOf(c) < corePoolSize) {
        // 是否可以繼續(xù)向 corePool 中新增線程
        if (addWorker(command, true))
            return;
        // 再次獲取當(dāng)前線程池狀態(tài)值
        c = ctl.get();
    }

    /** 2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method.
      * So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
      */
    // 線程池為可運(yùn)行狀態(tài),同時(shí)Runnable command可以加入等待隊(duì)列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //  非Running狀態(tài),則從隊(duì)列中去掉command并執(zhí)行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);

       /** 3. If we cannot queue task, then we try to add a new thread.  
         * If it fails, we know we are shut down or saturated and so reject the task.
         */
         // 新增thread
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 不可以在拓展的線程池中運(yùn)行該實(shí)例,執(zhí)行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

使用 ThreadPoolExecutor 創(chuàng)建線程池:

public class HelloThreadPool {
  public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2,
      100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2));
    
    IntStream.range(0, 4).mapToObj(PrintTask::new).forEach(printTask -> {
      executor.execute(printTask);
      System.out.println("線程池中所有線程數(shù)目:" + executor.getPoolSize() + ",隊(duì)列中待執(zhí)行的任務(wù)數(shù)目:" +
        executor.getQueue().size() + ",已執(zhí)行完的任務(wù)數(shù)目:" + executor.getCompletedTaskCount());
    });
    
    executor.shutdown();
  }
}

class PrintTask implements Runnable {
  private int taskIndex;
  
  PrintTask(int index) {
    this.taskIndex = index;
  }
  
  @Override
  public void run() {
    System.out.println(taskIndex + " is running...");
     try {
     TimeUnit.SECONDS.sleep(2);
     } catch (InterruptedException e) {
     e.printStackTrace();
     }
    System.out.println(taskIndex + " end");
  }
}

output:

0 is running...
線程池中所有線程數(shù)目:1,隊(duì)列中待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行完的任務(wù)數(shù)目:0
線程池中所有線程數(shù)目:1,隊(duì)列中待執(zhí)行的任務(wù)數(shù)目:1,已執(zhí)行完的任務(wù)數(shù)目:0
線程池中所有線程數(shù)目:1,隊(duì)列中待執(zhí)行的任務(wù)數(shù)目:2,已執(zhí)行完的任務(wù)數(shù)目:0
線程池中所有線程數(shù)目:2,隊(duì)列中待執(zhí)行的任務(wù)數(shù)目:2,已執(zhí)行完的任務(wù)數(shù)目:0
3 is running...
3 end
0 end
1 is running...
2 is running...
2 end
1 end

當(dāng)把要執(zhí)行的實(shí)例變成 5 個(gè)就會出現(xiàn) RejectedExecutionException 異常:

java.util.concurrent.RejectedExecutionException: 
Task threadPool.PrintTask@7699a589 rejected from 
java.util.concurrent.ThreadPoolExecutor@58372a00[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]

java doc中,并不提倡我們直接使用 ThreadPoolExecutor ,而是使用 Executors 類中提供的幾個(gè)靜態(tài)方法來創(chuàng)建線程池

  • Executors.newCachedThreadPool() 若線程池的當(dāng)前規(guī)模超過了`corePoolSize`,就會回收部分空閑的線程(根據(jù)`keepAliveTime`來回收),當(dāng)需求增加時(shí),線程池又可以智能的添加新線程來處理任務(wù)。此線程池大小`Integer.MAX_VALUE`可以認(rèn)為是不做限制(使用隊(duì)列`SynchronousQueue`),線程池大小完全依賴于JVM能夠創(chuàng)建的最大線程大小

  • Executors.newSingleThreadExecutor() 創(chuàng)建容量為1的線程池,`corePoolSize`和`maximumPoolSize`均為1,使用無界隊(duì)列`LinkedBlockingQueue`

  • Executors.newFixedThreadPool(int) 創(chuàng)建容量為固定個(gè)數(shù)n的線程池。`corePoolSize`和`maximumPoolSize`均為n,使用無界隊(duì)列`LinkedBlockingQueue`

  • Executors.newScheduledThreadPool(int) 創(chuàng)建一個(gè)指定corePoolSize的線程池,支持定時(shí)及周期性任務(wù)執(zhí)行

  • 如果ThreadPoolExecutor達(dá)不到要求,可以自己繼承ThreadPoolExecutor類進(jìn)行重寫

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

參考鏈接:https://www.cnblogs.com/dolphin0520/p/3932921.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容