Kotlin(二十一)協(xié)程(異步流)

1.Flows(Flows)

掛起函數(shù)可以通過(guò)返回值返回單個(gè)計(jì)算值,但如何返回多個(gè)計(jì)算值呢?我們可以使用Flows。我們使用集合遍歷,打印來(lái)返回多個(gè)計(jì)算值來(lái)舉例子

package com.example.kotlin01
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
 fun main()  = runBlocking {
     launch {
        for (k in 1..3){
            println("我沒有被阻塞,當(dāng)前值$k")
            delay(100)
        }
     }
     foo().collect{
         value ->
         println(value)
     }
 }
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
我沒有被阻塞,當(dāng)前值1
1
我沒有被阻塞,當(dāng)前值2
2
我沒有被阻塞,當(dāng)前值3
3

從打印的結(jié)果來(lái)看,兩個(gè)打印的地方都是異步的,并沒有阻塞主線程。

  • Flow類型的構(gòu)造器函數(shù)名為flow
  • flow中的代碼塊可以掛起
  • 計(jì)算值通過(guò)emit函數(shù)發(fā)出
  • 結(jié)果值通過(guò)collect函數(shù)從fllow取出
  • 此場(chǎng)景有點(diǎn)類型RxJava

2.流是冷的(Flows are cold)

流是冷的意思是,flow build中的代碼在開始收集之前不會(huì)運(yùn)行,也就是在collect之前,不會(huì)運(yùn)行flow里面的代碼塊

package com.example.kotlin01

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    var flow = foo()
    flow.collect { value ->
        println(value)
    }
    println("flow agin")
    flow.collect { value ->
        println(value)
    }
}


fun foo(): Flow<Int> = flow {
    println("flow start")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}



flow start
1
2
3
flow agin
flow start
1
2
3

每次 collect的時(shí)候都會(huì)start

3.取消流(Flow cancellation)

流的收集操作可以在當(dāng)流在一個(gè)可以取消的掛起函數(shù)中掛起的時(shí)候取消(比如delay ),否則不能取消
下面通過(guò)withTimeoutOrNull 塊演示流如何在超時(shí)的時(shí)候被取消并停止執(zhí)行

package com.example.kotlin01

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull


fun main() = runBlocking {
    var flow = foo()
    withTimeoutOrNull(250) {
        flow.collect { value ->
            println("collect $value")
        }
    }
    println("done")


}


fun foo(): Flow<Int> = flow {

    for (i in 1..Int.MAX_VALUE) {
        delay(100)
        println("emit $i")
        emit(i)
    }
}



emit 1
collect 1
emit 2
done

以上的代碼,foo函數(shù)中每隔100毫秒收集一次,main函數(shù),收集flow的計(jì)算值。并且在250毫秒的時(shí)候超時(shí)。我們看打印知道,打印了1和2之后,就打印了 done,因?yàn)榇藭r(shí)當(dāng)前流在一個(gè)可以取消的掛起函數(shù)中delay函數(shù)掛起。因?yàn)槌瑫r(shí)之后被取消了。

4.流構(gòu)建器(Flow builders)

前面說(shuō)的flow{}是最基礎(chǔ)的流構(gòu)建器,還有其他構(gòu)建器可以聲明流

  • flowOf()定義了一個(gè)發(fā)射固定值集的流構(gòu)建器
  • 可以使用asFlow()擴(kuò)展函數(shù)將各種集合和序列轉(zhuǎn)換成流
fun main() = runBlocking {
    (1..3).asFlow().collect() { value -> println(value) }

}

1
2
3

5. 中間流運(yùn)算符

可以使用運(yùn)算符來(lái)轉(zhuǎn)換流,中間運(yùn)算符應(yīng)用于上游流并返回下游流,比如我們熟悉的map,filter
使用map將請(qǐng)求值映射成結(jié)果值

import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    (1..5).asFlow().map { value -> request(value) }
        .collect { response -> println(response) }


}
suspend fun request(parm: Int): Int {
    delay(100)
    return parm
}

