庖丁解牛,一文搞懂Kotlin協(xié)程的常用方法

本篇文章舉例協(xié)程的各種方法的使用,并簡單闡述各個方法的一些注意事項。
協(xié)程作用域的創(chuàng)建
1.通過工廠函數(shù)創(chuàng)建自定義上下文的作用域
// 舉個例子:
// 協(xié)程異常處理器(也是一個上下文元素)
private val exceptionHandler =
        CoroutineExceptionHandler { _, throwable ->
            onError(throwable)
        }
val myScope = CoroutineScope(exceptionHandler + SupervisorJob() + Dispatchers.IO)

// 源碼
public fun CoroutineScope(
    context: CoroutineContext
): CoroutineScope = ContextScope(
    if (context[Job] != null) context
    else context + Job()
)
    
internal class ContextScope(
    context: CoroutineContext
) : CoroutineScope {
    override val coroutineContext: CoroutineContext = context
    
    override fun toString(): String =
        "CoroutineScope(coroutineContext=$coroutineContext)"
}

2.通過工廠函數(shù)MainScope()創(chuàng)建主線程調(diào)度器的作用域
// 舉個例子:
val mainScope = MainScope()

// 源碼
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
3.官方架構(gòu)提供的一些作用域viewModelScope和lifecycleScope
//  Android JetPack ViewModel里的viewModelScope,ViewModeld的擴展變量
// 來自androidx.lifecycle:lifecycle-viewmodel-ktx:2.3.1
public val ViewModel.viewModelScope: CoroutineScope
    get() {
        val scope: CoroutineScope? = this.getTag(JOB_KEY)
        
        if (scope != null) {
            return scope
        }
        
        return setTagIfAbsent(
            JOB_KEY,
            CloseableCoroutineScope(
                SupervisorJob() +
                Dispatchers.Main.immediate
            )
        )
}

internal class CloseableCoroutineScope(
    context: CoroutineContext
) : Closeable, CoroutineScope {
    override val coroutineContext: CoroutineContext = context
    
    override fun close() {
        coroutineContext.cancel()
    }
}

//  Android JetPack lifecycle里的lifecycleScope,LifecycleOwner(Activity實現(xiàn)了此接口,可以直接用)的擴展變量
// 來自androidx.lifecycle:lifecycle-runtime-ktx:2.2.0

val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
    get() = lifecycle.coroutineScope

val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
            if (existing != null) {
                return existing
            }
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                SupervisorJob() + Dispatchers.Main.immediate
            )
            if (mInternalScopeRef.compareAndSet(null, newScope)) {
                newScope.register()
                return newScope
            }
        }
    }
協(xié)程的創(chuàng)建
方式一:launch

創(chuàng)建一個協(xié)程,并返回協(xié)程的引用job,最常用的方式

val job = MainScope().launch (Dispatchers.IO){
                delay(1000)
                println("run in coroutine")
            }

job.cancel()  // 取消協(xié)程
job.join()  // 等待協(xié)程執(zhí)行完畢
job.cancelAndJoin() // 取消并等待協(xié)程執(zhí)行完畢(取消操作并不等于會立馬響應(yīng),這里其實是分別執(zhí)行了cancel和join)
job.cancelChildren() // 取消子協(xié)程
方式二:async

創(chuàng)建一個協(xié)程,并返回協(xié)程的引用Deferred(也是個job),通過Deferred的await拿到返回結(jié)果,此時協(xié)程會掛起

MainScope().launch (Dispatchers.Main){
              // 啟動協(xié)程
                val deferred = async {
                    "async result"
                }
                // 掛起等待協(xié)程結(jié)果
                val result = deferred.await()
                // 恢復(fù)掛起
                println("result = $result")
   }
作用域構(gòu)建器
runBlocking

runBlocking {} 會在開啟一個新的協(xié)程,并且阻塞主線程,直到操作完成。這個函數(shù)不應(yīng)該在協(xié)程里面使用,它是用來橋接需要阻塞的掛起函數(shù),主要用于 main function 和 junit 測試。注意他只是一個普通函數(shù)。

