Java的異步編程是一項非常常用的多線程技術(shù)。
之前通過源碼詳細(xì)分析了ThreadPoolExecutor《你真的懂ThreadPoolExecutor線程池技術(shù)嗎?看了源碼你會有全新的認(rèn)識》。通過創(chuàng)建一個ThreadPoolExecutor,往里面丟任務(wù)就可以實現(xiàn)多線程異步執(zhí)行了。
但之前的任務(wù)主要傾向于線程池,并沒有講到異步編程方面的內(nèi)容。本文將通過介紹Executor+Future框架(FutureTask是實現(xiàn)的核心),來深入了解下Java的異步編程。
萬事從示例開始,我們先通過示例Demo有一個直觀的印象,再深入去了解概念與原理。
使用示例
Demo:
使用上比較簡單,
運行結(jié)果:
任務(wù)1異步執(zhí)行:0
任務(wù)2異步執(zhí)行:0
任務(wù)2異步執(zhí)行:1
...
任務(wù)2異步執(zhí)行:45
同步代碼
任務(wù)2異步執(zhí)行:24
...
任務(wù)1異步執(zhí)行:199
任務(wù)1:執(zhí)行完成
...
任務(wù)2異步執(zhí)行:199
任務(wù)2:執(zhí)行完成
假若你多次執(zhí)行這個程序,會發(fā)現(xiàn)結(jié)果大大的不一樣,因為兩個任務(wù)和同步代碼是異步由多條線程執(zhí)行的,打印的結(jié)果當(dāng)然是隨機的。
回顧這個Demo做了什么,
- 構(gòu)建了一個線程池
- 往線程池里面丟兩個需要執(zhí)行的任務(wù)
- 最后獲取這兩個任務(wù)的結(jié)果
其中第二點是異步執(zhí)行兩個任務(wù),這兩個任務(wù)和主線程分別是用了三個線程并發(fā)執(zhí)行的,第三點是在主線程中同步等待兩個任務(wù)的結(jié)果。
很容易看出來,異步編程的好處就在于可以讓不相干的任務(wù)異步執(zhí)行,不阻塞主線程。若是主線程需要異步執(zhí)行的結(jié)果,此時再去等待結(jié)果會更加高效,提高程序的執(zhí)行效率。
下面來看看整個流程的實現(xiàn)原理。
源碼分析
一般在實際項目中,都會有配置有自己的線程池,建議大家在用異步編程時,配置一個專用的線程池,做好線程隔離,避免異步線程影響到其他模塊的工作。Demo中為了方便,直接調(diào)用Exectors的方法生成一個臨時的線程池,日常不建議使用。
我們從這個ExecutorService.submit()方法入手,看看整體實現(xiàn)。
ExecutorService.submit()定義一個接口。這個接口接收一個Callable參數(shù)(執(zhí)行的任務(wù)),返回一個Future(計算結(jié)果)。
Callable,相當(dāng)于一個需要執(zhí)行的任務(wù)。它不接收任何參數(shù),可以返回結(jié)果,可以拋出異常。相類似的還有Runnable,它也是不接收,不同點在于它不返回結(jié)果,也不拋異常,異常需要在任務(wù)內(nèi)部處理??偨Y(jié)來說Callable更像一個方法的調(diào)用,Runnable則是一個不需要理會結(jié)果的調(diào)用。在JDK 8以后,它們都可以通過Lamda表達式寫法去替代內(nèi)部類的寫法(詳見Demo)。
Future,一個異步計算的結(jié)果。調(diào)用get()方法可以得到對應(yīng)的計算結(jié)果,如果調(diào)用時沒有異步計算完,會阻塞等待計算的結(jié)果。同時它還提供方法可以嘗試取消任務(wù)的執(zhí)行。
看回ExecutorService.submit()的實現(xiàn),代碼在實現(xiàn)類AbstractExecutorService中。
除了它接口的實現(xiàn),還提供了兩種變形。原來接口只接收
Callable參數(shù),實現(xiàn)類中還新增了接收Runnable參數(shù)的。
如果看過之前寫的《你真的懂ThreadPoolExecutor線程池技術(shù)嗎?看了源碼你會有全新的認(rèn)識》,應(yīng)該了解ThreadPoolExecutor執(zhí)行任務(wù)是可以調(diào)用execute()方法的。而這里面submit()方法則是為Callable/Runnable加多一層FutureTask,從而
使執(zhí)行結(jié)果有一個存放的地方,同時也添加一個可以取消的功能。原本的execute()只能執(zhí)行任務(wù),不會返回結(jié)果的,具體實現(xiàn)原理可以看看之前的文章分析。
FutureTask是RunnableFuture的實現(xiàn)。而RunnableFuture是繼承Future和Runnable接口的,定義run()接口。
因為
FutureTask有run()接口,所以可以直接用一個Callable/Runnable創(chuàng)建一個FutureTask單獨執(zhí)行。但這樣并沒有異步的效果,因為沒有啟用新的線程去跑,而是在原來的線程阻塞執(zhí)行的。
到這里我們清楚知道了,submit()方法重點是利用Callable/Runnable創(chuàng)建一個FutureTask,然后多線程執(zhí)行run()方法,達到異步處理并且得到結(jié)果的效果。而FutureTask的重點則是run()方法如何持有保存計算的結(jié)果。
FutureTask.run()
首先判斷
futureTask對象的state狀態(tài),如果不是NEW的話,證明已經(jīng)開始運行過了,則退出執(zhí)行。同時futureTask對象通過CAS,把當(dāng)前線程賦值給變量runner(是Thread類型,說明對象使用哪個線程執(zhí)行的),如果CAS失敗則退出。
外層try{}代碼塊中,對callable判空和state狀態(tài)必須是NEW。內(nèi)層try{}代碼真正調(diào)用callable,開始執(zhí)行任務(wù)。若執(zhí)行成功,則把ran變量設(shè)為true,保存結(jié)果在result變量中,證明已跑成功過了;若拋異常了,則設(shè)為false,result為空,并且調(diào)用setException()保存異常。最后如果ran為true的話,則調(diào)用set()保存result結(jié)果。
看下setException()和set()的實現(xiàn)。
兩者的基本流程一樣,CAS置換狀態(tài),保存結(jié)果在
outcome變量道中,但setException()保存的結(jié)果類型固定是Throwable。另外一個不同在于最終state狀態(tài),一個是EXCEPTION,一個是NORMAL。
這兩個方法最后都調(diào)用了finishCompletion()。這個方法主要是配合線程池喚醒下一個任務(wù)。
FutureTask.get()
從上面run()方法得知,最后執(zhí)行的結(jié)果放在了outcome變量中。那最終怎么從其中取出結(jié)果來,我們來看看get()方法。
從源碼可知,
get()方法分兩步。第一步,先判斷狀態(tài),如果計算為完成,則需要阻塞地等待完成。第二步,如果完成了,則調(diào)用report()方法獲取結(jié)果并返回。
先看看awaitDone()阻塞等待完成。該方法可以選用超時功能。
在自旋的for()循環(huán)中,
- 先判斷是否線程被中斷,中斷的話拋異常退出。
- 然后開始判斷運行的
state值,如果state大于COMPLETING,證明計算已經(jīng)是終態(tài)了,此時返回終態(tài)變量。 - 若
state等于COMPLETING,證明已經(jīng)開始計算,并且還在計算中。此時為了避免過多的CPU時間放在這個for循環(huán)的自旋上,程序執(zhí)行Thread.yield(),把線程從運行態(tài)降為就緒態(tài),讓出CPU時間。 - 若以上狀態(tài)都不是,則證明
state為NEW,還沒開始執(zhí)行。那么程序在當(dāng)前循環(huán)現(xiàn)在會新增一個WaitNode,在下一個循環(huán)里面調(diào)用LockSupport.park()把當(dāng)前線程阻塞。當(dāng)run()方法結(jié)束的時候,會再次喚醒此線程,避免自旋消耗CPU時間。 - 如果選用了超時功能,在阻塞和自旋過程中超時了,則會返回當(dāng)前超時的狀態(tài)。
第二步的report()方法比較簡單。
- 如果狀態(tài)是
NORMAL,正常結(jié)束的話,則把outcome變量返回; - 如果是取消或者中斷狀態(tài)的,則拋出取消異常;
- 如果是
EXCEPTION,則把outcome當(dāng)作異常拋出(之前setException()保存的類型就是Throwable)。從而整個get()會有一個異常拋出。
總結(jié)
至此我們已經(jīng)比較完整地了解Executor+Future的框架原理了,而FutureTask則是該框架的主要實現(xiàn)。下面總結(jié)下要點
-
Executor.sumbit()方法異步執(zhí)行一個任務(wù),并且返回一個Future結(jié)果。 -
submit()的原理是利用Callable創(chuàng)建一個FutureTask對象,然后執(zhí)行對象的run()方法,把結(jié)果保存在outcome中。 - 調(diào)用
get()獲取outcome時,如果任務(wù)未完成,會阻塞線程,等待執(zhí)行完畢。 - 異常和正常結(jié)果都放在
outcome中,調(diào)用get()獲取結(jié)果或拋出異常。
更多技術(shù)文章、精彩干貨,請關(guān)注
博客:zackku.com
微信公眾號:Zack說碼