本文主要介紹 Kotlin Coroutine 的基礎(chǔ) API,有關(guān) Kotlinx.Coroutine 的內(nèi)容,我們將在下一期給大家介紹。由于本人水平有限,如果大家有什么異議,歡迎直接拋出來(lái)跟我討論。
1. 什么是 Coroutine
Coroutine 被翻譯成了“協(xié)程”,意思就是要各個(gè)子任務(wù)協(xié)作運(yùn)行的意思,所以大家一下就明白了它被創(chuàng)造出來(lái)是要解決異步問(wèn)題的。
我們寫 Java 的程序員,對(duì)線程更熟悉一些。線程是比進(jìn)程更小一級(jí)的運(yùn)行單位,它的調(diào)度由操作系統(tǒng)來(lái)完成,所以我們只管 new Thread 和 start,至于什么時(shí)候 run,什么時(shí)候 run 完,我們都沒(méi)辦法預(yù)見(jiàn)。
Thread t = new Thread(task);
t.start();
盡管有諸多不可控的因素,不過(guò)我們可以肯定的是起了一個(gè)新的線程并啟動(dòng)它之后,當(dāng)前線程并不會(huì)受到阻塞。如果大家再往深處想想,CPU 在任意時(shí)刻運(yùn)行什么進(jìn)程及其線程,是操作系統(tǒng)決定的,但歸根結(jié)底一個(gè)單線程的 CPU 在任一時(shí)刻只能運(yùn)行一個(gè)任務(wù)。
那么協(xié)程呢?協(xié)程的調(diào)度是應(yīng)用層完成的,比如我們說(shuō) Lua 支持協(xié)程,那么各個(gè)協(xié)程如何運(yùn)行,這一調(diào)度工作實(shí)際上是 Lua 自己的虛擬機(jī)來(lái)完成的。這個(gè)調(diào)度與線程調(diào)度有著比較大的差別,線程調(diào)度是搶占式調(diào)度,很有可能線程 A 運(yùn)行得美滋滋的,線程 B 突然把 CPU 搶過(guò)來(lái),跟 A 說(shuō)“你給我下去吧你”,于是線程 A 只能干瞪眼沒(méi)辦法;而協(xié)程的調(diào)度是非搶占式的,目前常見(jiàn)的各種支持協(xié)程的語(yǔ)言實(shí)現(xiàn)中都有 yield 關(guān)鍵字,它有“妥協(xié)、退讓”的意思,如果一個(gè)協(xié)程執(zhí)行到一段代碼需要歇會(huì)兒,那么它將把執(zhí)行權(quán)讓出來(lái),如果它不這么做,沒(méi)人跟它搶。
在 接觸 Kotlin 的協(xié)程之前呢,我們先給大家看一個(gè) Lua 的例子,比較直觀:
function foo(a)
print("foo", a)
return coroutine.yield(2 * a)
end
co = coroutine.create(function ( a, b )
print("co-body", a, b)
local r = foo(a + 1)
print("co-body", r)
local r, s = coroutine.yield(a + b, a - b)
print("co-body", r, s)
return b, "end"
end)
print("main", coroutine.resume(co, 1, 10))
print("main", coroutine.resume(co, "r"))
print("main", coroutine.resume(co, "x", "y"))
print("main", coroutine.resume(co, "x", "y"))
運(yùn)行結(jié)果如下:
co-body 1 10
foo 2
main true 4
co-body r
main true 11 -9
co-body x y
main true 10 end
main false cannot resume dead coroutine
首先定義了一個(gè) foo 函數(shù),然后創(chuàng)建 coroutine,創(chuàng)建了之后還需要調(diào)用 resume 才能執(zhí)行協(xié)程,運(yùn)行過(guò)程是謙讓的,是交替的:
圖中數(shù)字表示第n次
協(xié)程為我們的程序提供了一種暫停的能力,就好像狀態(tài)機(jī),只有等到下一次輸入,它才做狀態(tài)轉(zhuǎn)移。顯然,用協(xié)程來(lái)描述一個(gè)狀態(tài)機(jī)是再合適不過(guò)的了。
也許大家對(duì) lua 的語(yǔ)法不是很熟悉,不過(guò)沒(méi)關(guān)系,上面的例子只需要知道大概是在干什么就行:這例子就好像,main 和 Foo 在交替干活,有點(diǎn)兒像 A B 兩個(gè)人分工協(xié)作,A 干一會(huì)兒 B 來(lái),B 干一會(huì)兒,再讓 A 來(lái)一樣。如果我們用線程來(lái)描述這個(gè)問(wèn)題,那么可能會(huì)用到很多回調(diào),相信寫 Js 的兄弟聽(tīng)到這兒要感到崩潰了,因?yàn)?Js 的代碼寫著寫著就容易回調(diào)滿天飛,業(yè)務(wù)邏輯的實(shí)現(xiàn)越來(lái)越抽象,可讀性越來(lái)越差;而用協(xié)程的話,就好像一個(gè)很平常的同步操作一樣,一點(diǎn)兒異步任務(wù)的感覺(jué)都沒(méi)有。
我們前面提到的協(xié)程的非搶占調(diào)度方式,以及這個(gè)交替執(zhí)行代碼的例子,基本上可以說(shuō)明協(xié)程實(shí)際上致力于用同步一樣的代碼來(lái)完成異步任務(wù)的運(yùn)行。
一句話,有了協(xié)程,你的異步程序看起來(lái)就像同步代碼一樣。
2. Kotlin 協(xié)程初體驗(yàn)
Kotlin 1.1 對(duì)協(xié)程的基本支持都在 Kotlin 標(biāo)準(zhǔn)庫(kù)當(dāng)中,主要涉及兩個(gè)類和幾個(gè)包級(jí)函數(shù)和擴(kuò)展方法:
-
CoroutineContext,協(xié)程的上下文,這個(gè)上下文可以是多個(gè)的組合,組合的上下文可以通過(guò) key 來(lái)獲取。EmptyCoroutineContext 是一個(gè)空實(shí)現(xiàn),沒(méi)有任何功能,如果我們?cè)谑褂脜f(xié)程時(shí)不需要上下文,那么我們就用這個(gè)對(duì)象作為一個(gè)占位即可。上下文這個(gè)東西,不管大家做什么應(yīng)用,總是能遇到,比如 Android 里面的 Context,JSP 里面的 PageContext 等等,他們扮演的角色都大同小異:資源管理,數(shù)據(jù)持有等等,協(xié)程的上下文也基本上是如此。
Continuation,顧名思義,繼續(xù)、持續(xù)的意思。我們前面說(shuō)過(guò),協(xié)程提供了一種暫停的能力,可繼續(xù)執(zhí)行才是最終的目的,Continuation 有兩個(gè)方法,一個(gè)是 resume,如果我們的程序沒(méi)有任何異常,那么直接調(diào)用這個(gè)方法并傳入需要返回的值;另一個(gè)是 resumeWithException,如果我們的程序出了異常,那我們可以通過(guò)調(diào)用這個(gè)方法把異常傳遞出去。
- 協(xié)程的基本操作,包括創(chuàng)建、啟動(dòng)、暫停和繼續(xù),繼續(xù)的操作在 Continuation 當(dāng)中,剩下的三個(gè)都是包級(jí)函數(shù)或擴(kuò)展方法:
這幾個(gè)類和函數(shù)其實(shí)與我們前面提到的 Lua 的協(xié)程 API 非常相似,都是協(xié)程最基礎(chǔ)的 API。
除此之外,Kotlin 還增加了一個(gè)關(guān)鍵字:suspend,用作修飾會(huì)被暫停的函數(shù),被標(biāo)記為 suspend 的函數(shù)只能運(yùn)行在協(xié)程或者其他 suspend 函數(shù)當(dāng)中。
好,介紹完這些基本概念,讓我們來(lái)看一個(gè)例子:
fun main(args: Array<String>) {
log("before coroutine")
//啟動(dòng)我們的協(xié)程
asyncCalcMd5("test.zip") {
log("in coroutine. Before suspend.")
//暫停我們的線程,并開(kāi)始執(zhí)行一段耗時(shí)操作
val result: String = suspendCoroutine {
continuation ->
log("in suspend block.")
continuation.resume(calcMd5(continuation.context[FilePath]!!.path))
log("after resume.")
}
log("in coroutine. After suspend. result = $result")
}
log("after coroutine")
}
/**
* 上下文,用來(lái)存放我們需要的信息,可以靈活的自定義
*/
class FilePath(val path: String): AbstractCoroutineContextElement(FilePath){
companion object Key : CoroutineContext.Key<FilePath>
}
fun asyncCalcMd5(path: String, block: suspend () -> Unit) {
val continuation = object : Continuation<Unit> {
override val context: CoroutineContext
get() = FilePath(path)
override fun resume(value: Unit) {
log("resume: $value")
}
override fun resumeWithException(exception: Throwable) {
log(exception.toString())
}
}
block.startCoroutine(continuation)
}
fun calcMd5(path: String): String{
log("calc md5 for $path.")
//暫時(shí)用這個(gè)模擬耗時(shí)
Thread.sleep(1000)
//假設(shè)這就是我們計(jì)算得到的 MD5 值
return System.currentTimeMillis().toString()
}
這段程序在模擬計(jì)算文件的 Md5 值。我們知道,文件的 Md5 值計(jì)算是一項(xiàng)耗時(shí)操作,所以我們希望啟動(dòng)一個(gè)協(xié)程來(lái)處理這個(gè)耗時(shí)任務(wù),并在任務(wù)運(yùn)行結(jié)束時(shí)打印出來(lái)計(jì)算的結(jié)果。
我們先來(lái)一段一段分析下這個(gè)示例:
/**
* 上下文,用來(lái)存放我們需要的信息,可以靈活的自定義
*/
class FilePath(val path: String): AbstractCoroutineContextElement(FilePath){
companion object Key : CoroutineContext.Key<FilePath>
}
我們?cè)谟?jì)算過(guò)程中需要知道計(jì)算哪個(gè)文件的 Md5,所以我們需要通過(guò)上下文把這個(gè)路徑傳入?yún)f(xié)程當(dāng)中。如果有多個(gè)數(shù)據(jù),也可以一并添加進(jìn)去,在運(yùn)行當(dāng)中,我們可以通過(guò) Continuation 的實(shí)例拿到上下文,進(jìn)而獲取到這個(gè)路徑:
continuation.context[FilePath]!!.path
接著,我們?cè)賮?lái)看下 Continuation:
val continuation = object : Continuation<Unit> {
override val context: CoroutineContext
get() = FilePath(path)
override fun resume(value: Unit) {
log("resume: $value")
}
override fun resumeWithException(exception: Throwable) {
log(exception.toString())
}
}
我們除了給定了 FilePath 這樣一個(gè)上下文之外就是簡(jiǎn)單的打了幾行日志,比較簡(jiǎn)單。這里傳入的 Continuation 當(dāng)中的 resume 和 resumeWithException 只有在協(xié)程最終執(zhí)行完成后才會(huì)被調(diào)用,這一點(diǎn)需要注意一下,也正是因?yàn)槿绱?,startCoroutine 把它叫做 completion:
public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>
那么下面我們看下最關(guān)鍵的這段代碼:
asyncCalcMd5("test.zip") {
log("in coroutine. Before suspend.")
//暫停我們的協(xié)程,并開(kāi)始執(zhí)行一段耗時(shí)操作
val result: String = suspendCoroutine {
continuation ->
log("in suspend block.")
continuation.resume(calcMd5(continuation.context[FilePath]!!.path))
log("after resume.")
}
log("in coroutine. After suspend. result = $result")
}
suspendCoroutine 這個(gè)方法將外部的代碼執(zhí)行權(quán)拿走,并轉(zhuǎn)入傳入的 Lambda 表達(dá)式中,而這個(gè)表達(dá)式當(dāng)中的操作就對(duì)應(yīng)異步的耗時(shí)操作了,在這里我們“計(jì)算”出了 Md5 值,接著調(diào)用 continuation.resume 將結(jié)果傳了出去,傳給了誰(shuí)呢?傳給了 suspendCoroutine 的返回值也即 result,這時(shí)候協(xié)程繼續(xù)執(zhí)行,打印 result 結(jié)束。
下面就是運(yùn)行結(jié)果了:
2017-01-30T06:43:52.284Z [main] before coroutine
2017-01-30T06:43:52.422Z [main] in coroutine. Before suspend.
2017-01-30T06:43:52.423Z [main] in suspend block.
2017-01-30T06:43:52.423Z [main] calc md5 for test.zip.
2017-01-30T06:43:53.426Z [main] after resume.
2017-01-30T06:43:53.427Z [main] in coroutine. After suspend. result = 1485758633426
2017-01-30T06:43:53.427Z [main] resume: 1485758633426
2017-01-30T06:43:53.427Z [main] after coroutine
細(xì)心的讀者肯定一看就發(fā)現(xiàn),所謂的異步操作是怎么個(gè)異步法?從日志上面看,明明上面這段代碼就是順序執(zhí)行的嘛,不然 after coroutine 這句日志為什么非要等到最后才打?。?/p>
還有,整個(gè)程序都只運(yùn)行在了主線程上,我們的日志足以說(shuō)明這一點(diǎn)了,根本沒(méi)有異步嘛。難道說(shuō)協(xié)程就是一個(gè)大騙子??
3. 實(shí)現(xiàn)異步
這一部分我們就要回答上一節(jié)留下的問(wèn)題。不過(guò)在此之前,我們?cè)賮?lái)回顧一下協(xié)程存在的意義:讓異步代碼看上去像同步代碼,直接自然易懂。至于它如何做到這一點(diǎn),可能各家的語(yǔ)言實(shí)現(xiàn)各有不同,但協(xié)程給人的感覺(jué)更像是底層并發(fā) API(比如線程)的語(yǔ)法糖。當(dāng)然,如果你愿意,我們通常所謂的線程也可以被稱作操作系統(tǒng)級(jí) API 的語(yǔ)法糖了吧,畢竟各家語(yǔ)言對(duì)于線程的實(shí)現(xiàn)也各有不同,這個(gè)就不是我們今天要討論的內(nèi)容了。
不管怎么樣,你只需要知道,協(xié)程的異步需要依賴比它更底層的 API 支持,那么在 Kotlin 當(dāng)中,這個(gè)所謂的底層 API 就非線程莫屬了。
知道了這一點(diǎn),我們就要考慮想辦法來(lái)把前面的示例完善一下了。
首先我們實(shí)例化一個(gè)線程池:
private val executor = Executors.newSingleThreadScheduledExecutor {
Thread(it, "scheduler")
}
接著我們把計(jì)算 Md5 的部分交給線程池去運(yùn)行:
asyncCalcMd5("test.zip") {
log("in coroutine. Before suspend.")
//暫停我們的線程,并開(kāi)始執(zhí)行一段耗時(shí)操作
val result: String = suspendCoroutine {
continuation ->
log("in suspend block.")
executor.submit {
continuation.resume(calcMd5(continuation.context[FilePath]!!.path))
log("after resume.")
}
}
log("in coroutine. After suspend. result = $result")
executor.shutdown()
}
那么結(jié)果呢?
2017-01-30T07:18:04.496Z [main] before coroutine
2017-01-30T07:18:04.754Z [main] in coroutine. Before suspend.
2017-01-30T07:18:04.757Z [main] in suspend block.
2017-01-30T07:18:04.765Z [main] after coroutine
2017-01-30T07:18:04.765Z [scheduler] calc md5 for test.zip.
2017-01-30T07:18:05.769Z [scheduler] in coroutine. After suspend. result = 1485760685768
2017-01-30T07:18:05.769Z [scheduler] resume: 1485760685768
2017-01-30T07:18:05.769Z [scheduler] after resume.
我們看到在協(xié)程被暫停的那一刻,協(xié)程外面的代碼被執(zhí)行了。一段時(shí)間之后,協(xié)程被繼續(xù)執(zhí)行,打印結(jié)果。
截止到現(xiàn)在,我們用協(xié)程來(lái)實(shí)現(xiàn)異步操作的功能已經(jīng)實(shí)現(xiàn)。
你可能要問(wèn),如果我們想要完成異步操作,直接用線程池加回調(diào)豈不更直接簡(jiǎn)單,為什么要用協(xié)程呢,搞得代碼這么讓人費(fèi)解不說(shuō),也沒(méi)有變的很簡(jiǎn)單啊。
說(shuō)的對(duì),如果我們實(shí)際當(dāng)中把協(xié)程的代碼都寫成這樣,肯定會(huì)被蛋疼死,我前面展示給大家的,是 Kotlin 標(biāo)準(zhǔn)庫(kù)當(dāng)中最為基礎(chǔ)的 API,看起來(lái)非常的原始也是理所應(yīng)當(dāng)?shù)模绻覀儗?duì)其加以封裝,那效果肯定大不一樣。
除此之外,在高并發(fā)的場(chǎng)景下,多個(gè)協(xié)程可以共享一個(gè)或者多個(gè)線程,性能可能會(huì)要好一些。舉個(gè)簡(jiǎn)單的例子,一臺(tái)服務(wù)器有 1k 用戶與之連接,如果我們采用類似于 Tomcat 的實(shí)現(xiàn)方式,一個(gè)用戶開(kāi)一個(gè)線程去處理請(qǐng)求,那么我們將要開(kāi) 1k 個(gè)線程,這算是個(gè)不小的數(shù)目了;而我們?nèi)绻褂脜f(xié)程,為每一個(gè)用戶創(chuàng)建一個(gè)協(xié)程,考慮到同一時(shí)刻并不是所有用戶都需要數(shù)據(jù)傳輸,因此我們并不需要同時(shí)處理所有用戶的請(qǐng)求,那么這時(shí)候可能只需要幾個(gè)專門的 IO 線程和少數(shù)來(lái)承載用戶請(qǐng)求對(duì)應(yīng)的協(xié)程的線程,只有當(dāng)用戶有數(shù)據(jù)傳輸事件到來(lái)的時(shí)候才去響應(yīng),其他時(shí)間直接掛起,這種事件驅(qū)動(dòng)的服務(wù)器顯然對(duì)資源的消耗要小得多。
4. 進(jìn)一步封裝
這一節(jié)的內(nèi)容較多的參考了 Kotlin 官方的 Coroutine Example,里面有更多的例子,大家可以參考學(xué)習(xí)。
4.1 異步
剛才那個(gè)示例讓我們感覺(jué)到,寫個(gè)協(xié)程調(diào)用異步代碼實(shí)在太原始了,所以我們決定對(duì)它做一下封裝。如果我們能在調(diào)用 suspendCoroutine 的時(shí)候直接把后面的代碼攔截,并切到線程池當(dāng)中執(zhí)行,那么我們就不用每次自己搞一個(gè)線程池來(lái)做這事兒了,嗯,讓我們研究下有什么辦法可以做到這一點(diǎn)。
攔截...怎么攔截呢?
public interface ContinuationInterceptor : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
}
我們發(fā)現(xiàn),Kotlin 的協(xié)程 API 當(dāng)中提供了這么一個(gè)攔截器,可以把協(xié)程的操作攔截,傳入的是原始的 Continuation,返回的是我們經(jīng)過(guò)線程切換的 Continuation,這樣就可以實(shí)現(xiàn)我們的目的了。
open class Pool(val pool: ForkJoinPool)
: AbstractCoroutineContextElement(ContinuationInterceptor),
ContinuationInterceptor {
override fun <T> interceptContinuation(continuation: Continuation<T>)
: Continuation<T> =
PoolContinuation(pool,
//下面這段代碼是要查找其他攔截器,并保證能調(diào)用它們的攔截方法
continuation.context.fold(continuation, { cont, element ->
if (element != this@Pool && element is ContinuationInterceptor)
element.interceptContinuation(cont) else cont
}))
}
private class PoolContinuation<T>(
val pool: ForkJoinPool,
val continuation: Continuation<T>
) : Continuation<T> by continuation {
override fun resume(value: T) {
if (isPoolThread()) continuation.resume(value)
else pool.execute { continuation.resume(value) }
}
override fun resumeWithException(exception: Throwable) {
if (isPoolThread()) continuation.resumeWithException(exception)
else pool.execute { continuation.resumeWithException(exception) }
}
fun isPoolThread(): Boolean = (Thread.currentThread() as? ForkJoinWorkerThread)?.pool == pool
}
這個(gè) Pool 是什么鬼?我們讓它繼承 AbstractCoroutineContextElement 表明它其實(shí)就是我們需要的上下文。實(shí)際上這個(gè)上下文可以給任意協(xié)程使用,于是我們?cè)俣x一個(gè) object:
object CommonPool : Pool(ForkJoinPool.commonPool())
有了這個(gè),我們就可以把沒(méi)加線程池的版本改改了:
fun main(args: Array<String>) {
log("before coroutine")
//啟動(dòng)我們的協(xié)程
asyncCalcMd5("test.zip") {
...
}
log("after coroutine")
//加這句的原因是防止程序在協(xié)程運(yùn)行完之前停止
CommonPool.pool.awaitTermination(10000, TimeUnit.MILLISECONDS)
}
...
fun asyncCalcMd5(path: String, block: suspend () -> String) {
val continuation = object : Continuation<String> {
override val context: CoroutineContext
//注意這個(gè)寫法,上下文可以通過(guò) + 來(lái)組合使用
get() = FilePath(path) + CommonPool
...
}
block.startCoroutine(continuation)
}
...
那么運(yùn)行結(jié)果呢?
2017-01-30T09:13:11.183Z [main] before coroutine
2017-01-30T09:13:11.334Z [main] after coroutine
2017-01-30T09:13:11.335Z [ForkJoinPool.commonPool-worker-1] in coroutine. Before suspend.
2017-01-30T09:13:11.337Z [ForkJoinPool.commonPool-worker-1] in suspend block.
2017-01-30T09:13:11.337Z [ForkJoinPool.commonPool-worker-1] calc md5 for test.zip.
2017-01-30T09:13:12.340Z [ForkJoinPool.commonPool-worker-1] after resume.
2017-01-30T09:13:12.341Z [ForkJoinPool.commonPool-worker-1] in coroutine. After suspend. result = 1485767592340
2017-01-30T09:13:12.341Z [ForkJoinPool.commonPool-worker-1] resume: 1485767592340
我們看到程序已經(jīng)非常完美的實(shí)現(xiàn)異步調(diào)用。顯然,這種寫法要比線程池回調(diào)的寫法看上去順理成章得多。
4.2 啟動(dòng)協(xié)程
在討論完異步的封裝后,有人肯定還是會(huì)提出新問(wèn)題:?jiǎn)?dòng)協(xié)程的寫法是不是有點(diǎn)兒?jiǎn)铝税。繘](méi)錯(cuò),每次構(gòu)造一個(gè) Continuation,也沒(méi)干多少事兒,實(shí)在沒(méi)什么必要,干脆封裝一個(gè)通用的版本得了:
class StandaloneCoroutine(override val context: CoroutineContext): Continuation<Unit> {
override fun resume(value: Unit) {}
override fun resumeWithException(exception: Throwable) {
//處理異常
val currentThread = Thread.currentThread()
currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
}
}
這樣就好辦了,我們每次啟動(dòng)協(xié)程只需要針對(duì)當(dāng)前協(xié)程提供特定的上下文即可,那么我們是不是再把啟動(dòng)的那個(gè)函數(shù)改改呢?
fun launch(context: CoroutineContext, block: suspend () -> Unit) =
block.startCoroutine(StandaloneCoroutine(context))
有了這個(gè),我們前面的代碼就可以進(jìn)一步修改:
fun main(args: Array<String>) {
log("before coroutine")
//啟動(dòng)我們的協(xié)程
launch(FilePath("test.zip") + CommonPool) {
log("in coroutine. Before suspend.")
//暫停我們的線程,并開(kāi)始執(zhí)行一段耗時(shí)操作
val result: String = suspendCoroutine {
continuation ->
log("in suspend block.")
continuation.resume(calcMd5(continuation.context[FilePath]!!.path))
log("after resume.")
}
log("in coroutine. After suspend. result = $result")
}
log("after coroutine")
CommonPool.pool.awaitTermination(10000, TimeUnit.MILLISECONDS)
}
/**
* 上下文,用來(lái)存放我們需要的信息,可以靈活的自定義
*/
class FilePath(val path: String) : AbstractCoroutineContextElement(Key) {
companion object Key : CoroutineContext.Key<FilePath>
}
fun calcMd5(path: String): String {
log("calc md5 for $path.")
//暫時(shí)用這個(gè)模擬耗時(shí)
Thread.sleep(1000)
//假設(shè)這就是我們計(jì)算得到的 MD5 值
return System.currentTimeMillis().toString()
}
運(yùn)行結(jié)果自然也沒(méi)什么好說(shuō)的。
4.3 暫停協(xié)程
暫停協(xié)程這塊兒也太亂了,看著莫名其妙的,能不能直白一點(diǎn)兒呢?其實(shí)我們的代碼不過(guò)是想要獲取 Md5 的值,所以如果能寫成下面這樣就好了:
val result = calcMd5(continuation.context[FilePath]!!.path).await()
毋庸置疑,這肯定是可以的。想一下,有哪個(gè)類可以支持我們直接阻塞線程,等到獲取到結(jié)果之后再返回呢?當(dāng)然是 Future 了。
suspend fun <T> CompletableFuture<T>.await(): T {
return suspendCoroutine {
continuation ->
whenComplete { result, e ->
if (e == null) continuation.resume(result)
else continuation.resumeWithException(e)
}
}
}
我們干脆就直接給 CompletableFuture 定義一個(gè)擴(kuò)展方法,當(dāng)中只是用來(lái)掛起協(xié)程,并在結(jié)果拿到之后繼續(xù)執(zhí)行協(xié)程。這樣,我們的代碼可以進(jìn)一步修改:
fun main(args: Array<String>) {
log("before coroutine")
//啟動(dòng)我們的協(xié)程
val coroutineContext = FilePath("test.zip") + CommonPool
launch(coroutineContext) {
log("in coroutine. Before suspend.")
//暫停我們的線程,并開(kāi)始執(zhí)行一段耗時(shí)操作
val result: String = calcMd5(coroutineContext[FilePath]!!.path).await()
log("in coroutine. After suspend. result = $result")
}
log("after coroutine")
CommonPool.pool.awaitTermination(10, TimeUnit.SECONDS)
}
fun calcMd5(path: String): CompletableFuture<String> = CompletableFuture.supplyAsync {
log("calc md5 for $path.")
//暫時(shí)用這個(gè)模擬耗時(shí)
Thread.sleep(1000)
//假設(shè)這就是我們計(jì)算得到的 MD5 值
System.currentTimeMillis().toString()
}
... 省略掉一些沒(méi)有修改的代碼 ...
4.4 帶有 Receiver 的協(xié)程
不知道大家注意到?jīng)]有, 4.3 的代碼中有個(gè)地方比較別扭:
val coroutineContext = FilePath("test.zip") + CommonPool
launch(coroutineContext) {
...
//在協(xié)程內(nèi)部想要訪問(wèn)上下文居然需要用到外部的變量
val result: String = calcMd5(coroutineContext[FilePath]!!.path).await()
...
}
在協(xié)程內(nèi)部想要訪問(wèn)上下文居然需要用到外部的變量。這個(gè)上下文畢竟是協(xié)程自己的,自己居然沒(méi)有辦法直接獲取到,一點(diǎn)兒都不自然。
其實(shí)這也不是沒(méi)有辦法,startCoroutine 其實(shí)還有一個(gè)帶 receiver 的版本:
public fun <R, T> (suspend R.() -> T).startCoroutine(
receiver: R,
completion: Continuation<T>
也就是說(shuō),我們不僅可以傳入一個(gè)獨(dú)立的函數(shù)作為協(xié)程的代碼塊,還可以將一個(gè)對(duì)象的方法傳入,也就是說(shuō),我們完全可以在啟動(dòng)協(xié)程的時(shí)候?yàn)樗付ㄒ粋€(gè) receiver:
fun <T> launch(
receiver: T,
context: CoroutineContext,
block: suspend T.() -> Unit)
= block.startCoroutine(receiver, StandaloneCoroutine(context))
我們修改了 launch,加入了 receiver,于是我們的代碼也可以這么改:
val coroutineContext = FilePath("test.zip") + CommonPool
//需要傳入 receiver
launch(coroutineContext, coroutineContext) {
...
//注意下面直接用 this 來(lái)獲取路徑
val result: String = calcMd5(this[FilePath]!!.path).await()
...
}
如果你覺(jué)得絕大多數(shù)情況下 receiver 都會(huì)是上下文那么上面的代碼還可以接著簡(jiǎn)化:
fun launchWithContext(
context: CoroutineContext,
block: suspend CoroutineContext.() -> Unit)
= launch(context, context, block)
launchWithContext(FilePath("test.zip") + CommonPool) {
log("in coroutine. Before suspend.")
//暫停我們的線程,并開(kāi)始執(zhí)行一段耗時(shí)操作
val result: String = calcMd5(this[FilePath]!!.path).await()
log("in coroutine. After suspend. result = $result")
}
截止到現(xiàn)在,我們對(duì)最初的代碼做了各種封裝,這些封裝后的代碼可以在各種場(chǎng)景下直接使用,于是我們的協(xié)程代碼也得到了大幅簡(jiǎn)化。另外,不知道大家有沒(méi)有注意到,協(xié)程當(dāng)中異常的處理也要比直接用線程寫回調(diào)的方式容易的多,我們只需要在 Continuation 當(dāng)中覆寫 resumeWithException 方法就可以做到這一點(diǎn)。
5. 拿來(lái)主義:Kotlinx.Coroutine
Kotlinx.Coroutine 是官方單獨(dú)發(fā)出來(lái)的一個(gè) Coroutine 的庫(kù),這個(gè)庫(kù)為什么沒(méi)有隨著標(biāo)準(zhǔn)庫(kù)一并發(fā)出來(lái),想必大家從其包名就能略窺一二:kotlinx.coroutines.experimental,experimental,還處于試驗(yàn)階段。不過(guò)既然敢隨著 1.1 Beta 一并發(fā)出來(lái),也說(shuō)明后面的大方向不會(huì)太遠(yuǎn),大家可以直接開(kāi)始嘗試其中的 API 了。
應(yīng)該說(shuō),Kotlinx.Coroutine 做的事情跟我們?cè)谏弦还?jié)做的事情是相同的,只不過(guò)它在這個(gè)方向上面走的更遠(yuǎn)。有關(guān)它的一些用法和細(xì)節(jié),我們將在下一期給大家介紹。
6. 小結(jié)
本文主要對(duì) Kotlin 1.1Beta 標(biāo)準(zhǔn)庫(kù)的 Coroutine API 做了介紹,也給出了相應(yīng)的示例向大家展示 Coroutine 能為我們帶來(lái)什么。
協(xié)程是干什么的?是用來(lái)讓異步代碼更具表現(xiàn)力的。如果運(yùn)用得當(dāng),它將讓我們免于回調(diào)嵌套之苦,并發(fā)加鎖之痛,使我們能夠利用我們有限的時(shí)間寫出更有魅力的程序。