深入理解 Kotlin coroutine (二)

原文鏈接:https://github.com/enbandari/Kotlin-Tutorials

上周我們把 Kotlin Coroutine 的基本 API 挨個(gè)講了一下,也給出了一些簡單的封裝。

真是不太給力,就在前幾天發(fā)布的 1.1 Beta 2 當(dāng)中,所有協(xié)程的 API 包名后面都加了一個(gè) experimental,這意味著 Kotlin 官方在 1.1 當(dāng)中還是傾向于將 Coroutine 作為一個(gè)實(shí)驗(yàn)性質(zhì)的特性的,不過,這也沒關(guān)系,我們學(xué)習(xí)的心不以外界的變化而變化不是?

這一篇我們基于前面的基礎(chǔ)來了解一下 Kotlinx.coroutines 這個(gè)庫的使用,如果大家對(duì)它的實(shí)現(xiàn)原理有興趣,可以再讀一讀上一篇文章,我們也可以在后面繼續(xù)寫一些文章來給深入地大家介紹。

1. 準(zhǔn)備工作

就像前面我們說到的,1.1 Beta 2 當(dāng)中協(xié)程相關(guān)的基礎(chǔ)庫的包名都增加了 experimental,所以我們在選擇 kotlinx.coroutines 的版本的時(shí)候也一定要對(duì)應(yīng)好編譯器的版本,不然...你自己想哈哈。

我們強(qiáng)調(diào)一下,kotlin 的版本選擇 1.1.0-beta-38,kotlinx.coroutines 的版本選擇 0.6-beta,如果你恰好使用 gradle,那么告訴你一個(gè)好消息,我會(huì)直接告訴你怎么配置:

buildscript { 
    ext.kotlin_version = '1.1.0-beta-38' 
  
    repositories { 
        jcenter() 
  
        maven { 
            url "http://dl.bintray.com/kotlin/kotlin-eap-1.1" 
        } 
    } 
  
    ... 
} 
  
repositories { 
    jcenter() 
  
    maven { 
        url "http://dl.bintray.com/kotlin/kotlin-eap-1.1" 
    } 
} 
  
kotlin { 
    experimental { 
        coroutines 'enable' 
    } 
} 
  
dependencies { 
    compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version" 
    compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.6-beta' 
} 

2. 一個(gè)基本的協(xié)程的例子

這個(gè)例子是 kotlinx.coroutines 的第一個(gè)小例子。

fun main(args: Array<String>) { 
    launch(CommonPool) { // create new coroutine in common thread pool 
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms) 
        println("World!") // print after delay 
    } 
    println("Hello,") // main function continues while coroutine is delayed 
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive 
} 

這個(gè)例子的運(yùn)行結(jié)果是:

Hello, 
World! 

其實(shí)有了上一篇文章的基礎(chǔ)我們很容易知道,launch 方法啟動(dòng)了一個(gè)協(xié)程,CommonPool 是一個(gè)有線程池的上下文,它可以負(fù)責(zé)把協(xié)程的執(zhí)行分配到合適的線程上。所以從線程的角度來看,打印的這兩句是在不同的線程上的。

20170206-063015.015 [main] Hello, 
20170206-063016.016 [ForkJoinPool.commonPool-worker-1] World! 

這段代碼的執(zhí)行效果與線程的版本看上去是一樣的:

thread(name = "MyThread") {  
    Thread.sleep(1000L)  
    log("World!")  
} 
log("Hello,")  
Thread.sleep(2000L)  

3. 主線程上的協(xié)程

我們剛才通過 launch 創(chuàng)建的協(xié)程是在 CommonPool 的線程池上面的,所以協(xié)程的運(yùn)行并不在主線程。如果我們希望直接在主線程上面創(chuàng)建協(xié)程,那怎么辦?

fun main(args: Array<String>) = runBlocking<Unit> {  
    launch(CommonPool) {  
        delay(1000L) 
        println("World!") 
    } 
    println("Hello,")  
    delay(2000L)  
} 

