java多線程系列_讓主線程等待子任務(wù)執(zhí)行的各種方式

業(yè)務(wù)場(chǎng)景

在web應(yīng)用開(kāi)發(fā)中我們經(jīng)常會(huì)遇到這樣的場(chǎng)景:一個(gè)請(qǐng)求任務(wù),我們需要去查多個(gè)庫(kù),并對(duì)查詢到的數(shù)據(jù)做處理,此時(shí)如果采用同步的方式去查,往往會(huì)導(dǎo)致請(qǐng)求響應(yīng)時(shí)間過(guò)慢。比如:兩個(gè)查詢?nèi)蝿?wù)task1,task2,task1查詢數(shù)據(jù)要花2s,處理數(shù)據(jù)要花1s;task2查詢數(shù)據(jù)花5s,處理數(shù)據(jù)花2s,那一次請(qǐng)求的時(shí)間是2+1+5+2=10s。而如果我們用異步的方式,則能減少請(qǐng)求響應(yīng)的時(shí)間。
而利用異步的方式,常常子任務(wù)還未執(zhí)行完,主線程就已經(jīng)結(jié)束了,導(dǎo)致數(shù)據(jù)不能很好的返回到前端,所以主線程必須保證所有的子任務(wù)執(zhí)行結(jié)束后才能退出。
接下來(lái)我講討論各種異步方式來(lái)處理這種業(yè)務(wù)場(chǎng)景的方式。

方式一:利用java多線程工具Future.get()獲取數(shù)據(jù)

public class TestFuture {
    // 任務(wù)一執(zhí)行2s
    public static class Task1 implements Callable {
        public Object call() throws Exception {
            System.out.println("task1 starting ...");
            List<String> lists = new ArrayList<String>();
            lists.add("task1");
            Thread.sleep(2000);
            System.out.println("task1 ending ...");
            return lists;
        }
    }
    // 任務(wù)一執(zhí)行5s
    public static class Task2 implements Callable {
        public Object call() throws Exception {
            System.out.println("task2 starting ...");
            List<String> lists = new ArrayList<String>();
            lists.add("task2");
            Thread.sleep(5000);
            System.out.println("task2 ending ...");
            return lists;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Long start = System.currentTimeMillis();
        int cpuNum = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = new ThreadPoolExecutor(cpuNum, cpuNum * 2, 60000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        Future<List> future1 = executor.submit(new Task1());
        Future<List> future2 = executor.submit(new Task2());
        // 獲取任務(wù)一和任務(wù)二的數(shù)據(jù) 進(jìn)行處理
        List<String> lists1 = future1.get();
        List<String> lists2 = future2.get();
        // ===》分析點(diǎn):
        dealTask1Data(lists1);
        dealTask2Data(lists2);
        System.out.println("task ending");
        Long time = System.currentTimeMillis() - start;
        System.out.println("執(zhí)行任務(wù)所花的時(shí)間:" + time + "s");
    }

    // 處理任務(wù)1數(shù)據(jù) 處理1s
    public static void dealTask1Data(List<String> lists) throws InterruptedException {
        System.out.println("deal task1 data ...");
        Thread.sleep(1000);
    }

    // 處理任務(wù)2數(shù)據(jù) 處理2s
    public static void dealTask2Data(List<String> lists) throws InterruptedException {
        System.out.println("deal task2 data ...");
        Thread.sleep(2000);
    }
}

執(zhí)行結(jié)果

task1 starting ...
task2 starting ...
task1 ending ...
task2 ending ...
deal task1 data ...
deal task2 data ...
task ending
執(zhí)行任務(wù)所花的時(shí)間:8009s

結(jié)果分析:
查看源碼===》標(biāo)注處,利用future1.get(),future2.get()獲取數(shù)據(jù),需要等到future1和future2所有的數(shù)據(jù)返回后,主線程才能繼續(xù)往下執(zhí)行,所以執(zhí)行到future2.get()的時(shí)間需要5s,而后處理task1數(shù)據(jù)1s,處理task2數(shù)據(jù)2s,執(zhí)行時(shí)間為5+1+2 = 8s。

方式二: 利用CountDownLatch讓主線程等待子線程任務(wù)結(jié)束

public class TestCountDownLatch {
    // 任務(wù)一執(zhí)行2s
    public static class Task1 implements Callable {
        private CountDownLatch countDownLatch;
        public Task1(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
        public Object call() throws Exception {
            System.out.println("task1 starting ...");
            List<String> lists = new ArrayList<String>();
            lists.add("task1");
            Thread.sleep(2000);
            System.out.println("task1 ending ...");
            // 對(duì)任務(wù)一的數(shù)據(jù)進(jìn)行處理
            dealTask1Data(lists);
            // 任務(wù)一結(jié)束 對(duì)countDownLatch計(jì)數(shù)器--
            countDownLatch.countDown();
            return lists;
        }
        // 處理任務(wù)1數(shù)據(jù)
        public void dealTask1Data(List<String> lists) throws InterruptedException {
            System.out.println("deal task1 data ...");
            Thread.sleep(1000);
        }
    }

