概述
在JAVA的世界里,如果想并行的執(zhí)行一些任務(wù),可以使用ThreadPoolExecutor。
大部分情況下直接使用ThreadPoolExecutor就可以滿足要求了,但是在某些場(chǎng)景下,比如瞬時(shí)大流量的,為了提高響應(yīng)和吞吐量,最好還是擴(kuò)展一下ThreadPoolExecutor。
全宇宙的JAVA IT人士應(yīng)該都知道ThreadPoolExecutor的執(zhí)行流程:
- core線程還能應(yīng)付的,則不斷的<font color="red">創(chuàng)建</font>新的線程;
- core線程無(wú)法應(yīng)付,則將任務(wù)扔到隊(duì)列里面;
- 隊(duì)列滿了(<font color="red">意味著插入任務(wù)失敗</font>),則開始創(chuàng)建MAX線程,線程數(shù)達(dá)到MAX后,隊(duì)列還一直是滿的,則拋出RejectedExecutionException.
這個(gè)執(zhí)行流程有個(gè)小問(wèn)題,就是當(dāng)core線程無(wú)法應(yīng)付請(qǐng)求的時(shí)候,會(huì)<font color="red">立刻</font>將任務(wù)添加到隊(duì)列中,如果隊(duì)列非常長(zhǎng),而任務(wù)又非常多,那么將會(huì)有頻繁的<font color="red">任務(wù)入隊(duì)列</font>和<font color="red">任務(wù)出隊(duì)列</font>的操作。
根據(jù)實(shí)際的壓測(cè)發(fā)現(xiàn),這種操作也是有一定消耗的。其實(shí)JAVA提供的SynchronousQueue隊(duì)列是一個(gè)零長(zhǎng)度的隊(duì)列,任務(wù)都是直接由生產(chǎn)者遞交給消費(fèi)者,中間沒(méi)有入隊(duì)列的過(guò)程,可見JAVA API的設(shè)計(jì)者也是有考慮過(guò)入隊(duì)列這種操作的開銷。
另外,任務(wù)一多,立刻扔到隊(duì)列里,而MAX線程又不干活,如果隊(duì)列里面太多任務(wù)了,只有可憐的core線程在忙,也是會(huì)影響性能的。
當(dāng)core線程無(wú)法應(yīng)付請(qǐng)求的時(shí)候,能不能延后<font color="red">入隊(duì)列</font>這個(gè)操作呢? 讓MAX線程盡快啟動(dòng)起來(lái),幫忙處理任務(wù)。
也即是說(shuō),當(dāng)core線程無(wú)法應(yīng)付請(qǐng)求的時(shí)候,如果當(dāng)前線程池中的線程數(shù)量還小于MAX線程數(shù)的時(shí)候,繼續(xù)創(chuàng)建新的線程處理任務(wù),一直到線程數(shù)量到達(dá)MAX后,才將任務(wù)插入到隊(duì)列里
我們通過(guò)覆蓋隊(duì)列的offer方法來(lái)實(shí)現(xiàn)這個(gè)目標(biāo)。
@Override
public boolean offer(Runnable o) {
int currentPoolThreadSize = executor.getPoolSize();
//如果線程池里的線程數(shù)量已經(jīng)到達(dá)最大,將任務(wù)添加到隊(duì)列中
if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
return super.offer(o);
}
//說(shuō)明有空閑的線程,這個(gè)時(shí)候無(wú)需創(chuàng)建core線程之外的線程,而是把任務(wù)直接丟到隊(duì)列里即可
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(o);
}
//如果線程池里的線程數(shù)量還沒(méi)有到達(dá)最大,直接創(chuàng)建線程,而不是把任務(wù)丟到隊(duì)列里面
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
return super.offer(o);
}
注意其中的
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(o);
}
是表示core線程仍然能處理的來(lái),同時(shí)又有空閑線程的情況,將任務(wù)插入到隊(duì)列中。 如何判斷線程池中有空閑線程呢? 可以使用一個(gè)計(jì)數(shù)器來(lái)實(shí)現(xiàn),每當(dāng)execute方法被執(zhí)行的時(shí)候,計(jì)算器加1,當(dāng)afterExecute被執(zhí)行后,計(jì)數(shù)器減1.
@Override
public void execute(Runnable command) {
submittedTaskCount.incrementAndGet();
//代碼未完整,待補(bǔ)充。。。。。
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
這樣,當(dāng)
executor.getSubmittedTaskCount() < currentPoolThreadSize
的時(shí)候,說(shuō)明有空閑線程。
完整代碼
EnhancedThreadPoolExecutor類
package executer;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class EnhancedThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
/**
* 計(jì)數(shù)器,用于表示已經(jīng)提交到隊(duì)列里面的task的數(shù)量,這里task特指還未完成的task。
* 當(dāng)task執(zhí)行完后,submittedTaskCount會(huì)減1的。
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EnhancedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new ThreadPoolExecutor.AbortPolicy());
workQueue.setExecutor(this);
}
/**
* 覆蓋父類的afterExecute方法,當(dāng)task執(zhí)行完成后,將計(jì)數(shù)器減1
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
/**
* 覆蓋父類的execute方法,在任務(wù)開始執(zhí)行之前,計(jì)數(shù)器加1。
*/
@Override
public void execute(Runnable command) {
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
//當(dāng)發(fā)生RejectedExecutionException,嘗試再次將task丟到隊(duì)列里面,如果還是發(fā)生RejectedExecutionException,則直接拋出異常。
BlockingQueue<Runnable> taskQueue = super.getQueue();
if (taskQueue instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)taskQueue;
if (!queue.forceTaskIntoQueue(command)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("隊(duì)列已滿");
}
} else {
submittedTaskCount.decrementAndGet();
throw rx;
}
}
}
}
TaskQueue
package executer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
private EnhancedThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
public void setExecutor(EnhancedThreadPoolExecutor exec) {
executor = exec;
}
public boolean forceTaskIntoQueue(Runnable o) {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor已經(jīng)關(guān)閉了,不能將task添加到隊(duì)列里面");
}
return super.offer(o);
}
@Override
public boolean offer(Runnable o) {
int currentPoolThreadSize = executor.getPoolSize();
//如果線程池里的線程數(shù)量已經(jīng)到達(dá)最大,將任務(wù)添加到隊(duì)列中
if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
return super.offer(o);
}
//說(shuō)明有空閑的線程,這個(gè)時(shí)候無(wú)需創(chuàng)建core線程之外的線程,而是把任務(wù)直接丟到隊(duì)列里即可
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(o);
}
//如果線程池里的線程數(shù)量還沒(méi)有到達(dá)最大,直接創(chuàng)建線程,而不是把任務(wù)丟到隊(duì)列里面
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
return super.offer(o);
}
}
TestExecuter
package executer;
import java.util.concurrent.TimeUnit;
public class TestExecuter {
private static final int CORE_SIZE = 5;
private static final int MAX_SIZE = 10;
private static final long KEEP_ALIVE_TIME = 30;
private static final int QUEUE_SIZE = 5;
static EnhancedThreadPoolExecutor executor = new EnhancedThreadPoolExecutor(CORE_SIZE,MAX_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS , new TaskQueue(QUEUE_SIZE));
public static void main(String[] args){
for (int i = 0; i < 15; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("線程池中現(xiàn)在的線程數(shù)目是:"+executor.getPoolSize()+", 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:"+ executor.getQueue().size());
}
}
}
先運(yùn)行一下代碼,看看是否如何預(yù)期。直接執(zhí)行TestExecuter類中的main方法,運(yùn)行結(jié)果如下:
線程池中現(xiàn)在的線程數(shù)目是:1, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:2, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:3, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:4, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:5, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:6, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:7, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:8, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:9, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:10, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:10, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:1
線程池中現(xiàn)在的線程數(shù)目是:10, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:2
線程池中現(xiàn)在的線程數(shù)目是:10, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:3
線程池中現(xiàn)在的線程數(shù)目是:10, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:4
線程池中現(xiàn)在的線程數(shù)目是:10, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5
可以看到當(dāng)線程數(shù)增加到core數(shù)量的時(shí)候,隊(duì)列中是沒(méi)有任務(wù)的。一直到線程數(shù)量增加到MAX數(shù)量,也即是10的時(shí)候,隊(duì)列中才開始有任務(wù)。符合我們的預(yù)期。
如果我們注釋掉TaskQueue類中的offer方法,也即是不覆蓋隊(duì)列的offer方法,那么運(yùn)行結(jié)果如下:
線程池中現(xiàn)在的線程數(shù)目是:1, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:2, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:3, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:4, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:5, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:0
線程池中現(xiàn)在的線程數(shù)目是:5, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:1
線程池中現(xiàn)在的線程數(shù)目是:5, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:2
線程池中現(xiàn)在的線程數(shù)目是:5, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:3
線程池中現(xiàn)在的線程數(shù)目是:5, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:4
線程池中現(xiàn)在的線程數(shù)目是:5, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5
線程池中現(xiàn)在的線程數(shù)目是:6, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5
線程池中現(xiàn)在的線程數(shù)目是:7, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5
線程池中現(xiàn)在的線程數(shù)目是:8, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5
線程池中現(xiàn)在的線程數(shù)目是:9, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5
線程池中現(xiàn)在的線程數(shù)目是:10, 隊(duì)列中正在等待執(zhí)行的任務(wù)數(shù)量為:5
可以看到當(dāng)線程數(shù)增加到core數(shù)量的時(shí)候,隊(duì)列中已經(jīng)有任務(wù)了。
進(jìn)一步思考
在使用ThreadPoolExecutor的時(shí)候,如果發(fā)生了RejectedExecutionException,該如何處理?本文中的代碼是采用了重新將任務(wù)嘗試插入到隊(duì)列中,如果還是失敗則直接將reject異常拋出去。
@Override
public void execute(Runnable command) {
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
//當(dāng)發(fā)生RejectedExecutionException,嘗試再次將task丟到隊(duì)列里面,如果還是發(fā)生RejectedExecutionException,則直接拋出異常。
BlockingQueue<Runnable> taskQueue = super.getQueue();
if (taskQueue instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)taskQueue;
if (!queue.forceTaskIntoQueue(command)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("隊(duì)列已滿");
}
} else {
submittedTaskCount.decrementAndGet();
throw rx;
}
}
}
TaskQueue類提供了forceTaskIntoQueue方法,將任務(wù)插入到隊(duì)列中。
還有另一種解決方案,就是使用另外一個(gè)線程池來(lái)執(zhí)行任務(wù),當(dāng)?shù)谝粋€(gè)線程池拋出Reject異常時(shí),catch住它,并使用第二個(gè)線程池處理任務(wù)。