如果線程池的拒絕策略設置成DiscardPolicy或者DiscardOldestPolicy,通過Future獲取執(zhí)行結果,可能導致線程會一直阻塞。
問題復現(xiàn)
// 創(chuàng)建一個單線程,拒絕策略時 DiscardPolicy
private final static ThreadPoolExecutor executorService = new
ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,
new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.DiscardPolicy());
public static void main(String[] args) throws Exception {
//提交任務,阻塞 5 秒
Future taskOne = executorService.submit(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
//此時,隊列和線程已經都被占用,當前提交的任務會執(zhí)行拒絕策略
Future taskTwo = null;
try {
taskTwo = executorService.submit(() -> System.out.println("start runable three"));
} catch (Exception e) {
System.out.println(e.getLocalizedMessage());
}
System.out.println("獲取結果:");
System.out.println("task one " + taskOne.get()); //(5)等待任務one執(zhí)行完畢
System.out
.println("task two " + (taskTwo == null ? null : taskTwo.get())); // (7)等待任務three執(zhí)行完畢
executorService.shutdown(); //關閉線程池,阻塞直到所有任務執(zhí)行完畢
}
執(zhí)行結果如下,第一個task正??梢垣@取結果,但是第二個task一直獲取不到結果,程序一直卡在這里,不會繼續(xù)執(zhí)行。
獲取結果:
task one null
問題分析
提交任務到線程池時,會包裝成 FutureTask ,初始狀態(tài)是 NEW。執(zhí)行的任務是包裝后的FutureTask對象。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 包裝成 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
提交執(zhí)行任務方法邏輯如下。
public void execute(Runnable command) {
...
//如果線程個數(shù)小于核心線程數(shù)則新增處理線程
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果當前線程個數(shù)已經達到核心線程數(shù)則把任務放入隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 嘗試新增處理線程
else if (! addWorker(command, false))
reject(command); //新增失敗則調用拒絕策略
}
示例代碼中第二個任務會執(zhí)行到reject邏輯。DiscardPolicy的方法是空實現(xiàn),所以新創(chuàng)建的FutureTask還是NEW狀態(tài),這個狀態(tài)和get方法阻塞有密切的關系。
DiscardPolicy 和 DiscardOldestPolicy 代碼如下。他們有一個共同點就是沒有處理task的狀態(tài)。
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* 空方法,task會保留在NEW狀態(tài)
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* poll 出一個任務,但是沒有任務處理,所以poll出來的任務是NEW狀態(tài)
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
先看下 FutureTask 的狀態(tài)。前面我們看到了初始化狀態(tài)是NEW,其他狀態(tài)說明如下。
private static final int NEW = 0; 新的任務,初始狀態(tài)
private static final int COMPLETING = 1; 當任務被設置結果時,處于COMPLETING狀態(tài),這是一個中間狀態(tài)。
private static final int NORMAL = 2; 表示任務正常結束。
private static final int EXCEPTIONAL = 3; 表示任務因異常而結束
private static final int CANCELLED = 4; 任務還未執(zhí)行之前就調用了cancel(true)方法,任務處于CANCELLED
private static final int INTERRUPTING = 5; 當任務調用cancel(true)中斷程序時,任務處于INTERRUPTING狀態(tài),這是一個中間狀態(tài)。
private static final int INTERRUPTED = 6; 任務調用cancel(true)中斷程序時會調用interrupt()方法中斷線程運行,任務狀態(tài)由INTERRUPTING轉變?yōu)镮NTERRUPTED
繼續(xù)看下 FutureTask 的get方法。
public V get() throws InterruptedException, ExecutionException {
int s = state;
//當狀態(tài)值<=COMPLETING時需要等待,否則調用report返回
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
// 正常結束,返回結果
if (s == NORMAL)
return (V)x;
// 如果是 >= CANCELLED 拋出取消異常,包括:CANCELLED,INTERRUPTING,INTERRUPTED狀態(tài)
if (s >= CANCELLED)
throw new CancellationException();
// 剩下的條件就是 EXCEPTIONAL 了,執(zhí)行的任務拋出異常
throw new ExecutionException((Throwable)x)
}
到這里已經很清楚了。FutureTask狀態(tài)>COMPLETING 才會返回。因為拒絕策略沒有修改FutureTask的狀態(tài),F(xiàn)utureTask的狀態(tài)一直是NEW,所以不會返回。
其他 RejectedExecutionHandler 為什么不會導致阻塞
我看看下默認的 AbortPolicy 的實現(xiàn):
public static class AbortPolicy implements RejectedExecutionHandler {
// 回憶一下submit方法,最后會執(zhí)行reject策略。
// AbortPolicy 直接拋出異常,調用方馬上可以獲取結果
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
CallerRunsPolicy 策略則是讓調用線程執(zhí)行提交的任務,執(zhí)行任務時會更新狀態(tài),自然也不會阻塞。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
解決方案
- 使用帶超時時間的get方法,這樣使用DiscardPolicy拒絕策略不會一直阻塞。
- 如果一定要使用Discardpolicy 拒絕策略,需要自定義拒絕策略。
public void rejectedExecution(Runnable runable, ThreadPoolExecutor e) {
if (! e.isShutdown()) {
if(null ! = runable && runable instanceof FutureTask){
((FutureTask) runable).cancel(true);
}
}
}