一起了解一下線程池的原理與使用

什么是線程池

線程池,顧名思義就是裝線程的池子。其用途是為了幫我們重復(fù)管理線程,避免創(chuàng)建大量的線程增加開(kāi)銷,提高響應(yīng)速度。

為什么要用線程池

作為一個(gè)嚴(yán)謹(jǐn)?shù)墓コ仟{,不會(huì)希望別人看到我們的代碼就開(kāi)始吐槽,new Thread().start()會(huì)讓代碼看起來(lái)混亂臃腫,并且不好管理和維護(hù),那么我們就需要用到了線程池。

在編程中經(jīng)常會(huì)使用線程來(lái)異步處理任務(wù),但是每個(gè)線程的創(chuàng)建和銷毀都需要一定的開(kāi)銷。如果每次執(zhí)行一個(gè)任務(wù)都需要開(kāi)一個(gè)新線程去執(zhí)行,則這些線程的創(chuàng)建和銷毀將消耗大量的資源;并且線程都是“各自為政”的,很難對(duì)其進(jìn)行控制,更何況有一堆的線程在執(zhí)行。線程池為我們做的,就是線程創(chuàng)建之后為我們保留,當(dāng)我們需要的時(shí)候直接拿來(lái)用,省去了重復(fù)創(chuàng)建銷毀的過(guò)程。

線程池的處理邏輯

線程池ThreadPoolExecutor構(gòu)造函數(shù)

//五個(gè)參數(shù)的構(gòu)造函數(shù)publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime, TimeUnit unit, BlockingQueue workQueue)

//六個(gè)參數(shù)的構(gòu)造函數(shù)-1

publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory)

//六個(gè)參數(shù)的構(gòu)造函數(shù)-2

publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler)

//七個(gè)參數(shù)的構(gòu)造函數(shù)publicThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

1.corePoolSize -> 該線程池中核心線程數(shù)最大值

核心線程:在創(chuàng)建完線程池之后,核心線程先不創(chuàng)建,在接到任務(wù)之后創(chuàng)建核心線程。并且會(huì)一直存在于線程池中(即使這個(gè)線程啥都不干),有任務(wù)要執(zhí)行時(shí),如果核心線程沒(méi)有被占用,會(huì)優(yōu)先用核心線程執(zhí)行任務(wù)。數(shù)量一般情況下設(shè)置為CPU核數(shù)的二倍即可。

2.maximumPoolSize -> 該線程池中線程總數(shù)最大值

線程總數(shù)=核心線程數(shù)+非核心線程數(shù)

非核心線程:簡(jiǎn)單理解,即核心線程都被占用,但還有任務(wù)要做,就創(chuàng)建非核心線程

3.keepAliveTime -> 非核心線程閑置超時(shí)時(shí)長(zhǎng)

這個(gè)參數(shù)可以理解為,任務(wù)少,但池中線程多,非核心線程不能白養(yǎng)著,超過(guò)這個(gè)時(shí)間不工作的就會(huì)被干掉,但是核心線程會(huì)保留。

4.TimeUnit -> keepAliveTime的單位

TimeUnit是一個(gè)枚舉類型,其包括:

NANOSECONDS : 1微毫秒 = 1微秒 / 1000

MICROSECONDS : 1微秒 = 1毫秒 / 1000

MILLISECONDS : 1毫秒 = 1秒 /1000

SECONDS : 秒

MINUTES : 分

HOURS : 小時(shí)

DAYS : 天

5.BlockingQueue workQueue -> 線程池中的任務(wù)隊(duì)列

默認(rèn)情況下,任務(wù)進(jìn)來(lái)之后先分配給核心線程執(zhí)行,核心線程如果都被占用,并不會(huì)立刻開(kāi)啟非核心線程執(zhí)行任務(wù),而是將任務(wù)插入任務(wù)隊(duì)列等待執(zhí)行,核心線程會(huì)從任務(wù)隊(duì)列取任務(wù)來(lái)執(zhí)行,任務(wù)隊(duì)列可以設(shè)置最大值,一旦插入的任務(wù)足夠多,達(dá)到最大值,才會(huì)創(chuàng)建非核心線程執(zhí)行任務(wù)。

常見(jiàn)的workQueue有四種:

