RxKotlin 例子不超過15行教程 4----Observer Subscribe 與 Hot/Cold Observable

本節(jié)代碼中的 observer 就是第二節(jié)中的

Observer 與 Subscribe

按照慣例,先來看兩段能跑的代碼

兩段能跑的代碼

// 4.1.kt
import io.reactivex.Observable

fun main(args: Array<String>) {
    val observable: Observable<Int> = Observable.range(1, 3)
    observable.subscribe({  // 我知道你要問我為什么 subscribe 后面還可以接三個 Lambda,先看例子,下面說
        //onNext method
        println("Next $it")
    }, {
        //onError Method
        println("Error ${it.message}")
    }, {
        //onComplete Method
        println("All Completed")
    })
}

輸出

Next 1
Next 2
Next 3
All Completed

再來一段(上一節(jié)用過的例子)

// 3.11.kt
import io.reactivex.Observable

fun main(args: Array<String>) {
    Observable.range(1, 3).subscribe(observer)
}

輸出

New Subscription  // 上面那個例子沒有這一行
Next 1
Next 2
Next 3
All Completed

上面代碼的主旨

Observer

之前(第二節(jié))我們說過,一個 Observer 需要實現(xiàn)四個方法(它們的作用參見第二節(jié))

  • onNext
  • onComplete
  • onError
  • onSubscribe

當我們把 Observable 連接到 Observer 上的時候,系統(tǒng)會調(diào)用這四個方法并把相應的值傳給它們。

subscribe 的參數(shù)都能是什么

subscribe 在 ReactiveX 中有幾個重載方法,這里不列出?;灸J接羞@兩個

  • subscribe(onNext,onError,onComplete,onSubscribe)
    這幾個參數(shù)都可以省略,但是只能從后往前省略(這句是廢話)
    是廢話也要說,因為 subscribe 是在 Java 文件中定義的,不能使用 Kotlin 的命名參數(shù)
    4.1.kt 中省略了 onSubscribe
  • subscribe(observer)
    3.11.kt 已經(jīng)很清晰,這里不展開了

除了 subscribe 方法,還有 RxKotlin 提供的小語法糖 subscribeBy

這個函數(shù)是 RxKotlin 為 Observable (等可以 subscribe 的對象)定義的擴展函數(shù),函數(shù)定義如下

fun <T : Any> Observable<T>.subscribeBy(
        onError: (Throwable) -> Unit = onErrorStub,
        onComplete: () -> Unit = onCompleteStub,
        onNext: (T) -> Unit = onNextStub
        ): Disposable = subscribe(onNext, onError, onComplete)  // 好的好的,我知道你要問 Disposable 是什么,稍等。

因為被定義在 Kotlin 文件中,它可以使用命名參數(shù)(例子見 第一節(jié) 1.kt)

Subscribe

從之前的例子可知,subscribe 可以連接 ObservableObserver。
它有兩種形式(上面說過,這里再概括一下)

  • onNext 等,以參數(shù)的形式傳進去
  • 直接傳入一個 Observer 對象

如果你選擇第一種形式,那么 subscribe 方法是有返回值的,返回值類型是 Disposable (不要急,它的介紹馬上就到了)
如果你選擇第二種形式,那么 subscribe 方法是有返回值的
這兩種形式中的 onSubscribe 都是一個 (d:Disposable):Unit 類型的函數(shù)。
那么 Disposable 有什么用呢?

Disposable

disposable: 一次性的,可任意處理的; 用后就拋棄的; 免洗的; 可供使用的。講真,這幾個中文翻譯放在這里我覺得并不是很合適,我也沒有想到合適的中文翻譯(如果有合適的歡迎指出)。我就一直用英文了。
Disposable 對象的 dispose 方法可以停止本次訂閱
看一個例子
我保證這個例子是為數(shù)不多的長例子之一,真的不能再精簡了
下面示例用到了 lateinit 可以自行 Google 下,此處不介紹。(如果有好的鏈接歡迎發(fā)給我,加在這里)

// 4.2.kt
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import java.util.concurrent.TimeUnit

fun main(args: Array<String>) {

    val observale: Observable<Long> = Observable.interval(100, TimeUnit.MILLISECONDS)

    val observer: Observer<Long> = object : Observer<Long> {
        lateinit var disposable: Disposable

        override fun onSubscribe(d: Disposable) {
            disposable = d
        }

        override fun onNext(item: Long) {
            if (item >= 5 && !disposable.isDisposed) {
                disposable.dispose()
                println("Disposed")
            }
            println("Received $item")
        }

        override fun onError(e: Throwable) {
            println("Error ${e.message}")
        }

        override fun onComplete() {
            println("Complete")
        }

    }

    observale.subscribe(observer)
    Thread.sleep(1000)
}

輸出

Received 0
Received 1
Received 2
Received 3
Received 4
Disposed    // 注釋1
Received 5 // 注釋2
// 注釋3

