kotlin - Coroutine 協(xié)程

我是在深入學(xué)習(xí) kotlin 時(shí)第一次看到協(xié)程,作為傳統(tǒng)線(xiàn)程模型的進(jìn)化版,雖說(shuō)協(xié)程這個(gè)概念幾十年前就有了,但是協(xié)程只是在近年才開(kāi)始興起,應(yīng)用的語(yǔ)言有:go 、goLand、kotlin、python , 都是支持協(xié)程的,可能不同平臺(tái) API 上有差異

首次學(xué)習(xí)協(xié)程可能會(huì)費(fèi)些時(shí)間,協(xié)程和 thread 類(lèi)似,但是和 thread 有很大區(qū)別,搞懂,學(xué)會(huì),熟悉協(xié)程在線(xiàn)程上如何運(yùn)作是要鉆研一下的,上手可能不是那么快

這里有個(gè)很有建設(shè)性意見(jiàn)的例子,Coroutines 代替 rxjava


怎么理解協(xié)程

我們先來(lái)把協(xié)程這個(gè)概念搞懂,不是很好理解,但是也不難理解

協(xié)程 - 也叫微線(xiàn)程,是一種新的多任務(wù)并發(fā)的操作手段(也不是很新,概念早就有了)

  • 特征:協(xié)程是運(yùn)行在單線(xiàn)程中的并發(fā)程序
  • 優(yōu)點(diǎn):省去了傳統(tǒng) Thread 多線(xiàn)程并發(fā)機(jī)制中切換線(xiàn)程時(shí)帶來(lái)的線(xiàn)程上下文切換、線(xiàn)程狀態(tài)切換、Thread 初始化上的性能損耗,能大幅度唐提高并發(fā)性能
  • 漫畫(huà)版概念解釋?zhuān)?a target="_blank">漫畫(huà):什么是協(xié)程?
  • 簡(jiǎn)單理解:在單線(xiàn)程上由程序員自己調(diào)度運(yùn)行的并行計(jì)算

下面是關(guān)于協(xié)程這個(gè)概念的一些描述:

協(xié)程的開(kāi)發(fā)人員 Roman Elizarov 是這樣描述協(xié)程的:協(xié)程就像非常輕量級(jí)的線(xiàn)程。線(xiàn)程是由系統(tǒng)調(diào)度的,線(xiàn)程切換或線(xiàn)程阻塞的開(kāi)銷(xiāo)都比較大。而協(xié)程依賴(lài)于線(xiàn)程,但是協(xié)程掛起時(shí)不需要阻塞線(xiàn)程,幾乎是無(wú)代價(jià)的,協(xié)程是由開(kāi)發(fā)者控制的。所以協(xié)程也像用戶(hù)態(tài)的線(xiàn)程,非常輕量級(jí),一個(gè)線(xiàn)程中可以創(chuàng)建任意個(gè)協(xié)程。

Coroutine,翻譯成”協(xié)程“,初始碰到的人馬上就會(huì)跟進(jìn)程和線(xiàn)程兩個(gè)概念聯(lián)系起來(lái)。直接先說(shuō)區(qū)別,Coroutine是編譯器級(jí)的,Process和Thread是操作系統(tǒng)級(jí)的。Coroutine的實(shí)現(xiàn),通常是對(duì)某個(gè)語(yǔ)言做相應(yīng)的提議,然后通過(guò)后成編譯器標(biāo)準(zhǔn),然后編譯器廠商來(lái)實(shí)現(xiàn)該機(jī)制。Process和Thread看起來(lái)也在語(yǔ)言層次,但是內(nèi)生原理卻是操作系統(tǒng)先有這個(gè)東西,然后通過(guò)一定的API暴露給用戶(hù)使用,兩者在這里有不同。Process和Thread是os通過(guò)調(diào)度算法,保存當(dāng)前的上下文,然后從上次暫停的地方再次開(kāi)始計(jì)算,重新開(kāi)始的地方不可預(yù)期,每次CPU計(jì)算的指令數(shù)量和代碼跑過(guò)的CPU時(shí)間是相關(guān)的,跑到os分配的cpu時(shí)間到達(dá)后就會(huì)被os強(qiáng)制掛起。Coroutine是編譯器的魔術(shù),通過(guò)插入相關(guān)的代碼使得代碼段能夠?qū)崿F(xiàn)分段式的執(zhí)行,重新開(kāi)始的地方是yield關(guān)鍵字指定的,一次一定會(huì)跑到一個(gè)yield對(duì)應(yīng)的地方

對(duì)于多線(xiàn)程應(yīng)用,CPU通過(guò)切片的方式來(lái)切換線(xiàn)程間的執(zhí)行,線(xiàn)程切換時(shí)需要耗時(shí)(保存狀態(tài),下次繼續(xù))。協(xié)程,則只使用一個(gè)線(xiàn)程,在一個(gè)線(xiàn)程中規(guī)定某個(gè)代碼塊執(zhí)行順序。協(xié)程能保留上一次調(diào)用時(shí)的狀態(tài),不需要像線(xiàn)程一樣用回調(diào)函數(shù),所以性能上會(huì)有提升。缺點(diǎn)是本質(zhì)是個(gè)單線(xiàn)程,不能利用到單個(gè)CPU的多個(gè)核

