協(xié)程Flow簡(jiǎn)單使用

前言

本文是閱讀協(xié)程Flow的總結(jié)筆記。

什么是Flow

Kotlin中的Flow API是可以更好的異步處理按順序執(zhí)行的數(shù)據(jù)流的方法。

在RxJava中,Observables類型是表示項(xiàng)目流結(jié)構(gòu)的示例。 在訂閱者進(jìn)行訂閱之前,其主體不會(huì)被執(zhí)行。 訂閱后,訂閱者便開(kāi)始獲取發(fā)射的數(shù)據(jù)項(xiàng)。 同樣,F(xiàn)low在相同的條件下工作,即在流生成器內(nèi)部的代碼到了收集流后才開(kāi)始運(yùn)行。

你可以認(rèn)為他是Kotlin的RxJava,但比RxJava學(xué)習(xí)成本低,天然與協(xié)程友好(協(xié)程庫(kù)的一部分)

怎樣使用

首先引入kotlin的協(xié)程的核心庫(kù)

 implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1'

編寫(xiě)如下代碼

fun main() =
    runBlocking {
        launch {
            for (k in 1..3) {
                println("I'm not blocked $k")
                delay(100)
            }
        }
        sample().collect {
            println("接收到的消息${it}")
        }
    }


fun sample() = flow<Int> {
    repeat(3) {
        delay(100)
        emit(it)
    }
}

運(yùn)行如下

I'm not blocked 1
接收到的消息0
I'm not blocked 2
接收到的消息1
I'm not blocked 3
接收到的消息2

Process finished with exit code 0

這就是Flow的基礎(chǔ)用法,需要注意的點(diǎn)如下:
(1)flow是Flow的構(gòu)造方法,F(xiàn)low使用emit發(fā)射數(shù)據(jù),使用collect來(lái)接收數(shù)據(jù)(與RxJava的上下游概念類似)
(2)flow代碼塊是可以掛起的
(3)sample函數(shù)沒(méi)有用suspend關(guān)鍵字進(jìn)行標(biāo)識(shí)

流是冷的

Flow是冷數(shù)據(jù)流,表現(xiàn)為構(gòu)建Flow的代碼中的emit需要在調(diào)用collect才開(kāi)始發(fā)送數(shù)據(jù)。

suspend fun main() = coroutineScope {
    println("收集數(shù)據(jù)")
    sample1().collect {
        println(it)
    }
    println("再次收集數(shù)據(jù)")
    sample1().collect {
        println(it)
    }

}

fun sample1() = flow {
    repeat(3) {
        emit(it)
    }
}

代碼運(yùn)行如下

收集數(shù)據(jù)
0
1
2
再次收集數(shù)據(jù)
0
1
2

Process finished with exit code 0

從運(yùn)行結(jié)果可以看到,調(diào)用了collect,flow的emit才會(huì)調(diào)用。

流的超時(shí)取消

Flow提供了withTimeoutOrNull來(lái)在超時(shí)的情況下取消并停止執(zhí)行其代碼的。
代碼如下:

fun main() = runBlocking {

    withTimeoutOrNull(300) {
        sample2().collect {
            println(it)
        }
    }
    println("完成")

}


fun sample2() = flow {
    repeat(3) {
        kotlinx.coroutines.delay(100)
        emit(it)
    }
}

運(yùn)行結(jié)果如下:

0
1
完成

Process finished with exit code 0

可以看到3沒(méi)有打印出來(lái)

流構(gòu)造器

Flow的構(gòu)造器除了flow{}這種還有:
1.asFlow ,擴(kuò)展函數(shù),將相關(guān)的集合或者序列轉(zhuǎn)換為流
2.flowOf(...),定義固定發(fā)射的數(shù)據(jù)流
代碼如下:

suspend fun main() = coroutineScope {

    listOf(1, 2, 3).asFlow().collect {
        println(it)
    }

    flowOf(4, 5, 6).collect {
        println(it)
    }

}

運(yùn)行結(jié)果如下:

1
2
3
4
5
6

