探索Android開源框架 - 3. RxJava使用及源碼解析

相關(guān)概念

Android多線程編程的原則:

  1. 不要阻塞UI線程;
  2. 不要在UI線程之外訪問(wèn)UI組件;

ReactiveX

  • Reactive Extensions的縮寫,一般簡(jiǎn)寫為Rx;
  • 是一個(gè)使用可觀察數(shù)據(jù)流進(jìn)行異步編程的編程接口,ReactiveX結(jié)合了觀察者模式、迭代器模式和函數(shù)式編程的精華;

RxJava

  • Reactive Extensions for the JVM:,RxJava就是ReactiveX在JVM平臺(tái)的實(shí)現(xiàn);
  • 基于事件流的鏈?zhǔn)秸{(diào)用,進(jìn)行耗時(shí)任務(wù),線程切換,其本質(zhì)是一個(gè)異步操作庫(kù);

原理

  • 基于觀察者模式, 一個(gè)方面的操作依賴于另一個(gè)方面的狀態(tài)變化,當(dāng)一個(gè)對(duì)象必須通知其他對(duì)象,又希望這個(gè)對(duì)象和其他被通知的對(duì)象是松散耦合的;

三個(gè)要素

  • 被觀察者(Observable),觀察者(Subscriber),訂閱(subscribe);

被觀察者

1. Observable
  • 可多次發(fā)送事件(onNext),直到 onComplete 或 onError 被調(diào)用結(jié)束訂閱;
  • 不支持背壓:當(dāng)被觀察者快速發(fā)送大量數(shù)據(jù)時(shí),下游不會(huì)做其他處理,即使數(shù)據(jù)大量堆積,調(diào)用鏈也不會(huì)報(bào)MissingBackpressureException,消耗內(nèi)存過(guò)大只會(huì)OOM。(官方給出以1000個(gè)事件為分界線作為參考);
val observable = Observable.create(ObservableOnSubscribe<Int> {
    it.onNext(1)
    it.onNext(3)
    it.onNext(5)
    it.onComplete()
    LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
})
//Disposable通過(guò) dispose() ?方法來(lái)讓上游停?止?工作,達(dá)到「丟棄」的效果
var disposable: Disposable? = null
val observer = object : Observer<String> {
    override fun onSubscribe(d: Disposable) {
        //訂閱后發(fā)送數(shù)據(jù)之前, 回調(diào)這個(gè)方法,Disposable可用于取消訂閱
        disposable = d
        LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
    }

    override fun onNext(t: String) {
        LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
    }

    override fun onError(e: Throwable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
    }

    override fun onComplete() {
        LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
    }
}

//rxJava的整體結(jié)構(gòu)是一條鏈,鏈的最上游是被觀察者observable,最下游是觀察者observer,
//鏈的中間各個(gè)節(jié)點(diǎn),既是其下游的observable,又是其上游的observer
observable
    //subscribeOn: 切換起源 Observable 的線程,
    // 當(dāng)多次調(diào)?用 subscribeOn() 的時(shí)候,只有最上?面的會(huì)對(duì)起源 Observable 起作?用,
    // 原因subscribeOn底層通過(guò)新建observable實(shí)現(xiàn)
    .subscribeOn(Schedulers.io())
    .subscribeOn(AndroidSchedulers.mainThread())
    //observeOn指定的是它之后的操作所在的線程,通過(guò)observeOn的多次調(diào)用,程序?qū)崿F(xiàn)了線程的多次切換
    //影響范圍observeOn是它下面的每個(gè)observer,除非又遇到新的observeOn
    .observeOn(Schedulers.io())
    .map {
        LjyLogUtil.d("map:${Thread.currentThread().name}")
        "num_$it"
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)
2. Flowable:
  • 和 Observable 一樣, 且支持Reactive-Streams和背壓;
  • 上游的被觀察者會(huì)響應(yīng)下游觀察者的數(shù)據(jù)請(qǐng)求,下游調(diào)用request(n)來(lái)告訴上游發(fā)送多少個(gè)數(shù)據(jù)。這樣避免了大量數(shù)據(jù)堆積在調(diào)用鏈上,使內(nèi)存一直處于較低水平;
var sub: Subscription? = null
Flowable.create(FlowableOnSubscribe<Int> {
    it.onNext(1)
    it.onNext(3)
    it.onNext(5)
    it.onComplete()
    LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
    //使用create創(chuàng)建Flowable,需要指定背壓策略
}, BackpressureStrategy.BUFFER)

Flowable.range(0, 5)
    .subscribe(object : Subscriber<Int> {
        override fun onSubscribe(s: Subscription?) {
            //當(dāng)訂閱后,會(huì)首先調(diào)用這個(gè)方法,其實(shí)就相當(dāng)于onStart(),
            //傳入的Subscription s參數(shù)可以用于請(qǐng)求數(shù)據(jù)或者取消訂閱
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe start")
            sub = s
            sub?.request(1)
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe end")
        }

        override fun onNext(t: Int?) {
            LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
            sub?.request(1)
        }

        override fun onError(t: Throwable?) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${t?.message}")
        }

        override fun onComplete() {
            LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
        }

    })
3. Single
  • 單次發(fā)送事件(onSuccess、onError),發(fā)完即結(jié)束訂閱;
  • 總是只發(fā)射一個(gè)值,或者一個(gè)錯(cuò)誤通知,而不是發(fā)射一系列的值(當(dāng)然就不存在背壓?jiǎn)栴});
  • 一般一個(gè)接口只是一次請(qǐng)求一次返回,所以使用Single 與 Retrofit 配合是更為合理;
Single.just(1)
    .map { "num_$it" }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { num ->
        LjyLogUtil.d(num)
    }
Single.create(SingleOnSubscribe<String> { emitter ->
    LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
    emitter.onSuccess("str1")
    emitter.onSuccess("str2")//錯(cuò)誤寫法,重復(fù)調(diào)用也不會(huì)處理,因?yàn)橹粫?huì)調(diào)用一次
}).subscribe(object : SingleObserver<String> {
    override fun onSubscribe(d: Disposable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
    }

    override fun onSuccess(t: String) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
    }

    override fun onError(e: Throwable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
    }

})
4. Completable
  • 單次發(fā)送事件(onError、onComplete),發(fā)完即結(jié)束訂閱;
  • 如果你的觀察者連onNext事件都不關(guān)心,可以使用Completable,它只有onComplete和onError兩個(gè)事件,要轉(zhuǎn)換成其他類型的被觀察者,也是可以使用toFlowable()、toObservable()等方法去轉(zhuǎn)換;
Completable.create { emitter ->
    LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
    emitter.onComplete()//單一onComplete或者onError
}.subscribe(object : CompletableObserver {
    override fun onSubscribe(d: Disposable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
    }

    override fun onComplete() {
        LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
    }

    override fun onError(e: Throwable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
    }

})
5. Maybe
  • 單次發(fā)送事件(onSuccess、onError、onComplete),發(fā)完即結(jié)束訂閱。相當(dāng)于 Completable 和 Single 結(jié)合;
  • 如果可能發(fā)送一個(gè)數(shù)據(jù)或者不會(huì)發(fā)送任何數(shù)據(jù),這時(shí)候你就需要Maybe,它類似于Single和Completable的混合體;
  • onSuccess和onComplete是互斥的存在;
Maybe.create(MaybeOnSubscribe<String> {
    it.onSuccess("str1")
    it.onComplete()
}).subscribe(object : MaybeObserver<String> {
    override fun onSubscribe(d: Disposable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
    }

    override fun onSuccess(t: String) {
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
    }

    override fun onError(e: Throwable) {
        LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
    }

    override fun onComplete() {
        LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
    }

})
  • 五種被觀察者可通過(guò)toObservable,toFlowable,toSingle,toCompletable,toMaybe相互轉(zhuǎn)換;

觀察者

val observer = object : Observer<Int> {
        override fun onSubscribe(d: Disposable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
        }

        override fun onNext(t: Int) {
            LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
        }

        override fun onError(e: Throwable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
        }

        override fun onComplete() {
            LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
        }
    }

線程調(diào)度器(Schedulers)

  • 指定 被觀察者(Observable)/ 觀察者(Observer)的工作線程,簡(jiǎn)化了異步操作;
  • 默認(rèn)在創(chuàng)建自身的線程;
  • 配合操作符 subscribeOn , observeOn 使用;
