Java多線程(6)-- 分工之線程池

? ? ? ?使用線程的時候就去創(chuàng)建一個線程,這樣實現(xiàn)起來非常簡便,但是就會有一個問題:

  如果并發(fā)的線程數(shù)量很多,并且每個線程都是執(zhí)行一個時間很短的任務(wù)就結(jié)束了,這樣頻繁創(chuàng)建線程就會大大降低系統(tǒng)的效率,因為頻繁創(chuàng)建線程和銷毀線程需要時間。

  那么有沒有一種辦法使得線程可以復(fù)用,就是執(zhí)行完一個任務(wù),并不被銷毀,而是可以繼續(xù)執(zhí)行其他的任務(wù)?在Java中可以通過線程池來達(dá)到這樣的效果。

合理利用線程池能夠帶來三個好處。第一:降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。第二:提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時,任務(wù)可以不需要的等到線程創(chuàng)建就能立即執(zhí)行。第三:提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控。

線程池繼承關(guān)系圖:

ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關(guān)系:

  Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數(shù)為Runnable類型,從字面意思可以理解,就是用來執(zhí)行傳進(jìn)去的任務(wù)的;

  然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等,也提供了更加全面的提交任務(wù)機(jī)制,如返回Future而不是void的submit方法。注意,這個方法的輸入是Callable,它解決了Runnable無法返回結(jié)果的困擾;

  抽象類AbstractExecutorService實現(xiàn)了ExecutorService接口,基本實現(xiàn)了ExecutorService中聲明的所有方法;

  然后ThreadPoolExecutor繼承了類AbstractExecutorService。

? ? ? ?Java標(biāo)準(zhǔn)庫提供了幾種基礎(chǔ)實現(xiàn),比如ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool。這些線程池的設(shè)計特點在于其高度的可調(diào)節(jié)性和靈活性,以盡量滿足復(fù)雜多變的實際應(yīng)用場景。

? ? ? ? Executors則從簡化使用的角度,為我們提供了各種方便的靜態(tài)工廠方法。


java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。

public?ThreadPoolExecutor(int?corePoolSize, int?maximumPoolSize,long?keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);

參數(shù)的含義:

corePoolSize:線程池的基本大小。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來之后,就會創(chuàng)建一個線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會把到達(dá)的任務(wù)放到緩存隊列當(dāng)中;

maximumPoolSize:線程池中允許的最大線程數(shù)。如果隊列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是如果使用了無界的任務(wù)隊列這個參數(shù)就沒什么效果。注意還有一個largestPoolSize,記錄了曾經(jīng)出現(xiàn)的最大線程個數(shù)。因為setMaximumPoolSize()可以改變最大線程數(shù)。

poolSize:線程池中當(dāng)前線程的數(shù)量。

keepAliveTime:表示線程沒有任務(wù)執(zhí)行時最多保持多久時間會終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時,如果一個線程空閑的時間達(dá)到keepAliveTime,則會終止,直到線程池中的線程數(shù)不超過corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時,keepAliveTime參數(shù)也會起作用,直到線程池中的線程數(shù)為0;

workQueue(任務(wù)隊列):用于保存等待執(zhí)行的任務(wù)的阻塞隊列。可以選擇以下幾個阻塞隊列:

(1)ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,此隊列按 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序。

(2)LinkedBlockingQueue:一個基于鏈表結(jié)構(gòu)的阻塞隊列,此隊列按FIFO (先進(jìn)先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。其實也是有界隊列,但是不設(shè)置大小時就時Integer.MAX_VALUE。靜態(tài)工廠方法Executors.newFixedThreadPool()和Executors.newSingleThreadExecutor()使用了這個隊列。

(3)SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊列。

(4)PriorityBlockingQueue:一個具有優(yōu)先級得無界阻塞隊列。

handler(任務(wù)拒絕策略):

當(dāng)線程池的任務(wù)緩存隊列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來就會采取任務(wù)拒絕策略,通常有以下四種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。

ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。

ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)

ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)


那么poolSize、corePoolSize、maximumPoolSize三者的關(guān)系是如何的呢?

有界隊列:

當(dāng)新提交一個任務(wù)時:

(1)如果poolSize < corePoolSize,新增加一個線程處理新的任務(wù)。

(2)如果poolSize = corePoolSize,新任務(wù)會被放入阻塞隊列等待。

(3)如果阻塞隊列的容量達(dá)到上限,且這時poolSize < maximumPoolSize,新增線程來處理任務(wù)。

(4)如果阻塞隊列滿了,且poolSize = maximumPoolSize,那么線程池已經(jīng)達(dá)到極限,會根據(jù)飽和策略RejectedExecutionHandler拒絕新的任務(wù)。

(5)如果線程池中的線程數(shù)量大于 corePoolSize時,且某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數(shù)目不大于corePoolSize;如果允許為核心池中的線程設(shè)置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。