Process finished with exit code 0

過(guò)度操作符

Flow中的過(guò)度操作符有Map和Filter兩種,其中Map是把Flow中的數(shù)據(jù)轉(zhuǎn)換為另外一種數(shù)據(jù)類型發(fā)射出來(lái)。Filter則是將符合條件的數(shù)據(jù)發(fā)送出來(lái)。

Map代碼如下:

suspend fun main()= coroutineScope {
    listOf(1, 2, 3).asFlow().map { it ->
        waitAWhile(it)
    }.collect {
        println(it)
    }
}


suspend fun waitAWhile(int: Int): String {
    delay(100)
    return "等了一會(huì)$int"
}

代碼將Int類型轉(zhuǎn)換為String類型的數(shù)據(jù)發(fā)送出來(lái)
運(yùn)行結(jié)果如下:

等了一會(huì)1
等了一會(huì)2
等了一會(huì)3

Fliter操作符代碼如下:

suspend fun main()= coroutineScope {
    listOf(1, 2, 3).asFlow().filter {
        it>1
    }.collect {
        println(it)
    }

}

代碼中只發(fā)射大于1的數(shù)據(jù),也就是2,3
運(yùn)行結(jié)果如下:

2
3

轉(zhuǎn)換操作符

Flow中的轉(zhuǎn)換操作符主要是TransForm,可以實(shí)現(xiàn)更復(fù)雜的變換(時(shí)間上map和filter都是基于Transform實(shí)現(xiàn)的),代碼如下:

listOf(1, 2, 3).asFlow().transform {
        emit("${it}發(fā)射")
        emit("${it}再發(fā)射")
    }.collect {
        println(it)
    }

代碼中將發(fā)射一次轉(zhuǎn)換為發(fā)射兩次,運(yùn)行代碼如下:

1發(fā)射
1再發(fā)射
2發(fā)射
2再發(fā)射
3發(fā)射
3再發(fā)射

Process finished with exit code 0

限長(zhǎng)操作符

Flow中的限長(zhǎng)操作符為T(mén)ake,在流觸及相應(yīng)限制的時(shí)候會(huì)將它的執(zhí)行取消(與協(xié)程一致都是通過(guò)拋出異常的方式進(jìn)行取消,該操作符已經(jīng)處理了異常拋出)。
代碼如下:

listOf(1, 2, 3).asFlow().take(2).collect {
        println(it)
    }

代碼中設(shè)置了限長(zhǎng)為2,標(biāo)識(shí)只會(huì)接受兩個(gè)發(fā)射的數(shù)據(jù)
運(yùn)行結(jié)果如下:

1
2

末端操作符

末端操作符是在流上用于啟動(dòng)流收集的掛起函數(shù)。 collect 是最基礎(chǔ)的末端操作符,但是還有另外一些更方便使用的末端操作符

  • 轉(zhuǎn)化為各種集合,例如 toListtoSet
  • 獲取第一個(gè)(first)值與確保流發(fā)射單個(gè)(single)值的操作符。
  • 使用 reducefold 將流規(guī)約到單個(gè)值。

toList代碼如下:

suspend fun main() {
    val stringList = mutableListOf<String>()
//轉(zhuǎn)換為list
    flowOf(1,2,3).map {
         "haha$it"
     }.toList(stringList).forEach {
        println(it)
    }
}

運(yùn)行結(jié)果如下:

haha1
haha2
haha3

Process finished with exit code 0

toSet代碼如下:

 flowOf(1, 2, 3).map {
        "haha$it"
    }.toSet(mutableSetOf()).forEach {
        println(it)
    }

運(yùn)行結(jié)果如下:

haha1
haha2
haha3

Process finished with exit code 0

toCollection代碼如下:

 //轉(zhuǎn)換為collection
    flowOf(1, 2, 3).map {
        "haha$it"
    }.toCollection(mutableSetOf()).forEach {
        println(it)
    }

運(yùn)行結(jié)果如下:

haha1
haha2
haha3

Process finished with exit code 0

