深入理解 Kotlin Coroutine (一)

原文鏈接:https://github.com/enbandari/Kotlin-Tutorials

本文主要介紹 Kotlin Coroutine 的基礎(chǔ) API,有關(guān) Kotlinx.Coroutine 的內(nèi)容,我們將在下一期給大家介紹。由于本人水平有限,如果大家有什么異議,歡迎直接拋出來跟我討論。

1. 什么是 Coroutine

Coroutine 被翻譯成了“協(xié)程”,意思就是要各個子任務(wù)協(xié)作運行的意思,所以大家一下就明白了它被創(chuàng)造出來是要解決異步問題的。

我們寫 Java 的程序員,對線程更熟悉一些。線程是比進程更小一級的運行單位,它的調(diào)度由操作系統(tǒng)來完成,所以我們只管 new Thread 和 start,至于什么時候 run,什么時候 run 完,我們都沒辦法預(yù)見。

 Thread t = new Thread(task); 
 t.start(); 

盡管有諸多不可控的因素,不過我們可以肯定的是起了一個新的線程并啟動它之后,當(dāng)前線程并不會受到阻塞。如果大家再往深處想想,CPU 在任意時刻運行什么進程及其線程,是操作系統(tǒng)決定的,但歸根結(jié)底一個單線程的 CPU 在任一時刻只能運行一個任務(wù)。

那么協(xié)程呢?協(xié)程的調(diào)度是應(yīng)用層完成的,比如我們說 Lua 支持協(xié)程,那么各個協(xié)程如何運行,這一調(diào)度工作實際上是 Lua 自己的虛擬機來完成的。這個調(diào)度與線程調(diào)度有著比較大的差別,線程調(diào)度是搶占式調(diào)度,很有可能線程 A 運行得美滋滋的,線程 B 突然把 CPU 搶過來,跟 A 說“你給我下去吧你”,于是線程 A 只能干瞪眼沒辦法;而協(xié)程的調(diào)度是非搶占式的,目前常見的各種支持協(xié)程的語言實現(xiàn)中都有 yield 關(guān)鍵字,它有“妥協(xié)、退讓”的意思,如果一個協(xié)程執(zhí)行到一段代碼需要歇會兒,那么它將把執(zhí)行權(quán)讓出來,如果它不這么做,沒人跟它搶。

在 接觸 Kotlin 的協(xié)程之前呢,我們先給大家看一個 Lua 的例子,比較直觀:

 function foo(a) 
     print("foo", a) 
     return coroutine.yield(2 * a) 
 end 
   
 co = coroutine.create(function ( a, b ) 
     print("co-body", a, b) 
     local r = foo(a + 1) 
     print("co-body", r) 
     local r, s = coroutine.yield(a + b, a - b) 
     print("co-body", r, s) 
     return b, "end" 
 end) 
   
 print("main", coroutine.resume(co, 1, 10)) 
 print("main", coroutine.resume(co, "r")) 
 print("main", coroutine.resume(co, "x", "y")) 
 print("main", coroutine.resume(co, "x", "y")) 

運行結(jié)果如下:

 co-body    1   10 
 foo    2 
 main   true    4 
 co-body    r 
 main   true    11  -9 
 co-body    x   y 
 main   true    10  end 
 main   false   cannot resume dead coroutine 

首先定義了一個 foo 函數(shù),然后創(chuàng)建 coroutine,創(chuàng)建了之后還需要調(diào)用 resume 才能執(zhí)行協(xié)程,運行過程是謙讓的,是交替的:

圖中數(shù)字表示第n次

協(xié)程為我們的程序提供了一種暫停的能力,就好像狀態(tài)機,只有等到下一次輸入,它才做狀態(tài)轉(zhuǎn)移。顯然,用協(xié)程來描述一個狀態(tài)機是再合適不過的了。

也許大家對 lua 的語法不是很熟悉,不過沒關(guān)系,上面的例子只需要知道大概是在干什么就行:這例子就好像,main 和 Foo 在交替干活,有點兒像 A B 兩個人分工協(xié)作,A 干一會兒 B 來,B 干一會兒,再讓 A 來一樣。如果我們用線程來描述這個問題,那么可能會用到很多回調(diào),相信寫 Js 的兄弟聽到這兒要感到崩潰了,因為 Js 的代碼寫著寫著就容易回調(diào)滿天飛,業(yè)務(wù)邏輯的實現(xiàn)越來越抽象,可讀性越來越差;而用協(xié)程的話,就好像一個很平常的同步操作一樣,一點兒異步任務(wù)的感覺都沒有。

