
三. Flow VS Sequences
每一個 Flow 其內部是按照順序執(zhí)行的,這一點跟 Sequences 很類似。
Flow 跟 Sequences 之間的區(qū)別是 Flow 不會阻塞主線程的運行,而 Sequences 會阻塞主線程的運行。
使用 flow:
fun main() = runBlocking {
launch {
for (j in 1..5) {
delay(100)
println("I'm not blocked $j")
}
}
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect { println(it) }
println("Done")
}
執(zhí)行結果:
1
I'm not blocked 1
2
I'm not blocked 2
3
I'm not blocked 3
4
I'm not blocked 4
5
Done
I'm not blocked 5
使用 sequence:
fun main() = runBlocking {
launch {
for (k in 1..5) {
delay(100)
println("I'm blocked $k")
}
}
sequence {
for (i in 1..5) {
Thread.sleep(100)
yield(i)
}
}.forEach { println(it) }
println("Done")
}
執(zhí)行結果:
1
2
3
4
5
Done
I'm blocked 1
I'm blocked 2
I'm blocked 3
I'm blocked 4
I'm blocked 5
由此,可以得出 Flow 在使用各個 suspend 函數(shù)時(本例子中使用了collect、emit函數(shù))不會阻塞主線程的運行。
四. Flow VS RxJava
Kotlin 協(xié)程庫的設計本身也參考了 RxJava ,下圖展示了如何從 RxJava 遷移到 Kotlin 協(xié)程。(火和冰形象地表示了 Hot、Cold Stream)

4.1 Cold Stream
flow 的代碼塊只有調用 collected() 才開始運行,正如 RxJava 創(chuàng)建的 Observables 只有調用 subscribe() 才開始運行一樣。
4.2 Hot Stream
如圖上所示,可以借助 Kotlin Channel 來實現(xiàn) Hot Stream。
4.3. Completion
Flow 完成時(正?;虺霈F(xiàn)異常時),如果需要執(zhí)行一個操作,它可以通過兩種方式完成:imperative、declarative。
4.3.1 imperative
通過使用 try ... finally 實現(xiàn)
fun main() = runBlocking {
try {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect { println(it) }
} finally {
println("Done")
}
}
4.3.2 declarative
通過 onCompletion() 函數(shù)實現(xiàn)
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.onCompletion { println("Done") }
.collect { println(it) }
}
4.3.3 onCompleted (借助擴展函數(shù)實現(xiàn))
借助擴展函數(shù)可以實現(xiàn)類似 RxJava 的 onCompleted() 功能,只有在正常結束時才會被調用:
fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
collect { value -> emit(value) }
action()
}
它的使用類似于 onCompletion()
fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
collect { value -> emit(value) }
action()
}
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.onCompleted { println("Completed...") }
.collect{println(it)}
}
但是假如 Flow 異常結束時,是不會執(zhí)行 onCompleted() 函數(shù)的。
4.4 Backpressure
Backpressure 是響應式編程的功能之一。
RxJava2 Flowable 支持的 Backpressure 策略,包括:
- MISSING:創(chuàng)建的 Flowable 沒有指定背壓策略,不會對通過 OnNext 發(fā)射的數(shù)據(jù)做緩存或丟棄處理。
- ERROR:如果放入 Flowable 的異步緩存池中的數(shù)據(jù)超限了,則會拋出 MissingBackpressureException 異常。
- BUFFER:Flowable 的異步緩存池同 Observable 的一樣,沒有固定大小,可以無限制添加數(shù)據(jù),不會拋出 MissingBackpressureException 異常,但會導致 OOM。
- DROP:如果 Flowable 的異步緩存池滿了,會丟掉將要放入緩存池中的數(shù)據(jù)。
- LATEST:如果緩存池滿了,會丟掉將要放入緩存池中的數(shù)據(jù)。這一點跟 DROP 策略一樣,不同的是,不管緩存池的狀態(tài)如何,LATEST 策略會將最后一條數(shù)據(jù)強行放入緩存池中。
而 Flow 的 Backpressure 是通過 suspend 函數(shù)實現(xiàn)。
4.4.1 buffer() 對應 BUFFER 策略
fun currTime() = System.currentTimeMillis()
var start: Long = 0
fun main() = runBlocking {
val time = measureTimeMillis {
(1..5)
.asFlow()
.onStart { start = currTime() }
.onEach {
delay(100)
println("Emit $it (${currTime() - start}ms) ")
}
.buffer()
.collect {
println("Collect $it starts (${currTime() - start}ms) ")
delay(500)
println("Collect $it ends (${currTime() - start}ms) ")
}
}
println("Cost $time ms")
}
執(zhí)行結果:
Emit 1 (104ms)
Collect 1 starts (108ms)
Emit 2 (207ms)
Emit 3 (309ms)
Emit 4 (411ms)
Emit 5 (513ms)
Collect 1 ends (613ms)
Collect 2 starts (613ms)
Collect 2 ends (1114ms)
Collect 3 starts (1114ms)
Collect 3 ends (1615ms)
Collect 4 starts (1615ms)
Collect 4 ends (2118ms)
Collect 5 starts (2118ms)
Collect 5 ends (2622ms)
Collected in 2689 ms
4.4.2 conflate() 對應 LATEST 策略
fun main() = runBlocking {
val time = measureTimeMillis {
(1..5)
.asFlow()
.onStart { start = currTime() }
.onEach {
delay(100)
println("Emit $it (${currTime() - start}ms) ")
}
.conflate()
.collect {
println("Collect $it starts (${currTime() - start}ms) ")
delay(500)
println("Collect $it ends (${currTime() - start}ms) ")
}
}
println("Cost $time ms")
}
執(zhí)行結果:
Emit 1 (106ms)
Collect 1 starts (110ms)
Emit 2 (213ms)
Emit 3 (314ms)
Emit 4 (419ms)
Emit 5 (520ms)
Collect 1 ends (613ms)
Collect 5 starts (613ms)
Collect 5 ends (1113ms)
Cost 1162 ms
4.4.3 DROP 策略
RxJava 的 contributor:David Karnok, 他寫了一個kotlin-flow-extensions庫,其中包括:FlowOnBackpressureDrop.kt,這個類支持 DROP 策略。
/**
* Drops items from the upstream when the downstream is not ready to receive them.
*/
@FlowPreview
fun <T> Flow<T>.onBackpressurureDrop() : Flow<T> = FlowOnBackpressureDrop(this)
使用這個庫的話,可以通過使用 Flow 的擴展函數(shù) onBackpressurureDrop() 來支持 DROP 策略。
該系列的相關文章:
Kotlin Coroutines Flow 系列(一) Flow 基本使用
Kotlin Coroutines Flow 系列(三) 異常處理
Kotlin Coroutines Flow 系列(四) 線程操作
Kotlin Coroutines Flow 系列(五) 其他的操作符