1.SynchronousQueue:這個(gè)隊(duì)列接收到任務(wù)的時(shí)候,會(huì)直接提交給線程處理,而不保留它,如果所有線程都在工作怎么辦?那就新建一個(gè)線程來(lái)處理這個(gè)任務(wù)!所以為了保證不出現(xiàn)<線程數(shù)達(dá)到了maximumPoolSize而不能新建線程>的錯(cuò)誤,使用這個(gè)類型隊(duì)列的時(shí)候,maximumPoolSize一般指定成Integer.MAX_VALUE,即無(wú)限大

2.LinkedBlockingQueue:這個(gè)隊(duì)列接收到任務(wù)的時(shí)候,如果當(dāng)前已經(jīng)創(chuàng)建的核心線程數(shù)小于線程池的核心線程數(shù)上限,則新建線程(核心線程)處理任務(wù);如果當(dāng)前已經(jīng)創(chuàng)建的核心線程數(shù)等于核心線程數(shù)上限,則進(jìn)入隊(duì)列等待。由于這個(gè)隊(duì)列沒(méi)有最大值限制,即所有超過(guò)核心線程數(shù)的任務(wù)都將被添加到隊(duì)列中,這也就導(dǎo)致了maximumPoolSize的設(shè)定失效,因?yàn)榭偩€程數(shù)永遠(yuǎn)不會(huì)超過(guò)corePoolSize

3.ArrayBlockingQueue:可以限定隊(duì)列的長(zhǎng)度,接收到任務(wù)的時(shí)候,如果沒(méi)有達(dá)到corePoolSize的值,則新建線程(核心線程)執(zhí)行任務(wù),如果達(dá)到了,則入隊(duì)等候,如果隊(duì)列已滿,則新建線程(非核心線程)執(zhí)行任務(wù),又如果總線程數(shù)到了maximumPoolSize,并且隊(duì)列也滿了,則發(fā)生錯(cuò)誤,或是執(zhí)行實(shí)現(xiàn)定義好的飽和策略

4.DelayQueue:隊(duì)列內(nèi)元素必須實(shí)現(xiàn)Delayed接口,這就意味著你傳進(jìn)去的任務(wù)必須先實(shí)現(xiàn)Delayed接口。這個(gè)隊(duì)列接收到任務(wù)時(shí),首先先入隊(duì),只有達(dá)到了指定的延時(shí)時(shí)間,才會(huì)執(zhí)行任務(wù)

6.ThreadFactory threadFactory -> 創(chuàng)建線程的工廠

可以用線程工廠給每個(gè)創(chuàng)建出來(lái)的線程設(shè)置名字。一般情況下無(wú)須設(shè)置該參數(shù)。

7.RejectedExecutionHandler handler -> 飽和策略

這是當(dāng)任務(wù)隊(duì)列和線程池都滿了時(shí)所采取的應(yīng)對(duì)策略,默認(rèn)是AbordPolicy, 表示無(wú)法處理新任務(wù),并拋出 RejectedExecutionException 異常。此外還有3種策略,它們分別如下。

(1)CallerRunsPolicy:用調(diào)用者所在的線程來(lái)處理任務(wù)。此策略提供簡(jiǎn)單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度。

(2)DiscardPolicy:不能執(zhí)行的任務(wù),并將該任務(wù)刪除。

(3)DiscardOldestPolicy:丟棄隊(duì)列最近的任務(wù),并執(zhí)行當(dāng)前的任務(wù)。


如何使用線程池

說(shuō)了半天原理,接下來(lái)就要用了,java為我們提供了4種線程池FixedThreadPool、CachedThreadPool、SingleThreadExecutor、ScheduledThreadPool,幾乎可以滿足我們大部分的需要了:

1.FixedThreadPool

可重用固定線程數(shù)的線程池,超出的線程會(huì)在隊(duì)列中等待,在Executors類中我們可以找到創(chuàng)建方式:

publicstaticExecutorServicenewFixedThreadPool(intnThreads){returnnewThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,newLinkedBlockingQueue());? ? }

FixedThreadPool的corePoolSize和maximumPoolSize都設(shè)置為參數(shù)nThreads,也就是只有固定數(shù)量的核心線程,不存在非核心線程。keepAliveTime為0L表示多余的線程立刻終止,因?yàn)椴粫?huì)產(chǎn)生多余的線程,所以這個(gè)參數(shù)是無(wú)效的。FixedThreadPool的任務(wù)隊(duì)列采用的是LinkedBlockingQueue。

創(chuàng)建線程池的方法,在我們的程序中只需要,后面其他種類的同理:

publicstaticvoidmain(String[] args){// 參數(shù)是要線程池的線程最大值ExecutorService executorService = Executors.newFixedThreadPool(10);

}

2.CachedThreadPool

