深入理解 Kotlin Coroutine (一)

原文鏈接:https://github.com/enbandari/Kotlin-Tutorials

本文主要介紹 Kotlin Coroutine 的基礎(chǔ) API,有關(guān) Kotlinx.Coroutine 的內(nèi)容,我們將在下一期給大家介紹。由于本人水平有限,如果大家有什么異議,歡迎直接拋出來(lái)跟我討論。

1. 什么是 Coroutine

Coroutine 被翻譯成了“協(xié)程”,意思就是要各個(gè)子任務(wù)協(xié)作運(yùn)行的意思,所以大家一下就明白了它被創(chuàng)造出來(lái)是要解決異步問(wèn)題的。

我們寫 Java 的程序員,對(duì)線程更熟悉一些。線程是比進(jìn)程更小一級(jí)的運(yùn)行單位,它的調(diào)度由操作系統(tǒng)來(lái)完成,所以我們只管 new Thread 和 start,至于什么時(shí)候 run,什么時(shí)候 run 完,我們都沒(méi)辦法預(yù)見(jiàn)。

 Thread t = new Thread(task); 
 t.start(); 

盡管有諸多不可控的因素,不過(guò)我們可以肯定的是起了一個(gè)新的線程并啟動(dòng)它之后,當(dāng)前線程并不會(huì)受到阻塞。如果大家再往深處想想,CPU 在任意時(shí)刻運(yùn)行什么進(jìn)程及其線程,是操作系統(tǒng)決定的,但歸根結(jié)底一個(gè)單線程的 CPU 在任一時(shí)刻只能運(yùn)行一個(gè)任務(wù)。

那么協(xié)程呢?協(xié)程的調(diào)度是應(yīng)用層完成的,比如我們說(shuō) Lua 支持協(xié)程,那么各個(gè)協(xié)程如何運(yùn)行,這一調(diào)度工作實(shí)際上是 Lua 自己的虛擬機(jī)來(lái)完成的。這個(gè)調(diào)度與線程調(diào)度有著比較大的差別,線程調(diào)度是搶占式調(diào)度,很有可能線程 A 運(yùn)行得美滋滋的,線程 B 突然把 CPU 搶過(guò)來(lái),跟 A 說(shuō)“你給我下去吧你”,于是線程 A 只能干瞪眼沒(méi)辦法;而協(xié)程的調(diào)度是非搶占式的,目前常見(jiàn)的各種支持協(xié)程的語(yǔ)言實(shí)現(xiàn)中都有 yield 關(guān)鍵字,它有“妥協(xié)、退讓”的意思,如果一個(gè)協(xié)程執(zhí)行到一段代碼需要歇會(huì)兒,那么它將把執(zhí)行權(quán)讓出來(lái),如果它不這么做,沒(méi)人跟它搶。

在 接觸 Kotlin 的協(xié)程之前呢,我們先給大家看一個(gè) Lua 的例子,比較直觀:

 function foo(a) 
     print("foo", a) 
     return coroutine.yield(2 * a) 
 end 
   
 co = coroutine.create(function ( a, b ) 
     print("co-body", a, b) 
     local r = foo(a + 1) 
     print("co-body", r) 
     local r, s = coroutine.yield(a + b, a - b) 
     print("co-body", r, s) 
     return b, "end" 
 end) 
   
 print("main", coroutine.resume(co, 1, 10)) 
 print("main", coroutine.resume(co, "r")) 
 print("main", coroutine.resume(co, "x", "y")) 
 print("main", coroutine.resume(co, "x", "y")) 

運(yùn)行結(jié)果如下:

 co-body    1   10 
 foo    2 
 main   true    4 
 co-body    r 
 main   true    11  -9 
 co-body    x   y 
 main   true    10  end 
 main   false   cannot resume dead coroutine 

首先定義了一個(gè) foo 函數(shù),然后創(chuàng)建 coroutine,創(chuàng)建了之后還需要調(diào)用 resume 才能執(zhí)行協(xié)程,運(yùn)行過(guò)程是謙讓的,是交替的:

圖中數(shù)字表示第n次

協(xié)程為我們的程序提供了一種暫停的能力,就好像狀態(tài)機(jī),只有等到下一次輸入,它才做狀態(tài)轉(zhuǎn)移。顯然,用協(xié)程來(lái)描述一個(gè)狀態(tài)機(jī)是再合適不過(guò)的了。