這個(gè)還是 kotlinx.coroutines 的例子,我們來分析一下。runBlocking 實(shí)際上也跟 launch 一樣,啟動(dòng)一個(gè)協(xié)程,只不過它傳入的 context 不會(huì)進(jìn)行線程切換,也就是說,由它創(chuàng)建的協(xié)程會(huì)直接運(yùn)行在當(dāng)前線程上。

在 runBlocking 當(dāng)中通過 launch 再創(chuàng)建一個(gè)協(xié)程,顯然,這段代碼的運(yùn)行結(jié)果與上一個(gè)例子是完全一樣的。需要注意的是,盡管我們可以在協(xié)程中通過 launch 這樣的方法創(chuàng)建協(xié)程,但不要再協(xié)程當(dāng)中通過 runBlocking 再來創(chuàng)建協(xié)程,因?yàn)檫@樣做雖然一般來說不會(huì)導(dǎo)致程序異常,不過,這樣的程序也沒有多大意義:

fun main(args: Array<String>) = runBlocking<Unit> { 
    runBlocking { 
        delay(1000L) 
        println("World!") 
    } 
    println("Hello,") 
} 

運(yùn)行結(jié)果:

World! 
Hello, 

大家看到了,嵌套的 runBlocking 實(shí)際上仍然只是一段順序代碼而已。

那么,讓我們再仔細(xì)看看前面的例子,不知道大家有沒有問題:如果我在 launch 創(chuàng)建的協(xié)程當(dāng)中多磨嘰一會(huì)兒,主線程上的協(xié)程 delay(2000L) 好像也沒多大用啊。有沒有什么方法保證協(xié)程執(zhí)行完?

4. 外部控制協(xié)程

我們在上一篇文章當(dāng)中只是對(duì)內(nèi)置的基礎(chǔ) API 進(jìn)行了簡單的封裝,而 kotlinx.coroutines 卻為我們做了非常多的事情。比如,每一個(gè)協(xié)程都看做一個(gè) Job,我們在一個(gè)協(xié)程的外部也可以控制它的運(yùn)行。

fun main(args: Array<String>) = runBlocking<Unit> { 
    val job = launch(CommonPool) {  
        delay(1000L) 
        println("World!") 
    } 
    println("Hello,") 
    job.join()  
} 

job.join 其實(shí)就是要求當(dāng)前協(xié)程等待 job 執(zhí)行完成之后再繼續(xù)執(zhí)行。

其實(shí),我們還可以取消協(xié)程,讓他直接停止執(zhí)行:

fun main(args: Array<String>) = runBlocking<Unit> { 
    val job = launch(CommonPool) {  
        delay(1000L) 
        println("World!") 
    } 
    println("Hello,") 
    job.cancel()  
} 

job.cancel 會(huì)直接終止 job 的執(zhí)行。如果 job 已經(jīng)執(zhí)行完畢,那么 job.cancel 的執(zhí)行時(shí)沒有意義的。我們也可以根據(jù) cancel 的返回值來判斷是否取消成功。

另外,cancel 還可以提供原因:

job.cancel(IllegalAccessException("World!")) 

如果我們提供了這個(gè)原因,那么被取消的協(xié)程會(huì)將它打印出來。

Hello, 
Exception in thread "main" java.lang.IllegalAccessException: World! 
    at example13.Example_13Kt$main$1.doResume(example-13.kt:14) 
    at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:53) 
    at kotlinx.coroutines.experimental.DispatchedContinuation$resume$1.run(CoroutineDispatcher.kt:57) 

其實(shí),如果你自己做過對(duì)線程任務(wù)的取消,你大概會(huì)知道除非被取消的線程自己去檢查取消的標(biāo)志位,或者被 interrupt,否則取消是無法實(shí)現(xiàn)的,這有點(diǎn)兒像一個(gè)人執(zhí)意要做一件事兒,另一個(gè)人說你別做啦,結(jié)果人家壓根兒沒聽見,你說他能停下來嗎?那么我們前面的取消到底是誰去監(jiān)聽了這個(gè) cancel 操作呢?

