Kotlin Coroutines(協(xié)程) 完全解析系列:
Kotlin Coroutines(協(xié)程) 完全解析(一),協(xié)程簡介
Kotlin Coroutines(協(xié)程) 完全解析(二),深入理解協(xié)程的掛起、恢復(fù)與調(diào)度
Kotlin Coroutines(協(xié)程) 完全解析(三),封裝異步回調(diào)、協(xié)程間關(guān)系及協(xié)程的取消
Kotlin Coroutines(協(xié)程) 完全解析(四),協(xié)程的異常處理
Kotlin Coroutines(協(xié)程) 完全解析(五),協(xié)程的并發(fā)
本文基于 Kotlin v1.3.0-rc-146,Kotlin-Coroutines v1.0.0-RC1
通過前面幾篇文章可以明白協(xié)程就是可以掛起和恢復(fù)執(zhí)行的運(yùn)算邏輯,掛起函數(shù)用狀態(tài)機(jī)的方式用掛起點(diǎn)將協(xié)程的運(yùn)算邏輯拆分為不同的片段,每次運(yùn)行協(xié)程執(zhí)行的不同的邏輯片段。所以協(xié)程在運(yùn)行時(shí)只是線程中的一塊代碼,線程的并發(fā)處理方式都可以用在協(xié)程上。不過協(xié)程還提供兩種特有的方式,一是不阻塞線程的互斥鎖Mutex,一是通過 ThreadLocal 實(shí)現(xiàn)的協(xié)程局部數(shù)據(jù)。
1. Mutex
線程中鎖都是阻塞式,在沒有獲取鎖時(shí)無法執(zhí)行其他邏輯,而協(xié)程可以通過掛起函數(shù)解決這個(gè),沒有獲取鎖就掛起協(xié)程,獲取后再恢復(fù)協(xié)程,協(xié)程掛起時(shí)線程并沒有阻塞可以執(zhí)行其他邏輯。這種互斥鎖就是Mutex,它與synchronized關(guān)鍵字有些類似,還提供了withLock擴(kuò)展函數(shù),替代常用的mutex.lock; try {...} finally { mutex.unlock() }:
fun main(args: Array<String>) = runBlocking<Unit> {
val mutex = Mutex()
var counter = 0
repeat(10000) {
GlobalScope.launch {
mutex.withLock {
counter ++
}
}
}
println("The final count is $counter")
}
Mutex的使用比較簡單,不過需要注意的是多個(gè)協(xié)程競爭的應(yīng)該是同一個(gè)Mutex互斥鎖。
2. 協(xié)程局部數(shù)據(jù)
線程中可以使用ThreadLocal作為線程局部數(shù)據(jù),每個(gè)線程中的數(shù)據(jù)都是獨(dú)立的。協(xié)程中可以通過ThreadLocal.asContextElement()擴(kuò)展函數(shù)實(shí)現(xiàn)協(xié)程局部數(shù)據(jù),每次協(xié)程切換會恢復(fù)之前的值。先看下面的示例:
fun main(args: Array<String>) = runBlocking<Unit> {
val threadLocal = ThreadLocal<String>().apply { set("Init") }
printlnValue(threadLocal)
val job = GlobalScope.launch(threadLocal.asContextElement("launch")) {
printlnValue(threadLocal)
threadLocal.set("launch changed")
printlnValue(threadLocal)
yield()
printlnValue(threadLocal)
}
job.join()
printlnValue(threadLocal)
}
private fun printlnValue(threadLocal: ThreadLocal<String>) {
println("${Thread.currentThread()} thread local value: ${threadLocal.get()}")
}
輸出如下:
Thread[main,5,main] thread local value: Init
Thread[DefaultDispatcher-worker-1,5,main] thread local value: launch
Thread[DefaultDispatcher-worker-1,5,main] thread local value: launch changed
Thread[DefaultDispatcher-worker-2,5,main] thread local value: launch
Thread[main,5,main] thread local value: Init
上面的輸出有個(gè)疑問的地方,為什么執(zhí)行yield()掛起函數(shù)后 threadLocal 的值不是launch changed而變回了launch?
下面直接分析源碼:
// 注意這里 value 的默認(rèn)值是 ThreadLocal 當(dāng)前值
public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> =
ThreadLocalElement(value, this)
internal class ThreadLocalElement<T>(
private val value: T,
private val threadLocal: ThreadLocal<T>
) : ThreadContextElement<T> {
override val key: CoroutineContext.Key<*> = ThreadLocalKey(threadLocal)
override fun updateThreadContext(context: CoroutineContext): T {
val oldState = threadLocal.get()
// 設(shè)置 threadLocal 的值為 value 前先保存了之前的值
threadLocal.set(value)
return oldState
}
override fun restoreThreadContext(context: CoroutineContext, oldState: T) {
// 將 threadLocal 修改為之前保存的值
threadLocal.set(oldState)
}
...
}
// 協(xié)程啟動和恢復(fù)都會用此函數(shù)包裝,在 Dispatched.run()、DisptchedContinuation.resumeWith() 、
// DisptchedContinuation.resumeUndispatched() 等協(xié)程啟動和恢復(fù)的地方都可以發(fā)現(xiàn)此函數(shù)的蹤影
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T {
// updateThreadContext() 函數(shù)會調(diào)用到 ThreadContextElement.updateThreadContext(context)
// oldValue 是 threadLocal 之前的值
val oldValue = updateThreadContext(context, countOrElement)
try {
return block()
} finally {
// restoreThreadContext() 函數(shù)會調(diào)用到 ThreadContextElement.restoreThreadContext(context, oldValue)
restoreThreadContext(context, oldValue)
}
}
根據(jù)上面的源碼和斷點(diǎn)調(diào)試,可以發(fā)現(xiàn)協(xié)程的啟動和恢復(fù)都會執(zhí)行一次ThreadContextElement.updateThreadContext(context)和ThreadContextElement.restoreThreadContext(context, oldValue),現(xiàn)在再分析一次上面的代碼運(yùn)行:
fun main(args: Array<String>) = runBlocking<Unit> {
val threadLocal = ThreadLocal<String>().apply { set("Init") }
// 此時(shí)在 main 線程,threadLocal 的值為 Init
printlnValue(threadLocal)
val job = GlobalScope.launch(threadLocal.asContextElement("launch")) {
// 啟動協(xié)程后,切換到 DefaultDispatcher-worker-1 線程,threadLocal 在該線程的值為 null
// 調(diào)用 updateThreadContext() 設(shè)置 threadLocal 的值為 launch,保存之前的為 null
printlnValue(threadLocal)
// 在 DefaultDispatcher-worker-1 線程,修改 threadLocal 的值為 launch changed
threadLocal.set("launch changed")
printlnValue(threadLocal)
// yield() 掛起函數(shù)會掛起當(dāng)前協(xié)程,并將協(xié)程分發(fā)到 Dispatcher.Default 的隊(duì)列中等待恢復(fù)
// 掛起協(xié)程后調(diào)用 restoreThreadContext() 修改 threadLocal 為 null
yield()
// 恢復(fù)協(xié)程后,此時(shí)在 DefaultDispatcher-worker-2 線程,threadLocal 的值為 null
// 再次調(diào)用 updateThreadContext() 設(shè)置 threadLocal 的值為 launch,保存之前的為 null
printlnValue(threadLocal)
// 結(jié)束協(xié)程后,restoreThreadContext() 修改 threadLocal 為 null
}
job.join()
// 此時(shí)已經(jīng)從 DefaultDispatcher-worker-2 線程切換回 main 線程,main 線程中的 threadlocal 沒有修改過,還是為 Init
printlnValue(threadLocal)
}
private fun printlnValue(threadLocal: ThreadLocal<String>) {
println("${Thread.currentThread()} thread local value: ${threadLocal.get()}")
}
所以 ThreadContextElement 并不能跟蹤所有ThreadLocal對象的訪問,而且每次掛起時(shí)更新的值將丟失。最重要的牢記它的原理:啟動和恢復(fù)時(shí)保存ThreadLocal在當(dāng)前線程的值,并修改為 value,掛起和結(jié)束時(shí)修改當(dāng)前線程ThreadLocal的值為之前保存的值。
3. 已有線程同步方式
AtomicInteger 等
java.util.concurrent.atomic包中的原子類ConcurrentHashMap 等線程安全的集合
協(xié)程中的并發(fā)與線程的并發(fā)大部分是相同的,所以本篇文章應(yīng)該是目前為止該系列文章中最容易理解的一篇,本系列Kotlin Coroutines(協(xié)程) 完全解析暫時(shí)就到這里,后面待 select 表達(dá)式、Channel、Actor 等實(shí)驗(yàn)性內(nèi)容正式發(fā)布后繼續(xù)解析,還有在 Android 項(xiàng)目中協(xié)程的實(shí)際運(yùn)用,敬請期待。