協(xié)程和線(xiàn)程的對(duì)比:

  • Thread - 線(xiàn)程擁有獨(dú)立的棧、局部變量,基于進(jìn)程的共享內(nèi)存,因此數(shù)據(jù)共享比較容易,但是多線(xiàn)程時(shí)需要加鎖來(lái)進(jìn)行訪問(wèn)控制,不加鎖就容易導(dǎo)致數(shù)據(jù)錯(cuò)誤,但加鎖過(guò)多又容易出現(xiàn)死鎖。線(xiàn)程之間的調(diào)度由內(nèi)核控制(時(shí)間片競(jìng)爭(zhēng)機(jī)制),程序員無(wú)法介入控制(即便我們擁有sleep、yield這樣的API,這些API只是看起來(lái)像,但本質(zhì)還是交給內(nèi)核去控制,我們最多就是加上幾個(gè)條件控制罷了),線(xiàn)程之間的切換需要深入到內(nèi)核級(jí)別,因此線(xiàn)程的切換代價(jià)比較大,表現(xiàn)在:
    * 線(xiàn)程對(duì)象的創(chuàng)建和初始化
    * 線(xiàn)程上下文切換
    * 線(xiàn)程狀態(tài)的切換由系統(tǒng)內(nèi)核完成
    * 對(duì)變量的操作需要加鎖

  • Coroutine 協(xié)程是跑在線(xiàn)程上的優(yōu)化產(chǎn)物,被稱(chēng)為輕量級(jí) Thread,擁有自己的棧內(nèi)存和局部變量,共享成員變量。傳統(tǒng) Thread 執(zhí)行的核心是一個(gè)while(true) 的函數(shù),本質(zhì)就是一個(gè)耗時(shí)函數(shù),Coroutine 可以用來(lái)直接標(biāo)記方法,由程序員自己實(shí)現(xiàn)切換,調(diào)度,不再采用傳統(tǒng)的時(shí)間段競(jìng)爭(zhēng)機(jī)制。在一個(gè)線(xiàn)程上可以同時(shí)跑多個(gè)協(xié)程,同一時(shí)間只有一個(gè)協(xié)程被執(zhí)行,在單線(xiàn)程上模擬多線(xiàn)程并發(fā),協(xié)程何時(shí)運(yùn)行,何時(shí)暫停,都是有程序員自己決定的,使用: yield/resume API,優(yōu)勢(shì)如下:

    • 因?yàn)樵谕粋€(gè)線(xiàn)程里,協(xié)程之間的切換不涉及線(xiàn)程上下文的切換和線(xiàn)程狀態(tài)的改變,不存在資源、數(shù)據(jù)并發(fā),所以不用加鎖,只需要判斷狀態(tài)就OK,所以執(zhí)行效率比多線(xiàn)程高很多
    • 協(xié)程是非阻塞式的(也有阻塞API),一個(gè)協(xié)程在進(jìn)入阻塞后不會(huì)阻塞當(dāng)前線(xiàn)程,當(dāng)前線(xiàn)程會(huì)去執(zhí)行其他協(xié)程任務(wù)


程序員能夠控制協(xié)程的切換,是通過(guò)yield API 讓協(xié)程在空閑時(shí)(比如等待io,網(wǎng)絡(luò)數(shù)據(jù)未到達(dá))放棄執(zhí)行權(quán),然后在合適的時(shí)機(jī)再通過(guò)resume API 喚醒協(xié)程繼續(xù)運(yùn)行。協(xié)程一旦開(kāi)始運(yùn)行就不會(huì)結(jié)束,直到遇到yield交出執(zhí)行權(quán)。Yieldresume 這一對(duì) API 可以非常便捷的實(shí)現(xiàn)異步,這可是目前所有高級(jí)語(yǔ)法孜孜不倦追求的

拿 python 代碼舉個(gè)例子,在一個(gè)線(xiàn)程里運(yùn)行下面2個(gè)方法:

def A():
    print '1'
    print '2'
    print '3'

def B():
    print 'x'
    print 'y'
    print 'z'

假設(shè)由協(xié)程執(zhí)行,每個(gè)方法都用協(xié)程標(biāo)記,在執(zhí)行A的過(guò)程中,可以隨時(shí)中斷,去執(zhí)行B,B也可能在執(zhí)行過(guò)程中中斷再去執(zhí)行A,結(jié)果可能是:1 2 x y 3 z


添加依賴(lài)

在 module 項(xiàng)目中添加下面的依賴(lài):

    kotlin{
        experimental {
            coroutines 'enable'
        }
    }

    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1'
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.1'

kotlin Coroutine 部分在最近幾個(gè)版本變化較大,推薦大家使用 kotlin 的最新版本 1.3.21,同時(shí) kotlin 1.3.21 版本 kotlin-stdlib-jre7 支持庫(kù)更新為 kotlin-stdlib-jdk7

buildscript {
    ext.kotlin_version = '1.3.21'
    ......
}
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"

kotlin 種協(xié)程的概念

