阿里Java開(kāi)發(fā)手冊(cè)明確java開(kāi)發(fā)使用 ThreadPoolExecutor 的方式創(chuàng)建線程池
3. 【強(qiáng)制】線程資源必須通過(guò)線程池提供,不允許在應(yīng)用中自行顯式創(chuàng)建線程。
說(shuō)明:使用線程池的好處是減少在創(chuàng)建和銷毀線程上所花的時(shí)間以及系統(tǒng)資源的開(kāi)銷,解決資源不足的問(wèn)題。
如果不使用線程池,有可能造成系統(tǒng)創(chuàng)建大量同類線程而導(dǎo)致消耗完內(nèi)存或者 “過(guò)度切換”的問(wèn)題。
4. 【強(qiáng)制】線程池不允許使用 Executors 去創(chuàng)建,而是通過(guò) ThreadPoolExecutor 的方式,
這樣的處理方式讓寫(xiě)的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。
說(shuō)明: Executors 返回的線程池對(duì)象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool :
允許的請(qǐng)求隊(duì)列長(zhǎng)度為 Integer.MAX_VALUE ,可能會(huì)堆積大量的請(qǐng)求,從而導(dǎo)致 OOM 。
2) CachedThreadPool 和 ScheduledThreadPool :
允許的創(chuàng)建線程數(shù)量為 Integer.MAX_VALUE ,可能會(huì)創(chuàng)建大量的線程,從而導(dǎo)致 OOM 。
java五種常見(jiàn)線程池
threadPool = Executors.newCachedThreadPool();//有緩沖的線程池,線程數(shù) JVM 控制
threadPool = Executors.newFixedThreadPool(3);//固定大小的線程池
threadPool = Executors.newScheduledThreadPool(2);//定時(shí)任務(wù)線程數(shù)
threadPool = Executors.newSingleThreadExecutor();//單線程的線程池,只有一個(gè)線程在工作
threadPool = new ThreadPoolExecutor(...);//默認(rèn)線程池,可控制參數(shù)比較多
ThreadPoolExecutor創(chuàng)建線程池
//五個(gè)參數(shù)的構(gòu)造函數(shù),少了ThreadFactory 和 拒絕策略,使用默認(rèn)的
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
//六個(gè)參數(shù)的構(gòu)造函數(shù),少了拒絕策略,即使用默認(rèn)的
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
//六個(gè)參數(shù)的構(gòu)造函數(shù),少了threadFactory,即使用默認(rèn)的
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
//七個(gè)參數(shù)的構(gòu)造函數(shù),全
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize - 線程池大小,即核心線程的最大數(shù)量
默認(rèn)線程池創(chuàng)建之后,線程池中的線程數(shù)為0,當(dāng)任務(wù)過(guò)來(lái)就會(huì)創(chuàng)建一個(gè)核心線程去執(zhí)行,當(dāng)線程數(shù)小于核心線程數(shù)時(shí),即使有空閑線程,線程池也會(huì)優(yōu)先創(chuàng)建新線程處理。直到核心線程數(shù)達(dá)到corePoolSize 之后,就會(huì)被到達(dá)的任務(wù)放在緩存隊(duì)列中。(注意是到達(dá)的任務(wù))。
核心線程會(huì)一直存活,即使沒(méi)有任務(wù)需要執(zhí)行
如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程。
maximumPoolSize - 線程池的最大線程數(shù),即核心線程+非核心線程的最大數(shù)量。
keepAliveTime - 針對(duì)非核心線程,當(dāng)非核心線程的空閑時(shí)間超過(guò)keepAliveTime,則會(huì)被銷毀
unit - keepAliveTime 的時(shí)間單位。
workQueue - 用來(lái)儲(chǔ)存等待執(zhí)行任務(wù)的隊(duì)列。
threadFactory - 線程工廠。
handler - 拒絕策略。
(本人拙見(jiàn):核心線程:體制員工 ,非核心線程:合同工)
流程


