依照慣例, 先放兩個(gè)能跑的代碼
!!本節(jié)中所有 Item 均為這段代碼!!
// item.kt
data class Item(val id: Int) {
init { // 錨點(diǎn)1
println("Item Created $id")
}
}
能跑的代碼
// 7.1.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.just(1, 2, 3).map { Item(it) } // 注釋1
.subscribe({
println("Received $it")
Thread.sleep(100)
})
Thread.sleep(1000)
}
輸出
Item Created 1
Received Item(id=1)
Item Created 2
Received Item(id=2)
Item Created 3
Received Item(id=3)
注釋1
這里 Map 的意義就是利用對(duì)象初始化(錨點(diǎn)1)時(shí)能打印出來一個(gè)值這個(gè)特性, 便于我們看 一個(gè)值被彈出的時(shí)間點(diǎn) (用來和 這個(gè)值被處理的時(shí)間點(diǎn) 作比較), 把數(shù)字轉(zhuǎn)不轉(zhuǎn)化成對(duì)象在這里沒有特殊含義(是副產(chǎn)品)
上面代碼主旨大意
從輸出中可以看到基本流程是:
一個(gè)值被 Observable 彈出 -> 被 Observer 處理 -> 下一個(gè)值被彈出 -> ...
這是因?yàn)?Observable 和 Observer 運(yùn)行在一個(gè)線程中, 所以在 Observer 沒處理完上一個(gè)值之前 Observable 是不能彈出下一個(gè)值的。那如果 Observable 和 Observer 運(yùn)行在不同的線程中呢?來看下一個(gè)例子
// 7.2.kt
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
fun main(args: Array<String>) {
Observable.just(1, 2, 3).map { Item(it) }
.observeOn(Schedulers.newThread()) // 注釋1 Scheduler 會(huì)在之后的章節(jié)說明
.subscribe({
Thread.sleep(100)
println("Received $it")
})
Thread.sleep(1000)
}
輸出
Item Created 1
Item Created 2
Item Created 3
Received Item(id=1)
Received Item(id=2)
Received Item(id=3)
注釋1
這里只需要知道這一行代碼使得 Observer 在另一個(gè)線程中運(yùn)行即可
上面代碼主旨大意
從輸出中可以看到 Observable 完全不會(huì)顧及 Observer 的感受, 只會(huì)一個(gè)值接一個(gè)值的彈。如果此時(shí) Observer 執(zhí)行某些耗時(shí)計(jì)算(即 值被 Observable 彈出的速度遠(yuǎn)遠(yuǎn)快于 被 Observer 處理的速度)(這里耗時(shí)計(jì)算我們用 sleep 模擬), 那么這些值在未被處理之前都會(huì)累積在內(nèi)存中,如果數(shù)據(jù)量很大甚至可以導(dǎo)致內(nèi)存溢出(OutOfMemory)。
如果數(shù)據(jù)的 消費(fèi)者(在這里是 Observer) 可以通過某種渠道告知數(shù)據(jù)的 生產(chǎn)者(在這里是 Observable) "先別彈出值了, 等我把累積的值處理完你再接著彈" 這一信息呢?
這個(gè)反饋渠道是存在的, 在 ReactiveX 中, 我們叫它 Backpressure (背壓)
Backpressure
背壓, 指的是后端的壓力, 通常用于描述系統(tǒng)排出的流體在出口處或二次側(cè)受到的與流動(dòng)方向相反的壓力。----百度百科
Observable 和 Observer 是不支持 Backpressure 的。替代方案是 Flowable 和 Subscriber
Backpressure |
生產(chǎn)者 |
消費(fèi)者 |
|---|---|---|
| 不支持 | Observable |
Observer |
| 支持 | Flowable |
Subscriber |
Flowable 在 ReactiveX 2.x (RxKotlin 2.X) 中被加入, 之前的版本不包括它。
Flowable
// 7.3.kt
import io.reactivex.Flowable // 和 7.2.kt 相比, 這一行有改變
import io.reactivex.schedulers.Schedulers
fun main(args: Array<String>) {
Flowable.just(1, 2, 3).map { Item(it) } // 和 7.2.kt 相比, 這一行有改變
.observeOn(Schedulers.newThread())
.subscribe({ // 注釋1
Thread.sleep(100)
println("Received $it")
})
Thread.sleep(1000)
}
輸出
Item Created 1
Item Created 2
Item Created 3
Received Item(id=1)
Received Item(id=2)
Received Item(id=3)
注釋1
這里是 Subscriber 而不是 Observer, 但是由于用的 Lambda 形式, 看起來一樣。我們會(huì)在之后討論。
上面的輸出和 7.2.kt 的輸出完全一致, 我一會(huì)說為什么沒有效果
看下一個(gè)例子
// 7.4.kt
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
fun main(args: Array<String>) {
Flowable.range(1, 260).map { Item(it) } // 和 7.3.kt 相比, 只有這一行有改變
.observeOn(Schedulers.newThread())
.subscribe({
Thread.sleep(100)
println("Received $it")
})
Thread.sleep(27000)
}
輸出(原諒我吧, 數(shù)據(jù)量太小根本沒有效果)
Item Created 1
Item Created 2
...
Item Created 127
Item Created 128 // 當(dāng) Flowable 彈出 128 個(gè)值就暫時(shí)停止了, 緩沖區(qū)達(dá)到上限(128)
Received Item(id=1)
Received Item(id=2)
...
Received Item(id=95)
Received Item(id=96) // Subscriber 僅僅處理了 96 個(gè)值, 緩沖區(qū)沒有被清空
Item Created 129
Item Created 130
...
Item Created 223
Item Created 224 // 緩沖區(qū)再次達(dá)到上限 224-96=128
Received Item(id=97)
Received Item(id=98)
...
Received Item(id=191)
Received Item(id=192)
Item Created 225
Item Created 226
...
Item Created 259
Item Created 260 // 這次是因?yàn)闆]有值了
Received Item(id=193)
Received Item(id=194)
...
Received Item(id=259)
Received Item(id=260)
Flowable 不會(huì)一下子把所有值全部彈出, 它會(huì)一塊一塊的彈, 當(dāng) Subscriber 跟上時(shí)才會(huì)繼續(xù)
Flowable 會(huì)維護(hù)一個(gè)默認(rèn)大小為 128 個(gè)元素的緩沖區(qū), 被彈出的元素會(huì)暫存其中。如果滿了 Flowable 就會(huì)暫時(shí)停止彈射。
7.3.kt 數(shù)據(jù)量還沒有達(dá)到緩沖區(qū)上限, 所以看起來沒有效果。
舉 7.3.kt 這個(gè)例子, 還有其他原因, 之后再說。
Subscriber
Subscriber 其實(shí)可以 動(dòng)態(tài) 限定從上游拿到的值的數(shù)量(動(dòng)態(tài) 這個(gè)詞接下來就會(huì)解釋,不要急), 但是我們之前一直用的 Lambda 形式, 系統(tǒng)默認(rèn)從上游獲取所有值。
我們來看一看如何限定這個(gè)數(shù)量。
先來看一個(gè) Subscriber(有點(diǎn)長(zhǎng),并且本節(jié)其它的 Subscriber 都這么長(zhǎng), 非常抱歉)
// subscriber_1.kt
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
val subscriber_1 = object : Subscriber<Item> {
override fun onSubscribe(subscription: Subscription) {
subscription.request(4) // 注釋1 限定請(qǐng)求 4 個(gè)值
println("New Subscription ")
}
override fun onNext(s: Item) {
Thread.sleep(200)
println("Subscriber received " + s)
}
override fun onError(e: Throwable) {
e.printStackTrace()
}
override fun onComplete() {
println("Done!")
}
}
可以通過下面來調(diào)用上面的 subscriber_1
// 7.5.kt
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
fun main(args: Array<String>) {
Flowable.range(1, 6)
.map { Item(it) }
.observeOn(Schedulers.newThread())
.subscribe(subscriber_1) // 注釋2
Thread.sleep(2000)
}
輸出
New Subscription
Item Created 1
Item Created 2
Item Created 3
Item Created 4
Item Created 5
Item Created 6
Subscriber received Item(id=1)
Subscriber received Item(id=2)
Subscriber received Item(id=3)
Subscriber received Item(id=4) // 注釋3
注釋1
如果刪掉這一行, 我們就沒有限定請(qǐng)求數(shù)量, 一個(gè)值都接收不到(我原本以為會(huì)接收到所有值的, 做了實(shí)驗(yàn)才發(fā)現(xiàn)和我想的不太一樣)
注釋2
我沒有在接下來都使用一個(gè) Subscriber 來避免重復(fù)代碼
但是我會(huì)對(duì)代碼進(jìn)行處理,把和上一個(gè)不一樣的地方用注釋標(biāo)識(shí)出來
因?yàn)?Subscriber 的重點(diǎn)在通過不同的配置來控制流速和總量。
為了靈活, 我選擇創(chuàng)建多個(gè) Subscriber
注釋3
我們只請(qǐng)求 4 個(gè)值, 數(shù)據(jù)流并沒有到結(jié)尾, 系統(tǒng)沒有調(diào)用 onComplete 方法。所以沒有輸出 Done!
我再舉一個(gè)例子, 這個(gè)例子只是想說明何時(shí) onComplete 會(huì)被調(diào)用
// 7.6.kt
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
fun main(args: Array<String>) {
Flowable.range(1, 4) // 只有這一行與 7.5.kt 相比有變動(dòng)
.map { Item(it) }
.observeOn(Schedulers.newThread())
.subscribe(subscriber_1)
Thread.sleep(2000)
}
輸出
New Subscription
Item Created 1
Item Created 2
Item Created 3
Item Created 4
Subscriber received Item(id=1)
Subscriber received Item(id=2)
Subscriber received Item(id=3)
Subscriber received Item(id=4)
Done! // onComplete 被調(diào)用了
當(dāng)且僅當(dāng)數(shù)據(jù)流真正到達(dá)末尾時(shí)才會(huì)觸發(fā) onComplete。限定數(shù)量是不會(huì)的
之前我說 Subscriber 可以 動(dòng)態(tài) 限定從上游拿到的值的數(shù)量, 這個(gè)動(dòng)態(tài)可以從下面體現(xiàn)
// subscriber_2.kt
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
val subscriber_2 = object : Subscriber<Item> {
lateinit var subscription: Subscription // 與 subscriber_1 相比多了這一行
override fun onSubscribe(subscription: Subscription) {
this.subscription = subscription // 與 subscriber_1 相比多了這一行
subscription.request(4)
println("New Subscription ")
}
override fun onNext(s: Item) {
Thread.sleep(200)
println("Subscriber received " + s)
if (s.id == 4) { // |\
println("Requesting two more") // | \
subscription.request(2) // | /--- 與 subscriber_1 相比多了這幾行
} // |/
}
override fun onError(e: Throwable) {
e.printStackTrace()
}
override fun onComplete() {
println("Done!")
}
}
// 7.7.kt
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
fun main(args: Array<String>) {
Flowable.range(1, 6)
.map { Item(it) }
.observeOn(Schedulers.newThread())
.subscribe(subscriber_2) // 只有這一行與 7.5.kt 相比有變動(dòng)
Thread.sleep(2000)
}
輸出
New Subscription
Item Created 1
Item Created 2
Item Created 3
Item Created 4
Item Created 5
Item Created 6
Subscriber received Item(id=1)
Subscriber received Item(id=2)
Subscriber received Item(id=3)
Subscriber received Item(id=4)
Requesting two more // 決定再?gòu)纳嫌文?2 個(gè)值進(jìn)行處理
Subscriber received Item(id=5)
Subscriber received Item(id=6)
Done!
OK 這一節(jié)就到這里。下一節(jié)我們說一說 Flowable 把速度降下來的幾種方式~
RxKotlin 例子不超過15行教程 1----環(huán)境配置與初體驗(yàn)
RxKotlin 例子不超過15行教程 2----Observable Observer 與 Subscribe 簡(jiǎn)介
RxKotlin 例子不超過15行教程 3----Observable 的創(chuàng)建
RxKotlin 例子不超過15行教程 4----Observer Subscribe 與 Hot/Cold Observable
RxKotlin 例子不超過15行教程 5----Subject
RxKotlin 例子不超過15行教程 6----Operator 與 Marble Diagram
RxKotlin 例子不超過15行教程 7----Backpressure Flowable 與 Subscriber 簡(jiǎn)介