CachedThreadPool是一個(gè)根據(jù)需要?jiǎng)?chuàng)建線程的線程池

publicstaticExecutorServicenewCachedThreadPool(){returnnewThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,newSynchronousQueue());? ? }

CachedThreadPool的corePoolSize是0,maximumPoolSize是Int的最大值,也就是說(shuō)CachedThreadPool沒(méi)有核心線程,全部都是非核心線程,并且沒(méi)有上限。keepAliveTime是60秒,就是說(shuō)空閑線程等待新任務(wù)60秒,超時(shí)則銷毀。此處用到的隊(duì)列是阻塞隊(duì)列SynchronousQueue,這個(gè)隊(duì)列沒(méi)有緩沖區(qū),所以其中最多只能存在一個(gè)元素,有新的任務(wù)則阻塞等待。

3.SingleThreadExecutor

SingleThreadExecutor是使用單個(gè)線程工作的線程池。其創(chuàng)建源碼如下:

publicstaticExecutorServicenewSingleThreadExecutor(){returnnewFinalizableDelegatedExecutorService? ? ? ? ? ? (newThreadPoolExecutor(1,1,0L, TimeUnit.MILLISECONDS,newLinkedBlockingQueue()));? ? }

我們可以看到總線程數(shù)和核心線程數(shù)都是1,所以就只有一個(gè)核心線程。該線程池才用鏈表阻塞隊(duì)列LinkedBlockingQueue,先進(jìn)先出原則,所以保證了任務(wù)的按順序逐一進(jìn)行。

4.ScheduledThreadPool

ScheduledThreadPool是一個(gè)能實(shí)現(xiàn)定時(shí)和周期性任務(wù)的線程池,它的創(chuàng)建源碼如下:

publicstaticScheduledExecutorServicenewScheduledThreadPool(intcorePoolSize){returnnewScheduledThreadPoolExecutor(corePoolSize);? ? }

這里創(chuàng)建了ScheduledThreadPoolExecutor,繼承自ThreadPoolExecutor,主要用于定時(shí)延時(shí)或者定期處理任務(wù)。ScheduledThreadPoolExecutor的構(gòu)造如下:

publicScheduledThreadPoolExecutor(intcorePoolSize){super(corePoolSize, Integer.MAX_VALUE,? ? ? ? ? ? ? DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,newDelayedWorkQueue());? ? }

可以看出corePoolSize是傳進(jìn)來(lái)的固定值,maximumPoolSize無(wú)限大,因?yàn)椴捎玫年?duì)列DelayedWorkQueue是無(wú)解的,所以maximumPoolSize參數(shù)無(wú)效。

當(dāng)執(zhí)行scheduleAtFixedRate或者scheduleWithFixedDelay方法時(shí),會(huì)向DelayedWorkQueue添加一個(gè)實(shí)現(xiàn)RunnableScheduledFuture接口的ScheduledFutureTask(任務(wù)的包裝類),并會(huì)檢查運(yùn)行的線程是否達(dá)到corePoolSize。如果沒(méi)有則新建線程并啟動(dòng)ScheduledFutureTask,然后去執(zhí)行任務(wù)。如果運(yùn)行的線程達(dá)到了corePoolSize時(shí),則將任務(wù)添加到DelayedWorkQueue中。DelayedWorkQueue會(huì)將任務(wù)進(jìn)行排序,先要執(zhí)行的任務(wù)會(huì)放在隊(duì)列的前面。在跟此前介紹的線程池不同的是,當(dāng)執(zhí)行完任務(wù)后,會(huì)將ScheduledFutureTask中的time變量改為下次要執(zhí)行的時(shí)間并放回到DelayedWorkQueue中。

如何合理配置線程池的大小

一般需要根據(jù)任務(wù)的類型來(lái)配置線程池大?。?/p>

如果是CPU密集型任務(wù),就需要盡量壓榨CPU,參考值可以設(shè)為 NCPU+1

如果是IO密集型任務(wù),參考值可以設(shè)置為2*NCPU

當(dāng)然,這只是一個(gè)參考值,具體的設(shè)置還需要根據(jù)實(shí)際情況進(jìn)行調(diào)整,比如可以先將線程池大小設(shè)置為參考值,再觀察任務(wù)運(yùn)行情況和系統(tǒng)負(fù)載、資源利用率來(lái)進(jìn)行適當(dāng)調(diào)整。

實(shí)現(xiàn)一個(gè)ThreadManager的線程池管理類:

