Future.get卡死,線程池的一個坑點

如果線程池的拒絕策略設置成DiscardPolicy或者DiscardOldestPolicy,通過Future獲取執(zhí)行結果,可能導致線程會一直阻塞。

問題復現(xiàn)

  // 創(chuàng)建一個單線程,拒絕策略時 DiscardPolicy
  private final static ThreadPoolExecutor executorService = new
      ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,
      new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.DiscardPolicy());

  public static void main(String[] args) throws Exception {
    //提交任務,阻塞 5 秒
    Future taskOne = executorService.submit(() -> {
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    });
    //此時,隊列和線程已經都被占用,當前提交的任務會執(zhí)行拒絕策略
    Future taskTwo = null;
    try {
      taskTwo = executorService.submit(() -> System.out.println("start runable three"));
    } catch (Exception e) {
      System.out.println(e.getLocalizedMessage());
    }
    System.out.println("獲取結果:");
    System.out.println("task one " + taskOne.get()); //(5)等待任務one執(zhí)行完畢
    System.out
        .println("task two " + (taskTwo == null ? null : taskTwo.get())); // (7)等待任務three執(zhí)行完畢
    executorService.shutdown(); //關閉線程池,阻塞直到所有任務執(zhí)行完畢
  }

執(zhí)行結果如下,第一個task正??梢垣@取結果,但是第二個task一直獲取不到結果,程序一直卡在這里,不會繼續(xù)執(zhí)行。

獲取結果:
task one null

問題分析

提交任務到線程池時,會包裝成 FutureTask ,初始狀態(tài)是 NEW。執(zhí)行的任務是包裝后的FutureTask對象。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    // 包裝成 FutureTask
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

提交執(zhí)行任務方法邏輯如下。

public void execute(Runnable command) {
  ...
  //如果線程個數(shù)小于核心線程數(shù)則新增處理線程
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command, true))
          return;
      c = ctl.get();
  }
  // 如果當前線程個數(shù)已經達到核心線程數(shù)則把任務放入隊列
  if (isRunning(c) && workQueue.offer(command)) {
      int recheck = ctl.get();
      if (! isRunning(recheck) && remove(command))
          reject(command);
      else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  }
  // 嘗試新增處理線程
  else if (! addWorker(command, false))
      reject(command); //新增失敗則調用拒絕策略
}

示例代碼中第二個任務會執(zhí)行到reject邏輯。DiscardPolicy的方法是空實現(xiàn),所以新創(chuàng)建的FutureTask還是NEW狀態(tài),這個狀態(tài)和get方法阻塞有密切的關系。

DiscardPolicy 和 DiscardOldestPolicy 代碼如下。他們有一個共同點就是沒有處理task的狀態(tài)。

public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * 空方法,task會保留在NEW狀態(tài)
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * poll 出一個任務,但是沒有任務處理,所以poll出來的任務是NEW狀態(tài)
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}    

先看下 FutureTask 的狀態(tài)。前面我們看到了初始化狀態(tài)是NEW,其他狀態(tài)說明如下。

private static final int NEW          = 0; 新的任務,初始狀態(tài)
private static final int COMPLETING   = 1; 當任務被設置結果時,處于COMPLETING狀態(tài),這是一個中間狀態(tài)。
private static final int NORMAL       = 2; 表示任務正常結束。
private static final int EXCEPTIONAL  = 3; 表示任務因異常而結束
private static final int CANCELLED    = 4; 任務還未執(zhí)行之前就調用了cancel(true)方法,任務處于CANCELLED
private static final int INTERRUPTING = 5; 當任務調用cancel(true)中斷程序時,任務處于INTERRUPTING狀態(tài),這是一個中間狀態(tài)。
private static final int INTERRUPTED  = 6; 任務調用cancel(true)中斷程序時會調用interrupt()方法中斷線程運行,任務狀態(tài)由INTERRUPTING轉變?yōu)镮NTERRUPTED

繼續(xù)看下 FutureTask 的get方法。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //當狀態(tài)值<=COMPLETING時需要等待,否則調用report返回
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 正常結束,返回結果
    if (s == NORMAL)
        return (V)x;
    // 如果是 >= CANCELLED 拋出取消異常,包括:CANCELLED,INTERRUPTING,INTERRUPTED狀態(tài)
    if (s >= CANCELLED)
        throw new CancellationException();
    // 剩下的條件就是 EXCEPTIONAL 了,執(zhí)行的任務拋出異常
    throw new ExecutionException((Throwable)x)
}

到這里已經很清楚了。FutureTask狀態(tài)>COMPLETING 才會返回。因為拒絕策略沒有修改FutureTask的狀態(tài),F(xiàn)utureTask的狀態(tài)一直是NEW,所以不會返回。

其他 RejectedExecutionHandler 為什么不會導致阻塞

我看看下默認的 AbortPolicy 的實現(xiàn):

public static class AbortPolicy implements RejectedExecutionHandler {
    // 回憶一下submit方法,最后會執(zhí)行reject策略。
    // AbortPolicy 直接拋出異常,調用方馬上可以獲取結果
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

CallerRunsPolicy 策略則是讓調用線程執(zhí)行提交的任務,執(zhí)行任務時會更新狀態(tài),自然也不會阻塞。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

解決方案

  1. 使用帶超時時間的get方法,這樣使用DiscardPolicy拒絕策略不會一直阻塞。
  2. 如果一定要使用Discardpolicy 拒絕策略,需要自定義拒絕策略。
public void rejectedExecution(Runnable runable, ThreadPoolExecutor e) {
    if (! e.isShutdown()) {
        if(null ! = runable && runable instanceof FutureTask){
            ((FutureTask) runable).cancel(true);
          }
      }
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容