RxJava2學(xué)習(xí)總結(jié)——操作符和應(yīng)用場(chǎng)景

前言

2017年,剛畢業(yè)的我在項(xiàng)目組里打下手,組里其他人決定在新項(xiàng)目里使用RxJava,那時(shí)網(wǎng)絡(luò)庫(kù)用的是他們自己再次封裝了的noHttp,加上了RxJava后,以前簡(jiǎn)單的網(wǎng)絡(luò)請(qǐng)求多寫了很多代碼,這是我對(duì)RxJava的第一感受。
如今的項(xiàng)目基本上都是MVP+Retrofit2+Rxjava2的框架,但往往只是把Rxjava2和Retrofit2搭配進(jìn)行網(wǎng)絡(luò)請(qǐng)求而已,我也不例外。也曾看過(guò)好幾次RxJava2的入門教程、操作符總結(jié)等,過(guò)段時(shí)間就忘了,而且不知在什么樣的情景下適用恰當(dāng),所以寫下這篇文章,打算帶著問(wèn)題出發(fā),重新整理一下。
問(wèn)題:
1、RxJava是什么,它優(yōu)勢(shì)是什么?
2、RxJava基礎(chǔ)知識(shí)。
3、RxJava2操作符。
4、背壓策略。
5、具體的使用場(chǎng)景。

1、RxJava是什么,它優(yōu)勢(shì)是什么?

RxJava官方介紹——一個(gè)在 Java VM 上使用可觀測(cè)的序列來(lái)組成異步的、基于事件的程序的庫(kù)。

Rxjava本質(zhì)就是異步,它的優(yōu)勢(shì)是簡(jiǎn)潔。簡(jiǎn)潔是指相比于AsyncTask 、Handler、runOnUiThread等異步操作,RxJava的邏輯更簡(jiǎn)潔。一方面它是鏈?zhǔn)降?,邏輯代碼直觀,二是它提供了很多強(qiáng)大的操作符。

2、RxJava基礎(chǔ)知識(shí)

RxJava的基礎(chǔ)知識(shí),首先要知道它是用了可拓展的觀察者模式,然后再了解這些概念,Observable、Observer、subscribe、處理事件、Scheduler。

1)什么是觀察者模式?
觀察者(Observer)不時(shí)刻盯著被觀察者(Observable),而是通過(guò)注冊(cè)(Register)或者稱為訂閱(Subscribe)的方式,告訴被觀察者我需要你的某某狀態(tài),你要在它變化的時(shí)候通知我。
舉例1:
A(觀察者)和B(被觀察者),A不需要每過(guò) 1s 就檢查一次 B 的狀態(tài),而是B狀態(tài)發(fā)生改變時(shí),去通知A。
舉例2:
View的點(diǎn)擊事件,View(被觀察者)->OnClickLisetener(觀察者)->setOnClickListener(訂閱)->OnClick(事件)。
舉例3:
小偷在偷錢時(shí)告訴警察它要偷錢。小偷(被觀察者)、警察(觀察者)、小偷偷錢是某種狀態(tài)、警察把小偷抓起來(lái)(事件)。

2)RxJava的觀察者模式
Observable(被觀察者)、Observer(觀察者)、 subscribe(訂閱)、事件。
看代碼清楚明了,ObservableEmitter是事件發(fā)射器,在onSubscribe方法里可得到Disposable,可用于取消訂閱。在觀察者的方法體里有4個(gè)事件,解釋看注釋。如果發(fā)送了onError() 或 onComplete()事件,就取消了這個(gè)訂閱。
一般用于網(wǎng)絡(luò)請(qǐng)求時(shí),都會(huì)寫一個(gè)繼承Observer的類來(lái)做處理,比如在onSubscribe()里做加載框顯示,在onComplete()做某些共性判斷,在onError()做統(tǒng)一失敗處理等。所以O(shè)bserver的4個(gè)方法肯定是要清楚的。

/**
* 打印結(jié)果:  只會(huì)打印1和2,3不打印
*/
Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            //ObservableEmitter類,事件發(fā)射器,作用是定義需要發(fā)送的事件 & 向觀察者發(fā)送事件
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onComplete();
            emitter.onNext(3);
        }
    }).subscribe(new Observer<Integer>() {

        private Disposable mDisposable;  //用于取消訂閱

        /**
         * 訂閱成功就執(zhí)行的方法
         * @param d  可切斷操作
         */
        @Override
        public void onSubscribe(Disposable d) {
            mDisposable = d;
        }

        /**
         * 接收事件
         * @param integer
         */
        @Override
        public void onNext(Integer integer) {
            KLog.d(TAG, "onNext: " + integer + "\n");
        }

        /**
         * 失敗,不再接收事件
         * @param e
         */
        @Override
        public void onError(Throwable e) {

        }

        /**
         * 完成,不再接收事件
         */
        @Override
        public void onComplete() {
        }
    });

