前言
2017年,剛畢業(yè)的我在項(xiàng)目組里打下手,組里其他人決定在新項(xiàng)目里使用RxJava,那時(shí)網(wǎng)絡(luò)庫(kù)用的是他們自己再次封裝了的noHttp,加上了RxJava后,以前簡(jiǎn)單的網(wǎng)絡(luò)請(qǐng)求多寫了很多代碼,這是我對(duì)RxJava的第一感受。
如今的項(xiàng)目基本上都是MVP+Retrofit2+Rxjava2的框架,但往往只是把Rxjava2和Retrofit2搭配進(jìn)行網(wǎng)絡(luò)請(qǐng)求而已,我也不例外。也曾看過(guò)好幾次RxJava2的入門教程、操作符總結(jié)等,過(guò)段時(shí)間就忘了,而且不知在什么樣的情景下適用恰當(dāng),所以寫下這篇文章,打算帶著問(wèn)題出發(fā),重新整理一下。
問(wèn)題:
1、RxJava是什么,它優(yōu)勢(shì)是什么?
2、RxJava基礎(chǔ)知識(shí)。
3、RxJava2操作符。
4、背壓策略。
5、具體的使用場(chǎng)景。
1、RxJava是什么,它優(yōu)勢(shì)是什么?
RxJava官方介紹——一個(gè)在 Java VM 上使用可觀測(cè)的序列來(lái)組成異步的、基于事件的程序的庫(kù)。
Rxjava本質(zhì)就是異步,它的優(yōu)勢(shì)是簡(jiǎn)潔。簡(jiǎn)潔是指相比于AsyncTask 、Handler、runOnUiThread等異步操作,RxJava的邏輯更簡(jiǎn)潔。一方面它是鏈?zhǔn)降?,邏輯代碼直觀,二是它提供了很多強(qiáng)大的操作符。
2、RxJava基礎(chǔ)知識(shí)
RxJava的基礎(chǔ)知識(shí),首先要知道它是用了可拓展的觀察者模式,然后再了解這些概念,Observable、Observer、subscribe、處理事件、Scheduler。
1)什么是觀察者模式?
觀察者(Observer)不時(shí)刻盯著被觀察者(Observable),而是通過(guò)注冊(cè)(Register)或者稱為訂閱(Subscribe)的方式,告訴被觀察者我需要你的某某狀態(tài),你要在它變化的時(shí)候通知我。
舉例1:
A(觀察者)和B(被觀察者),A不需要每過(guò) 1s 就檢查一次 B 的狀態(tài),而是B狀態(tài)發(fā)生改變時(shí),去通知A。
舉例2:
View的點(diǎn)擊事件,View(被觀察者)->OnClickLisetener(觀察者)->setOnClickListener(訂閱)->OnClick(事件)。
舉例3:
小偷在偷錢時(shí)告訴警察它要偷錢。小偷(被觀察者)、警察(觀察者)、小偷偷錢是某種狀態(tài)、警察把小偷抓起來(lái)(事件)。
2)RxJava的觀察者模式
Observable(被觀察者)、Observer(觀察者)、 subscribe(訂閱)、事件。
看代碼清楚明了,ObservableEmitter是事件發(fā)射器,在onSubscribe方法里可得到Disposable,可用于取消訂閱。在觀察者的方法體里有4個(gè)事件,解釋看注釋。如果發(fā)送了onError() 或 onComplete()事件,就取消了這個(gè)訂閱。
一般用于網(wǎng)絡(luò)請(qǐng)求時(shí),都會(huì)寫一個(gè)繼承Observer的類來(lái)做處理,比如在onSubscribe()里做加載框顯示,在onComplete()做某些共性判斷,在onError()做統(tǒng)一失敗處理等。所以O(shè)bserver的4個(gè)方法肯定是要清楚的。
/**
* 打印結(jié)果: 只會(huì)打印1和2,3不打印
*/
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
//ObservableEmitter類,事件發(fā)射器,作用是定義需要發(fā)送的事件 & 向觀察者發(fā)送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
emitter.onNext(3);
}
}).subscribe(new Observer<Integer>() {
private Disposable mDisposable; //用于取消訂閱
/**
* 訂閱成功就執(zhí)行的方法
* @param d 可切斷操作
*/
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
/**
* 接收事件
* @param integer
*/
@Override
public void onNext(Integer integer) {
KLog.d(TAG, "onNext: " + integer + "\n");
}
/**
* 失敗,不再接收事件
* @param e
*/
@Override
public void onError(Throwable e) {
}
/**
* 完成,不再接收事件
*/
@Override
public void onComplete() {
}
});
3)線程控制Scheduler
Scheduler是線程調(diào)度,RxJava通過(guò)它來(lái)指定每一段代碼運(yùn)行在什么樣的線程。例如網(wǎng)絡(luò)請(qǐng)求用io流線程,更新UI就用mainThread,最常用也是這兩個(gè)。
Schedulers.newThread():新線程。
Schedulers.io():在io()內(nèi)部有線程池,可以使用空閑的線程,多數(shù)情況下比newThread()效率高。
Schedulers.computation():計(jì)算線程(沒(méi)用過(guò))。
AndroidSchedulers.mainThread():主線程。
4)取消訂閱
RxJava2的取消訂閱用Disposable.cancel(),如果訂閱事件過(guò)多,要寫管理類來(lái)管理。如果不取消訂閱,就會(huì)造成內(nèi)存泄漏,網(wǎng)上有RxLifecycle第三方庫(kù),可以很方便做取消訂閱,它綁定了Activity or Fragment的生命周期。
3、RxJava2操作符
這里重新整理了一下當(dāng)初學(xué)習(xí)的操作符,原本打算整理全部的操作符的,然后發(fā)現(xiàn)實(shí)在是太多了,就放棄了,下面的操作符絕大多數(shù)都來(lái)自于Android Rxjava:這是一篇 清晰 & 易懂的Rxjava 入門教程,想詳細(xì)學(xué)習(xí)的可看這篇文章。網(wǎng)上也有很多文章有對(duì)操作符做簡(jiǎn)述,這篇文章比較完整Android拾萃 - RxJava2操作符匯總。最好結(jié)合代碼學(xué)習(xí),下載地址在本文最下面。
1)創(chuàng)建操作符
包括完整&快速創(chuàng)建被觀察者,定時(shí)操作,周期性操作,數(shù)據(jù)/集合遍歷
| 操作符 | 簡(jiǎn)述 |
|---|---|
| 基本創(chuàng)建 | |
| create() | 普通創(chuàng)建,可以定義要發(fā)送的事件 |
| 快速創(chuàng)建 | |
| just() | 快速創(chuàng)建一個(gè)Observable,最多發(fā)送10個(gè)參數(shù) |
| fromArray() | 和just()類似,區(qū)別是傳的是數(shù)組 |
| fromIterable() | 和fromArray()類似,區(qū)別是傳入list集合 |
| never() | 不發(fā)送事件 |
| empty() | 僅發(fā)送complete事件 |
| error() | 僅發(fā)送empty事件 |
| 延遲創(chuàng)建 | |
| defer() | 直到訂閱才創(chuàng)建Observable |
| timer() | 延遲一段時(shí)間再才發(fā)送 |
| interval() | 周期性發(fā)送,值從0開(kāi)始遞增。 |
| intervalRange() | 指定范圍,周期性發(fā)送??捎糜谧鲵?yàn)證碼倒計(jì)時(shí) |
| range() | 連續(xù)發(fā)送一個(gè)時(shí)間序列,可指定范圍 |
2)變換操作符
| 操作符 | 簡(jiǎn)述 |
|---|---|
| map() | 將被觀察者發(fā)送的事件轉(zhuǎn)換為任意的類型事件。返回的是結(jié)果集,適用于一對(duì)一轉(zhuǎn)換。(數(shù)據(jù)類型轉(zhuǎn)換) |
| flatMap() | 將被觀察者發(fā)送的事件序列進(jìn)行 拆分 & 單獨(dú)轉(zhuǎn)換,再合并成一個(gè)新的事件序列,最后再進(jìn)行發(fā)送。返回的是包含結(jié)果集的Observable,適用于一對(duì)多,多對(duì)多得場(chǎng)景。 |
| concatMap() | 和flatMap()類似,區(qū)別是flatMap是無(wú)序的,concatMap是有序的,它的新事件序列和舊序列順序一致。具體可以在flatMap生成事件的邏輯里加個(gè)延遲看到差異。 |
| buffer() | 定期從被觀察者需要發(fā)送的事件中獲取一定數(shù)量的事件,放到緩存區(qū)中,最終發(fā)送。兩個(gè)參數(shù),count是緩存區(qū)大小,skip是步長(zhǎng)。 |
| switchMap() | 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合,然后只發(fā)射這些Observables最近發(fā)射的數(shù)據(jù) |
| scan() | 對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后按順序依次發(fā)射每一個(gè)值 |
| groupBy() | 將Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個(gè)Observable發(fā)射一組不同的數(shù)據(jù) |
3)組合/合并轉(zhuǎn)換符
| 操作符 | 簡(jiǎn)述 |
|---|---|
| 組合多個(gè)被觀察者 | |
| concat() / concatArray() | 組合多個(gè)被觀察者,按發(fā)送順序串行執(zhí)行。有Array是指?jìng)鲾?shù)組 |
| merge()/mergeArray() | 組合多個(gè)被觀察者,按時(shí)間線并行執(zhí)行 |
| concatDelayError()/mergeDelayError() | onError事件推遲到其他被觀察者發(fā)送事件結(jié)束后才觸發(fā) |
| 合并多個(gè)事件 | |
| zip() | 按數(shù)量合并,最終事件的數(shù)量=多個(gè)觀察者中事件最少的數(shù)量。比如被觀察者1有4個(gè)事件,被觀察者2有5個(gè)事件,則組合起來(lái)事件數(shù)量是4。 |
| combineLatest() | 按時(shí)間合并,和最新事件合并。當(dāng)兩個(gè)Observable中的任何一個(gè)發(fā)送了事件后,將先發(fā)送了數(shù)據(jù)的Obervable的最新一個(gè)事件與另一個(gè)Observable發(fā)送的每個(gè)事件結(jié)合,最終基于該函數(shù)的結(jié)果發(fā)送事件。和zip的區(qū)別是,zip()按個(gè)數(shù)合并,1對(duì)1合并;combineLastest 按時(shí)間合并,在同一時(shí)間點(diǎn)上合并。 |
| reduce() | 把被觀察者需要發(fā)送的事件聚合成1個(gè)事件 |
| collect() | 將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個(gè)數(shù)據(jù)結(jié)構(gòu)里 |
| 其他 | |
| startWith() | 在一個(gè)被觀察者發(fā)送事件前,追加發(fā)送一些數(shù)據(jù) / 一個(gè)新的被觀察者 |
| count() | 統(tǒng)計(jì)被觀察者發(fā)送事件的數(shù)量 |
4)功能性操作符
| 操作符 | 簡(jiǎn)述 |
|---|---|
| subscribe() | 訂閱,即連接觀察者 & 被觀察者 |
| delay() | 使得被觀察者延遲一段時(shí)間再發(fā)送事件 |
| doXXX() | 各個(gè)事件操作符,如 doOnEach、doOnNext、doComplete等。 |
| retry() | 重試,即當(dāng)出現(xiàn)錯(cuò)誤時(shí),讓被觀察者(Observable)重新發(fā)射數(shù)據(jù) |
| retryUntil() | Observable遇到錯(cuò)誤時(shí),在retryUntil()的方法里決定,是否讓Observable重新訂閱 |
| retryWhen() | retryWhen將onError中的Throwable傳遞給一個(gè)函數(shù),這個(gè)函數(shù)產(chǎn)生另一個(gè)Observable,由這個(gè)Observable來(lái)決定是否要重新訂閱原Observable。 |
| repeat() | 無(wú)條件地、重復(fù)發(fā)送 被觀察者事件 |
| repeatWhen( int ) | 傳入?yún)?shù) = 重復(fù)發(fā)送次數(shù)有限 |
5)過(guò)濾操作符
| 操作符 | 簡(jiǎn)述 |
|---|---|
| filter() | 過(guò)濾 特定條件的事件 |
| ofType() | 過(guò)濾 特定數(shù)據(jù)類型的數(shù)據(jù) |
| skip( int ) | 跳過(guò)n個(gè)事件 |
| skipLast( int ) | 跳過(guò)最后的n個(gè)事件 |
| distinct() | 過(guò)濾事件序列中重復(fù)的事件 |
| distinctUntilChanged() | 只確保相鄰元素不重復(fù)出現(xiàn) |
| take( int ) | 指定只接收n個(gè)事件 |
| takeLast( int ) | 指定只接收最后發(fā)送的n個(gè)事件 |
6)條件/布爾操作符
| 操作符 | 簡(jiǎn)述 |
|---|---|
| all() | 判斷發(fā)送的每項(xiàng)數(shù)據(jù)是否都滿足設(shè)置的函數(shù)條件,若滿足,返回 true;否則,返回 false。 |
| takeWhile() | 判斷發(fā)送的每項(xiàng)數(shù)據(jù)是否滿足設(shè)置函數(shù)條件,為true就發(fā)送,為false不發(fā)送,且終止發(fā)送事件(后面的事件也不發(fā)送了)。 |
| skipWhile() | 判斷發(fā)送的每項(xiàng)數(shù)據(jù)是否滿足設(shè)置函數(shù)條件,為true就不發(fā)送,為false發(fā)送,且終止發(fā)送事件。 |
| takeUntil() | 接收第一個(gè)Observable(調(diào)用takUtil的Observable)發(fā)送的數(shù)據(jù),當(dāng)?shù)诙€(gè)Observable(takUtil參數(shù)中的Observable)發(fā)送數(shù)據(jù)時(shí),兩個(gè)Obserable會(huì)同時(shí)取消訂閱。 |
| skipUntil() | 與takeUtil()正好相反,不接收第一個(gè)Observable發(fā)送的數(shù)據(jù),直到第二個(gè)Observable發(fā)送數(shù)據(jù)時(shí)才接收第一個(gè)Observable的數(shù)據(jù),此時(shí)第二個(gè)Observable會(huì)取消訂閱。 |
| sequenceEqual() | 判定兩個(gè)Observables需要發(fā)送的數(shù)據(jù)是否相同,相同返回 true,不相同返回 false |
| contains() | 判斷發(fā)送的數(shù)據(jù)中是否包含指定數(shù)據(jù)。 |
| isEmpty() | 判斷發(fā)送的數(shù)據(jù)是否為空 |
| amb() | 當(dāng)需要發(fā)送多個(gè) Observable時(shí),只發(fā)送最先發(fā)送的Observable的數(shù)據(jù),而其余Observable則被丟棄。 |
4.背壓策略。
當(dāng)在異步訂閱中,通過(guò)Observable發(fā)射、處理、響應(yīng)數(shù)據(jù)流時(shí),如果事件產(chǎn)生的速度遠(yuǎn)遠(yuǎn)快于事件消費(fèi)的速度,這些沒(méi)來(lái)得及處理的數(shù)據(jù)就會(huì)越積越多,這些數(shù)據(jù)不會(huì)丟失,也不會(huì)被垃圾回收機(jī)制回收,而是存放在一個(gè)異步緩存池中,緩存池的數(shù)據(jù)一直得不到處理,最終會(huì)導(dǎo)致OOM等異常。這就是響應(yīng)式編程中的背壓?jiǎn)栴}??偨Y(jié)一下就是: 事件產(chǎn)生的速度大于事件消費(fèi)的速度,數(shù)據(jù)堆積,最終造成OOM等異常。
RxJava2把對(duì)背壓?jiǎn)栴}的處理邏輯從Observable中抽取出來(lái)產(chǎn)生了新的可觀察對(duì)象Flowable,它是在Observable基礎(chǔ)上做了優(yōu)化,所以O(shè)bservable能做的,它都能做,但是加了背壓支持和其他的邏輯處理,它的效率比Observable慢得多,所以在需要用到背壓的時(shí)候再用Flowable,其他時(shí)候還是正常使用Observable。網(wǎng)上找了兩張圖,可以很直觀地看出RxJava1和2對(duì)背壓的改動(dòng)。