    // 任務(wù)一執(zhí)行5s
    public static class Task2 implements Callable {
        private CountDownLatch countDownLatch;
        public Task2(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
        public Object call() throws Exception {
            System.out.println("task2 starting ...");
            List<String> lists = new ArrayList<String>();
            lists.add("task2");
            Thread.sleep(5000);
            System.out.println("task2 ending ...");
            // 對(duì)任務(wù)二的數(shù)據(jù)進(jìn)行處理
            dealTask2Data(lists);
            // 任務(wù)二結(jié)束 對(duì)countDownLatch計(jì)數(shù)器--
            countDownLatch.countDown();
            return lists;
        }
        // 處理任務(wù)2數(shù)據(jù)
        public static void dealTask2Data(List<String> lists) throws InterruptedException {
            System.out.println("deal task2 data ...");
            Thread.sleep(2000);
        }
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        int cpuNum = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = new ThreadPoolExecutor(cpuNum, cpuNum * 2, 60000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        executor.submit(new Task1(countDownLatch));
        executor.submit(new Task2(countDownLatch));
        // 等countDownLatch == 0時(shí),主線程結(jié)束 10s超時(shí),自動(dòng)結(jié)束,如果任務(wù)沒(méi)超過(guò)10s,也得等10s
        // countDownLatch.await(10000, TimeUnit.MILLISECONDS);
        // ===> 等到countDownLatch計(jì)數(shù)器為0,才往下執(zhí)行
        countDownLatch.await();
        System.out.println("task ending ...");
        long time = System.currentTimeMillis() - start;
        System.out.println("執(zhí)行任務(wù)所花的時(shí)間:" + time + "s");
    }
}

執(zhí)行結(jié)果:

task1 starting ...
task2 starting ...
task1 ending ...
deal task1 data ...
task2 ending ...
deal task2 data ...
task ending ...
執(zhí)行任務(wù)所花的時(shí)間:7031s

結(jié)果分析:
將任務(wù)查詢到的數(shù)據(jù)處理放到每個(gè)線程里處理,然后利用CountDownLatch作為計(jì)數(shù)器,開(kāi)始給CountDownLatch設(shè)置任務(wù)數(shù),在每個(gè)線程執(zhí)行完畢之后,計(jì)數(shù)器減一,在===》標(biāo)注點(diǎn),主線程會(huì)等countDownLatch計(jì)數(shù)器為0的時(shí)候才會(huì)繼續(xù)往下執(zhí)行。因?yàn)樯厦娲a將數(shù)據(jù)處理放到了每個(gè)線程中,每個(gè)線程是并發(fā)執(zhí)行的,所以任務(wù)執(zhí)行時(shí)間是5+2=7s。

方式三:利用CyclicBarrier讓主線程等待子線程

public class TestCyclicBarrier {
    // 任務(wù)一執(zhí)行2s
    public static class Task1 implements Callable {
        private CyclicBarrier cyclicBarrier;
        public Task1(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        public Object call() throws Exception {
            System.out.println("task1 starting ...");
            List<String> lists = new ArrayList<String>();
            lists.add("task1");
            Thread.sleep(2000);
            System.out.println("task1 ending ...");
            // 對(duì)任務(wù)一的數(shù)據(jù)進(jìn)行處理
            dealTask1Data(lists);
            // 任務(wù)一結(jié)束 對(duì)countDownLatch計(jì)數(shù)器--
            cyclicBarrier.await();
            return lists;
        }
        // 處理任務(wù)1數(shù)據(jù)
        public void dealTask1Data(List<String> lists) throws InterruptedException {
            System.out.println("deal task1 data ...");
            Thread.sleep(1000);
        }
    }
    // 任務(wù)一執(zhí)行2s
    public static class Task2 implements Callable {
        private CyclicBarrier cyclicBarrier;
        public Task2(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
        public Object call() throws Exception {
            System.out.println("task2 starting ...");
            List<String> lists = new ArrayList<String>();
            lists.add("task2");
            Thread.sleep(5000);
            System.out.println("task2 ending ...");
            // 對(duì)任務(wù)二的數(shù)據(jù)進(jìn)行處理
            dealTask2Data(lists);
            // 任務(wù)二結(jié)束 對(duì)countDownLatch計(jì)數(shù)器--
            cyclicBarrier.await();
            return lists;
        }
        // 處理任務(wù)2數(shù)據(jù)
        public static void dealTask2Data(List<String> lists) throws InterruptedException {
            System.out.println("deal task2 data ...");
            Thread.sleep(2000);
        }
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException, BrokenBarrierException, TimeoutException {
        long start = System.currentTimeMillis();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        int cpuNum = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = new ThreadPoolExecutor(cpuNum, cpuNum * 2, 60000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        executor.submit(new Task1(cyclicBarrier));
        executor.submit(new Task2(cyclicBarrier));
        // 等countDownLatch == 0時(shí),主線程結(jié)束 3s超時(shí),超時(shí)會(huì)報(bào)異常
        // cyclicBarrier.await(3000, TimeUnit.MILLISECONDS);
        // ===》
        cyclicBarrier.await();
        System.out.println("task ending ...");
        long time = System.currentTimeMillis() - start;
        System.out.println("執(zhí)行任務(wù)所花的時(shí)間:" + time + "s");
    }
}

執(zhí)行結(jié)果:

task1 starting ...
task2 starting ...
task1 ending ...
deal task1 data ...
task2 ending ...
deal task2 data ...
task ending ...
執(zhí)行任務(wù)所花的時(shí)間:7022s

結(jié)果分析:
當(dāng)代碼執(zhí)行到===》標(biāo)注點(diǎn)的時(shí)候,cyclicBarrier.await()會(huì)看task1和task2的代碼是否也執(zhí)行到了cyclicBarrier.await(),如果有任務(wù)沒(méi)有執(zhí)行到,則會(huì)繼續(xù)等待,只有3個(gè)任務(wù)同時(shí)執(zhí)行到了cyclicBarrier.await()任務(wù)才會(huì)繼續(xù)往下執(zhí)行。

CountDownLatch與CyclicBarrier的區(qū)別

javadoc的解釋?zhuān)?/p>