注釋1
dispose 處理后不會執(zhí)行 observer 的 onComplete 方法(所以 Complete 沒有輸出)
注釋2
disposable.dispose() 之后,observer 不會再處理其它值(所以 Received 6 Received 7 等等并沒有輸出)
但是當前值依然會繼續(xù)處理(所以 Received 5 依然被輸出)
總結(jié)一下, Disposable 是用來控制訂閱的

下面我們回到 Observable,看看它的分類

Hot/Cold Observable

在本教程前面所有示例中,如果多次訂閱同一個 Observable,則所有訂閱都會得到從一開始的所有值。
例子

// 4.3.kt
import io.reactivex.Observable
import io.reactivex.rxkotlin.toObservable

// Cold Observables
fun main(args: Array<String>) {
    val observable: Observable<Int> = listOf(1, 2, 3, 4).toObservable()

    observable.subscribe(observer)

    observable.subscribe(observer)
}

輸出

New Subscription
Next 1
Next 2
Next 3
Next 4
All Completed
New Subscription
Next 1
Next 2
Next 3
Next 4
All Completed

我們可以看到每一個 Observer 都被推送了了從 1-4 的所有值。
到目前為止,我們遇到的所有 Observable 都是這樣的。
這樣的 Observable 被稱作 Cold Observable
我之前曾經(jīng)比喻 Observable 為電臺,這是有一些不恰當?shù)?。因為當你錯過時間再打開電臺會聽不到原先的內(nèi)容。
Cold Observable 更像是光盤(容量可能無限),隨時打開都能從頭開始聽。

電臺這個比喻更適合 Hot Observable,看下一個例子

// 4.4.kt
import io.reactivex.rxkotlin.toObservable

//Hot Observable
fun main(args: Array<String>) {
    val connectableObservable = listOf(1, 2, 3).toObservable().publish()  // 注釋1
    connectableObservable.subscribe({ println("Subscription 1: $it") })  // 描點1
    connectableObservable.subscribe({ println("Subscription 2: $it") })  // 描點2
    connectableObservable.connect()  // 注釋2
    connectableObservable.subscribe({ println("Subscription 3: $it") })  // 注釋3
}

輸出

Subscription 1: 1
Subscription 2: 1
Subscription 1: 2
Subscription 2: 2
Subscription 1: 3
Subscription 2: 3
// 并沒有輸出 Subscription 3

注釋1
我們用 publish 方法把 Cold Observable 變成 ConnectableObservable (ConnectableObservableHot Observable 的一種)
注釋2
ConnectableObservable描點1描點2 處都不會發(fā)送消息,它會在 注釋2 處(調(diào)用 connect 方法時)開始發(fā)送消息
Cold Observable 會在調(diào)用 subscribe 時開始發(fā)送消息
如果訂閱晚了(如 注釋3),則會錯過一些消息(在這里,注釋3 錯過了所有消息(計算機速度太快....),接下來有其他例子,不要急)
注釋3
訂閱3不會收到任何信息

我們來看下一個例子,在這個例子中,調(diào)用 connect 方法后我們又增加了新的訂閱,這個訂閱會丟失部分消息

import io.reactivex.Observable
import java.util.concurrent.TimeUnit

fun main(args: Array<String>) {
    val connectableObservable = Observable.interval(10, TimeUnit.MILLISECONDS).publish()
    connectableObservable.subscribe({ println("Subscription 1: $it") })
    connectableObservable.subscribe({ println("Subscription 2: $it") })
    connectableObservable.connect()  // ConnectableObservable 開始發(fā)送消息
    println("Sleep 1 starts")
    Thread.sleep(20)
    println("Sleep 1 ends")
    connectableObservable.subscribe({ println("Subscription 3: $it") })  // 不用再次調(diào)用 connect 方法
    println("Sleep 2 starts")
    Thread.sleep(30)
    println("Sleep 2 ends")
}

輸出(有點長)

Sleep 1 starts
Subscription 1: 0
Subscription 2: 0
Subscription 1: 1
Subscription 2: 1 // 注釋1
Sleep 1 ends      // 開始 訂閱3
Sleep 2 starts
Subscription 1: 2
Subscription 2: 2
Subscription 3: 2  // 注釋2
Subscription 1: 3
Subscription 2: 3
Subscription 3: 3
Subscription 1: 4
Subscription 2: 4
Subscription 3: 4
Sleep 2 ends

注釋1
到這里我們沒有開始 訂閱3 所以沒有輸出任何 Subscription 3
注釋2
訂閱3 的輸出是從 2 開始的,它錯過了 01

這一節(jié)到這里就 OK 了,明天說 Subject

RxKotlin 例子不超過15行教程 1----環(huán)境配置與初體驗

RxKotlin 例子不超過15行教程 2----Observable Observer 與 Subscribe 簡介

RxKotlin 例子不超過15行教程 3----Observable 的創(chuàng)建

RxKotlin 例子不超過15行教程 4----Observer Subscribe 與 Hot/Cold Observable

RxKotlin 例子不超過15行教程 5----Subject

RxKotlin 例子不超過15行教程 6----Operator 與 Marble Diagram

RxKotlin 例子不超過15行教程 7----Backpressure Flowable 與 Subscriber 簡介

RxKotlin 例子不超過15行教程 8----Error Handling

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容