- 如果當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來(lái)執(zhí)行任務(wù)(注意這一個(gè)步驟需要獲取全局鎖)。
- 如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。
- 如果無(wú)法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來(lái)處理任務(wù)(注意這一個(gè)步驟需要獲取全局鎖)。
- 如果創(chuàng)建的新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被執(zhí)行飽和(拒絕)策略。ThreadPoolExecutor 采用上述的設(shè)計(jì)思路,是為執(zhí)行execute()方法時(shí),盡可能避免獲取全局鎖(一個(gè)嚴(yán)重的可伸縮瓶頸)。在ThreadPoolExecutor完成預(yù)熱之后,幾乎所有的execute()方法調(diào)用都是在執(zhí)行步驟2,而步驟2不需要獲取全局鎖。
workQueue(用于保存等待被執(zhí)行的任務(wù)的阻塞隊(duì)列)
- ArrayBlockingQueue :一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列,按FIFO排序任務(wù)。
new ArrayBlockingQueue<Runnable>(100)
- LinkedBlockingQueue : 一個(gè)由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQueue。
new LinkedBlockingQueue<>();
- SynchronousQueue: 一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常高于LinkedBlockingQueue。
- PriorityBlockingQueue :一個(gè)支持優(yōu)先級(jí)排序的無(wú)界阻塞隊(duì)列。
threadFactory (線程工廠)
自定義ThreadFactory可以實(shí)現(xiàn)自定義線程的名稱、組以及優(yōu)先級(jí)等信息,甚至可以任性的將線程設(shè)置為守護(hù)線程??傊?,自定義ThreadFactory可以更加自由的設(shè)置線程池中所有線程的狀態(tài)。
class MyTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}
handler (處理并發(fā)量大的情況下,拒絕策略對(duì)程序健壯性非常有用)
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy ();
- AbortPolicy (默認(rèn)): 丟棄任務(wù)并拋出RejectedExecutionException異常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +e.toString());
}
- DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
- DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
- CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
如果你要自定義,添加寫(xiě)日志記錄啥的
public static class MyIgnorePolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}
private void doLog(Runnable r, ThreadPoolExecutor e) {
// 可做日志記錄等
System.err.println( r.toString() + " rejected");
// System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
}
}
線程池的關(guān)閉
ThreadPoolExecutor提供了兩個(gè)方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow(),其中:
shutdown():不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù)
-
shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù)
1、RUNNING
狀態(tài)說(shuō)明:線程池處于RUNNING狀態(tài),能夠接收新任務(wù),以及對(duì)已添加的任務(wù)進(jìn)行處理。
狀態(tài)切換:線程池的初始化狀態(tài)是RUNNING。換句話說(shuō),線程池一旦被創(chuàng)建,就處于RUNNING狀態(tài),并且線程池中的任務(wù)數(shù)為0。2、SHUTDOWN
狀態(tài)說(shuō)明:線程池處于SHUTDOWN狀態(tài),不接收新任務(wù),能夠處理已經(jīng)添加的任務(wù)。
狀態(tài)切換:調(diào)用shutdown()方法時(shí),線程池由RUNNING -> SHUTDOWN。3、STOP
狀態(tài)說(shuō)明:線程池處于STOP狀態(tài),不接收新任務(wù),不處理已提交的任務(wù),并且會(huì)中斷正在處理的任務(wù)。
狀態(tài)切換:調(diào)用線程池中的shutdownNow()方法時(shí),線程池由(RUNNING or SHUTDOWN) -> STOP。4、TIDYING
狀態(tài)說(shuō)明:當(dāng)所有的任務(wù)已經(jīng)停止,ctl記錄“任務(wù)數(shù)量”為0,線程池會(huì)變?yōu)門IDYING狀態(tài)。當(dāng)線程池處于TIDYING狀態(tài)時(shí),會(huì)執(zhí)行鉤子函數(shù) terminated()。terminated()在ThreadPoolExecutor類中是空, 的,若用戶想在線程池變?yōu)門IDYING時(shí),進(jìn)行相應(yīng)處理,可以通過(guò)重載 terminated()函數(shù)來(lái)實(shí)現(xiàn)。
狀態(tài)切換:當(dāng)線程池在SHUTDOWN狀態(tài)下,阻塞隊(duì)列為空并且線程池中執(zhí)行任務(wù)也為空時(shí),就會(huì)由SHUTDOWN -> TIDYING。當(dāng)線程池在STOP狀態(tài)下,線程池中執(zhí)行的任務(wù)為空時(shí),就會(huì)由STOP-> TIDYING。5、TERMINATED
狀態(tài)說(shuō)明:線程池線程池徹底停止,線程池處于TERMINATED狀態(tài),
狀態(tài)切換:線程池處于TIDYING狀態(tài)時(shí),執(zhí)行完terminated()之后, 就會(huì)由TIDYING->TERMINATED。
submit()和execute()的區(qū)別
任務(wù)分兩類:一類是實(shí)現(xiàn)了Runnable接口的類,一類是實(shí)現(xiàn)了Callable接口的類。兩者都可以被ExecutorService執(zhí)行,它們的區(qū)別是:
execute(Runnable x) 沒(méi)有返回值??梢詧?zhí)行任務(wù),但無(wú)法判斷任務(wù)是否成功完成?!獙?shí)現(xiàn)Runnable接口的任務(wù)
submit(Runnable x) 返回一個(gè)future??梢杂眠@個(gè)future來(lái)判斷任務(wù)是否成功完成?!獙?shí)現(xiàn)Callable接口和Runnable接口的任務(wù),如果為Runnable,則get得到的為null:
使用submit方法還有一個(gè)特點(diǎn)就是,他的異??梢栽谥骶€程中catch到。而使用execute方法執(zhí)行任務(wù)是捕捉不到異常的。
future.get()方法是會(huì)有阻塞性,在調(diào)用submit提交任務(wù)之后,future.get()一直等到任務(wù)執(zhí)行完畢,拿到了返回的返回值,主線程才會(huì)繼續(xù)運(yùn)行。
設(shè)置合理的線程池大小
任務(wù)一般可分為:CPU密集型、IO密集型、混合型,對(duì)于不同類型的任務(wù)需要分配不同大小的線程池。
- CPU密集型任務(wù)
盡量使用較小的線程池,一般為CPU核心數(shù)+1。
因?yàn)镃PU密集型任務(wù)使得CPU使用率很高,若開(kāi)過(guò)多的線程數(shù),只能增加上下文切換的次數(shù),因此會(huì)帶來(lái)額外的開(kāi)銷。
IO密集型任務(wù)
可以使用稍大的線程池,一般為2*CPU核心數(shù)。 - IO密集型任務(wù)CPU使用率并不高,因此可以讓CPU在等待IO的時(shí)候去處理別的任務(wù),充分利用CPU時(shí)間。
- 混合型任務(wù)
可以將任務(wù)分成IO密集型和CPU密集型任務(wù),然后分別用不同的線程池去處理。
只要分完之后兩個(gè)任務(wù)的執(zhí)行時(shí)間相差不大,那么就會(huì)比串行執(zhí)行來(lái)的高效。
因?yàn)槿绻麆澐种髢蓚€(gè)任務(wù)執(zhí)行時(shí)間相差甚遠(yuǎn),那么先執(zhí)行完的任務(wù)就要等后執(zhí)行完的任務(wù),最終的時(shí)間仍然取決于后執(zhí)行完的任務(wù),而且還要加上任務(wù)拆分與合并的開(kāi)銷,得不償失。
public class ThreadTest {
//自定義ThreadFactory
private static class NameTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}
//自定義拒絕策略
private static class MyIgnorePolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}
private void doLog(Runnable r, ThreadPoolExecutor e) {
// 日志記錄
System.err.println( r.toString() + " rejected");
}
}
private final static ThreadPoolExecutor executor = new ThreadPoolExecutor
(10, 30, 2000, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(100),new NameTreadFactory(),new MyIgnorePolicy());
public static void main(String[] args) throws ExecutionException, InterruptedException {
//execute
executor.execute(()->{
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("execute,無(wú)返回值");
});
//submit
Future<String> future = executor.submit(()->{
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "submit,有返回值Future<String>";
});
System.out.println(future.get());
System.out.println("結(jié)束");
}
}
打印結(jié)果
my-thread-1 has been created
my-thread-2 has been created
execute,無(wú)返回值
submit,有返回值Future<String>
結(jié)束