當(dāng)然是 delay 這個(gè)操作了。其實(shí)所有 kotlinx.coroutines 當(dāng)中定義的操作都可以做到這一點(diǎn),我們對(duì)代碼稍加改動(dòng),你就會(huì)發(fā)現(xiàn)異常來自何處了:

val job = launch(CommonPool) {  
    try { 
        delay(1000L) 
        println("World!") 
    } catch(e: Exception) { 
        e.printStackTrace() 
    }finally { 
        println("finally....") 
    } 
} 
println("Hello,") 
job.cancel(IllegalAccessException("World!"))  

是的,你沒看錯(cuò),我們居然可以在協(xié)程里面對(duì) cancel 進(jìn)行捕獲,如果你愿意的話,你甚至可以繼續(xù)在這個(gè)協(xié)程里面運(yùn)行代碼,但請(qǐng)不要這樣做,下面的示例破壞了 cancel 的設(shè)計(jì)本意,所以請(qǐng)勿模仿:

val job = launch(CommonPool) {  
    try { 
        ... 
    }finally { 
        println("finally....") 
    } 
    println("I'm an EVIL!!! Hahahaha") 
} 

說這個(gè)是什么意思呢?在協(xié)程被 cancel 掉的時(shí)候,我們應(yīng)該做的其實(shí)是把戰(zhàn)場打掃干凈,比如:

val job = launch(CommonPool) { 
    val inputStream = ...
    try {
        ...
    }finally {
        inputStream.close()
    }
}

我們再來考慮下面的情形:

fun main(args: Array<String>) = runBlocking<Unit> { 
    val job = launch(CommonPool) { 
        var nextPrintTime = 0L 
        var i = 0 
        while (true) { // computation loop 
            val currentTime = System.currentTimeMillis() 
            if (currentTime >= nextPrintTime) { 
                println("I'm sleeping ${i++} ...") 
                nextPrintTime = currentTime + 500L 
            } 
        } 
    } 
    delay(1300L) // delay a bit 
    println("main: I'm tired of waiting!") 
    job.cancel() // cancels the job 
    delay(1300L) // delay a bit to see if it was cancelled.... 
    println("main: Now I can quit.") 
} 

不得不說,kotlinx.coroutines 在幾天前剛剛更新的文檔和示例非常的棒。我們看到這個(gè)例子,while(true) 會(huì)讓這個(gè)協(xié)程不斷運(yùn)行來模擬耗時(shí)計(jì)算,盡管外部調(diào)用了 job.cancel(),但由于內(nèi)部并沒有 care 自己是否被 cancel,所以這個(gè) cancel 顯然有點(diǎn)兒失敗。如果你想要在類似這種耗時(shí)計(jì)算當(dāng)中檢測當(dāng)前協(xié)程是否被取消的話,你可以這么寫:

... 
while (isActive) { // computation loop 
   ... 
} 
...       

isActive 會(huì)在 cancel 之后被置為 false。

其實(shí),通過這幾個(gè)示例大家就會(huì)發(fā)現(xiàn)協(xié)程的取消,與我們通常取消線程操作的思路非常類似,只不過人家封裝的比較好,而我們呢,每次還得自己搞一個(gè) CancelableTask 來實(shí)現(xiàn) Runnable 接口去承載自己的異步操作,想想也是夠原始呢。

5. 輕量級(jí)線程

協(xié)程時(shí)輕量級(jí)的,它擁有自己的運(yùn)行狀態(tài),但它對(duì)資源的消耗卻非常的小。其實(shí)能做到這一點(diǎn)的本質(zhì)原因,我們已經(jīng)在上一篇文章當(dāng)中提到過,一臺(tái)服務(wù)器開 1k 線程和 1k 協(xié)程來響應(yīng)服務(wù),前者對(duì)資源的消耗必然很大,而后者可能只是基于很少的幾個(gè)或幾十個(gè)線程來工作的,隨著請(qǐng)求數(shù)量的增加,協(xié)程的優(yōu)勢可能會(huì)體現(xiàn)的更加明顯。

我們來看個(gè)比較簡單的例子:

fun main(args: Array<String>) = runBlocking<Unit> { 
    val jobs = List(100_000) {  
        launch(CommonPool) { 
            delay(1000L) 
            print(".") 
        } 
    } 
    jobs.forEach { it.join() } //這里不能用 jobs.forEach(Job::join),因?yàn)?Job.join 是 suspend 方法 
} 

通過 List 這個(gè)方法,我們可以瞬間創(chuàng)建出很多對(duì)象放入返回的 List,注意到這里的 jobs 其實(shí)就是協(xié)程的一個(gè) List。

運(yùn)行上面的代碼,我們發(fā)現(xiàn) CommonPool 當(dāng)中的線程池的線程數(shù)量基本上維持在三四個(gè)就足夠了,如果我們用線程來寫上面的代碼會(huì)是什么感覺?

fun main(args: Array<String>) = runBlocking<Unit> { 
    val jobs = List(100_000) {  
        thread { 
            Thread.sleep(1000L) 
            log(".") 
        } 
    } 
    jobs.forEach(Thread::join) // Thread::join 說起來也是 1.1 的新特性呢! 
}  

運(yùn)行時(shí),在創(chuàng)建了 1k 多個(gè)線程之后,就拋出了異常:

Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread 
    at java.lang.Thread.start0(Native Method) 

嗯,又多了一個(gè)用協(xié)程的理由,對(duì)不對(duì)?

6. 攜帶值的 Job

我們前面說了,通過攜程返回的 Job,我們可以控制攜程的運(yùn)行??捎袝r(shí)候我們更關(guān)注協(xié)程運(yùn)行的結(jié)果,比如從網(wǎng)絡(luò)加載一張圖片:

suspend fun loadImage(url: String): Bitmap { 
    ... 
    return ... 
} 

沒錯(cuò),我們更關(guān)注它的結(jié)果,這種情況我們該怎么辦呢?如果 loadImage 不是 suspend 方法,那么我們在非 UI 線程當(dāng)中直接獲取他們:

val imageA = loadImage(urlA) 
val imageB = loadImage(urlB) 
onImageGet(imageA, imageB) 

這樣的操作有什么問題?順序獲取兩張圖片,耗時(shí),不經(jīng)濟(jì)。所以傳統(tǒng)的做法就是開兩個(gè)線程做這件事兒,這意味著你會(huì)看到兩個(gè)回調(diào),并且還要同步這兩個(gè)回調(diào),想想都頭疼。

不過我們現(xiàn)在有更好的辦法:

val imageA = defer(CommonPool) { loadImage(urlA) } 
val imageB = defer(CommonPool) { loadImage(urlB) } 
onImageGet(imageA.await(),imageB.await()) 

代碼量幾乎沒有增加,不過我們卻做到了兩張圖片異步獲取,并同時(shí)傳給 onImageGet 以便繼續(xù)后面的操作。

defer 到底是個(gè)什么東西?其實(shí)大家大可不必看到新詞就感到恐慌,這東西用法幾乎跟 launch 一樣,只不過它返回的 Deferred 功能比 Job 多了一樣:攜帶返回值。我們前面看到的 imageA 其實(shí)就是一個(gè) Deferred 實(shí)例,而它的 await 方法返回的則是 Bitmap 類型,也即 loadImage(urlA) 的返回值。

所以如果你對(duì)協(xié)程運(yùn)行的結(jié)果感興趣,直接使用 defer 來替換你的 launch 就可以了。需要注意的是,即便你不調(diào)用 await,defer 啟動(dòng)的協(xié)程也會(huì)立即運(yùn)行,如果你希望你的協(xié)程能夠按需啟動(dòng),例如只有你調(diào)用 await 之后再啟動(dòng),那么你可以用 lazyDefer:

val imageA = lazyDefer(CommonPool) { loadImage(urlA) } 
val imageB = lazyDefer(CommonPool) { loadImage(urlB) } 
onImageGet(imageA.await(),imageB.await()) //這時(shí)候才開始真正去加載圖片 

7. 生成器

不知道大家對(duì) python 的生成器有沒有了解,這個(gè)感覺就好似延遲計(jì)算一樣。