3)線程控制Scheduler
Scheduler是線程調(diào)度,RxJava通過(guò)它來(lái)指定每一段代碼運(yùn)行在什么樣的線程。例如網(wǎng)絡(luò)請(qǐng)求用io流線程,更新UI就用mainThread,最常用也是這兩個(gè)。
Schedulers.newThread():新線程。
Schedulers.io():在io()內(nèi)部有線程池,可以使用空閑的線程,多數(shù)情況下比newThread()效率高。
Schedulers.computation():計(jì)算線程(沒(méi)用過(guò))。
AndroidSchedulers.mainThread():主線程。

4)取消訂閱
RxJava2的取消訂閱用Disposable.cancel(),如果訂閱事件過(guò)多,要寫管理類來(lái)管理。如果不取消訂閱,就會(huì)造成內(nèi)存泄漏,網(wǎng)上有RxLifecycle第三方庫(kù),可以很方便做取消訂閱,它綁定了Activity or Fragment的生命周期。

3、RxJava2操作符

這里重新整理了一下當(dāng)初學(xué)習(xí)的操作符,原本打算整理全部的操作符的,然后發(fā)現(xiàn)實(shí)在是太多了,就放棄了,下面的操作符絕大多數(shù)都來(lái)自于Android Rxjava:這是一篇 清晰 & 易懂的Rxjava 入門教程,想詳細(xì)學(xué)習(xí)的可看這篇文章。網(wǎng)上也有很多文章有對(duì)操作符做簡(jiǎn)述,這篇文章比較完整Android拾萃 - RxJava2操作符匯總。最好結(jié)合代碼學(xué)習(xí),下載地址在本文最下面。

1)創(chuàng)建操作符
包括完整&快速創(chuàng)建被觀察者,定時(shí)操作,周期性操作,數(shù)據(jù)/集合遍歷

操作符 簡(jiǎn)述
基本創(chuàng)建
create() 普通創(chuàng)建,可以定義要發(fā)送的事件
快速創(chuàng)建
just() 快速創(chuàng)建一個(gè)Observable,最多發(fā)送10個(gè)參數(shù)
fromArray() 和just()類似,區(qū)別是傳的是數(shù)組
fromIterable() 和fromArray()類似,區(qū)別是傳入list集合
never() 不發(fā)送事件
empty() 僅發(fā)送complete事件
error() 僅發(fā)送empty事件
延遲創(chuàng)建
defer() 直到訂閱才創(chuàng)建Observable
timer() 延遲一段時(shí)間再才發(fā)送
interval() 周期性發(fā)送,值從0開(kāi)始遞增。
intervalRange() 指定范圍,周期性發(fā)送??捎糜谧鲵?yàn)證碼倒計(jì)時(shí)
range() 連續(xù)發(fā)送一個(gè)時(shí)間序列,可指定范圍

2)變換操作符

操作符 簡(jiǎn)述
map() 將被觀察者發(fā)送的事件轉(zhuǎn)換為任意的類型事件。返回的是結(jié)果集,適用于一對(duì)一轉(zhuǎn)換。(數(shù)據(jù)類型轉(zhuǎn)換)
flatMap() 將被觀察者發(fā)送的事件序列進(jìn)行 拆分 & 單獨(dú)轉(zhuǎn)換,再合并成一個(gè)新的事件序列,最后再進(jìn)行發(fā)送。返回的是包含結(jié)果集的Observable,適用于一對(duì)多,多對(duì)多得場(chǎng)景。
concatMap() 和flatMap()類似,區(qū)別是flatMap是無(wú)序的,concatMap是有序的,它的新事件序列和舊序列順序一致。具體可以在flatMap生成事件的邏輯里加個(gè)延遲看到差異。
buffer() 定期從被觀察者需要發(fā)送的事件中獲取一定數(shù)量的事件,放到緩存區(qū)中,最終發(fā)送。兩個(gè)參數(shù),count是緩存區(qū)大小,skip是步長(zhǎng)。
switchMap() 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合,然后只發(fā)射這些Observables最近發(fā)射的數(shù)據(jù)
scan() 對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后按順序依次發(fā)射每一個(gè)值
groupBy() 將Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個(gè)Observable發(fā)射一組不同的數(shù)據(jù)