first:獲取第一個(gè)發(fā)射的數(shù)據(jù),其余的拋棄
代碼如下:

  val first = flowOf(1, 2, 3).map {
        "haha$it"
    }.first()
    println(first)

運(yùn)行結(jié)果如下:

haha1

Process finished with exit code 0

first可以通過(guò)lambda函數(shù)作為選擇條件,返回滿足條件的第一個(gè)值,代碼如下

   //first 返回第一個(gè)滿足條件的元素
    val first = flowOf(1, 2, 3).map {
        "haha$it"
    }.first {
        it.contains("2")
    }
    println(first)

運(yùn)行結(jié)果如下:

haha2

Process finished with exit code 0

single也是返回一個(gè)發(fā)送的數(shù)據(jù),但,他要求至多有一個(gè)數(shù)據(jù)發(fā)送,發(fā)送多個(gè)數(shù)據(jù)會(huì)報(bào)錯(cuò)
代碼如下:

   //single返回單個(gè)值
    val single = flowOf(1).map {
        "haha$it"
    }.single()
    println(single)

運(yùn)行結(jié)果如下:

haha1

Process finished with exit code 0

singleOrNull 單個(gè)發(fā)射返回?cái)?shù)值,否則返回null

    //singleOrNull 單個(gè)發(fā)射返回?cái)?shù)值,否則返回null
    val singleOrNull = mutableListOf<Int>().asFlow().map {
        "haha$it"
    }.singleOrNull()
    println(singleOrNull)

運(yùn)行結(jié)果如下:

null

Process finished with exit code 0

reduce 對(duì)集合進(jìn)行計(jì)算操作

  val reduce = flowOf(1, 2, 3)
        .reduce { accumulator, value ->
            accumulator + value
        }
    println(reduce)

運(yùn)行結(jié)果如下:

6

Process finished with exit code 0

fold帶初始值的reduce

    //帶初始值的reduce
    val fold = flowOf(1, 2, 3)
        .fold(100) { acc, value ->
            acc + value
        }
    println(fold)

運(yùn)行結(jié)果如下

106

Process finished with exit code 0

流是連續(xù)的

流的每次單獨(dú)收集都是按順序執(zhí)行的,除非進(jìn)行特殊操作的操作符使用多個(gè)流(Zip操作符)。該收集過(guò)程直接在協(xié)程中運(yùn)行,該協(xié)程調(diào)用末端操作符。 默認(rèn)情況下不啟動(dòng)新協(xié)程。 從上游到下游每個(gè)過(guò)渡操作符都會(huì)處理每個(gè)發(fā)射出的值然后再交給末端操作符。
簡(jiǎn)單來(lái)說(shuō)就是一個(gè)發(fā)射數(shù)據(jù)執(zhí)行完所有的操作符(collect除外),下一個(gè)數(shù)據(jù)才能發(fā)射并執(zhí)行相關(guān)的操作符
代碼如下:


(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0              
    }              
    .map { 
        println("Map $it")
        "string $it"
    }.collect { 
        println("Collect $it")
    }  

運(yùn)行結(jié)果如下:

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

流的上下文切換

流的收集總是在調(diào)用協(xié)程的上下文中發(fā)生。如無(wú)特殊指定,則默認(rèn)與所在協(xié)程處于同一個(gè)上下文中。

suspend fun main() {
    println(Thread.currentThread().name)
        flow {
            println(Thread.currentThread().name)
            emit(1)
            emit(2)
            emit(3)
        }.collect {
            println(Thread.currentThread().name)
            println(it)
        }
}

運(yùn)行結(jié)果如下

main
main
main
1
main
2
main
3

Process finished with exit code 0

但是如果發(fā)射器與接收器不處于同一個(gè)上下文時(shí)則會(huì)報(bào)錯(cuò)

 //發(fā)射邏輯與接收邏輯不在同一個(gè)線程中,會(huì)發(fā)生報(bào)錯(cuò)
    val flow = flow {
        withContext(Dispatchers.Default) {
            println("發(fā)射時(shí)的線程:${Thread.currentThread().name}")
            emit(1)
            emit(2)
            emit(3)
        }

    }
    flow.collect {
        println(it)
    }

