
從16年11月份推出RxJava 2.0 ,到現(xiàn)在差不多大半年的時間里,RxJava已經(jīng)來到了2.x時代,RxJava 1.x 可能也慢慢地被2.x 代替。RxJava 2.x在Reactive-Streams規(guī)范的基礎(chǔ)上從頭開始完全重寫,雖然操作符基本沒有發(fā)生變化,但是因為Reactive-Streams具有不同的架構(gòu),因此對一些眾所周知的RxJava類型進(jìn)行了更改。
RxJava 2.x相對與1.x還是有很多的不同,RxJava的類型更改了,很多類的命名和方法的命名發(fā)生了變化(可能功能與1.x相同),要想從RxJava 1.x 順利地過渡到2.x, 就得了解這些變化。因此本教程就帶你了解這些變化,從而能更快地上手RxJava2.x。
本篇文章就來先了解一下RxJava 2.x的5種響應(yīng)類型。

一、RxJava 2.x中5種類型介紹
1 . Observable and Flowable
關(guān)于在RxJava 0.x版本引入背壓的一個小的遺憾是,沒有設(shè)計一個單獨的基礎(chǔ)反應(yīng)類,而是對Observable本身進(jìn)行了改裝。背壓的主要問題在于熱源(如:UI事件),不能合理地反壓并導(dǎo)致不可預(yù)料的異常MissingBackpressureException,這是初學(xué)者不期望看到的。
在RxJava 2.x版本中修復(fù)了這種情況,將o.reactivex.Observable作為非背壓,引入新的io.reactivex.Flowable作為啟用背壓基礎(chǔ)反應(yīng)類。
好消息是,在2.x版本中,主要的操作符保持不變(同1.x版本),壞消息是,在導(dǎo)入的時候應(yīng)當(dāng)小心,它可能會無意的選擇非背壓的o.reactivex.Observable.
我們應(yīng)該選哪種?
當(dāng)構(gòu)建數(shù)據(jù)流(作為RxJava的最終消費者)或考慮您的2.x兼容庫應(yīng)該采取和返回什么類型時,您可以考慮幾個因素,以幫助您避免諸如MissingBackpressureException或OutOfMemoryError之類的問題。
Observable使用場景:
- 數(shù)據(jù)流最長不超過1000個元素,即隨著時間的流逝,應(yīng)用沒有機(jī)會報OOME(OutOfMemoryError)錯誤。
- 處理諸如鼠標(biāo)移動或觸摸事件之類的GUI事件
Flowable使用場景:
- 處理超過10K+ 的元素流
- 從磁盤讀取(解析文件)
- 從數(shù)據(jù)庫讀取數(shù)據(jù)
- 從網(wǎng)絡(luò)獲取數(shù)據(jù)流
2 . Single 使用介紹
Single是2.x版本中的一種基礎(chǔ)響應(yīng)類型,Single是從頭開始重新設(shè)計的,能單獨發(fā)射一個onSuccess或者onError事件,它現(xiàn)在的架構(gòu)來自于the Reactive-Streams設(shè)計。它的消費者類型已經(jīng)從接受rx.Subscription的rx.Single.SingleSubscriber<T>變?yōu)榱?code>io.reactivex.SingleObserver<T>,有3個方法:
interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable error);
}
3 . Completable使用介紹
Completeble類型基本保持不變,1.x的版本已經(jīng)沿著Reactive-Streams風(fēng)格設(shè)計,所以沒有用戶級別的更改。
相似地命名改變,rx.Completable.CompletableSubscriber變?yōu)閹в?code>onSubscribe(Disposable)方法的io.reactivex.CompletableObserver:
interface CompletableObserver<T> {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable error);
}
4 . Maybe 使用介紹
RxJava 2.0.0-RC2 介紹了一個新的基礎(chǔ)響應(yīng)類型,它叫做Maybe。從概念上來將,它像是 Single和Completable的結(jié)合,它可能發(fā)射0個或者1個項目,或者一個error信號。
Maybe類通過依賴MaybeSource作為它的基礎(chǔ)接口類型MaybeObserver<T>作為信號響應(yīng)接口并且遵循協(xié)議onSubscribe (onSuccess | onError | onComplete)?因為最多可能發(fā)射1個元素,所以Maybe類型沒有背壓的概念(因為它沒有像Flowable和Observable一樣有未知長度的可膨脹緩沖區(qū))
這意味著onSubscribe(Disposable)的調(diào)用潛在地跟隨著其他onXXX方法之一的調(diào)用,不同于Flowable,如果這兒只有一個信號值發(fā)射信號,那么只有onSuccess被調(diào)用,而不會調(diào)用complete。
二、RxJava 2.x中5種類型使用示例
1 . Observable示例
在寫示例之前,我們先來回顧一下 1.x 版本是如何創(chuàng)建Observable和如何訂閱的:
RxJava 1.x :
//創(chuàng)建 observable
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello world");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
//訂閱方式一
observable.subscribe(new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
}
});
// 訂閱方式二
observable.subscribe(new Action1() {
@Override
public void call(Object o) {
// onNext
}
});
通過create方法創(chuàng)建Observable,接收一個OnSubscribe接口,其中有一個回調(diào)方法call,參數(shù)為Subscriber,我們用Subscriber來發(fā)射數(shù)據(jù)。通過subscribe方法來訂閱,可以接收一個Subscriber 實現(xiàn)全部方法,也可以接收一個Action1只實現(xiàn)onNext方法。
RxJava 2.x :
//創(chuàng)建Observable
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
Log.e(TAG,"start emitter data");
e.onNext("Hello");
e.onNext("world");
e.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
// 訂閱方式一:下游消費者 Observer
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// onSubscribe 是2.x新添加的方法,在發(fā)射數(shù)據(jù)前被調(diào)用,相當(dāng)于1.x的onStart方法
Log.e(TAG,"onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.e(TAG,"onNext");
Log.e(TAG,s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG,"onError");
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
// 訂閱方式二:Consumer
observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String o) throws Exception {
Log.e(TAG,"consumer:"+o);
}
});
打印結(jié)果如下:
06-25 14:31:35.435 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onSubscribe
06-25 14:31:35.437 21505-21853/com.zhouwei.demoforrxjava2 E/MainActivity: start emitter data
06-25 14:31:35.438 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onNext:Hello
06-25 14:31:35.438 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onNext:world
06-25 14:31:35.438 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onComplete
其實我們可以對比一下,1.x 和 2.x 方法都試一樣的,只是它們所接收的響應(yīng)接口改變了,對應(yīng)變化如下:
RxJava 1.x -> RxJava 2.x
---------------------------------------
OnSubscribe<T> -> ObservableOnSubscribe<T>
Subscriber<T> -> Observer<T>
Subscriber<T> -> ObservableEmitter<T>
Action1<T> -> Consumer<T>
RxJava 2.x 中對這些接口進(jìn)行了重新設(shè)計,讓一個接口的職責(zé)更加單一,類的命名和方法命名與它的功能更佳符合(見名知意)。如在1.x 中,Subscriber 既能發(fā)射數(shù)據(jù),又能消費數(shù)據(jù),充當(dāng)觀察者和被觀察者。在2.x 中 把它拆解成了2個接口。
ObservableEmitter<T>專門用來發(fā)射數(shù)據(jù),Consumer專門用來消費數(shù)據(jù)。 除此之外,在RxJava 2.x 中,多了一個void onSubscribe(@NonNull Disposable d)回調(diào)方法,參數(shù)為Disposable,Disposable是用來解除訂閱關(guān)系的,這讓我們的解除訂閱變得更佳容易(比起1.x 通過subscribe返回 Subscription)。
上面對比了在RxJava 1.x 和2.x 版本創(chuàng)建Observable的方式,其實在RxJava 2.x中,這5種類型的用法是非常相似的,它們的接口命名規(guī)則相同,只要你知道其中一種,就知道其他幾種類型該如何在上游發(fā)射數(shù)據(jù)和在下游消費數(shù)據(jù)。create接收的類型都為xxxOnSubscrible(xxx為5種類型對應(yīng)的名字),發(fā)射器為xxxEmitter,具體如下表:
| RxJava 2.x 類型 | create參數(shù)(響應(yīng)接口) | 發(fā)射器 | Observer |
|---|---|---|---|
| Observable | ObservableOnSubscribe | ObservableEmitter | Observer |
| Flowable | FlowableOnSubscribe | FlowableEmitter | FlowableSubscriber |
| Single | SingleOnSubscribe | SingleEmitter | SingleObserver |
| Completable | CompletableOnSubscribe | CompletableEmitter | CompletableObserver |
| Maybe | MaybeOnSubscribe | MaybeEmitter | MaybeObserver |
2 . Flowable示例
上面對比了RxJava 1.x 和 2.x 創(chuàng)建使用Observable的方式,并且總結(jié)了2.x 相關(guān)類的改變,如上面表。那么使用Flowable的方式和Observable是很相似的,看一下代碼:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.e(TAG,"start send data ");
for(int i=0;i<100;i++){
e.onNext(i);
}
e.onComplete();
}
}, BackpressureStrategy.DROP)//指定背壓策略
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(@NonNull Subscription s) {
//1, onSubscribe 是2.x新添加的方法,在發(fā)射數(shù)據(jù)前被調(diào)用,相當(dāng)于1.x的onStart方法
//2, 參數(shù)為 Subscription ,Subscription 可用于向上游請求發(fā)射多少個元素,也可用于取笑請求
//3, 必須要調(diào)用Subscription 的request來請求發(fā)射數(shù)據(jù),不然上游是不會發(fā)射數(shù)據(jù)的。
Log.e(TAG,"onSubscribe...");
subscription = s;
s.request(100);
}
@Override
public void onNext(Integer integer) {
Log.e(TAG,"onNext:"+integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG,"onError..."+t);
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete...");
}
});
Flowable和Observable的使用基本相同,只不過Observable不支持背壓,而Flowable支持背壓。使用的時候,還是要注意幾個小細(xì)節(jié):
1,創(chuàng)建Flowable的時候需要指定一個背壓策略,本文使用的是PBackpressureStrategy.DROP(丟棄策略),RxJava 2.x中,內(nèi)置了5種背壓策略,由于篇幅有限,背壓和背壓策略下一篇拿出來單獨講。
2,onSubscribe 回調(diào)方法中,參數(shù)是Subscription而不是Disposable,前文說過,RxJava 2.x 中,訂閱的管理換成了Disposable,但是Flowable使用的是Subscription,這個Subscription不是1.x 版本中的Subscription,雖然它有取消訂閱的能力。主要用于請求上游元素和取消訂閱。
3,在使用Flowable的時候,必須調(diào)用Subscription 的requsest方法請求,不然上游是不會發(fā)射數(shù)據(jù)的??磖equest的方法解釋:
3 . Single、Completable 和 Maybe 示例
Single、Completable和Maybe就比較簡單,Single用于只發(fā)射一個數(shù)據(jù),Completable不發(fā)送數(shù)據(jù),它給下游發(fā)射一個信號。而Maybe則是Single和Completable的結(jié)合,根據(jù)名字就可以看出,它的結(jié)果是不確定的,可能發(fā)發(fā)射0(Completable)或1(Single) 個元素,或者收到一個Error信號。
Single示例:
Single.create(new SingleOnSubscribe<Boolean>() {
@Override
public void subscribe(@NonNull SingleEmitter<Boolean> e) throws Exception {
Log.e(TAG,"subscribe...");
e.onSuccess(true);
}
})
.observeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new SingleObserver<Boolean>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG,"onSubscribe...");
}
@Override
public void onSuccess(@NonNull Boolean aBoolean) {
Log.e(TAG,"onSuccess...");
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG,"onError...");
}
});
Single只發(fā)射一個元素,所以沒有complete 方法,不像Observable或者Flowable,數(shù)據(jù)發(fā)射完成之后,需要調(diào)用complete告訴下游已經(jīng)完成。
Completable示例:
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(@NonNull CompletableEmitter e) throws Exception {
Log.e(TAG,"start send data");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG,"onSubscribe");
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG,"onError");
}
});
Completable 不會發(fā)射數(shù)據(jù),只會給下游發(fā)送一個信號?;卣{(diào) onComplete方法。
Maybe示例:
Maybe.create(new MaybeOnSubscribe<Boolean>() {
@Override
public void subscribe(@NonNull MaybeEmitter<Boolean> e) throws Exception {
Log.e(TAG,"start send data");
e.onSuccess(true);
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MaybeObserver<Boolean>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e(TAG,"onSubscribe");
}
@Override
public void onSuccess(@NonNull Boolean aBoolean) {
Log.e(TAG,"1->onSuccess:"+aBoolean);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG,"onError");
}
@Override
public void onComplete() {
Log.e(TAG,"onComplete");
}
});
Maybe是Single和Completable的結(jié)合,需要注意的是
onSuccess和onComplete方法只會執(zhí)行其中一個,這不同于Observable和Flowable最后是以onComplete()結(jié)尾.
如上面的代碼,先調(diào)用onSuccess發(fā)射一個元素,再調(diào)用onComplete,
e.onComplete();
e.onSuccess(true);
最后打印結(jié)果如下:
E/MainActivity: onSubscribe
E/MainActivity: start send data
E/MainActivity: onComplete
可以看到只回調(diào)了 onComplete,我們把調(diào)用的順序調(diào)換一下:
e.onSuccess(true);
e.onComplete();
打印結(jié)果如下:
E/MainActivity: onSubscribe
E/MainActivity: start send data
E/MainActivity: 1->onSuccess:true
可以看到調(diào)換了之后打印OnSucces()而沒有打印onComplete(),這也印證了只會回調(diào)其中之一。
三、總結(jié)
RxJava 2.x 相比于 1.x 還是有很大的變化,雖然操作符基本不變,但是很多類和接口都是基于Reactive-Streams 規(guī)范重新設(shè)計的,命名也發(fā)生了變換,要想玩轉(zhuǎn)RxJava 2.x ,你得了解這些變化和使用場景,本文介紹了RxJava 2.x 的5種基礎(chǔ)響應(yīng)類型,希望對才開始學(xué)習(xí)RxJava 2.x 的同學(xué)有所幫助。
