協(xié)程基礎(chǔ)
- 輕量級線程。在一個(gè)線程中可以啟動多個(gè)協(xié)程。
- 在協(xié)程中使用同步方式寫出異步代碼(協(xié)程掛起時(shí)不會阻塞線程),解決回調(diào)地獄。

GlobalScope.launch(Dispatchers.Main) {//開始協(xié)程:主線程
val result = userApi.getUserSuspend("suming")//網(wǎng)絡(luò)請求(IO 線程)
tv_name.text = result?.name //更新 UI(主線程)
}
在主線程中創(chuàng)建協(xié)程A中執(zhí)行整個(gè)業(yè)務(wù)流程,如果遇到異步調(diào)用任務(wù)則協(xié)程A被掛起,切換到IO線程中創(chuàng)建子協(xié)程B,獲取結(jié)果后再恢復(fù)到主線程的協(xié)程A上,然后繼續(xù)執(zhí)行剩下的流程。
xxxScope.launch()、runBlocking:T與async
- runBlocking:T:頂層函數(shù),創(chuàng)建一個(gè)新的協(xié)程同時(shí)阻塞當(dāng)前線程,直到其內(nèi)部所有邏輯以及子協(xié)程所有邏輯全部執(zhí)行完成,返回值是泛型T,一般在項(xiàng)目中不會使用,主要是為main函數(shù)和測試設(shè)計(jì)的。
- launch:創(chuàng)建一個(gè)新的協(xié)程,不會阻塞當(dāng)前線程,必須在協(xié)程作用域中才可以調(diào)用。它返回的是一個(gè)該協(xié)程任務(wù)的引用,即Job對象。這是最常用的用于啟動協(xié)程的方式。
- async:創(chuàng)建一個(gè)新的協(xié)程,不會阻塞當(dāng)前線程,必須在協(xié)程作用域中才可以調(diào)用。并返回Deffer對象,可通過調(diào)用Deffer.await()方法等待該子協(xié)程執(zhí)行完成并獲取結(jié)果。常用于并發(fā)執(zhí)行-同步等待和獲取返回值的情況。
# Builders.common.kt
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
Job與Deffered
- Job與Deffered的api設(shè)計(jì)類似于Thread
- Job實(shí)例作為協(xié)程的唯一標(biāo)識,用于處理協(xié)程,并且負(fù)責(zé)管理協(xié)程的生命周期
public interface Job : CoroutineContext.Element {
//活躍的,是否仍在執(zhí)行
public val isActive: Boolean
//啟動協(xié)程,如果啟動了協(xié)程,則為true;如果協(xié)程已經(jīng)啟動或完成,則為false
public fun start(): Boolean
//取消Job,可通過傳入Exception說明具體原因
public fun cancel(cause: CancellationException? = null)
//掛起協(xié)程直到此Job完成
public suspend fun join()
//取消任務(wù)并等待任務(wù)完成,結(jié)合了[cancel]和[join]的調(diào)用
public suspend fun Job.cancelAndJoin()
//給Job設(shè)置一個(gè)完成通知,當(dāng)Job執(zhí)行完成的時(shí)候會同步執(zhí)行這個(gè)函數(shù)
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
}
Job 還可以有層級關(guān)系,一個(gè)Job可以包含多個(gè)子Job,當(dāng)父Job被取消后,所有的子Job也會被自動取消;當(dāng)子Job被取消或者出現(xiàn)異常后父Job也會被取消。具有多個(gè)子 Job 的父Job 會等待所有子Job完成(或者取消)后,自己才會執(zhí)行完成。
public interface Deferred<out T> : Job {
//等待協(xié)程執(zhí)行完成并獲取結(jié)果
public suspend fun await(): T
}
協(xié)程作用域
- runBlocking:頂層函數(shù),它的第二個(gè)參數(shù)為接收者是CoroutineScope的函數(shù)字面量,可啟動協(xié)程。但是它會阻塞當(dāng)前線程,主要用于測試。
- GlobalScope:全局協(xié)程作用域,通過GlobalScope創(chuàng)建的協(xié)程不會有父協(xié)程,可以把它稱為根協(xié)程。它啟動的協(xié)程的生命周期只受整個(gè)應(yīng)用程序的生命周期的限制,且不能取消,在運(yùn)行時(shí)會消耗一些內(nèi)存資源,這可能會導(dǎo)致內(nèi)存泄露,所以仍不適用于業(yè)務(wù)開發(fā)。
- coroutineScope:創(chuàng)建一個(gè)獨(dú)立的協(xié)程作用域,直到所有啟動的協(xié)程都完成后才結(jié)束自身。它是一個(gè)掛起函數(shù),需要運(yùn)行在協(xié)程內(nèi)或掛起函數(shù)內(nèi)。當(dāng)這個(gè)作用域中的任何一個(gè)子協(xié)程失敗時(shí),這個(gè)作用域失敗,所有其他的子程序都被取消。為并行分解工作而設(shè)計(jì)的。
- supervisorScope:與coroutineScope類似,不同的是子協(xié)程的異常不會影響父協(xié)程,也不會影響其他子協(xié)程。(作用域本身的失敗(在block或取消中拋出異常)會導(dǎo)致作用域及其所有子協(xié)程失敗,但不會取消父協(xié)程。)
- MainScope:為UI組件創(chuàng)建主作用域。一個(gè)頂層函數(shù),上下文是SupervisorJob() + Dispatchers.Main,說明它是一個(gè)在主線程執(zhí)行的協(xié)程作用域,通過cancel對協(xié)程進(jìn)行取消。推薦使用。
- lifecycleScope:Lifecycle Ktx庫提供的具有生命周期感知的協(xié)程作用域,與Lifecycle綁定生命周期,生命周期被銷毀時(shí),此作用域?qū)⒈蝗∠?。會與當(dāng)前的UI組件綁定生命周期,界面銷毀時(shí)該協(xié)程作用域?qū)⒈蝗∠?,不會造成協(xié)程泄漏,推薦使用。
- viewModelScope:與lifecycleScope類似,與ViewModel綁定生命周期,當(dāng)ViewModel被清除時(shí),這個(gè)作用域?qū)⒈蝗∠?。推薦使用。
fun launchTest2() {
print("start")
//開啟一個(gè)IO模式的協(xié)程,通過協(xié)程上下文創(chuàng)建一個(gè)CoroutineScope對象,需要一個(gè)類型為CoroutineContext的參數(shù)
val job = CoroutineScope(Dispatchers.IO).launch {
delay(1000)//1秒無阻塞延遲(默認(rèn)單位為毫秒)
print("CoroutineScope.launch")
}
print("end")//主線程繼續(xù),而協(xié)程被延遲
}
private suspend fun testSupervisorScope() = supervisorScope {
launch { throw IllegalArgumentException("隨便拋一個(gè)異常") }
launch {
delay(1000)
Log.e("crx", "另一個(gè)協(xié)程")
}
}
private suspend fun testCoroutineScope() = coroutineScope {
launch { throw IllegalArgumentException("隨便拋一個(gè)異常") }
launch {
delay(1000)
Log.e("crx", "另一個(gè)協(xié)程")
}
}
//執(zhí)行testSupervisorScope方法打印的結(jié)果
E/crx: 異常信息: 隨便拋一個(gè)異常
E/crx: 另一個(gè)協(xié)程
E/crx: 在執(zhí)行完了Scope之后
//執(zhí)行testCoroutineScope方法打印的結(jié)果
E/crx: 異常信息: 隨便拋一個(gè)異常
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
btn_data.setOnClickListener {
lifecycleScope.launch {//使用lifecycleScope創(chuàng)建協(xié)程
//協(xié)程執(zhí)行體
}
}
}
}
class MainViewModel : ViewModel() {
fun getData() {
viewModelScope.launch {//使用viewModelScope創(chuàng)建協(xié)程
//執(zhí)行協(xié)程
}
}
}
協(xié)程異常
當(dāng)協(xié)程作用域中的一個(gè)協(xié)程發(fā)生異常時(shí),此時(shí)的異常流程如下所示:
- 發(fā)生異常的協(xié)程被cancel
- 異常傳遞到它的父協(xié)程
- 父協(xié)程cancel(取消其所有子協(xié)程)
- 將異常在協(xié)程樹上進(jìn)一步向上傳播
被封裝到deferred對象中的異常才會在調(diào)用await時(shí)拋出。
private val job: Job = Job()
private val scope = CoroutineScope(Dispatchers.Default + job)
private fun doWork(): Deferred<String> = scope.async { throw NullPointerException("自定義空指針異常") }
private fun loadData() = scope.launch {
try {
doWork().await()
} catch (e: Exception) {
Log.d("try catch捕獲的異常:", e.toString())
}
}
Job.cancel 取消任務(wù)時(shí)會拋出CancellationException 給指定協(xié)程,但是不會結(jié)構(gòu)化并發(fā)到父協(xié)程。
CoroutineExceptionHandler
CoroutineExceptionHandler只能處理當(dāng)前域內(nèi)開啟的子協(xié)程或者當(dāng)前協(xié)程拋出的異常。
supervisorScope 和SupervisorJob
supervisorScope 和 SupervisorJob的原理是:將異常不傳播給自己的父協(xié)程。
調(diào)度器Dispatcher
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
//將可運(yùn)行塊的執(zhí)行分派到給定上下文中的另一個(gè)線程上。這個(gè)方法應(yīng)該保證給定的[block]最終會被調(diào)用。
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
//返回一個(gè)continuation,它封裝了提供的[continuation],攔截了所有的恢復(fù)。
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
//CoroutineDispatcher是一個(gè)協(xié)程上下文元素,而'+'是一個(gè)用于協(xié)程上下文的集合和操作符。
public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
}
使用方式一:
fun dispatchersTest() {
//創(chuàng)建一個(gè)在主線程執(zhí)行的協(xié)程作用域
val mainScope = MainScope()
mainScope.launch {
launch(Dispatchers.Main) {//在協(xié)程上下參數(shù)中指定調(diào)度器
print("主線程調(diào)度器")
}
launch(Dispatchers.Default) {
print("默認(rèn)調(diào)度器")
}
launch(Dispatchers.Unconfined) {
print("任意調(diào)度器")
}
launch(Dispatchers.IO) {
print("IO調(diào)度器")
}
}
}
使用方式二:
//用給定的協(xié)程上下文調(diào)用指定的掛起塊,掛起直到它完成,并返回結(jié)果。
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T
GlobalScope.launch(Dispatchers.Main) {//開始協(xié)程:主線程
val result: User = withContext(Dispatchers.IO) {//網(wǎng)絡(luò)請求(IO 線程)
userApi.getUserSuspend("FollowExcellence")
}
tv_title.text = result.name //更新 UI(主線程)
}
啟動模式
CoroutineStart是一個(gè)枚舉類,為協(xié)程構(gòu)建器定義啟動選項(xiàng)。在協(xié)程構(gòu)建的start參數(shù)中使用。