1
2
3
4
5

5.1.轉(zhuǎn)換操作符(Transform operator)

轉(zhuǎn)換運(yùn)算符Transform可以用來(lái)模擬簡(jiǎn)單的數(shù)據(jù)轉(zhuǎn)換也可以用來(lái)更復(fù)雜的運(yùn)算符轉(zhuǎn)換
使用Transform模擬在執(zhí)行一個(gè)耗時(shí)任務(wù)之前發(fā)出一個(gè)字符串并在字符串后跟出一個(gè)響應(yīng)

package com.example.kotlin01

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
   val flow =  (1..5).asFlow().transform { param ->
        emit("這是第 $param 次請(qǐng)求")
        emit(request(param))
    }
    flow.collect{data-> println(data)}


}

suspend fun request(parm: Int): Int {
    delay(100)
    return parm
}



這是第 1 次請(qǐng)求
1
這是第 2 次請(qǐng)求
2
這是第 3 次請(qǐng)求
3
這是第 4 次請(qǐng)求
4
這是第 5 次請(qǐng)求
5

5.2.限長(zhǎng)運(yùn)算符(Size-limiting operators)

限長(zhǎng)運(yùn)算符在達(dá)到相應(yīng)限制的時(shí)候,取消流的執(zhí)行。協(xié)程中的取消總是通過(guò)拋出異常來(lái)實(shí)現(xiàn)的,這樣所有的資源管理函數(shù)就可以在取消時(shí)正常執(zhí)行
限制長(zhǎng)度是2,我們可以看到打印的結(jié)果2之后就停止了

package com.example.kotlin01
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking
fun main() = runBlocking<Unit> {
  try {
      val flow = (1..5).asFlow().take(2).map { value ->
          request(value)
      }.collect { response -> println(response) }
  } finally {
      println("Finally in numbers")

  }
}
suspend fun request(parm: Int): Int {
  delay(100)
  return parm
}
1
2
Finally in numbers

6. 流運(yùn)算符(Terminal flow operators)

終端流運(yùn)算符是用于啟動(dòng)流的掛起函數(shù),collect是最基本的終端流運(yùn)算符,但還要其他的終端流運(yùn)算符,可以使得操作更加簡(jiǎn)單

  • 轉(zhuǎn)換各種集合 toList,toSet
  • first 用于獲取第一個(gè)值,single,用于確保流發(fā)出單個(gè)值
  • 使用reduce和fold 將流還原為某個(gè)值
package com.example.kotlin01

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    try {
        val list = (1..5).asFlow().take(2).map { value ->
            request(value)
        }.toList()
        println(list)
    } finally {
        println("Finally in numbers")

    }


}

suspend fun request(parm: Int): Int {
    delay(100)
    return parm
}


[1, 2]
Finally in numbers

將流轉(zhuǎn)換成集合

7. 流是連續(xù)的(Flows are sequential)

每個(gè)流的單獨(dú)集合都是按照順序執(zhí)行的。

package com.example.kotlin01

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    (1..5).asFlow().filter {
        println("當(dāng)前傳入的值$it")
        it % 2 == 0//過(guò)濾下 模 2 = 0的
    }
        .map {
            println("當(dāng)前轉(zhuǎn)換的值$it")
            "$it"
        }
        .collect {
            println("當(dāng)前收集的值 $it ")
        }

}



當(dāng)前傳入的值1
當(dāng)前傳入的值2
當(dāng)前轉(zhuǎn)換的值2
當(dāng)前收集的值 2 
當(dāng)前傳入的值3
當(dāng)前傳入的值4
當(dāng)前轉(zhuǎn)換的值4
當(dāng)前收集的值 4 
當(dāng)前傳入的值5

只有2和4是滿足的,2和4會(huì)執(zhí)行到收集完畢,再繼續(xù)發(fā)射下一個(gè)值。

8. 流上下文

流的收集總是在調(diào)用協(xié)程的上下文中執(zhí)行