AndroidSchedulers.mainThread()
  • 需要引用RxAndroid, 切換到UI線程(Android的主線程), 為Android開發(fā)定制;
Schedulers.io()
  • 用于IO密集型任務(wù),如讀寫SD卡文件,查詢數(shù)據(jù)庫(kù),訪問(wèn)網(wǎng)絡(luò)等;
  • 具有線程緩存機(jī)制,默認(rèn)是一個(gè)CacheThreadScheduler;
Schedulers.newThread()
  • 為每一個(gè)任務(wù)創(chuàng)建一個(gè)新線程;
  • 不具有線程緩存機(jī)制,雖然使用Schedulers.io的地方,都可以使用Schedulers.newThread,但是,Schedulers.newThread的效率沒(méi)有Schedulers.io高;
Schedulers.computation()
  • 用于CPU 密集型計(jì)算任務(wù),即不會(huì)被 I/O 等操作限制性能的耗時(shí)操作,例如xml,json文件的解析,Bitmap圖片的壓縮取樣等,具有固定的線程池,大小為CPU的核數(shù)。不可以用于I/O操作,因?yàn)镮/O操作的等待時(shí)間會(huì)浪費(fèi)CPU。
Schedulers.trampoline()
  • 在當(dāng)前線程立即執(zhí)行任務(wù),如果當(dāng)前線程有任務(wù)在執(zhí)行,則會(huì)將其暫停,等插入進(jìn)來(lái)的任務(wù)執(zhí)行完之后,再將未完成的任務(wù)接著執(zhí)行;
Schedulers.single()
  • 擁有一個(gè)線程單例,所有的任務(wù)都在這一個(gè)線程中執(zhí)行,當(dāng)此線程中有任務(wù)執(zhí)行時(shí),其他任務(wù)將會(huì)按照先進(jìn)先出的順序依次執(zhí)行;
Scheduler.from(executor)
  • 指定一個(gè)線程調(diào)度器,由此調(diào)度器來(lái)控制任務(wù)的執(zhí)行策略;

背壓(Backpressure)

  • 背壓是指在異步場(chǎng)景中,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略;
  • 支持背壓的被觀察者為Flowable;
  • Android中很少用到,除非在線視頻流,直播等場(chǎng)景,當(dāng)畫面卡頓已取得的數(shù)據(jù)失效了,需要拋棄等;
背壓策略模式
BackpressureStrategy.MISSING
  • 在此策略下,通過(guò)Create方法創(chuàng)建的Flowable相當(dāng)于沒(méi)有指定背壓策略,不會(huì)對(duì)通過(guò)onNext發(fā)射的數(shù)據(jù)做緩存或丟棄處理,需要下游通過(guò)背壓操作符處理
BackpressureStrategy.ERROR:
  • 在此策略下,如果放入Flowable的異步緩存池中的數(shù)據(jù)超限了,則會(huì)拋出MissingBackpressureException異常;
BackpressureStrategy.BUFFER:
  • 內(nèi)部維護(hù)了一個(gè)緩存池SpscLinkedArrayQueue,其大小不限,此策略下,如果Flowable默認(rèn)的異步緩存池滿了,會(huì)通過(guò)此緩存池暫存數(shù)據(jù),它與Observable的異步緩存池一樣,可以無(wú)限制向里添加數(shù)據(jù),不會(huì)拋出MissingBackpressureException異常,但會(huì)導(dǎo)致OOM;
  • 當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí),將緩存區(qū)大小設(shè)置成無(wú)限大,被觀察者可無(wú)限發(fā)送事件 觀察者,但實(shí)際上是存放在緩存區(qū),但要注意內(nèi)存情況,防止出現(xiàn)OOM;
BackpressureStrategy.DROP
  • 在此策略下,如果Flowable的異步緩存池滿了,會(huì)丟掉上游發(fā)送的數(shù)據(jù);
BackpressureStrategy.LATEST
  • 與Drop策略一樣,如果緩存池滿了,會(huì)丟掉將要放入緩存池中的數(shù)據(jù),不同的是,不管緩存池的狀態(tài)如何,LATEST都會(huì)將最后一條數(shù)據(jù)強(qiáng)行放入緩存池中,來(lái)保證觀察者在接收到完成通知之前,能夠接收到Flowable最新發(fā)射的一條數(shù)據(jù);
  • 即如果發(fā)送了150個(gè)事件,緩存區(qū)里會(huì)保存129個(gè)事件(第1-第128 + 第150事件);
RxJava 2.0內(nèi)部提供 封裝了背壓策略模式的方法
  • 默認(rèn)采用BackpressureStrategy.ERROR模式;
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()

Flowable.interval(1, TimeUnit.MILLISECONDS)
    //添加背壓策略封裝好的方法,此處選擇Buffer模式,即緩存區(qū)大小無(wú)限制
    .onBackpressureBuffer()
    .observeOn(Schedulers.newThread())
    .subscribe(subscriber)

冷熱流

Cold Observable

  • subscribe時(shí)才會(huì)發(fā)射數(shù)據(jù);
  • 常見(jiàn)的工廠方法提供的都是ColdObservable,包括just(),fromXX,create(),interval(),defer();
  • 當(dāng)你有多個(gè)Subscriber的時(shí)候,他們的事件是獨(dú)立的,示例代碼如下:
val interval = Observable.interval(1, TimeUnit.SECONDS)
var disposable1: Disposable? = null
val observer1 = object : Observer<Long> {
    override fun onSubscribe(d: Disposable) {
        disposable1 = d
    }

    override fun onNext(t: Long) {
        LjyLogUtil.d("觀察者1:$t")
    }

    override fun onError(e: Throwable) {

    }

    override fun onComplete() {

    }
}

var disposable2: Disposable? = null
val observer2 = object : Observer<Long> {
    override fun onSubscribe(d: Disposable) {
        disposable1 = d
    }

    override fun onNext(t: Long) {
        LjyLogUtil.d("觀察者2:$t")
    }

    override fun onError(e: Throwable) {

    }

    override fun onComplete() {

    }
}

findViewById<Button>(R.id.btn_subscribe1)
    .clicks().subscribe {
        interval.subscribe(observer1)
    }
findViewById<Button>(R.id.btn_dispose1)
    .clicks().subscribe {
        disposable1?.dispose()
    }


findViewById<Button>(R.id.btn_subscribe2)
    .clicks().subscribe {
        interval.subscribe(observer2)
    }
findViewById<Button>(R.id.btn_dispose2)
    .clicks().subscribe {
        disposable2?.dispose()
    }

Hot Observable

  • 對(duì)于Hot Observable的所有subscriber,他們會(huì)在同一時(shí)刻收到相同的數(shù)據(jù);
  • 通常使用publish()操作符來(lái)將ColdObservable變?yōu)镠ot?;蛘?使用 Subjects 也是Hot Observable;
  • 如果他開始傳輸數(shù)據(jù),你不主動(dòng)喊停(dispose()/cancel()),那么他就不會(huì)停,一直發(fā)射數(shù)據(jù),即使他已經(jīng)沒(méi)有Subscriber了;
val interval = Observable.interval(1, TimeUnit.SECONDS).publish()
var disposable: Disposable? = null
findViewById<Button>(R.id.btn_connect)
    .clicks().subscribe {
        disposable = interval.connect()
    }
findViewById<Button>(R.id.btn_dispose)
    .clicks().subscribe {
        disposable?.dispose()
    }
findViewById<Button>(R.id.btn_subscribe3)
    .clicks().subscribe {
        interval.subscribe {
            LjyLogUtil.d("觀察者3:$it")
        }
    }
findViewById<Button>(R.id.btn_subscribe4)
    .clicks().subscribe {
        interval.subscribe {
            LjyLogUtil.d("觀察者4:$it")
        }
    }

操作符

  • 操作符很多,不用完全背下來(lái),瀏覽一遍,用時(shí)知道在哪找(關(guān)注+收藏本文[狗頭])即可

線程切換

  • subscribeOn:指定被觀察者Observable的工作線程;
  • observeOn:指定觀察者的observer工作線程;