我們前面提到的協(xié)程的非搶占調(diào)度方式,以及這個交替執(zhí)行代碼的例子,基本上可以說明協(xié)程實際上致力于用同步一樣的代碼來完成異步任務(wù)的運行。

一句話,有了協(xié)程,你的異步程序看起來就像同步代碼一樣。

2. Kotlin 協(xié)程初體驗

Kotlin 1.1 對協(xié)程的基本支持都在 Kotlin 標(biāo)準(zhǔn)庫當(dāng)中,主要涉及兩個類和幾個包級函數(shù)和擴展方法:

  • CoroutineContext,協(xié)程的上下文,這個上下文可以是多個的組合,組合的上下文可以通過 key 來獲取。EmptyCoroutineContext 是一個空實現(xiàn),沒有任何功能,如果我們在使用協(xié)程時不需要上下文,那么我們就用這個對象作為一個占位即可。上下文這個東西,不管大家做什么應(yīng)用,總是能遇到,比如 Android 里面的 Context,JSP 里面的 PageContext 等等,他們扮演的角色都大同小異:資源管理,數(shù)據(jù)持有等等,協(xié)程的上下文也基本上是如此。

  • Continuation,顧名思義,繼續(xù)、持續(xù)的意思。我們前面說過,協(xié)程提供了一種暫停的能力,可繼續(xù)執(zhí)行才是最終的目的,Continuation 有兩個方法,一個是 resume,如果我們的程序沒有任何異常,那么直接調(diào)用這個方法并傳入需要返回的值;另一個是 resumeWithException,如果我們的程序出了異常,那我們可以通過調(diào)用這個方法把異常傳遞出去。

  • 協(xié)程的基本操作,包括創(chuàng)建、啟動、暫停和繼續(xù),繼續(xù)的操作在 Continuation 當(dāng)中,剩下的三個都是包級函數(shù)或擴展方法:

這幾個類和函數(shù)其實與我們前面提到的 Lua 的協(xié)程 API 非常相似,都是協(xié)程最基礎(chǔ)的 API。

除此之外,Kotlin 還增加了一個關(guān)鍵字:suspend,用作修飾會被暫停的函數(shù),被標(biāo)記為 suspend 的函數(shù)只能運行在協(xié)程或者其他 suspend 函數(shù)當(dāng)中。

好,介紹完這些基本概念,讓我們來看一個例子:

 fun main(args: Array<String>) { 
     log("before coroutine") 
     //啟動我們的協(xié)程 
     asyncCalcMd5("test.zip") { 
         log("in coroutine. Before suspend.") 
         //暫停我們的線程,并開始執(zhí)行一段耗時操作 
         val result: String = suspendCoroutine { 
             continuation -> 
             log("in suspend block.") 
             continuation.resume(calcMd5(continuation.context[FilePath]!!.path)) 
             log("after resume.") 
         } 
         log("in coroutine. After suspend. result = $result") 
     } 
     log("after coroutine") 
 } 
   
 /** 
  * 上下文,用來存放我們需要的信息,可以靈活的自定義 
  */ 
 class FilePath(val path: String): AbstractCoroutineContextElement(FilePath){ 
     companion object Key : CoroutineContext.Key<FilePath> 
 } 
   
 fun asyncCalcMd5(path: String, block: suspend () -> Unit) { 
     val continuation = object : Continuation<Unit> { 
         override val context: CoroutineContext 
             get() = FilePath(path) 
   
         override fun resume(value: Unit) { 
             log("resume: $value") 
         } 
   
         override fun resumeWithException(exception: Throwable) { 
             log(exception.toString()) 
         } 
     } 
     block.startCoroutine(continuation) 
 } 
   
 fun calcMd5(path: String): String{ 
     log("calc md5 for $path.") 
     //暫時用這個模擬耗時 
     Thread.sleep(1000) 
     //假設(shè)這就是我們計算得到的 MD5 值 
     return System.currentTimeMillis().toString() 
 } 

