Java異步任務(wù)優(yōu)化CompletionService

上一篇詳細(xì)介紹了Future與Callable,而CompletionService則對(duì)它們的優(yōu)化。?

Future的缺陷

Future通過(guò)get方法來(lái)獲取異步任務(wù)的結(jié)果,如果任務(wù)還沒(méi)有完成則阻塞線程,因?yàn)槲覀冃枰慕Y(jié)果,所以等待是應(yīng)該的。

如果需要處理一批這類任務(wù),提交到線程池我們最終會(huì)得到多個(gè)Future,但是每個(gè)任務(wù)執(zhí)行的時(shí)間可能并不相同,那么我們應(yīng)該優(yōu)先調(diào)用哪個(gè)Future的get呢?示例如下圖:


可以看到有些任務(wù)因?yàn)閳?zhí)行時(shí)間較長(zhǎng),而在他后面的任務(wù)可能執(zhí)行任務(wù)的時(shí)間較短,已經(jīng)提前完成了,但是并不能得到及時(shí)的處理,只能等到前面的任務(wù)執(zhí)行完成后才能處理,這樣設(shè)計(jì)是非常不合理的。

CompletionService解決

線程Java中提供了CompletionService來(lái)解決這個(gè)問(wèn)題的,同樣可以把一批任務(wù)提交到CompletionService,CompletionService可以把先執(zhí)行完成的任務(wù)通過(guò)take方法獲取到。使用方法如下圖:


ExecutorCompletionService是CompletionService的具體實(shí)現(xiàn),我們把線程池設(shè)置給ExecutorCompletionService后,也可以通過(guò)submit提交任務(wù)到ExecutorCompletionService,最后通過(guò)take方法獲取到已經(jīng)完成的Future

從上面例子可以看到因?yàn)閕d等于3的任務(wù)先執(zhí)行完成,所以也先處理了這個(gè)任務(wù)。最先提交的任務(wù)id等于0的因?yàn)樾菝邥r(shí)間較長(zhǎng),所以先完成的任務(wù)就可以先執(zhí)行處理。

ExecutorCompletionService源碼

從上面的例子可以看到使用ExecutorCompletionService有三個(gè)關(guān)鍵步驟:設(shè)置一個(gè)線程池、submit提交任務(wù)、take獲取完成的Future。

看源碼首先看他的屬性,查看源碼得到他有兩個(gè)關(guān)鍵屬性:

Executor executor:執(zhí)行線程的線程池;

BlockingQueue<Future<V>> completionQueue:阻塞隊(duì)列,保存完成的Future;

看到這兩個(gè)屬性就大概能夠猜到它的實(shí)現(xiàn)方案:利用線程池去執(zhí)行任務(wù),利用阻塞隊(duì)列來(lái)保存完成的Future。

在submit方法肯定調(diào)用的是線程池的submit方法這個(gè)很簡(jiǎn)單,唯一的問(wèn)題是如何把完成的Future放到阻塞隊(duì)列中的?

上一篇文章中我們知道最終實(shí)現(xiàn)有返回的異步任務(wù)的類是FutureTask,最終運(yùn)行的是FutureTask的run方法,run方法完成后會(huì)調(diào)用finishCompletion方法去喚醒那些因?yàn)檎{(diào)用FutureTask.get()方法而阻塞的線程,在finishCompletion的最后調(diào)用了done()方法,只不過(guò)在FutureTask中done()方法是一個(gè)空方法

而ExecutorCompletionService就是利用FutureTask的done()實(shí)現(xiàn)了把任務(wù)放到阻塞隊(duì)列中。

ExecutorCompletionService有一個(gè)私有的內(nèi)部類QueueingFuture,它繼承FutureTask,并實(shí)現(xiàn)了done()方法,done()方法把任務(wù)放到了ExecutorCompletionService的阻塞隊(duì)列中(所以QueueingFuture是私有類)。done()方法只有在任務(wù)執(zhí)行完成后才會(huì)調(diào)用。

圖解ExecutorCompletionService功能

通過(guò)以上分析基本上弄懂了ExecutorCompletionService的執(zhí)行流程,分析如下圖:


可以看到整個(gè)源碼實(shí)際上就只有三步:

submit方法把任務(wù)封裝FutureTask并作為QueueingFuture的屬性保存,然后提交QueueingFuture到線程池;

線程池執(zhí)行任務(wù)調(diào)用的是QueueingFuture的run方法,run方法最后調(diào)用done方法把屬性FutureTask(此時(shí)任務(wù)已經(jīng)完成)添加到阻塞隊(duì)列;

take方法從阻塞隊(duì)列中獲取FutureTask;

總結(jié)

ExecutorCompletionService實(shí)際上還是比較簡(jiǎn)單的一個(gè)類,提交任務(wù)利用的是線程池的提交,而他自己只創(chuàng)建了一個(gè)FutureTask的子類用來(lái)實(shí)現(xiàn)done方法,在任務(wù)完成后把FutureTask添加到阻塞隊(duì)列中。take方法調(diào)用的是阻塞隊(duì)列的take獲取FutureTask。?

Java程序員日常學(xué)習(xí)筆記,如理解有誤歡迎各位交流討論!


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容