observable
    //subscribeOn: 切換起源 Observable 的線程,
    // 當(dāng)多次調(diào)?用 subscribeOn() 的時(shí)候,只有最上?面的會(huì)對(duì)起源 Observable 起作?用,
    // 原因subscribeOn底層通過(guò)新建observable實(shí)現(xiàn)
    .subscribeOn(Schedulers.io())
    .subscribeOn(AndroidSchedulers.mainThread())
    //observeOn指定的是它之后的操作所在的線程,通過(guò)observeOn的多次調(diào)用,程序?qū)崿F(xiàn)了線程的多次切換
    //影響范圍observeOn是它下面的每個(gè)observer,除非又遇到新的observeOn
    .observeOn(Schedulers.io())
    .map {
        LjyLogUtil.d("map:${Thread.currentThread().name}")
        "num_$it"
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)

創(chuàng)建操作

基本創(chuàng)建
create
  • 通過(guò)調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)Observable;
Observable.create(ObservableOnSubscribe<Int> {
    LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
    it.onNext(1)
    it.onNext(3)
    it.onNext(5)
    it.onComplete()
})
快速創(chuàng)建
just
  • 將對(duì)象或者對(duì)象集合轉(zhuǎn)換為一個(gè)會(huì)發(fā)射這些對(duì)象的Observable;
Observable.just(1, 2, 3, 4)
fromArray & fromIterable
  • 將其它的對(duì)象或數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observable;
Observable.fromArray(arrayOf(1,2,3))

Observable.fromIterable(listOf(4,5,6))
never
  • 創(chuàng)建的被觀察者不發(fā)送任何事件, 觀察者接收后什么都不調(diào)用;
Observable.never<Int>().subscribe(observer)
empty
  • 創(chuàng)建的被觀察者僅發(fā)送Complete事件,直接通知完成, 觀察者接收后會(huì)直接調(diào)用onCompleted;
Observable.empty<Int>().subscribe(observer)
error
  • 創(chuàng)建的被觀察者僅發(fā)送Error事件,直接通知異常, 觀察者接收后會(huì)直接調(diào)用onError;
Observable.error<Int>(RuntimeException()).subscribe(observer)
延遲創(chuàng)建
defer:
  • 在觀察者訂閱之前不創(chuàng)建這個(gè)Observable,為每一個(gè)觀察者創(chuàng)建一個(gè)新的Observable;
var num=1
val observable=Observable.defer {Observable.just(num)}
num=2
observable.subscribe(observer)
timer
  • 創(chuàng)建在一個(gè)指定的延遲之后發(fā)射單個(gè)數(shù)據(jù)的Observable;
//延時(shí)3秒后,發(fā)送一個(gè)整數(shù)0
Observable.timer(3, TimeUnit.SECONDS)
interval & intervalRange
  • 創(chuàng)建一個(gè)定時(shí)發(fā)射整數(shù)序列的Observable;
//初始延時(shí)1秒,每3秒發(fā)一個(gè)自增整數(shù)
Observable.interval(1, 3, TimeUnit.SECONDS)

//初始延時(shí)2秒,后每1秒發(fā)一個(gè)從10開始的整數(shù),發(fā)5個(gè)(發(fā)到14)停止
Observable.intervalRange(10, 5, 2, 1, TimeUnit.SECONDS);
range & rangeLong
  • 創(chuàng)建發(fā)射指定范圍的整數(shù)序列的Observable;
Observable.range(0, 5)

Observable.rangeLong(0, 5)
Repeat
  • 創(chuàng)建重復(fù)發(fā)射特定的數(shù)據(jù)或數(shù)據(jù)序列的Observable;
//一直重復(fù)
Observable.fromArray(1, 2, 3, 4).repeat()
//重復(fù)發(fā)送5次
Observable.fromArray(1, 2, 3, 4).repeat(5)
//重復(fù)發(fā)送直到符合條件時(shí)停止重復(fù)
Observable.fromArray(1, 2, 3, 4).repeatUntil { false }
//
Observable.just(1, 2, 3, 4)
    .repeatWhen {
        it.flatMap { obj ->
            if (obj is NumberFormatException) {
                Observable.error(Throwable("repeatWhen終止"))
            } else {
                Observable.just(5, 6, 7)
            }
        }
    }.subscribe(observer)

2. 變換操作:

map
  • 映射,通過(guò)對(duì)序列的每一項(xiàng)都應(yīng)用一個(gè)函數(shù)變換Observable發(fā)射的數(shù)據(jù),實(shí)質(zhì)是對(duì)序列中的每一項(xiàng)執(zhí)行一個(gè)函數(shù),函數(shù)的參數(shù)就是這個(gè)數(shù)據(jù)項(xiàng);
Observable.just("1", "2", "3").map { it.toInt() }.subscribe(observer)
flatMap & concatMap
  • 扁平映射,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個(gè)單獨(dú)的Observable,可以認(rèn)為是一個(gè)將嵌套的數(shù)據(jù)結(jié)構(gòu)展開的過(guò)程;
//concatMap與flatMap的區(qū)別: concatMap是有序的,flatMap是無(wú)序的
Observable.just("A", "B", "C")
    .flatMap { x ->
        Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS).map { y ->
            "($x,$y)"
        }
    }
Observable.just("A", "B", "C")
    .concatMap { m ->
        Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS).map { n ->
            "($m,$n)"
        }
    }
groupBy
  • 分組,將原來(lái)的Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個(gè)Observable發(fā)射一組不同的數(shù)據(jù);
Observable.just(
    "Tiger",
    "Elephant",
    "Cat",
    "Chameleon",
    "Frog",
    "Fish",
    "Turtle",
    "Flamingo"
)
    .groupBy { it[0].uppercaseChar() }
    .concatMapSingle { it.toList() }
    .subscribe(observer4)
scan
  • 掃描,對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后按順序依次發(fā)射這些值;
  • 對(duì)發(fā)射的數(shù)據(jù)和上一輪發(fā)射的數(shù)據(jù)進(jìn)行函數(shù)處理,并返回的數(shù)據(jù)供下一輪使用,持續(xù)這個(gè)過(guò)程來(lái)產(chǎn)生剩余的數(shù)據(jù)流。其應(yīng)用場(chǎng)景有簡(jiǎn)單的累加計(jì)算,判斷所有數(shù)據(jù)的最小值等;
Observable.just(1, 2, 3, 4)
    .scan { t1, t2 -> t1 + t2 }
    .subscribe(observer)
buffer
  • 緩存,定期從Observable收集數(shù)據(jù)到一個(gè)集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個(gè)
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
    .buffer(3)
    .subscribe(observer5)
Window
  • 窗口,定期將來(lái)自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口,而不是每次發(fā)射一項(xiàng); 類似于Buffer,但Buffer發(fā)射的是數(shù)據(jù),Window發(fā)射的是Observable,每一個(gè)Observable發(fā)射原始Observable的數(shù)據(jù)的一個(gè)子集;
//window操作符和buffer操作符在功能上實(shí)現(xiàn)的效果是一樣的,但window操作符最大區(qū)別在于同樣是緩存一定數(shù)量的數(shù)據(jù)項(xiàng),
// window操作符最終發(fā)射出來(lái)的是新的事件流integerObservable,而buffer操作符發(fā)射出來(lái)的是新的數(shù)據(jù)流,
// 也就是說(shuō),window操作符發(fā)射出來(lái)新的事件流中的數(shù)據(jù)項(xiàng),還可以經(jīng)過(guò)Rxjava其他操作符進(jìn)行處理。
Observable.just(1, 2, 3, 4)
    .window(2, 1)
    .subscribe(observer6)

3. 過(guò)濾操作:

Filter
  • 過(guò)濾,過(guò)濾掉沒(méi)有通過(guò)謂詞測(cè)試的數(shù)據(jù)項(xiàng),只發(fā)射通過(guò)測(cè)試的;
Observable.just(1, 2, 3, 4, 5)
    .filter {
            it % 2 == 0
    }.subscribe(observer)
ofType
  • 過(guò)濾特定類型的數(shù)據(jù);
Observable.just(1, 2.1, "3", 4L, 5.0)
    .ofType(Int::class.java)
    .subscribe(observer)
distinct & distinctUntilChanged
  • 去重,過(guò)濾掉重復(fù)數(shù)據(jù)項(xiàng);
Observable.just(1, 2, 3, 4, 3, 2, 1)
    .distinct()
//  .distinctUntilChanged()//去掉相鄰連續(xù)重復(fù)數(shù)據(jù)
    .subscribe(observer)