  • CountDownLatch:
    A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
    一個(gè)線程(或者多個(gè)), 只有另外N個(gè)線程完成某個(gè)事情之后才能繼續(xù)往下執(zhí)行。(即只有計(jì)數(shù)器為0的時(shí)候,才能繼續(xù)往下執(zhí)行)
  • CyclicBarrier :
    A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
    N個(gè)線程相互等待,只有所有的線程都執(zhí)行到了barrier點(diǎn),所有線程才能繼續(xù)往下執(zhí)行,否則所有線程都必須等待。

方式四:利用CompletionService

public class TestCompletion {
    // 任務(wù)一執(zhí)行2s
    public static class Task1 implements Callable {
        public Object call() throws Exception {
            System.out.println("task1 starting ...");
            List<String> lists = new ArrayList<String>();
            lists.add("task1");
            Thread.sleep(5000);
            System.out.println("task1 ending ...");
            return lists;
        }
    }
    // 任務(wù)一執(zhí)行3s
    public static class Task2 implements Callable {
        public Object call() throws Exception {
            System.out.println("task2 starting ...");
            List<String> lists = new ArrayList<String>();
            lists.add("task2");
            Thread.sleep(3000);
            System.out.println("task2 ending ...");
            return lists;
        }
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException, BrokenBarrierException, TimeoutException {
        long start = System.currentTimeMillis();
        int cpuNum = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = new ThreadPoolExecutor(cpuNum, cpuNum * 2, 60000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CompletionService<List<String>> completionService = new ExecutorCompletionService<List<String>>(executor);
        completionService.submit(new Task1());
        completionService.submit(new Task2());
        // 能做到先返回任務(wù),結(jié)果就先輸出
        try {
            for (int i = 0; i < 2; i++) {
//                Future<List<String>> result = completionService.take();
//                System.out.println("hello world");
//                System.out.println(result.get());
                Future<List<String>> result2 = completionService.poll(5000, TimeUnit.MILLISECONDS);
                System.out.println(result2.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw e;
        }

        System.out.println("task ending ...");
        long time = System.currentTimeMillis() - start;
        System.out.println("執(zhí)行任務(wù)所花的時(shí)間:" + time + "s");
    }
}

CompletionService將Executor和BlockingQueue的功能融合在一起.可以將Callable任務(wù)提交給它來(lái)執(zhí)行,然后使用類(lèi)似與隊(duì)列操作的take和poll等方法來(lái)獲得已完成的結(jié)果,而這些結(jié)果會(huì)在完成時(shí)將被封裝為Future.ExecutorCompletionService實(shí)現(xiàn)了CompletionService,并將計(jì)算部分委托給一個(gè)Executor.

ExecutorCompletionService的實(shí)現(xiàn)非常簡(jiǎn)單.在構(gòu)造函數(shù)中創(chuàng)建一個(gè)BlockingQueue來(lái)保存計(jì)算完成的結(jié)果.當(dāng)計(jì)算完成時(shí),調(diào)用Future-Task中的done方法.當(dāng)提交某個(gè)任務(wù)時(shí),該任務(wù)將首先包裝為一個(gè)QueueingFuture,這是FutureTask的一個(gè)子類(lèi),然后再改寫(xiě)子類(lèi)的done方法,并將結(jié)果放入BlockingQueue中.take和poll方法委托給了BlockingQueue,這些方法會(huì)在得出結(jié)果之前阻塞.


private class QueueingFuture<V> extends FutureTask<V> {

    QueueingFuture(Callable<V> c){super(c);}

    QueueingFuture(Runnable t, V r) {super(t, r);}

    protected void done() {

        completionQueue.add(this);        

    }

}

結(jié)果:

task1 starting ...
task2 starting ...
task2 ending ...
[task2]
task1 ending ...
[task1]
task ending ...
執(zhí)行任務(wù)所花的時(shí)間:5015s

多個(gè)ExecutorCompletionservice可以共享一個(gè)Executor,因此可以創(chuàng)建一個(gè)特定計(jì)算私有,又能共享一個(gè)公共Executor的ExecutorCompletionService.因此,CompletionService的作用就相當(dāng)于一組計(jì)算的句柄.這與Future作為單個(gè)計(jì)算句柄是非常類(lèi)似的.通過(guò)記錄提交CompletionService的任務(wù)數(shù)量,并計(jì)算出已經(jīng)獲得的已完成結(jié)果的數(shù)量.通過(guò)記錄提交給CompletionService的任務(wù)數(shù)量,并計(jì)算出已經(jīng)獲得的已完成結(jié)果的數(shù)量,即使使用一個(gè)共享的Executor,也能知道已經(jīng)獲得了所有任務(wù)結(jié)果的時(shí)間.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 下面是我自己收集整理的Java線程相關(guān)的面試題,可以用它來(lái)好好準(zhǔn)備面試。 參考文檔:-《Java核心技術(shù) 卷一》-...
    阿呆變Geek閱讀 15,141評(píng)論 14 507
  • Java-Review-Note——4.多線程 標(biāo)簽: JavaStudy PS:本來(lái)是分開(kāi)三篇的,后來(lái)想想還是整...
    coder_pig閱讀 1,772評(píng)論 2 17
  • “招聘”到底是什么?難道,難道就是報(bào)名、篩選、面試、錄用、解聘等這些流程的工業(yè)化“生產(chǎn)”從規(guī)劃到招聘到薪酬到績(jī)效到...
    中大MBA管理資訊閱讀 332評(píng)論 0 1
  • 欣賞一個(gè)人, 不是因?yàn)樗卸嗝磧?yōu)秀, 不是因?yàn)樗卸嗝疵利悾?而是在平常的言談舉止中 看到了一份美好, 欣賞一個(gè)人...
    古城蒼狼閱讀 317評(píng)論 0 3
  • 1月19日,17天了。預(yù)期是28天的服盆期,實(shí)際在14-18天左右,就已經(jīng)服盆完畢。主要原因如下: ⊙ 大氣候(天...
    園藝小綠閱讀 765評(píng)論 0 0

友情鏈接更多精彩內(nèi)容