運(yùn)行如下:
···
接收時(shí)的線程:main
發(fā)射時(shí)的線程:DefaultDispatcher-worker-1
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
···
為了實(shí)現(xiàn)更高發(fā)射器的上下文,可以使用flowOn手動(dòng)切換所處線程

val flowOn = flow {
        println("發(fā)射時(shí)的線程:${Thread.currentThread().name}")
        emit(1)
        emit(2)
        emit(3)
    }.flowOn(Dispatchers.Default)


    withContext(Dispatchers.IO) {
        flowOn.collect {
            println("接收時(shí)的線程:${Thread.currentThread().name}")
            println(it)
        }
    }

運(yùn)行結(jié)果如下:

發(fā)射時(shí)的線程:DefaultDispatcher-worker-3
接收時(shí)的線程:DefaultDispatcher-worker-1
1
接收時(shí)的線程:DefaultDispatcher-worker-1
2
接收時(shí)的線程:DefaultDispatcher-worker-1
3

Process finished with exit code 0

緩沖操作符

從收集流所花費(fèi)的時(shí)間來(lái)看,將流的不同部分運(yùn)行在不同的協(xié)程中將會(huì)很有幫助,特別是當(dāng)涉及到長(zhǎng)時(shí)間運(yùn)行的異步操作時(shí)。例如,考慮一種情況, 一個(gè) 發(fā)射器 流的發(fā)射很慢,它每花費(fèi) 100 毫秒才產(chǎn)生一個(gè)元素;而收集器也非常慢, 需要花費(fèi) 200 毫秒來(lái)處理元素。讓我們看看從該流收集三個(gè)數(shù)字要花費(fèi)多長(zhǎng)時(shí)間。

val measureTimeMillis = measureTimeMillis {
        flow {
            repeat(3) {
                delay(100)
                emit(it)
            }
        }.collect {
            delay(200)
            println(it)
        }
    }
    println(measureTimeMillis)

運(yùn)行結(jié)果如下:

0
1
2
1031

Process finished with exit code 0

那么如何加快該過(guò)程呢,答案是使用Buffer操作符

 val measureTimeMillis = measureTimeMillis {
        flow {
            repeat(3) {
                delay(100)
                emit(it)
            }
        }.buffer().collect {
            delay(200)
            println(it)
        }
    }
    println(measureTimeMillis)

運(yùn)行結(jié)果如下:

0
1
2
985

它產(chǎn)生了相同的數(shù)字,只是更快了,由于我們高效地創(chuàng)建了處理流水線, 僅僅需要等待第一個(gè)數(shù)字產(chǎn)生的 100 毫秒以及處理每個(gè)數(shù)字各需花費(fèi)的 200 毫秒。(PS:上邊的flowOn也用到了類似的緩存機(jī)制,所以不需要再次調(diào)用buffer操作符)。

合并操作符

當(dāng)流代表部分操作結(jié)果或操作狀態(tài)更新時(shí),可能沒(méi)有必要處理每個(gè)值,而是只處理最新的那個(gè)。在本示例中,當(dāng)收集器處理它們太慢的時(shí)候, conflate 操作符可以用于跳過(guò)中間值。構(gòu)建前面的示例:

suspend fun main() {
    flow {
        repeat(3) {
            delay(100)
            emit(it)
        }
    }.conflate().collect {
        delay(500)
        println(it)
    }
}

運(yùn)行結(jié)果如下:

0
2

Process finished with exit code 0

沒(méi)有打印中間的1

當(dāng)發(fā)射器和收集器都很慢的時(shí)候,合并是加快處理速度的一種方式。它通過(guò)刪除發(fā)射值來(lái)實(shí)現(xiàn)。 另一種方式是取消緩慢的收集器,并在每次發(fā)射新值的時(shí)候重新啟動(dòng)它。有一組與 xxx 操作符執(zhí)行相同基本邏輯的 xxxLatest 操作符,但是在新值產(chǎn)生的時(shí)候取消執(zhí)行其塊中的代碼。讓我們?cè)谙惹暗氖纠袊L試更換 conflatecollectLatest

