本文為協(xié)程的開篇作,作者目前對協(xié)程的理解仍存在一些疑問,歡迎批評指正。

概念
?些 API 啟動?時間運(yùn)?的操作(例如?絡(luò) IO、?件 IO、CPU 或 GPU 密集型任務(wù)等),并要求調(diào)?者阻塞直到它們完成,通常的做法是使用異步加回調(diào)的方式來實現(xiàn)非阻塞,但異步回調(diào)代碼寫起來并不容易,尤其出現(xiàn)嵌套回調(diào)。
協(xié)程提供了?種避免阻塞線程并用更廉價、更可控的操作替代線程阻塞的?法:協(xié)程掛起。
kotlin協(xié)程是一種用戶態(tài)的輕量級線程。
協(xié)程主要是讓原來要使用"異步+回調(diào)方式"寫出來復(fù)雜代碼,簡化成可以用看似同步的方式,這樣我們就可以按串行的思維模式去組織原本分散在不同上下文的代碼邏輯。
//偽代碼
launch(Background) {
val bitmap = MediaStore.getBitmap(uri)
launch(UI) {
imageView.setImageBitmap(bitmap)
}
}
集成環(huán)境
- kotlin插件
ext.kotlin_version = '1.3.11'
dependencies {
classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
}
- 協(xié)程核心庫
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.0"
//或使用android
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.0"
- experimental聲明
//在module的build.gradle中聲明
kotlin {
experimental {
coroutines 'enable'
}
}
官方文檔
https://kotlinlang.org/docs/reference/coroutines/coroutines-guide.html
由于協(xié)程核心庫由experimental轉(zhuǎn)為release時間不長,第三方的blog幾乎都是基于experimental的(API與最新版存在差異),目前來看官網(wǎng)的文檔最是大而全,但也稍有滯后性。
最新版本的協(xié)程庫可以去mavenCenter搜索。
啟動協(xié)程的方法
通常啟動協(xié)程有l(wèi)aunch和async方法。
launch啟動協(xié)程
@Test
fun coroutineDemo1(){
println("test func coroutineDemo1")
GlobalScope.launch {
delay(2000)
println("coroutine finish")
}
println("coroutine start")
Thread.sleep(3000)
println("coroutine end")
}
運(yùn)行結(jié)果
I/System.out: coroutine start
I/System.out: coroutine finish
可以看到launch函數(shù)是以非阻塞的方式啟動一個協(xié)程,而Thread.sleep是阻塞式的。
事實上launch方法有三個參數(shù),并返回一個Job對象。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
suspend函數(shù)
代碼中的delay函數(shù)就是一個掛起函數(shù),它用suspend關(guān)鍵字修飾,掛起函數(shù)只能從一個協(xié)程代碼內(nèi)部調(diào)用,普通代碼不能調(diào)用,可以修改示例驗證。
runBlocking函數(shù)
上面的例子中既有非阻塞的delay函數(shù),又有Thread.sleep的阻塞函數(shù),這樣可讀性太差,我們可用runBlocking來實現(xiàn)。
@Test
fun testRunBlocking() = runBlocking {
println("test func testRunBlocking")
GlobalScope.launch {
delay(2000)
println("coroutine finish")
}
println("coroutine start")
delay(3000)
}
注:如果不使用runBlocking那么我們是不能在函數(shù)體內(nèi)調(diào)用delay函數(shù)的。
一般runBlocking函數(shù)不用來當(dāng)做普通協(xié)程函數(shù)使用,它的主要目的是用來橋接普通阻塞代碼和掛起風(fēng)格的非阻塞代碼,例如用在main函數(shù)或測試用例中。
協(xié)程的運(yùn)行狀態(tài)
| Job狀態(tài) | isActive | isCompleted |
|---|---|---|
| new(可選的初始狀態(tài)) | false | false |
| ??active(默認(rèn)的初始狀態(tài)) ?? | true | false |
| completed(結(jié)束狀態(tài)) | false | true |
當(dāng)以默認(rèn)參數(shù)創(chuàng)建協(xié)程時,協(xié)程就處于active狀態(tài),若想修改這種行為,可以使用Lazy模式創(chuàng)建
@Test
fun coroutineState() = runBlocking {
println("test func coroutineState")
val job = GlobalScope.launch {
delay(2000)
println("coroutine finish")
}
println("coroutineState 111 isActive:${job.isActive} isCompleted:${job.isCompleted} ")
delay(3000)
println("coroutineState 222 isActive:${job.isActive} isCompleted:${job.isCompleted} ")
}
等待協(xié)程執(zhí)行完畢
默認(rèn)情況下協(xié)程的跟隨當(dāng)前活動線程,如果協(xié)程中的掛起方法未執(zhí)行完畢,而活動線程已經(jīng)退出,則當(dāng)前協(xié)程也就退出了,所以他更像一個守護(hù)線程。
@Test
fun coroutineJoinTest() = runBlocking {
println("test func coroutineJoinTest")
val job = GlobalScope.launch {
delay(2000)
println("coroutine finish thread:${Thread.currentThread()}")
}
job.join()
println("test func coroutineJoinTest end")
}
async啟動協(xié)程
多個掛起函數(shù)的順序執(zhí)行和異步并發(fā)執(zhí)行的效率問題,我們先來看順序執(zhí)行。
@Test
fun testSequential() = runBlocking {
println("[testSequential] start thread:${Thread.currentThread()}")
//統(tǒng)計時長
val time = measureTimeMillis {
val one = doJob1()
val two = doJob2()
println("[testSequential] result :${one + two}")
}
println("[testSequential] completed in :$time ms")
}
使用async函數(shù)實現(xiàn)異步并發(fā)執(zhí)行,與launch函數(shù)不同的是啟動async協(xié)程必須指定調(diào)度器。
@Test
fun testAsync() = runBlocking(Dispatchers.Default) {
println("[testAsync] start thread:${Thread.currentThread()}")
val time = measureTimeMillis {
val one = GlobalScope.async(Dispatchers.Unconfined) {
println("[doJob1] thread:${Thread.currentThread()}")
doJob1() }
val two = GlobalScope.async(Dispatchers.Main) {
println("[doJob2] thread:${Thread.currentThread()}")
doJob2() }
println("[testAsync] result :${one.await() + two.await()}")
}
println("[testAsync] completed in :$time ms")
}
async返回一個DeferredCoroutine對象,它是一種輕量級的非阻塞future,表示后面后提供結(jié)果。通過await函數(shù)獲取結(jié)果,同時它與StandaloneCoroutine(launch函數(shù)返回的協(xié)程對象)一樣都是AbstractCoroutine的子類型,因此也具有isActive和isCompleted屬性。
取消協(xié)程
由launch函數(shù)啟動的協(xié)程返回一個Job對象引用當(dāng)前協(xié)程,可通過該對象的cancel函數(shù)取消正在運(yùn)行的協(xié)程。
@Test
fun coroutineCancelTest() = runBlocking {
println("test func coroutineCancelTest")
val job = GlobalScope.launch {
repeat(100) { i ->
println("coroutine I' am sleeping i:$i")
delay(300)
}
delay(2000)
println("coroutine finish thread:${Thread.currentThread()}")
}
delay(2000)
job.cancel()
println("test func coroutineJoinTest end isActive:${job.isActive} isCompleted:${job.isCompleted}")
}
cancel可以取消一個正在運(yùn)行的掛起函數(shù),但是不能取消一個計算函數(shù),此時可能需要判斷協(xié)程狀態(tài)。
@Test
fun coroutineCancelTest2() = runBlocking {
println("test func coroutineCancelTest2")
val job = GlobalScope.launch {
var i = 0
while(i < 100000) {
//用isActive檢驗協(xié)程的狀態(tài)
if(!isActive) {
return@launch
}
if(i % 7 == 0) {
println("coroutine executing i:$i")
}
i++
}
println("coroutine finish thread:${Thread.currentThread()}")
}
delay(200)
job.cancel()
println("test func coroutineCancelTest2 end")
}
協(xié)程上下文和調(diào)度
- Dispatchers.Default 默認(rèn)調(diào)度器(普通工作線程)
- Dispatchers.IO 普通IO線程
- Dispatchers.Main 安卓主線程
- Dispatchers.Unconfined
- newSingleThreadContext("myThread") 自定義線程
@Test
fun testDispatchers() = runBlocking {
println("[testDispatchers] start thread:${Thread.currentThread()}")
val jobs = arrayListOf<Job>()
jobs.add(GlobalScope.launch(Dispatchers.Unconfined) {
println("Unconfined is working thread:${Thread.currentThread()}")
})
jobs.add(GlobalScope.launch(Dispatchers.Main) {
println("Main is working thread:${Thread.currentThread()}")
doJob1()
})
jobs.add(GlobalScope.launch(Dispatchers.Default) {
println("Default is working thread:${Thread.currentThread()}")
doJob1()
})
jobs.add(GlobalScope.launch(newSingleThreadContext("myThread")) {
println("newSingleThreadContext is working thread:${Thread.currentThread()}")
})
jobs.forEach {
it.join()
}
println("testDispatchers....... end")
}
- withContext協(xié)程內(nèi)部調(diào)度,切換線程
@Test
fun testWithContext() = runBlocking(Dispatchers.Main) {
println("[testWithContext] start")
val view = View()
val provider = TestDataProvider()
view.showLoading()
val result = withContext(Dispatchers.IO) {provider.loadData()}
view.showData(result)
}
協(xié)程的嵌套
當(dāng)我們使用協(xié)程A的上下文啟動另一個協(xié)程B時, B將成為A的子協(xié)程。當(dāng)父協(xié)程A任務(wù)被取消時, B以及它的所有子協(xié)程都會被遞歸地取消。
@Test
fun testInnerCoroutines() = runBlocking {
println("[testInnerCoroutines] start ")
val request = GlobalScope.launch {
println("主協(xié)程 thread:${Thread.currentThread()}")
val job1 = GlobalScope.launch {
println("job1: 獨立的協(xié)程上下文! thread:${Thread.currentThread()}")
delay(1000)
println("job1: 不會受到request.cancel()的影響")
}
// 繼承父上下文
val job2 = GlobalScope.launch(Dispatchers.Unconfined) {
println("job2: 是request coroutine的子協(xié)程 thread:${Thread.currentThread()}")
delay(1000)
println("job2: 當(dāng)request.cancel(),job2也會被取消")
}
job1.join()
job2.join()
}
delay(500)
request.cancel()
delay(1000)
println("main: Who has survived request cancellation?")
}
特點與優(yōu)勢
協(xié)程計算可以被掛起而無需阻塞線程。線程阻塞的代價通常是昂貴的,尤其在高負(fù)載時,因為只有相對少量線程實際可用,因此阻塞其中?個會導(dǎo)致?些重要的任務(wù)被延遲。
另?方面,協(xié)程掛起幾乎是無代價的。不需要上下文切換或者 OS 的任何其他干預(yù)(但基于現(xiàn)在的協(xié)程使用確實已切換線程)。最重要的是,掛起可以在很大程度上由用戶控制:我們可以決定掛起時發(fā)生什么并根據(jù)需求優(yōu)化/記日志等。
使用協(xié)程,我們不再需要像異步編程時寫那么一堆callback函數(shù),代碼結(jié)構(gòu)不再支離破碎,整個代碼邏輯上看上去和同步代碼沒什么區(qū)別,簡單,易理解,優(yōu)雅。
基本原理
協(xié)程完全通過編譯技術(shù)實現(xiàn)(不需要來自 VM 或 OS 端的支持),掛起機(jī)制是通過狀態(tài)機(jī)來實現(xiàn),其中的狀態(tài)對應(yīng)于掛起調(diào)用。
- 輕量級
@Test
fun testLightWeightCoroutine() = runBlocking {
println("[testLightWeightCoroutine] start")
val jobs = List(100000) {
GlobalScope.launch {
delay(1000L)
print(".")
}
}
jobs.forEach { it.join() } // wait for all jobs to complete
println("----")
println("[testLightWeightCoroutine] end")
}
而運(yùn)行下面的代碼就會直接OOM
@Test
fun testThread(){
val jobs = List(100000) {
Thread({
Thread.sleep(1000L)
print(".")
})
}
jobs.forEach { it.start() }
jobs.forEach { it.join() }
}
但是通過日志打印線程,發(fā)現(xiàn)協(xié)程使用的是線程池,這樣比較是否合理?
協(xié)程拓展的演變領(lǐng)域
- kotlinx-coroutines-rx
- kotlinx-coroutines-android
- kotlinx-coroutines-swing
- kotlinx-coroutines-nio