六、線程池

為什么要用線程池?

  1. 降低資源消耗
    • 通過重復(fù)利用已創(chuàng)建的線程, 降低線程創(chuàng)建和銷毀的造成的消耗
  2. 提高相應(yīng)速度
    • 任務(wù)到達(dá)時(shí),可以不需要等待線程創(chuàng)建就能立即執(zhí)行
  3. 提高線程的可管理性
    • 線程池對(duì)線程進(jìn)行統(tǒng)一的分配、調(diào)優(yōu)可監(jiān)控

線程池種類

  1. newSingleThreadExecutor
//只有一個(gè)線程的線程池
        public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  1. newWorkStealingPool
//可以設(shè)置并行級(jí)別(數(shù)量) 并行級(jí)別決定了同一時(shí)刻最多有多少個(gè)線程在執(zhí)行 默認(rèn)是cpu個(gè)數(shù)
       public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
  1. newFixedThreadPool
//固定大小 可重用的線程池
   public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  1. newCachedThreadPool
// 創(chuàng)建沒有數(shù)量上線的線程池 線程不夠就創(chuàng)建  不用的60s就銷毀
        public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  1. newScheduledThreadPool

// 維持一定數(shù)量的線程 線程即使空閑 也不會(huì)被回收
//可以定時(shí)或者周期執(zhí)行任務(wù)
   public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

Executor框架

image
  • 接口
    • Executor 一個(gè)運(yùn)行新任務(wù)的簡單接口
    • ExecutorService 擴(kuò)展了Executor接口,添加了一些管理執(zhí)行器和任務(wù)生命周期的方法
    • ScheduledExecutorService 擴(kuò)展了ExecutorService。支持Future和定時(shí)執(zhí)行任務(wù)
public interface Executor {
//只有一個(gè)方法
    void execute(Runnable command);
}
  • ThreadPoolExecutor分析
public class ThreadPoolExecutor extends AbstractExecutorService {

ctl

 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 
// ctl 控制線程池的運(yùn)行狀態(tài)(runState) 和線程池有效線程(workCount)的數(shù)量進(jìn)行控制的字段
//獲取運(yùn)行狀態(tài)
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
//獲取運(yùn)行狀態(tài)
    private static int workerCountOf(int c)  { return c & CAPACITY; }
// 獲取運(yùn)行狀態(tài)和活動(dòng)線程數(shù)的值。
    private static int ctlOf(int rs, int wc) { return rs | wc; }


runState 線程池的運(yùn)行狀態(tài) 看下圖

    // 能接受新提交的任務(wù),也能處理阻塞隊(duì)列中的任務(wù)
    private static final int RUNNING    = -1 << COUNT_BITS;
    //關(guān)閉狀態(tài),不接收新任務(wù),卻能繼續(xù)處理阻塞隊(duì)列中的任務(wù)。調(diào)用shutdown()使線程進(jìn)入該狀態(tài)
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //不接收新任務(wù),不處理隊(duì)列中任務(wù),中斷正在處理任務(wù)的線程。調(diào)用shutdownNow()使線程進(jìn)入該狀態(tài)
    private static final int STOP       =  1 << COUNT_BITS;
    //所有任務(wù)都終止了,workerCount=0,可以進(jìn)入該狀態(tài)
    private static final int TIDYING    =  2 << COUNT_BITS;
    //在terminated() 方法執(zhí)行完后進(jìn)入該狀態(tài)
    private static final int TERMINATED =  3 << COUNT_BITS;
    
}
  • 線程池運(yùn)行狀態(tài)圖


    image
構(gòu)造函數(shù)    參數(shù)很重要

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
                      ……
        //核心線程數(shù)量
        this.corePoolSize = corePoolSize;
        //最大線程數(shù)量
        this.maximumPoolSize = maximumPoolSize;
        //等待隊(duì)列
        this.workQueue = workQueue;
        // 線程池維護(hù)線程所允許的空閑時(shí)間
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        //用來創(chuàng)建新線程
        this.threadFactory = threadFactory;
        //拒絕策略
        this.handler = handler;
                              }
  • 執(zhí)行流程


    image

或者這個(gè)圖


image

調(diào)用ThreadPoolExecutor的execute提交線程,首先檢查corePool

  1. 如果CorePool內(nèi)的線程<CorePoolSize,新創(chuàng)建線程執(zhí)行任務(wù),即使線程池中其他線程是空閑的。
  2. 如果>=CorePoolSize且<maximumPoolSize,
    • 且Queue不滿,將線程加入workQueue
    • 如果滿了,創(chuàng)建新線程處理任務(wù)
  3. 如果>=maximumPoolSize
    • 如果Queue不滿,放入Queue
    • Queue滿了,執(zhí)行拒絕策略。

所以判斷的順序?yàn)?corePoolSize --> workQueue --> maximumPoolSize。

  • 拒絕策略
    • AbortPolicy:直接拋出異常,這是默認(rèn)策略;
    • CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
    • DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
    • DiscardPolicy:直接丟棄任務(wù);

阻塞隊(duì)列

  • ArrayBlockingQueue
    • 基于數(shù)組實(shí)現(xiàn)的阻塞隊(duì)列
    • 創(chuàng)建時(shí)必須指定容量大小
    • 可以指定公平性或非公平性,默認(rèn)非公平,即不保證等待時(shí)間長的最優(yōu)先訪問隊(duì)列
  • LinkedBlockingQueue
    • 基于鏈表實(shí)現(xiàn)
    • 如果不指定容量大小,默認(rèn)是Integer.MAX_VALUE
  • PriorityBlockingQueue
    • 無界阻塞隊(duì)列
    • 按照元素優(yōu)先級(jí)進(jìn)行排序,優(yōu)先級(jí)高的先出隊(duì)
  • SynchronousQueue
    • 同步阻塞隊(duì)列
    • 隊(duì)列大小為1,一個(gè)元素要放入該隊(duì)列中必須有一個(gè)線程在等待獲取元素
  • DelayQueue

  • DelayedWorkQueue
  • BlockingDeque
    • 雙向阻塞隊(duì)列
  • TransferQueue
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容