suspend fun main() {
    flow {
        repeat(3) {
            kotlinx.coroutines.delay(100)
            emit(it)
        }
    }.collectLatest {
        println(it)
        delay(500)
        println(it)
    }
}

運(yùn)行結(jié)果如下

2

Process finished with exit code 0

只打印出最后的2,沒(méi)有執(zhí)行0,1的打印

組合操作符

就像 Kotlin 標(biāo)準(zhǔn)庫(kù)中的 Sequence.zip 擴(kuò)展函數(shù)一樣, 流擁有一個(gè) zip 操作符用于組合兩個(gè)流中的相關(guān)值(多余數(shù)據(jù)直接丟棄)

/**
 * zip 組合兩個(gè)流中的相關(guān)值,多余的進(jìn)行丟棄
 */
suspend fun main() {
    val flow1 = flowOf(1, 2, 3, 4)
    val flow2 = flowOf("one", "two", "three")
    flow1.zip(flow2) { i: Int, s: String ->
        "$s:$i"
    }.collect {
        println(it)
    }

}

運(yùn)行結(jié)果如下

one:1
two:2
three:3

Process finished with exit code 0

當(dāng)流表示一個(gè)變量或操作的最新值時(shí)(請(qǐng)參閱相關(guān)小節(jié) conflation),可能需要執(zhí)行計(jì)算,這依賴于相應(yīng)流的最新值,并且每當(dāng)上游流產(chǎn)生值的時(shí)候都需要重新計(jì)算。這種相應(yīng)的操作符家族稱為 combine

suspend fun main() {
    val flow1 = flowOf(1, 2, 3, 4).onEach { delay(300) }
    val flow2 = flowOf("one", "two", "three").onEach { delay(400) }
    flow1.combine(flow2) { i: Int, s: String ->
        "$s:$i"
    }.collect {
        println(it)
    }
}

運(yùn)行結(jié)果如下:

one:1
one:2
two:2
two:3
three:3
three:4

Process finished with exit code 0

展開(kāi)操作符

流表示異步接收的值序列,所以很容易遇到這樣的情況: 每個(gè)值都會(huì)觸發(fā)對(duì)另一個(gè)值序列的請(qǐng)求。比如說(shuō),我們可以擁有下面這樣一個(gè)返回間隔 500 毫秒的兩個(gè)字符串流的函數(shù):

fun reqeustFlow(it: Int) = flow {
    emit("${it}:first")
    kotlinx.coroutines.delay(500)
    emit("${it}:second")
}

現(xiàn)在,如果我們有一個(gè)包含三個(gè)整數(shù)的流,并為每個(gè)整數(shù)調(diào)用 requestFlow

 val map = flowOf(1, 2, 3).map {
        reqeustFlow(it)
    }

然后我們得到了一個(gè)包含流的流(Flow<Flow<String>>),需要將其進(jìn)行展平為單個(gè)流以進(jìn)行下一步處理。集合與序列都擁有 flattenflatMap 操作符來(lái)做這件事。然而,由于流具有異步的性質(zhì),因此需要不同的展平模式。

flatMapConcat

順序的將發(fā)射器發(fā)射的數(shù)據(jù)轉(zhuǎn)變?yōu)橐粋€(gè)新的流

 flowOf(1, 2, 3).onEach { delay(100) }.flatMapConcat {
        reqeustFlow(it)
    }.collect {
        println(it)
    }

運(yùn)行結(jié)果如下:

1:first
1:second
2:first
2:second
3:first
3:second

Process finished with exit code 0

flatMapMerge