3)組合/合并轉(zhuǎn)換符

操作符 簡(jiǎn)述
組合多個(gè)被觀察者
concat() / concatArray() 組合多個(gè)被觀察者,按發(fā)送順序串行執(zhí)行。有Array是指?jìng)鲾?shù)組
merge()/mergeArray() 組合多個(gè)被觀察者,按時(shí)間線并行執(zhí)行
concatDelayError()/mergeDelayError() onError事件推遲到其他被觀察者發(fā)送事件結(jié)束后才觸發(fā)
合并多個(gè)事件
zip() 按數(shù)量合并,最終事件的數(shù)量=多個(gè)觀察者中事件最少的數(shù)量。比如被觀察者1有4個(gè)事件,被觀察者2有5個(gè)事件,則組合起來(lái)事件數(shù)量是4。
combineLatest() 按時(shí)間合并,和最新事件合并。當(dāng)兩個(gè)Observable中的任何一個(gè)發(fā)送了事件后,將先發(fā)送了數(shù)據(jù)的Obervable的最新一個(gè)事件與另一個(gè)Observable發(fā)送的每個(gè)事件結(jié)合,最終基于該函數(shù)的結(jié)果發(fā)送事件。和zip的區(qū)別是,zip()按個(gè)數(shù)合并,1對(duì)1合并;combineLastest 按時(shí)間合并,在同一時(shí)間點(diǎn)上合并。
reduce() 把被觀察者需要發(fā)送的事件聚合成1個(gè)事件
collect() 將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個(gè)數(shù)據(jù)結(jié)構(gòu)里
其他
startWith() 在一個(gè)被觀察者發(fā)送事件前,追加發(fā)送一些數(shù)據(jù) / 一個(gè)新的被觀察者
count() 統(tǒng)計(jì)被觀察者發(fā)送事件的數(shù)量

4)功能性操作符

操作符 簡(jiǎn)述
subscribe() 訂閱,即連接觀察者 & 被觀察者
delay() 使得被觀察者延遲一段時(shí)間再發(fā)送事件
doXXX() 各個(gè)事件操作符,如 doOnEach、doOnNext、doComplete等。
retry() 重試,即當(dāng)出現(xiàn)錯(cuò)誤時(shí),讓被觀察者(Observable)重新發(fā)射數(shù)據(jù)
retryUntil() Observable遇到錯(cuò)誤時(shí),在retryUntil()的方法里決定,是否讓Observable重新訂閱
retryWhen() retryWhen將onError中的Throwable傳遞給一個(gè)函數(shù),這個(gè)函數(shù)產(chǎn)生另一個(gè)Observable,由這個(gè)Observable來(lái)決定是否要重新訂閱原Observable。
repeat() 無(wú)條件地、重復(fù)發(fā)送 被觀察者事件
repeatWhen( int ) 傳入?yún)?shù) = 重復(fù)發(fā)送次數(shù)有限

5)過(guò)濾操作符

操作符 簡(jiǎn)述
filter() 過(guò)濾 特定條件的事件
ofType() 過(guò)濾 特定數(shù)據(jù)類型的數(shù)據(jù)
skip( int ) 跳過(guò)n個(gè)事件
skipLast( int ) 跳過(guò)最后的n個(gè)事件
distinct() 過(guò)濾事件序列中重復(fù)的事件
distinctUntilChanged() 只確保相鄰元素不重復(fù)出現(xiàn)
take( int ) 指定只接收n個(gè)事件
takeLast( int ) 指定只接收最后發(fā)送的n個(gè)事件

6)條件/布爾操作符

操作符 簡(jiǎn)述
all() 判斷發(fā)送的每項(xiàng)數(shù)據(jù)是否都滿足設(shè)置的函數(shù)條件,若滿足,返回 true;否則,返回 false。
takeWhile() 判斷發(fā)送的每項(xiàng)數(shù)據(jù)是否滿足設(shè)置函數(shù)條件,為true就發(fā)送,為false不發(fā)送,且終止發(fā)送事件(后面的事件也不發(fā)送了)。
skipWhile() 判斷發(fā)送的每項(xiàng)數(shù)據(jù)是否滿足設(shè)置函數(shù)條件,為true就不發(fā)送,為false發(fā)送,且終止發(fā)送事件。
takeUntil() 接收第一個(gè)Observable(調(diào)用takUtil的Observable)發(fā)送的數(shù)據(jù),當(dāng)?shù)诙€(gè)Observable(takUtil參數(shù)中的Observable)發(fā)送數(shù)據(jù)時(shí),兩個(gè)Obserable會(huì)同時(shí)取消訂閱。
skipUntil() 與takeUtil()正好相反,不接收第一個(gè)Observable發(fā)送的數(shù)據(jù),直到第二個(gè)Observable發(fā)送數(shù)據(jù)時(shí)才接收第一個(gè)Observable的數(shù)據(jù),此時(shí)第二個(gè)Observable會(huì)取消訂閱。
sequenceEqual() 判定兩個(gè)Observables需要發(fā)送的數(shù)據(jù)是否相同,相同返回 true,不相同返回 false
contains() 判斷發(fā)送的數(shù)據(jù)中是否包含指定數(shù)據(jù)。
isEmpty() 判斷發(fā)送的數(shù)據(jù)是否為空
amb() 當(dāng)需要發(fā)送多個(gè) Observable時(shí),只發(fā)送最先發(fā)送的Observable的數(shù)據(jù),而其余Observable則被丟棄。