也許大家對(duì) lua 的語(yǔ)法不是很熟悉,不過(guò)沒(méi)關(guān)系,上面的例子只需要知道大概是在干什么就行:這例子就好像,main 和 Foo 在交替干活,有點(diǎn)兒像 A B 兩個(gè)人分工協(xié)作,A 干一會(huì)兒 B 來(lái),B 干一會(huì)兒,再讓 A 來(lái)一樣。如果我們用線程來(lái)描述這個(gè)問(wèn)題,那么可能會(huì)用到很多回調(diào),相信寫 Js 的兄弟聽(tīng)到這兒要感到崩潰了,因?yàn)?Js 的代碼寫著寫著就容易回調(diào)滿天飛,業(yè)務(wù)邏輯的實(shí)現(xiàn)越來(lái)越抽象,可讀性越來(lái)越差;而用協(xié)程的話,就好像一個(gè)很平常的同步操作一樣,一點(diǎn)兒異步任務(wù)的感覺(jué)都沒(méi)有。

我們前面提到的協(xié)程的非搶占調(diào)度方式,以及這個(gè)交替執(zhí)行代碼的例子,基本上可以說(shuō)明協(xié)程實(shí)際上致力于用同步一樣的代碼來(lái)完成異步任務(wù)的運(yùn)行。

一句話,有了協(xié)程,你的異步程序看起來(lái)就像同步代碼一樣。

2. Kotlin 協(xié)程初體驗(yàn)

Kotlin 1.1 對(duì)協(xié)程的基本支持都在 Kotlin 標(biāo)準(zhǔn)庫(kù)當(dāng)中,主要涉及兩個(gè)類和幾個(gè)包級(jí)函數(shù)和擴(kuò)展方法:

  • CoroutineContext,協(xié)程的上下文,這個(gè)上下文可以是多個(gè)的組合,組合的上下文可以通過(guò) key 來(lái)獲取。EmptyCoroutineContext 是一個(gè)空實(shí)現(xiàn),沒(méi)有任何功能,如果我們?cè)谑褂脜f(xié)程時(shí)不需要上下文,那么我們就用這個(gè)對(duì)象作為一個(gè)占位即可。上下文這個(gè)東西,不管大家做什么應(yīng)用,總是能遇到,比如 Android 里面的 Context,JSP 里面的 PageContext 等等,他們扮演的角色都大同小異:資源管理,數(shù)據(jù)持有等等,協(xié)程的上下文也基本上是如此。

  • Continuation,顧名思義,繼續(xù)、持續(xù)的意思。我們前面說(shuō)過(guò),協(xié)程提供了一種暫停的能力,可繼續(xù)執(zhí)行才是最終的目的,Continuation 有兩個(gè)方法,一個(gè)是 resume,如果我們的程序沒(méi)有任何異常,那么直接調(diào)用這個(gè)方法并傳入需要返回的值;另一個(gè)是 resumeWithException,如果我們的程序出了異常,那我們可以通過(guò)調(diào)用這個(gè)方法把異常傳遞出去。

  • 協(xié)程的基本操作,包括創(chuàng)建、啟動(dòng)、暫停和繼續(xù),繼續(xù)的操作在 Continuation 當(dāng)中,剩下的三個(gè)都是包級(jí)函數(shù)或擴(kuò)展方法:

這幾個(gè)類和函數(shù)其實(shí)與我們前面提到的 Lua 的協(xié)程 API 非常相似,都是協(xié)程最基礎(chǔ)的 API。

除此之外,Kotlin 還增加了一個(gè)關(guān)鍵字:suspend,用作修飾會(huì)被暫停的函數(shù),被標(biāo)記為 suspend 的函數(shù)只能運(yùn)行在協(xié)程或者其他 suspend 函數(shù)當(dāng)中。