并發(fā)收集所有傳入的流,并將它們的值合并到一個(gè)單獨(dú)的流,以便盡快的發(fā)射值。需要注意的是flatMapMerge 會(huì)順序調(diào)用代碼塊(本示例中的 { requestFlow(it) }),但是并發(fā)收集結(jié)果流,相當(dāng)于執(zhí)行順序是首先執(zhí)行 map { requestFlow(it) } 然后在其返回結(jié)果上調(diào)用 flattenMerge從現(xiàn)象上來(lái)說(shuō)就是:先獲取所有的發(fā)射項(xiàng),然后每個(gè)發(fā)射項(xiàng)逐次調(diào)用flatMapMerge中的代碼(flatMapMerge有a,b代碼,所有發(fā)射項(xiàng)先執(zhí)行a,執(zhí)行完后所有發(fā)射項(xiàng)再執(zhí)行b)。

  flowOf(1, 2, 3).onEach { delay(100) }.flatMapMerge {
        reqeustFlow(it)
    }.collect {
        println(it)
    }

運(yùn)行代碼如下:

1:first
2:first
3:first
1:second
2:second
3:second

Process finished with exit code 0

flatMapLatest

在發(fā)出新流后立即取消先前流的收集

flowOf(1, 2, 3).onEach { delay(100) }.flatMapLatest {
        reqeustFlow(it)
    }.collect {
        println(it)
    }

運(yùn)行結(jié)果如下:

1:first
2:first
3:first
3:second

Process finished with exit code 0

try-catch

flow在收集器代碼塊使用try-catch的話,會(huì)接收到flow所有的異常拋出。我們首先看發(fā)射器發(fā)生錯(cuò)誤的時(shí)候

 val fl = flow {
        emit(1)
        emit(2)
        emit(3)
    }.map {
        check(it <3) {
            "上游發(fā)生錯(cuò)誤"
        }
        it
    }

    try {
        fl.collect {
            println(it)
        }
    } catch (e: Exception) {
        println(e.message)
    }

運(yùn)行結(jié)果如下:

1
2
上游發(fā)生錯(cuò)誤

Process finished with exit code 0

這里需要注意的是check操作符,如果不符合條件才會(huì)執(zhí)行大括號(hào)內(nèi)的代碼,并將返回值作為異常的消息。
如果下游發(fā)生異常的話,是否能捕獲到異常呢?,看如下代碼

 val flowOf = flowOf(1, 2, 3)
    try {
        flowOf.collect {
            check(it <= 2) {
                "下游發(fā)生錯(cuò)誤"
            }
            println(it)
        }
    } catch (e: Exception) {
        println(e.message)
    }

運(yùn)行結(jié)果如下:

1
2
下游發(fā)生錯(cuò)誤

Process finished with exit code 0

可以看到如果用try-catch包圍收集器的話是可以捕獲到上下游的異常的,需要注意的是流必須對(duì)異常透明,即在 flow { ... } 構(gòu)建器內(nèi)部的 try/catch 塊中發(fā)射值是違反異常透明性的。這樣可以保證收集器拋出的一個(gè)異常能被像先前示例中那樣的 try/catch 塊捕獲。那么Flow有沒(méi)有專門(mén)的捕獲異常的操作符呢,答案是有的,他就是catch操作符。

 flowOf(1, 2, 3).map {
        check(it <= 2) {
            "上游出現(xiàn)錯(cuò)誤"
        }
        it
    }.catch {
        println(it.message)
    }.collect {
        println(it)
    }

運(yùn)行結(jié)果如下:

1
2
上游出現(xiàn)錯(cuò)誤

Process finished with exit code 0

但是這樣是不能獲取到接收器發(fā)生的錯(cuò)誤的。如果想捕獲發(fā)射器收集器的異常,可采取如下的方法

    flowOf(1, 2, 3).map {
        check(it < 3) {
            "上游發(fā)生了錯(cuò)誤"
        }
        it
    }.onEach {
//放開(kāi)該注釋,注釋掉上面的check代碼,則能捕獲下游發(fā)生的錯(cuò)誤
//        check(it < 2) {
//            "下游發(fā)生錯(cuò)誤"
//        }
        println(it)

    }.catch {
        println(it.message)
    }.collect()

運(yùn)行結(jié)果如下:

1
2
上游發(fā)生了錯(cuò)誤