文章開(kāi)頭已經(jīng)介紹了協(xié)程的概念,但畢竟平臺(tái)不同,也許多協(xié)程也有不一樣的地方,我們還是看看 kotlin 中的協(xié)程的描述,下面來(lái)自官方文檔:

協(xié)程通過(guò)將復(fù)雜性放入庫(kù)來(lái)簡(jiǎn)化異步編程。程序的邏輯可以在協(xié)程中順序地表達(dá),而底層庫(kù)會(huì)為我們解決其異步性。該庫(kù)可以將用戶(hù)代碼的相關(guān)部分包裝為回調(diào)、訂閱相關(guān)事件、在不同線(xiàn)程(甚至不同機(jī)器)上調(diào)度執(zhí)行,而代碼則保持如同順序執(zhí)行一樣簡(jiǎn)單

總結(jié)下,協(xié)程是跑在線(xiàn)程上的,一個(gè)線(xiàn)程可以同時(shí)跑多個(gè)協(xié)程,每一個(gè)協(xié)程則代表一個(gè)耗時(shí)任務(wù),我們手動(dòng)控制多個(gè)協(xié)程之間的運(yùn)行、切換,決定誰(shuí)什么時(shí)候掛起,什么時(shí)候運(yùn)行,什么時(shí)候喚醒,而不是 Thread 那樣交給系統(tǒng)內(nèi)核來(lái)操作去競(jìng)爭(zhēng) CPU 時(shí)間片

協(xié)程在線(xiàn)程中是順序執(zhí)行的,既然是順序執(zhí)行的那怎么實(shí)現(xiàn)異步,這自然是有手段的。Thread 中我們有阻塞、喚醒的概念,協(xié)程里同樣也有,掛起等同于阻塞,區(qū)別是 Thread 的阻塞是會(huì)阻塞當(dāng)前線(xiàn)程的(此時(shí)線(xiàn)程只能空耗 cpu 時(shí)間而不能執(zhí)行其他計(jì)算任務(wù),是種浪費(fèi)),而協(xié)程的掛起不會(huì)阻塞線(xiàn)程。當(dāng)線(xiàn)程接收到某個(gè)協(xié)程的掛起請(qǐng)求后,會(huì)去執(zhí)行其他計(jì)算任務(wù),比如其他協(xié)程。協(xié)程通過(guò)這樣的手段來(lái)實(shí)現(xiàn)多線(xiàn)程、異步的效果,在思維邏輯上同 Thread 的確有比較大的區(qū)別,大家需要適應(yīng)下思路上的變化


suspend 關(guān)鍵字

協(xié)程天然親近方法,協(xié)程表現(xiàn)為標(biāo)記、切換方法、代碼段,協(xié)程里使用 suspend 關(guān)鍵字修飾方法,既該方法可以被協(xié)程掛起,沒(méi)用suspend修飾的方法不能參與協(xié)程任務(wù),suspend修飾的方法只能在協(xié)程中只能與另一個(gè)suspend修飾的方法交流

suspend fun requestToken(): Token { ... }   // 掛起函數(shù)
suspend fun createPost(token: Token, item: Item): Post { ... }  // 掛起函數(shù)

fun postItem(item: Item) {
    GlobalScope.launch { // 創(chuàng)建一個(gè)新協(xié)程
        val token = requestToken()
        val post = createPost(token, item)
        processPost(post)
        // 需要異常處理,直接加上 try/catch 語(yǔ)句即可
    }
}

創(chuàng)建協(xié)程

kotlin 里沒(méi)有 new ,自然也不像 JAVA 一樣 new Thread,另外 kotlin 里面提供了大量的高階函數(shù),所以不難猜出協(xié)程這里 kotlin 也是有提供專(zhuān)用函數(shù)的。kotlin 中 GlobalScope 類(lèi)提供了幾個(gè)攜程構(gòu)造函數(shù):

  • launch - 創(chuàng)建協(xié)程
  • async - 創(chuàng)建帶返回值的協(xié)程,返回的是 Deferred 類(lèi)
  • withContext - 不創(chuàng)建新的協(xié)程,在指定協(xié)程上運(yùn)行代碼塊
  • runBlocking - 不是 GlobalScope 的 API,可以獨(dú)立使用,區(qū)別是 runBlocking 里面的 delay 會(huì)阻塞線(xiàn)程,而 launch 創(chuàng)建的不會(huì)

kotlin 在 1.3 之后要求協(xié)程必須由 CoroutineScope 創(chuàng)建,CoroutineScope 不阻塞當(dāng)前線(xiàn)程,在后臺(tái)創(chuàng)建一個(gè)新協(xié)程,也可以指定協(xié)程調(diào)度器。比如 CoroutineScope.launch{} 可以看成 new Coroutine