4.背壓策略。

當(dāng)在異步訂閱中,通過(guò)Observable發(fā)射、處理、響應(yīng)數(shù)據(jù)流時(shí),如果事件產(chǎn)生的速度遠(yuǎn)遠(yuǎn)快于事件消費(fèi)的速度,這些沒(méi)來(lái)得及處理的數(shù)據(jù)就會(huì)越積越多,這些數(shù)據(jù)不會(huì)丟失,也不會(huì)被垃圾回收機(jī)制回收,而是存放在一個(gè)異步緩存池中,緩存池的數(shù)據(jù)一直得不到處理,最終會(huì)導(dǎo)致OOM等異常。這就是響應(yīng)式編程中的背壓?jiǎn)栴}??偨Y(jié)一下就是: 事件產(chǎn)生的速度大于事件消費(fèi)的速度,數(shù)據(jù)堆積,最終造成OOM等異常。
RxJava2把對(duì)背壓?jiǎn)栴}的處理邏輯從Observable中抽取出來(lái)產(chǎn)生了新的可觀察對(duì)象Flowable,它是在Observable基礎(chǔ)上做了優(yōu)化,所以O(shè)bservable能做的,它都能做,但是加了背壓支持和其他的邏輯處理,它的效率比Observable慢得多,所以在需要用到背壓的時(shí)候再用Flowable,其他時(shí)候還是正常使用Observable。網(wǎng)上找了兩張圖,可以很直觀地看出RxJava1和2對(duì)背壓的改動(dòng)。


RxJava1.png

RxJava2.png

簡(jiǎn)單舉個(gè)例子:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {

                //異步訂閱時(shí),代表的是 異步緩存池中可放入數(shù)據(jù)的數(shù)量,一開(kāi)始是128,當(dāng)產(chǎn)生10個(gè)事件而沒(méi)有消費(fèi)時(shí),此時(shí)這個(gè)值是128-10=118。
                KLog.d(TAG, "異步緩存池中可放入數(shù)據(jù)的數(shù)量 = " + emitter.requested());

                // 一共發(fā)送4個(gè)事件
                KLog.d(TAG, "發(fā)送事件 1");
                emitter.onNext(1);
                KLog.d(TAG, "發(fā)送事件 2");
                emitter.onNext(2);
                KLog.d(TAG, "發(fā)送事件 3");
                emitter.onNext(3);
                KLog.d(TAG, "發(fā)送事件 4");
                emitter.onNext(4);
                KLog.d(TAG, "發(fā)送事件 onComplete()");
                emitter.onComplete();

                KLog.d(TAG, "異步緩存池中可放入數(shù)據(jù)的數(shù)量 = " + emitter.requested());

//                //模擬緩存超過(guò)128
//                for (int i = 0;i< 129; i++) {
//                    Log.d(TAG, "發(fā)送了事件" + i);
//                    emitter.onNext(i);
//                }
//                emitter.onComplete();
//
            }
        }, BackpressureStrategy.ERROR)  //緩存區(qū)超過(guò)128,直接拋出異常
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        // 在異步訂閱情況下,一定要調(diào)用request,否則下流不接收事件
                        // 只接收多少個(gè)事件
                        s.request(3);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        KLog.d(TAG, "接收到了事件" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        KLog.d(TAG, "onError:", t);
                    }

                    @Override
                    public void onComplete() {
                        KLog.d(TAG, "onComplete");
                    }
                });

可以看到Flowable和Subscriber的使用方式和之前的Observable和Observer極其類似,不同點(diǎn)如下:
1、create方法中多了一個(gè)BackpressureStrategy類型的參數(shù)。
2、Flowable發(fā)射數(shù)據(jù)時(shí),使用FlowableEmitter,而Observable用的是ObservableEmitter。
3、Subscriber中,方法onSubscribe回調(diào)的參數(shù)不是Disposable而是Subscription。
下面對(duì)這三點(diǎn)進(jìn)行說(shuō)明。