好,介紹完這些基本概念,讓我們來(lái)看一個(gè)例子:

 fun main(args: Array<String>) { 
     log("before coroutine") 
     //啟動(dòng)我們的協(xié)程 
     asyncCalcMd5("test.zip") { 
         log("in coroutine. Before suspend.") 
         //暫停我們的線程,并開(kāi)始執(zhí)行一段耗時(shí)操作 
         val result: String = suspendCoroutine { 
             continuation -> 
             log("in suspend block.") 
             continuation.resume(calcMd5(continuation.context[FilePath]!!.path)) 
             log("after resume.") 
         } 
         log("in coroutine. After suspend. result = $result") 
     } 
     log("after coroutine") 
 } 
   
 /** 
  * 上下文,用來(lái)存放我們需要的信息,可以靈活的自定義 
  */ 
 class FilePath(val path: String): AbstractCoroutineContextElement(FilePath){ 
     companion object Key : CoroutineContext.Key<FilePath> 
 } 
   
 fun asyncCalcMd5(path: String, block: suspend () -> Unit) { 
     val continuation = object : Continuation<Unit> { 
         override val context: CoroutineContext 
             get() = FilePath(path) 
   
         override fun resume(value: Unit) { 
             log("resume: $value") 
         } 
   
         override fun resumeWithException(exception: Throwable) { 
             log(exception.toString()) 
         } 
     } 
     block.startCoroutine(continuation) 
 } 
   
 fun calcMd5(path: String): String{ 
     log("calc md5 for $path.") 
     //暫時(shí)用這個(gè)模擬耗時(shí) 
     Thread.sleep(1000) 
     //假設(shè)這就是我們計(jì)算得到的 MD5 值 
     return System.currentTimeMillis().toString() 
 } 

這段程序在模擬計(jì)算文件的 Md5 值。我們知道,文件的 Md5 值計(jì)算是一項(xiàng)耗時(shí)操作,所以我們希望啟動(dòng)一個(gè)協(xié)程來(lái)處理這個(gè)耗時(shí)任務(wù),并在任務(wù)運(yùn)行結(jié)束時(shí)打印出來(lái)計(jì)算的結(jié)果。

我們先來(lái)一段一段分析下這個(gè)示例:

 /** 
  * 上下文,用來(lái)存放我們需要的信息,可以靈活的自定義 
  */ 
 class FilePath(val path: String): AbstractCoroutineContextElement(FilePath){ 
     companion object Key : CoroutineContext.Key<FilePath> 
 } 

我們?cè)谟?jì)算過(guò)程中需要知道計(jì)算哪個(gè)文件的 Md5,所以我們需要通過(guò)上下文把這個(gè)路徑傳入?yún)f(xié)程當(dāng)中。如果有多個(gè)數(shù)據(jù),也可以一并添加進(jìn)去,在運(yùn)行當(dāng)中,我們可以通過(guò) Continuation 的實(shí)例拿到上下文,進(jìn)而獲取到這個(gè)路徑:

 continuation.context[FilePath]!!.path 

接著,我們?cè)賮?lái)看下 Continuation:

 val continuation = object : Continuation<Unit> { 
     override val context: CoroutineContext 
         get() = FilePath(path) 
   
     override fun resume(value: Unit) { 
         log("resume: $value") 
     } 
   
     override fun resumeWithException(exception: Throwable) { 
         log(exception.toString()) 
     } 
 } 

