上一篇詳細(xì)介紹了Future與Callable,而CompletionService則對(duì)它們的優(yōu)化。?
Future的缺陷
Future通過(guò)get方法來(lái)獲取異步任務(wù)的結(jié)果,如果任務(wù)還沒(méi)有完成則阻塞線程,因?yàn)槲覀冃枰慕Y(jié)果,所以等待是應(yīng)該的。
如果需要處理一批這類任務(wù),提交到線程池我們最終會(huì)得到多個(gè)Future,但是每個(gè)任務(wù)執(zhí)行的時(shí)間可能并不相同,那么我們應(yīng)該優(yōu)先調(diào)用哪個(gè)Future的get呢?示例如下圖:

可以看到有些任務(wù)因?yàn)閳?zhí)行時(shí)間較長(zhǎng),而在他后面的任務(wù)可能執(zhí)行任務(wù)的時(shí)間較短,已經(jīng)提前完成了,但是并不能得到及時(shí)的處理,只能等到前面的任務(wù)執(zhí)行完成后才能處理,這樣設(shè)計(jì)是非常不合理的。
CompletionService解決
線程Java中提供了CompletionService來(lái)解決這個(gè)問(wèn)題的,同樣可以把一批任務(wù)提交到CompletionService,CompletionService可以把先執(zhí)行完成的任務(wù)通過(guò)take方法獲取到。使用方法如下圖:

ExecutorCompletionService是CompletionService的具體實(shí)現(xiàn),我們把線程池設(shè)置給ExecutorCompletionService后,也可以通過(guò)submit提交任務(wù)到ExecutorCompletionService,最后通過(guò)take方法獲取到已經(jīng)完成的Future。
從上面例子可以看到因?yàn)閕d等于3的任務(wù)先執(zhí)行完成,所以也先處理了這個(gè)任務(wù)。最先提交的任務(wù)id等于0的因?yàn)樾菝邥r(shí)間較長(zhǎng),所以先完成的任務(wù)就可以先執(zhí)行處理。
ExecutorCompletionService源碼
從上面的例子可以看到使用ExecutorCompletionService有三個(gè)關(guān)鍵步驟:設(shè)置一個(gè)線程池、submit提交任務(wù)、take獲取完成的Future。
看源碼首先看他的屬性,查看源碼得到他有兩個(gè)關(guān)鍵屬性:
Executor executor:執(zhí)行線程的線程池;
BlockingQueue<Future<V>> completionQueue:阻塞隊(duì)列,保存完成的Future;
看到這兩個(gè)屬性就大概能夠猜到它的實(shí)現(xiàn)方案:利用線程池去執(zhí)行任務(wù),利用阻塞隊(duì)列來(lái)保存完成的Future。
在submit方法肯定調(diào)用的是線程池的submit方法這個(gè)很簡(jiǎn)單,唯一的問(wèn)題是如何把完成的Future放到阻塞隊(duì)列中的?
上一篇文章中我們知道最終實(shí)現(xiàn)有返回的異步任務(wù)的類是FutureTask,最終運(yùn)行的是FutureTask的run方法,run方法完成后會(huì)調(diào)用finishCompletion方法去喚醒那些因?yàn)檎{(diào)用FutureTask.get()方法而阻塞的線程,在finishCompletion的最后調(diào)用了done()方法,只不過(guò)在FutureTask中done()方法是一個(gè)空方法。
而ExecutorCompletionService就是利用FutureTask的done()實(shí)現(xiàn)了把任務(wù)放到阻塞隊(duì)列中。
ExecutorCompletionService有一個(gè)私有的內(nèi)部類QueueingFuture,它繼承FutureTask,并實(shí)現(xiàn)了done()方法,done()方法把任務(wù)放到了ExecutorCompletionService的阻塞隊(duì)列中(所以QueueingFuture是私有類)。done()方法只有在任務(wù)執(zhí)行完成后才會(huì)調(diào)用。
圖解ExecutorCompletionService功能
通過(guò)以上分析基本上弄懂了ExecutorCompletionService的執(zhí)行流程,分析如下圖:

可以看到整個(gè)源碼實(shí)際上就只有三步:
submit方法把任務(wù)封裝FutureTask并作為QueueingFuture的屬性保存,然后提交QueueingFuture到線程池;
線程池執(zhí)行任務(wù)調(diào)用的是QueueingFuture的run方法,run方法最后調(diào)用done方法把屬性FutureTask(此時(shí)任務(wù)已經(jīng)完成)添加到阻塞隊(duì)列;
take方法從阻塞隊(duì)列中獲取FutureTask;
總結(jié)
ExecutorCompletionService實(shí)際上還是比較簡(jiǎn)單的一個(gè)類,提交任務(wù)利用的是線程池的提交,而他自己只創(chuàng)建了一個(gè)FutureTask的子類用來(lái)實(shí)現(xiàn)done方法,在任務(wù)完成后把FutureTask添加到阻塞隊(duì)列中。take方法調(diào)用的是阻塞隊(duì)列的take獲取FutureTask。?
Java程序員日常學(xué)習(xí)筆記,如理解有誤歡迎各位交流討論!