BackpressureStrategy
緩存策略,也既是當(dāng)緩存區(qū)存滿、上流仍然繼續(xù)發(fā)送下事件時(shí),該如何處理的策略。默認(rèn)緩存區(qū)大小是128(與Flowable的buffersize大小有關(guān))。具體有哪些策略如下:
BackpressureStrategy.ERROR: 直接拋異常。
BackpressureStrategy.MISSING: 友好提示:緩存區(qū)滿了。
BackpressureStrategy.BUFFER: 將緩存區(qū)設(shè)為無(wú)限大。
BackpressureStrategy.DROP: 超過(guò)緩存區(qū)大小128的事件丟棄。
BackpressureStrategy.LATEST: 只保存最后事件,超過(guò)緩存區(qū)大小128的事件丟棄。

Subscription
Subscription比Disposable多了一個(gè)方法request()。它的作用是告訴上游,下游需要多少數(shù)據(jù),如果不設(shè)置request默認(rèn)是0。比如設(shè)置為3,那么超過(guò)范圍之外的數(shù)據(jù)就不接收了。如果多次調(diào)用request(),會(huì)累加。

FlowableEmitter
它比ObservableEmitter多了一個(gè)方法requested(),這個(gè)方法返回的是異步緩存池中可放入數(shù)據(jù)的數(shù)量,比如一開(kāi)始是128,當(dāng)產(chǎn)生10個(gè)事件而沒(méi)有消費(fèi)時(shí),此時(shí)這個(gè)值是128-10=118。

Tips:不管Subscription.request(xx)設(shè)置了什么值,F(xiàn)lowableEmitter都會(huì)發(fā)送事件的,發(fā)送了不接收的就放入緩存里。背壓在異步訂閱中才有用,如果是同步訂閱,是不會(huì)有緩存池的。

5.具體的使用場(chǎng)景

實(shí)際項(xiàng)目中,我只用來(lái)做過(guò)驗(yàn)證碼倒計(jì)時(shí)、網(wǎng)絡(luò)請(qǐng)求,RxBus。雖然了解了一些操作符,但網(wǎng)上的教程講解這些操作符的時(shí)候都是以簡(jiǎn)單的例子來(lái)講解的,具體的應(yīng)用場(chǎng)景還是比較玄乎。

1)RxBus
EventBus是一個(gè)基于發(fā)布/訂閱的事件總線,它簡(jiǎn)化了組件之間的通信操作。而這些RxBus都能做,所以用RxBus替換EventBus,在RxJava1就已經(jīng)提出來(lái)了,如果自己的事件發(fā)送的要求不高,可以自己封裝一個(gè)RxBus使用。17年的時(shí)候找的一篇文章 Android 用RxJava模擬一個(gè)EventBus ———RxBus,我在這基礎(chǔ)上稍微改動(dòng)了一下,但是沒(méi)有做粘性事件。
原理簡(jiǎn)單來(lái)說(shuō)就是找個(gè)容器裝所有的觀察者,當(dāng)有某個(gè)事件產(chǎn)生時(shí),找到所有需要這個(gè)事件的觀察者,向它們發(fā)送事件。這個(gè)是否發(fā)送事件的依據(jù),其實(shí)就是看觀察者要什么樣的class,比如訂閱String類型的,那么Integer類型的事件就不會(huì)發(fā)給它。

兩個(gè)新知識(shí):
1)CompositeDisposable
一個(gè)disposable的容器,可以容納多個(gè)disposable
2)Subject
Subject可以同時(shí)代表 Observer 和 Observable,允許從數(shù)據(jù)源中多次發(fā)送結(jié)果給多個(gè)觀察者。

