kotlin協(xié)程

協(xié)程基礎(chǔ)

  • 輕量級線程。在一個(gè)線程中可以啟動多個(gè)協(xié)程。
  • 在協(xié)程中使用同步方式寫出異步代碼(協(xié)程掛起時(shí)不會阻塞線程),解決回調(diào)地獄。
image.png
GlobalScope.launch(Dispatchers.Main) {//開始協(xié)程:主線程
   val result = userApi.getUserSuspend("suming")//網(wǎng)絡(luò)請求(IO 線程)
   tv_name.text = result?.name //更新 UI(主線程)
}

在主線程中創(chuàng)建協(xié)程A中執(zhí)行整個(gè)業(yè)務(wù)流程,如果遇到異步調(diào)用任務(wù)則協(xié)程A被掛起,切換到IO線程中創(chuàng)建子協(xié)程B,獲取結(jié)果后再恢復(fù)到主線程的協(xié)程A上,然后繼續(xù)執(zhí)行剩下的流程。

xxxScope.launch()、runBlocking:T與async

  • runBlocking:T:頂層函數(shù),創(chuàng)建一個(gè)新的協(xié)程同時(shí)阻塞當(dāng)前線程,直到其內(nèi)部所有邏輯以及子協(xié)程所有邏輯全部執(zhí)行完成,返回值是泛型T,一般在項(xiàng)目中不會使用,主要是為main函數(shù)和測試設(shè)計(jì)的。
  • launch:創(chuàng)建一個(gè)新的協(xié)程,不會阻塞當(dāng)前線程,必須在協(xié)程作用域中才可以調(diào)用。它返回的是一個(gè)該協(xié)程任務(wù)的引用,即Job對象。這是最常用的用于啟動協(xié)程的方式。
  • async:創(chuàng)建一個(gè)新的協(xié)程,不會阻塞當(dāng)前線程,必須在協(xié)程作用域中才可以調(diào)用。并返回Deffer對象,可通過調(diào)用Deffer.await()方法等待該子協(xié)程執(zhí)行完成并獲取結(jié)果。常用于并發(fā)執(zhí)行-同步等待和獲取返回值的情況。
# Builders.common.kt
    
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

Job與Deffered

  • Job與Deffered的api設(shè)計(jì)類似于Thread
  • Job實(shí)例作為協(xié)程的唯一標(biāo)識,用于處理協(xié)程,并且負(fù)責(zé)管理協(xié)程的生命周期
public interface Job : CoroutineContext.Element {
    //活躍的,是否仍在執(zhí)行
    public val isActive: Boolean


    //啟動協(xié)程,如果啟動了協(xié)程,則為true;如果協(xié)程已經(jīng)啟動或完成,則為false
    public fun start(): Boolean
    
    //取消Job,可通過傳入Exception說明具體原因
    public fun cancel(cause: CancellationException? = null)
    
    //掛起協(xié)程直到此Job完成
    public suspend fun join()
    
    //取消任務(wù)并等待任務(wù)完成,結(jié)合了[cancel]和[join]的調(diào)用
    public suspend fun Job.cancelAndJoin()


    //給Job設(shè)置一個(gè)完成通知,當(dāng)Job執(zhí)行完成的時(shí)候會同步執(zhí)行這個(gè)函數(shù)
    public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
}

Job 還可以有層級關(guān)系,一個(gè)Job可以包含多個(gè)子Job,當(dāng)父Job被取消后,所有的子Job也會被自動取消;當(dāng)子Job被取消或者出現(xiàn)異常后父Job也會被取消。具有多個(gè)子 Job 的父Job 會等待所有子Job完成(或者取消)后,自己才會執(zhí)行完成。

public interface Deferred<out T> : Job {
    //等待協(xié)程執(zhí)行完成并獲取結(jié)果
    public suspend fun await(): T
}

