RxKotlin 例子不超過15行教程 7----Backpressure Flowable 與 Subscriber 簡(jiǎn)介

依照慣例, 先放兩個(gè)能跑的代碼
!!本節(jié)中所有 Item 均為這段代碼!!

// item.kt
data class Item(val id: Int) {
    init {  // 錨點(diǎn)1
        println("Item Created $id")
    }
}

能跑的代碼

// 7.1.kt
import io.reactivex.Observable

fun main(args: Array<String>) {
    Observable.just(1, 2, 3).map { Item(it) }  // 注釋1
            .subscribe({
                println("Received $it")
                Thread.sleep(100)
            })
    Thread.sleep(1000)
}

輸出

Item Created 1
Received Item(id=1)
Item Created 2
Received Item(id=2)
Item Created 3
Received Item(id=3)

注釋1
這里 Map 的意義就是利用對(duì)象初始化(錨點(diǎn)1)時(shí)能打印出來一個(gè)值這個(gè)特性, 便于我們看 一個(gè)值被彈出的時(shí)間點(diǎn) (用來和 這個(gè)值被處理的時(shí)間點(diǎn) 作比較), 把數(shù)字轉(zhuǎn)不轉(zhuǎn)化成對(duì)象在這里沒有特殊含義(是副產(chǎn)品)

上面代碼主旨大意

從輸出中可以看到基本流程是:
一個(gè)值被 Observable 彈出 -> 被 Observer 處理 -> 下一個(gè)值被彈出 -> ...
這是因?yàn)?ObservableObserver 運(yùn)行在一個(gè)線程中, 所以在 Observer 沒處理完上一個(gè)值之前 Observable 是不能彈出下一個(gè)值的。那如果 ObservableObserver 運(yùn)行在不同的線程中呢?來看下一個(gè)例子

// 7.2.kt
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers

fun main(args: Array<String>) {
    Observable.just(1, 2, 3).map { Item(it) }
            .observeOn(Schedulers.newThread())  // 注釋1  Scheduler 會(huì)在之后的章節(jié)說明
            .subscribe({
                Thread.sleep(100)
                println("Received $it")
            })
    Thread.sleep(1000)
}

輸出

Item Created 1
Item Created 2
Item Created 3
Received Item(id=1)
Received Item(id=2)
Received Item(id=3)

注釋1
這里只需要知道這一行代碼使得 Observer 在另一個(gè)線程中運(yùn)行即可

上面代碼主旨大意

從輸出中可以看到 Observable 完全不會(huì)顧及 Observer 的感受, 只會(huì)一個(gè)值接一個(gè)值的彈。如果此時(shí) Observer 執(zhí)行某些耗時(shí)計(jì)算(即 值被 Observable 彈出的速度遠(yuǎn)遠(yuǎn)快于 被 Observer 處理的速度)(這里耗時(shí)計(jì)算我們用 sleep 模擬), 那么這些值在未被處理之前都會(huì)累積在內(nèi)存中,如果數(shù)據(jù)量很大甚至可以導(dǎo)致內(nèi)存溢出(OutOfMemory)。

如果數(shù)據(jù)的 消費(fèi)者(在這里是 Observer) 可以通過某種渠道告知數(shù)據(jù)的 生產(chǎn)者(在這里是 Observable) "先別彈出值了, 等我把累積的值處理完你再接著彈" 這一信息呢?

這個(gè)反饋渠道是存在的, 在 ReactiveX 中, 我們叫它 Backpressure (背壓)

Backpressure

背壓 , 指的是后端的壓力, 通常用于描述系統(tǒng)排出的流體在出口處或二次側(cè)受到的與流動(dòng)方向相反的壓力。----百度百科

ObservableObserver 是不支持 Backpressure 的。替代方案是 FlowableSubscriber

Backpressure 生產(chǎn)者 消費(fèi)者
不支持 Observable Observer
支持 Flowable Subscriber

FlowableReactiveX 2.x (RxKotlin 2.X) 中被加入, 之前的版本不包括它。

Flowable

// 7.3.kt
import io.reactivex.Flowable  // 和 7.2.kt 相比, 這一行有改變
import io.reactivex.schedulers.Schedulers

fun main(args: Array<String>) {
    Flowable.just(1, 2, 3).map { Item(it) }  // 和 7.2.kt 相比, 這一行有改變
            .observeOn(Schedulers.newThread())
            .subscribe({  // 注釋1
                Thread.sleep(100)
                println("Received $it")
            })
    Thread.sleep(1000)
}