協(xié)程上下文
協(xié)程使用以下幾種元素集定義協(xié)程的行為,它們均繼承自CoroutineContext:
- Job:協(xié)程的句柄,對協(xié)程的控制和管理生命周期。
- CoroutineName:協(xié)程的名稱,可用于調(diào)試。
- CoroutineDispatcher:調(diào)度器,確定協(xié)程在指定的線程來執(zhí)行。
- CoroutineExceptionHandler:協(xié)程異常處理器,處理未捕獲的異常。
suspend本質(zhì)
//Continuation接口表示掛起點(diǎn)之后的延續(xù),該掛起點(diǎn)返回類型為“T”的值。
public interface Continuation<in T> {
//對應(yīng)這個(gè)Continuation的協(xié)程上下文
public val context: CoroutineContext
//恢復(fù)相應(yīng)協(xié)程的執(zhí)行,傳遞一個(gè)成功或失敗的結(jié)果作為最后一個(gè)掛起點(diǎn)的返回值。
public fun resumeWith(result: Result<T>)
}
//將[value]作為最后一個(gè)掛起點(diǎn)的返回值,恢復(fù)相應(yīng)協(xié)程的執(zhí)行。
fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
//恢復(fù)相應(yīng)協(xié)程的執(zhí)行,以便在最后一個(gè)掛起點(diǎn)之后重新拋出[異常]。
fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
Kotlin 使用堆棧幀來管理要運(yùn)行哪個(gè)函數(shù)以及所有局部變量。掛起(暫停)協(xié)程時(shí),會復(fù)制并保存當(dāng)前的堆棧幀以供稍后使用,將信息保存到Continuation對象中。恢復(fù)協(xié)程時(shí),會將堆棧幀從其保存位置復(fù)制回來,對應(yīng)的Continuation通過調(diào)用resumeWith函數(shù)才會恢復(fù)協(xié)程的執(zhí)行,然后函數(shù)再次開始運(yùn)行。同時(shí)返回Result<T>類型的成功或者異常的結(jié)果。
@GET("users/{login}")
suspend fun getUserSuspend(@Path("login") login: String): User
反編譯后
public abstract getUserSuspend(Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public interface Continuation<in T> {
//協(xié)程上下文
public val context: CoroutineContext
//恢復(fù)相應(yīng)協(xié)程的執(zhí)行,傳遞一個(gè)成功或失敗的[result]作為最后一個(gè)掛起點(diǎn)的返回值。
public fun resumeWith(result: Result<T>)
}
協(xié)程原理
https://mp.weixin.qq.com/s/nXfweTaOCpm6Bj34rW-wLA 協(xié)程的本質(zhì)和原理:基于CPS( Continuation-Passing-Style Transformation)和狀態(tài)機(jī)
- 掛起函數(shù),在執(zhí)行的時(shí)候并不一定都會掛起,掛起函數(shù)里包含其他掛起函數(shù)的時(shí)候,它才會真正被掛起
- 掛起函數(shù)只能在其他掛起函數(shù)中被調(diào)用(or 協(xié)程作用域)
Continuation 則代表了,程序繼續(xù)運(yùn)行下去需要執(zhí)行的代碼,接下來要執(zhí)行的代碼 或者 剩下的代碼。


internal abstract class BaseContinuationImpl(...) {
// 實(shí)現(xiàn) Continuation 的 resumeWith,并且是 final 的,不可被重寫
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param)
...
}
// 由編譯生成的協(xié)程相關(guān)類來實(shí)現(xiàn),例如 postItem$1
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
suspend fun testCoroutine() {
log("start")
val user = getUserInfo()
log(user)
val friendList = getFriendList(user)
log(friendList)
val feedList = getFeedList(friendList)
log(feedList)
}
fun testCoroutine(completion: Continuation<Any?>): Any? {
//completion表示運(yùn)行完testCoroutine掛起函數(shù)后需要運(yùn)行的代碼
class TestContinuation(completion: Continuation<Any?>?) : ContinuationImpl(completion) {
// 表示協(xié)程狀態(tài)機(jī)當(dāng)前的狀態(tài)
var label: Int = 0
// 協(xié)程返回結(jié)果
var result: Any? = null
// 用于保存之前協(xié)程的計(jì)算結(jié)果
var mUser: Any? = null
var mFriendList: Any? = null
// invokeSuspend 是協(xié)程的關(guān)鍵(Continuation#resumeWith中會調(diào)用invokeSuspend)
// 它最終會調(diào)用 testCoroutine(this) 開啟協(xié)程狀態(tài)機(jī)
// 狀態(tài)機(jī)相關(guān)代碼就是后面的 when 語句
// 協(xié)程的本質(zhì),可以說就是 CPS + 狀態(tài)機(jī)
override fun invokeSuspend(_result: Result<Any?>): Any? {
result = _result
label = label or Int.Companion.MIN_VALUE
return testCoroutine(this)
}
}
//說明在運(yùn)行期間只會生成一個(gè)Continuation對象
val continuation = if (completion is TestContinuation) {
completion
} else {
// 作為參數(shù)
// ↓
//1.初次運(yùn)行
//2.用一個(gè)新的Continuation包裝了舊的Continuation
TestContinuation(completion)
}
// 三個(gè)變量,對應(yīng)原函數(shù)的三個(gè)變量
lateinit var user: String
lateinit var friendList: String
lateinit var feedList: String
// result 接收協(xié)程的運(yùn)行結(jié)果
var result = continuation.result
// suspendReturn 接收掛起函數(shù)的返回值
var suspendReturn: Any? = null
// CoroutineSingletons 是個(gè)枚舉類
// COROUTINE_SUSPENDED 代表當(dāng)前函數(shù)被掛起了
val sFlag = CoroutineSingletons.COROUTINE_SUSPENDED
when (continuation.label) {
0 -> {
// 檢測異常
throwOnFailure(result)
log("start")
// 將 label 置為 1,準(zhǔn)備進(jìn)入下一次狀態(tài)
continuation.label = 1
// 執(zhí)行 getUserInfo
suspendReturn = getUserInfo(continuation)
// 判斷是否掛起
if (suspendReturn == sFlag) {
return suspendReturn
} else {
result = suspendReturn
//go to next state
}
}
1 -> {
throwOnFailure(result)
// 獲取 user 值
user = result as String
log(user)
// 將協(xié)程結(jié)果存到 continuation 里
continuation.mUser = user
// 準(zhǔn)備進(jìn)入下一個(gè)狀態(tài)
continuation.label = 2
// 執(zhí)行 getFriendList
suspendReturn = getFriendList(user, continuation)
// 判斷是否掛起
if (suspendReturn == sFlag) {
return suspendReturn
} else {
result = suspendReturn
//go to next state
}
}
2 -> {
throwOnFailure(result)
user = continuation.mUser as String
// 獲取 friendList 的值
friendList = result as String
log(friendList)
// 將協(xié)程結(jié)果存到 continuation 里
continuation.mUser = user
continuation.mFriendList = friendList
// 準(zhǔn)備進(jìn)入下一個(gè)狀態(tài)
continuation.label = 3
// 執(zhí)行 getFeedList
suspendReturn = getFeedList(friendList, continuation)
// 判斷是否掛起
if (suspendReturn == sFlag) {
return suspendReturn
} else {
result = suspendReturn
//go to next state
}
}
3 -> {
throwOnFailure(result)
user = continuation.mUser as String
friendList = continuation.mFriendList as String
feedList = continuation.result as String
log(feedList)
loop = false
}
}
}
協(xié)程線程切換原理
https://mp.weixin.qq.com/s/iitYHxn6vPpE_wMsiPoplg
協(xié)程的并發(fā)處理
https://mp.weixin.qq.com/s/6paEFQDD-lHYMjcWmwZdhw
非阻塞式鎖Mutex:拿不到鎖時(shí)協(xié)程就掛起
線程中鎖都是阻塞式,在沒有獲取鎖時(shí)無法執(zhí)行其他邏輯,而協(xié)程可以通過掛起函數(shù)解決這個(gè),沒有獲取鎖就掛起協(xié)程,獲取后再恢復(fù)協(xié)程,協(xié)程掛起時(shí)線程并沒有阻塞可以執(zhí)行其他邏輯。這種互斥鎖就是Mutex,它與synchronized關(guān)鍵字有些類似,還提供了withLock擴(kuò)展函數(shù),替代常用的mutex.lock; try {...} finally { mutex.unlock() }:

Mutex大致邏輯還是非常清晰的,協(xié)程先獲取鎖,然后執(zhí)行代碼塊,然后釋放鎖,其他協(xié)程如果進(jìn)入,必須先獲取鎖,獲取不到協(xié)程執(zhí)行掛起方法suspend fun lockSuspend(owner: Any?), 加入等待隊(duì)列,掛起協(xié)程。等待其他協(xié)程釋放鎖之后,恢復(fù)協(xié)程。
整體還是建立在CAS基礎(chǔ)上,封裝的一套解決方案。
協(xié)程的異常處理方式
https://juejin.cn/post/6935472332735512606/
如果協(xié)程本身不使用try-catch子句自行處理異常,則不會重新拋出該異常,因此無法通過外部try-catch子句進(jìn)行處理。
異常會在“Job層次結(jié)構(gòu)中傳播”,可以由已設(shè)置的CoroutineExceptionHandler處理。 如果未設(shè)置,則調(diào)用該線程的未捕獲異常處理程序。如下代碼依然會崩潰
fun main() {
val topLevelScope = CoroutineScope(Job())
topLevelScope.launch {
try {
launch {
throw RuntimeException("RuntimeException in nested coroutine")
}
} catch (exception: Exception) {
println("Handle $exception")
}
}
Thread.sleep(100)
}
為了使CoroutineExceptionHandler起作用,必須將其設(shè)置在CoroutineScope或頂級協(xié)程中, 給子協(xié)程設(shè)置CoroutineExceptionHandler是沒有效果的。
// ...
val topLevelScope = CoroutineScope(Job() + coroutineExceptionHandler)
// ...
// ...
topLevelScope.launch(coroutineExceptionHandler) {
// ...
在代碼的特定部分處理異常,可使用try-catch。
全局捕獲異常,并且其中一個(gè)任務(wù)異常,其他任務(wù)不執(zhí)行,可使用CoroutineExceptionHandler,節(jié)省資源消耗。
并行任務(wù)間互不干擾,任何一個(gè)任務(wù)失敗,其他任務(wù)照常運(yùn)行,可使用SupervisorScope+async模式。
協(xié)程的結(jié)構(gòu)化并發(fā)
1、父作用域的生命周期持續(xù)到所有子作用域執(zhí)行完;
2、當(dāng)結(jié)束父作用域結(jié)束時(shí),同時(shí)結(jié)束它的各個(gè)子作用域;
3、子作用域未捕獲到的異常將不會被重新拋出,而是一級一級向父作用域傳遞,這種異常傳播將導(dǎo)致父作用域失敗,進(jìn)而導(dǎo)致其子作用域的所有請求被取消。