Kotlin Coroutines Flow 系列(二) Flow VS RxJava2

shallow-focus-photo-of-a-woman-in-black-sleeveless-top-2521481.jpg

三. Flow VS Sequences

每一個 Flow 其內部是按照順序執(zhí)行的,這一點跟 Sequences 很類似。

Flow 跟 Sequences 之間的區(qū)別是 Flow 不會阻塞主線程的運行,而 Sequences 會阻塞主線程的運行。

使用 flow:

fun main() = runBlocking {

    launch {
        for (j in 1..5) {
            delay(100)
            println("I'm not blocked $j")
        }
    }

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.collect { println(it) }

    println("Done")
}

執(zhí)行結果:

1
I'm not blocked 1
2
I'm not blocked 2
3
I'm not blocked 3
4
I'm not blocked 4
5
Done
I'm not blocked 5

使用 sequence:

fun main() = runBlocking {

    launch {
        for (k in 1..5) {
            delay(100)
            println("I'm blocked $k")
        }
    }

    sequence {
        for (i in 1..5) {
            Thread.sleep(100)
            yield(i)
        }
    }.forEach { println(it) }

    println("Done")
}

執(zhí)行結果:

1
2
3
4
5
Done
I'm blocked 1
I'm blocked 2
I'm blocked 3
I'm blocked 4
I'm blocked 5

由此,可以得出 Flow 在使用各個 suspend 函數(shù)時(本例子中使用了collect、emit函數(shù))不會阻塞主線程的運行。

四. Flow VS RxJava

Kotlin 協(xié)程庫的設計本身也參考了 RxJava ,下圖展示了如何從 RxJava 遷移到 Kotlin 協(xié)程。(火和冰形象地表示了 Hot、Cold Stream)

migration from rxjava.jpeg

4.1 Cold Stream

flow 的代碼塊只有調用 collected() 才開始運行,正如 RxJava 創(chuàng)建的 Observables 只有調用 subscribe() 才開始運行一樣。

4.2 Hot Stream

如圖上所示,可以借助 Kotlin Channel 來實現(xiàn) Hot Stream。

4.3. Completion

Flow 完成時(正?;虺霈F(xiàn)異常時),如果需要執(zhí)行一個操作,它可以通過兩種方式完成:imperative、declarative。

4.3.1 imperative

通過使用 try ... finally 實現(xiàn)

fun main() = runBlocking {
    try {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect { println(it) }
    } finally {
        println("Done")
    }
}

4.3.2 declarative

通過 onCompletion() 函數(shù)實現(xiàn)

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.onCompletion { println("Done") }
        .collect { println(it) }
}

4.3.3 onCompleted (借助擴展函數(shù)實現(xiàn))

借助擴展函數(shù)可以實現(xiàn)類似 RxJava 的 onCompleted() 功能,只有在正常結束時才會被調用:

fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {

    collect { value -> emit(value) }

    action()
}

它的使用類似于 onCompletion()

fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {

    collect { value -> emit(value) }

    action()
}

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.onCompleted { println("Completed...") }
        .collect{println(it)}
}

但是假如 Flow 異常結束時,是不會執(zhí)行 onCompleted() 函數(shù)的。

4.4 Backpressure

Backpressure 是響應式編程的功能之一。

RxJava2 Flowable 支持的 Backpressure 策略,包括:

  • MISSING:創(chuàng)建的 Flowable 沒有指定背壓策略,不會對通過 OnNext 發(fā)射的數(shù)據(jù)做緩存或丟棄處理。
  • ERROR:如果放入 Flowable 的異步緩存池中的數(shù)據(jù)超限了,則會拋出 MissingBackpressureException 異常。
  • BUFFER:Flowable 的異步緩存池同 Observable 的一樣,沒有固定大小,可以無限制添加數(shù)據(jù),不會拋出 MissingBackpressureException 異常,但會導致 OOM。
  • DROP:如果 Flowable 的異步緩存池滿了,會丟掉將要放入緩存池中的數(shù)據(jù)。
  • LATEST:如果緩存池滿了,會丟掉將要放入緩存池中的數(shù)據(jù)。這一點跟 DROP 策略一樣,不同的是,不管緩存池的狀態(tài)如何,LATEST 策略會將最后一條數(shù)據(jù)強行放入緩存池中。

而 Flow 的 Backpressure 是通過 suspend 函數(shù)實現(xiàn)。

4.4.1 buffer() 對應 BUFFER 策略

fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

    val time = measureTimeMillis {
        (1..5)
            .asFlow()
            .onStart { start = currTime() }
            .onEach {
                delay(100)
                println("Emit $it (${currTime() - start}ms) ")
            }
            .buffer()
            .collect {
                println("Collect $it starts (${currTime() - start}ms) ")
                delay(500)
                println("Collect $it ends (${currTime() - start}ms) ")
            }
    }

    println("Cost $time ms")
}

執(zhí)行結果:

Emit 1 (104ms) 
Collect 1 starts (108ms) 
Emit 2 (207ms) 
Emit 3 (309ms) 
Emit 4 (411ms) 
Emit 5 (513ms) 
Collect 1 ends (613ms) 
Collect 2 starts (613ms) 
Collect 2 ends (1114ms) 
Collect 3 starts (1114ms) 
Collect 3 ends (1615ms) 
Collect 4 starts (1615ms) 
Collect 4 ends (2118ms) 
Collect 5 starts (2118ms) 
Collect 5 ends (2622ms) 
Collected in 2689 ms

4.4.2 conflate() 對應 LATEST 策略

fun main() = runBlocking {

    val time = measureTimeMillis {
        (1..5)
            .asFlow()
            .onStart { start = currTime() }
            .onEach {
                delay(100)
                println("Emit $it (${currTime() - start}ms) ")
            }
            .conflate()
            .collect {
                println("Collect $it starts (${currTime() - start}ms) ")
                delay(500)
                println("Collect $it ends (${currTime() - start}ms) ")
            }
    }

    println("Cost $time ms")
}

執(zhí)行結果:

Emit 1 (106ms) 
Collect 1 starts (110ms) 
Emit 2 (213ms) 
Emit 3 (314ms) 
Emit 4 (419ms) 
Emit 5 (520ms) 
Collect 1 ends (613ms) 
Collect 5 starts (613ms) 
Collect 5 ends (1113ms) 
Cost 1162 ms

4.4.3 DROP 策略

RxJava 的 contributor:David Karnok, 他寫了一個kotlin-flow-extensions庫,其中包括:FlowOnBackpressureDrop.kt,這個類支持 DROP 策略。

/**
 * Drops items from the upstream when the downstream is not ready to receive them.
 */
@FlowPreview
fun <T> Flow<T>.onBackpressurureDrop() : Flow<T> = FlowOnBackpressureDrop(this)

使用這個庫的話,可以通過使用 Flow 的擴展函數(shù) onBackpressurureDrop() 來支持 DROP 策略。

該系列的相關文章:

Kotlin Coroutines Flow 系列(一) Flow 基本使用
Kotlin Coroutines Flow 系列(三) 異常處理
Kotlin Coroutines Flow 系列(四) 線程操作
Kotlin Coroutines Flow 系列(五) 其他的操作符

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容