我們除了給定了 FilePath 這樣一個(gè)上下文之外就是簡(jiǎn)單的打了幾行日志,比較簡(jiǎn)單。這里傳入的 Continuation 當(dāng)中的 resume 和 resumeWithException 只有在協(xié)程最終執(zhí)行完成后才會(huì)被調(diào)用,這一點(diǎn)需要注意一下,也正是因?yàn)槿绱?,startCoroutine 把它叫做 completion:

 public fun <T> (suspend  () -> T).startCoroutine(completion: Continuation<T> 

那么下面我們看下最關(guān)鍵的這段代碼:

 asyncCalcMd5("test.zip") { 
     log("in coroutine. Before suspend.") 
     //暫停我們的協(xié)程,并開(kāi)始執(zhí)行一段耗時(shí)操作 
     val result: String = suspendCoroutine { 
         continuation -> 
         log("in suspend block.") 
         continuation.resume(calcMd5(continuation.context[FilePath]!!.path)) 
         log("after resume.") 
     } 
     log("in coroutine. After suspend. result = $result") 
 } 

suspendCoroutine 這個(gè)方法將外部的代碼執(zhí)行權(quán)拿走,并轉(zhuǎn)入傳入的 Lambda 表達(dá)式中,而這個(gè)表達(dá)式當(dāng)中的操作就對(duì)應(yīng)異步的耗時(shí)操作了,在這里我們“計(jì)算”出了 Md5 值,接著調(diào)用 continuation.resume 將結(jié)果傳了出去,傳給了誰(shuí)呢?傳給了 suspendCoroutine 的返回值也即 result,這時(shí)候協(xié)程繼續(xù)執(zhí)行,打印 result 結(jié)束。

下面就是運(yùn)行結(jié)果了:

 2017-01-30T06:43:52.284Z [main] before coroutine 
 2017-01-30T06:43:52.422Z [main] in coroutine. Before suspend. 
 2017-01-30T06:43:52.423Z [main] in suspend block. 
 2017-01-30T06:43:52.423Z [main] calc md5 for test.zip. 
 2017-01-30T06:43:53.426Z [main] after resume. 
 2017-01-30T06:43:53.427Z [main] in coroutine. After suspend. result = 1485758633426 
 2017-01-30T06:43:53.427Z [main] resume: 1485758633426 
 2017-01-30T06:43:53.427Z [main] after coroutine 

細(xì)心的讀者肯定一看就發(fā)現(xiàn),所謂的異步操作是怎么個(gè)異步法?從日志上面看,明明上面這段代碼就是順序執(zhí)行的嘛,不然 after coroutine 這句日志為什么非要等到最后才打?。?/p>

還有,整個(gè)程序都只運(yùn)行在了主線程上,我們的日志足以說(shuō)明這一點(diǎn)了,根本沒(méi)有異步嘛。難道說(shuō)協(xié)程就是一個(gè)大騙子??

3. 實(shí)現(xiàn)異步

這一部分我們就要回答上一節(jié)留下的問(wèn)題。不過(guò)在此之前,我們?cè)賮?lái)回顧一下協(xié)程存在的意義:讓異步代碼看上去像同步代碼,直接自然易懂。至于它如何做到這一點(diǎn),可能各家的語(yǔ)言實(shí)現(xiàn)各有不同,但協(xié)程給人的感覺(jué)更像是底層并發(fā) API(比如線程)的語(yǔ)法糖。當(dāng)然,如果你愿意,我們通常所謂的線程也可以被稱作操作系統(tǒng)級(jí) API 的語(yǔ)法糖了吧,畢竟各家語(yǔ)言對(duì)于線程的實(shí)現(xiàn)也各有不同,這個(gè)就不是我們今天要討論的內(nèi)容了。

不管怎么樣,你只需要知道,協(xié)程的異步需要依賴比它更底層的 API 支持,那么在 Kotlin 當(dāng)中,這個(gè)所謂的底層 API 就非線程莫屬了。

知道了這一點(diǎn),我們就要考慮想辦法來(lái)把前面的示例完善一下了。

首先我們實(shí)例化一個(gè)線程池:

 private val executor = Executors.newSingleThreadScheduledExecutor { 
     Thread(it, "scheduler") 
 } 

接著我們把計(jì)算 Md5 的部分交給線程池去運(yùn)行:

 asyncCalcMd5("test.zip") { 
     log("in coroutine. Before suspend.") 
     //暫停我們的線程,并開(kāi)始執(zhí)行一段耗時(shí)操作 
     val result: String = suspendCoroutine { 
         continuation -> 
         log("in suspend block.") 
         executor.submit { 
             continuation.resume(calcMd5(continuation.context[FilePath]!!.path)) 
             log("after resume.") 
         } 
     } 
     log("in coroutine. After suspend. result = $result") 
     executor.shutdown() 
 } 

那么結(jié)果呢?

 2017-01-30T07:18:04.496Z [main] before coroutine 
 2017-01-30T07:18:04.754Z [main] in coroutine. Before suspend. 
 2017-01-30T07:18:04.757Z [main] in suspend block. 
 2017-01-30T07:18:04.765Z [main] after coroutine 
 2017-01-30T07:18:04.765Z [scheduler] calc md5 for test.zip. 
 2017-01-30T07:18:05.769Z [scheduler] in coroutine. After suspend. result = 1485760685768 
 2017-01-30T07:18:05.769Z [scheduler] resume: 1485760685768 
 2017-01-30T07:18:05.769Z [scheduler] after resume. 