簡(jiǎn)單舉個(gè)例子:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
//異步訂閱時(shí),代表的是 異步緩存池中可放入數(shù)據(jù)的數(shù)量,一開(kāi)始是128,當(dāng)產(chǎn)生10個(gè)事件而沒(méi)有消費(fèi)時(shí),此時(shí)這個(gè)值是128-10=118。
KLog.d(TAG, "異步緩存池中可放入數(shù)據(jù)的數(shù)量 = " + emitter.requested());
// 一共發(fā)送4個(gè)事件
KLog.d(TAG, "發(fā)送事件 1");
emitter.onNext(1);
KLog.d(TAG, "發(fā)送事件 2");
emitter.onNext(2);
KLog.d(TAG, "發(fā)送事件 3");
emitter.onNext(3);
KLog.d(TAG, "發(fā)送事件 4");
emitter.onNext(4);
KLog.d(TAG, "發(fā)送事件 onComplete()");
emitter.onComplete();
KLog.d(TAG, "異步緩存池中可放入數(shù)據(jù)的數(shù)量 = " + emitter.requested());
// //模擬緩存超過(guò)128
// for (int i = 0;i< 129; i++) {
// Log.d(TAG, "發(fā)送了事件" + i);
// emitter.onNext(i);
// }
// emitter.onComplete();
//
}
}, BackpressureStrategy.ERROR) //緩存區(qū)超過(guò)128,直接拋出異常
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// 在異步訂閱情況下,一定要調(diào)用request,否則下流不接收事件
// 只接收多少個(gè)事件
s.request(3);
}
@Override
public void onNext(Integer integer) {
KLog.d(TAG, "接收到了事件" + integer);
}
@Override
public void onError(Throwable t) {
KLog.d(TAG, "onError:", t);
}
@Override
public void onComplete() {
KLog.d(TAG, "onComplete");
}
});
可以看到Flowable和Subscriber的使用方式和之前的Observable和Observer極其類似,不同點(diǎn)如下:
1、create方法中多了一個(gè)BackpressureStrategy類型的參數(shù)。
2、Flowable發(fā)射數(shù)據(jù)時(shí),使用FlowableEmitter,而Observable用的是ObservableEmitter。
3、Subscriber中,方法onSubscribe回調(diào)的參數(shù)不是Disposable而是Subscription。
下面對(duì)這三點(diǎn)進(jìn)行說(shuō)明。
BackpressureStrategy
緩存策略,也既是當(dāng)緩存區(qū)存滿、上流仍然繼續(xù)發(fā)送下事件時(shí),該如何處理的策略。默認(rèn)緩存區(qū)大小是128(與Flowable的buffersize大小有關(guān))。具體有哪些策略如下:
BackpressureStrategy.ERROR: 直接拋異常。
BackpressureStrategy.MISSING: 友好提示:緩存區(qū)滿了。
BackpressureStrategy.BUFFER: 將緩存區(qū)設(shè)為無(wú)限大。
BackpressureStrategy.DROP: 超過(guò)緩存區(qū)大小128的事件丟棄。
BackpressureStrategy.LATEST: 只保存最后事件,超過(guò)緩存區(qū)大小128的事件丟棄。
Subscription
Subscription比Disposable多了一個(gè)方法request()。它的作用是告訴上游,下游需要多少數(shù)據(jù),如果不設(shè)置request默認(rèn)是0。比如設(shè)置為3,那么超過(guò)范圍之外的數(shù)據(jù)就不接收了。如果多次調(diào)用request(),會(huì)累加。
FlowableEmitter
它比ObservableEmitter多了一個(gè)方法requested(),這個(gè)方法返回的是異步緩存池中可放入數(shù)據(jù)的數(shù)量,比如一開(kāi)始是128,當(dāng)產(chǎn)生10個(gè)事件而沒(méi)有消費(fèi)時(shí),此時(shí)這個(gè)值是128-10=118。
Tips:不管Subscription.request(xx)設(shè)置了什么值,F(xiàn)lowableEmitter都會(huì)發(fā)送事件的,發(fā)送了不接收的就放入緩存里。背壓在異步訂閱中才有用,如果是同步訂閱,是不會(huì)有緩存池的。
5.具體的使用場(chǎng)景
實(shí)際項(xiàng)目中,我只用來(lái)做過(guò)驗(yàn)證碼倒計(jì)時(shí)、網(wǎng)絡(luò)請(qǐng)求,RxBus。雖然了解了一些操作符,但網(wǎng)上的教程講解這些操作符的時(shí)候都是以簡(jiǎn)單的例子來(lái)講解的,具體的應(yīng)用場(chǎng)景還是比較玄乎。
1)RxBus
EventBus是一個(gè)基于發(fā)布/訂閱的事件總線,它簡(jiǎn)化了組件之間的通信操作。而這些RxBus都能做,所以用RxBus替換EventBus,在RxJava1就已經(jīng)提出來(lái)了,如果自己的事件發(fā)送的要求不高,可以自己封裝一個(gè)RxBus使用。17年的時(shí)候找的一篇文章 Android 用RxJava模擬一個(gè)EventBus ———RxBus,我在這基礎(chǔ)上稍微改動(dòng)了一下,但是沒(méi)有做粘性事件。
原理簡(jiǎn)單來(lái)說(shuō)就是找個(gè)容器裝所有的觀察者,當(dāng)有某個(gè)事件產(chǎn)生時(shí),找到所有需要這個(gè)事件的觀察者,向它們發(fā)送事件。這個(gè)是否發(fā)送事件的依據(jù),其實(shí)就是看觀察者要什么樣的class,比如訂閱String類型的,那么Integer類型的事件就不會(huì)發(fā)給它。
兩個(gè)新知識(shí):
1)CompositeDisposable
一個(gè)disposable的容器,可以容納多個(gè)disposable
2)Subject
Subject可以同時(shí)代表 Observer 和 Observable,允許從數(shù)據(jù)源中多次發(fā)送結(jié)果給多個(gè)觀察者。
| Subject 類別 | 簡(jiǎn)述 |
|---|---|
| AsyncSubject | 只有當(dāng) Subject 調(diào)用 onComplete 方法時(shí),才會(huì)將 Subject 中的最后一個(gè)事件傳遞給所有的 Observer。 |
| BehaviorSubject | 當(dāng)觀察者訂閱BehaviorSubject時(shí),它開(kāi)始發(fā)射原始Observable最近發(fā)射的數(shù)據(jù)(如果此時(shí)還沒(méi)有收到任何數(shù)據(jù),它會(huì)發(fā)射一個(gè)默認(rèn)值),然后繼續(xù)發(fā)射其它任何來(lái)自原始Observable的數(shù)據(jù)。然而,如果原始的Observable因?yàn)榘l(fā)生了一個(gè)錯(cuò)誤而終止,BehaviorSubject將不會(huì)發(fā)射任何數(shù)據(jù),只是簡(jiǎn)單的向前傳遞這個(gè)錯(cuò)誤通知。Rxlifecycle2庫(kù)用的就是這個(gè)。 |
| PublishSubject | 不會(huì)改變事件的發(fā)送順序,在已經(jīng)發(fā)送了一部分事件之后注冊(cè)的 Observer 不會(huì)收到之前發(fā)送的事件。RxBus用的這個(gè)。 |
| ReplaySubject | 無(wú)論什么時(shí)候注冊(cè) Observer 都可以接收到任何時(shí)候通過(guò)該 Observable 發(fā)射的事件。 RxBus粘性事件可用這個(gè)。 |
| UnicastSubject | 只允許一個(gè) Observer 進(jìn)行監(jiān)聽(tīng),在該 Observer 注冊(cè)之前會(huì)將發(fā)射的所有的事件放進(jìn)一個(gè)隊(duì)列中, 并在 Observer 注冊(cè)的時(shí)候一起通知給它。 |
public class RxBus {
/**
* CompositeDisposable定義:
* 一個(gè)disposable的容器,可以容納多個(gè)disposable,添加和去除的復(fù)雜度為O(1)
*
* 此處使用目的:
* 是為了一個(gè)訂閱者能夠?qū)?yīng)多個(gè)Disposable,在需要的時(shí)候調(diào)用 Disposable 的 dispose()取消訂閱。
*
* 舉個(gè)例子:
* XXActivity,訂閱了事件A和事件B,關(guān)閉時(shí)要取消訂閱。
* 那么只需在mSubscriptionMap里找到key為XXActivity的Value(CompositeDisposable),再取出Disposable
* 取消訂閱即可。
*
* 如果不用的話,就要為每一個(gè)Activity寫一個(gè)容器去保存它的訂閱的事件了。
* */
private HashMap<String, CompositeDisposable> mSubscriptionMap;
private static volatile RxBus mRxBus;
private final Subject<Object> mSubject;
public static RxBus getIntanceBus(){
if (mRxBus==null){
synchronized (RxBus.class){
if(mRxBus==null){
mRxBus = new RxBus();
}
}
}
return mRxBus;
}
/**
* 正常訂閱可用PublishSubject、黏性事件可用ReplaySubject。
*/
private RxBus(){
mSubject = PublishSubject.create().toSerialized();
}
/**
* 一個(gè)默認(rèn)的訂閱方法
* @param <T>
* @param type 過(guò)濾,只返回特定數(shù)據(jù)類型的數(shù)據(jù)
* @param next next()事件,正常接收的事件
* @param error error()事件,錯(cuò)誤接收的事件
* @return
*/
public <T> Disposable doSubscribe(Class<T> type, Consumer<T> next, Consumer<Throwable> error){
return getObservable(type)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(next,error);
}
/**
* 返回指定類型的帶背壓的Flowable實(shí)例
* 這里選定背壓模式是 BackpressureStrategy.BUFFER,緩存區(qū)無(wú)限大
* @param <T>
* @param type 過(guò)濾,只返回特定數(shù)據(jù)類型的數(shù)據(jù)
* @return
*/
public <T> Flowable<T> getObservable(Class<T> type){
return mSubject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
}
/**
* 發(fā)送事件
* @param o 事件
*/
public void post(Object o){
mSubject.onNext(o);
}
/**
* 判斷是否已有觀察者訂閱
*
* @return
*/
public boolean hasObservers() {
return mSubject.hasObservers();
}
/**
* 保存訂閱后的disposable,取消訂閱的時(shí)候要用
* @param o 訂閱的目標(biāo)
* @param disposable
*/
public void addSubscription(Object o, Disposable disposable) {
if (mSubscriptionMap == null) {
mSubscriptionMap = new HashMap<>();
}
//這里key值取訂閱目標(biāo)的實(shí)體名稱(com.xx.xx.xxx),不是底層類檢稱
String key = o.getClass().getName();
//disposable 放到對(duì)應(yīng)的 CompositeDisposable 里
if (mSubscriptionMap.get(key) != null) {
mSubscriptionMap.get(key).add(disposable);
} else {
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(disposable);
mSubscriptionMap.put(key, disposables);
}
}
/**
* 取消訂閱
* @param o
*/
public void unSubscribe(Object o) {
if (mSubscriptionMap == null) {
return;
}
String key = o.getClass().getName();
if (!mSubscriptionMap.containsKey(key)){
return;
}
if (mSubscriptionMap.get(key) != null) {
mSubscriptionMap.get(key).dispose();
}
mSubscriptionMap.remove(key);
}
/********************************* 為RxEventBean 封裝 **********************************/
/**
* 使用EventBus時(shí),為了方便查找,一般都會(huì)封裝 EventBean(int code,Object content)
* 但是所有的訂閱者都是訂閱的這個(gè)類型,所以要自己做判斷做類型轉(zhuǎn)換。
*
* 這里寫死事件是RxEventBean,且錯(cuò)誤處理一般不處理,只用處理onNext即可。
* @param context
* @param action
* */
public void register(Context context, Consumer<RxEventBean> action) {
Disposable disposable = RxBus.getIntanceBus().doSubscribe(RxEventBean.class, action,
throwable -> KLog.e("RxEventBean onError()", throwable.toString()));
RxBus.getIntanceBus().addSubscription(context,disposable);
}
/**
* 發(fā)送RxEventBean 事件
* @param code code,用于判斷
* @param content 內(nèi)容,接收后做類型轉(zhuǎn)換
*/
public void post(int code, Object content){
RxEventBean<Object> event = new RxEventBean<>();
event.code = code;
event.content = content;
post(event);
}
}
2)驗(yàn)證碼倒計(jì)時(shí)
其實(shí)關(guān)鍵代碼就是用intervalRange()操作符。
public class RxCodeHelper {
private Context mContext;
private TextView codeBt;
private Disposable mDisposable;
/**
* 構(gòu)造函數(shù)
* @param mContext 上下文
* @param codeBt 驗(yàn)證碼的button
*/
public RxCodeHelper(Context mContext, TextView codeBt) {
this.mContext = mContext;
this.codeBt = codeBt;
}
/**
* 開(kāi)啟倒計(jì)時(shí)
*/
public void start(){
codeBt.setEnabled(false);
//從0開(kāi)始,走60個(gè)數(shù),延遲是0s,周期為1次。
Observable.intervalRange(0,60,0,1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(Long aLong) {
codeBt.setText((60 - aLong) + "s后可重發(fā)");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
//倒計(jì)時(shí)完畢置為可點(diǎn)擊狀態(tài)
codeBt.setEnabled(true);
codeBt.setText("獲取驗(yàn)證碼");
}
});
}
/**
* 請(qǐng)求失敗時(shí),重置狀態(tài)
* 取消倒計(jì)時(shí)的訂閱事件
*/
public void reset(){
if(mDisposable!=null){
mDisposable.dispose();
}
codeBt.setEnabled(true);
codeBt.setText("獲取驗(yàn)證碼");
}
/**
* 界面銷毀時(shí),取消倒計(jì)時(shí)訂閱事件
*/
public void stop(){
if(mDisposable!=null){
mDisposable.dispose();
}
}
}
3)RxLifecycle
目的:解決RxJava的內(nèi)存泄漏問(wèn)題。
用法:
bindUntilEvent(@NonNull ActivityEvent event)——綁定指定的生命周期,在指定生命周期時(shí)取消訂閱。
bindToLifecycle() ——綁定生命周期,取消訂閱策略可看源碼。以Activity為例,可看RxLifecycleAndroid 下的 ACTIVITY_LIFECYCLE, 實(shí)際上在onCreate()訂閱的,在onDestroy()取消訂閱;在onResume()訂閱的,在onPause()取消訂閱。
知識(shí)點(diǎn):
compose():
將一種類型的Observable轉(zhuǎn)換成另一種類型的Observable,保證調(diào)用的鏈?zhǔn)浇Y(jié)構(gòu)。
LifecycleTransformer:
LifecycleTransformer實(shí)現(xiàn)了各種Transformer接口,能夠?qū)⒁粋€(gè)Observable/Flowable/Single/Completable/Maybe對(duì)象轉(zhuǎn)換成另一個(gè) Observable/Flowable/Single/Completable/Maybe對(duì)象。正好配合上文的compose操作符,使用在鏈?zhǔn)秸{(diào)用中。
舉例在網(wǎng)絡(luò)請(qǐng)求時(shí)綁定生命周期,mvp模式。
Activity繼承了RxFragmentActivity后,BaseView接口里新增兩個(gè)方法,其他View層接口繼承它。
public interface BaseView {
//為了讓 IView 可以調(diào)用 RxLifeCycle的生命周期綁定
<T> LifecycleTransformer<T> bindToLifecycle();
<T> LifecycleTransformer<T> bindUntilEvent(@NonNull ActivityEvent event);
}
在P層使用,此處模擬網(wǎng)絡(luò)請(qǐng)求,3s后回調(diào)
Observable.timer(3, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io()) //訂閱在io線程
.observeOn(AndroidSchedulers.mainThread()) //回調(diào)在主線程
.compose(mView.bindUntilEvent(ActivityEvent.DESTROY)) //指定在onDestroy銷毀
// .compose(mView.bindToLifecycle()) //取消訂閱交由RxLifeCycle來(lái)判斷
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
mView.showLoading();
}
@Override
public void onNext(Long aLong) {
KLog.d("onTest1Success()回調(diào)成功");
mView.onTest1Success("onTest1 請(qǐng)求成功");
}
@Override
public void onError(Throwable e) {
mView.dismissLoading();
}
@Override
public void onComplete() {
mView.dismissLoading();
}
});
一般使用,可以用compose封裝一下,如下:
/**
* 統(tǒng)一線程處理,且綁定生命周期
* 用法: xxx .compose(RxUtil.rxSchedulerHelper(mView))
* @param view
* @param <T>
* @return
*/
public static <T> ObservableTransformer<T,T> rxSchedulerHelper(BaseView view){
return new ObservableTransformer<T,T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return upstream
.subscribeOn(Schedulers.io()) //訂閱在io線程
.unsubscribeOn(Schedulers.io()) //取消訂閱在io線程,為啥要這個(gè),不太清楚
.observeOn(AndroidSchedulers.mainThread()) //回調(diào)在主線程
.compose(view.bindToLifecycle()); //綁定生命周期
}
};
}
結(jié)尾
在網(wǎng)上找了一些操作符的簡(jiǎn)單例子和應(yīng)用場(chǎng)景,寫了一個(gè)Demo,看操作符概述的時(shí)候,結(jié)合代碼運(yùn)行結(jié)果來(lái)看更容易理解,有興趣的可以看看。后續(xù)會(huì)在這個(gè)項(xiàng)目里繼續(xù)更新操作符和應(yīng)用場(chǎng)景。
Github地址:
https://github.com/Dengszzzzz/DRxJavaSummary
參考
給 Android 開(kāi)發(fā)者的 RxJava 詳解
這可能是最好的RxJava 2.x 入門教程
Android拾萃 - RxJava2操作符匯總
Rxjava2入門教程五:Flowable背壓支持——對(duì)Flowable最全面而詳細(xì)的講解
Android Rxjava:這是一篇 清晰 & 易懂的Rxjava 入門教程
實(shí)際應(yīng)用場(chǎng)景
Android 用RxJava模擬一個(gè)EventBus ———RxBus
RxLifecycle詳細(xì)解析
...