Subject 類別 簡(jiǎn)述
AsyncSubject 只有當(dāng) Subject 調(diào)用 onComplete 方法時(shí),才會(huì)將 Subject 中的最后一個(gè)事件傳遞給所有的 Observer。
BehaviorSubject 當(dāng)觀察者訂閱BehaviorSubject時(shí),它開(kāi)始發(fā)射原始Observable最近發(fā)射的數(shù)據(jù)(如果此時(shí)還沒(méi)有收到任何數(shù)據(jù),它會(huì)發(fā)射一個(gè)默認(rèn)值),然后繼續(xù)發(fā)射其它任何來(lái)自原始Observable的數(shù)據(jù)。然而,如果原始的Observable因?yàn)榘l(fā)生了一個(gè)錯(cuò)誤而終止,BehaviorSubject將不會(huì)發(fā)射任何數(shù)據(jù),只是簡(jiǎn)單的向前傳遞這個(gè)錯(cuò)誤通知。Rxlifecycle2庫(kù)用的就是這個(gè)。
PublishSubject 不會(huì)改變事件的發(fā)送順序,在已經(jīng)發(fā)送了一部分事件之后注冊(cè)的 Observer 不會(huì)收到之前發(fā)送的事件。RxBus用的這個(gè)。
ReplaySubject 無(wú)論什么時(shí)候注冊(cè) Observer 都可以接收到任何時(shí)候通過(guò)該 Observable 發(fā)射的事件。 RxBus粘性事件可用這個(gè)。
UnicastSubject 只允許一個(gè) Observer 進(jìn)行監(jiān)聽(tīng),在該 Observer 注冊(cè)之前會(huì)將發(fā)射的所有的事件放進(jìn)一個(gè)隊(duì)列中, 并在 Observer 注冊(cè)的時(shí)候一起通知給它。
public class RxBus {

    /**
     * CompositeDisposable定義:
     * 一個(gè)disposable的容器,可以容納多個(gè)disposable,添加和去除的復(fù)雜度為O(1)
     *
     * 此處使用目的:
     * 是為了一個(gè)訂閱者能夠?qū)?yīng)多個(gè)Disposable,在需要的時(shí)候調(diào)用 Disposable 的 dispose()取消訂閱。
     *
     * 舉個(gè)例子:
     * XXActivity,訂閱了事件A和事件B,關(guān)閉時(shí)要取消訂閱。
     * 那么只需在mSubscriptionMap里找到key為XXActivity的Value(CompositeDisposable),再取出Disposable
     * 取消訂閱即可。
     *
     * 如果不用的話,就要為每一個(gè)Activity寫一個(gè)容器去保存它的訂閱的事件了。
     * */
    private HashMap<String, CompositeDisposable> mSubscriptionMap;
    private static volatile RxBus mRxBus;
    private final Subject<Object> mSubject;

    public static RxBus getIntanceBus(){
        if (mRxBus==null){
            synchronized (RxBus.class){
                if(mRxBus==null){
                    mRxBus = new RxBus();
                }
            }
        }
        return mRxBus;
    }

    
    /**
     * 正常訂閱可用PublishSubject、黏性事件可用ReplaySubject。
     */
    private RxBus(){
        mSubject = PublishSubject.create().toSerialized();
    }

    /**
     * 一個(gè)默認(rèn)的訂閱方法
     * @param <T>
     * @param type   過(guò)濾,只返回特定數(shù)據(jù)類型的數(shù)據(jù)
     * @param next   next()事件,正常接收的事件
     * @param error  error()事件,錯(cuò)誤接收的事件
     * @return
     */
    public <T> Disposable doSubscribe(Class<T> type, Consumer<T> next, Consumer<Throwable> error){
        return getObservable(type)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(next,error);
    }

    /**
     * 返回指定類型的帶背壓的Flowable實(shí)例
     * 這里選定背壓模式是 BackpressureStrategy.BUFFER,緩存區(qū)無(wú)限大
     * @param <T>
     * @param type  過(guò)濾,只返回特定數(shù)據(jù)類型的數(shù)據(jù)
     * @return
     */
    public <T> Flowable<T> getObservable(Class<T> type){
        return mSubject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
    }

    /**
     * 發(fā)送事件
     * @param o 事件
     */
    public void post(Object o){
        mSubject.onNext(o);
    }


    /**
     * 判斷是否已有觀察者訂閱
     *
     * @return
     */
    public boolean hasObservers() {
        return mSubject.hasObservers();
    }