來(lái)看一個(gè)最簡(jiǎn)單的例子:

    Log.d("AA", "協(xié)程初始化開(kāi)始,時(shí)間: " + System.currentTimeMillis())

    GlobalScope.launch(Dispatchers.Unconfined) {
        Log.d("AA", "協(xié)程初始化完成,時(shí)間: " + System.currentTimeMillis())
        for (i in 1..3) {
            Log.d("AA", "協(xié)程任務(wù)1打印第$i 次,時(shí)間: " + System.currentTimeMillis())
        }
        delay(500)
        for (i in 1..3) {
            Log.d("AA", "協(xié)程任務(wù)2打印第$i 次,時(shí)間: " + System.currentTimeMillis())
        }
    }

    Log.d("AA", "主線(xiàn)程 sleep ,時(shí)間: " + System.currentTimeMillis())
    Thread.sleep(1000)
    Log.d("AA", "主線(xiàn)程運(yùn)行,時(shí)間: " + System.currentTimeMillis())

    for (i in 1..3) {
        Log.d("AA", "主線(xiàn)程打印第$i 次,時(shí)間: " + System.currentTimeMillis())
    }
協(xié)程初始化開(kāi)始,時(shí)間: 1553752816027
協(xié)程初始化完成,時(shí)間: 1553752816060
協(xié)程任務(wù)1打印第1 次,時(shí)間: 1553752816060
協(xié)程任務(wù)1打印第2 次,時(shí)間: 1553752816060
協(xié)程任務(wù)1打印第3 次,時(shí)間: 1553752816060
主線(xiàn)程 sleep ,時(shí)間: 1553752816063
協(xié)程任務(wù)2打印第1 次,時(shí)間: 1553752816567
協(xié)程任務(wù)2打印第2 次,時(shí)間: 1553752816567
協(xié)程任務(wù)2打印第3 次,時(shí)間: 1553752816567
主線(xiàn)程運(yùn)行,時(shí)間: 1553752817067
主線(xiàn)程打印第1 次,時(shí)間: 1553752817068
主線(xiàn)程打印第2 次,時(shí)間: 1553752817068
主線(xiàn)程打印第3 次,時(shí)間: 1553752817068

以 launch 函數(shù)為例

launch 函數(shù)定義:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

launch 是個(gè)擴(kuò)展函數(shù),接受3個(gè)參數(shù),前面2個(gè)是常規(guī)參數(shù),最后一個(gè)是個(gè)對(duì)象式函數(shù),這樣的話(huà) kotlin 就可以使用以前說(shuō)的閉包的寫(xiě)法:() 里面寫(xiě)常規(guī)參數(shù),{} 里面寫(xiě)函數(shù)式對(duì)象的實(shí)現(xiàn),就像上面的例子一樣,剛從 java 轉(zhuǎn)過(guò)來(lái)的朋友看著很別扭不是,得適應(yīng)

 GlobalScope.launch(Dispatchers.Unconfined) {...}

我們需要關(guān)心的是 launch 的3個(gè)參數(shù)和返回值 Job:

  • CoroutineContext - 可以理解為協(xié)程的上下文,在這里我們可以設(shè)置 CoroutineDispatcher 協(xié)程運(yùn)行的線(xiàn)程調(diào)度器,有 4種線(xiàn)程模式:
    • Dispatchers.Default
    • Dispatchers.IO -
    • Dispatchers.Main - 主線(xiàn)程
    • Dispatchers.Unconfined - 沒(méi)指定,就是在當(dāng)前線(xiàn)程

不寫(xiě)的話(huà)就是 Dispatchers.Default 模式的,或者我們可以自己創(chuàng)建協(xié)程上下文,也就是線(xiàn)程池,newSingleThreadContext 單線(xiàn)程,newFixedThreadPoolContext 線(xiàn)程池,具體的可以點(diǎn)進(jìn)去看看,這2個(gè)都是方法

val singleThreadContext = newSingleThreadContext("aa")
GlobalScope.launch(singleThreadContext) { ... }
  • CoroutineStart - 啟動(dòng)模式,默認(rèn)是DEAFAULT,也就是創(chuàng)建就啟動(dòng);還有一個(gè)是LAZY,意思是等你需要它的時(shí)候,再調(diào)用啟動(dòng)
    • DEAFAULT - 模式模式,不寫(xiě)就是默認(rèn)
    • ATOMIC -
    • UNDISPATCHED
    • LAZY - 懶加載模式,你需要它的時(shí)候,再調(diào)用啟動(dòng),看這個(gè)例子
var job:Job = GlobalScope.launch( start = CoroutineStart.LAZY ){
    Log.d("AA", "協(xié)程開(kāi)始運(yùn)行,時(shí)間: " + System.currentTimeMillis())
}

Thread.sleep( 1000L )
// 手動(dòng)啟動(dòng)協(xié)程
job.start()
  • block - 閉包方法體,定義協(xié)程內(nèi)需要執(zhí)行的操作
  • Job - 協(xié)程構(gòu)建函數(shù)的返回值,可以把 Job 看成協(xié)程對(duì)象本身,協(xié)程的操作方法都在 Job 身上了
    • job.start() - 啟動(dòng)協(xié)程,除了 lazy 模式,協(xié)程都不需要手動(dòng)啟動(dòng)
    • job.join() - 等待協(xié)程執(zhí)行完畢
    • job.cancel() - 取消一個(gè)協(xié)程
    • job.cancelAndJoin() - 等待協(xié)程執(zhí)行完畢然后再取消

GlobalScope.async

async 同 launch 唯一的區(qū)別就是 async 是有返回值的,看下面的例子:

