一、簡介
RxJava是響應(yīng)式編程(Reactive Extensions)的java實現(xiàn),它基于觀察者模式的實現(xiàn)了異步編程接口。
Rxjava 3.x 的github官網(wǎng);
RxJava2將被支持到2021年2月28日,錯誤的會同時在2.x和3.x修復(fù),但新功能只會在3.x上添加;
Rxjava 3.0的一些改變:官方Wiki;
Rxjava 3.x 文檔可以在官方j(luò)avadoc中找到;
使用Rxjava3.x之前的準(zhǔn)備工作:
1.1 添加依賴:
//RxJava的依賴包
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
//RxAndroid的依賴包
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
1.2 將項目的編譯目標(biāo)設(shè)置更改為 java8:
android {
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
二、Rx概念
2.1 字段含義
Reactive 直譯為反應(yīng)性的,有活性的,根據(jù)上下文一般翻譯為反應(yīng)式、響應(yīng)式
Iterable 可迭代對象,支持以迭代器的形式遍歷,許多語言中都存在這個概念
Observable 可觀察對象,在Rx中定義為更強(qiáng)大的Iterable,在觀察者模式中是被觀察的對象,一旦數(shù)據(jù)產(chǎn)生或發(fā)生變化,會通過某種方式通知觀察者或訂閱者
Observer 觀察者對象,監(jiān)聽Observable發(fā)射的數(shù)據(jù)并做出響應(yīng),Subscriber是它的一個特殊實現(xiàn)
emit 直譯為發(fā)射,發(fā)布,發(fā)出,含義是Observable在數(shù)據(jù)產(chǎn)生或變化時發(fā)送通知給Observer,調(diào)用Observer對應(yīng)的方法,文章里一律譯為發(fā)射
items 直譯為項目,條目,在Rx里是指Observable發(fā)射的數(shù)據(jù)項,文章里一律譯為數(shù)據(jù),數(shù)據(jù)項。
2.2 上/下流
在RxJava中,數(shù)據(jù)以流的方式組織:Rxjava包括一個源數(shù)據(jù)流,源數(shù)據(jù)流后跟著若干個用于消費(fèi)數(shù)據(jù)流的步驟。
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
在代碼中,對于operator2來說,在它前面叫做上流,在它后面的叫做下流。
2.3 流對象
在RxJava的文檔中,emission, emits, item, event, signal, data and message都被認(rèn)為在數(shù)據(jù)流中被傳遞的數(shù)據(jù)對象。
2.4 背壓(Backpressure)
當(dāng)上下游在不同的線程中,通過Observable發(fā)射,處理,響應(yīng)數(shù)據(jù)流時,如果上游發(fā)射數(shù)據(jù)的速度快于下游接收處理數(shù)據(jù)的速度,這樣對于那些沒來得及處理的數(shù)據(jù)就會造成積壓,這些數(shù)據(jù)既不會丟失,也不會被垃圾回收機(jī)制回收,而是存放在一個異步緩存池中,如果緩存池中的數(shù)據(jù)一直得不到處理,越積越多,最后就會造成內(nèi)存溢出,這便是響應(yīng)式編程中的背壓(backpressure)問題。
為此,RxJava帶來了backpressure的概念。背壓是一種流量的控制步驟,在不知道上流還有多少數(shù)據(jù)的情形下控制內(nèi)存的使用,表示它們還能處理多少數(shù)據(jù)。背壓是指在異步場景中,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略
在Rxjava1.0中,有的Observable支持背壓,有的不支持,為了解決這種問題,2.0把支持背壓和不支持背壓的Observable區(qū)分開來:支持背壓的有Flowable類,不支持背壓的有Observable,Single, Maybe and Completable類。
- 在訂閱的時候如果使用FlowableSubscriber,那么需要通過s.request(Long.MAX_VALUE)去主動請求上游的數(shù)據(jù)項。如果遇到背壓報錯的時候,F(xiàn)lowableSubscriber默認(rèn)已經(jīng)將錯誤try-catch,并通過onError()進(jìn)行回調(diào),程序并不會崩潰;
- 在訂閱的時候如果使用Consumer,那么不需要主動去請求上游數(shù)據(jù),默認(rèn)已經(jīng)調(diào)用了s.request(Long.MAX_VALUE)。如果遇到背壓報錯、且對Throwable的Consumer沒有new出來,則程序直接崩潰;
- 背壓策略的上游的默認(rèn)緩存池是128。
背壓策略:
error, 緩沖區(qū)大概在128
buffer, 緩沖區(qū)在1000左右
drop, 把存不下的事件丟棄
latest, 只保留最新的
missing, 缺省設(shè)置,不做任何操作
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
2.5 線程調(diào)度器(Schedulers)
對于Android開發(fā)者而言,RxJava最簡單的是通過調(diào)度器來方便地切換線程。在不同平臺還有不同的調(diào)度器,例如我們Android的主線程:AndroidSchedulers.mainThread()。
| 屬性 | 類型 |
|---|---|
| AndroidSchedulers.mainThread() | 需要引用rxandroid, 切換到UI線程 |
| Schedulers.computation() | 用于計算任務(wù),如事件循環(huán)和回調(diào)處理,默認(rèn)線程數(shù)等于處理器數(shù)量 |
| Schedulers.io() | 用于IO密集型任務(wù),如異步阻塞IO操作,這個調(diào)度器的線程池會根據(jù)需求,它默認(rèn)是一個CacheThreadScheduler |
| Schedulers.newThread() | 為每一個任務(wù)創(chuàng)建一個新線程 |
| Schedulers.trampoline() | 在當(dāng)前線程中立刻執(zhí)行,如當(dāng)前線程中有任務(wù)在執(zhí)行則將其暫停, 等插入進(jìn)來的任務(wù)執(zhí)行完成之后,在將未完成的任務(wù)繼續(xù)完成。 |
| Scheduler.from(executor) | 指定Executor作為調(diào)度器 |
2.6 事件調(diào)度器
RxJava事件發(fā)出去并不是置之不顧,要有合理的管理者來管理它們,在合適的時機(jī)要進(jìn)行釋放事件,這樣才不會導(dǎo)致內(nèi)存泄漏,這里的管理者我們稱為事件調(diào)度器(或事件管理者)CompositeDisposable。
2.7 基類
RxJava 3 中的基類相比RxJava 2 沒啥改變,主要有以下幾個基類:
- io.reactivex.Flowable:發(fā)送0個N個的數(shù)據(jù),支持Reactive-Streams和背壓
- io.reactivex.Observable:發(fā)送0個N個的數(shù)據(jù),不支持背壓,
- io.reactivex.Single:只能發(fā)送單個數(shù)據(jù)或者一個錯誤
- io.reactivex.Completable:沒有發(fā)送任何數(shù)據(jù),但只處理 onComplete 和 onError 事件。
- io.reactivex.Maybe:能夠發(fā)射0或者1個數(shù)據(jù),要么成功,要么失敗。
2.8 Observables的"熱"和"冷"
Observable什么時候開始發(fā)射數(shù)據(jù)序列?這取決于Observable的實現(xiàn),一個"熱"的Observable可能一創(chuàng)建完就開始發(fā)射數(shù)據(jù),因此所有后續(xù)訂閱它的觀察者可能從序列中間的某個位置開始接受數(shù)據(jù)(有一些數(shù)據(jù)錯過了)。一個"冷"的Observable會一直等待,直到有觀察者訂閱它才開始發(fā)射數(shù)據(jù),因此這個觀察者可以確保會收到整個數(shù)據(jù)序列。
在一些ReactiveX實現(xiàn)里,還存在一種被稱作Connectable的Observable,不管有沒有觀察者訂閱它,這種Observable都不會開始發(fā)射數(shù)據(jù),除非Connect方法被調(diào)用。
基本使用
需要知道的是,RxJava以觀察者模式為骨架,有兩種常見的觀察者模式:
- Observable(被觀察者)/Observer(觀察者):不支持背壓
- Flowable(被觀察者)/Subscriber(觀察者):支持背壓
3.1 Observable/Observer用法:
Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onComplete();
}
});
Observer mObserver=new Observer<Integer>() {
//這是新加入的方法,在訂閱后發(fā)送數(shù)據(jù)之前,
//回首先調(diào)用這個方法,而Disposable可用于取消訂閱
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.e("lucas", "onNext: "+value );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
mObservable.subscribe(mObserver);
這種觀察者模型不支持背壓:當(dāng)被觀察者快速發(fā)送大量數(shù)據(jù)時,下游不會做其他處理,即使數(shù)據(jù)大量堆積,調(diào)用鏈也不會報MissingBackpressureException,消耗內(nèi)存過大只會OOM。所以,當(dāng)我們使用Observable/Observer的時候,我們需要考慮的是,數(shù)據(jù)量是不是很大(官方給出以1000個事件為分界線作為參考)。
3.2 Flowable/Subscriber用法
Flowable.range(0,10)
.subscribe(new Subscriber<Integer>() {
Subscription sub;
//當(dāng)訂閱后,會首先調(diào)用這個方法,其實就相當(dāng)于onStart(),
//傳入的Subscription s參數(shù)可以用于請求數(shù)據(jù)或者取消訂閱
@Override
public void onSubscribe(Subscription s) {
Log.w("TAG","onsubscribe start");
sub=s;
sub.request(1);
Log.w("TAG","onsubscribe end");
}
@Override
public void onNext(Integer o) {
Log.w("TAG","onNext--->"+o);
sub.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
Log.w("TAG","onComplete");
}
});
Flowable是支持背壓的,也就是說,一般而言,上游的被觀察者會響應(yīng)下游觀察者的數(shù)據(jù)請求,下游調(diào)用request(n)來告訴上游發(fā)送多少個數(shù)據(jù)。這樣避免了大量數(shù)據(jù)堆積在調(diào)用鏈上,使內(nèi)存一直處于較低水平。
當(dāng)然,F(xiàn)lowable也可以通過creat()來創(chuàng)建:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
}
//需要指定背壓策略
, BackpressureStrategy.BUFFER);
Flowable雖然可以通過create()來創(chuàng)建,但是你必須指定背壓的策略,以保證你創(chuàng)建的Flowable是支持背壓的。
根據(jù)上面的代碼的結(jié)果輸出中可以看到,當(dāng)我們調(diào)用subscription.request(n)方法的時候,不等onSubscribe()中后面的代碼執(zhí)行,就會立刻執(zhí)行到onNext方法,因此,如果你在onNext方法中使用到需要初始化的類時,應(yīng)當(dāng)盡量在subscription.request(n)這個方法調(diào)用之前做好初始化的工作;
當(dāng)然,這也不是絕對的,我在測試的時候發(fā)現(xiàn),通過create()自定義Flowable的時候,即使調(diào)用了subscription.request(n)方法,也會等onSubscribe()方法中后面的代碼都執(zhí)行完之后,才開始調(diào)用onNext。
TIPS: 盡可能確保在request()之前已經(jīng)完成了所有的初始化工作,否則就有空指針的風(fēng)險。
3.3 其他的觀察者
最常用的其實就是上面說的兩種訂閱觀察者,但是一些情況下,我們也會用到一些其他的一類觀察者比如
- Single/SingleObserver
- Completable/CompletableObserver
- Maybe/MaybeObserver
3.3.1 Single/SingleObserver用法
//被觀察者
Single<String> single = Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> e) throws Exception {
e.onSuccess("test");
e.onSuccess("test2");//錯誤寫法,重復(fù)調(diào)用也不會處理,因為只會調(diào)用一次
}
});
//訂閱觀察者SingleObserver
single.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
//相當(dāng)于onNext和onComplete
Log.d("lucas", s );
}
@Override
public void onError(Throwable e) {
}
});
//運(yùn)行結(jié)果
2020-04-03 23:02:37.337 15462-15462/com.ysalliance.getfan.myapplication D/lucas: test
Single類似于Observable,不同的是,它總是只發(fā)射一個值,或者一個錯誤通知,而不是發(fā)射一系列的值(當(dāng)然就不存在背壓問題),所以當(dāng)你使用一個單一連續(xù)事件流,這樣你可以使用Single。Single觀察者只包含兩個事件,一個是正常處理成功的onSuccess,另一個是處理失敗的onError。因此,不同于Observable需要三個方法onNext, onError, onCompleted,訂閱Single只需要兩個方法:
onSuccess - Single發(fā)射單個的值到這個方法
onError - 如果無法發(fā)射需要的值,Single發(fā)射一個Throwable對象到這個方法
Single只會調(diào)用這兩個方法中的一個,而且只會調(diào)用一次,調(diào)用了任何一個方法之后,訂閱關(guān)系終止。
Single的操作符:
Single也可以組合使用多種操作,一些操作符讓你可以混合使用Observable和Single:

詳細(xì)可參考:Single操作符
3.3.2 Completable/CompletableObserver
如果你的觀察者連onNext事件都不關(guān)心,可以使用Completable,它只有onComplete和onError兩個事件:
Completable.create(new CompletableOnSubscribe() {//被觀察者
@Override
public void subscribe(CompletableEmitter e) throws Exception {
e.onComplete();//單一onComplete或者onError
}
}).subscribe(new CompletableObserver() {//觀察者
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onComplete() {
Log.e("lucas", "onComplete: ");
}
@Override
public void onError(Throwable e) {
}
});
//打印結(jié)果
2020-04-03 23:12:08.099 16264-16264/com.ysalliance.getfan.myapplication E/lucas: onComplete:
3.3.3 Maybe/MaybeObserver
如果你有一個需求是可能發(fā)送一個數(shù)據(jù)或者不會發(fā)送任何數(shù)據(jù),這時候你就需要Maybe,它類似于Single和Completable的混合體。
Maybe可能會調(diào)用以下其中一種情況(也就是所謂的Maybe):
onSuccess或者onError
onComplete或者onError
可以看到onSuccess和onComplete是互斥的存在,例子代碼如下:
//被觀察者
Maybe<String> maybe = Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(MaybeEmitter<String> e) throws Exception {
e.onSuccess("test");//發(fā)送一個數(shù)據(jù)的情況,或者onError,不需要再調(diào)用onComplete(調(diào)用了也不會觸發(fā)onComplete回調(diào)方法)
//e.onComplete();//不需要發(fā)送數(shù)據(jù)的情況,或者onError
}
});
//訂閱觀察者
maybe.subscribe(new MaybeObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
//發(fā)送一個數(shù)據(jù)時,相當(dāng)于onNext和onComplete,但不會觸發(fā)另一個方法onComplete
Log.i("lucas", s);
}
@Override
public void onComplete() {
//無數(shù)據(jù)發(fā)送時候的onComplete事件
Log.i("lucas", "onComplete");
}
@Override
public void onError(Throwable e) {
}
});
//打印結(jié)果
2020-04-03 23:14:40.266 16558-16558/com.ysalliance.getfan.myapplication I/lucas: test
要轉(zhuǎn)換成其他類型的被觀察者,也是可以使用toFlowable()、toObservable()等方法去轉(zhuǎn)換。
//判斷是否登陸
Maybe.just(isLogin())
//可能涉及到IO操作,放在子線程
.subscribeOn(Schedulers.newThread())
//取回結(jié)果傳到主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MaybeObserver<Boolean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Boolean value) {
if(value){
...
}else{
...
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
上面就是Maybe/MaybeObserver的普通用法,你可以看到,實際上,這種觀察者模式并不用于發(fā)送大量數(shù)據(jù),而是發(fā)送單個數(shù)據(jù),也就是說,當(dāng)你只想要某個事件的結(jié)果(true or false)的時候,你可以用這種觀察者模式
3.4 事件調(diào)度器釋放事件
public class Main {
private static CompositeDisposable mRxEvent = new CompositeDisposable();
public static void main(String[] args) {
Disposable subscribe = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帥");
e.onNext("你值得擁有");
e.onNext("取消關(guān)注");
e.onNext("但還是要保持微笑");
e.onComplete();
}
}).subscribe(
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
//對應(yīng)onNext()
System.out.println("accept=" + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
//對應(yīng)onError()
}
}, new Action() {
@Override
public void run() throws Exception {
//對應(yīng)onComplete()
}
}, new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
//對應(yīng)onSubscribe()
}
});
mRxEvent.add(subscribe);
mRxEvent.clear();
}
}
CompositeDisposable提供的方法中,都是對事件的管理
- dispose():釋放所有事件
- clear():釋放所有事件,實現(xiàn)同dispose()
- add():增加某個事件
- addAll():增加所有事件
- remove():移除某個事件并釋放
- delete():移除某個事件