這段程序在模擬計算文件的 Md5 值。我們知道,文件的 Md5 值計算是一項耗時操作,所以我們希望啟動一個協(xié)程來處理這個耗時任務(wù),并在任務(wù)運行結(jié)束時打印出來計算的結(jié)果。

我們先來一段一段分析下這個示例:

 /** 
  * 上下文,用來存放我們需要的信息,可以靈活的自定義 
  */ 
 class FilePath(val path: String): AbstractCoroutineContextElement(FilePath){ 
     companion object Key : CoroutineContext.Key<FilePath> 
 } 

我們在計算過程中需要知道計算哪個文件的 Md5,所以我們需要通過上下文把這個路徑傳入?yún)f(xié)程當(dāng)中。如果有多個數(shù)據(jù),也可以一并添加進去,在運行當(dāng)中,我們可以通過 Continuation 的實例拿到上下文,進而獲取到這個路徑:

 continuation.context[FilePath]!!.path 

接著,我們再來看下 Continuation:

 val continuation = object : Continuation<Unit> { 
     override val context: CoroutineContext 
         get() = FilePath(path) 
   
     override fun resume(value: Unit) { 
         log("resume: $value") 
     } 
   
     override fun resumeWithException(exception: Throwable) { 
         log(exception.toString()) 
     } 
 } 