GlobalScope.launch(Dispatchers.Unconfined) {
  val deferred = GlobalScope.async{
  delay(1000L)
  Log.d("AA","This is async ")
  return@async "taonce"
  }

  Log.d("AA","協(xié)程 other start")
  val result = deferred.await()
  Log.d("AA","async result is $result")
  Log.d("AA","協(xié)程 other end ")
}

Log.d("AA", "主線(xiàn)程位于協(xié)程之后的代碼執(zhí)行,時(shí)間:  ${System.currentTimeMillis()}")

async 返回的是 Deferred 類(lèi)型,Deferred 繼承自 Job 接口,Job有的它都有,增加了一個(gè)方法 await ,這個(gè)方法接收的是 async 閉包中返回的值,async 的特點(diǎn)是不會(huì)阻塞當(dāng)前線(xiàn)程,但會(huì)阻塞所在協(xié)程,也就是掛起

但是注意啊,async 并不會(huì)阻塞線(xiàn)程,只是阻塞鎖調(diào)用的協(xié)程


runBlocking

runBlocking 和 launch 區(qū)別的地方就是 runBlocking 的 delay 方法是可以阻塞當(dāng)前的線(xiàn)程的,和Thread.sleep() 一樣,看下面的例子:

fun main(args: Array<String>) {
  runBlocking {
    // 阻塞1s
    delay(1000L)
    println("This is a coroutines ${TimeUtil.getTimeDetail()}")
  }

  // 阻塞2s
  Thread.sleep(2000L)
  println("main end ${TimeUtil.getTimeDetail()}")
  }

~~~~~~~~~~~~~~log~~~~~~~~~~~~~~~~
This is a coroutines 11:00:51
main end 11:00:53

runBlocking 通常的用法是用來(lái)橋接普通阻塞代碼和掛起風(fēng)格的非阻塞代碼,在 runBlocking 閉包里面啟動(dòng)另外的協(xié)程,協(xié)程里面是可以嵌套啟動(dòng)別的協(xié)程的


協(xié)程的掛起和恢復(fù)

前面說(shuō)過(guò),協(xié)程的特點(diǎn)就是多個(gè)協(xié)程可以運(yùn)行在一個(gè)線(xiàn)程內(nèi),單個(gè)協(xié)程掛起后不會(huì)阻塞當(dāng)前線(xiàn)程,線(xiàn)程還可以繼續(xù)執(zhí)行其他任務(wù)。學(xué)習(xí)協(xié)程最大的難點(diǎn)是搞清楚協(xié)程是如何運(yùn)行的、何時(shí)掛起、何時(shí)恢復(fù),多個(gè)協(xié)程之間的組織運(yùn)行、協(xié)程和線(xiàn)程之間的組織運(yùn)行

1. 協(xié)程執(zhí)行時(shí), 協(xié)程和協(xié)程,協(xié)程和線(xiàn)程內(nèi)代碼是順序運(yùn)行的

這點(diǎn)是和 thread 最大的不同,thread 線(xiàn)程之間采取的是競(jìng)爭(zhēng) cpu 時(shí)間段的方法,誰(shuí)搶到誰(shuí)運(yùn)行,由系統(tǒng)內(nèi)核控制,對(duì)我們來(lái)說(shuō)是不可見(jiàn)不可控的。協(xié)程不同,協(xié)程之間不用競(jìng)爭(zhēng)、誰(shuí)運(yùn)行、誰(shuí)掛起、什么時(shí)候恢復(fù)都是由我們自己控制的

最簡(jiǎn)單的協(xié)程運(yùn)行模式,不涉及掛起時(shí),誰(shuí)寫(xiě)在前面誰(shuí)先運(yùn)行,后面的等前面的協(xié)程運(yùn)行完之后再運(yùn)行。涉及到掛起時(shí),前面的協(xié)程掛起了,那么線(xiàn)程不會(huì)空閑,而是繼續(xù)運(yùn)行下一個(gè)協(xié)程,而前面掛起的那個(gè)協(xié)程在掛起結(jié)速后不會(huì)馬上運(yùn)行,而是等待當(dāng)前正在運(yùn)行的協(xié)程運(yùn)行完畢后再去執(zhí)行

典型的例子:

GlobalScope.launch(Dispatchers.Unconfined) {
  for (i in 1..6) {
    Log.d("AA", "協(xié)程任務(wù)打印第$i 次,時(shí)間: ${System.currentTimeMillis()}")
  }
}

  for (i in 1..8) {
  Log.d("AA", "主線(xiàn)程打印第$i 次,時(shí)間:  ${System.currentTimeMillis()}")
}

2. 協(xié)程掛起時(shí),就不會(huì)執(zhí)行了,而是等待掛起完成且線(xiàn)程空閑時(shí)才能繼續(xù)執(zhí)行