package com.example.kotlin01

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    foo().collect { value -> println("當(dāng)前線程${Thread.currentThread().name}-當(dāng)前值$value") }

}

fun foo(): Flow<Int> = flow {
    for (i in 1..4) {
        emit(i)
    }

}

當(dāng)前線程main-當(dāng)前值1
當(dāng)前線程main-當(dāng)前值2
當(dāng)前線程main-當(dāng)前值3
當(dāng)前線程main-當(dāng)前值4

調(diào)用協(xié)程的地方是在主線程中執(zhí)行的,所有流的收集也是在主線程中執(zhí)行。因?yàn)槠渖舷挛脑谥骶€程。

8.1.錯(cuò)誤地使用 withContext(Wrong emission withContext)

通常withContext用于使用協(xié)程的時(shí)候更改代碼中的上下文,但是flow中的代碼塊必須要保留上下文的屬性,不能修改上下文。

package com.example.kotlin01

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext


fun main() = runBlocking {
    foo().collect { value -> println("當(dāng)前線程${Thread.currentThread().name}-當(dāng)前值$value") }

}

fun foo(): Flow<Int> = flow {
    withContext(Dispatchers.Default){
        for (i in 1..4) {
            emit(i)
        }
    }


}

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [BlockingCoroutine{Active}@614b37e6, BlockingEventLoop@185a864f],
        but emission happened in [DispatchedCoroutine{Active}@64e54ce0, Dispatchers.Default].
        Please refer to 'flow' documentation or use 'flowOn' instead
    at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:84)
    at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:84)
    at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:70)
    at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:55)
    at com.example.kotlin01.CoroutineKt$foo$1$1.invokeSuspend(coroutine.kt:19)

8.2.flowOn 運(yùn)算符(flowOn operator)

有個(gè)例外就是flowOn運(yùn)算符可以修改發(fā)射時(shí)的上下文


import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    foo().collect { value -> println("當(dāng)前線程${Thread.currentThread().name}-當(dāng)前值$value") }

}

fun foo(): Flow<Int> = flow {
        for (i in 1..4) {
            Thread.sleep(100)
            emit(i)
            println("發(fā)射值的當(dāng)前線程${Thread.currentThread().name}")
        }
}.flowOn(Dispatchers.Default)



發(fā)射值的當(dāng)前線程DefaultDispatcher-worker-1
當(dāng)前線程main-當(dāng)前值1
發(fā)射值的當(dāng)前線程DefaultDispatcher-worker-1
當(dāng)前線程main-當(dāng)前值2
發(fā)射值的當(dāng)前線程DefaultDispatcher-worker-1
當(dāng)前線程main-當(dāng)前值3
發(fā)射值的當(dāng)前線程DefaultDispatcher-worker-1
當(dāng)前線程main-當(dāng)前值4

結(jié)果是發(fā)射時(shí)的線程是在DefaultDispatcher,而收集值的線程是在main

9.緩沖(Buffering)

package com.example.kotlin01

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis


fun main() = runBlocking {
    val time = measureTimeMillis {
        foo().collect {
                value ->
            delay(300)
        }
    }
    println("當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間$time")

}

fun foo(): Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}



當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間4081

以上的代碼我們是按照?qǐng)?zhí)行,先執(zhí)行完收集代碼,在執(zhí)行fool.時(shí)間花費(fèi)是4081

package com.example.kotlin01

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis


fun main() = runBlocking {
    val time = measureTimeMillis {
        foo().buffer().collect {
                value ->
            delay(300)
        }
    }
    println("當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間$time")

}

fun foo(): Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}



當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間3214

使用buffer運(yùn)算符在運(yùn)行取值代碼的同時(shí)同時(shí)運(yùn)行 fool(),而不是按順序運(yùn)行他門.時(shí)間花費(fèi)了3214

9.1合并(Conflation)

有時(shí)候我們的需求是不關(guān)心每一個(gè)值的結(jié)果,值關(guān)心最近的值,比如最新的狀態(tài)等.當(dāng)時(shí)可能前面的值還在處理中,導(dǎo)致沒法生成后面的值,這樣我們可以使用Conflation跳過(guò)前面的值