我們除了給定了 FilePath 這樣一個上下文之外就是簡單的打了幾行日志,比較簡單。這里傳入的 Continuation 當(dāng)中的 resume 和 resumeWithException 只有在協(xié)程最終執(zhí)行完成后才會被調(diào)用,這一點需要注意一下,也正是因為如此,startCoroutine 把它叫做 completion:

 public fun <T> (suspend  () -> T).startCoroutine(completion: Continuation<T> 

那么下面我們看下最關(guān)鍵的這段代碼:

 asyncCalcMd5("test.zip") { 
     log("in coroutine. Before suspend.") 
     //暫停我們的協(xié)程,并開始執(zhí)行一段耗時操作 
     val result: String = suspendCoroutine { 
         continuation -> 
         log("in suspend block.") 
         continuation.resume(calcMd5(continuation.context[FilePath]!!.path)) 
         log("after resume.") 
     } 
     log("in coroutine. After suspend. result = $result") 
 } 

suspendCoroutine 這個方法將外部的代碼執(zhí)行權(quán)拿走,并轉(zhuǎn)入傳入的 Lambda 表達式中,而這個表達式當(dāng)中的操作就對應(yīng)異步的耗時操作了,在這里我們“計算”出了 Md5 值,接著調(diào)用 continuation.resume 將結(jié)果傳了出去,傳給了誰呢?傳給了 suspendCoroutine 的返回值也即 result,這時候協(xié)程繼續(xù)執(zhí)行,打印 result 結(jié)束。

下面就是運行結(jié)果了:

 2017-01-30T06:43:52.284Z [main] before coroutine 
 2017-01-30T06:43:52.422Z [main] in coroutine. Before suspend. 
 2017-01-30T06:43:52.423Z [main] in suspend block. 
 2017-01-30T06:43:52.423Z [main] calc md5 for test.zip. 
 2017-01-30T06:43:53.426Z [main] after resume. 
 2017-01-30T06:43:53.427Z [main] in coroutine. After suspend. result = 1485758633426 
 2017-01-30T06:43:53.427Z [main] resume: 1485758633426 
 2017-01-30T06:43:53.427Z [main] after coroutine 

細心的讀者肯定一看就發(fā)現(xiàn),所謂的異步操作是怎么個異步法?從日志上面看,明明上面這段代碼就是順序執(zhí)行的嘛,不然 after coroutine 這句日志為什么非要等到最后才打???

還有,整個程序都只運行在了主線程上,我們的日志足以說明這一點了,根本沒有異步嘛。難道說協(xié)程就是一個大騙子??

3. 實現(xiàn)異步

這一部分我們就要回答上一節(jié)留下的問題。不過在此之前,我們再來回顧一下協(xié)程存在的意義:讓異步代碼看上去像同步代碼,直接自然易懂。至于它如何做到這一點,可能各家的語言實現(xiàn)各有不同,但協(xié)程給人的感覺更像是底層并發(fā) API(比如線程)的語法糖。當(dāng)然,如果你愿意,我們通常所謂的線程也可以被稱作操作系統(tǒng)級 API 的語法糖了吧,畢竟各家語言對于線程的實現(xiàn)也各有不同,這個就不是我們今天要討論的內(nèi)容了。

不管怎么樣,你只需要知道,協(xié)程的異步需要依賴比它更底層的 API 支持,那么在 Kotlin 當(dāng)中,這個所謂的底層 API 就非線程莫屬了。

知道了這一點,我們就要考慮想辦法來把前面的示例完善一下了。

首先我們實例化一個線程池:

 private val executor = Executors.newSingleThreadScheduledExecutor { 
     Thread(it, "scheduler") 
 } 

接著我們把計算 Md5 的部分交給線程池去運行:

 asyncCalcMd5("test.zip") { 
     log("in coroutine. Before suspend.") 
     //暫停我們的線程,并開始執(zhí)行一段耗時操作 
     val result: String = suspendCoroutine { 
         continuation -> 
         log("in suspend block.") 
         executor.submit { 
             continuation.resume(calcMd5(continuation.context[FilePath]!!.path)) 
             log("after resume.") 
         } 
     } 
     log("in coroutine. After suspend. result = $result") 
     executor.shutdown() 
 } 

那么結(jié)果呢?

 2017-01-30T07:18:04.496Z [main] before coroutine 
 2017-01-30T07:18:04.754Z [main] in coroutine. Before suspend. 
 2017-01-30T07:18:04.757Z [main] in suspend block. 
 2017-01-30T07:18:04.765Z [main] after coroutine 
 2017-01-30T07:18:04.765Z [scheduler] calc md5 for test.zip. 
 2017-01-30T07:18:05.769Z [scheduler] in coroutine. After suspend. result = 1485760685768 
 2017-01-30T07:18:05.769Z [scheduler] resume: 1485760685768 
 2017-01-30T07:18:05.769Z [scheduler] after resume. 

我們看到在協(xié)程被暫停的那一刻,協(xié)程外面的代碼被執(zhí)行了。一段時間之后,協(xié)程被繼續(xù)執(zhí)行,打印結(jié)果。

截止到現(xiàn)在,我們用協(xié)程來實現(xiàn)異步操作的功能已經(jīng)實現(xiàn)。

你可能要問,如果我們想要完成異步操作,直接用線程池加回調(diào)豈不更直接簡單,為什么要用協(xié)程呢,搞得代碼這么讓人費解不說,也沒有變的很簡單啊。

說的對,如果我們實際當(dāng)中把協(xié)程的代碼都寫成這樣,肯定會被蛋疼死,我前面展示給大家的,是 Kotlin 標(biāo)準(zhǔn)庫當(dāng)中最為基礎(chǔ)的 API,看起來非常的原始也是理所應(yīng)當(dāng)?shù)模绻覀儗ζ浼右苑庋b,那效果肯定大不一樣。

除此之外,在高并發(fā)的場景下,多個協(xié)程可以共享一個或者多個線程,性能可能會要好一些。舉個簡單的例子,一臺服務(wù)器有 1k 用戶與之連接,如果我們采用類似于 Tomcat 的實現(xiàn)方式,一個用戶開一個線程去處理請求,那么我們將要開 1k 個線程,這算是個不小的數(shù)目了;而我們?nèi)绻褂脜f(xié)程,為每一個用戶創(chuàng)建一個協(xié)程,考慮到同一時刻并不是所有用戶都需要數(shù)據(jù)傳輸,因此我們并不需要同時處理所有用戶的請求,那么這時候可能只需要幾個專門的 IO 線程和少數(shù)來承載用戶請求對應(yīng)的協(xié)程的線程,只有當(dāng)用戶有數(shù)據(jù)傳輸事件到來的時候才去響應(yīng),其他時間直接掛起,這種事件驅(qū)動的服務(wù)器顯然對資源的消耗要小得多。

4. 進一步封裝

這一節(jié)的內(nèi)容較多的參考了 Kotlin 官方的 Coroutine Example,里面有更多的例子,大家可以參考學(xué)習(xí)。

4.1 異步