skip & skipLast
val observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
// 跳過(guò), 跳過(guò)前面的若干項(xiàng)數(shù)據(jù);
observable.skip(4).subscribe(observer)
//skipLast:跳過(guò)后面的若干項(xiàng)數(shù)據(jù)
observable.skipLast(4).subscribe(observer)
take & takeLast
val observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
//take:只保留前面的若干項(xiàng)數(shù)據(jù)
observable.take(4).subscribe(observer)
//takeLast:只保留后面的若干項(xiàng)數(shù)據(jù)
observable.takeLast(4).subscribe(observer)
debounce
  • 只有在空閑了一段時(shí)間后才發(fā)射數(shù)據(jù),通俗的說(shuō),就是如果一段時(shí)間沒(méi)有操作,就執(zhí)行一次操作;
val observable2 = Observable.create<Int> {
    it.onNext(1)
    Thread.sleep(400)
    it.onNext(2)
    Thread.sleep(1200)
    it.onNext(3)
    Thread.sleep(1000)
    it.onNext(4)
    Thread.sleep(800)
    it.onNext(5)
    Thread.sleep(2000)
    it.onNext(6)
}
observable2
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .debounce(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)
sample
  • 取樣,定期發(fā)射最新的數(shù)據(jù),等于是數(shù)據(jù)抽樣;
//sample:與debounce的區(qū)別是,sample是以時(shí)間為周期的發(fā)射,一秒又一秒內(nèi)的最新數(shù)據(jù)。而debounce是最后一個(gè)有效數(shù)據(jù)開始
observable2
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .sample(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)
throttleFirst & throttleLast & throttleWithTimeout & throttleLatest
  • throttleFirst是指定周期內(nèi)第一個(gè)數(shù)據(jù),throttleLast與sample一致。throttleWithTimeout與debounce一致;
observable2
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .throttleFirst(1, TimeUnit.SECONDS)
//  .throttleLast(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)
timeout
  • 后一個(gè)數(shù)據(jù)發(fā)射未在前一個(gè)元素發(fā)射后規(guī)定時(shí)間內(nèi)發(fā)射則返回超時(shí)異常;
observable2
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .timeout(1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer)
elementAt
  • 取值,取特定位置的數(shù)據(jù)項(xiàng);
Observable.just(4, 3, 2, 1)
    .elementAt(1)
//出現(xiàn)越界時(shí),拋出異常
//  .elementAtOrError(1)
    .subscribe(maybeObserver)
first
  • 首項(xiàng),只發(fā)射滿足條件的第一條數(shù)據(jù);
Observable.just(1, 2, 3, 4, 5)
//  .first(-1)
//  .firstOrError()
    .firstElement()
    .subscribe(maybeObserver)
last
  • 末項(xiàng),只發(fā)射最后一條數(shù)據(jù);
 Observable.just(1, 2, 3, 4, 5)
//  .last(-1)
//  .lastOrError()
    .lastElement()
    .subscribe(maybeObserver)
IgnoreElements
  • 忽略所有的數(shù)據(jù),只保留終止通知(onError或onCompleted),ignoreElements 作用于Flowable、Observable。ignoreElement作用于Maybe、Single;
Observable.just(1, 2, 3, 4, 5)
    .ignoreElements()
    .subscribe(object : CompletableObserver {
        override fun onSubscribe(d: Disposable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
        }

        override fun onComplete() {
            LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
        }

        override fun onError(e: Throwable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
        }
    })

4. 組合操作:

concat & concatArray
  • 不交錯(cuò)的連接多個(gè)Observable的數(shù)據(jù);
  • 組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù),合并后 按發(fā)送順序串行執(zhí)行;
val just1 = Observable.just(1, 2, 3)
val just2 = Observable.just("A", "B", "C")
val just3 = Observable.just(4, 5, 6)
Observable.concat(just1, just2).subscribe(observer3)

//concat組合被觀察者數(shù)量<=4個(gè),而concatArray則可>4個(gè)
Observable.concatArray(
    Observable.just(1, 2, 3),
    Observable.just(4, 5, 6),
    Observable.just(7, 8, 9)
).subscribe(observer3)
merge & mergeArray
  • 組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù),合并后 按時(shí)間線并行執(zhí)行;
  • merge和concat的區(qū)別: merge合并后發(fā)射的數(shù)據(jù)項(xiàng)是并行無(wú)序的,concat合并后發(fā)射的數(shù)據(jù)項(xiàng)是串行有序的;
Observable.merge(just1, just2).subscribe(observer3)
//mergeWith
just1.mergeWith(just3).subscribe(observer3)
zip
  • 打包,使用一個(gè)指定的函數(shù)將多個(gè)Observable發(fā)射的數(shù)據(jù)組合在一起,然后將這個(gè)函數(shù)的結(jié)果作為單項(xiàng)數(shù)據(jù)發(fā)射;
//zip操作符是將兩個(gè)數(shù)據(jù)流進(jìn)行指定的函數(shù)規(guī)則合并
Observable.zip(just1, just2, { t1, t2 -> "${t1}_${t2}" })
    .subscribe(observer3)
//zipWith
just1.zipWith(just2, { t1, t2 -> "${t1}_${t2}" })
    .subscribe(observer3)
startWith & startWithArray
  • 在發(fā)射原來(lái)的Observable的數(shù)據(jù)序列之前,先發(fā)射一個(gè)指定的數(shù)據(jù)序列或數(shù)據(jù)項(xiàng);
Observable.just(1,2,3)
    .startWith(Observable.just(4,5,6))
    .startWithArray(7,8,9)
    .subscribe(observer3)
join
  • 無(wú)論何時(shí),如果一個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)項(xiàng),只要在另一個(gè)Observable發(fā)射的數(shù)據(jù)項(xiàng)定義的時(shí)間窗口內(nèi),就將兩個(gè)Observable發(fā)射的數(shù)據(jù)合并發(fā)射;
just1.join(just2,
    //規(guī)定just2的過(guò)期期限
    { Observable.timer(3, TimeUnit.SECONDS) },
    //規(guī)定just1的過(guò)期期限
    { Observable.timer(8, TimeUnit.SECONDS) },
    //規(guī)定just1和just2的合并規(guī)則
    { t1, t2 -> "${t1}_${t2}" })
    .subscribe(observer3)
combineLatest
  • 當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了一個(gè)數(shù)據(jù)時(shí),通過(guò)一個(gè)指定的函數(shù)組合每個(gè)Observable發(fā)射的最新數(shù)據(jù)(一共兩個(gè)數(shù)據(jù)),然后發(fā)射這個(gè)函數(shù)的結(jié)果;
Observable.combineLatest(just1, just2,
    { t1, t2 -> "${t1}_${t2}" }).subscribe(observer3)
switch
  • 將一個(gè)發(fā)射Observable序列的Observable轉(zhuǎn)換為這樣一個(gè)Observable:它逐個(gè)發(fā)射那些Observable最近發(fā)射的數(shù)據(jù);
Observable.switchOnNext(ObservableSource<Observable<Int>> { }, 12)
collect
  • 將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個(gè)數(shù)據(jù)結(jié)構(gòu)里;
Observable.just(1, 2, 3, 4, 5, 6)
    .collect(
        { ArrayList() },
        BiConsumer<ArrayList<Int?>, Int> { t1, t2 -> t1.add(t2) }
    ).subscribe(Consumer { LjyLogUtil.d("num: $it") })
count
  • 計(jì)算Observable發(fā)射的數(shù)據(jù)個(gè)數(shù),然后發(fā)射這個(gè)結(jié)果;
Observable.just(1, 2, 3)
    .count()
    .subscribe(object : SingleObserver<Long>{
        override fun onSubscribe(d: Disposable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
        }

        override fun onSuccess(t: Long) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
        }

        override fun onError(e: Throwable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
        }

    })

5. 錯(cuò)誤處理:

cast
  • 將數(shù)據(jù)元素轉(zhuǎn)型成其他類型,轉(zhuǎn)型失敗會(huì)拋出異常;
Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5)
    .cast(Int::class.java)
    .subscribe(observer)
onErrorReturn
  • 調(diào)用數(shù)據(jù)源的onError函數(shù)后會(huì)回到該函數(shù),可對(duì)錯(cuò)誤進(jìn)行處理,然后返回值,會(huì)調(diào)用觀察者onNext()繼續(xù)執(zhí)行,執(zhí)行完調(diào)用onComplete()函數(shù)結(jié)束所有事件的發(fā)射;
