應(yīng)用場景
當(dāng)向Executor提交多個(gè)任務(wù)并且希望獲得它們?cè)谕瓿芍蟮慕Y(jié)果,如果用FutureTask,可以循環(huán)獲取task,并調(diào)用get方法去獲取task執(zhí)行結(jié)果,但是如果task還未完成,獲取結(jié)果的線程將阻塞直到task完成,由于不知道哪個(gè)task優(yōu)先執(zhí)行完畢,使用這種方式效率不會(huì)很高。在jdk5時(shí)候提出接口CompletionService,它整合了Executor和BlockingQueue的功能,可以更加方便在多個(gè)任務(wù)執(zhí)行時(shí)獲取到任務(wù)執(zhí)行結(jié)果。
案例
需求:不使用求和公式,計(jì)算從1到100000000相加的和。
分析設(shè)計(jì):需求指明不能使用求和公式,只能循環(huán)依次相加,為了提高效率,我們可以將1到100000000的數(shù)分為n段由n個(gè)task執(zhí)行,執(zhí)行結(jié)束后merge結(jié)果求最后的和。
-
代碼實(shí)現(xiàn):
CompletionService使用案例聲明task執(zhí)行載體,線程池executor;
聲明CompletionService,通過參數(shù)指定執(zhí)行task的線程池,存放已完成狀態(tài)task的阻塞隊(duì)列,隊(duì)列默認(rèn)為基于鏈表結(jié)構(gòu)的阻塞隊(duì)列LinkedBlockingQueue;
調(diào)用submit方法提交task;
調(diào)用take方法獲取已完成狀態(tài)task。
CompletionService源碼分析
CompletionService接口提供五個(gè)方法:
Future<V> submit(Callable<V> task)
提交Callable類型的task;Future<V> submit(Runnable task, V result)
提交Runnable類型的task;Future<V> take() throws InterruptedException
獲取并移除已完成狀態(tài)的task,如果目前不存在這樣的task,則等待;Future<V> poll()
獲取并移除已完成狀態(tài)的task,如果目前不存在這樣的task,返回null;Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException
獲取并移除已完成狀態(tài)的task,如果在指定等待時(shí)間內(nèi)不存在這樣的task,返回null。
接下來我們來看看CompletionService接口的具體實(shí)現(xiàn):ExecutorCompletionService。
ExecutorCompletionService實(shí)現(xiàn)分析
-
成員變量
成員變量
ExecutorCompletionService有三個(gè)成員變量:
executor:執(zhí)行task的線程池,創(chuàng)建CompletionService必須指定;
aes:主要用于創(chuàng)建待執(zhí)行task;
completionQueue:存儲(chǔ)已完成狀態(tài)的task,默認(rèn)是基于鏈表結(jié)構(gòu)的阻塞隊(duì)列LinkedBlockingQueue。
-
構(gòu)造方法
構(gòu)造方法
ExecutorCompletionService提供兩個(gè)構(gòu)造方法,具體的使用具體情況具體分析,使用者可以根據(jù)業(yè)務(wù)場景來進(jìn)行選擇。
-
task提交
ExecutorCompletionService提供submit方法來提交Callable類型或者Runnable類型的task:
線程提交.png
具體的執(zhí)行流程如下:
參數(shù)校驗(yàn),不符合條件的task拋出異常,程序結(jié)束;
將Callable類型或者Runnable類型的task構(gòu)造成FutureTask;
把構(gòu)造好的FutureTask交由線程池executor執(zhí)行。
看到這里可能大家會(huì)比較疑惑了,task調(diào)用submit方法可以提交,完成的task是什么時(shí)候被加入到completionQueue里的呢?
針對(duì)這個(gè)問題,從submit方法的源碼可以看出,在提交到線程池的時(shí)候需要將FutureTask封裝成QueueingFuture,我們來看看QueueingFuture的具體實(shí)現(xiàn):
QueueingFuture實(shí)現(xiàn)
從源碼可以看出,QueueingFuture是FutureTask的子類,實(shí)現(xiàn)了done方法,在task執(zhí)行完成之后將當(dāng)前task添加到completionQueue,done方法的具體調(diào)用在FutureTask的finishCompletion方法,上篇介紹FutureTask的文章已經(jīng)做過具體的分析,在這里就不再贅述了。
-
已完成狀態(tài)task獲取
CompletionService的take方法和poll方法都可以獲取已完成狀態(tài)的task,我們來看看具體的實(shí)現(xiàn):
take、poll實(shí)現(xiàn)
從源碼可以看出,take和poll都是調(diào)用BlockingQueue提供的方法。既然take和poll都可以獲取到已完成狀態(tài)的task,那么他們的區(qū)別是什么呢?
take在獲取并移除已完成狀態(tài)的task時(shí),如果目前暫時(shí)不存在這樣的task,等待,直到存在這樣的task;
poll在獲取并移除已完成狀態(tài)的task時(shí),如果目前暫時(shí)不存在這樣的task,不等待,直接返回null。