剛才那個示例讓我們感覺到,寫個協(xié)程調(diào)用異步代碼實在太原始了,所以我們決定對它做一下封裝。如果我們能在調(diào)用 suspendCoroutine 的時候直接把后面的代碼攔截,并切到線程池當(dāng)中執(zhí)行,那么我們就不用每次自己搞一個線程池來做這事兒了,嗯,讓我們研究下有什么辦法可以做到這一點。

攔截...怎么攔截呢?

 public interface ContinuationInterceptor : CoroutineContext.Element { 
     companion object Key : CoroutineContext.Key<ContinuationInterceptor> 
   
     public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> 
 } 

我們發(fā)現(xiàn),Kotlin 的協(xié)程 API 當(dāng)中提供了這么一個攔截器,可以把協(xié)程的操作攔截,傳入的是原始的 Continuation,返回的是我們經(jīng)過線程切換的 Continuation,這樣就可以實現(xiàn)我們的目的了。

 open class Pool(val pool: ForkJoinPool)  
    : AbstractCoroutineContextElement(ContinuationInterceptor),  
    ContinuationInterceptor { 
   
     override fun <T> interceptContinuation(continuation: Continuation<T>) 
        : Continuation<T> = 
         PoolContinuation(pool,  
            //下面這段代碼是要查找其他攔截器,并保證能調(diào)用它們的攔截方法 
            continuation.context.fold(continuation, { cont, element -> 
                if (element != this@Pool && element is ContinuationInterceptor) 
                    element.interceptContinuation(cont) else cont 
            })) 
 } 
   
 private class PoolContinuation<T>( 
         val pool: ForkJoinPool, 
         val continuation: Continuation<T> 
 ) : Continuation<T> by continuation { 
     override fun resume(value: T) { 
         if (isPoolThread()) continuation.resume(value) 
         else pool.execute { continuation.resume(value) } 
     } 
   
     override fun resumeWithException(exception: Throwable) { 
         if (isPoolThread()) continuation.resumeWithException(exception) 
         else pool.execute { continuation.resumeWithException(exception) } 
     } 
   
     fun isPoolThread(): Boolean = (Thread.currentThread() as? ForkJoinWorkerThread)?.pool == pool 
 } 

這個 Pool 是什么鬼?我們讓它繼承 AbstractCoroutineContextElement 表明它其實就是我們需要的上下文。實際上這個上下文可以給任意協(xié)程使用,于是我們再定義一個 object:

 object CommonPool : Pool(ForkJoinPool.commonPool()) 

有了這個,我們就可以把沒加線程池的版本改改了:

 fun main(args: Array<String>) { 
     log("before coroutine") 
     //啟動我們的協(xié)程 
     asyncCalcMd5("test.zip") { 
         ... 
     } 
     log("after coroutine") 
     //加這句的原因是防止程序在協(xié)程運行完之前停止 
     CommonPool.pool.awaitTermination(10000, TimeUnit.MILLISECONDS) 
 } 
   
 ... 
   
 fun asyncCalcMd5(path: String, block: suspend () -> String) { 
     val continuation = object : Continuation<String> { 
         override val context: CoroutineContext 
            //注意這個寫法,上下文可以通過 + 來組合使用 
             get() = FilePath(path) + CommonPool 
   
         ... 
     } 
     block.startCoroutine(continuation) 
 } 
   
 ... 

那么運行結(jié)果呢?

 2017-01-30T09:13:11.183Z [main] before coroutine 
 2017-01-30T09:13:11.334Z [main] after coroutine 
 2017-01-30T09:13:11.335Z [ForkJoinPool.commonPool-worker-1] in coroutine. Before suspend. 
 2017-01-30T09:13:11.337Z [ForkJoinPool.commonPool-worker-1] in suspend block. 
 2017-01-30T09:13:11.337Z [ForkJoinPool.commonPool-worker-1] calc md5 for test.zip. 
 2017-01-30T09:13:12.340Z [ForkJoinPool.commonPool-worker-1] after resume. 
 2017-01-30T09:13:12.341Z [ForkJoinPool.commonPool-worker-1] in coroutine. After suspend. result = 1485767592340 
 2017-01-30T09:13:12.341Z [ForkJoinPool.commonPool-worker-1] resume: 1485767592340 

