今天我們聊聊在JAVA世界中如何實現(xiàn)一桶洗澡水可供成百上千的人反復使用的故事
意義:線程池可以反復使用已經(jīng)創(chuàng)建的線程,減小了線程反復創(chuàng)建和銷毀的開銷,避免過多線程占用太多內(nèi)存
構(gòu)造參數(shù)
1、corePoolSize
核心線程數(shù),線程池初始化完成后,不會馬上創(chuàng)建線程,而是等待任務(wù)的到來才會去創(chuàng)建線程,只要核心線程還沒有滿,每次有任務(wù)提交進來即時有其他線程處于空閑狀態(tài),線程池依然會創(chuàng)建新線程直到達到核心線程數(shù)量,之后就算沒有任務(wù),線程池中的線程數(shù)量也會維持核心線程數(shù)
2、maxPooolSize
最大線程數(shù),當任務(wù)存儲隊列的任務(wù)達到了設(shè)置的最大值時,線程池就會去創(chuàng)建新的線程去處理,線程池的數(shù)量最多只能是maxPooolSize的數(shù)量,而在等待了超設(shè)置的存活時間后依舊沒有新的任務(wù)處理,非核心線程就會被銷毀,maxPooolSize可以是非常大的數(shù)值,可以設(shè)置為Integer.MAX_VALUE
3、keepAliveTime
保持存活時間,當非核心線程空閑時間超過了keepAliveTime,就會被銷毀
4、workQueue
任務(wù)存儲隊列,當線程池沒有空閑的核心線程時,將緩存提交的任務(wù)到隊列中排隊等待核心線程的處理,當隊列超過設(shè)置的值時,線程池將會創(chuàng)建新的線程去處理隊列的任務(wù),如果線程池中的線程數(shù)量達了maxPooolSize,隊列也達到了設(shè)置值,將會執(zhí)行拒絕策略
小知識:當隊列滿的時候才會去創(chuàng)建多余corePoolSize的線程去處理任務(wù),如果這是個無限隊列(如linkBlockingQueue),無論隊列中有多少任務(wù),線程池都不會去創(chuàng)建多于corePoolSize的線程去處理任務(wù),所以任務(wù)存儲隊列對線程池的影響是非常大的
5、threadFactor
當線程池需要新的線程時,會使用threadFactor來生成新的線程,線程工廠默認使用的是DefaultThreadFactory,通過線程工廠可以設(shè)置線程的線程組、線程名、優(yōu)先級、是否是守護線程等,一般情況下使用默認的即可
6、Handler
線程池無法接受新提交的任務(wù)時就會使用拒絕策略
7、unit
線程存活時間單位
workQueue線程池的工作隊列
隊列的類型主要有以下幾個:
1、SynchronousQuene
不緩存任務(wù)的阻塞隊列,SynchronousQuene內(nèi)部沒有容量,當任務(wù)被提交進來后就會要求線程馬上處理,不會去緩存新進來的任務(wù),如果沒有足夠的核心線程處理,則會創(chuàng)建新線程去處理,當?shù)竭_maxPooolSize時就執(zhí)行拒絕策略,SynchronousQuene適用于任務(wù)數(shù)不多的場景,如果使用了SynchronousQuene應(yīng)當把maxPooolSize設(shè)置得很大,這樣才會避免隊列執(zhí)行拒絕策略
2、LinkedBlockingQuene
無界阻塞隊列,這個隊列是基于鏈表的隊列,按照FIFO排序,使用這個隊列時由于隊列是無界的,所以無論有多少任務(wù)都不會排滿隊列容量,因此線程池不管核心線程是否空閑,maxPooolSize有多大都不會創(chuàng)建任何非核心線程,此隊列適用于任務(wù)數(shù)量非常大的場景,但是風險就在于如果線程的處理速度跟不上任務(wù)的生產(chǎn)速度,任務(wù)緩存數(shù)量過多造成OOM,所以使用此隊列盡量將核心線程數(shù)設(shè)置的大些
3、ArrayBlockingQueue
有界阻塞隊列,這個隊列是基于數(shù)組的隊列,按照FIFO排序,這個隊列的容量是可以設(shè)置的,當任務(wù)數(shù)排滿隊列容量時線程池就會創(chuàng)建新的線程去處理任務(wù),當隊列排滿,線程池的線程數(shù)量也到達maxPooolSize后則會執(zhí)行拒絕策略
4、PriorityBlockingQueue
具有優(yōu)先級的無界阻塞隊列,隊列可以按照優(yōu)先級進行內(nèi)部元素排序,隊列的任務(wù)需要實現(xiàn)Comparable 接口,才能通過使用compareTo()方法進行排序。
5、DelayQueue
是一個具有延遲時效的無界隊列,只有在延遲期滿后才能從隊列中取出任務(wù)
拒絕策略
當線程池的任務(wù)緩存隊列已滿并且線程池中的線程數(shù)目達到maximumPoolSize時,如果還有任務(wù)到來就會采取任務(wù)拒絕策略,通常有以下四種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。這是線程池默認的拒絕策略
ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù),但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新提交被拒絕的任務(wù)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程(提交任務(wù)的線程)處理該任務(wù),例如如果是在main方法中提交的,就由main方法去執(zhí)行這個任務(wù)
JDK已被設(shè)定好的線程池類型:
在我們平時開發(fā)中我們總喜歡盲目使用類似newFixedThreadPool(),newCachedThreadPool(),無腦使用它們真的好嗎,我分析分析幾種比較常見的線程池類型:
1、newFixedThreadPool()
不多BB直接看源碼,看看這個方法的構(gòu)造方法的
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads//核心線程數(shù),
nThreads//最大線程數(shù),
0L//存活時間,
TimeUnit.MILLISECONDS//存活時間單位,
new LinkedBlockingQueue<Runnable>()//無界隊列
);
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public ThreadPoolExecutor(int corePoolSize//核心線程數(shù),
int maximumPoolSize//最大線程數(shù),
long keepAliveTime//存活時間,
TimeUnit unit//存活時間單位,
BlockingQueue<Runnable> workQueue//隊列) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
我發(fā)現(xiàn)這個newFixedThreadPool()是一個使用的阻塞隊列是無界的,也就是說newFixedThreadPool()可以接受無限多的任務(wù),所以永遠不會執(zhí)行拒絕策略,但是無論接受了多少任務(wù),線程池都不會創(chuàng)建非核心線程,所以這個線程的缺點就是在并發(fā)量高,任務(wù)處理速度慢的情況下可能隊列數(shù)太多造成OOM。核心線程池等于最大線程池,當前的線程數(shù)能夠比較穩(wěn)定保證一個數(shù)。能夠避免頻繁回收線程和創(chuàng)建線程。故適用于處理cpu密集型的任務(wù),確保cpu在長期被工作線程使用的情況下,盡可能少的分配線程,即適用長期的任務(wù)。
2、newCachedThreadPool()
不多BB直接看源碼,依舊是看構(gòu)造函數(shù)
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
我們看到newCachedThreadPool()使用的是SynchronousQueue隊列,這是一個不會緩存任務(wù)的隊列,并且設(shè)置了核心線程數(shù)為0,最大線程數(shù)可以看做是無限大,所以永遠不會執(zhí)行拒絕策略,線程的存活時間是60秒,我們可以知道這個類型的線程池每接受一個線程就會馬上創(chuàng)建新的線程去處理任務(wù),并且會清理掉線程池中空閑超過60秒的線程,這個線程池由于沒有排隊阻塞所以處理任務(wù)的速度會較其他的線程池快,但是在并發(fā)量很大的時候無節(jié)制的創(chuàng)建和銷毀線程會造成系統(tǒng)很大的資源開銷。這個線程池是適用于并發(fā)不固定的短期小任務(wù)
3、newSingleThreadExecutor()
不多BB直接看源碼,依舊是看構(gòu)造函數(shù)
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
我們看到newSingleThreadExecutor使用的是無界隊列,所以永遠不會執(zhí)行拒絕策略,但是核心線程數(shù)和最大線程數(shù)只是1而已,所以newSingleThreadExecutor無論接收多少任務(wù)都會只會用一個線程去處理任務(wù),任務(wù)只會一個一個的被處理,相當于串行處理,適用串行化任務(wù)
4、newScheduledThreadPool()
這個的源碼比較麻煩,我通過手寫一個事例說明:
public class TestTask implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
public static void main(String[] args) {
//核心線程數(shù)設(shè)置為3
ScheduledExecutorService scp = Executors.newScheduledThreadPool(3);
//延遲5秒
scp.schedule(new TestTask(),5, TimeUnit.SECONDS);
//延遲5秒,每兩秒執(zhí)行一次
scp.scheduleAtFixedRate(new TestTask(),5,2,TimeUnit.SECONDS);
}
這四個比較典型的線程池是已經(jīng)被JDK設(shè)置好了的,如果沒有學習過線程池基礎(chǔ)知識的程序員可能只會無腦使用它們,但是通過分析它不一定適合我們的業(yè)務(wù),實際的使用還是要看我們的業(yè)務(wù)具體設(shè)置線程池的參數(shù)值,所以學會自定義線程池是很有必要的
自定義線程池:
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(3,
5,
30, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(20));
線程池的基本使用
不多BB直接上代碼
private static ExecutorService fx = Executors.newFixedThreadPool(3);
class PooTaskReturn implements Callable<String>{
@Override
public String call() throws Exception {
return "有返回值的線程池方法";
}
}
class PoolTask implements Runnable{
@Override
public void run() {
System.out.println("無返回值的線程池方法");
}
}
@Test
public void test(){
for (int i=0;i<1000;i++) {
fx.execute(new PoolTask());
fx.execute(()->{
System.out.println("使用匿名內(nèi)部類");
});
}
List<Future<String>> fls=new ArrayList<>();
for (int i=0;i<1000;i++) {
Future<String> submit = fx.submit(new PooTaskReturn());
Future<String> lambda=fx.submit(()->{
return "使用匿名內(nèi)部類";
});
fls.add(submit);
fls.add(lambda);
}
fls.forEach(l-> { try {
System.out.println(l.get());
}catch (Exception e) {
System.out.println(e);
}
});
//關(guān)閉線程池,使用shutdown關(guān)閉時會調(diào)用拒絕策略拒絕所有新的任務(wù)進入隊列,
// 然后等待所有的任務(wù),包括隊列中的任務(wù)執(zhí)行完成
fx.shutdown();
//關(guān)閉線程池,使用shutdown關(guān)閉時會調(diào)用拒絕策略拒絕所有新的任務(wù)進入隊列,
// 然后把嘗試關(guān)閉正在執(zhí)行的任務(wù)
//將隊列中的任務(wù)直接丟棄
fx.shutdownNow();
//所有的任務(wù)已經(jīng)完成,返回true
fx.isTerminated();
//收到關(guān)閉指令后返回true,即執(zhí)行了shutdownNow或者shutdown
fx.isShutdown();
}
我看看到幾個關(guān)閉的命令,我們把它列出來:
shutdown():關(guān)閉線程池,使用shutdown關(guān)閉時會調(diào)用拒絕策略拒絕所有新的任務(wù)進入隊列,然后等待所有的任務(wù),包括隊列中的任務(wù)執(zhí)行完成
shutdownNow():關(guān)閉線程池,使用shutdown關(guān)閉時會調(diào)用拒絕策略拒絕所有新的任務(wù)進入隊列,然后把嘗試關(guān)閉正在執(zhí)行的任務(wù),將隊列中的任務(wù)直接丟棄
isTerminated():所有的任務(wù)已經(jīng)完成,返回true
isShutdown():收到關(guān)閉指令后返回true,即執(zhí)行了shutdownNow或者shutdown
CPU 密集型(I/O bound)
CPU密集型也叫計算密集型,指的是系統(tǒng)的硬盤、內(nèi)存性能相對CPU要好很多,此時,系統(tǒng)運作大部分的狀況是CPU Loading 100%,CPU要讀/寫I/O(硬盤/內(nèi)存),I/O在很短的時間就可以完成,而CPU還有許多運算要處理,CPU Loading很高。
CPU密集的意思是該任務(wù)需要大量的運算,而沒有阻塞,CPU一直全速運行。CPU密集任務(wù)只有在真正的多核CPU上才可能得到加速(通過多線程),而在單核CPU上,無論你開幾個模擬的多線程該任務(wù)都不可能得到加速,因為CPU總的運算能力就那些。
CPU 使用率較高(例如:計算一些復雜的運算,邏輯處理等情況)非常多的情況下,線程數(shù)一般只需要設(shè)置為CPU核心數(shù)的線程個數(shù)就可以了。 這一類型多出現(xiàn)在開發(fā)中的一些業(yè)務(wù)復雜計算和邏輯處理過程中。
I/O 密集型(I/O bound)
IO密集型指的是系統(tǒng)的CPU性能相對硬盤、內(nèi)存要好很多,此時,系統(tǒng)運作,大部分的狀況是CPU在等I/O (硬盤/內(nèi)存) 的讀/寫操作,此時CPU Loading并不高。I/O bound的程序一般在達到性能極限時,CPU占用率仍然較低。這可能是因為任務(wù)本身需要大量I/O操作,而pipeline做得不是很好,沒有充分利用處理器能力。
CPU 使用率較低,程序中會存在大量的 I/O 操作占用時間,導致線程空余時間很多,所以通常就需要開CPU核心數(shù)兩倍的線程。當線程進行 I/O 操作 CPU 空閑時,啟用其他線程繼續(xù)使用 CPU,以提高 CPU 的使用率。例如:數(shù)據(jù)庫交互,文件上傳下載,網(wǎng)絡(luò)傳輸?shù)取?/p>
線程等待時間所占比例越高,需要越多線程,啟用其他線程繼續(xù)使用CPU,以此提高CPU的利用率;線程 CPU 時間所占比例越高,需要越少的線程,這一類型在開發(fā)中主要出現(xiàn)在一些計算業(yè)務(wù)頻繁的邏輯中。
方法一:
由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如CPU核數(shù)*2
方法二:
IO密集型,即該任務(wù)需要大量的IO,即大量的阻塞。在單線程上運IO密集型的任務(wù)會導致浪費大量的CPU運算能力浪費在等待。所以在IO密集型任務(wù)中使用多線程可以大大的加速程序運行,即使在單核CPU上,這種加速主要就是利用了被浪費掉的阻塞時間。
IO密集型時,大部分線程都阻塞,故需要多配置線程數(shù):
參考公式:CPU核數(shù) /(1 - 阻系數(shù))
比如8核CPU:8/(1 - 0.9)=80個線程數(shù)
阻塞系數(shù)在0.8~0.9之間
總結(jié):
1.一個計算為主的程序(CPU密集型程序),多線程跑的時候,可以充分利用起所有的 CPU 核心數(shù),比如說 8 個核心的CPU ,開8 個線程的時候,可以同時跑 8 個線程的運算任務(wù),此時是最大效率。但是如果線程遠遠超出 CPU 核心數(shù)量,反而會使得任務(wù)效率下降,因為頻繁的切換線程也是要消耗時間的。因此對于 CPU 密集型的任務(wù)來說,線程數(shù)等于 CPU 數(shù)是最好的了。
2.如果是一個磁盤或網(wǎng)絡(luò)為主的程序(IO密集型程序),一個線程處在 IO 等待的時候,另一個線程還可以在 CPU 里面跑,有時候 CPU 閑著沒事干,所有的線程都在等著 IO,這時候他們就是同時的了,而單線程的話此時還是在一個一個等待的。我們都知道 IO 的速度比起 CPU 來是很慢的。此時線程數(shù)等于CPU核心數(shù)的兩倍是最佳的。
核心線程數(shù)計算公式
IO密集型:核心線程數(shù) = CPU核數(shù) / (1-阻塞系數(shù))
CPU密集型:核心線程數(shù) = CPU核數(shù) + 1
IO密集型:核心線程數(shù) = CPU核數(shù) * 2