Kotlin Coroutines Flow 系列(一) Flow 基本使用

woman-in-blue-spaghetti-strap-dress-2266519.jpg

一. Kotlin Flow 介紹

Flow 庫是在 Kotlin Coroutines 1.3.2 發(fā)布之后新增的庫。

官方文檔給予了一句話簡單的介紹:

Flow — cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);

Flow 從文檔的介紹來看,它有點類似 RxJava 的 Observable。因為 Observable 也有 Cold 、Hot 之分。

二. Flow 基本使用

Flow 能夠返回多個異步計算的值,例如下面的 flow builder :

        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect{
            println(it)
        }

其中 Flow 接口,只有一個 collect 函數(shù)

public interface Flow<out T> {

    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

如果熟悉 RxJava 的話,則可以理解為 collect() 對應(yīng)subscribe(),而 emit() 對應(yīng)onNext()。

2.1 創(chuàng)建 flow

除了剛剛展示的 flow builder 可以用于創(chuàng)建 flow,還有其他的幾種方式:

flowOf()

    flowOf(1,2,3,4,5)
        .onEach {
            delay(100)
        }
        .collect{
            println(it)
        }

asFlow()

    listOf(1, 2, 3, 4, 5).asFlow()
        .onEach {
            delay(100)
        }.collect {
            println(it)
        }

channelFlow()

    channelFlow {
        for (i in 1..5) {
            delay(100)
            send(i)
        }
    }.collect{
        println(it)
    }

最后的 channelFlow builder 跟 flow builder 是有一定差異的。

flow 是 Cold Stream。在沒有切換線程的情況下,生產(chǎn)者和消費者是同步非阻塞的。
channel 是 Hot Stream。而 channelFlow 實現(xiàn)了生產(chǎn)者和消費者異步非阻塞模型。

下面的代碼,展示了使用 flow builder 的情況,大致花費1秒:

fun main() = runBlocking {

    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect{
            delay(100)
            println(it)
        }
    }

    print("cost $time")
}
flow.png

使用 channelFlow builder 的情況,大致花費700毫秒:

fun main() = runBlocking {

    val time = measureTimeMillis{
        channelFlow {
            for (i in 1..5) {
                delay(100)
                send(i)
            }
        }.collect{
            delay(100)
            println(it)
        }
    }

    print("cost $time")
}
channelFlow.png

當(dāng)然,flow 如果切換線程的話,花費的時間也是大致700毫秒,跟使用 channelFlow builder 效果差不多。

fun main() = runBlocking {

    val time = measureTimeMillis{
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.IO)
            .collect {
                delay(100)
                println(it)
            }
    }

    print("cost $time")
}

2.2 切換線程

相比于 RxJava 需要使用 observeOn、subscribeOn 來切換線程,flow 會更加簡單。只需使用 flowOn,下面的例子中,展示了 flow builder 和 map 操作符都會受到 flowOn 的影響。

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            println(it)
        }

而 collect() 指定哪個線程,則需要看整個 flow 處于哪個 CoroutineScope 下。

例如,下面的代碼 collect() 則是在 main 線程:

fun main() = runBlocking {

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            println("${Thread.currentThread().name}: $it")
        }
}

執(zhí)行結(jié)果:

main: 1
main: 4
main: 9
main: 16
main: 25

值得注意的地方,不要使用 withContext() 來切換 flow 的線程。

2.3 flow 取消

如果 flow 是在一個掛起函數(shù)內(nèi)被掛起了,那么 flow 是可以被取消的,否則不能取消。

fun main() = runBlocking {

    withTimeoutOrNull(2500) {
        flow {
            for (i in 1..5) {
                delay(1000)
                emit(i)
            }
        }.collect {
            println(it)
        }
    }

    println("Done")
}

執(zhí)行結(jié)果:

1
2
Done

2.4 Terminal flow operators

Flow 的 API 有點類似于 Java Stream 的 API。它也同樣擁有 Intermediate Operations、Terminal Operations。

Flow 的 Terminal 運算符可以是 suspend 函數(shù),如 collect、single、reduce、toList 等;也可以是 launchIn 運算符,用于在指定 CoroutineScope 內(nèi)使用 flow。

@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}

整理一下 Flow 的 Terminal 運算符

  • collect
  • single/first
  • toList/toSet/toCollection
  • count
  • fold/reduce
  • launchIn/produceIn/broadcastIn

該系列的相關(guān)文章:

Kotlin Coroutines Flow 系列(二) Flow VS RxJava2
Kotlin Coroutines Flow 系列(三) 異常處理
Kotlin Coroutines Flow 系列(四) 線程操作
Kotlin Coroutines Flow 系列(五) 其他的操作符

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • ?異步掛起函數(shù)能夠返回單一值,那么我們?nèi)绾畏祷囟鄠€異步計算的值呢?而這個就是Kotlin Flow需要解決地。 R...
    兩三行代碼閱讀 3,214評論 0 5
  • RxJava RxJava是響應(yīng)式程序設(shè)計的一種實現(xiàn)。在響應(yīng)式程序設(shè)計中,當(dāng)數(shù)據(jù)到達的時候,消費者做出響應(yīng)。響應(yīng)式...
    Mr槑閱讀 1,046評論 0 5
  • 在正文開始之前的最后,放上 GitHub 鏈接和引入依賴的 gradle 代碼: Github: https://...
    松江野人閱讀 6,157評論 0 1
  • 可能是每天晚上我堅持和小寶一起看書的緣故,吃早飯時,小寶撿到一張卡片,她很自信地對我說:“媽媽,我念給你聽聽啊?!?..
    5239林中漫步閱讀 209評論 0 2
  • 汽車時代的便利 也造就了一座又一座的堵城 不管你走到哪里 都會堵的沒有脾氣 你享受它的舒適 就要接受它的不可預(yù)期 ...
    神于天圣于地閱讀 81評論 0 0

友情鏈接更多精彩內(nèi)容