我們看到在協(xié)程被暫停的那一刻,協(xié)程外面的代碼被執(zhí)行了。一段時(shí)間之后,協(xié)程被繼續(xù)執(zhí)行,打印結(jié)果。

截止到現(xiàn)在,我們用協(xié)程來(lái)實(shí)現(xiàn)異步操作的功能已經(jīng)實(shí)現(xiàn)。

你可能要問(wèn),如果我們想要完成異步操作,直接用線程池加回調(diào)豈不更直接簡(jiǎn)單,為什么要用協(xié)程呢,搞得代碼這么讓人費(fèi)解不說(shuō),也沒(méi)有變的很簡(jiǎn)單啊。

說(shuō)的對(duì),如果我們實(shí)際當(dāng)中把協(xié)程的代碼都寫成這樣,肯定會(huì)被蛋疼死,我前面展示給大家的,是 Kotlin 標(biāo)準(zhǔn)庫(kù)當(dāng)中最為基礎(chǔ)的 API,看起來(lái)非常的原始也是理所應(yīng)當(dāng)?shù)模绻覀儗?duì)其加以封裝,那效果肯定大不一樣。

除此之外,在高并發(fā)的場(chǎng)景下,多個(gè)協(xié)程可以共享一個(gè)或者多個(gè)線程,性能可能會(huì)要好一些。舉個(gè)簡(jiǎn)單的例子,一臺(tái)服務(wù)器有 1k 用戶與之連接,如果我們采用類似于 Tomcat 的實(shí)現(xiàn)方式,一個(gè)用戶開(kāi)一個(gè)線程去處理請(qǐng)求,那么我們將要開(kāi) 1k 個(gè)線程,這算是個(gè)不小的數(shù)目了;而我們?nèi)绻褂脜f(xié)程,為每一個(gè)用戶創(chuàng)建一個(gè)協(xié)程,考慮到同一時(shí)刻并不是所有用戶都需要數(shù)據(jù)傳輸,因此我們并不需要同時(shí)處理所有用戶的請(qǐng)求,那么這時(shí)候可能只需要幾個(gè)專門的 IO 線程和少數(shù)來(lái)承載用戶請(qǐng)求對(duì)應(yīng)的協(xié)程的線程,只有當(dāng)用戶有數(shù)據(jù)傳輸事件到來(lái)的時(shí)候才去響應(yīng),其他時(shí)間直接掛起,這種事件驅(qū)動(dòng)的服務(wù)器顯然對(duì)資源的消耗要小得多。

4. 進(jìn)一步封裝

這一節(jié)的內(nèi)容較多的參考了 Kotlin 官方的 Coroutine Example,里面有更多的例子,大家可以參考學(xué)習(xí)。

4.1 異步

剛才那個(gè)示例讓我們感覺(jué)到,寫個(gè)協(xié)程調(diào)用異步代碼實(shí)在太原始了,所以我們決定對(duì)它做一下封裝。如果我們能在調(diào)用 suspendCoroutine 的時(shí)候直接把后面的代碼攔截,并切到線程池當(dāng)中執(zhí)行,那么我們就不用每次自己搞一個(gè)線程池來(lái)做這事兒了,嗯,讓我們研究下有什么辦法可以做到這一點(diǎn)。

攔截...怎么攔截呢?

 public interface ContinuationInterceptor : CoroutineContext.Element { 
     companion object Key : CoroutineContext.Key<ContinuationInterceptor> 
   
     public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> 
 } 