    /**
     * 保存訂閱后的disposable,取消訂閱的時(shí)候要用
     * @param o       訂閱的目標(biāo)
     * @param disposable
     */
    public void addSubscription(Object o, Disposable disposable) {
        if (mSubscriptionMap == null) {
            mSubscriptionMap = new HashMap<>();
        }
        //這里key值取訂閱目標(biāo)的實(shí)體名稱(com.xx.xx.xxx),不是底層類檢稱
        String key = o.getClass().getName();
        //disposable 放到對(duì)應(yīng)的 CompositeDisposable 里
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).add(disposable);
        } else {
            CompositeDisposable disposables = new CompositeDisposable();
            disposables.add(disposable);
            mSubscriptionMap.put(key, disposables);
        }
    }

    /**
     * 取消訂閱
     * @param o
     */
    public void unSubscribe(Object o) {
        if (mSubscriptionMap == null) {
            return;
        }
        String key = o.getClass().getName();
        if (!mSubscriptionMap.containsKey(key)){
            return;
        }
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).dispose();
        }
        mSubscriptionMap.remove(key);
    }

    /********************************* 為RxEventBean 封裝  **********************************/
    /**
     * 使用EventBus時(shí),為了方便查找,一般都會(huì)封裝 EventBean(int code,Object content)
     * 但是所有的訂閱者都是訂閱的這個(gè)類型,所以要自己做判斷做類型轉(zhuǎn)換。
     *
     * 這里寫死事件是RxEventBean,且錯(cuò)誤處理一般不處理,只用處理onNext即可。
     * @param context
     * @param action
     * */
    public void register(Context context, Consumer<RxEventBean> action) {
        Disposable disposable = RxBus.getIntanceBus().doSubscribe(RxEventBean.class, action,
                throwable -> KLog.e("RxEventBean onError()", throwable.toString()));
        RxBus.getIntanceBus().addSubscription(context,disposable);
    }


    /**
     * 發(fā)送RxEventBean 事件
     * @param code     code,用于判斷
     * @param content  內(nèi)容,接收后做類型轉(zhuǎn)換
     */
    public void post(int code, Object content){
        RxEventBean<Object> event = new RxEventBean<>();
        event.code = code;
        event.content = content;
        post(event);
    }
}

2)驗(yàn)證碼倒計(jì)時(shí)
其實(shí)關(guān)鍵代碼就是用intervalRange()操作符。

public class RxCodeHelper {

    private Context mContext;
    private TextView codeBt;
    private Disposable mDisposable;

    /**
     * 構(gòu)造函數(shù)
     * @param mContext 上下文
     * @param codeBt   驗(yàn)證碼的button
     */
    public RxCodeHelper(Context mContext, TextView codeBt) {
        this.mContext = mContext;
        this.codeBt = codeBt;
    }

    /**
     * 開(kāi)啟倒計(jì)時(shí)
     */
    public void start(){
        codeBt.setEnabled(false);
        //從0開(kāi)始,走60個(gè)數(shù),延遲是0s,周期為1次。
        Observable.intervalRange(0,60,0,1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        mDisposable = d;
                    }

                    @Override
                    public void onNext(Long aLong) {
                        codeBt.setText((60 - aLong) + "s后可重發(fā)");
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {
                        //倒計(jì)時(shí)完畢置為可點(diǎn)擊狀態(tài)
                        codeBt.setEnabled(true);
                        codeBt.setText("獲取驗(yàn)證碼");
                    }
                });
    }

    /**
     * 請(qǐng)求失敗時(shí),重置狀態(tài)
     * 取消倒計(jì)時(shí)的訂閱事件
     */
    public void reset(){
        if(mDisposable!=null){
            mDisposable.dispose();
        }
        codeBt.setEnabled(true);
        codeBt.setText("獲取驗(yàn)證碼");
    }

    /**
     * 界面銷毀時(shí),取消倒計(jì)時(shí)訂閱事件
     */
    public void stop(){
        if(mDisposable!=null){
            mDisposable.dispose();
        }
    }

}

3)RxLifecycle
目的:解決RxJava的內(nèi)存泄漏問(wèn)題。
用法:
bindUntilEvent(@NonNull ActivityEvent event)——綁定指定的生命周期,在指定生命周期時(shí)取消訂閱。
bindToLifecycle() ——綁定生命周期,取消訂閱策略可看源碼。以Activity為例,可看RxLifecycleAndroid 下的 ACTIVITY_LIFECYCLE, 實(shí)際上在onCreate()訂閱的,在onDestroy()取消訂閱;在onResume()訂閱的,在onPause()取消訂閱。

知識(shí)點(diǎn):
compose():
將一種類型的Observable轉(zhuǎn)換成另一種類型的Observable,保證調(diào)用的鏈?zhǔn)浇Y(jié)構(gòu)。
LifecycleTransformer:
LifecycleTransformer實(shí)現(xiàn)了各種Transformer接口,能夠?qū)⒁粋€(gè)Observable/Flowable/Single/Completable/Maybe對(duì)象轉(zhuǎn)換成另一個(gè) Observable/Flowable/Single/Completable/Maybe對(duì)象。正好配合上文的compose操作符,使用在鏈?zhǔn)秸{(diào)用中。

舉例在網(wǎng)絡(luò)請(qǐng)求時(shí)綁定生命周期,mvp模式。
Activity繼承了RxFragmentActivity后,BaseView接口里新增兩個(gè)方法,其他View層接口繼承它。

public interface BaseView {
//為了讓 IView 可以調(diào)用 RxLifeCycle的生命周期綁定
<T> LifecycleTransformer<T> bindToLifecycle();
<T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event);
}