假設(shè)我們要計(jì)算 fibonacci 數(shù)列,這個(gè)大家都知道,也非常容易寫,你可能分分鐘寫出一個(gè)遞歸的函數(shù)來求得這個(gè)序列,不過你應(yīng)該知道遞歸的層級(jí)越多,stackOverflow 的可能性越大吧?另外,如果我們只是用到其中的幾個(gè),那么遞歸的函數(shù)一下子都給求出來,而且每次調(diào)用也沒有記憶性導(dǎo)致同一個(gè)值計(jì)算多次,非常不經(jīng)濟(jì)。大家看一個(gè) python 的例子:

def fibonacci(): 
    yield 1 # 直接返回 1, 并且在此處暫停 
    first = 1 
    second = 1 
    while True: 
        yield first 
        first, second = first + second, first 
        
a = fibonacci() 
for x in a: 
    print x 
    if x > 100: break 

前面給出的這種計(jì)算方法,fibonacci 函數(shù)返回一個(gè)可迭代的對(duì)象,這個(gè)對(duì)象其實(shí)就是生成器,只有我們在迭代它的時(shí)候,它才會(huì)去真正執(zhí)行計(jì)算,只要遇到 yield,那么這一次迭代到的值就是 yield 后面的值,比如,我們第一次調(diào)用 fibonacci 這個(gè)函數(shù)的時(shí)候,得到的值就是 1,后面依次類推。

Kotlin 在添加了協(xié)程這個(gè)功能之后,也可以這么搞了:

val fibonacci = buildSequence { 
    yield(1) // first Fibonacci number 
    var cur = 1 
    var next = 1 
    while (true) { 
        yield(next) // next Fibonacci number 
        val tmp = cur + next 
        cur = next 
        next = tmp 
    } 
} 
... 
for (i in fibonacci){ 
    println(i) 
    if(i > 100) break //大于100就停止循環(huán) 
} 

可以這么說,這段代碼與前面的 python 版本功能是完全相同的,在 yield 方法調(diào)用時(shí),傳入的值就是本次迭代的值。

fibonacci 這個(gè)變量的類型如下:

public interface Sequence<out T> { 
   public operator fun iterator(): Iterator<T> 
} 

既然有 iterator 方法,那么我們可以直接對(duì) fibonacci 進(jìn)行迭代也就沒什么大驚小怪的了。這個(gè) iterator 保證每次迭代的時(shí)候去執(zhí)行 buildSequence 后面的 Lambda 的代碼,從上一個(gè) yield 之后開始到下一個(gè) yield 結(jié)束,yield 傳入的值就是 iterator 的 next 的返回值。

有了這個(gè)特性,我們就可以構(gòu)造許多“懶”序列,只有在用到的時(shí)候才去真正計(jì)算每一個(gè)元素的值,而且運(yùn)算狀態(tài)可以保存,每次計(jì)算的結(jié)果都不會(huì)浪費(fèi)。

注:這個(gè)特性是被 Kotlin 標(biāo)準(zhǔn)庫收錄了的,并不存在于 kotlinx.coroutines 當(dāng)中,不過這也沒關(guān)系啦,kotlinx.coroutines 的 API 會(huì)不會(huì)在不久的將來也作為 Kotlin 標(biāo)準(zhǔn)庫的內(nèi)容出現(xiàn)呢?

8. 小結(jié)

這一篇的內(nèi)容其實(shí)相對(duì)上一篇要簡單一些,面對(duì) kotlinx.coroutines 這樣的框架,我們直接通過分析案例,將 coroutine 這么理論化的東西投入實(shí)際場景,讓大家從感性上對(duì)其有個(gè)更加深入的認(rèn)識(shí)。

當(dāng)然,我們并沒有深入其中了解其原理,原因就是上一篇我們?yōu)榇俗隽俗銐虻臏?zhǔn)備 —— kotlinx.coroutines 作為官方的框架,自然要實(shí)現(xiàn)得完善一些,但也是萬變不離其宗。

寫到這里,我想,我們還是需要有一篇文章再來介紹一些協(xié)程使用的一些注意事項(xiàng),那么我們下一篇再見吧。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容