協(xié)程作用域

  • runBlocking:頂層函數(shù),它的第二個(gè)參數(shù)為接收者是CoroutineScope的函數(shù)字面量,可啟動協(xié)程。但是它會阻塞當(dāng)前線程,主要用于測試。
  • GlobalScope:全局協(xié)程作用域,通過GlobalScope創(chuàng)建的協(xié)程不會有父協(xié)程,可以把它稱為根協(xié)程。它啟動的協(xié)程的生命周期只受整個(gè)應(yīng)用程序的生命周期的限制,且不能取消,在運(yùn)行時(shí)會消耗一些內(nèi)存資源,這可能會導(dǎo)致內(nèi)存泄露,所以仍不適用于業(yè)務(wù)開發(fā)。
  • coroutineScope:創(chuàng)建一個(gè)獨(dú)立的協(xié)程作用域,直到所有啟動的協(xié)程都完成后才結(jié)束自身。它是一個(gè)掛起函數(shù),需要運(yùn)行在協(xié)程內(nèi)或掛起函數(shù)內(nèi)。當(dāng)這個(gè)作用域中的任何一個(gè)子協(xié)程失敗時(shí),這個(gè)作用域失敗,所有其他的子程序都被取消。為并行分解工作而設(shè)計(jì)的。
  • supervisorScope:與coroutineScope類似,不同的是子協(xié)程的異常不會影響父協(xié)程,也不會影響其他子協(xié)程。(作用域本身的失敗(在block或取消中拋出異常)會導(dǎo)致作用域及其所有子協(xié)程失敗,但不會取消父協(xié)程。)
  • MainScope:為UI組件創(chuàng)建主作用域。一個(gè)頂層函數(shù),上下文是SupervisorJob() + Dispatchers.Main,說明它是一個(gè)在主線程執(zhí)行的協(xié)程作用域,通過cancel對協(xié)程進(jìn)行取消。推薦使用。
  • lifecycleScope:Lifecycle Ktx庫提供的具有生命周期感知的協(xié)程作用域,與Lifecycle綁定生命周期,生命周期被銷毀時(shí),此作用域?qū)⒈蝗∠?。會與當(dāng)前的UI組件綁定生命周期,界面銷毀時(shí)該協(xié)程作用域?qū)⒈蝗∠?,不會造成協(xié)程泄漏,推薦使用。
  • viewModelScope:與lifecycleScope類似,與ViewModel綁定生命周期,當(dāng)ViewModel被清除時(shí),這個(gè)作用域?qū)⒈蝗∠?。推薦使用。
fun launchTest2() {
    print("start")
    //開啟一個(gè)IO模式的協(xié)程,通過協(xié)程上下文創(chuàng)建一個(gè)CoroutineScope對象,需要一個(gè)類型為CoroutineContext的參數(shù)
    val job = CoroutineScope(Dispatchers.IO).launch {
        delay(1000)//1秒無阻塞延遲(默認(rèn)單位為毫秒)
        print("CoroutineScope.launch")
    }
    print("end")//主線程繼續(xù),而協(xié)程被延遲
}
private suspend fun testSupervisorScope() = supervisorScope {    
    launch { throw IllegalArgumentException("隨便拋一個(gè)異常") }    
    launch {        
        delay(1000)        
        Log.e("crx", "另一個(gè)協(xié)程")
    }
}


private suspend fun testCoroutineScope() = coroutineScope {    
    launch { throw IllegalArgumentException("隨便拋一個(gè)異常") }    
    launch {        
        delay(1000)        
        Log.e("crx", "另一個(gè)協(xié)程")    
    }
}

//執(zhí)行testSupervisorScope方法打印的結(jié)果
E/crx: 異常信息: 隨便拋一個(gè)異常
E/crx: 另一個(gè)協(xié)程
E/crx: 在執(zhí)行完了Scope之后


//執(zhí)行testCoroutineScope方法打印的結(jié)果
E/crx: 異常信息: 隨便拋一個(gè)異常
class MainActivity : AppCompatActivity() {


    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)


        btn_data.setOnClickListener {
            lifecycleScope.launch {//使用lifecycleScope創(chuàng)建協(xié)程
                //協(xié)程執(zhí)行體
            }
        }
    }
}


class MainViewModel : ViewModel() {
    fun getData() {
        viewModelScope.launch {//使用viewModelScope創(chuàng)建協(xié)程
            //執(zhí)行協(xié)程
        }
    }
}

協(xié)程異常

當(dāng)協(xié)程作用域中的一個(gè)協(xié)程發(fā)生異常時(shí),此時(shí)的異常流程如下所示:

  • 發(fā)生異常的協(xié)程被cancel
  • 異常傳遞到它的父協(xié)程
  • 父協(xié)程cancel(取消其所有子協(xié)程)
  • 將異常在協(xié)程樹上進(jìn)一步向上傳播

被封裝到deferred對象中的異常才會在調(diào)用await時(shí)拋出。

    private val job: Job = Job()
    private val scope = CoroutineScope(Dispatchers.Default + job)

    private fun doWork(): Deferred<String> = scope.async { throw NullPointerException("自定義空指針異常") }


    private fun loadData() = scope.launch {
        try {
            doWork().await()
        } catch (e: Exception) {
            Log.d("try catch捕獲的異常:", e.toString())
        }
    }

Job.cancel 取消任務(wù)時(shí)會拋出CancellationException 給指定協(xié)程,但是不會結(jié)構(gòu)化并發(fā)到父協(xié)程。

