Java線程池執(zhí)行任務(wù)的兩種機制,invokeAll和 CompletionService

通常Java線程池執(zhí)行的任務(wù)有兩種類型,一種是不帶返回值的Runnable, 另一種是帶返回值的Callable。

對于不帶返回值的任務(wù)通常我們不太關(guān)注任務(wù)是否執(zhí)行結(jié)束以及結(jié)束后應(yīng)該做做些什么,我們將任務(wù)提交給線程池, 然后顧自己干別的事情。

帶返回值的任務(wù)執(zhí)行結(jié)果通常受到當前任務(wù)的依賴,任務(wù)提交給線程池后還需要等待任務(wù)的返回。對于任務(wù)結(jié)果我們會有不同的需求,有時候當前任務(wù)依賴所有提交給線程池的任務(wù)的結(jié)果, 而有時候有只依賴某一個任務(wù)的執(zhí)行結(jié)果,就好比飯店的服務(wù)員需要等待寶箱中所有顧客用餐完畢才來收拾,而食堂的阿姨卻可以單個學(xué)生用餐完畢而來收拾。

Java線程池對對于這兩種需求提供不同的解決方案

對于依賴所有任務(wù)執(zhí)行結(jié)果的可以直接使用線程池的invokeAll方法

public class Main {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for( int i = 0; i < 10; i++) {
            tasks.add(()->{
                Random random = new Random();
                int second = random.nextInt(10);
                Thread.sleep(second * 1000) ;
                return second;
            });
        }
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        List<Future<Integer>> futures = executorService.invokeAll(tasks);
        for( int i = 0; i < futures.size(); i++) {
            System.out.println(futures.get(i).get());
        }

        executorService.shutdown();
    }
}

以上程序清單中的線程池執(zhí)行10個任務(wù),這些任務(wù)會做隨機延時,所有的任務(wù)都放在tasks變量中。

我們初始化一個長度為時的固定大小的線程池執(zhí)行這些任務(wù),方法invokeAll調(diào)用會阻塞,在所有任務(wù)執(zhí)行完畢后返回,然后程序打印這些返回結(jié)果。我們運行這段代碼會卡斷很長時間,接著瞬間出結(jié)果, 這是invokeAll的特性:所欲任務(wù)必須執(zhí)行完畢后才返回。

對于不依賴所有任務(wù)的執(zhí)行結(jié)果,而可以單獨處理每個任務(wù)結(jié)果的,invokeAll就顯得不友好了,雖然最終結(jié)果沒區(qū)別,執(zhí)行完所有任務(wù)都需要話同樣的時間,可是執(zhí)行完一個任務(wù)就處理一個任務(wù)的結(jié)果不是顯得更加人性化么,比如加載多張網(wǎng)絡(luò)圖片,加載完成一張就顯示一張顯然有更好的用戶體驗,對于這種需求我們可以使用CompletionService。

CompletionService能逐個返回任務(wù)的執(zhí)行結(jié)果,誰先執(zhí)行完畢返回誰。 它利用了阻塞隊列的特想,當它察覺到有任務(wù)執(zhí)行完畢時則將執(zhí)行的結(jié)果,一個Future放入它維護的一個無界阻塞隊列,外部程序就可以通過take方法拿取,如果阻塞隊列為空,也就是還沒有執(zhí)行完畢的任務(wù), 那么take方法則阻塞,外部程序繼續(xù)等待。

public class Main {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for( int i = 0; i < 10; i++) {
            tasks.add(()->{
                Random random = new Random();
                int second = random.nextInt(10);
                Thread.sleep(second * 1000) ;
                return second;
            });
        }
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        CompletionService<Integer> completionService = new ExecutorCompletionService(executorService);
        tasks.forEach(task -> completionService.submit(task));

        for( int i = 0; i < tasks.size(); i++) {
            System.out.println(completionService.take().get());
        }

        executorService.shutdown();
    }
}

執(zhí)行上面的代碼不會長時間卡斷后瞬間出結(jié)果,它會平緩的打印每個任務(wù)的執(zhí)行結(jié)果, 直到所有任務(wù)執(zhí)行完畢而結(jié)束程序。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容