在P層使用,此處模擬網(wǎng)絡(luò)請(qǐng)求,3s后回調(diào)

Observable.timer(3, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io()) //訂閱在io線程
.observeOn(AndroidSchedulers.mainThread()) //回調(diào)在主線程
.compose(mView.bindUntilEvent(ActivityEvent.DESTROY)) //指定在onDestroy銷毀
// .compose(mView.bindToLifecycle()) //取消訂閱交由RxLifeCycle來(lái)判斷
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
mView.showLoading();
}

@Override
public void onNext(Long aLong) {
KLog.d("onTest1Success()回調(diào)成功");
mView.onTest1Success("onTest1 請(qǐng)求成功");
}

@Override
public void onError(Throwable e) {
mView.dismissLoading();
}

@Override
public void onComplete() {
mView.dismissLoading();
}
});

一般使用,可以用compose封裝一下,如下:

/**
     * 統(tǒng)一線程處理,且綁定生命周期
     * 用法: xxx .compose(RxUtil.rxSchedulerHelper(mView))
     * @param view
     * @param <T>
     * @return
     */
    public static <T> ObservableTransformer<T,T> rxSchedulerHelper(BaseView view){
        return new ObservableTransformer<T,T>() {
            @Override
            public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
                return upstream
                        .subscribeOn(Schedulers.io())    //訂閱在io線程
                        .unsubscribeOn(Schedulers.io())  //取消訂閱在io線程,為啥要這個(gè),不太清楚
                        .observeOn(AndroidSchedulers.mainThread()) //回調(diào)在主線程
                        .compose(view.bindToLifecycle());  //綁定生命周期
            }
        };
    }

結(jié)尾

在網(wǎng)上找了一些操作符的簡(jiǎn)單例子和應(yīng)用場(chǎng)景,寫了一個(gè)Demo,看操作符概述的時(shí)候,結(jié)合代碼運(yùn)行結(jié)果來(lái)看更容易理解,有興趣的可以看看。后續(xù)會(huì)在這個(gè)項(xiàng)目里繼續(xù)更新操作符和應(yīng)用場(chǎng)景。
Github地址:
https://github.com/Dengszzzzz/DRxJavaSummary

參考

給 Android 開(kāi)發(fā)者的 RxJava 詳解
這可能是最好的RxJava 2.x 入門教程
Android拾萃 - RxJava2操作符匯總
Rxjava2入門教程五:Flowable背壓支持——對(duì)Flowable最全面而詳細(xì)的講解
Android Rxjava:這是一篇 清晰 & 易懂的Rxjava 入門教程
實(shí)際應(yīng)用場(chǎng)景
Android 用RxJava模擬一個(gè)EventBus ———RxBus
RxLifecycle詳細(xì)解析
...

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • RxJava RxJava是響應(yīng)式程序設(shè)計(jì)的一種實(shí)現(xiàn)。在響應(yīng)式程序設(shè)計(jì)中,當(dāng)數(shù)據(jù)到達(dá)的時(shí)候,消費(fèi)者做出響應(yīng)。響應(yīng)式...
    Mr槑閱讀 1,044評(píng)論 0 5
  • 本文首發(fā)于“隨手記技術(shù)團(tuán)隊(duì)”公眾號(hào) 大概從2015年開(kāi)始,RxJava1.0開(kāi)始快速流行起來(lái),短短兩年時(shí)間,RxJ...
    HolenZhou閱讀 2,217評(píng)論 0 19
  • 目錄 一:創(chuàng)建操作 二:合并操作 三:過(guò)濾操作 四:切換線程 五:條件/布爾操作 六:聚合操作 七:轉(zhuǎn)換操作 八:...
    Allens_Jiang閱讀 10,246評(píng)論 11 32
  • RxJava2.0的使用詳解 1,初識(shí)RxJava RxJava就是一種用Java語(yǔ)言實(shí)現(xiàn)的響應(yīng)式編程,來(lái)創(chuàng)建基于...
    CarlosLynn閱讀 2,952評(píng)論 0 0
  • 一、Retrofit詳解 ·Retrofit的官網(wǎng)地址為 : http://square.github.io/re...
    余生_d630閱讀 2,083評(píng)論 0 5

友情鏈接更多精彩內(nèi)容