CoroutineExceptionHandler

CoroutineExceptionHandler只能處理當(dāng)前域內(nèi)開啟的子協(xié)程或者當(dāng)前協(xié)程拋出的異常。

supervisorScope 和SupervisorJob

supervisorScope 和 SupervisorJob的原理是:將異常不傳播給自己的父協(xié)程。

調(diào)度器Dispatcher

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {


    //將可運(yùn)行塊的執(zhí)行分派到給定上下文中的另一個(gè)線程上。這個(gè)方法應(yīng)該保證給定的[block]最終會被調(diào)用。
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)


    //返回一個(gè)continuation,它封裝了提供的[continuation],攔截了所有的恢復(fù)。
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>


    //CoroutineDispatcher是一個(gè)協(xié)程上下文元素,而'+'是一個(gè)用于協(xié)程上下文的集合和操作符。
    public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
}

使用方式一:

fun dispatchersTest() {
    //創(chuàng)建一個(gè)在主線程執(zhí)行的協(xié)程作用域
    val mainScope = MainScope()
    mainScope.launch {
        launch(Dispatchers.Main) {//在協(xié)程上下參數(shù)中指定調(diào)度器
            print("主線程調(diào)度器")
        }
        launch(Dispatchers.Default) {
            print("默認(rèn)調(diào)度器")
        }
        launch(Dispatchers.Unconfined) {
            print("任意調(diào)度器")
        }
        launch(Dispatchers.IO) {
            print("IO調(diào)度器")
        }
    }
}

使用方式二:

//用給定的協(xié)程上下文調(diào)用指定的掛起塊,掛起直到它完成,并返回結(jié)果。
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T


GlobalScope.launch(Dispatchers.Main) {//開始協(xié)程:主線程
    val result: User = withContext(Dispatchers.IO) {//網(wǎng)絡(luò)請求(IO 線程)
        userApi.getUserSuspend("FollowExcellence")
    }
    tv_title.text = result.name //更新 UI(主線程)
}

啟動模式

CoroutineStart是一個(gè)枚舉類,為協(xié)程構(gòu)建器定義啟動選項(xiàng)。在協(xié)程構(gòu)建的start參數(shù)中使用。

image.png

協(xié)程上下文

協(xié)程使用以下幾種元素集定義協(xié)程的行為,它們均繼承自CoroutineContext:

  • Job:協(xié)程的句柄,對協(xié)程的控制和管理生命周期。
  • CoroutineName:協(xié)程的名稱,可用于調(diào)試。
  • CoroutineDispatcher:調(diào)度器,確定協(xié)程在指定的線程來執(zhí)行。
  • CoroutineExceptionHandler:協(xié)程異常處理器,處理未捕獲的異常。

suspend本質(zhì)

//Continuation接口表示掛起點(diǎn)之后的延續(xù),該掛起點(diǎn)返回類型為“T”的值。
public interface Continuation<in T> {
    //對應(yīng)這個(gè)Continuation的協(xié)程上下文
    public val context: CoroutineContext


    //恢復(fù)相應(yīng)協(xié)程的執(zhí)行,傳遞一個(gè)成功或失敗的結(jié)果作為最后一個(gè)掛起點(diǎn)的返回值。
    public fun resumeWith(result: Result<T>)
}


//將[value]作為最后一個(gè)掛起點(diǎn)的返回值,恢復(fù)相應(yīng)協(xié)程的執(zhí)行。
fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))


//恢復(fù)相應(yīng)協(xié)程的執(zhí)行,以便在最后一個(gè)掛起點(diǎn)之后重新拋出[異常]。
fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))

Kotlin 使用堆棧幀來管理要運(yùn)行哪個(gè)函數(shù)以及所有局部變量。掛起(暫停)協(xié)程時(shí),會復(fù)制并保存當(dāng)前的堆棧幀以供稍后使用,將信息保存到Continuation對象中。恢復(fù)協(xié)程時(shí),會將堆棧幀從其保存位置復(fù)制回來,對應(yīng)的Continuation通過調(diào)用resumeWith函數(shù)才會恢復(fù)協(xié)程的執(zhí)行,然后函數(shù)再次開始運(yùn)行。同時(shí)返回Result<T>類型的成功或者異常的結(jié)果。

@GET("users/{login}")
suspend fun getUserSuspend(@Path("login") login: String): User