大家還記得 suspend 這個(gè)關(guān)鍵字嗎,suspend 表示掛起的意思,用來(lái)修飾方法的,一個(gè)協(xié)程內(nèi)有多個(gè) suspend 修飾的方法順序書(shū)寫(xiě)時(shí),代碼也是順序運(yùn)行的,為什么,suspend 函數(shù)會(huì)將整個(gè)協(xié)程掛起,而不僅僅是這個(gè) suspend 函數(shù)

  • 1. 單攜程內(nèi)多 suspend 函數(shù)運(yùn)行
    suspend 修飾的方法掛起的是協(xié)程本身,而非該方法,注意這點(diǎn),看下面的代碼體會(huì)下
suspend fun getToken(): String {
  delay(300)
  Log.d("AA", "getToken 開(kāi)始執(zhí)行,時(shí)間:  ${System.currentTimeMillis()}")
  return "ask"
}

suspend fun getResponse(token: String): String {
  delay(100)
  Log.d("AA", "getResponse 開(kāi)始執(zhí)行,時(shí)間:  ${System.currentTimeMillis()}")
  return "response"
}

fun setText(response: String) {
  Log.d("AA", "setText 執(zhí)行,時(shí)間:  ${System.currentTimeMillis()}")
}

// 運(yùn)行代碼
GlobalScope.launch(Dispatchers.Main) {
  Log.d("AA", "協(xié)程 開(kāi)始執(zhí)行,時(shí)間:  ${System.currentTimeMillis()}")
  val token = getToken()
  val response = getResponse(token)
  setText(response)
}

在 getToken 方法將協(xié)程掛起時(shí),getResponse 函數(shù)永遠(yuǎn)不會(huì)運(yùn)行,只有等 getToken 掛起結(jié)速將協(xié)程恢復(fù)時(shí)才會(huì)運(yùn)行

  • 2. 多協(xié)程間 suspend 函數(shù)運(yùn)行
GlobalScope.launch(Dispatchers.Unconfined){
  var token = GlobalScope.async(Dispatchers.Unconfined) {
    return@async getToken()
   }.await()

  var response = GlobalScope.async(Dispatchers.Unconfined) {
    return@async getResponse(token)
  }.await()

  setText(response)
}

注意我外面要包裹一層 GlobalScope.launch,要不運(yùn)行不了。這里我們搞了2個(gè)協(xié)程出來(lái),但是我們?cè)谶@里使用了await,這樣就會(huì)阻塞外部協(xié)程,所以代碼還是按順序執(zhí)行的。這樣適用于多個(gè)同級(jí) IO 操作的情況,這樣寫(xiě)比 rxjava 要省事不少

3. 協(xié)程掛起后何時(shí)恢復(fù)

這個(gè)問(wèn)題值得我們研究,畢竟代碼運(yùn)行是負(fù)載的,協(xié)程之外線(xiàn)程里肯定還有需要執(zhí)行的代碼,我們來(lái)看看前面的代碼在掛起后何時(shí)才能恢復(fù)執(zhí)行。我們把上面的方法延遲改成 1ms ,2ms

suspend fun getToken(): String {
  delay(1)
  Log.d("AA", "getToken 開(kāi)始執(zhí)行,時(shí)間:  ${System.currentTimeMillis()}")
  return "ask"
}

suspend fun getResponse(token: String): String {
  delay(2)
  Log.d("AA", "getResponse 開(kāi)始執(zhí)行,時(shí)間:  ${System.currentTimeMillis()}")
  return "response"
}

fun setText(response: String) {
  Log.d("AA", "setText 執(zhí)行,時(shí)間:  ${System.currentTimeMillis()}")
}

GlobalScope.launch(Dispatchers.Unconfined) {
  Log.d("AA", "協(xié)程 開(kāi)始執(zhí)行,時(shí)間:  ${System.currentTimeMillis()}")

  val token = getToken()
  val response = getResponse(token)

  setText(response)
}

for (i in 1..10) {
  Log.d("AA", "主線(xiàn)程打印第$i 次,時(shí)間:  ${System.currentTimeMillis()}")
}

協(xié)程掛起后,雖然延遲的時(shí)間到了,但是還得等到線(xiàn)程空閑時(shí)才能繼續(xù)執(zhí)行,這里要注意,協(xié)程可沒(méi)有競(jìng)爭(zhēng) cpu 時(shí)間段,協(xié)程掛起后即便可以恢復(fù)執(zhí)行了也不是馬上就能恢復(fù)執(zhí)行,需要我們自己結(jié)合上下文代碼去判斷,這里寫(xiě)不好是要出問(wèn)題的

4. 協(xié)程掛起后再恢復(fù)時(shí)在哪個(gè)線(xiàn)程運(yùn)行

為什么要寫(xiě)這個(gè)呢,在 Thread 中不存在這個(gè)問(wèn)題,但是協(xié)程中有句話(huà)這樣說(shuō)的:哪個(gè)線(xiàn)程恢復(fù)的協(xié)程,協(xié)程就運(yùn)行在哪個(gè)線(xiàn)程中,我們分別對(duì) kotlin 提供的 3個(gè)協(xié)程調(diào)度器測(cè)試一下。我們用這段代碼測(cè)試,分別設(shè)置 3個(gè)協(xié)程調(diào)度器

