ThreadPoolExecutor是Executor框架的主要成員,也是最核心的類,是線程池的實(shí)現(xiàn)類。
通過(guò)Executor框架的Executors工具類,可以創(chuàng)建3種類型的ThreadPoolExecutor,如下
-
FixedThreadPool:適用于需要限制當(dāng)前線程數(shù)量的應(yīng)用場(chǎng)景 -
SingleThreadExecutor:適用于需要保證順序地執(zhí)行各個(gè)任務(wù)且在任意時(shí)間點(diǎn),不會(huì)有多個(gè)線程活動(dòng)的應(yīng)用場(chǎng)景 -
CachedThreadPool:適用于執(zhí)行很多的短期異步任務(wù)
FixedThreadPool
可重用固定線程數(shù)的線程池
源碼
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
FixedThreadPool的運(yùn)行示意圖

FixedThreadPool的運(yùn)行示意圖
- 如果當(dāng)前運(yùn)行的線程數(shù)少于
corePoolSize,則創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。 - 當(dāng)前運(yùn)行的線程數(shù)等于或者大于
corePoolSize,將任務(wù)加入到LinkedBlockingQueue。 - 任務(wù)執(zhí)行完后會(huì)在循環(huán)中反復(fù)從
LinkedBlockingQueue中獲取任務(wù)來(lái)執(zhí)行
-因?yàn)槭褂玫氖菬o(wú)界隊(duì)列,所以運(yùn)行中的FixedThreadPool不會(huì)拒絕任務(wù),maximumPoolSize和keepAliveTime都是無(wú)效參數(shù)。
例子
package com.sy.thread.example;
import java.util.concurrent.*;
/**
* Description: thread
*
* @author songyu
*/
public class FixedThreadPoolTest {
public static void main(String[] args) {
//因?yàn)閚ewFixedThreadPool創(chuàng)建的是使用的無(wú)界的隊(duì)列,如果IDEA里面安裝了阿里的java編碼規(guī)范會(huì)提示:
//FixedThreadPool和SingleThreadPool: 允許的請(qǐng)求隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求,從而導(dǎo)致OOM
//可以換一種寫法等同寫法,如下
//int nThreads = 1;
//ExecutorService service = new ThreadPoolExecutor(nThreads,nThreads,0L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>());
ExecutorService service = Executors.newFixedThreadPool(1);
for(int i = 0; i < 5; i++) {
int finalI = i;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("第" + finalI + "任務(wù)執(zhí)行");
}
});
service.execute(thread);
}
}
}
SingleThreadExecutor
使用單個(gè)工作線程
源碼
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
SingleThreadExecutor的運(yùn)行示意圖

SingleThreadExecutor的運(yùn)行示意圖
- 任務(wù)提交到線程池后,如果線程池中沒有運(yùn)行的線程,則創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。
- 如果線程中有一個(gè)運(yùn)行的線程,則將任務(wù)加入到
LinkedBlockingQueue。 - 任務(wù)執(zhí)行完后會(huì)在循環(huán)中反復(fù)從
LinkedBlockingQueue中獲取任務(wù)來(lái)執(zhí)行。
例子
package com.sy.thread.example;
import java.util.concurrent.*;
/**
* Description: thread
*
* @author songyu
*/
public class SingleThreadExecutorTest {
public static void main(String[] args) {
//因?yàn)閚ewSingleThreadExecutor創(chuàng)建的是使用的無(wú)界的隊(duì)列,如果IDEA里面安裝了阿里的java編碼規(guī)范會(huì)提示:
//FixedThreadPool和SingleThreadPool: 允許的請(qǐng)求隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求,從而導(dǎo)致OOM
//可以換一種寫法等同寫法,如下
//ExecutorService service = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
ExecutorService service = Executors.newSingleThreadExecutor();
for(int i = 0; i < 5; i++) {
int finalI = i;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("第" + finalI + "任務(wù)執(zhí)行");
}
});
service.execute(thread);
}
}
}
CachedThreadPool
根據(jù)需要?jiǎng)?chuàng)建新線程
源碼
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
CachedThreadPool的corePoolSize設(shè)置成0,即corePool為空,maximumPoolSize被設(shè)置為Integer.MAX_VALUE,即maximumPool是無(wú)界的,keepAliveTime設(shè)置為60L,即CachedThreadPool中的空閑線程等待新任務(wù)的最長(zhǎng)時(shí)間為60s,超過(guò)60s就會(huì)被終止。
CachedThreadPool使用的是沒有容量的SynchronousQueue作為工作隊(duì)列,而maximumPool也是無(wú)界的,所以,如果主線程提交任務(wù)的速度大于線程池中的線程處理速度,CachedThreadPool會(huì)不斷創(chuàng)建新線程,在極端情況下會(huì)因?yàn)閯?chuàng)建過(guò)多的線程而耗盡CPU和內(nèi)存資源。
CachedThreadPool運(yùn)行示意圖

CachedThreadPool運(yùn)行示意圖
- 首先執(zhí)行
SynchronousQueue.offer(Runnable task),如果當(dāng)前maximumPool中的空閑線程正在執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主線程執(zhí)行offer操作與空閑線程執(zhí)行的poll操作配對(duì)成功,主線程把任務(wù)交給空閑線程執(zhí)行(當(dāng)初始maximumPool為空或者maximumPool中當(dāng)前沒有空閑線程時(shí),將沒有線程執(zhí)行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),上述步驟將會(huì)失敗,這個(gè)時(shí)候CachedThreadPool會(huì)創(chuàng)建一個(gè)新線程執(zhí)行任務(wù)。 - 新創(chuàng)建的線程將任務(wù)執(zhí)行完畢后,會(huì)執(zhí)行
SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)讓空閑的線程最多在SynchronousQueue中等待60s,如果60s內(nèi)主線程提交了新任務(wù),那么空線程將執(zhí)行新任務(wù),否則線程終止。
SynchronousQueue是一個(gè)沒有容量的阻塞隊(duì)列,CachedThreadPool使用SynchronousQueue把主線程提交的任務(wù)傳遞給空閑線程執(zhí)行。
CachedThreadPool中任務(wù)傳遞示意圖

CachedThreadPool中任務(wù)傳遞示意圖
例子
package com.sy.thread.example;
import java.util.concurrent.*;
/**
* Description: thread
*
* @author songyu
*/
public class CachedThreadPoolTest {
public static void main(String[] args) {
//因?yàn)镃achedThreadPool允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE,如果IDEA里面安裝了阿里的java編碼規(guī)范會(huì)提示:
//CachedThreadPool: 允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程,從而導(dǎo)致OOM
//可以換一種寫法等同寫法,如下
//ExecutorService service = new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
ExecutorService service = Executors.newCachedThreadPool();
for(int i = 0; i < 5; i++) {
int finalI = i;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("第" + finalI + "任務(wù)執(zhí)行");
}
});
service.execute(thread);
}
}
}
參考書籍《java并發(fā)編程的藝術(shù)》推薦大家閱讀。