無界隊列

與有界隊列相比,除非系統(tǒng)資源耗盡,否則無界的任務(wù)隊列不存在任務(wù)入隊失敗的情況。當(dāng)有新的任務(wù)到來,系統(tǒng)的線程數(shù)小于corePoolSize時,則新建線程執(zhí)行任務(wù)。當(dāng)達(dá)到corePoolSize后,就不會繼續(xù)增加,若后續(xù)仍有新的任務(wù)加入,而沒有空閑的線程資源,則任務(wù)直接進(jìn)入隊列等待。若任務(wù)創(chuàng)建和處理的速度差異很大,無界隊列會保持快速增長,直到耗盡系統(tǒng)內(nèi)存。

https://blog.csdn.net/kusedexingfu/article/details/72491864


幾個工廠方法:

不過在java doc中,并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態(tài)方法來創(chuàng)建線程池:

Executors.newCachedThreadPool();??????? //創(chuàng)建一個線程池,容量大小為??? ?????

????????????????????????????????????????????????????????????????????Integer.MAX_VALUE

Executors.newSingleThreadExecutor();?? //創(chuàng)建容量為1的線程池

Executors.newFixedThreadPool(intnThreads);??? //創(chuàng)建固定容量大小的線程池

Executors.newSingleThreadScheduledExecutor()和newScheduledThreadPool(int corePoolSize)??//創(chuàng)建的是一個ScheduledExecutorService,可以進(jìn)行定時或周期性的工作調(diào)度,區(qū)別在于單一工作線程還是多個工作線程。

Executors.newWorkStealingPool(intparallelism)????//這是一個經(jīng)常被人忽略的線程池,Java8才加入這個創(chuàng)建方法,其內(nèi)部會構(gòu)建ForkJoinPool,利用Work-Stealing算法,并行的處理任務(wù),不保證處理順序。

實現(xiàn)細(xì)節(jié):

public static ExecutorService newFixedThreadPool(int nThreads) {

??? return newThreadPoolExecutor(nThreads, nThreads,

?????????????????????????????????0L, TimeUnit.MILLISECONDS,

?????????????????????????????????new LinkedBlockingQueue());

}

public static ExecutorService newSingleThreadExecutor() {

??? return newFinalizableDelegatedExecutorService

??????? (newThreadPoolExecutor(1, 1,

???????????????????????????????0L, TimeUnit.MILLISECONDS,

???????????????????????????????new LinkedBlockingQueue()));

}

public static ExecutorService newCachedThreadPool() {

??? return newThreadPoolExecutor(0, Integer.MAX_VALUE,

?????????????????????????????????60L, TimeUnit.SECONDS,

?????????????????????????????????new SynchronousQueue());

}


從它們的具體實現(xiàn)來看,它們實際上也是調(diào)用了ThreadPoolExecutor,只不過參數(shù)都已配置好了。

newFixedThreadPool創(chuàng)建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

newSingleThreadExecutor將corePoolSize和maximumPoolSize都設(shè)置為1,也使用的LinkedBlockingQueue;

newCachedThreadPool將corePoolSize設(shè)置為0,將maximumPoolSize設(shè)置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務(wù)就創(chuàng)建線程運行,當(dāng)線程空閑超過60秒,就銷毀線程。


向線程池提交任務(wù):

我們可以使用execute提交的任務(wù),但是execute方法沒有返回值,所以無法判斷任務(wù)知否被線程池執(zhí)行成功。通過以下代碼可知execute方法輸入的任務(wù)是一個Runnable類的實例。

threadsPool.execute(new Runnable() {

??? @Override

??? public void run() {

??? // TODO Auto-generatedmethod stub

? ? }

});

我們也可以使用submit 方法來提交任務(wù),它會返回一個future,那么我們可以通過這個future來判斷任務(wù)是否執(zhí)行成功,通過future的get方法來獲取返回值,

get方法會阻塞住直到任務(wù)完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間后立即返回,這時有可能任務(wù)沒有執(zhí)行完。

try {

??? Object s = future.get();

??? } catch(InterruptedException e) {

??? //處理中斷異常

??? } catch(ExecutionException e) {

??? //處理無法執(zhí)行任務(wù)異常

??? } finally {

??? //關(guān)閉線程池

??? executor.shutdown();

}


線程池的關(guān)閉:

ThreadPoolExecutor提供了兩個方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow(),其中:

shutdown():不會立即終止線程池,而是要等所有任務(wù)緩存隊列中的任務(wù)都執(zhí)行完后才終止,但再也不會接受新的任務(wù)。

shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊列,返回尚未執(zhí)行的任務(wù)。


線程池的更多細(xì)節(jié)可參考:https://www.cnblogs.com/dolphin0520/p/3932921.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容