通常Java線程池執(zhí)行的任務(wù)有兩種類型,一種是不帶返回值的Runnable, 另一種是帶返回值的Callable。
對于不帶返回值的任務(wù)通常我們不太關(guān)注任務(wù)是否執(zhí)行結(jié)束以及結(jié)束后應(yīng)該做做些什么,我們將任務(wù)提交給線程池, 然后顧自己干別的事情。
帶返回值的任務(wù)執(zhí)行結(jié)果通常受到當前任務(wù)的依賴,任務(wù)提交給線程池后還需要等待任務(wù)的返回。對于任務(wù)結(jié)果我們會有不同的需求,有時候當前任務(wù)依賴所有提交給線程池的任務(wù)的結(jié)果, 而有時候有只依賴某一個任務(wù)的執(zhí)行結(jié)果,就好比飯店的服務(wù)員需要等待寶箱中所有顧客用餐完畢才來收拾,而食堂的阿姨卻可以單個學(xué)生用餐完畢而來收拾。
Java線程池對對于這兩種需求提供不同的解決方案
對于依賴所有任務(wù)執(zhí)行結(jié)果的可以直接使用線程池的invokeAll方法
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
List<Callable<Integer>> tasks = new ArrayList<>();
for( int i = 0; i < 10; i++) {
tasks.add(()->{
Random random = new Random();
int second = random.nextInt(10);
Thread.sleep(second * 1000) ;
return second;
});
}
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Future<Integer>> futures = executorService.invokeAll(tasks);
for( int i = 0; i < futures.size(); i++) {
System.out.println(futures.get(i).get());
}
executorService.shutdown();
}
}
以上程序清單中的線程池執(zhí)行10個任務(wù),這些任務(wù)會做隨機延時,所有的任務(wù)都放在tasks變量中。
我們初始化一個長度為時的固定大小的線程池執(zhí)行這些任務(wù),方法invokeAll調(diào)用會阻塞,在所有任務(wù)執(zhí)行完畢后返回,然后程序打印這些返回結(jié)果。我們運行這段代碼會卡斷很長時間,接著瞬間出結(jié)果, 這是invokeAll的特性:所欲任務(wù)必須執(zhí)行完畢后才返回。
對于不依賴所有任務(wù)的執(zhí)行結(jié)果,而可以單獨處理每個任務(wù)結(jié)果的,invokeAll就顯得不友好了,雖然最終結(jié)果沒區(qū)別,執(zhí)行完所有任務(wù)都需要話同樣的時間,可是執(zhí)行完一個任務(wù)就處理一個任務(wù)的結(jié)果不是顯得更加人性化么,比如加載多張網(wǎng)絡(luò)圖片,加載完成一張就顯示一張顯然有更好的用戶體驗,對于這種需求我們可以使用CompletionService。
CompletionService能逐個返回任務(wù)的執(zhí)行結(jié)果,誰先執(zhí)行完畢返回誰。 它利用了阻塞隊列的特想,當它察覺到有任務(wù)執(zhí)行完畢時則將執(zhí)行的結(jié)果,一個Future放入它維護的一個無界阻塞隊列,外部程序就可以通過take方法拿取,如果阻塞隊列為空,也就是還沒有執(zhí)行完畢的任務(wù), 那么take方法則阻塞,外部程序繼續(xù)等待。
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
List<Callable<Integer>> tasks = new ArrayList<>();
for( int i = 0; i < 10; i++) {
tasks.add(()->{
Random random = new Random();
int second = random.nextInt(10);
Thread.sleep(second * 1000) ;
return second;
});
}
ExecutorService executorService = Executors.newFixedThreadPool(10);
CompletionService<Integer> completionService = new ExecutorCompletionService(executorService);
tasks.forEach(task -> completionService.submit(task));
for( int i = 0; i < tasks.size(); i++) {
System.out.println(completionService.take().get());
}
executorService.shutdown();
}
}
執(zhí)行上面的代碼不會長時間卡斷后瞬間出結(jié)果,它會平緩的打印每個任務(wù)的執(zhí)行結(jié)果, 直到所有任務(wù)執(zhí)行完畢而結(jié)束程序。