我們看到程序已經(jīng)非常完美的實現(xiàn)異步調(diào)用。顯然,這種寫法要比線程池回調(diào)的寫法看上去順理成章得多。

4.2 啟動協(xié)程

在討論完異步的封裝后,有人肯定還是會提出新問題:啟動協(xié)程的寫法是不是有點兒啰嗦了???沒錯,每次構(gòu)造一個 Continuation,也沒干多少事兒,實在沒什么必要,干脆封裝一個通用的版本得了:

 class StandaloneCoroutine(override val context: CoroutineContext): Continuation<Unit> { 
     override fun resume(value: Unit) {} 
   
     override fun resumeWithException(exception: Throwable) { 
        //處理異常 
         val currentThread = Thread.currentThread() 
         currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception) 
     } 
 } 

這樣就好辦了,我們每次啟動協(xié)程只需要針對當(dāng)前協(xié)程提供特定的上下文即可,那么我們是不是再把啟動的那個函數(shù)改改呢?

 fun launch(context: CoroutineContext, block: suspend () -> Unit) = 
         block.startCoroutine(StandaloneCoroutine(context)) 

有了這個,我們前面的代碼就可以進一步修改:

 fun main(args: Array<String>) { 
     log("before coroutine") 
     //啟動我們的協(xié)程 
     launch(FilePath("test.zip") + CommonPool) { 
         log("in coroutine. Before suspend.") 
         //暫停我們的線程,并開始執(zhí)行一段耗時操作 
         val result: String = suspendCoroutine { 
             continuation -> 
             log("in suspend block.") 
             continuation.resume(calcMd5(continuation.context[FilePath]!!.path)) 
             log("after resume.") 
         } 
         log("in coroutine. After suspend. result = $result") 
     } 
     log("after coroutine") 
     CommonPool.pool.awaitTermination(10000, TimeUnit.MILLISECONDS) 
 } 
   
 /** 
  * 上下文,用來存放我們需要的信息,可以靈活的自定義 
  */ 
 class FilePath(val path: String) : AbstractCoroutineContextElement(Key) { 
     companion object Key : CoroutineContext.Key<FilePath> 
 } 
 fun calcMd5(path: String): String { 
     log("calc md5 for $path.") 
     //暫時用這個模擬耗時 
     Thread.sleep(1000) 
     //假設(shè)這就是我們計算得到的 MD5 值 
     return System.currentTimeMillis().toString() 
 } 

運行結(jié)果自然也沒什么好說的。

4.3 暫停協(xié)程

暫停協(xié)程這塊兒也太亂了,看著莫名其妙的,能不能直白一點兒呢?其實我們的代碼不過是想要獲取 Md5 的值,所以如果能寫成下面這樣就好了:

 val result = calcMd5(continuation.context[FilePath]!!.path).await() 

毋庸置疑,這肯定是可以的。想一下,有哪個類可以支持我們直接阻塞線程,等到獲取到結(jié)果之后再返回呢?當(dāng)然是 Future 了。

 suspend fun <T> CompletableFuture<T>.await(): T { 
     return suspendCoroutine { 
         continuation -> 
         whenComplete { result, e -> 
             if (e == null) continuation.resume(result) 
             else continuation.resumeWithException(e) 
         } 
     } 
 } 

我們干脆就直接給 CompletableFuture 定義一個擴展方法,當(dāng)中只是用來掛起協(xié)程,并在結(jié)果拿到之后繼續(xù)執(zhí)行協(xié)程。這樣,我們的代碼可以進一步修改:

 fun main(args: Array<String>) { 
     log("before coroutine") 
     //啟動我們的協(xié)程 
     val coroutineContext = FilePath("test.zip") + CommonPool 
     launch(coroutineContext) { 
         log("in coroutine. Before suspend.") 
         //暫停我們的線程,并開始執(zhí)行一段耗時操作 
         val result: String = calcMd5(coroutineContext[FilePath]!!.path).await() 
         log("in coroutine. After suspend. result = $result") 
     } 
     log("after coroutine") 
     CommonPool.pool.awaitTermination(10, TimeUnit.SECONDS) 
 } 
   
 fun calcMd5(path: String): CompletableFuture<String> = CompletableFuture.supplyAsync { 
     log("calc md5 for $path.") 
     //暫時用這個模擬耗時 
     Thread.sleep(1000) 
     //假設(shè)這就是我們計算得到的 MD5 值 
     System.currentTimeMillis().toString() 
 } 
   
 ... 省略掉一些沒有修改的代碼 ...  

