Kotlin語(yǔ)言基礎(chǔ)筆記
Kotlin流程控制語(yǔ)句筆記
Kotlin操作符重載與中綴表示法筆記
Kotlin擴(kuò)展函數(shù)和擴(kuò)展屬性筆記
Kotlin空指針安全(null-safety)筆記
Kotlin類型系統(tǒng)筆記
Kotlin面向?qū)ο缶幊坦P記
Kotlin委托(Delegation)筆記
Kotlin泛型型筆記
Kotlin函數(shù)式編程筆記
Kotlin與Java互操作筆記
Kotlin協(xié)程筆記
很多小伙伴可能會(huì)覺(jué)得Java有了線程、線程池了,我們還要協(xié)程(Coroutines)干嘛。這里還是有些區(qū)別的。區(qū)別有:
- 線程是為了提高CPU的利用率,調(diào)度是由操作系統(tǒng)決定的,而協(xié)程是為了解決多個(gè)任務(wù)更好的協(xié)作,調(diào)度是由我們代碼控制的。
- 協(xié)程并不是為了取代線程,協(xié)程對(duì)線程進(jìn)行抽象,你可以看成協(xié)程是一個(gè)異步調(diào)用的框架,解決了之前線程間協(xié)作代碼繁瑣的問(wèn)題。
我們先來(lái)看一段代碼,如下:
data class Product(var id: String, var title: String)
data class Stock(var pid: String, var stock: Int)
data class Pms(var pid: String, var pmsTips: String)
suspend fun getProductsByIds(pids: List<String>): List<Product> {
delay(1000)
return listOf(Product("1", "a"), Product("2", "b"))
}
suspend fun getProductStocksByIds(pids: List<String>): List<Stock> {
delay(2000)
return listOf(Stock("1", 2), Stock("2", 4))
}
suspend fun getProductPMSByIds(pids: List<String>): List<Pms> {
delay(3000)
return listOf(Pms("1", "100減99"), Pms("2", "100減99"))
}
fun combine(products: List<Product>?, productStocks: List<Stock>?, productPMS: List<Pms>?) {
println(products)
println(productStocks)
println(productPMS)
}
fun main(args: Array<String>) = runBlocking<Unit> {
val pids = listOf<String>("1", "2")
val products = async {
withTimeoutOrNull(1500) {
getProductsByIds(pids)
}
}
val productStocks = async {
withTimeoutOrNull(2500) {
getProductStocksByIds(pids)
}
}
val productPMS = async {
withTimeoutOrNull(2500) {
getProductPMSByIds(pids)
}
}
val measureTimeMillis = measureTimeMillis {
combine(products.await(), productStocks.await(), productPMS.await())
}
println(measureTimeMillis)
}
這段代碼看起來(lái)就像是偽代碼,不過(guò)還是非常容易理解,就是通過(guò)一批商品id,分別調(diào)用三個(gè)接口拿到商品的信息,商品的庫(kù)存,商品的優(yōu)惠信息,然后再合并數(shù)據(jù),這個(gè)場(chǎng)景無(wú)論在后端還是前端都會(huì)經(jīng)常遇到,比如APP調(diào)用的一個(gè)接口,需要從不同的底層系統(tǒng)獲取到不同部分的數(shù)據(jù),然后聚合好一次性返回給APP。想想如果是用Java來(lái)實(shí)現(xiàn)的會(huì)有多復(fù)雜。用Kotlin的協(xié)程實(shí)現(xiàn)就像是寫(xiě)順序執(zhí)行的代碼,但實(shí)際上你做的是異步調(diào)用。
1.第一個(gè)協(xié)程代碼
fun main(args: Array<String>) {
launch { // launch new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello,") // main thread continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
我們使用lauch來(lái)啟動(dòng)一個(gè)協(xié)程,其中要注意的是delay這個(gè)函數(shù),看起來(lái)它跟Thread.sleep是一樣的作用,但是他們有本質(zhì)的區(qū)別,Thread.sleep會(huì)阻塞當(dāng)前線程(線程就傻傻的在等待),而delay是暫停當(dāng)前的協(xié)程,不會(huì)阻塞當(dāng)前線程,這個(gè)線程可以去做其他事情。delay是一個(gè)suspending function,它只能運(yùn)行在協(xié)程里面,如果不在協(xié)程中運(yùn)行,會(huì)報(bào)以下異常。
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
2. runBlocking
runBlocking函數(shù)會(huì)阻塞當(dāng)前線程,一直等到協(xié)程運(yùn)行完。上面的例子可以改成下面的:
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
launch { // launch new coroutine in background and continue
delay(1000L)
println("World!")
}
println("Hello,") // main coroutine continues here immediately
delay(2000L) // delaying for 2 seconds to keep JVM alive
}
3.等待協(xié)程完成
延時(shí)一段時(shí)間來(lái)等待協(xié)程完成通常不是很高效,我們可以通過(guò)join來(lái)實(shí)現(xiàn)一旦協(xié)程完成就退出main函數(shù)。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { // launch new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // wait until child coroutine completes
}
4. suspending function 暫停函數(shù)
我們也可以使用suspending function重構(gòu)下。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch { doWorld() }
println("Hello,")
job.join()
}
// this is your first suspending function
suspend fun doWorld() {
delay(1000L)
println("World!")
}
注意:delay也是一個(gè)suspending function,所以depay只能放在suspending function或者協(xié)程代碼(lanuch)里面。
5. 協(xié)程是非常輕量級(jí)的
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = List(100_000) { // launch a lot of coroutines and list their jobs
launch {
delay(1000L)
print(".")
}
}
jobs.forEach { it.join() } // wait for all jobs to complete
}
啟動(dòng)了10萬(wàn)個(gè)協(xié)程,最后代碼能夠成功的執(zhí)行完成。同樣,大家可以試試換成起10萬(wàn)個(gè)線程試試,應(yīng)該會(huì)得出OOM的結(jié)果。
6. 協(xié)程像守護(hù)線程
請(qǐng)看下面這段代碼:
fun main(args: Array<String>) = runBlocking<Unit> {
launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // just quit after delay
}
輸出如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
可以知道,等待1.3秒后,main退出了。不會(huì)等待launch的協(xié)程運(yùn)行完。
7. 協(xié)程取消
launch返回一個(gè)Job對(duì)象,它可以被取消:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
}
輸出如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
可以看到,一旦調(diào)用了job.cancel(),就退出了main函數(shù)。Job還有一個(gè)cancelAndJoin方法,合并了cancel和join操作。
8. 協(xié)程的取消可能需要協(xié)作完成
協(xié)程的取消可能需要協(xié)作完成,所有在kotlinx.coroutines包下面的suspending functions都可以被取消,但是如果一個(gè)協(xié)程處在計(jì)算中,他是不能被取消的,比如這個(gè)例子:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
你可以看到調(diào)用取消后,還在打印。
9. 讓處于計(jì)算中的協(xié)程可取消
有兩種方式可以做到:
- 最簡(jiǎn)單的在while循環(huán)最后面調(diào)用下yield函數(shù)。這樣就在每次循環(huán)后讓協(xié)程有了被取消的機(jī)會(huì),yield是
kotlinx.coroutines包下的suspending functions。 - 檢查協(xié)程取消的狀態(tài),如果發(fā)現(xiàn)被取消,則退出循環(huán)。
下面我們以第二種方式演示下:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
isActive是協(xié)程的CoroutineScope的一個(gè)屬性。
10. 協(xié)程中try catch finally
當(dāng)協(xié)程被取消時(shí),catch和finally可以被執(zhí)行。
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}catch (e:Throwable){
println("I'm running catch")
} finally {
println("I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
輸出:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running catch
I'm running finally
main: Now I can quit.
11. withContext函數(shù)
在上個(gè)例子中,如果我們?cè)趂inally塊中調(diào)用suspending functions的話,會(huì)拋出CancellationException,因?yàn)閰f(xié)程已經(jīng)被取消了。不過(guò)一般來(lái)說(shuō)沒(méi)什么太大問(wèn)題,只要不調(diào)用suspending functions。如果你一定要在調(diào)用的話,你可以使用withContext(NonCancellable) {...}。如下:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
12. Timeout超時(shí)
如果要設(shè)定協(xié)程調(diào)用超時(shí)時(shí)間,我們可以使用withTimeout函數(shù),如下:
fun main(args: Array<String>) = runBlocking<Unit> {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}
輸出如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
如果超時(shí)的時(shí)候你不想拋出異常,你可以使用withTimeoutOrNull函數(shù),超時(shí)的時(shí)候它會(huì)返回null。
fun main(args: Array<String>) = runBlocking<Unit> {
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
}
輸出如下:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
13. 使用async并發(fā)調(diào)用
async與launch類似,它也是啟動(dòng)一個(gè)協(xié)程,只不過(guò)lauch返回的是Job(沒(méi)有返回值),而async返回的是Deferred(帶返回值),你可以使用.await()來(lái)獲取Deferred的值。Deferred是Job的子類,所以Deferred也可以被取消??纯聪旅孢@段代碼:
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // pretend we are doing something useful here
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
return 29
}
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
輸出如下:
The answer is 42
Completed in 1016 ms
因?yàn)槭遣⑿姓{(diào)用,所以時(shí)間差不多是1秒。
14. async延時(shí)調(diào)用
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
如果async帶上了start = CoroutineStart.LAZY參數(shù),協(xié)程不會(huì)立即執(zhí)行,會(huì)等到調(diào)用await的時(shí)候才開(kāi)始執(zhí)行。上面代碼輸出如下:
The answer is 42
Completed in 2017 ms
執(zhí)行結(jié)果看起來(lái)變成了順序執(zhí)行,那是因?yàn)閛ne.await執(zhí)行完成之后,才會(huì)開(kāi)始調(diào)用two.await()執(zhí)行。所以變成了順序執(zhí)行。
15. Async-style functions
// The result type of somethingUsefulOneAsync is Deferred<Int>
fun somethingUsefulOneAsync() = async {
doSomethingUsefulOne()
}
// The result type of somethingUsefulTwoAsync is Deferred<Int>
fun somethingUsefulTwoAsync() = async {
doSomethingUsefulTwo()
}
上面兩個(gè)方法xxxAsync并不是suspending functions,所以他們可以在任何地方調(diào)用。
// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
val time = measureTimeMillis {
// we can initiate async actions outside of a coroutine
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
// but waiting for a result must involve either suspending or blocking.
// here we use `runBlocking { ... }` to block the main thread while waiting for the result
runBlocking {
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}
16. Dispatchers and threads
launch和async都接收一個(gè)可選的CoroutineContext參數(shù)可以用來(lái)指定CoroutineDispatcher。如下:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
println(" 'CommonPool': I'm working in thread ${Thread.currentThread().name}")
}
jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
println(" 'newSTC': I'm working in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
輸出如下:
'Unconfined': I'm working in thread main
'CommonPool': I'm working in thread ForkJoinPool.commonPool-worker-1
'newSTC': I'm working in thread MyOwnThread
'coroutineContext': I'm working in thread main
默認(rèn)的dispatcher是DefaultDispatcher當(dāng)前的實(shí)現(xiàn)是CommonPool
17. Unconfined vs confined dispatcher
Unconfined dispatcher會(huì)在當(dāng)前線程開(kāi)始執(zhí)行協(xié)程,但是僅僅是在第一個(gè)暫停點(diǎn),之后它恢復(fù)后的dispatcher取決于那個(gè)線程執(zhí)行suspending function。
coroutineContext 是CoroutineScope的一個(gè)屬性,它的dispatcher會(huì)繼承它parent線程的dispatcher。 代碼如下:
fun main(args: Array<String>) = runBlocking<Unit> {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) { // not confined -- will work with main thread
println(" 'Unconfined': I'm working in thread ${Thread.currentThread().name}")
delay(500)
println(" 'Unconfined': After delay in thread ${Thread.currentThread().name}")
}
jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
println("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("'coroutineContext': After delay in thread ${Thread.currentThread().name}")
}
jobs.forEach { it.join() }
}
輸出如下:
'Unconfined': I'm working in thread main
'coroutineContext': I'm working in thread main
'Unconfined': After delay in thread kotlinx.coroutines.DefaultExecutor
'coroutineContext': After delay in thread main
所以,coroutineContext繼承了runBlocking的main線程,而unconfined恢復(fù)后變成了default executor線程。
18. 線程切換
加上-Dkotlinx.coroutines.debugJVM參數(shù)運(yùn)行下面的代碼:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
}
這里展示了幾個(gè)用法:一個(gè)是使用runBlocking指明一個(gè)特殊的Context,另外一個(gè)是使用withContext來(lái)切換Context,輸出如下:
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
還有就是run來(lái)釋放線程。
19. 通過(guò)Context來(lái)獲取Job
協(xié)程的Job是Context的一個(gè)屬性,如下:
fun main(args: Array<String>) = runBlocking<Unit> {
println("My job is ${coroutineContext[Job]}")
}
19. 子協(xié)程
在協(xié)程中使用coroutineContext來(lái)啟動(dòng)另一個(gè)協(xié)程,新協(xié)程的Job變成了父協(xié)程的子Job,當(dāng)父協(xié)程取消時(shí),子協(xié)程也會(huì)被取消。
fun main(args: Array<String>) = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
// it spawns two other jobs, one with its separate context
val job1 = launch {
println("job1: I have my own context and execute independently!")
delay(1000)
println("job1: I am not affected by cancellation of the request")
}
// and the other inherits the parent context
val job2 = launch(coroutineContext) {
delay(100)
println("job2: I am a child of the request coroutine")
delay(1000)
println("job2: I will not execute this line if my parent request is cancelled")
}
// request completes when both its sub-jobs complete:
job1.join()
job2.join()
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}
輸出結(jié)果如下:
job1: I have my own context and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
20. Context聯(lián)合
協(xié)程Context可以使用+聯(lián)合,如下:
fun main(args: Array<String>) = runBlocking<Unit> {
// start a coroutine to process some kind of incoming request
val request = launch(coroutineContext) { // use the context of `runBlocking`
// spawns CPU-intensive child job in CommonPool !!!
val job = launch(coroutineContext + CommonPool) {
println("job: I am a child of the request coroutine, but with a different dispatcher")
delay(1000)
println("job: I will not execute this line if my parent request is cancelled")
}
job.join() // request completes when its sub-job completes
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
}
job是request的子協(xié)程,但是是在CommonPool的線程中執(zhí)行操作。所以取消request,job也會(huì)取消。
21. 父協(xié)程會(huì)等待子協(xié)程完成
父協(xié)程會(huì)等待子協(xié)程完成,不需要使用join來(lái)等待他們完成。
fun main(args: Array<String>) = runBlocking<Unit> {
// launch a coroutine to process some kind of incoming request
val request = launch {
repeat(3) { i -> // launch a few children jobs
launch(coroutineContext) {
delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
println("Coroutine $i is done")
}
}
println("request: I'm done and I don't explicitly join my children that are still active")
}
request.join() // wait for completion of the request, including all its children
println("Now processing of the request is complete")
}
輸出如下:
request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete
22. Tricks
假如我們現(xiàn)在在寫(xiě)一個(gè)anroid app,在activity中啟動(dòng)了很多協(xié)程異步調(diào)用接口獲取數(shù)據(jù),當(dāng)這個(gè)activity被destory后,所有的協(xié)程需要被取消,要不然就可能會(huì)發(fā)生內(nèi)存泄漏。
我們可以創(chuàng)建一個(gè)Job實(shí)例,然后使用launch(coroutineContext, parent = job)來(lái)明確指定parent job。
這樣的話,我們可以調(diào)用Job.cancel來(lái)取消所有的子協(xié)程,而Job.join可以等待所有的子協(xié)程完成。如下:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = Job() // create a job object to manage our lifecycle
// now launch ten coroutines for a demo, each working for a different time
val coroutines = List(10) { i ->
// they are all children of our job object
launch(coroutineContext, parent = job) { // we use the context of main runBlocking thread, but with our parent job
delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
println("Coroutine $i is done")
}
}
println("Launched ${coroutines.size} coroutines")
delay(500L) // delay for half a second
println("Cancelling the job!")
job.cancelAndJoin() // cancel all our coroutines and wait for all of them to complete
}
輸出如下:
Launched 10 coroutines
Coroutine 0 is done
Coroutine 1 is done
Cancelling the job!
23. channel, select, actor
請(qǐng)看:https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md