輸出

Item Created 1
Item Created 2
Item Created 3
Received Item(id=1)
Received Item(id=2)
Received Item(id=3)

注釋1
這里是 Subscriber 而不是 Observer, 但是由于用的 Lambda 形式, 看起來一樣。我們會(huì)在之后討論。
上面的輸出和 7.2.kt 的輸出完全一致, 我一會(huì)說為什么沒有效果
看下一個(gè)例子

// 7.4.kt
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

fun main(args: Array<String>) {
    Flowable.range(1, 260).map { Item(it) }  // 和 7.3.kt 相比, 只有這一行有改變
            .observeOn(Schedulers.newThread())
            .subscribe({
                Thread.sleep(100)
                println("Received $it")
            })
    Thread.sleep(27000)
}

輸出(原諒我吧, 數(shù)據(jù)量太小根本沒有效果)

Item Created 1
Item Created 2
...
Item Created 127
Item Created 128  // 當(dāng) Flowable 彈出 128 個(gè)值就暫時(shí)停止了, 緩沖區(qū)達(dá)到上限(128)
Received Item(id=1)
Received Item(id=2)
...
Received Item(id=95)
Received Item(id=96)  // Subscriber 僅僅處理了 96 個(gè)值, 緩沖區(qū)沒有被清空
Item Created 129
Item Created 130
...
Item Created 223
Item Created 224  // 緩沖區(qū)再次達(dá)到上限 224-96=128
Received Item(id=97)
Received Item(id=98)
...
Received Item(id=191)
Received Item(id=192)
Item Created 225
Item Created 226
...
Item Created 259
Item Created 260  // 這次是因?yàn)闆]有值了
Received Item(id=193)
Received Item(id=194)
...
Received Item(id=259)
Received Item(id=260)

Flowable 不會(huì)一下子把所有值全部彈出, 它會(huì)一塊一塊的彈, 當(dāng) Subscriber 跟上時(shí)才會(huì)繼續(xù)
Flowable 會(huì)維護(hù)一個(gè)默認(rèn)大小為 128 個(gè)元素的緩沖區(qū), 被彈出的元素會(huì)暫存其中。如果滿了 Flowable 就會(huì)暫時(shí)停止彈射。

7.3.kt 數(shù)據(jù)量還沒有達(dá)到緩沖區(qū)上限, 所以看起來沒有效果。
7.3.kt 這個(gè)例子, 還有其他原因, 之后再說。

Subscriber

Subscriber 其實(shí)可以 動(dòng)態(tài) 限定從上游拿到的值的數(shù)量(動(dòng)態(tài) 這個(gè)詞接下來就會(huì)解釋,不要急), 但是我們之前一直用的 Lambda 形式, 系統(tǒng)默認(rèn)從上游獲取所有值。
我們來看一看如何限定這個(gè)數(shù)量。
先來看一個(gè) Subscriber(有點(diǎn)長(zhǎng),并且本節(jié)其它的 Subscriber 都這么長(zhǎng), 非常抱歉)

// subscriber_1.kt
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

val subscriber_1 = object : Subscriber<Item> {
    override fun onSubscribe(subscription: Subscription) {
        subscription.request(4)  // 注釋1  限定請(qǐng)求 4 個(gè)值
        println("New Subscription ")
    }

    override fun onNext(s: Item) {
        Thread.sleep(200)
        println("Subscriber received " + s)
    }

    override fun onError(e: Throwable) {
        e.printStackTrace()
    }

    override fun onComplete() {
        println("Done!")
    }
}

可以通過下面來調(diào)用上面的 subscriber_1

// 7.5.kt
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

fun main(args: Array<String>) {
    Flowable.range(1, 6)
            .map { Item(it) }
            .observeOn(Schedulers.newThread())
            .subscribe(subscriber_1)  // 注釋2
    Thread.sleep(2000)
}

輸出

New Subscription
Item Created 1
Item Created 2
Item Created 3
Item Created 4
Item Created 5
Item Created 6
Subscriber received Item(id=1)
Subscriber received Item(id=2)
Subscriber received Item(id=3)
Subscriber received Item(id=4)  // 注釋3