4.4 帶有 Receiver 的協(xié)程

不知道大家注意到?jīng)]有, 4.3 的代碼中有個地方比較別扭:

 val coroutineContext = FilePath("test.zip") + CommonPool 
 launch(coroutineContext) { 
    ... 
    //在協(xié)程內(nèi)部想要訪問上下文居然需要用到外部的變量 
     val result: String = calcMd5(coroutineContext[FilePath]!!.path).await() 
    ... 
 } 

在協(xié)程內(nèi)部想要訪問上下文居然需要用到外部的變量。這個上下文畢竟是協(xié)程自己的,自己居然沒有辦法直接獲取到,一點兒都不自然。

其實這也不是沒有辦法,startCoroutine 其實還有一個帶 receiver 的版本:

 public fun <R, T> (suspend R.() -> T).startCoroutine( 
         receiver: R, 
         completion: Continuation<T> 

也就是說,我們不僅可以傳入一個獨立的函數(shù)作為協(xié)程的代碼塊,還可以將一個對象的方法傳入,也就是說,我們完全可以在啟動協(xié)程的時候為它指定一個 receiver:

 fun <T> launch( 
    receiver: T,  
    context: CoroutineContext,  
    block: suspend T.() -> Unit)  
    = block.startCoroutine(receiver, StandaloneCoroutine(context)) 

我們修改了 launch,加入了 receiver,于是我們的代碼也可以這么改:

 val coroutineContext = FilePath("test.zip") + CommonPool 
 //需要傳入 receiver 
 launch(coroutineContext, coroutineContext) { 
    ... 
    //注意下面直接用 this 來獲取路徑 
     val result: String = calcMd5(this[FilePath]!!.path).await() 
    ... 
 } 

如果你覺得絕大多數(shù)情況下 receiver 都會是上下文那么上面的代碼還可以接著簡化:

 fun launchWithContext( 
    context: CoroutineContext,  
    block: suspend CoroutineContext.() -> Unit)  
    = launch(context, context, block) 
 launchWithContext(FilePath("test.zip") + CommonPool) { 
     log("in coroutine. Before suspend.") 
     //暫停我們的線程,并開始執(zhí)行一段耗時操作 
     val result: String = calcMd5(this[FilePath]!!.path).await() 
     log("in coroutine. After suspend. result = $result") 
 } 

截止到現(xiàn)在,我們對最初的代碼做了各種封裝,這些封裝后的代碼可以在各種場景下直接使用,于是我們的協(xié)程代碼也得到了大幅簡化。另外,不知道大家有沒有注意到,協(xié)程當(dāng)中異常的處理也要比直接用線程寫回調(diào)的方式容易的多,我們只需要在 Continuation 當(dāng)中覆寫 resumeWithException 方法就可以做到這一點。

5. 拿來主義:Kotlinx.Coroutine

Kotlinx.Coroutine 是官方單獨發(fā)出來的一個 Coroutine 的庫,這個庫為什么沒有隨著標(biāo)準(zhǔn)庫一并發(fā)出來,想必大家從其包名就能略窺一二:kotlinx.coroutines.experimental,experimental,還處于試驗階段。不過既然敢隨著 1.1 Beta 一并發(fā)出來,也說明后面的大方向不會太遠,大家可以直接開始嘗試其中的 API 了。

應(yīng)該說,Kotlinx.Coroutine 做的事情跟我們在上一節(jié)做的事情是相同的,只不過它在這個方向上面走的更遠。有關(guān)它的一些用法和細節(jié),我們將在下一期給大家介紹。

6. 小結(jié)

本文主要對 Kotlin 1.1Beta 標(biāo)準(zhǔn)庫的 Coroutine API 做了介紹,也給出了相應(yīng)的示例向大家展示 Coroutine 能為我們帶來什么。

協(xié)程是干什么的?是用來讓異步代碼更具表現(xiàn)力的。如果運用得當(dāng),它將讓我們免于回調(diào)嵌套之苦,并發(fā)加鎖之痛,使我們能夠利用我們有限的時間寫出更有魅力的程序。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容