Observable.just(1, 2, 3.2, 4)
    .map { it.toInt() }
    .onErrorReturn {
        if (it is NumberFormatException) {
            0
        } else {
            throw IllegalArgumentException()
        }
    }.subscribe(observer)
onErrorReturnItem
  • 與onErrorReturn類似,onErrorReturnItem不對(duì)錯(cuò)誤進(jìn)行處理,直接返回一個(gè)值;
Observable.just(1, 2, 3.2, 4)
    .map { it.toInt() }
    .onErrorReturnItem(0)
    .subscribe(observer)
onErrorResumeNext & onExceptionResumeNext
  • 遇到錯(cuò)誤時(shí),發(fā)送1個(gè)新的Observable;
Observable.just(1, 2, 3.2, 4)
    .map { it.toInt() }
    .onErrorResumeNext { Observable.just(5,6,7) }
    .subscribe(observer)
retry
  • 重試,如果Observable發(fā)射了一個(gè)錯(cuò)誤通知,重新訂閱它,期待它正常終止;
//retry:當(dāng)發(fā)生錯(cuò)誤時(shí),數(shù)據(jù)源重復(fù)發(fā)射item,直到?jīng)]有異常或者達(dá)到所指定的次數(shù)
Observable.just(1, 2, 3, 4)
    .retry(3)
    .subscribe(observer)
retryUntil
  • 發(fā)生異常時(shí),返回值是false表示繼續(xù)執(zhí)行(重復(fù)發(fā)射數(shù)據(jù)),true不再執(zhí)行,但會(huì)調(diào)用onError方法;
var temp = 0
Observable.just(1, 2, 3, 4)
    .map {
        temp = it
        it
    }
    .retryUntil {
        temp > 3
    }
    .subscribe(observer)
retryWhen
  • 遇到錯(cuò)誤時(shí),將發(fā)生的錯(cuò)誤傳遞給一個(gè)新的被觀察者(Observable),并決定是否需要重新訂閱原始被觀察者(Observable)& 發(fā)送事件;
Observable.just(1, 2, 3.2, 4)
    .map { it.toInt() }
    .retryWhen {
        it.flatMap { throwable ->
            if (throwable is NumberFormatException) {
                Observable.error(Throwable("retryWhen終止"))
            } else {
                Observable.just(5, 6, 7)
            }
        }
    }
    .subscribe(observer)

6. 輔助操作:

delay
  • 延遲一段時(shí)間發(fā)射結(jié)果數(shù)據(jù);
Observable.just(1, 2, 3)
    .delay(1,TimeUnit.SECONDS)
    .subscribe(observer)
do
  • 在某個(gè)事件的生命周期中調(diào)用;
  • doOnEach:數(shù)據(jù)源(Observable)每發(fā)送一次數(shù)據(jù),就調(diào)用一次;
  • doOnNext:數(shù)據(jù)源每次調(diào)用onNext() 之前都會(huì)先回調(diào)該方法;
  • doOnError:數(shù)據(jù)源每次調(diào)用onError() 之前會(huì)回調(diào)該方法;
  • doOnComplete:數(shù)據(jù)源每次調(diào)用onComplete() 之前會(huì)回調(diào)該方法;
  • doOnSubscribe:數(shù)據(jù)源每次調(diào)用onSubscribe() 之后會(huì)回調(diào)該方法;
  • doOnDispose:數(shù)據(jù)源每次調(diào)用dispose() 之后會(huì)回調(diào)該方法;
Observable.just(1, 2, 3, 4)
    .observeOn(Schedulers.io())
    .subscribeOn(AndroidSchedulers.mainThread())
    .doOnSubscribe { LjyLogUtil.d("${Thread.currentThread().name}_doOnSubscribe") }
    .doOnEach { LjyLogUtil.d("${Thread.currentThread().name}_doOnEach:$it") }
    .doOnNext { LjyLogUtil.d("${Thread.currentThread().name}_doOnNext:$it") }
    .doOnError { LjyLogUtil.d("${Thread.currentThread().name}_doOnError:${it.localizedMessage}") }
    .doOnComplete { LjyLogUtil.d("${Thread.currentThread().name}_doOnComplete") }
    .doOnDispose { LjyLogUtil.d("${Thread.currentThread().name}_doOnDispose") }
    .subscribe(observer)

7. 條件和布爾操作:

all
  • 判斷Observable發(fā)射的所有的數(shù)據(jù)項(xiàng)是否都滿足某個(gè)條件;
Observable.just(1, -2, 3)
    .all {
        it > 0
    }.subscribe { it ->
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
    }
takeWhile & skipWhile & takeUntil & skipUntil
// 從0開始每1s發(fā)送1個(gè)數(shù)據(jù)
Observable.interval(1, TimeUnit.SECONDS)
    //條件滿足時(shí),取數(shù)據(jù)
    .takeWhile {
        it<10
    }
Observable.interval(1, TimeUnit.SECONDS)
    //滿足條件時(shí),跳過(guò)數(shù)據(jù)
    .skipWhile {
        it<10
    }
Observable.interval(1, TimeUnit.SECONDS)
   //取數(shù)據(jù),直到滿足條件
    .takeUntil {
        it>10
    }
Observable.interval(1, TimeUnit.SECONDS)
    //等待直到傳入的Observable開始發(fā)送數(shù)據(jù)
    .skipUntil (Observable.timer(5, TimeUnit.SECONDS))
SequenceEqual:
  • 判斷兩個(gè)Observable是否按相同的數(shù)據(jù)序列;
Observable.sequenceEqual(
    Observable.just(4, 5, 6),
    Observable.just(4, 5, 6)
).subscribe { it ->
    LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
contains
  • 判斷Observable是否會(huì)發(fā)射一個(gè)指定的數(shù)據(jù)項(xiàng);
Observable.just(1, -2, 3)
    .contains(2)
    .subscribe { it ->
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
    }
isEmpty
  • 判斷發(fā)送的數(shù)據(jù)是否為空;
Observable.just(1, -2, 3)
    .isEmpty()
    .subscribe { it ->
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
    }
amb

-給定多個(gè)Observable,只讓第一個(gè)發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù);

val list: MutableList<ObservableSource<Int>> = ArrayList()
list.add(Observable.just(1, 2, 3).delay(1, TimeUnit.SECONDS))
list.add(Observable.just(4, 5, 6))
Observable.amb(list).subscribe{
    LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
defaultIfEmpty
  • 發(fā)射來(lái)自原始Observable的數(shù)據(jù),如果原始Observable沒(méi)有發(fā)射數(shù)據(jù),就發(fā)射一個(gè)默認(rèn)數(shù)據(jù);
//在不發(fā)送onNext事件, 僅發(fā)送onComplete事件
Observable.empty<Int>()
    .defaultIfEmpty(-1)
    .subscribe { it ->
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
    }
  • SkipUntil:丟棄原始Observable發(fā)射的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù),然后發(fā)射原始Observable的剩余數(shù)據(jù);

實(shí)戰(zhàn)

結(jié)合 RxBinding 使用

  • RxBinding是對(duì) Android View 事件的擴(kuò)展, 它使得開發(fā)者可以對(duì) View 事件使用 RxJava 的各種操作;
1. 添加依賴
//RxBinding
implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'
//Google 'material' library bindings:
implementation 'com.jakewharton.rxbinding4:rxbinding-material:4.0.0'
//AndroidX library bindings:
implementation 'com.jakewharton.rxbinding4:rxbinding-core:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-appcompat:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-drawerlayout:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-leanback:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-recyclerview:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-slidingpanelayout:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-swiperefreshlayout:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-viewpager:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-viewpager2:4.0.0'
2. 按鈕防抖
val btn1 = findViewById<Button>(R.id.btn_1)
btn1.clicks()
    .throttleFirst(2, TimeUnit.SECONDS)
    .subscribeOn(AndroidSchedulers.mainThread())
    .subscribe {
        LjyLogUtil.d("點(diǎn)擊按鈕")
    }
3. editText輸入監(jiān)聽(tīng)
//也可用作聯(lián)想搜索優(yōu)化
val et1 = findViewById<EditText>(R.id.et_1)
et1.textChanges()
    .debounce(1, TimeUnit.SECONDS)
    //跳過(guò)第1次請(qǐng)求 因?yàn)槌跏驾斎肟虻目兆址麪顟B(tài)
    .skip(1)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe{
        LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
    }
