本篇文章舉例協(xié)程的各種方法的使用,并簡單闡述各個方法的一些注意事項。
協(xié)程作用域的創(chuàng)建
1.通過工廠函數(shù)創(chuàng)建自定義上下文的作用域
// 舉個例子:
// 協(xié)程異常處理器(也是一個上下文元素)
private val exceptionHandler =
CoroutineExceptionHandler { _, throwable ->
onError(throwable)
}
val myScope = CoroutineScope(exceptionHandler + SupervisorJob() + Dispatchers.IO)
// 源碼
public fun CoroutineScope(
context: CoroutineContext
): CoroutineScope = ContextScope(
if (context[Job] != null) context
else context + Job()
)
internal class ContextScope(
context: CoroutineContext
) : CoroutineScope {
override val coroutineContext: CoroutineContext = context
override fun toString(): String =
"CoroutineScope(coroutineContext=$coroutineContext)"
}
2.通過工廠函數(shù)MainScope()創(chuàng)建主線程調(diào)度器的作用域
// 舉個例子:
val mainScope = MainScope()
// 源碼
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
3.官方架構(gòu)提供的一些作用域viewModelScope和lifecycleScope
// Android JetPack ViewModel里的viewModelScope,ViewModeld的擴展變量
// 來自androidx.lifecycle:lifecycle-viewmodel-ktx:2.3.1
public val ViewModel.viewModelScope: CoroutineScope
get() {
val scope: CoroutineScope? = this.getTag(JOB_KEY)
if (scope != null) {
return scope
}
return setTagIfAbsent(
JOB_KEY,
CloseableCoroutineScope(
SupervisorJob() +
Dispatchers.Main.immediate
)
)
}
internal class CloseableCoroutineScope(
context: CoroutineContext
) : Closeable, CoroutineScope {
override val coroutineContext: CoroutineContext = context
override fun close() {
coroutineContext.cancel()
}
}
// Android JetPack lifecycle里的lifecycleScope,LifecycleOwner(Activity實現(xiàn)了此接口,可以直接用)的擴展變量
// 來自androidx.lifecycle:lifecycle-runtime-ktx:2.2.0
val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
get() = lifecycle.coroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
if (existing != null) {
return existing
}
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
if (mInternalScopeRef.compareAndSet(null, newScope)) {
newScope.register()
return newScope
}
}
}
協(xié)程的創(chuàng)建
方式一:launch
創(chuàng)建一個協(xié)程,并返回協(xié)程的引用job,最常用的方式
val job = MainScope().launch (Dispatchers.IO){
delay(1000)
println("run in coroutine")
}
job.cancel() // 取消協(xié)程
job.join() // 等待協(xié)程執(zhí)行完畢
job.cancelAndJoin() // 取消并等待協(xié)程執(zhí)行完畢(取消操作并不等于會立馬響應(yīng),這里其實是分別執(zhí)行了cancel和join)
job.cancelChildren() // 取消子協(xié)程
方式二:async
創(chuàng)建一個協(xié)程,并返回協(xié)程的引用Deferred(也是個job),通過Deferred的await拿到返回結(jié)果,此時協(xié)程會掛起
MainScope().launch (Dispatchers.Main){
// 啟動協(xié)程
val deferred = async {
"async result"
}
// 掛起等待協(xié)程結(jié)果
val result = deferred.await()
// 恢復(fù)掛起
println("result = $result")
}
作用域構(gòu)建器
runBlocking
runBlocking {} 會在開啟一個新的協(xié)程,并且阻塞主線程,直到操作完成。這個函數(shù)不應(yīng)該在協(xié)程里面使用,它是用來橋接需要阻塞的掛起函數(shù),主要用于 main function 和 junit 測試。注意他只是一個普通函數(shù)。
coroutineScope
runBlocking {
val result = coroutineScope {
println("run thread = ${Thread.currentThread().name}")
launch {
delay(1000)
println("run launch success1")
}
launch {
delay(2000)
println("run launch success2")
}
"coroutineScope result"
}
println("result = $result")
}
// 運行結(jié)果(看得出來使用的是父協(xié)程的線程調(diào)度器)
run thread = main
run launch success1
run launch success2
result = coroutineScope result
// 注意這是一個掛起函數(shù),它會掛起執(zhí)行一直到里面的子協(xié)程執(zhí)行完畢、代碼執(zhí)行完畢然后返回結(jié)果,這是他與runBlocking不同的地方,runBlocking只是個普通函數(shù),里面的協(xié)程體會阻塞線程。
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = ScopeCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
coroutineScope經(jīng)常用來把一個長耗時的任務(wù)拆分成多個子任務(wù),使這些子任務(wù)并行執(zhí)行,這就是所謂的結(jié)構(gòu)化并發(fā):
suspend fun showSomeData() = coroutineScope {
val data1 = async { //子任務(wù)1
delay(2000)
100
}
val data2 = async { //子任務(wù)2
delay(3000)
20
}
withContext(Dispatchers.Default) { //合并結(jié)果并返回
delay(3000)
val random = Random(10)
data1.await() + data2.await() + random.nextInt(100)
}
}
supervisorScope
supervisorScope功能與coroutinScope類似,不同之處在于,它可以讓里面的子協(xié)程發(fā)生異常而崩潰的時候不影響自身和其他的子協(xié)程的執(zhí)行。
runBlocking {
val result = supervisorScope {
println("run thread = ${Thread.currentThread().name}")
launch {
delay(1000)
// 主動拋異常
throw RuntimeException()
println("run launch success1")
}
launch {
delay(2000)
println("run launch success2")
}
"coroutineScope result"
}
println("result = $result")
}
// 運行結(jié)果
run thread = main
run launch success2
result = coroutineScope result
說到supervisorScope順便提一嘴SupervisorJob,創(chuàng)建作用域或者啟動協(xié)程的時候如果上下文帶上SupervisorJob()也可以達到自身協(xié)程處理異常而不影響父協(xié)程和兄弟協(xié)程的運行。
常用函數(shù)
withTimeout
suspend fun timeOut(){
try {
withTimeout(1000){
println("在此做一些耗時操作,超過設(shè)定的時間就拋異常")
delay(2500)
}
}catch (e:TimeoutCancellationException){
println(e.message)
// 處理超時異常
}
}
withContext
// 掛起函數(shù),適合一些耗時異步操作
suspend fun timeOut(){
val result = withContext(Dispatchers.IO){
delay(2500)
"result"
}
println(result)
}
flow
private fun flowEmit() = flow<Int> {
// 這段代碼塊實際運行在 flowEmit().collect所處的協(xié)程代碼塊里
println("...$coroutineContext")
for (k in 1 .. 3){
delay(1000)
emit(k)
}
}
@Test
fun flowTest() = runBlocking {
println("...${this.coroutineContext}")
// flowEmit()只是創(chuàng)建了個對象
// 調(diào)用collect的時候才開始執(zhí)行上面定義的代碼塊
flowEmit().collect(object : FlowCollector<Int> {
// 這個代碼其實也是運行在runBlocking協(xié)程域里
override suspend fun emit(value: Int) {
println("...$value")
}
})
//
println("...end")
}
輸出
...[CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@2bdd8394, BlockingEventLoop@5f9edf14]
...[CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@2bdd8394, BlockingEventLoop@5f9edf14]
...1
...2
...3
...end