之前介紹的啟動(dòng)協(xié)程方法,比如 launch、async 都是協(xié)程的單次啟動(dòng)。如果有復(fù)雜場景,比如發(fā)送多個(gè)數(shù)據(jù),就需要使用 flow 數(shù)據(jù)流。在 flow 中,數(shù)據(jù)如水流一樣經(jīng)過上游發(fā)送,中間站處理,下游接收。
創(chuàng)建 flow 有 3 種方式:
flow{}
flowOf()
asFlow()
1.flow{} 中使用 emit 發(fā)送數(shù)據(jù)。
fun flowEmit() = runBlocking {
? ? flow {
? ? ? ? emit(1)
? ? ? ? emit(2)
? ? ? ? emit(3)
? ? ? ? emit(4)
? ? ? ? emit(5)
? ? }
? ? ? ? .filter {
? ? ? ? ? ? it > 2
? ? ? ? }
? ? ? ? .map {
? ? ? ? ? ? it * 2
? ? ? ? }
? ? ? ? .take(2)
? ? ? ? .collect {
? ? ? ? ? ? // 6 8
? ? ? ? ? ? println(it)
? ? ? ? }
}
2.flowOf
flowOf() 可以將指定的一串數(shù)據(jù)轉(zhuǎn)換為 flow,接收可變參數(shù)。
fun flowOfFun() = runBlocking {
? ? flowOf(1, 2, 3, 4, 5)
? ? ? ? .filter { it > 2 }
? ? ? ? .map { it * 2 }
? ? ? ? .take(2)
? ? ? ? .collect {
? ? ? ? ? ? // 6 8
? ? ? ? ? ? println(it)
? ? ? ? }
? ? listOf(1, 2, 3, 4, 5)
? ? ? ? .filter { it > 2 }
? ? ? ? .map { it * 2 }
? ? ? ? .take(2)
? ? ? ? .forEach {
? ? ? ? ? ? // 6 8
? ? ? ? ? ? println(it)
? ? ? ? }
}
3.asFlow
asFlow() 可以將 List 集合轉(zhuǎn)換為 flow。toList() 可以將 flow 轉(zhuǎn)換為 List 集合。
fun flow2list() = runBlocking {
? ? flowOf(1, 2, 3, 4, 5)
? ? ? ? // flow to list
? ? ? ? .toList()
? ? ? ? .filter { it > 2 }
? ? ? ? .map { it * 2 }
? ? ? ? .take(2)
? ? ? ? .forEach {
? ? ? ? ? ? println(it)
? ? ? ? }
? ? listOf(1, 2, 3, 4, 5)
? ? ? ? // list as flow
? ? ? ? .asFlow()
? ? ? ? .filter { it > 2 }
? ? ? ? .map { it * 2 }
? ? ? ? .take(2)
? ? ? ? .collect {
? ? ? ? ? ? println(it)
? ? ? ? }
}
4.中間操作符
創(chuàng)建 flow 之后使用中間操作符處理 flow 的每一個(gè)數(shù)據(jù)。flow 的中間操作符和 list 集合的操作符非常類似。
常用中間操作符:
filter
filter 傳入判斷條件,條件滿足時(shí)過濾數(shù)據(jù),否則不將數(shù)據(jù)流向下游。
map 傳入映射函數(shù),將每個(gè)數(shù)據(jù)傳入映射函數(shù),得到結(jié)果繼續(xù)傳入下游。
take 傳入非負(fù)整數(shù) n,取前 n 個(gè)數(shù)據(jù)傳入下游。
5.終止操作符
collect 是 flow 的終止操作符,收集每一個(gè)數(shù)據(jù)經(jīng)過中間操作符后的最終結(jié)果,表示 flow 流的終止,后面不能再調(diào)用中間操作符。
除了 collect,還有一些其他的終止操作符,first、single、fold、reduce。
1)collect
返回所有元素,結(jié)束 flow。
2)first
返回第一個(gè)元素,結(jié)束 flow。
3)single?
返回唯一元素,結(jié)束flow。不能多于一個(gè),也不能一個(gè)沒有。
4).fold
折疊所有元素。指定一個(gè)函數(shù)和初始值,對(duì)每一個(gè)元素反復(fù)執(zhí)行函數(shù),返回最后的結(jié)果。
5)reduce
reduce 和 fold 很類似,reduce 沒有初始值。
first、single、fold、reduce 本質(zhì)都是封裝了 collect ,因此它們都是終止操作符。
6.onStart 是 flow 的開始生命周期回調(diào)。onStart 的執(zhí)行時(shí)機(jī)和它在 flow 位置無關(guān)。
7.onComplete
flow 執(zhí)行完后回調(diào) onComplete。onComplete 的執(zhí)行時(shí)機(jī)和它在 flow 的位置無關(guān)。
flow 正常執(zhí)行完回調(diào) onComplete。
8.異常處理
flow 的異常處理可以分為上游異常和下游異常。上游異常指創(chuàng)建 flow 或者中間操作符發(fā)生的異常。下游異常指終止操作符 collect 發(fā)生的異常。
上游異常
上游異??梢杂?catch 函數(shù)捕獲異常。catch 函數(shù)和它的位置相關(guān),只能捕獲 catch 上游的異常。
下游異常不能用 catch 函數(shù),需要在 collect 的作用域用 try-catch 捕獲。
catch 函數(shù)無法捕獲下游的 filter 除 0 異常。
9.線程切換
1)flowOn 可以指定上游所有操作符運(yùn)行的線程,和它的位置相關(guān)。
collect 運(yùn)行在 main 線程,上游運(yùn)行在 IO 線程,指定 DefaultDispatcher。
flowOn 在 filter 之前,emit 執(zhí)行在 IO 線程,filter 和 collect 執(zhí)行在 main 線程。
因?yàn)?flowOn 只能用于上游,在 collect 可以用 withContext 切換線程,但不建議這么用。
collect 運(yùn)行在 DefaultDispatcher,其他運(yùn)行在 main 線程。
flow 的 emit、filter、collect 都運(yùn)行在 DefaultDispatcher。
2)launchIn
flow 提供了 launchIn 函數(shù)指定在哪個(gè)線程執(zhí)行。launchIn 運(yùn)行在指定的 CoroutineScope。
flowOn 之前的運(yùn)行在 Dispatchers.IO,下游運(yùn)行在 launchIn 指定的 scope。
launchIn 調(diào)用了 scope 的 launch,然后執(zhí)行 collect。相當(dāng)于終止操作符。
10.flow 是冷的
flow 是冷的,只有接收者存在的情況下才會(huì)發(fā)送數(shù)據(jù)。如果不調(diào)用 collect,emit 不會(huì)執(zhí)行。相反 channel 是熱的,不管有沒有接收者都會(huì)發(fā)送。
flow 的 emit 未執(zhí)行。
總結(jié)
flow 是 kotlin 提供的解決復(fù)雜異步場景的方案。
flow 由創(chuàng)建、中間操作符、終止操作符三個(gè)部分組成。
flow 的生命周期可以分為 onStart 和 onComplete,與它們?cè)?flow 的位置無關(guān)。
flow 的異常處理使用 catch。catch 與位置相關(guān)。
flow 的線程切換使用 flowOn 和 launchIn。flowOn 控制上游,launchIn 控制全局。
flow 是冷的,只有存在接收者它才會(huì)開始執(zhí)行。