(四)JDK并發(fā)包——線程池

為了避免系統(tǒng)頻繁的創(chuàng)建和銷毀線程,可以使用線程池來管理線程,以實(shí)現(xiàn)線程的復(fù)用。同時,線程池還可以幫助管理系統(tǒng)中的線程數(shù)量,防止過多的并發(fā)線程耗盡系統(tǒng)的資源。

Executor框架

Executor框架

ThreadPoolExecutor表示一個線程池,而Executors可以當(dāng)作是一個線程池工廠類,用于方便的生產(chǎn)線程池,可以通過Executors的靜態(tài)方法獲得幾種預(yù)先設(shè)定的線程池。從上圖可知,ThreadPoolExecutor類實(shí)現(xiàn)了Executor接口,通過這個接口,線程池ThreadPoolExecutor可以接受并運(yùn)行任何實(shí)現(xiàn)Runnable接口的對象。

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor() 
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
······

1.幾種經(jīng)典線程池

  • FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

通過Executors.FixedThreadPool(n)方法生成的固定線程數(shù)量為n的線程池,該線程池中的線程數(shù)量始終不變,當(dāng)有一個新的任務(wù)提交時,如果線程中有空閑的線程,則立即執(zhí)行該任務(wù);如果沒有空閑線程,則新的任務(wù)會緩存在一個任務(wù)隊列中,等到線程池中有空閑線程了再執(zhí)行。線程池使用的是無界隊列LinkedBlockingQueue,任務(wù)可以無限的添加進(jìn)隊列中,但是也有耗盡系統(tǒng)資源的風(fēng)險。

  • CachedThreadPool
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

通過Executors.newCachedThreadPool()方法生成的可根據(jù)實(shí)際情況調(diào)整線程數(shù)量的線程池。線程中的數(shù)量不固定,如果有空閑線程則優(yōu)先使用空閑線程,如果所有線程都忙碌則會創(chuàng)建一個新的線程處理任務(wù)。線程組將corePoolSize設(shè)為0,而maximumPoolSize設(shè)為無窮大,也就是說平時線程組中沒有線程,如果有任務(wù)提交而所有線程繁忙時,線程組會無限制的創(chuàng)建線程,同樣可能會耗盡系統(tǒng)資源。線程組使用的SynchronousQueue容量為0,無法存儲任務(wù),只是提交任務(wù)的中介,因此所有任務(wù)都會立刻提交成功,不會被拒絕。

  • SingleThreadExecutor
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

通過Executors.newSingleThreadExecutor()方法生成線程數(shù)量永遠(yuǎn)為1的線程池。

  • SingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

通過Executors.newSingleThreadScheduledExecutor()方法生成的線程數(shù)量永遠(yuǎn)為1的線程池。該線程池可以控制線程的啟動時間,達(dá)到延時啟動的效果。

  • ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

通過Executors.newThreadScheduledExecutor(n)方法生成的固定線程數(shù)量為n的線程池。該線程池也可以控制線程的啟動時間。

2.ScheduledThreadPool的使用

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
  • schedule()方法會在給定時間后調(diào)用一次指定方法。
  • scheduleAtFixedRate()會周期性的調(diào)用指定方法,在initialDelay之后執(zhí)行第一次方法,在上一次方法開始執(zhí)行后period時間后再執(zhí)行下一次方法。上一次任務(wù)的執(zhí)行時長不會影響下一次任務(wù)開始執(zhí)行的時間。
  • scheduleWithFixedDelay()會周期性的調(diào)用指定方法,在initialDelay之后執(zhí)行第一次方法,在上一次方法結(jié)束執(zhí)行后delay時間后再執(zhí)行下一次方法。上一次任務(wù)的執(zhí)行時長會影響下一次任務(wù)開始執(zhí)行的時間。

3.創(chuàng)建自定義的ThreadPoolExecutor線程池

ThreadPoolExecutor的構(gòu)造方法如下,

public ThreadPoolExecutor(
                              // 線程池中穩(wěn)定線程數(shù)量
                              int corePoolSize,
                              // 線程池中最大線程數(shù)量
                              int maximumPoolSize,
                              // 線程超過穩(wěn)定數(shù)量時,多余的線程的存活時間,
                              // 超時的線程將被回收
                              long keepAliveTime,
                              // 超時時間的單位
                              TimeUnit unit,
                              // 任務(wù)隊列,被提交但尚未被執(zhí)行的任務(wù)存放的地方
                              BlockingQueue<Runnable> workQueue,
                              // 線程工廠,用于創(chuàng)建線程
                              ThreadFactory threadFactory,
                              // 拒絕策略,當(dāng)任務(wù)太多來不及處理時,如何拒絕任務(wù)
                              RejectedExecutionHandler handler)