反編譯后
public abstract getUserSuspend(Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public interface Continuation<in T> {
    //協(xié)程上下文
    public val context: CoroutineContext


    //恢復(fù)相應(yīng)協(xié)程的執(zhí)行,傳遞一個(gè)成功或失敗的[result]作為最后一個(gè)掛起點(diǎn)的返回值。
    public fun resumeWith(result: Result<T>)
}

協(xié)程原理

https://mp.weixin.qq.com/s/nXfweTaOCpm6Bj34rW-wLA 協(xié)程的本質(zhì)和原理:基于CPS( Continuation-Passing-Style Transformation)和狀態(tài)機(jī)

  • 掛起函數(shù),在執(zhí)行的時(shí)候并不一定都會掛起,掛起函數(shù)里包含其他掛起函數(shù)的時(shí)候,它才會真正被掛起
  • 掛起函數(shù)只能在其他掛起函數(shù)中被調(diào)用(or 協(xié)程作用域)
    Continuation 則代表了,程序繼續(xù)運(yùn)行下去需要執(zhí)行的代碼,接下來要執(zhí)行的代碼 或者 剩下的代碼。
image.png
image.png
internal abstract class BaseContinuationImpl(...) {
// 實(shí)現(xiàn) Continuation 的 resumeWith,并且是 final 的,不可被重寫
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param)
...
}
// 由編譯生成的協(xié)程相關(guān)類來實(shí)現(xiàn),例如 postItem$1
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
suspend fun testCoroutine() {
    log("start")
    val user = getUserInfo()
    log(user)
    val friendList = getFriendList(user)
    log(friendList)
    val feedList = getFeedList(friendList)
    log(feedList)
}
fun testCoroutine(completion: Continuation<Any?>): Any? {

    //completion表示運(yùn)行完testCoroutine掛起函數(shù)后需要運(yùn)行的代碼
    class TestContinuation(completion: Continuation<Any?>?) : ContinuationImpl(completion) {
        // 表示協(xié)程狀態(tài)機(jī)當(dāng)前的狀態(tài)
        var label: Int = 0
        // 協(xié)程返回結(jié)果
        var result: Any? = null


        // 用于保存之前協(xié)程的計(jì)算結(jié)果
        var mUser: Any? = null
        var mFriendList: Any? = null


        // invokeSuspend 是協(xié)程的關(guān)鍵(Continuation#resumeWith中會調(diào)用invokeSuspend)
        // 它最終會調(diào)用 testCoroutine(this) 開啟協(xié)程狀態(tài)機(jī)
        // 狀態(tài)機(jī)相關(guān)代碼就是后面的 when 語句
        // 協(xié)程的本質(zhì),可以說就是 CPS + 狀態(tài)機(jī)
        override fun invokeSuspend(_result: Result<Any?>): Any? {
            result = _result
            label = label or Int.Companion.MIN_VALUE
            return testCoroutine(this)
        }
    }

    //說明在運(yùn)行期間只會生成一個(gè)Continuation對象
    val continuation = if (completion is TestContinuation) {
        completion
    } else {
        //                作為參數(shù)
        //                   ↓
        //1.初次運(yùn)行
        //2.用一個(gè)新的Continuation包裝了舊的Continuation
        TestContinuation(completion)
    }


        // 三個(gè)變量,對應(yīng)原函數(shù)的三個(gè)變量
    lateinit var user: String
    lateinit var friendList: String
    lateinit var feedList: String


    // result 接收協(xié)程的運(yùn)行結(jié)果
    var result = continuation.result


    // suspendReturn 接收掛起函數(shù)的返回值
    var suspendReturn: Any? = null


    // CoroutineSingletons 是個(gè)枚舉類
    // COROUTINE_SUSPENDED 代表當(dāng)前函數(shù)被掛起了
    val sFlag = CoroutineSingletons.COROUTINE_SUSPENDED

    when (continuation.label) {
    0 -> {
        // 檢測異常
        throwOnFailure(result)


        log("start")
        // 將 label 置為 1,準(zhǔn)備進(jìn)入下一次狀態(tài)
        continuation.label = 1


        // 執(zhí)行 getUserInfo
        suspendReturn = getUserInfo(continuation)


        // 判斷是否掛起
        if (suspendReturn == sFlag) {
            return suspendReturn
        } else {
            result = suspendReturn
            //go to next state
        }
    }


    1 -> {
        throwOnFailure(result)


        // 獲取 user 值
        user = result as String
        log(user)
        // 將協(xié)程結(jié)果存到 continuation 里
        continuation.mUser = user
        // 準(zhǔn)備進(jìn)入下一個(gè)狀態(tài)
        continuation.label = 2


        // 執(zhí)行 getFriendList
        suspendReturn = getFriendList(user, continuation)


        // 判斷是否掛起
        if (suspendReturn == sFlag) {
            return suspendReturn
        } else {
            result = suspendReturn
            //go to next state
        }
    }


    2 -> {
        throwOnFailure(result)


        user = continuation.mUser as String


        // 獲取 friendList 的值
        friendList = result as String
        log(friendList)


        // 將協(xié)程結(jié)果存到 continuation 里
        continuation.mUser = user
        continuation.mFriendList = friendList


        // 準(zhǔn)備進(jìn)入下一個(gè)狀態(tài)
        continuation.label = 3


        // 執(zhí)行 getFeedList
        suspendReturn = getFeedList(friendList, continuation)


        // 判斷是否掛起
        if (suspendReturn == sFlag) {
            return suspendReturn
        } else {
            result = suspendReturn
            //go to next state
        }
    }


    3 -> {
        throwOnFailure(result)


        user = continuation.mUser as String
        friendList = continuation.mFriendList as String
        feedList = continuation.result as String
        log(feedList)
        loop = false
    }
}

}

