Kotlin協(xié)程是一種編程思想,其中一個(gè)比較重要的應(yīng)用場景就是線程控制。
以往我們?cè)谛枰l(fā)的時(shí)候,往往會(huì)用到j(luò)ava 的Executor和Android 的AsyncTask, 這樣會(huì)導(dǎo)致跟直接使用Thead一樣的困難和麻煩
1、線程什么時(shí)候結(jié)束
2、線程間的互相通信
3、多個(gè)線程的管理
然后就有了比較出名的Rxjava,它采用[Obervable]的編程范式的鏈?zhǔn)秸{(diào)用方式,解決了callback比較多的難題。而同樣Kotlin 協(xié)程也能夠解決這個(gè)問題,而且它以同步的方式寫異步代碼,從代碼角度來看結(jié)構(gòu)更清晰。
舉個(gè)例子:
coroutineScope.launch(Dispatcher.Main){
val user = withContext(Dispatcher.IO){
api.login() // IO 線程執(zhí)行請(qǐng)求
}
nameTv.text = user.name // 主線程更新ui
}
是不是比較簡單,另外Kotlin協(xié)程還可以處理并行的請(qǐng)求處理,比如
coroutineScope.launch(Dispatcher.Main){
val avatar = async { api.getAvatar(user)}
val logo = async { api.getLogo(user)}
show(avatar.await(), logo.await())
}
async函數(shù)返回的是Deferred類型,意思是延時(shí),稍后拿到結(jié)果,取結(jié)果需調(diào)用Deferred.await()方法。
suspend 函數(shù)
suspend標(biāo)注的函數(shù)是一個(gè)耗時(shí)函數(shù),該函數(shù)只能在kotlin協(xié)程函數(shù)或者是另外一個(gè)suspend函數(shù)里面被調(diào)用。而真正實(shí)施線程切換的是withContext(Dispatchers.IO)。
這里先介紹一下Dispatchers.IO是個(gè)什么東西
public val IO: CoroutineDispatcher = DefaultScheduler.IO
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
CoroutineDispatcher 是所有Coroutine Dispatcher的父類
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
// 判斷是否需求分發(fā)
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
//分發(fā)runnable
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
//創(chuàng)建一個(gè)DispatchedContinuation
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
}
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)
這里知道Dispatchers.IO是個(gè)CoroutineDispatcher的實(shí)例就行了,具體它是如何實(shí)現(xiàn)線程間的調(diào)度的我們稍后再說。
我們先看啟動(dòng)一個(gè)協(xié)程的過程
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//調(diào)用內(nèi)部的plus方法生成一個(gè)新的CoroutineContext
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
//AbstractCoroutine
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
// 調(diào)用CoroutineStart里面的invoke方法
start(block, receiver, this)
}
//CoroutineStart
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // will start lazily
}
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
//1、
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R, completion: Continuation<T>
): Continuation<Unit> {
//createCoroutineUnintercepted 創(chuàng)建一個(gè)Continuation對(duì)象,就是我們反編譯的時(shí)候的那個(gè)create方法
return if (this is BaseContinuationImpl) create(receiver, completion) else // ...
}
@NotNull
public final Continuation create(@NotNull FlowCollector $this$create, Object it, @NotNull Continuation continuation) {
PlantListViewModel$$special$$inlined$flatMapLatest$1 var4 = new PlantListViewModel$$special$$inlined$flatMapLatest$1(continuation, this.$plantRepository$inlined);
var4.p$ = $this$create;
var4.p$0 = it;
return var4;
}
//2、
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
//ContinuationImpl
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
//CoroutineDispatcher
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
//3、DispatchedContinuation
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//這里判斷是否需要切換線程
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//調(diào)用對(duì)應(yīng)的調(diào)度器,分發(fā)事件,實(shí)現(xiàn)線程切換
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
//不需要切換
resumeUndispatchedWith(result)
}
}
}
}
inline fun resumeUndispatchedWith(result: Result<T>) {
withCoroutineContext(context, countOrElement) {
continuation.resumeWith(result)
}
}
continuation.resumeWith()調(diào)用的是SuspendLambda.resumeWith,然后它調(diào)用的是父類的BaseContinuationImpl里面的resumeWith
//BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
/// 調(diào)用SuspendLambda.invokeSuspend方法
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
協(xié)程的線程調(diào)度
主線程
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
//MainDispatcherLoader
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
// 初始化factories,List<MainDispatcherFactory>
factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
} catch (e: Throwable) {
// Service loader can throw an exception as well
createMissingDispatcher(e)
}
}
public fun MainDispatcherFactory.tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
try {
createDispatcher(factories)
} catch (cause: Throwable) {
createMissingDispatcher(cause, hintOnError())
}
//AndroidDispatcherFactory
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true))
//HandlerContext
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
//invokeImmediately 默認(rèn)為false
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
// 用handler分發(fā)事件
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
return object : DisposableHandle {
override fun dispose() {
handler.removeCallbacks(block)
}
}
}
主線程的線程調(diào)度,使用的是handler進(jìn)行消息的分發(fā)。
Dispatchers.IO
//DefaultScheduler
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)
//ExperimentalCoroutineDispatcher
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
public constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
@Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
public constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)
override val executor: Executor
get() = coroutineScheduler
// This is variable for test purposes, so that we can reinitialize from clean state
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
//分發(fā)事件
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatch(context, block)
}
override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block, tailDispatch = true)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatchYield(context, block)
}
//...
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
try {
coroutineScheduler.dispatch(block, context, tailDispatch)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
// TaskContext shouldn't be lost here to properly invoke before/after task
DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
}
}
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
}
// CoroutineScheduler 繼承至Executor 線程池
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
override fun execute(command: Runnable) = dispatch(command)
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask() // this is needed for virtual time support
// task是繼承runnable
val task = createTask(block, taskContext)
// try to submit the task to the local queue and act depending on the result
// Worker 繼承 Thread, 里面有一個(gè)local queue
val currentWorker = currentWorker()
//先加入到local queue里面
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
// 加入到local queue失敗,就加入到global queue里面
if (!addToGlobalQueue(notAdded)) {
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
// Checking 'task' instead of 'notAdded' is completely okay
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
// Increment blocking tasks anyway
signalBlockingWork(skipUnpark = skipUnpark)
}
}
}
Dispatchers.IO復(fù)雜一點(diǎn),內(nèi)部實(shí)現(xiàn)了線程池Executor,通過線程池進(jìn)行事件的分發(fā),原理也是類似的。
到此,Dispatchers.IO也分析完畢。
最后,說一下協(xié)程容易搞錯(cuò)的一些問題
1). 協(xié)程支持并發(fā)嗎?開啟多個(gè)協(xié)程改變某一個(gè)值,對(duì)值有什么影響?有什么辦法可以避免?
首先,協(xié)程肯定是可以支持并發(fā)的,Dispatchers.IO以及Dispatchers.Default內(nèi)部都是采用線程池來執(zhí)行的,默認(rèn)的核心線程數(shù)是CUP的核數(shù)。既然支持并發(fā),那對(duì)同時(shí)對(duì)某個(gè)值進(jìn)行賦值,肯定會(huì)導(dǎo)致該值混亂。那么遇到這種情況,我們有幾種辦法處理
- 可以采用單線程的模式,比如Dispatchers.Unconfined,或者withContext()的方式啟動(dòng)協(xié)程。雖然launch()和withContext()都可以開啟一個(gè)協(xié)程,但是launch是并行啟動(dòng),而withContext卻是串行的。用async{}.await()也是可以的,await()是一個(gè)掛起函數(shù),效果類似withContext().
- 使用Mutex,也就是加鎖機(jī)制,和java的synchronized一樣。使用mutex.withLock{*}實(shí)現(xiàn)數(shù)據(jù)同步。當(dāng)然用Atomic也是可行的。
2). 啟動(dòng)協(xié)程的方式有l(wèi)aunch 、withContext、async三種,各有什么區(qū)別?
launch是一個(gè)非suspend函數(shù),也就是說它是非阻塞式的,可以并行執(zhí)行
withContext是一個(gè)suspend函數(shù),所以它是串行的
async不使用await()的時(shí)候跟launch一樣,使用await()的時(shí)候跟withContext一樣,因?yàn)閍wait是一個(gè)suspend函數(shù)。async適用于多個(gè)并行任務(wù),而且需要等待返回結(jié)果的情況下。