任務(wù)隊列(workQueue)
  • SynchronousQueue直接提交的隊列:SynchronousQueue沒有容量,每一個插入操作都要等待一個刪除操作,也就是說SynchronousQueue只是任務(wù)提交的中介,提交的任務(wù)不會被真實(shí)的保存,而是直接提交給線程組執(zhí)行,如果沒有空閑線程則創(chuàng)建新線程,如果進(jìn)程數(shù)量達(dá)到最大值則拒絕任務(wù)。因此使用SynchronousQueue總要配合很大的maximumPoolSize值以防止任務(wù)被拒絕。
  • ArrayBlockingQueue有界的任務(wù)隊列:使用ArrayBlockingQueue時,如果線程池實(shí)際線程數(shù) n 小于穩(wěn)定數(shù)量corePoolSize,則會優(yōu)先創(chuàng)建新的線程;如果 n 大于corePoolSize,則將任務(wù)存入隊列中等待;如果隊列存滿了,而 n 小于最大線程數(shù)maximumPoolSize,就創(chuàng)建新線程處理任務(wù);如果隊列已滿,而線程數(shù)也達(dá)到最大值,則拒絕任務(wù)。也就是說,除非系統(tǒng)非常繁忙,否則ArrayBlockingQueue會使線程數(shù)穩(wěn)定在corePoolSize。
  • LinkedBlockingQueue無界的任務(wù)隊列:除非系統(tǒng)資源耗盡,否則任務(wù)可以一直加入隊列中。當(dāng)有新任務(wù)到來時,如果線程池實(shí)際線程數(shù) n 小于穩(wěn)定數(shù)量corePoolSize,則會創(chuàng)建新的線程;而如果 n 大于corePoolSize,則將任務(wù)存入隊列中等待。也就是說,線程中的數(shù)量永遠(yuǎn)不會超過corePoolSize,而線程組的拒絕策略也永遠(yuǎn)不會觸發(fā)。而如果隊列中的任務(wù)數(shù)量持續(xù)增長,將會耗盡系統(tǒng)資源。
  • PriorityBlockingQueue優(yōu)先任務(wù)隊列:優(yōu)先隊列可以控制任務(wù)的執(zhí)行順序,按照任務(wù)的優(yōu)先級執(zhí)行任務(wù)。

4.線程組執(zhí)行邏輯

ThreadPoolExecutor類的核心方法execute()的執(zhí)行邏輯如下,源碼的英文注解也描述了線程組是如何執(zhí)行任務(wù)的:

  1. 如果線程組線程數(shù)少于穩(wěn)定數(shù)corePoolSize,方法會直接嘗試調(diào)用addWorker()方法創(chuàng)建一個新的線程來執(zhí)行任務(wù),此時會原子性的檢查runStateworkerCount的值,如果不滿足則會返回false來阻止線程組創(chuàng)建新線程。
  2. 如果線程數(shù)高于corePoolSize或創(chuàng)建線程失敗,則方法會嘗試將任務(wù)加入等待隊列中,如果加入成功,則任務(wù)在隊列中等待執(zhí)行。
  3. 如果加入隊列失敗,則方法會嘗試再次調(diào)用addWorker()方法提交線程,這次如果線程數(shù)小于maximumPoolSize則會為任務(wù)創(chuàng)建線程執(zhí)行。而如果這次提交失敗,則方法會根據(jù)拒絕策略拒絕任務(wù)。
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

5.拒絕策略

JDK內(nèi)置四種拒絕策略:

  • AbortPolicy:該策略會直接拋出異常,阻止系統(tǒng)正常工作。
  • CallerRunsPolicy:只要線程池未關(guān)閉,該策略會直接在調(diào)用者線程中運(yùn)行被拒絕的任務(wù)。
  • DiscardOldestPolicy:該策略將丟棄最老的一個任務(wù),也就是即將被執(zhí)行的那一個任務(wù),然后嘗試再次提交當(dāng)前任務(wù)。
  • DiscardOldestPolicy:該策略會直接丟棄當(dāng)前這個被拒絕的任務(wù)。

6.線程工廠(ThreadFactory)

ThreadFactory是一個接口,用于描述如何生成線程。Executor中的線程池構(gòu)造方法都有額外接受一個ThreadFactory參數(shù)的版本。建議使用自定義的ThreadFactory方法來為創(chuàng)建的線程起個名字,這是ThreadFactory最簡單也是最有用的功能,免得在排查線程問題時面對報錯信息無從下手。

7.最優(yōu)的線程池線程數(shù)

線程池的大小對系統(tǒng)性能的影響并不大,因此大小并不需要過分精確,只需要避免極大和極小的極端情況。
線程池線程數(shù) = CPU數(shù)量 * CPU使用率 * (1 + 等待時間/計算時間)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容