我們發(fā)現(xiàn),Kotlin 的協(xié)程 API 當(dāng)中提供了這么一個(gè)攔截器,可以把協(xié)程的操作攔截,傳入的是原始的 Continuation,返回的是我們經(jīng)過(guò)線程切換的 Continuation,這樣就可以實(shí)現(xiàn)我們的目的了。

 open class Pool(val pool: ForkJoinPool)  
    : AbstractCoroutineContextElement(ContinuationInterceptor),  
    ContinuationInterceptor { 
   
     override fun <T> interceptContinuation(continuation: Continuation<T>) 
        : Continuation<T> = 
         PoolContinuation(pool,  
            //下面這段代碼是要查找其他攔截器,并保證能調(diào)用它們的攔截方法 
            continuation.context.fold(continuation, { cont, element -> 
                if (element != this@Pool && element is ContinuationInterceptor) 
                    element.interceptContinuation(cont) else cont 
            })) 
 } 
   
 private class PoolContinuation<T>( 
         val pool: ForkJoinPool, 
         val continuation: Continuation<T> 
 ) : Continuation<T> by continuation { 
     override fun resume(value: T) { 
         if (isPoolThread()) continuation.resume(value) 
         else pool.execute { continuation.resume(value) } 
     } 
   
     override fun resumeWithException(exception: Throwable) { 
         if (isPoolThread()) continuation.resumeWithException(exception) 
         else pool.execute { continuation.resumeWithException(exception) } 
     } 
   
     fun isPoolThread(): Boolean = (Thread.currentThread() as? ForkJoinWorkerThread)?.pool == pool 
 } 

這個(gè) Pool 是什么鬼?我們讓它繼承 AbstractCoroutineContextElement 表明它其實(shí)就是我們需要的上下文。實(shí)際上這個(gè)上下文可以給任意協(xié)程使用,于是我們?cè)俣x一個(gè) object:

 object CommonPool : Pool(ForkJoinPool.commonPool()) 

有了這個(gè),我們就可以把沒(méi)加線程池的版本改改了:

 fun main(args: Array<String>) { 
     log("before coroutine") 
     //啟動(dòng)我們的協(xié)程 
     asyncCalcMd5("test.zip") { 
         ... 
     } 
     log("after coroutine") 
     //加這句的原因是防止程序在協(xié)程運(yùn)行完之前停止 
     CommonPool.pool.awaitTermination(10000, TimeUnit.MILLISECONDS) 
 } 
   
 ... 
   
 fun asyncCalcMd5(path: String, block: suspend () -> String) { 
     val continuation = object : Continuation<String> { 
         override val context: CoroutineContext 
            //注意這個(gè)寫法,上下文可以通過(guò) + 來(lái)組合使用 
             get() = FilePath(path) + CommonPool 
   
         ... 
     } 
     block.startCoroutine(continuation) 
 } 
   
 ... 

那么運(yùn)行結(jié)果呢?

 2017-01-30T09:13:11.183Z [main] before coroutine 
 2017-01-30T09:13:11.334Z [main] after coroutine 
 2017-01-30T09:13:11.335Z [ForkJoinPool.commonPool-worker-1] in coroutine. Before suspend. 
 2017-01-30T09:13:11.337Z [ForkJoinPool.commonPool-worker-1] in suspend block. 
 2017-01-30T09:13:11.337Z [ForkJoinPool.commonPool-worker-1] calc md5 for test.zip. 
 2017-01-30T09:13:12.340Z [ForkJoinPool.commonPool-worker-1] after resume. 
 2017-01-30T09:13:12.341Z [ForkJoinPool.commonPool-worker-1] in coroutine. After suspend. result = 1485767592340 
 2017-01-30T09:13:12.341Z [ForkJoinPool.commonPool-worker-1] resume: 1485767592340 

我們看到程序已經(jīng)非常完美的實(shí)現(xiàn)異步調(diào)用。顯然,這種寫法要比線程池回調(diào)的寫法看上去順理成章得多。

4.2 啟動(dòng)協(xié)程

在討論完異步的封裝后,有人肯定還是會(huì)提出新問(wèn)題:?jiǎn)?dòng)協(xié)程的寫法是不是有點(diǎn)兒?jiǎn)铝税。繘](méi)錯(cuò),每次構(gòu)造一個(gè) Continuation,也沒(méi)干多少事兒,實(shí)在沒(méi)什么必要,干脆封裝一個(gè)通用的版本得了:

 class StandaloneCoroutine(override val context: CoroutineContext): Continuation<Unit> { 
     override fun resume(value: Unit) {} 
   
     override fun resumeWithException(exception: Throwable) { 
        //處理異常 
         val currentThread = Thread.currentThread() 
         currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception) 
     } 
 } 

這樣就好辦了,我們每次啟動(dòng)協(xié)程只需要針對(duì)當(dāng)前協(xié)程提供特定的上下文即可,那么我們是不是再把啟動(dòng)的那個(gè)函數(shù)改改呢?

 fun launch(context: CoroutineContext, block: suspend () -> Unit) = 
         block.startCoroutine(StandaloneCoroutine(context)) 