coroutineScope
        runBlocking {
            val result = coroutineScope {
                println("run thread = ${Thread.currentThread().name}")
                launch {
                    delay(1000)
                    println("run launch success1")
                }
                launch {
                    delay(2000)
                    println("run launch success2")
                }
                "coroutineScope result"
            }
            println("result = $result")
        }
// 運行結(jié)果(看得出來使用的是父協(xié)程的線程調(diào)度器)
run thread = main   
run launch success1
run launch success2
result = coroutineScope result

// 注意這是一個掛起函數(shù),它會掛起執(zhí)行一直到里面的子協(xié)程執(zhí)行完畢、代碼執(zhí)行完畢然后返回結(jié)果,這是他與runBlocking不同的地方,runBlocking只是個普通函數(shù),里面的協(xié)程體會阻塞線程。
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn { uCont ->
        val coroutine = ScopeCoroutine(uCont.context, uCont)
        coroutine.startUndispatchedOrReturn(coroutine, block)
    }
}

coroutineScope經(jīng)常用來把一個長耗時的任務(wù)拆分成多個子任務(wù),使這些子任務(wù)并行執(zhí)行,這就是所謂的結(jié)構(gòu)化并發(fā):

suspend fun showSomeData() = coroutineScope {
    val data1 = async {         //子任務(wù)1
        delay(2000)
        100
    }
    val data2 = async {         //子任務(wù)2
        delay(3000)
        20
    }
    
    withContext(Dispatchers.Default) {      //合并結(jié)果并返回
        delay(3000)
        val random = Random(10)
        data1.await() + data2.await() + random.nextInt(100)
    }
}
supervisorScope

supervisorScope功能與coroutinScope類似,不同之處在于,它可以讓里面的子協(xié)程發(fā)生異常而崩潰的時候不影響自身和其他的子協(xié)程的執(zhí)行。

        runBlocking {
            val result = supervisorScope {
                println("run thread = ${Thread.currentThread().name}")
                launch {
                    delay(1000)
                  // 主動拋異常
                    throw RuntimeException()
                    println("run launch success1")
                }
                launch {
                    delay(2000)
                    println("run launch success2")
                }
                "coroutineScope result"
            }
            println("result = $result")
        }
// 運行結(jié)果
run thread = main
run launch success2
result = coroutineScope result

說到supervisorScope順便提一嘴SupervisorJob,創(chuàng)建作用域或者啟動協(xié)程的時候如果上下文帶上SupervisorJob()也可以達到自身協(xié)程處理異常而不影響父協(xié)程和兄弟協(xié)程的運行。

常用函數(shù)
withTimeout
suspend fun timeOut(){
    try {
        withTimeout(1000){
            println("在此做一些耗時操作,超過設(shè)定的時間就拋異常")
            delay(2500)
        }
    }catch (e:TimeoutCancellationException){
        println(e.message)
        // 處理超時異常
    }
}
withContext
// 掛起函數(shù),適合一些耗時異步操作
suspend fun timeOut(){
   val result = withContext(Dispatchers.IO){
        delay(2500)
        "result"
    }
println(result)
}
flow

private fun flowEmit() = flow<Int> {
        // 這段代碼塊實際運行在 flowEmit().collect所處的協(xié)程代碼塊里
        println("...$coroutineContext")
        for (k in 1 .. 3){
            delay(1000)
            emit(k)
        }
    }
    @Test
    fun flowTest() = runBlocking {
        println("...${this.coroutineContext}")
        //  flowEmit()只是創(chuàng)建了個對象
        // 調(diào)用collect的時候才開始執(zhí)行上面定義的代碼塊
        flowEmit().collect(object : FlowCollector<Int> {
            // 這個代碼其實也是運行在runBlocking協(xié)程域里
            override suspend fun emit(value: Int) {
                println("...$value")
            }
        })
        //
        println("...end")
    }

輸出
...[CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@2bdd8394, BlockingEventLoop@5f9edf14]
...[CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@2bdd8394, BlockingEventLoop@5f9edf14]
...1
...2
...3
...end
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容