package com.example.kotlin01

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis


fun main() = runBlocking {
    val time = measureTimeMillis {
        foo().conflate().collect {
                value ->
            delay(300)
            println(value)
        }
    }
    println("當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間$time")

}

fun foo(): Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}



1
4
6
9
10

我們看到前面沒有完全打印,比如4,是因?yàn)?這個(gè)數(shù)字還在處理中,而234已經(jīng)生成,所以就可以合并到4中.所以打印了4

9.2.處理最新值(Processing the latest value)

在發(fā)射端和收集端處理很慢的時(shí)候,合并加快處理速度的一種方法,如9.1,她通過(guò)丟棄發(fā)射值來(lái)實(shí)現(xiàn).另外一種方式是取消慢速的收集器,每次發(fā)送出新值的時(shí)候,重新啟動(dòng)她.比如我們可以把Conflation改成collectLatest.字面意思上就是收集最近的發(fā)射值

package com.example.kotlin01

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis


fun main() = runBlocking {
    val time = measureTimeMillis {
        foo().collectLatest { value ->
            delay(300)
            println(value)
        }
    }
    println("當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間$time")

}

fun foo(): Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

10
當(dāng)前流執(zhí)行需要花費(fèi)的時(shí)間1467

10.組合多個(gè)流(Composing multiple flows)

10.1.zip

和kotlin標(biāo)準(zhǔn)庫(kù)中的 Sequence.zip 擴(kuò)展函數(shù)一樣,流有一個(gè)zip合并運(yùn)算符,組合兩個(gè)流的相應(yīng)的值

package com.example.kotlin01

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    val numbers = (1..3).asFlow()
    val strs = flowOf("a", "b", "c", "d")
    numbers.zip(strs) { a, b ->"$a and $b" }.collect { value ->
        println(value)
    }


}
1 and a
2 and b
3 and c
image.png

從上面我們知道兩個(gè)流是對(duì)應(yīng)合并
10.2.Combine
合并最近的

package com.example.kotlin01

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking {
    val numbers = (1..3).asFlow()
    val strs = flowOf("a", "b", "c")
    numbers.combine(strs) { a, b ->"$a and $b" }.collect { value ->
        println("$value")

    }

}

1 and a
2 and a
2 and b
3 and b
3 and c

image.png
  • (1) 1和a - >1a
  • (2) a和2 ->2a
  • (3) 2和b ->2b
  • (4) b和3 ->3b
  • (5) 3和c ->3c

11.展平流(Flattening flows)

package com.example.kotlin01

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    //map運(yùn)算符將請(qǐng)求值映射成結(jié)果值
    //傳入的是 1 2 3,得到的結(jié)果是一個(gè)flow  <flow<String>>
    (1..3).asFlow().map { request(it) }

}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
    delay(500)
    emit("這是第二個(gè) $request")
}

我們看以上的代碼,可以看到,main函數(shù)中的代碼塊得到的是一個(gè)flow <flow<String>>流.也就是流中包含了流.所以我們需要將其展平為單獨(dú)的一個(gè)流進(jìn)行處理.

11.1.flatMapConcat

使用flatMapConcat操作符.,等待內(nèi)部完成時(shí),然后收集下一個(gè)流

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    //map運(yùn)算符將請(qǐng)求值映射成結(jié)果值
    //傳入的是 1 2 3,得到的結(jié)果是一個(gè)flow  <flow<String>>
    (1..3).asFlow().flatMapConcat { request(it) }.collect{ println(it)}
}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
//    delay(500)
//    emit("這是第二個(gè) $request")
}
這是第一個(gè) 1
這是第一個(gè) 2
這是第一個(gè) 3

11.2.flatMapMerge