Process finished with exit code 0

至于你采用try-catch還是catch操作符,那就取決于你的習(xí)慣了。

流完成

流的完成也有finally和onCompletion操作符兩種。先看finally這種吧

val map = flowOf(1, 2, 3).map {
        check(it < 3) {
            "上游發(fā)生錯(cuò)誤"
        }
        it
    }

    try {
        map.collect {
            println(it)
        }
    } catch (e: Exception) {
        println(e.message)
    }finally {
        println("結(jié)束了")
    }

運(yùn)行結(jié)果如下

1
2
上游發(fā)生錯(cuò)誤
結(jié)束了

Process finished with exit code 0

onCompletion操作符如下:

flowOf(1, 2, 3).map {
        check(it <= 2) {
            "上游發(fā)生錯(cuò)誤"
        }
        it
    }.catch {
        println(it.message)
    }.onCompletion {
        if (it != null) {
            println("發(fā)生了異常")
        } else {
            println("沒(méi)有異常")
        }

    }.collect {
        println(it)
    }

運(yùn)行結(jié)果

1
2
上游發(fā)生錯(cuò)誤
沒(méi)有異常

Process finished with exit code 0

這里需要注意的是onComplettion是可以捕獲到相關(guān)異常的,但他不能處理異常,異常仍會(huì)向下游流動(dòng)。本文中的代碼因?yàn)樵趏nCompletion之前調(diào)用了catch,捕獲了異常,所以
onCompletion不會(huì)收到相關(guān)異常信息。

啟動(dòng)流

使用流表示來(lái)自一些源的異步事件是很簡(jiǎn)單的。 在這個(gè)案例中,我們需要一個(gè)類似 addEventListener 的函數(shù),該函數(shù)注冊(cè)一段響應(yīng)的代碼處理即將到來(lái)的事件,并繼續(xù)進(jìn)行進(jìn)一步的處理。onEach 操作符可以擔(dān)任該角色。 然而,onEach 是一個(gè)過(guò)渡操作符。我們也需要一個(gè)末端操作符來(lái)收集流。 否則僅調(diào)用 onEach 是無(wú)效的。

如果我們?cè)?onEach 之后使用 collect 末端操作符,那么后面的代碼會(huì)一直等待直至流被收集:

// 模仿事件流
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- 等待流收集
    println("Done")
}       

運(yùn)行結(jié)果

Event: 1
Event: 2
Event: 3
Done

launchIn 末端操作符可以在這里派上用場(chǎng)。使用 launchIn 替換 collect 我們可以在單獨(dú)的協(xié)程中啟動(dòng)流的收集,這樣就可以立即繼續(xù)進(jìn)一步執(zhí)行代碼:

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- 在單獨(dú)的協(xié)程中執(zhí)行流
    println("Done")
}      
Done
Event: 1
Event: 2
Event: 3

launchIn 必要的參數(shù) CoroutineScope 指定了用哪一個(gè)協(xié)程來(lái)啟動(dòng)流的收集。在先前的示例中這個(gè)作用域來(lái)自 runBlocking 協(xié)程構(gòu)建器,在這個(gè)流運(yùn)行的時(shí)候,runBlocking 作用域等待它的子協(xié)程執(zhí)行完畢并防止 main 函數(shù)返回并終止此示例。

在實(shí)際的應(yīng)用中,作用域來(lái)自于一個(gè)壽命有限的實(shí)體。在該實(shí)體的壽命終止后,相應(yīng)的作用域就會(huì)被取消,即取消相應(yīng)流的收集。這種成對(duì)的 onEach { ... }.launchIn(scope) 工作方式就像 addEventListener 一樣。而且,這不需要相應(yīng)的 removeEventListener 函數(shù), 因?yàn)槿∠c結(jié)構(gòu)化并發(fā)可以達(dá)成這個(gè)目的。

注意,launchIn 也會(huì)返回一個(gè) Job,可以在不取消整個(gè)作用域的情況下僅取消相應(yīng)的流收集或?qū)ζ溥M(jìn)行 join。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容