協(xié)程線程切換原理

https://mp.weixin.qq.com/s/iitYHxn6vPpE_wMsiPoplg

協(xié)程的并發(fā)處理

https://mp.weixin.qq.com/s/6paEFQDD-lHYMjcWmwZdhw

非阻塞式鎖Mutex:拿不到鎖時(shí)協(xié)程就掛起

線程中鎖都是阻塞式,在沒有獲取鎖時(shí)無法執(zhí)行其他邏輯,而協(xié)程可以通過掛起函數(shù)解決這個(gè),沒有獲取鎖就掛起協(xié)程,獲取后再恢復(fù)協(xié)程,協(xié)程掛起時(shí)線程并沒有阻塞可以執(zhí)行其他邏輯。這種互斥鎖就是Mutex,它與synchronized關(guān)鍵字有些類似,還提供了withLock擴(kuò)展函數(shù),替代常用的mutex.lock; try {...} finally { mutex.unlock() }:

image.png

Mutex大致邏輯還是非常清晰的,協(xié)程先獲取鎖,然后執(zhí)行代碼塊,然后釋放鎖,其他協(xié)程如果進(jìn)入,必須先獲取鎖,獲取不到協(xié)程執(zhí)行掛起方法suspend fun lockSuspend(owner: Any?), 加入等待隊(duì)列,掛起協(xié)程。等待其他協(xié)程釋放鎖之后,恢復(fù)協(xié)程。

整體還是建立在CAS基礎(chǔ)上,封裝的一套解決方案。

協(xié)程的異常處理方式

https://juejin.cn/post/6935472332735512606/

如果協(xié)程本身不使用try-catch子句自行處理異常,則不會重新拋出該異常,因此無法通過外部try-catch子句進(jìn)行處理。

異常會在“Job層次結(jié)構(gòu)中傳播”,可以由已設(shè)置的CoroutineExceptionHandler處理。 如果未設(shè)置,則調(diào)用該線程的未捕獲異常處理程序。如下代碼依然會崩潰

fun main() {

    val topLevelScope = CoroutineScope(Job())

    topLevelScope.launch {

        try {

            launch {

                throw RuntimeException("RuntimeException in nested coroutine")

            }

        } catch (exception: Exception) {

            println("Handle $exception")

        }

    }

    Thread.sleep(100)

}

為了使CoroutineExceptionHandler起作用,必須將其設(shè)置在CoroutineScope或頂級協(xié)程中, 給子協(xié)程設(shè)置CoroutineExceptionHandler是沒有效果的。

// ...

val topLevelScope = CoroutineScope(Job() + coroutineExceptionHandler)

// ...

// ...

topLevelScope.launch(coroutineExceptionHandler) {

// ...

在代碼的特定部分處理異常,可使用try-catch。

全局捕獲異常,并且其中一個(gè)任務(wù)異常,其他任務(wù)不執(zhí)行,可使用CoroutineExceptionHandler,節(jié)省資源消耗。

并行任務(wù)間互不干擾,任何一個(gè)任務(wù)失敗,其他任務(wù)照常運(yùn)行,可使用SupervisorScope+async模式。

協(xié)程的結(jié)構(gòu)化并發(fā)

1、父作用域的生命周期持續(xù)到所有子作用域執(zhí)行完;

2、當(dāng)結(jié)束父作用域結(jié)束時(shí),同時(shí)結(jié)束它的各個(gè)子作用域;

3、子作用域未捕獲到的異常將不會被重新拋出,而是一級一級向父作用域傳遞,這種異常傳播將導(dǎo)致父作用域失敗,進(jìn)而導(dǎo)致其子作用域的所有請求被取消。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容