flatMapMerge操作符模式是同時(shí)收集傳入的流,然后合并到單個(gè)流中.

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    //map運(yùn)算符將請(qǐng)求值映射成結(jié)果值
    //傳入的是 1 2 3,得到的結(jié)果是一個(gè)flow  <flow<String>>
    //concurrency參數(shù)指的是限制同時(shí)收集的并發(fā)數(shù)
    //(1..3).asFlow().flatMapMerge(100, { request(it) }).collect { println(it) }
    (1..3).asFlow().flatMapMerge { request(it)  }.collect{ println(it)}
}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
//    delay(500)
//    emit("這是第二個(gè) $request")
}
這是第一個(gè) 1
這是第一個(gè) 2
這是第一個(gè) 3

11.3.flatMapLatest

和之前章節(jié)的collectLatest 操作符類似,一旦有新流發(fā)出,將取消之前已經(jīng)發(fā)出的流.

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {
    //map運(yùn)算符將請(qǐng)求值映射成結(jié)果值
    //傳入的是 1 2 3,得到的結(jié)果是一個(gè)flow  <flow<String>>
    (1..3).asFlow().flatMapLatest { request(it)  }.collect{ println(it)}
}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
//    delay(500)
//    emit("這是第二個(gè) $request")
}
這是第一個(gè) 1
這是第一個(gè) 2
這是第一個(gè) 3

以上我們通過(guò)三個(gè)操作符都可以將流展平.

12.流異常(Flow exceptions)

當(dāng)發(fā)射器或運(yùn)算符內(nèi)部的代碼引發(fā)異常的時(shí)候,流收集器可以結(jié)束運(yùn)行.但會(huì)出現(xiàn)異常.可以通過(guò) try catch

fun main() = runBlocking<Unit> {
    //map運(yùn)算符將請(qǐng)求值映射成結(jié)果值
    //傳入的是 1 2 3,得到的結(jié)果是一個(gè)flow  <flow<String>>
    try {
        (1..3).asFlow().collect {
            println(it)
            it / 0
        }
    }
    catch (e:Exception){
        println(e.message)
    }

}

13.異常透明性

使用catch運(yùn)算符捕捉異常,catch只能收集下游的異常,如果上游的代碼出現(xiàn)異常就不會(huì)收集.

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.lang.Exception


fun main() = runBlocking<Unit> {
    request(2).catch { println("這是異常...$it") }
        .collect {
            println(it)
        }
}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
    request/0

}
這是第一個(gè) 2
這是異常...java.lang.ArithmeticException: / by zero

14.流完成(Flow completion)

流程在完成的之后(正常完成或者異常完成)可以做一些操作,可以通過(guò)兩種方式完成,命令式和聲明式

14.1.命令式

使用try/finally

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.lang.Exception


fun main() = runBlocking<Unit> {
    try {
        request(2).catch { println("這是異常...$it") }
            .collect {
                println(it)
            }
    }
    finally {
        println("done")
    }

}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
    request/0

}
這是第一個(gè) 2
這是異常...java.lang.ArithmeticException: / by zero
done

14.2.聲明式

使用onCompletion 操作符

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import java.lang.Exception


fun main() = runBlocking<Unit> {

    request(2).onCompletion { println("done") }.catch { println("這是異常...$it") }
        .collect {
            println(it)
        }
}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")
}

這是第一個(gè) 2
done

十五、命令式還是聲明式

我們知道如何收集流,并以命令和聲明式的處理完成和異常.那到底選擇哪一種呢,根據(jù)編程愛好來(lái),我覺得使用聲明式比較舒服

十六、啟動(dòng)流(Launching flow)

我們可以使用launchIn代替collect收集流,launchIn的作用是在單獨(dú)協(xié)程中啟動(dòng)收集流.而collect不是.

package com.example.kotlin01

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking


fun main() = runBlocking<Unit> {

    request(2)
        //onEach 是一個(gè)中間運(yùn)算符
        .onEach { println(it) }
        //單獨(dú)啟動(dòng)收集流的協(xié)程進(jìn)行對(duì)流的收集
        //.collect()
        .launchIn(this)
}

fun request(request: Int): Flow<String> = flow {
    emit("這是第一個(gè) $request")

}

這是第一個(gè) 2
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容