GlobalScope.launch(Dispatchers.Main){
  Log.d("AA", "協(xié)程測(cè)試 開(kāi)始執(zhí)行,線(xiàn)程:${Thread.currentThread().name}")

  var token = GlobalScope.async(Dispatchers.Unconfined) {
    return@async getToken()
  }.await()

  var response = GlobalScope.async(Dispatchers.Unconfined) {
    return@async getResponse(token)
  }.await()

  setText(response)
}

Log.d("AA", "主線(xiàn)程協(xié)程后面代碼執(zhí)行,線(xiàn)程:${Thread.currentThread().name}")
  • Dispatchers.Main


    看來(lái) Dispatchers.Main 這個(gè)調(diào)度器在協(xié)程掛起后會(huì)一直跑在主線(xiàn)程上,但是有一點(diǎn)注意啊,主線(xiàn)程中寫(xiě)在程后面的代碼先執(zhí)行了,這就有點(diǎn)坑了,要注意啊,Dispatchers.Main 是以給主線(xiàn)程 handle 添加任務(wù)的方式現(xiàn)實(shí)在主線(xiàn)程上的運(yùn)行的

  • Dispatchers.Unconfined


    Dispatchers.Unconfined 在首次掛起之后再恢復(fù)運(yùn)行,所在線(xiàn)程已經(jīng)不是首次運(yùn)行時(shí)的主線(xiàn)程了,而是默認(rèn)線(xiàn)程池中的線(xiàn)程,這里要特別注意啊,看來(lái)協(xié)程的喚醒不是那么簡(jiǎn)單的,協(xié)程內(nèi)部做了很多工作

  • Dispatchers.IO


    看來(lái) Dispatchers.IO 這個(gè)調(diào)度器在協(xié)程掛起后,也是切到默認(rèn)線(xiàn)程池去了,不過(guò)最后又切回最開(kāi)始的 IO 線(xiàn)程了

注意協(xié)程內(nèi)部,若是在前面有代碼切換了線(xiàn)程,后面的代碼若是沒(méi)有指定線(xiàn)程,那么就是運(yùn)行在這個(gè)切換到的線(xiàn)程上的,所以大家看上面的測(cè)試結(jié)果,setText 執(zhí)行的線(xiàn)程都和上一個(gè)方法一樣

我們最好給異步任務(wù)在外面套一個(gè)協(xié)程,這樣我們可以?huà)鞉炱鹫麄€(gè)異步任務(wù),然后給每段代碼指定運(yùn)行線(xiàn)程調(diào)度器,這樣省的因?yàn)閰f(xié)程內(nèi)部掛起恢復(fù)變更線(xiàn)程而帶來(lái)的問(wèn)題

Kotlin Coroutines封裝異步回調(diào)、協(xié)程間關(guān)系及協(xié)程的取消 一文中,作者分析了源碼,非 Dispatchers.Main 調(diào)度器的協(xié)程,會(huì)在協(xié)程掛起后把協(xié)程當(dāng)做一個(gè)任務(wù) DelayedResumeTask 放到默認(rèn)線(xiàn)程池 DefaultExecutor 隊(duì)列的最后,在延遲的時(shí)間到達(dá)才會(huì)執(zhí)行恢復(fù)協(xié)程任務(wù)。雖然多個(gè)協(xié)程之間可能不是在同一個(gè)線(xiàn)程上運(yùn)行的,但是協(xié)程內(nèi)部的機(jī)制可以保證我們書(shū)寫(xiě)的協(xié)程是按照我們指定的順序或者邏輯自行

看個(gè)例子:

    suspend fun getToken(): String {
        Log.d("AA", "getToken start,線(xiàn)程:${Thread.currentThread().name}")
        delay(100)
        Log.d("AA", "getToken end,線(xiàn)程:${Thread.currentThread().name}")
        return "ask"
    }

    suspend fun getResponse(token: String): String {
        Log.d("AA", "getResponse start,線(xiàn)程:${Thread.currentThread().name}")
        delay(200)
        Log.d("AA", "getResponse end,線(xiàn)程:${Thread.currentThread().name}")
        return "response"
    }

    fun setText(response: String) {
        Log.d("AA", "setText 執(zhí)行,線(xiàn)程:${Thread.currentThread().name}")
    }

    // 實(shí)際運(yùn)行
    GlobalScope.launch(Dispatchers.IO) {
        Log.d("AA", "協(xié)程測(cè)試 開(kāi)始執(zhí)行,線(xiàn)程:${Thread.currentThread().name}")
        var token = GlobalScope.async(Dispatchers.IO) {
            return@async getToken()
        }.await()

        var response = GlobalScope.async(Dispatchers.IO) {
            return@async getResponse(token)
        }.await()

        setText(response)
    }

5. relay、yield 區(qū)別

relay 和 yield 方法是協(xié)程內(nèi)部的操作,可以?huà)炱饏f(xié)程,區(qū)別是 relay 是掛起協(xié)程并經(jīng)過(guò)執(zhí)行時(shí)間恢復(fù)協(xié)程,當(dāng)線(xiàn)程空閑時(shí)就會(huì)運(yùn)行協(xié)程;yield 是掛起協(xié)程,讓協(xié)程放棄本次 cpu 執(zhí)行機(jī)會(huì)讓給別的協(xié)程,當(dāng)線(xiàn)程空閑時(shí)再次運(yùn)行協(xié)程。我們只要使用 kotlin 提供的協(xié)程上下文類(lèi)型,線(xiàn)程池是有多個(gè)線(xiàn)程的,再次執(zhí)行的機(jī)會(huì)很快就會(huì)有的。

