本節(jié)代碼中的 observer 就是第二節(jié)中的
Observer 與 Subscribe
按照慣例,先來看兩段能跑的代碼
兩段能跑的代碼
// 4.1.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
val observable: Observable<Int> = Observable.range(1, 3)
observable.subscribe({ // 我知道你要問我為什么 subscribe 后面還可以接三個 Lambda,先看例子,下面說
//onNext method
println("Next $it")
}, {
//onError Method
println("Error ${it.message}")
}, {
//onComplete Method
println("All Completed")
})
}
輸出
Next 1
Next 2
Next 3
All Completed
再來一段(上一節(jié)用過的例子)
// 3.11.kt
import io.reactivex.Observable
fun main(args: Array<String>) {
Observable.range(1, 3).subscribe(observer)
}
輸出
New Subscription // 上面那個例子沒有這一行
Next 1
Next 2
Next 3
All Completed
上面代碼的主旨
Observer
之前(第二節(jié))我們說過,一個 Observer 需要實現(xiàn)四個方法(它們的作用參見第二節(jié))
- onNext
- onComplete
- onError
- onSubscribe
當我們把 Observable 連接到 Observer 上的時候,系統(tǒng)會調(diào)用這四個方法并把相應的值傳給它們。
subscribe 的參數(shù)都能是什么
subscribe 在 ReactiveX 中有幾個重載方法,這里不列出?;灸J接羞@兩個
-
subscribe(onNext,onError,onComplete,onSubscribe)
這幾個參數(shù)都可以省略,但是只能從后往前省略(這句是廢話)
是廢話也要說,因為 subscribe 是在 Java 文件中定義的,不能使用 Kotlin 的命名參數(shù)
4.1.kt中省略了onSubscribe -
subscribe(observer)
3.11.kt已經(jīng)很清晰,這里不展開了
除了 subscribe 方法,還有 RxKotlin 提供的小語法糖 subscribeBy
這個函數(shù)是 RxKotlin 為 Observable (等可以 subscribe 的對象)定義的擴展函數(shù),函數(shù)定義如下
fun <T : Any> Observable<T>.subscribeBy(
onError: (Throwable) -> Unit = onErrorStub,
onComplete: () -> Unit = onCompleteStub,
onNext: (T) -> Unit = onNextStub
): Disposable = subscribe(onNext, onError, onComplete) // 好的好的,我知道你要問 Disposable 是什么,稍等。
因為被定義在 Kotlin 文件中,它可以使用命名參數(shù)(例子見 第一節(jié) 1.kt)
Subscribe
從之前的例子可知,subscribe 可以連接 Observable 與 Observer。
它有兩種形式(上面說過,這里再概括一下)
- 把
onNext等,以參數(shù)的形式傳進去 - 直接傳入一個
Observer對象
如果你選擇第一種形式,那么 subscribe 方法是有返回值的,返回值類型是 Disposable (不要急,它的介紹馬上就到了)
如果你選擇第二種形式,那么 subscribe 方法是沒有返回值的
這兩種形式中的 onSubscribe 都是一個 (d:Disposable):Unit 類型的函數(shù)。
那么 Disposable 有什么用呢?
Disposable
disposable: 一次性的,可任意處理的; 用后就拋棄的; 免洗的; 可供使用的。講真,這幾個中文翻譯放在這里我覺得并不是很合適,我也沒有想到合適的中文翻譯(如果有合適的歡迎指出)。我就一直用英文了。
Disposable 對象的 dispose 方法可以停止本次訂閱
看一個例子
我保證這個例子是為數(shù)不多的長例子之一,真的不能再精簡了
下面示例用到了 lateinit 可以自行 Google 下,此處不介紹。(如果有好的鏈接歡迎發(fā)給我,加在這里)
// 4.2.kt
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val observale: Observable<Long> = Observable.interval(100, TimeUnit.MILLISECONDS)
val observer: Observer<Long> = object : Observer<Long> {
lateinit var disposable: Disposable
override fun onSubscribe(d: Disposable) {
disposable = d
}
override fun onNext(item: Long) {
if (item >= 5 && !disposable.isDisposed) {
disposable.dispose()
println("Disposed")
}
println("Received $item")
}
override fun onError(e: Throwable) {
println("Error ${e.message}")
}
override fun onComplete() {
println("Complete")
}
}
observale.subscribe(observer)
Thread.sleep(1000)
}
輸出
Received 0
Received 1
Received 2
Received 3
Received 4
Disposed // 注釋1
Received 5 // 注釋2
// 注釋3
注釋1
dispose 處理后不會執(zhí)行 observer 的 onComplete 方法(所以 Complete 沒有輸出)
注釋2
disposable.dispose() 之后,observer 不會再處理其它值(所以 Received 6 Received 7 等等并沒有輸出)
但是當前值依然會繼續(xù)處理(所以 Received 5 依然被輸出)
總結(jié)一下, Disposable 是用來控制訂閱的
下面我們回到 Observable,看看它的分類
Hot/Cold Observable
在本教程前面所有示例中,如果多次訂閱同一個 Observable,則所有訂閱都會得到從一開始的所有值。
例子
// 4.3.kt
import io.reactivex.Observable
import io.reactivex.rxkotlin.toObservable
// Cold Observables
fun main(args: Array<String>) {
val observable: Observable<Int> = listOf(1, 2, 3, 4).toObservable()
observable.subscribe(observer)
observable.subscribe(observer)
}
輸出
New Subscription
Next 1
Next 2
Next 3
Next 4
All Completed
New Subscription
Next 1
Next 2
Next 3
Next 4
All Completed
我們可以看到每一個 Observer 都被推送了了從 1-4 的所有值。
到目前為止,我們遇到的所有 Observable 都是這樣的。
這樣的 Observable 被稱作 Cold Observable
我之前曾經(jīng)比喻 Observable 為電臺,這是有一些不恰當?shù)?。因為當你錯過時間再打開電臺會聽不到原先的內(nèi)容。
Cold Observable 更像是光盤(容量可能無限),隨時打開都能從頭開始聽。
電臺這個比喻更適合 Hot Observable,看下一個例子
// 4.4.kt
import io.reactivex.rxkotlin.toObservable
//Hot Observable
fun main(args: Array<String>) {
val connectableObservable = listOf(1, 2, 3).toObservable().publish() // 注釋1
connectableObservable.subscribe({ println("Subscription 1: $it") }) // 描點1
connectableObservable.subscribe({ println("Subscription 2: $it") }) // 描點2
connectableObservable.connect() // 注釋2
connectableObservable.subscribe({ println("Subscription 3: $it") }) // 注釋3
}
輸出
Subscription 1: 1
Subscription 2: 1
Subscription 1: 2
Subscription 2: 2
Subscription 1: 3
Subscription 2: 3
// 并沒有輸出 Subscription 3
注釋1
我們用 publish 方法把 Cold Observable 變成 ConnectableObservable (ConnectableObservable 是 Hot Observable 的一種)
注釋2
ConnectableObservable 在 描點1 和 描點2 處都不會發(fā)送消息,它會在 注釋2 處(調(diào)用 connect 方法時)開始發(fā)送消息
而 Cold Observable 會在調(diào)用 subscribe 時開始發(fā)送消息
如果訂閱晚了(如 注釋3),則會錯過一些消息(在這里,注釋3 錯過了所有消息(計算機速度太快....),接下來有其他例子,不要急)
注釋3
訂閱3不會收到任何信息
我們來看下一個例子,在這個例子中,調(diào)用 connect 方法后我們又增加了新的訂閱,這個訂閱會丟失部分消息
import io.reactivex.Observable
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val connectableObservable = Observable.interval(10, TimeUnit.MILLISECONDS).publish()
connectableObservable.subscribe({ println("Subscription 1: $it") })
connectableObservable.subscribe({ println("Subscription 2: $it") })
connectableObservable.connect() // ConnectableObservable 開始發(fā)送消息
println("Sleep 1 starts")
Thread.sleep(20)
println("Sleep 1 ends")
connectableObservable.subscribe({ println("Subscription 3: $it") }) // 不用再次調(diào)用 connect 方法
println("Sleep 2 starts")
Thread.sleep(30)
println("Sleep 2 ends")
}
輸出(有點長)
Sleep 1 starts
Subscription 1: 0
Subscription 2: 0
Subscription 1: 1
Subscription 2: 1 // 注釋1
Sleep 1 ends // 開始 訂閱3
Sleep 2 starts
Subscription 1: 2
Subscription 2: 2
Subscription 3: 2 // 注釋2
Subscription 1: 3
Subscription 2: 3
Subscription 3: 3
Subscription 1: 4
Subscription 2: 4
Subscription 3: 4
Sleep 2 ends
注釋1
到這里我們沒有開始 訂閱3 所以沒有輸出任何 Subscription 3
注釋2
訂閱3 的輸出是從 2 開始的,它錯過了 0 和 1
這一節(jié)到這里就 OK 了,明天說 Subject
RxKotlin 例子不超過15行教程 1----環(huán)境配置與初體驗
RxKotlin 例子不超過15行教程 2----Observable Observer 與 Subscribe 簡介
RxKotlin 例子不超過15行教程 3----Observable 的創(chuàng)建
RxKotlin 例子不超過15行教程 4----Observer Subscribe 與 Hot/Cold Observable
RxKotlin 例子不超過15行教程 5----Subject
RxKotlin 例子不超過15行教程 6----Operator 與 Marble Diagram
RxKotlin 例子不超過15行教程 7----Backpressure Flowable 與 Subscriber 簡介