4. 聯(lián)合/表單判斷
val etName = findViewById<EditText>(R.id.et_name)
val etPwd = findViewById<EditText>(R.id.et_pwd)
val obName = etName.textChanges()
val obPwd = etPwd.textChanges()
Observable.combineLatest(
    obName, obPwd, { name, pwd -> name == "ljy" && pwd == "123" })
    //跳過(guò)第1次請(qǐng)求 因?yàn)槌跏驾斎肟虻目兆址麪顟B(tài)
    .skip(1)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { isLogin -> LjyLogUtil.d(if (isLogin) "登錄成功" else "登錄失敗") }
5. 定時(shí)器任務(wù)
val time = 10L
val btnLogin = findViewById<Button>(R.id.btn_login)
btnLogin.clicks()
    .throttleFirst(time, TimeUnit.SECONDS)
    .subscribeOn(AndroidSchedulers.mainThread())
    .doOnNext { btnLogin.isEnabled = false }
    .subscribe {
        LjyLogUtil.d("點(diǎn)擊登錄")
        Observable.intervalRange(
            0, time, 0, 1,
            TimeUnit.SECONDS, AndroidSchedulers.mainThread()
        )
            .subscribe(
                { btnLogin.text = "剩余${time - it}秒" },
                { LjyLogUtil.e(it.message) },
                {
                    btnLogin.text = "獲取驗(yàn)證碼"
                    btnLogin.isEnabled = true
                })
    }

利用RxLifecycle解決內(nèi)存泄漏問(wèn)題

1. 添加依賴
//RxLifecycle
    implementation 'com.trello.rxlifecycle4:rxlifecycle:4.0.2'
// If you want to bind to Android-specific lifecycles
    implementation 'com.trello.rxlifecycle4:rxlifecycle-android:4.0.2'
// If you want pre-written Activities and Fragments you can subclass as providers
    implementation 'com.trello.rxlifecycle4:rxlifecycle-components:4.0.2'
// If you want pre-written support preference Fragments you can subclass as providers
    implementation 'com.trello.rxlifecycle4:rxlifecycle-components-preference:4.0.2'
// If you want to use Android Lifecycle for providers
    implementation 'com.trello.rxlifecycle4:rxlifecycle-android-lifecycle:4.0.2'
// If you want to use Kotlin syntax
    implementation 'com.trello.rxlifecycle4:rxlifecycle-kotlin:4.0.2'
// If you want to use Kotlin syntax with Android Lifecycle
    implementation 'com.trello.rxlifecycle4:rxlifecycle-android-lifecycle-kotlin:4.0.2'
2. 綁定
Observable.intervalRange(0, 60, 0, 1, TimeUnit.SECONDS)
    //利用RxLifecycle來(lái)解決內(nèi)存泄漏問(wèn)題
    //bindToLifecycle的自動(dòng)取消訂閱示例
    //.compose(bindToLifecycle())
    //手動(dòng)設(shè)置在activity onPause的時(shí)候取消訂閱
    .compose(this.bindUntilEvent(ActivityEvent.PAUSE))
    .subscribe(
        { LjyLogUtil.d("剩余${60 - it}秒") },
        { LjyLogUtil.e(it.message) },
        { LjyLogUtil.d("完成") }
    )

網(wǎng)絡(luò)請(qǐng)求

1. 網(wǎng)絡(luò)請(qǐng)求嵌套回調(diào)
//使用observeOn多次切換線程
apiService.searchRepo(emptyMap()) // 請(qǐng)求搜索列表
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnNext {
        //展示列表
    }
    .observeOn(Schedulers.io())
    .flatMap {
        Observable.fromIterable(it.items).map { repo -> repo.id }
    }
    .flatMap {
        apiService.getItem(it) // 請(qǐng)求詳情
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        //展示詳情
    }
2. 網(wǎng)絡(luò)請(qǐng)求輪詢
//每60秒一次,無(wú)限輪詢
Observable.interval(60, TimeUnit.SECONDS)
    .doOnNext {
        apiService.searchRepo(emptyMap())
            .subscribe(Consumer { LjyLogUtil.d("accept: ${it.items}") })
    }.subscribe { LjyLogUtil.d("第 $it 次輪詢") }
//有條件輪詢
var count = 0
apiService.searchRepo(emptyMap())
    .repeatWhen {
        it.flatMap {
            if (count > 3) {
                Observable.error(Throwable("輪詢結(jié)束"))
            } else {
                LjyLogUtil.d("第 $count 次輪詢")
                Observable.just(1).delay(60, TimeUnit.SECONDS)
            }
        }
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
        LjyLogUtil.d("accept: ${it.items}")
        count++
    }, {
        LjyLogUtil.d(it.message)
    })
3. 網(wǎng)絡(luò)請(qǐng)求出錯(cuò)重連
//最大重試次數(shù)
val maxConnectCount = 10
//已重試次數(shù)
var currentRetryCount = 0
apiService.searchRepo(emptyMap())
    .retryWhen {
        it.flatMap { throwable ->
            //根據(jù)異常類型選擇是否重試
            if (throwable is IOException) {
                if (currentRetryCount < maxConnectCount) {
                    currentRetryCount++
                    //遇到的異常越多,重試延遲間隔時(shí)間越長(zhǎng)
                    Observable.just(1).delay(currentRetryCount * 60L, TimeUnit.SECONDS)
                } else {
                    Observable.error(Throwable("重試結(jié)束"))
                }
            } else {
                Observable.error(Throwable("發(fā)生了非網(wǎng)絡(luò)異常(非I/O異常)"))
            }
        }
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
        LjyLogUtil.d("accept: ${it.items}")
    }, {
        LjyLogUtil.d(it.message)
    })
4. 內(nèi)存,磁盤,網(wǎng)絡(luò) 三級(jí)緩存
var memoryCache: String? = null
var diskCache: String? = null
private fun test10() {
    val memory = Observable.create<String> {
        if (memoryCache != null) {
            it.onNext(memoryCache)
        } else {
            it.onComplete()
        }
    }

    val disk = Observable.create<String> {
        if (diskCache != null) {
            it.onNext(diskCache)
        } else {
            it.onComplete()
        }
    }.doOnNext {
        memoryCache = "內(nèi)存數(shù)據(jù)"
    }

    val net = Observable.just("網(wǎng)絡(luò)數(shù)據(jù)")
        .doOnNext {
            // memoryCache="內(nèi)存數(shù)據(jù)"
            diskCache = "磁盤數(shù)據(jù)"
        }
    //通過(guò)concat()合并memory、disk、network 3個(gè)被觀察者的事件(即檢查內(nèi)存緩存、磁盤緩存 & 發(fā)送網(wǎng)絡(luò)請(qǐng)求)
    Observable.concat(memory, disk, net)
        //通過(guò)firstElement(),從串聯(lián)隊(duì)列中取出并發(fā)送第1個(gè)有效事件(Next事件),即依次判斷檢查memory、disk、network
        .firstElement()
        .subscribe {
            LjyLogUtil.d("accept: $it")
        }
}
5. 合并數(shù)據(jù)源 & 同時(shí)展示
var result = "數(shù)據(jù)來(lái)自:"
val net = Observable.just("網(wǎng)絡(luò)")
val disk = Observable.just("磁盤")
//使用merge,從網(wǎng)絡(luò)和本地獲取數(shù)據(jù)并展示
Observable.merge(net, disk)
    .subscribe({
        result += "$it, "
    }, {

    }, {
        LjyLogUtil.d("result: $result")
    })