注釋1
如果刪掉這一行, 我們就沒有限定請(qǐng)求數(shù)量, 一個(gè)值都接收不到(我原本以為會(huì)接收到所有值的, 做了實(shí)驗(yàn)才發(fā)現(xiàn)和我想的不太一樣)
注釋2
我沒有在接下來都使用一個(gè) Subscriber 來避免重復(fù)代碼
但是我會(huì)對(duì)代碼進(jìn)行處理,把和上一個(gè)不一樣的地方用注釋標(biāo)識(shí)出來
因?yàn)?Subscriber 的重點(diǎn)在通過不同的配置來控制流速和總量。
為了靈活, 我選擇創(chuàng)建多個(gè) Subscriber
注釋3
我們只請(qǐng)求 4 個(gè)值, 數(shù)據(jù)流并沒有到結(jié)尾, 系統(tǒng)沒有調(diào)用 onComplete 方法。所以沒有輸出 Done!
我再舉一個(gè)例子, 這個(gè)例子只是想說明何時(shí) onComplete 會(huì)被調(diào)用

// 7.6.kt
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

fun main(args: Array<String>) {
    Flowable.range(1, 4)  // 只有這一行與 7.5.kt 相比有變動(dòng)
            .map { Item(it) }
            .observeOn(Schedulers.newThread())
            .subscribe(subscriber_1)
    Thread.sleep(2000)
}

輸出

New Subscription
Item Created 1
Item Created 2
Item Created 3
Item Created 4
Subscriber received Item(id=1)
Subscriber received Item(id=2)
Subscriber received Item(id=3)
Subscriber received Item(id=4)
Done!  // onComplete 被調(diào)用了

當(dāng)且僅當(dāng)數(shù)據(jù)流真正到達(dá)末尾時(shí)才會(huì)觸發(fā) onComplete。限定數(shù)量是不會(huì)的

之前我說 Subscriber 可以 動(dòng)態(tài) 限定從上游拿到的值的數(shù)量, 這個(gè)動(dòng)態(tài)可以從下面體現(xiàn)

// subscriber_2.kt
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

val subscriber_2 = object : Subscriber<Item> {
    lateinit var subscription: Subscription  // 與 subscriber_1 相比多了這一行
    override fun onSubscribe(subscription: Subscription) {
        this.subscription = subscription  // 與 subscriber_1 相比多了這一行
        subscription.request(4)
        println("New Subscription ")
    }

    override fun onNext(s: Item) {
        Thread.sleep(200)
        println("Subscriber received " + s)
        if (s.id == 4) {                    // |\
            println("Requesting two more")  // | \
            subscription.request(2)         // | /--- 與 subscriber_1 相比多了這幾行
        }                                   // |/
    }

    override fun onError(e: Throwable) {
        e.printStackTrace()
    }

    override fun onComplete() {
        println("Done!")
    }
}
// 7.7.kt
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

fun main(args: Array<String>) {
    Flowable.range(1, 6)
            .map { Item(it) }
            .observeOn(Schedulers.newThread())
            .subscribe(subscriber_2) // 只有這一行與 7.5.kt 相比有變動(dòng)
    Thread.sleep(2000)
}

輸出

New Subscription
Item Created 1
Item Created 2
Item Created 3
Item Created 4
Item Created 5
Item Created 6
Subscriber received Item(id=1)
Subscriber received Item(id=2)
Subscriber received Item(id=3)
Subscriber received Item(id=4)
Requesting two more  // 決定再?gòu)纳嫌文?2 個(gè)值進(jìn)行處理
Subscriber received Item(id=5)
Subscriber received Item(id=6)
Done!

OK 這一節(jié)就到這里。下一節(jié)我們說一說 Flowable 把速度降下來的幾種方式~

RxKotlin 例子不超過15行教程 1----環(huán)境配置與初體驗(yàn)

RxKotlin 例子不超過15行教程 2----Observable Observer 與 Subscribe 簡(jiǎn)介

RxKotlin 例子不超過15行教程 3----Observable 的創(chuàng)建

RxKotlin 例子不超過15行教程 4----Observer Subscribe 與 Hot/Cold Observable

RxKotlin 例子不超過15行教程 5----Subject

RxKotlin 例子不超過15行教程 6----Operator 與 Marble Diagram

RxKotlin 例子不超過15行教程 7----Backpressure Flowable 與 Subscriber 簡(jiǎn)介

RxKotlin 例子不超過15行教程 8----Error Handling

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容