有了這個(gè),我們前面的代碼就可以進(jìn)一步修改:

 fun main(args: Array<String>) { 
     log("before coroutine") 
     //啟動(dòng)我們的協(xié)程 
     launch(FilePath("test.zip") + CommonPool) { 
         log("in coroutine. Before suspend.") 
         //暫停我們的線程,并開(kāi)始執(zhí)行一段耗時(shí)操作 
         val result: String = suspendCoroutine { 
             continuation -> 
             log("in suspend block.") 
             continuation.resume(calcMd5(continuation.context[FilePath]!!.path)) 
             log("after resume.") 
         } 
         log("in coroutine. After suspend. result = $result") 
     } 
     log("after coroutine") 
     CommonPool.pool.awaitTermination(10000, TimeUnit.MILLISECONDS) 
 } 
   
 /** 
  * 上下文,用來(lái)存放我們需要的信息,可以靈活的自定義 
  */ 
 class FilePath(val path: String) : AbstractCoroutineContextElement(Key) { 
     companion object Key : CoroutineContext.Key<FilePath> 
 } 
 fun calcMd5(path: String): String { 
     log("calc md5 for $path.") 
     //暫時(shí)用這個(gè)模擬耗時(shí) 
     Thread.sleep(1000) 
     //假設(shè)這就是我們計(jì)算得到的 MD5 值 
     return System.currentTimeMillis().toString() 
 } 

運(yùn)行結(jié)果自然也沒(méi)什么好說(shuō)的。

4.3 暫停協(xié)程

暫停協(xié)程這塊兒也太亂了,看著莫名其妙的,能不能直白一點(diǎn)兒呢?其實(shí)我們的代碼不過(guò)是想要獲取 Md5 的值,所以如果能寫成下面這樣就好了:

 val result = calcMd5(continuation.context[FilePath]!!.path).await() 

毋庸置疑,這肯定是可以的。想一下,有哪個(gè)類可以支持我們直接阻塞線程,等到獲取到結(jié)果之后再返回呢?當(dāng)然是 Future 了。

 suspend fun <T> CompletableFuture<T>.await(): T { 
     return suspendCoroutine { 
         continuation -> 
         whenComplete { result, e -> 
             if (e == null) continuation.resume(result) 
             else continuation.resumeWithException(e) 
         } 
     } 
 } 

我們干脆就直接給 CompletableFuture 定義一個(gè)擴(kuò)展方法,當(dāng)中只是用來(lái)掛起協(xié)程,并在結(jié)果拿到之后繼續(xù)執(zhí)行協(xié)程。這樣,我們的代碼可以進(jìn)一步修改:

 fun main(args: Array<String>) { 
     log("before coroutine") 
     //啟動(dòng)我們的協(xié)程 
     val coroutineContext = FilePath("test.zip") + CommonPool 
     launch(coroutineContext) { 
         log("in coroutine. Before suspend.") 
         //暫停我們的線程,并開(kāi)始執(zhí)行一段耗時(shí)操作 
         val result: String = calcMd5(coroutineContext[FilePath]!!.path).await() 
         log("in coroutine. After suspend. result = $result") 
     } 
     log("after coroutine") 
     CommonPool.pool.awaitTermination(10, TimeUnit.SECONDS) 
 } 
   
 fun calcMd5(path: String): CompletableFuture<String> = CompletableFuture.supplyAsync { 
     log("calc md5 for $path.") 
     //暫時(shí)用這個(gè)模擬耗時(shí) 
     Thread.sleep(1000) 
     //假設(shè)這就是我們計(jì)算得到的 MD5 值 
     System.currentTimeMillis().toString() 
 } 
   
 ... 省略掉一些沒(méi)有修改的代碼 ...  

4.4 帶有 Receiver 的協(xié)程

不知道大家注意到?jīng)]有, 4.3 的代碼中有個(gè)地方比較別扭:

 val coroutineContext = FilePath("test.zip") + CommonPool 
 launch(coroutineContext) { 
    ... 
    //在協(xié)程內(nèi)部想要訪問(wèn)上下文居然需要用到外部的變量 
     val result: String = calcMd5(coroutineContext[FilePath]!!.path).await() 
    ... 
 } 