//使用zip,合并2個(gè)網(wǎng)絡(luò)請(qǐng)求向獲取數(shù)據(jù)并展示
val repo1 = apiService.getItem(1001).subscribeOn(Schedulers.io())
val repo2 = apiService.getItem(1002).subscribeOn(Schedulers.io())
Observable.zip(
    repo1, repo2, { data1, data2 ->
        val repoList = ArrayList<RepoDetail>()
        repoList.add(data1)
        repoList.add(data2)
        repoList
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        for (repoDetail in it) {
            LjyLogUtil.d("result: ${repoDetail.name}")
        }
    }

源碼解析

入口

  • 以下面代碼為源碼閱讀入口,Single是最簡(jiǎn)單的被觀察者;
Single.just(1)
    .subscribe(object : SingleObserver<String> {
        override fun onSubscribe(d: Disposable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
        }

        override fun onSuccess(t: String) {
            LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
        }

        override fun onError(e: Throwable) {
            LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
        }
    })

被觀察者的創(chuàng)建

  • 先看上面代碼中被觀察者的創(chuàng)建方法:Single.just();
public static <@NonNull T> Single<T> just(T item) {
    Objects.requireNonNull(item, "item is null");//判空
    return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}
  • 其中第一行為判空,第二行RxJavaPlugins.onAssembly為一個(gè)鉤子方法,方便添加一些額外操作,代碼如下:
@Nullable
static volatile Function<? super Single, ? extends Single> onSingleAssembly;

public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
    Function<? super Single, ? extends Single> f = onSingleAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

  • 其中onSingleAssembly默認(rèn)為空,所以實(shí)際默認(rèn)是直接返回source,也就是SingleJust;
  • 所以Single.just就是直接創(chuàng)建了一個(gè)被觀察者的實(shí)現(xiàn)類SingleJust的實(shí)例并返回,那么我們看一下SingleJust的代碼;
public final class SingleJust<T> extends Single<T> {
    final T value;

    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposable.disposed());
        observer.onSuccess(value);
    }
}
  • 很簡(jiǎn)單吧,繼承了Single,并重寫了Single唯一的抽象方法subscribeActual,有個(gè)泛型變量value;
  • Single的代碼很多,基本都是上面介紹的操作符方法的實(shí)現(xiàn),用到時(shí)可以點(diǎn)進(jìn)源碼看一看;
public abstract class Single<@NonNull T> implements SingleSource<T> {
     ...
     protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);
     ...
      public final void subscribe(@NonNull SingleObserver<? super T> observer) {
         ...
      }
}
  • Single實(shí)現(xiàn)了SingleSource接口,并實(shí)現(xiàn)subscribe方法;
public interface SingleSource<@NonNull T> {

    /**
     * Subscribes the given {@link SingleObserver} to this {@link SingleSource} instance.
     * @param observer the {@code SingleObserver}, not {@code null}
     * @throws NullPointerException if {@code observer} is {@code null}
     */
    void subscribe(@NonNull SingleObserver<? super T> observer);
}

訂閱

  • 下面來(lái)看看開頭例子中的第二行的subscribe方法:Single.subscribe();
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
    //判空
    Objects.requireNonNull(observer, "observer is null");
    //鉤子方法,默認(rèn)還是入?yún)⒌腟ingleObserver
    observer = RxJavaPlugins.onSubscribe(this, observer);
    //判空
    Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

    try {
        subscribeActual(observer);
    } catch (NullPointerException ex) {
        throw ex;
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        NullPointerException npe = new NullPointerException("subscribeActual failed");
        npe.initCause(ex);
        throw npe;
    }
}
  • 可以看到訂閱方法subscribe中最有用的一行就是調(diào)用了subscribeActual,而這里的subscribeActual的實(shí)現(xiàn)正式上面的SingleJust中的實(shí)現(xiàn);
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
    observer.onSubscribe(Disposable.disposed());
    observer.onSuccess(value);
}
  • 可以看到SingleJust中的subscribeActual直接連著調(diào)用了observer的onSubscribe和onSuccess方法,并且onSubscribe的入?yún)橐粋€(gè)disposed的Disposable,是個(gè)已取消的Disposable,因?yàn)閖ust是沒(méi)有延遲的,無(wú)需取消和修改,而且onError是不會(huì)被調(diào)用的,因?yàn)樗舶l(fā)一個(gè)也不會(huì)有出錯(cuò)的可能;

操作符實(shí)現(xiàn)

map

  • 以map為例,對(duì)之前的示例擴(kuò)展一下,增加個(gè)map操作符對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換,map搞懂了,其他操作符的主要思路也是一樣的;
Single.just(1)
    .map { "num_$it" }
    .subscribe { num ->
        LjyLogUtil.d(num)
    }
  • 那么看一下map的實(shí)現(xiàn):
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
  • map中也是有判空和鉤子,創(chuàng)建了一個(gè)SingleMap并返回,其入?yún)⒌谝淮螀?shù)為this,也就是just返回的SingleJust實(shí)例,第二個(gè)參數(shù)為數(shù)據(jù)轉(zhuǎn)換的轉(zhuǎn)換器;
  • 那么來(lái)看一下SingleMap:
public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;

    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }
    ...//省略內(nèi)部類MapSingleObserver
}
  • 可以看到SingleMap的subscribeActual中只有一行代碼,就是調(diào)用source的訂閱方法subscribe,并傳入一個(gè)觀察者M(jìn)apSingleObserver實(shí)例和SingleMap持有的數(shù)據(jù)轉(zhuǎn)換器,其中source也就是前一步的被觀察者SingleJust實(shí)例;
  • 而觀察者M(jìn)apSingleObserver則是負(fù)責(zé)對(duì)上下游數(shù)據(jù)進(jìn)行轉(zhuǎn)換 和傳遞,其入?yún)⑾掠蝧ubscribe傳入的觀察者SingleObserver實(shí)例,來(lái)看一下它的實(shí)現(xiàn):
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
    final SingleObserver<? super R> t;
    final Function<? super T, ? extends R> mapper;

    MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
        this.t = t;
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Disposable d) {
        t.onSubscribe(d);
    }

    @Override
    public void onSuccess(T value) {
        R v;
        try {
            //mapper.apply數(shù)據(jù)轉(zhuǎn)換器對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換
            v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            onError(e);
            return;
        }

        t.onSuccess(v);
    }

    @Override
    public void onError(Throwable e) {
        t.onError(e);
    }
}
  • 可以看到MapSingleObserver的onSuccess中調(diào)用數(shù)據(jù)轉(zhuǎn)換器mapper的apply方法對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換,
    并調(diào)用構(gòu)造函數(shù)傳入的SingleObserver的onSuccess將轉(zhuǎn)換后的數(shù)據(jù)傳遞過(guò)去,而onSubscribe,onError方法則是直接調(diào)用了SingleObserver的方法
小結(jié)
  • 那么示例代碼中map前后整個(gè)流程串起來(lái)就是:
  1. Single.just創(chuàng)建了一個(gè)被觀察者SingleJust,負(fù)責(zé)發(fā)送原始數(shù)據(jù);
  2. map創(chuàng)建了一個(gè)被觀察者SingleMap,構(gòu)造函數(shù)入?yún)⑹巧弦徊降腟ingleJust和數(shù)據(jù)轉(zhuǎn)換器,也就是對(duì)SingleJust進(jìn)行了包裝,或者說(shuō)接管,如何接管的呢,SingleMap中調(diào)用了SingleJust的訂閱方法subscribe,并傳入觀察者M(jìn)apSingleObserver,也就是對(duì)SingleJust說(shuō),小j啊,你這項(xiàng)目給我(SingleMap)了,后面的客戶你不用跟了,把你手里的數(shù)據(jù)給我小弟MapSingleObserver就行了,以后需要啥他找你要;
  3. 后面再調(diào)用subscribe呢,實(shí)際是調(diào)用SingleMap的subscribe,也就是后面再有客戶其實(shí)都是跟SingleMap簽的單子了
  4. SingleMap收到單子呢轉(zhuǎn)手就給小弟MapSingleObserver了,反正SingleJust的數(shù)據(jù)也在你手里呢你看看咋給客戶服務(wù)吧
  5. MapSingleObserver就拿著SingleJust的數(shù)據(jù)進(jìn)行轉(zhuǎn)換,然后調(diào)用客戶(下游subscribe傳入的觀察者)的回調(diào)方法進(jìn)行服務(wù)
  • ps:
    • MapSingleObserver:同樣是觀察者,我也太累了,又負(fù)責(zé)接收上游的數(shù)據(jù),又要進(jìn)行處理,處理完還得反饋給下游的客戶;
    • SingleJust:你累?同樣是被觀察者,我辛辛苦苦做的項(xiàng)目,還不是被SingleMap拿走了,還讓你監(jiān)視(觀察)我,這點(diǎn)業(yè)績(jī)都被內(nèi)部消化了啊,而且項(xiàng)目開始了還把我工號(hào)留給客戶( t.onSubscribe(d)),那客戶是說(shuō)停就停啊,我這出錯(cuò)了你也是轉(zhuǎn)頭就給客戶說(shuō)?。?t.onError(e));
    • 這哪是RxJava,這分明是職場(chǎng)啊;

