前言
關(guān)于RxJava的介紹這里就不多說了,網(wǎng)上有很多相關(guān)的資料。
但有一點(diǎn)需要說明一下,很多同學(xué)可能在網(wǎng)上找到很多RxJava 1.X的教程,那么1.X和2.X有什么區(qū)別?學(xué)習(xí)2.X前需不需要先學(xué)習(xí)1.X?
其實(shí)1.X和2.X有很大的改變,如果你已學(xué)習(xí)過1.X,那么恭喜你,你只需要看看2.X有什么更新就可以了。如果你沒學(xué)習(xí)過1.X,那么也不需要擔(dān)心,你可以直接跳過1.X,來學(xué)習(xí)2.X。
所以本教程是直接使用2.X,概念性的東西這里也不多說,本教程直接使用例子一步一步帶大家入門。
先在Android Studio 項(xiàng)目添加Gradle配置:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.3'
可能你想添加最新的版本,那在哪里可以找到最新的版本呢?答案在這里:
https://github.com/ReactiveX/RxAndroid
入門例子:
//create創(chuàng)建一個(gè)上游Observable(被觀察者)
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable發(fā)出:1");
e.onNext(1);//向下游(觀察者)發(fā)射事件
Log.d(TAG, "Observable發(fā)出:2");
e.onNext(2);
Log.d(TAG, "Observable發(fā)出:3");
e.onNext(3);
Log.d(TAG, "Observable發(fā)出:Complete");
e.onComplete();
Log.d(TAG, "Observable發(fā)出:4");
e.onNext(4);//雖然無法接收事件,但發(fā)送事件還是繼續(xù)的
}
});
//創(chuàng)建一個(gè)下游Observer(觀察者)
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext收到:" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "Observer complete");
}
};
//建立連接(訂閱),開始發(fā)送事件
observable.subscribe(observer);
ObservableEmitter是發(fā)射器,就是用來發(fā)出事件的,它可以發(fā)出三種類型的事件。通過調(diào)用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分別發(fā)出next事件、complete事件和error事件。
寫成RxJava引以為傲的鏈?zhǔn)讲僮鳎?/p>
//create創(chuàng)建一個(gè)上游Observable(被觀察者)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable發(fā)出:1");
e.onNext(1);//向下游(觀察者)發(fā)射內(nèi)容1
Log.d(TAG, "Observable發(fā)出:2");
e.onNext(2);
Log.d(TAG, "Observable發(fā)出:3");
e.onNext(3);
Log.d(TAG, "Observable發(fā)出:Complete");
e.onComplete();
Log.d(TAG, "Observable發(fā)出:4");
e.onNext(4);//雖然無法接收事件,但發(fā)送事件還是繼續(xù)的
}
}).subscribe(new Observer<Integer>() {
//創(chuàng)建一個(gè)下游Observer(觀察者)
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext收到:" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "Observer complete");
}
});
1、上游可以發(fā)送無限個(gè)onNext,下游也可以接收無限個(gè)onNext。
2、 當(dāng)上游發(fā)送了一個(gè)onComplete后, 上游onComplete之后的事件將會(huì)繼續(xù)發(fā)送,而下游收到onComplete事件之后將不再繼續(xù)接收事件。
3、當(dāng)上游發(fā)送了一個(gè)onError后,上游onError之后的事件將繼續(xù)發(fā)送,而下游收到onError事件之后將不再繼續(xù)接收事件。
4、上游可以不發(fā)送onComplete或onError。
5、最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個(gè)onComplete, 也不能發(fā)多個(gè)onError,也不能先發(fā)一個(gè)onComplete, 然后再發(fā)一個(gè)onError,反之亦然。
注:關(guān)于onComplete和onError唯一并且互斥這一點(diǎn), 是需要自行在代碼中進(jìn)行控制, 如果你的代碼邏輯中違背了這個(gè)規(guī)則, 并不一定會(huì)導(dǎo)致程序崩潰. 比如發(fā)送多個(gè)onComplete是可以正常運(yùn)行的, 依然是收到第一個(gè)onComplete就不再接收了, 但若是發(fā)送多個(gè)onError, 則收到第二個(gè)onError事件會(huì)導(dǎo)致程序會(huì)崩潰.
入門例子:
//create創(chuàng)建一個(gè)上游 Observable(被觀察者)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable發(fā)出:1");
e.onNext(1);//向下游(觀察者)發(fā)射內(nèi)容1
Log.d(TAG, "Observable發(fā)出:2");
e.onNext(2);
Log.d(TAG, "Observable發(fā)出:3");
e.onNext(3);
Log.d(TAG, "Observable發(fā)出:4");
e.onNext(4);
Log.d(TAG, "Observable發(fā)出:Complete");
e.onComplete();
Log.d(TAG, "Observable發(fā)出:5");
e.onNext(5);//雖然無法接收事件,但發(fā)送事件還是繼續(xù)的
}
}).subscribe(new Observer<Integer>() {
//創(chuàng)建一個(gè)下游 Observer(觀察者)
private Disposable mDisposable;
private int i;
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "subscribe");
mDisposable=d;
}
@Override
public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext收到:" + integer);
i++;
if (i==2){
Log.d(TAG, "onNext:dispose");
mDisposable.dispose();//取消訂閱,不再接收事件
Log.d(TAG, "onNext isDisposed : " + mDisposable.isDisposed());
}
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "Observer complete");
}
});
下游調(diào)用dispose(),取消收收事件,但并不會(huì)導(dǎo)致上游不再繼續(xù)發(fā)送事件, 上游可以繼續(xù)發(fā)送剩余的事件。
subscribe()有多個(gè)重載:
public final Disposable subscribe();
public final Disposable subscribe(Consumer<? super T> onNext);
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete);
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe);
public final void subscribe(Observer<? super T> observer);
帶有一個(gè)Consumer參數(shù)的方法,表示下游只關(guān)心onNext事件,其他的事件不管。
因此,如果只需要onNext事件可以這么寫:
//create創(chuàng)建一個(gè)上游 Observable(被觀察者)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable發(fā)出:1");
e.onNext(1);//向下游(觀察者)發(fā)射內(nèi)容1
Log.d(TAG, "Observable發(fā)出:2");
e.onNext(2);
Log.d(TAG, "Observable發(fā)出:3");
e.onNext(3);
Log.d(TAG, "Observable發(fā)出:4");
e.onNext(4);
Log.d(TAG, "Observable發(fā)出:Complete");
e.onComplete();
Log.d(TAG, "Observable發(fā)出:5");
e.onNext(5);//雖然無法接收事件,但發(fā)送事件還是繼續(xù)的
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext收到:" + integer);
}
});