在協(xié)程內(nèi)部想要訪問(wèn)上下文居然需要用到外部的變量。這個(gè)上下文畢竟是協(xié)程自己的,自己居然沒(méi)有辦法直接獲取到,一點(diǎn)兒都不自然。

其實(shí)這也不是沒(méi)有辦法,startCoroutine 其實(shí)還有一個(gè)帶 receiver 的版本:

 public fun <R, T> (suspend R.() -> T).startCoroutine( 
         receiver: R, 
         completion: Continuation<T> 

也就是說(shuō),我們不僅可以傳入一個(gè)獨(dú)立的函數(shù)作為協(xié)程的代碼塊,還可以將一個(gè)對(duì)象的方法傳入,也就是說(shuō),我們完全可以在啟動(dòng)協(xié)程的時(shí)候?yàn)樗付ㄒ粋€(gè) receiver:

 fun <T> launch( 
    receiver: T,  
    context: CoroutineContext,  
    block: suspend T.() -> Unit)  
    = block.startCoroutine(receiver, StandaloneCoroutine(context)) 

我們修改了 launch,加入了 receiver,于是我們的代碼也可以這么改:

 val coroutineContext = FilePath("test.zip") + CommonPool 
 //需要傳入 receiver 
 launch(coroutineContext, coroutineContext) { 
    ... 
    //注意下面直接用 this 來(lái)獲取路徑 
     val result: String = calcMd5(this[FilePath]!!.path).await() 
    ... 
 } 

如果你覺(jué)得絕大多數(shù)情況下 receiver 都會(huì)是上下文那么上面的代碼還可以接著簡(jiǎn)化:

 fun launchWithContext( 
    context: CoroutineContext,  
    block: suspend CoroutineContext.() -> Unit)  
    = launch(context, context, block) 
 launchWithContext(FilePath("test.zip") + CommonPool) { 
     log("in coroutine. Before suspend.") 
     //暫停我們的線程,并開(kāi)始執(zhí)行一段耗時(shí)操作 
     val result: String = calcMd5(this[FilePath]!!.path).await() 
     log("in coroutine. After suspend. result = $result") 
 } 

截止到現(xiàn)在,我們對(duì)最初的代碼做了各種封裝,這些封裝后的代碼可以在各種場(chǎng)景下直接使用,于是我們的協(xié)程代碼也得到了大幅簡(jiǎn)化。另外,不知道大家有沒(méi)有注意到,協(xié)程當(dāng)中異常的處理也要比直接用線程寫回調(diào)的方式容易的多,我們只需要在 Continuation 當(dāng)中覆寫 resumeWithException 方法就可以做到這一點(diǎn)。

5. 拿來(lái)主義:Kotlinx.Coroutine

Kotlinx.Coroutine 是官方單獨(dú)發(fā)出來(lái)的一個(gè) Coroutine 的庫(kù),這個(gè)庫(kù)為什么沒(méi)有隨著標(biāo)準(zhǔn)庫(kù)一并發(fā)出來(lái),想必大家從其包名就能略窺一二:kotlinx.coroutines.experimental,experimental,還處于試驗(yàn)階段。不過(guò)既然敢隨著 1.1 Beta 一并發(fā)出來(lái),也說(shuō)明后面的大方向不會(huì)太遠(yuǎn),大家可以直接開(kāi)始嘗試其中的 API 了。

應(yīng)該說(shuō),Kotlinx.Coroutine 做的事情跟我們?cè)谏弦还?jié)做的事情是相同的,只不過(guò)它在這個(gè)方向上面走的更遠(yuǎn)。有關(guān)它的一些用法和細(xì)節(jié),我們將在下一期給大家介紹。

6. 小結(jié)

本文主要對(duì) Kotlin 1.1Beta 標(biāo)準(zhǔn)庫(kù)的 Coroutine API 做了介紹,也給出了相應(yīng)的示例向大家展示 Coroutine 能為我們帶來(lái)什么。

協(xié)程是干什么的?是用來(lái)讓異步代碼更具表現(xiàn)力的。如果運(yùn)用得當(dāng),它將讓我們免于回調(diào)嵌套之苦,并發(fā)加鎖之痛,使我們能夠利用我們有限的時(shí)間寫出更有魅力的程序。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容