除了 main 類(lèi)型,協(xié)程在掛起后都會(huì)封裝成任務(wù)放到協(xié)程默認(rèn)線(xiàn)程池的任務(wù)隊(duì)列里去,有延遲時(shí)間的在時(shí)間過(guò)后會(huì)放到隊(duì)列里去,沒(méi)有延遲時(shí)間的直接放到隊(duì)列里去

6. 協(xié)程的取消

我們?cè)趧?chuàng)建協(xié)程過(guò)后可以接受一個(gè) Job 類(lèi)型的返回值,我們操作 job 可以取消協(xié)程任務(wù),job.cancel 就可以了

        // 協(xié)程任務(wù)
        job = GlobalScope.launch(Dispatchers.IO) {
            Log.d("AA", "協(xié)程測(cè)試 開(kāi)始執(zhí)行,線(xiàn)程:${Thread.currentThread().name}")
            var token = GlobalScope.async(Dispatchers.IO) {
                return@async getToken()
            }.await()

            var response = GlobalScope.async(Dispatchers.IO) {
                return@async getResponse(token)
            }.await()

            setText(response)
        }
      
        // 取消協(xié)程
        job?.cancel()
        Log.d("AA", "btn_right 結(jié)束協(xié)程")

協(xié)程的取消有些特質(zhì),因?yàn)閰f(xié)程內(nèi)部可以在創(chuàng)建協(xié)程的,這樣的協(xié)程組織關(guān)系可以稱(chēng)為父協(xié)程,子協(xié)程:

  • 父協(xié)程手動(dòng)調(diào)用 cancel() 或者異常結(jié)束,會(huì)立即取消它的所有子協(xié)程
  • 父協(xié)程必須等待所有子協(xié)程完成(處于完成或者取消狀態(tài))才能完成
  • 子協(xié)程拋出未捕獲的異常時(shí),默認(rèn)情況下會(huì)取消其父協(xié)程

現(xiàn)在問(wèn)題來(lái)了,在 Thread 中我們想關(guān)閉線(xiàn)程有時(shí)候也不是掉個(gè)方法就行的,需要我們自行在線(xiàn)程中判斷縣城是不是已經(jīng)結(jié)束了。在協(xié)程中一樣,cancel 方法只是修改了協(xié)程的狀態(tài),在協(xié)程自身的方法比如 realy,yield 等中會(huì)判斷協(xié)程的狀態(tài)從而結(jié)束協(xié)程,但是若是在協(xié)程我們沒(méi)有用這幾個(gè)方法怎么辦,比如都是邏輯代碼,這時(shí)就要我們自己手動(dòng)判斷了,使用 job.isActive ,isActive 是個(gè)標(biāo)記,用來(lái)檢查協(xié)程狀態(tài)


其他內(nèi)容

我也是初次學(xué)習(xí)使用協(xié)程,這里放一些暫時(shí)沒(méi)高徹底的內(nèi)容

  1. Mutex 協(xié)程互斥鎖

線(xiàn)程中鎖都是阻塞式,在沒(méi)有獲取鎖時(shí)無(wú)法執(zhí)行其他邏輯,而協(xié)程可以通過(guò)掛起函數(shù)解決這個(gè),沒(méi)有獲取鎖就掛起協(xié)程,獲取后再恢復(fù)協(xié)程,協(xié)程掛起時(shí)線(xiàn)程并沒(méi)有阻塞可以執(zhí)行其他邏輯。這種互斥鎖就是 Mutex,它與 synchronized 關(guān)鍵字有些類(lèi)似,還提供了 withLock 擴(kuò)展函數(shù),替代常用的 mutex.lock; try {...} finally { mutex.unlock() }

更多的使用經(jīng)驗(yàn)就要大家自己取找找了

fun main(args: Array<String>) = runBlocking<Unit> {
    val mutex = Mutex()
    var counter = 0
    repeat(10000) {
        GlobalScope.launch {
            mutex.withLock {
                counter ++
            }
        }
    }
    println("The final count is $counter")
}

協(xié)程應(yīng)用

  1. 協(xié)程請(qǐng)求網(wǎng)絡(luò)數(shù)據(jù)

我們用帶返回值的協(xié)程 GlobalScope.async 在 IO 線(xiàn)程中去執(zhí)行網(wǎng)絡(luò)請(qǐng)求,然后通過(guò) await 返回請(qǐng)求結(jié)果,用launch 在主線(xiàn)程中更新UI就行了,注意外面用 runBlocking 包裹

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        coroutine.setOnClickListener { click() }
    }

    private fun click() = runBlocking {
        GlobalScope.launch(Dispatchers.Main) {
            coroutine.text = GlobalScope.async(Dispatchers.IO) {
                // 比如進(jìn)行了網(wǎng)絡(luò)請(qǐng)求
                // 放回了請(qǐng)求后的結(jié)構(gòu)
                return@async "main"
            }.await()
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容