public?class?ThreadManager?{??

//?定義兩個(gè)池子,mNormalPool?訪問(wèn)網(wǎng)絡(luò)用的,mDownloadPool?是下載用的??

private?static?ThreadPoolProxy?mNormalPool???=?new?ThreadPoolProxy(1,?3,?5?*?1000);//param?0??最大線程數(shù),param?1?核心線程數(shù)??

private?static?ThreadPoolProxy?mDownloadPool?=?new?ThreadPoolProxy(3,?3,?5?*?1000);??

//?proxy?是代理的意思??

//?定義兩個(gè)get方法,獲得兩個(gè)池子的對(duì)象?,直接get?獲得到的是代理對(duì)象??

public?static?ThreadPoolProxy?getNormalPool()?{??

return?mNormalPool;??

????}??

public?static?ThreadPoolProxy?getDownloadPool()?{??

return?mDownloadPool;??

????}??

//?代理設(shè)計(jì)模式類似一個(gè)中介,所以在中介這里有我們真正想獲取的對(duì)象??

//?所以要先獲取代理,再獲取這個(gè)線程池??

public?static?class?ThreadPoolProxy?{??

private?final?int????????????????mCorePoolSize;?????//?核心線程數(shù)??

private?final?int????????????????mMaximumPoolSize;??//?最大線程數(shù)??

private?final?long???????????????mKeepAliveTime;????//?所有任務(wù)執(zhí)行完畢后普通線程回收的時(shí)間間隔??

private?ThreadPoolExecutor?mPool;??//?代理對(duì)象內(nèi)部保存的是原來(lái)類的對(duì)象??

//?賦值??

public?ThreadPoolProxy(int?corePoolSize,?int?maximumPoolSize,?long?keepAliveTime)?{??

this.mCorePoolSize?=?corePoolSize;??

this.mMaximumPoolSize?=?maximumPoolSize;??

this.mKeepAliveTime?=?keepAliveTime;??

????????}??

private?void?initPool()?{??

if?(mPool?==?null?||?mPool.isShutdown())?{??

//????????????????int?corePoolSize?=?1;//核心線程池大小??

//????????????????int?maximumPoolSize?=?3;//最大線程池大小??

//????????????????long?keepAliveTime?=?5?*?1000;//保持存活的時(shí)間??

TimeUnit?unit??????=?TimeUnit.MILLISECONDS;//單位??

BlockingQueue?workQueue?=null;//阻塞隊(duì)列??

workQueue?=new?ArrayBlockingQueue(3);//FIFO,大小有限制,為3個(gè)??

//workQueue?=?new?LinkedBlockingQueue();??//隊(duì)列類型為linked,其大小不定,無(wú)限大小??

//????????????????workQueue?=?new?PriorityBlockingQueue();??

ThreadFactory?threadFactory?=?Executors.defaultThreadFactory();//線程工廠??

RejectedExecutionHandler?handler?=null;//異常捕獲器??

//????????????????handler?=?new?ThreadPoolExecutor.DiscardOldestPolicy();//去掉隊(duì)列中首個(gè)任務(wù),將新加入的放到隊(duì)列中去??

//????????????????handler?=?new?ThreadPoolExecutor.AbortPolicy();//觸發(fā)異常??

handler?=new?ThreadPoolExecutor.DiscardPolicy();//不做任何處理??

//????????????????handler?=?new?ThreadPoolExecutor.CallerRunsPolicy();//直接執(zhí)行,不歸線程池控制,在調(diào)用線程中執(zhí)行??

//????????????????new?Thread(task).start();??

//?創(chuàng)建線程池??

mPool?=new?ThreadPoolExecutor(mCorePoolSize,??

????????????????????????mMaximumPoolSize,??

????????????????????????mKeepAliveTime,??

????????????????????????unit,??

????????????????????????workQueue,??

????????????????????????threadFactory,??

????????????????????????handler);??

????????????}??

????????}??

/**

?????????*?執(zhí)行任務(wù)

?????????*?@param?task

?????????*/??

public?void?execute(Runnable?task)?{??

????????????initPool();??


//執(zhí)行任務(wù)??

????????????mPool.execute(task);??

????????}??

//?提交任務(wù)??

public?Future?submit(Runnable?task)?{??

????????????initPool();??

return?mPool.submit(task);??

????????}??

//?取消任務(wù)??

public?void?remove(Runnable?task)?{??

if?(mPool?!=?null?&&?!mPool.isShutdown())?{??

????????????????mPool.getQueue()??

????????????????????????.remove(task);??

????????????}??

????????}??

????}??

}??


END

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容