subscribeOn & observeOn

  • 在對(duì)之前的示例擴(kuò)展一下,加入線程切換,那就是用到了subscribeOn & observeOn 操作符了;
Single.just(1)
    .map { "num_$it" }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { num ->
        LjyLogUtil.d(num)
    }
subscribeOn
  • 先來(lái)看看subscribeOn的入?yún)chedulers.io():
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

    IO = RxJavaPlugins.initIoScheduler(new IOTask());

    TRAMPOLINE = TrampolineScheduler.instance();

    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}

static final class IOTask implements Supplier<Scheduler> {
    @Override
    public Scheduler get() {
        return IoHolder.DEFAULT;
    }
}

static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

public final class IoScheduler extends Scheduler {
    ...
}
  • 可以看到最后返回的是Scheduler的實(shí)現(xiàn)類IoScheduler實(shí)例;
  • 再來(lái)看看subscribeOn方法的實(shí)現(xiàn):
public final Single<T> subscribeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler));
}
  • 還是判空和鉤子,然后創(chuàng)建一個(gè)SingleSubscribeOn并返回,入?yún)⑹巧嫌蔚谋挥^察者(this),和線程調(diào)度器Scheduler;
public final class SingleSubscribeOn<T> extends Single<T> {
    final SingleSource<? extends T> source;

    final Scheduler scheduler;

    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        //創(chuàng)建一個(gè)觀察者SubscribeOnObserver的實(shí)例,傳入上游的被觀察者source
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
        //調(diào)用下游的觀察者observer的onSubscribe,通知其開始訂閱了,取消的話可以找SubscribeOnObserver
        observer.onSubscribe(parent);
        //調(diào)用IoScheduler的scheduleDirect方法切換線程
        Disposable f = scheduler.scheduleDirect(parent);

        parent.task.replace(f);

    }
    ...//省略內(nèi)部類SubscribeOnObserver
}
  • 上面subscribeActual中創(chuàng)建了一個(gè)被觀察者SubscribeOnObserver,并且調(diào)用scheduler.scheduleDirect切換線程;
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

//其中DisposeTask為Scheduler的內(nèi)部類
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
    ...
    DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
        this.decoratedRun = decoratedRun;
        this.w = w;
    }

    @Override
    public void run() {
        runner = Thread.currentThread();
        try {
            try {
                decoratedRun.run();
            } catch (Throwable ex) {
                // Exceptions.throwIfFatal(e); nowhere to go
                RxJavaPlugins.onError(ex);
                throw ex;
            }
        } finally {
            dispose();
            runner = null;
        }
    }
    ...
}
  • 其中scheduleDirect的入?yún)⑹莻€(gè)Runnable,而SubscribeOnObserver實(shí)現(xiàn)了Runnable接口和SingleObserver接口,構(gòu)造方法入?yún)⑹窍掠蔚挠^察者和上游的被觀察者,在自己的onSuccess,onError中直接傳給下游的觀察者,在run方法中訂閱上游的被觀察者;
static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
    ...
    SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
        this.downstream = actual;
        this.source = source;
        this.task = new SequentialDisposable();
    }
    ...
    @Override
    public void onSuccess(T value) {
        downstream.onSuccess(value);
    }

    @Override
    public void onError(Throwable e) {
        downstream.onError(e);
    }
    ...
    @Override
    public void run() {
        source.subscribe(this);
    }
}
observeOn
  • 先來(lái)看看Android專用的線程調(diào)度器AndroidSchedulers.mainThread():
//通過(guò)靜態(tài)內(nèi)部類提供HandlerScheduler的單例
public final class AndroidSchedulers {
    //開放的入口方法
    public static Scheduler mainThread() {
        //鉤子
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    //私有的靜態(tài)變量
    private static final Scheduler MAIN_THREAD =
          RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
    //私有的靜態(tài)內(nèi)部類
    private static final class MainHolder {
        //本質(zhì)還說(shuō)離不開Handler,通過(guò)Looper.getMainLooper()切換到UI線程
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
    }
    //私有的構(gòu)造方法,并拋出異常,因?yàn)檫@里是創(chuàng)建HandlerScheduler的單例,而不是AndroidSchedulers本身
    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    private final boolean async;

    HandlerScheduler(Handler handler, boolean async) {
        this.handler = handler;
        this.async = async;
    }

    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        ...//省略判空和鉤子方法
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        if (async) {
            message.setAsynchronous(true);
        }
        //通過(guò)主線程的handler發(fā)送消息
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }

    private static final class HandlerWorker extends Worker {
        private final Handler handler;
        private final boolean async;

        private volatile boolean disposed;

        HandlerWorker(Handler handler, boolean async) {
            this.handler = handler;
            this.async = async;
        }

        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            ...
            if (disposed) {
                return Disposable.disposed();
            }
            ...
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this;
            ...
            handler.sendMessageDelayed(message, unit.toMillis(delay));

            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposable.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed;

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void dispose() {
            handler.removeCallbacks(this);
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}
  • 然后來(lái)看observeOn方法:
public final Single<T> observeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler));
}
  • 本質(zhì)還是創(chuàng)建了一個(gè)被觀察者:
public final class SingleObserveOn<T> extends Single<T> {

    final SingleSource<T> source;

    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        //調(diào)用上游被觀察者source的訂閱方法subscribe,傳入一個(gè)觀察者ObserveOnSingleObserver,其入?yún)⑹窍掠蔚挠^察者observer
        source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
    }
    ...//省略內(nèi)部類ObserveOnSingleObserver
}
  • ObserveOnSingleObserver代碼如下, 在自己的onSubscribe調(diào)用下游觀察者的onSubscribe,在自己的onSuccess和onError中調(diào)用scheduler.scheduleDirect切換線程,scheduleDirect的入?yún)⑹荝unnable,會(huì)調(diào)用自己的run方法,而在run方法中調(diào)用了下游觀察者的onSuccess和onError傳遞結(jié)果,以此實(shí)現(xiàn)改變下游觀察者的線程;
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
    ...
    ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
        this.downstream = actual;
        this.scheduler = scheduler;
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onSuccess(T value) {
        this.value = value;
        Disposable d = scheduler.scheduleDirect(this);
        DisposableHelper.replace(this, d);
    }

    @Override
    public void onError(Throwable e) {
        this.error = e;
        Disposable d = scheduler.scheduleDirect(this);
        DisposableHelper.replace(this, d);
    }

    @Override
    public void run() {
        Throwable ex = error;
        if (ex != null) {
            downstream.onError(ex);
        } else {
            downstream.onSuccess(value);
        }
    }
   ...
}

番外:Single.create

  • 看了上面的Single.just, 可能有種被忽悠的感覺(jué),也太low了吧,這么簡(jiǎn)單么,那么我們不妨再來(lái)看看Single.create:
public static <@NonNull T> Single<T> create(@NonNull SingleOnSubscribe<T> source) {
    Objects.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new SingleCreate<>(source));
}
  • 同樣的create方法中創(chuàng)建了一個(gè)被觀察者SingleCreate并返回:
public final class SingleCreate<T> extends Single<T> {

    final SingleOnSubscribe<T> source;

    public SingleCreate(SingleOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        Emitter<T> parent = new Emitter<>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ...//省略了內(nèi)部類Emitter
}
  • 看了上面代碼,create和just有什么區(qū)別呢,just{}中的代碼是直接賦值給value的,不需要下游subscribe就會(huì)執(zhí)行,而create是通過(guò)匿名內(nèi)部類傳入一個(gè)SingleOnSubscribe的實(shí)例,并在實(shí)現(xiàn)的subscribe方法中產(chǎn)生數(shù)據(jù),在SingleCreate.subscribeActual中調(diào)用SingleOnSubscribe.subscribe,SingleCreate.subscribeActual則是在下游subscribe時(shí)才會(huì)執(zhí)行,也就是Single.create在下游訂閱后才開始產(chǎn)生數(shù)據(jù);那么顯然,Observable.just()不適合封裝網(wǎng)絡(luò)數(shù)據(jù),因?yàn)槲覀兺ǔ2幌朐趕ubscribe之前做網(wǎng)絡(luò)請(qǐng)求。

參考

我是今陽(yáng),如果想要進(jìn)階和了解更多的干貨,歡迎關(guān)注微信公眾號(hào) “今陽(yáng)說(shuō)” 接收我的最新文章

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容