上周我們把 Kotlin Coroutine 的基本 API 挨個(gè)講了一下,也給出了一些簡單的封裝。
真是不太給力,就在前幾天發(fā)布的 1.1 Beta 2 當(dāng)中,所有協(xié)程的 API 包名后面都加了一個(gè) experimental,這意味著 Kotlin 官方在 1.1 當(dāng)中還是傾向于將 Coroutine 作為一個(gè)實(shí)驗(yàn)性質(zhì)的特性的,不過,這也沒關(guān)系,我們學(xué)習(xí)的心不以外界的變化而變化不是?
這一篇我們基于前面的基礎(chǔ)來了解一下 Kotlinx.coroutines 這個(gè)庫的使用,如果大家對(duì)它的實(shí)現(xiàn)原理有興趣,可以再讀一讀上一篇文章,我們也可以在后面繼續(xù)寫一些文章來給深入地大家介紹。
1. 準(zhǔn)備工作
就像前面我們說到的,1.1 Beta 2 當(dāng)中協(xié)程相關(guān)的基礎(chǔ)庫的包名都增加了 experimental,所以我們在選擇 kotlinx.coroutines 的版本的時(shí)候也一定要對(duì)應(yīng)好編譯器的版本,不然...你自己想哈哈。
我們強(qiáng)調(diào)一下,kotlin 的版本選擇 1.1.0-beta-38,kotlinx.coroutines 的版本選擇 0.6-beta,如果你恰好使用 gradle,那么告訴你一個(gè)好消息,我會(huì)直接告訴你怎么配置:
buildscript {
ext.kotlin_version = '1.1.0-beta-38'
repositories {
jcenter()
maven {
url "http://dl.bintray.com/kotlin/kotlin-eap-1.1"
}
}
...
}
repositories {
jcenter()
maven {
url "http://dl.bintray.com/kotlin/kotlin-eap-1.1"
}
}
kotlin {
experimental {
coroutines 'enable'
}
}
dependencies {
compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.6-beta'
}
2. 一個(gè)基本的協(xié)程的例子
這個(gè)例子是 kotlinx.coroutines 的第一個(gè)小例子。
fun main(args: Array<String>) {
launch(CommonPool) { // create new coroutine in common thread pool
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello,") // main function continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
這個(gè)例子的運(yùn)行結(jié)果是:
Hello,
World!
其實(shí)有了上一篇文章的基礎(chǔ)我們很容易知道,launch 方法啟動(dòng)了一個(gè)協(xié)程,CommonPool 是一個(gè)有線程池的上下文,它可以負(fù)責(zé)把協(xié)程的執(zhí)行分配到合適的線程上。所以從線程的角度來看,打印的這兩句是在不同的線程上的。
20170206-063015.015 [main] Hello,
20170206-063016.016 [ForkJoinPool.commonPool-worker-1] World!
這段代碼的執(zhí)行效果與線程的版本看上去是一樣的:
thread(name = "MyThread") {
Thread.sleep(1000L)
log("World!")
}
log("Hello,")
Thread.sleep(2000L)
3. 主線程上的協(xié)程
我們剛才通過 launch 創(chuàng)建的協(xié)程是在 CommonPool 的線程池上面的,所以協(xié)程的運(yùn)行并不在主線程。如果我們希望直接在主線程上面創(chuàng)建協(xié)程,那怎么辦?
fun main(args: Array<String>) = runBlocking<Unit> {
launch(CommonPool) {
delay(1000L)
println("World!")
}
println("Hello,")
delay(2000L)
}
這個(gè)還是 kotlinx.coroutines 的例子,我們來分析一下。runBlocking 實(shí)際上也跟 launch 一樣,啟動(dòng)一個(gè)協(xié)程,只不過它傳入的 context 不會(huì)進(jìn)行線程切換,也就是說,由它創(chuàng)建的協(xié)程會(huì)直接運(yùn)行在當(dāng)前線程上。
在 runBlocking 當(dāng)中通過 launch 再創(chuàng)建一個(gè)協(xié)程,顯然,這段代碼的運(yùn)行結(jié)果與上一個(gè)例子是完全一樣的。需要注意的是,盡管我們可以在協(xié)程中通過 launch 這樣的方法創(chuàng)建協(xié)程,但不要再協(xié)程當(dāng)中通過 runBlocking 再來創(chuàng)建協(xié)程,因?yàn)檫@樣做雖然一般來說不會(huì)導(dǎo)致程序異常,不過,這樣的程序也沒有多大意義:
fun main(args: Array<String>) = runBlocking<Unit> {
runBlocking {
delay(1000L)
println("World!")
}
println("Hello,")
}
運(yùn)行結(jié)果:
World!
Hello,
大家看到了,嵌套的 runBlocking 實(shí)際上仍然只是一段順序代碼而已。
那么,讓我們再仔細(xì)看看前面的例子,不知道大家有沒有問題:如果我在 launch 創(chuàng)建的協(xié)程當(dāng)中多磨嘰一會(huì)兒,主線程上的協(xié)程 delay(2000L) 好像也沒多大用啊。有沒有什么方法保證協(xié)程執(zhí)行完?
4. 外部控制協(xié)程
我們在上一篇文章當(dāng)中只是對(duì)內(nèi)置的基礎(chǔ) API 進(jìn)行了簡單的封裝,而 kotlinx.coroutines 卻為我們做了非常多的事情。比如,每一個(gè)協(xié)程都看做一個(gè) Job,我們在一個(gè)協(xié)程的外部也可以控制它的運(yùn)行。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch(CommonPool) {
delay(1000L)
println("World!")
}
println("Hello,")
job.join()
}
job.join 其實(shí)就是要求當(dāng)前協(xié)程等待 job 執(zhí)行完成之后再繼續(xù)執(zhí)行。
其實(shí),我們還可以取消協(xié)程,讓他直接停止執(zhí)行:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch(CommonPool) {
delay(1000L)
println("World!")
}
println("Hello,")
job.cancel()
}
job.cancel 會(huì)直接終止 job 的執(zhí)行。如果 job 已經(jīng)執(zhí)行完畢,那么 job.cancel 的執(zhí)行時(shí)沒有意義的。我們也可以根據(jù) cancel 的返回值來判斷是否取消成功。
另外,cancel 還可以提供原因:
job.cancel(IllegalAccessException("World!"))
如果我們提供了這個(gè)原因,那么被取消的協(xié)程會(huì)將它打印出來。
Hello,
Exception in thread "main" java.lang.IllegalAccessException: World!
at example13.Example_13Kt$main$1.doResume(example-13.kt:14)
at kotlin.coroutines.experimental.jvm.internal.CoroutineImpl.resume(CoroutineImpl.kt:53)
at kotlinx.coroutines.experimental.DispatchedContinuation$resume$1.run(CoroutineDispatcher.kt:57)
其實(shí),如果你自己做過對(duì)線程任務(wù)的取消,你大概會(huì)知道除非被取消的線程自己去檢查取消的標(biāo)志位,或者被 interrupt,否則取消是無法實(shí)現(xiàn)的,這有點(diǎn)兒像一個(gè)人執(zhí)意要做一件事兒,另一個(gè)人說你別做啦,結(jié)果人家壓根兒沒聽見,你說他能停下來嗎?那么我們前面的取消到底是誰去監(jiān)聽了這個(gè) cancel 操作呢?
當(dāng)然是 delay 這個(gè)操作了。其實(shí)所有 kotlinx.coroutines 當(dāng)中定義的操作都可以做到這一點(diǎn),我們對(duì)代碼稍加改動(dòng),你就會(huì)發(fā)現(xiàn)異常來自何處了:
val job = launch(CommonPool) {
try {
delay(1000L)
println("World!")
} catch(e: Exception) {
e.printStackTrace()
}finally {
println("finally....")
}
}
println("Hello,")
job.cancel(IllegalAccessException("World!"))
是的,你沒看錯(cuò),我們居然可以在協(xié)程里面對(duì) cancel 進(jìn)行捕獲,如果你愿意的話,你甚至可以繼續(xù)在這個(gè)協(xié)程里面運(yùn)行代碼,但請(qǐng)不要這樣做,下面的示例破壞了 cancel 的設(shè)計(jì)本意,所以請(qǐng)勿模仿:
val job = launch(CommonPool) {
try {
...
}finally {
println("finally....")
}
println("I'm an EVIL!!! Hahahaha")
}
說這個(gè)是什么意思呢?在協(xié)程被 cancel 掉的時(shí)候,我們應(yīng)該做的其實(shí)是把戰(zhàn)場打掃干凈,比如:
val job = launch(CommonPool) {
val inputStream = ...
try {
...
}finally {
inputStream.close()
}
}
我們再來考慮下面的情形:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch(CommonPool) {
var nextPrintTime = 0L
var i = 0
while (true) { // computation loop
val currentTime = System.currentTimeMillis()
if (currentTime >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime = currentTime + 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
delay(1300L) // delay a bit to see if it was cancelled....
println("main: Now I can quit.")
}
不得不說,kotlinx.coroutines 在幾天前剛剛更新的文檔和示例非常的棒。我們看到這個(gè)例子,while(true) 會(huì)讓這個(gè)協(xié)程不斷運(yùn)行來模擬耗時(shí)計(jì)算,盡管外部調(diào)用了 job.cancel(),但由于內(nèi)部并沒有 care 自己是否被 cancel,所以這個(gè) cancel 顯然有點(diǎn)兒失敗。如果你想要在類似這種耗時(shí)計(jì)算當(dāng)中檢測當(dāng)前協(xié)程是否被取消的話,你可以這么寫:
...
while (isActive) { // computation loop
...
}
...
isActive 會(huì)在 cancel 之后被置為 false。
其實(shí),通過這幾個(gè)示例大家就會(huì)發(fā)現(xiàn)協(xié)程的取消,與我們通常取消線程操作的思路非常類似,只不過人家封裝的比較好,而我們呢,每次還得自己搞一個(gè) CancelableTask 來實(shí)現(xiàn) Runnable 接口去承載自己的異步操作,想想也是夠原始呢。
5. 輕量級(jí)線程
協(xié)程時(shí)輕量級(jí)的,它擁有自己的運(yùn)行狀態(tài),但它對(duì)資源的消耗卻非常的小。其實(shí)能做到這一點(diǎn)的本質(zhì)原因,我們已經(jīng)在上一篇文章當(dāng)中提到過,一臺(tái)服務(wù)器開 1k 線程和 1k 協(xié)程來響應(yīng)服務(wù),前者對(duì)資源的消耗必然很大,而后者可能只是基于很少的幾個(gè)或幾十個(gè)線程來工作的,隨著請(qǐng)求數(shù)量的增加,協(xié)程的優(yōu)勢可能會(huì)體現(xiàn)的更加明顯。
我們來看個(gè)比較簡單的例子:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = List(100_000) {
launch(CommonPool) {
delay(1000L)
print(".")
}
}
jobs.forEach { it.join() } //這里不能用 jobs.forEach(Job::join),因?yàn)?Job.join 是 suspend 方法
}
通過 List 這個(gè)方法,我們可以瞬間創(chuàng)建出很多對(duì)象放入返回的 List,注意到這里的 jobs 其實(shí)就是協(xié)程的一個(gè) List。
運(yùn)行上面的代碼,我們發(fā)現(xiàn) CommonPool 當(dāng)中的線程池的線程數(shù)量基本上維持在三四個(gè)就足夠了,如果我們用線程來寫上面的代碼會(huì)是什么感覺?
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = List(100_000) {
thread {
Thread.sleep(1000L)
log(".")
}
}
jobs.forEach(Thread::join) // Thread::join 說起來也是 1.1 的新特性呢!
}
運(yùn)行時(shí),在創(chuàng)建了 1k 多個(gè)線程之后,就拋出了異常:
Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
嗯,又多了一個(gè)用協(xié)程的理由,對(duì)不對(duì)?
6. 攜帶值的 Job
我們前面說了,通過攜程返回的 Job,我們可以控制攜程的運(yùn)行??捎袝r(shí)候我們更關(guān)注協(xié)程運(yùn)行的結(jié)果,比如從網(wǎng)絡(luò)加載一張圖片:
suspend fun loadImage(url: String): Bitmap {
...
return ...
}
沒錯(cuò),我們更關(guān)注它的結(jié)果,這種情況我們該怎么辦呢?如果 loadImage 不是 suspend 方法,那么我們在非 UI 線程當(dāng)中直接獲取他們:
val imageA = loadImage(urlA)
val imageB = loadImage(urlB)
onImageGet(imageA, imageB)
這樣的操作有什么問題?順序獲取兩張圖片,耗時(shí),不經(jīng)濟(jì)。所以傳統(tǒng)的做法就是開兩個(gè)線程做這件事兒,這意味著你會(huì)看到兩個(gè)回調(diào),并且還要同步這兩個(gè)回調(diào),想想都頭疼。
不過我們現(xiàn)在有更好的辦法:
val imageA = defer(CommonPool) { loadImage(urlA) }
val imageB = defer(CommonPool) { loadImage(urlB) }
onImageGet(imageA.await(),imageB.await())
代碼量幾乎沒有增加,不過我們卻做到了兩張圖片異步獲取,并同時(shí)傳給 onImageGet 以便繼續(xù)后面的操作。
defer 到底是個(gè)什么東西?其實(shí)大家大可不必看到新詞就感到恐慌,這東西用法幾乎跟 launch 一樣,只不過它返回的 Deferred 功能比 Job 多了一樣:攜帶返回值。我們前面看到的 imageA 其實(shí)就是一個(gè) Deferred 實(shí)例,而它的 await 方法返回的則是 Bitmap 類型,也即 loadImage(urlA) 的返回值。
所以如果你對(duì)協(xié)程運(yùn)行的結(jié)果感興趣,直接使用 defer 來替換你的 launch 就可以了。需要注意的是,即便你不調(diào)用 await,defer 啟動(dòng)的協(xié)程也會(huì)立即運(yùn)行,如果你希望你的協(xié)程能夠按需啟動(dòng),例如只有你調(diào)用 await 之后再啟動(dòng),那么你可以用 lazyDefer:
val imageA = lazyDefer(CommonPool) { loadImage(urlA) }
val imageB = lazyDefer(CommonPool) { loadImage(urlB) }
onImageGet(imageA.await(),imageB.await()) //這時(shí)候才開始真正去加載圖片
7. 生成器
不知道大家對(duì) python 的生成器有沒有了解,這個(gè)感覺就好似延遲計(jì)算一樣。
假設(shè)我們要計(jì)算 fibonacci 數(shù)列,這個(gè)大家都知道,也非常容易寫,你可能分分鐘寫出一個(gè)遞歸的函數(shù)來求得這個(gè)序列,不過你應(yīng)該知道遞歸的層級(jí)越多,stackOverflow 的可能性越大吧?另外,如果我們只是用到其中的幾個(gè),那么遞歸的函數(shù)一下子都給求出來,而且每次調(diào)用也沒有記憶性導(dǎo)致同一個(gè)值計(jì)算多次,非常不經(jīng)濟(jì)。大家看一個(gè) python 的例子:
def fibonacci():
yield 1 # 直接返回 1, 并且在此處暫停
first = 1
second = 1
while True:
yield first
first, second = first + second, first
a = fibonacci()
for x in a:
print x
if x > 100: break
前面給出的這種計(jì)算方法,fibonacci 函數(shù)返回一個(gè)可迭代的對(duì)象,這個(gè)對(duì)象其實(shí)就是生成器,只有我們在迭代它的時(shí)候,它才會(huì)去真正執(zhí)行計(jì)算,只要遇到 yield,那么這一次迭代到的值就是 yield 后面的值,比如,我們第一次調(diào)用 fibonacci 這個(gè)函數(shù)的時(shí)候,得到的值就是 1,后面依次類推。
Kotlin 在添加了協(xié)程這個(gè)功能之后,也可以這么搞了:
val fibonacci = buildSequence {
yield(1) // first Fibonacci number
var cur = 1
var next = 1
while (true) {
yield(next) // next Fibonacci number
val tmp = cur + next
cur = next
next = tmp
}
}
...
for (i in fibonacci){
println(i)
if(i > 100) break //大于100就停止循環(huán)
}
可以這么說,這段代碼與前面的 python 版本功能是完全相同的,在 yield 方法調(diào)用時(shí),傳入的值就是本次迭代的值。
fibonacci 這個(gè)變量的類型如下:
public interface Sequence<out T> {
public operator fun iterator(): Iterator<T>
}
既然有 iterator 方法,那么我們可以直接對(duì) fibonacci 進(jìn)行迭代也就沒什么大驚小怪的了。這個(gè) iterator 保證每次迭代的時(shí)候去執(zhí)行 buildSequence 后面的 Lambda 的代碼,從上一個(gè) yield 之后開始到下一個(gè) yield 結(jié)束,yield 傳入的值就是 iterator 的 next 的返回值。
有了這個(gè)特性,我們就可以構(gòu)造許多“懶”序列,只有在用到的時(shí)候才去真正計(jì)算每一個(gè)元素的值,而且運(yùn)算狀態(tài)可以保存,每次計(jì)算的結(jié)果都不會(huì)浪費(fèi)。
注:這個(gè)特性是被 Kotlin 標(biāo)準(zhǔn)庫收錄了的,并不存在于 kotlinx.coroutines 當(dāng)中,不過這也沒關(guān)系啦,kotlinx.coroutines 的 API 會(huì)不會(huì)在不久的將來也作為 Kotlin 標(biāo)準(zhǔn)庫的內(nèi)容出現(xiàn)呢?
8. 小結(jié)
這一篇的內(nèi)容其實(shí)相對(duì)上一篇要簡單一些,面對(duì) kotlinx.coroutines 這樣的框架,我們直接通過分析案例,將 coroutine 這么理論化的東西投入實(shí)際場景,讓大家從感性上對(duì)其有個(gè)更加深入的認(rèn)識(shí)。
當(dāng)然,我們并沒有深入其中了解其原理,原因就是上一篇我們?yōu)榇俗隽俗銐虻臏?zhǔn)備 —— kotlinx.coroutines 作為官方的框架,自然要實(shí)現(xiàn)得完善一些,但也是萬變不離其宗。
寫到這里,我想,我們還是需要有一篇文章再來介紹一些協(xié)程使用的一些注意事項(xiàng),那么我們下一篇再見吧。