前言:
本文所有代碼基于RxJava 2.2.3和RxAndroid 2.1.0,并使用kotlin來介紹RxJava中常用的操作符的用法及效果。
第一步:創(chuàng)建被觀察者Observable。
1、 create其實(shí)就是Observable里的一個(gè)靜態(tài)方法用于創(chuàng)建一個(gè)Observable實(shí)例。在下面這段代碼中我們主動(dòng)創(chuàng)建了兩個(gè)對象實(shí)例,一個(gè)是由Observable.create方法返回的Observable實(shí)例,一個(gè)是傳入create方法的參數(shù)ObservableOnSubscribe匿名對象并覆寫了定義事件發(fā)送順序的方法 ObservableOnSubscribe.subscribe。
//第一步
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.e("Create","ObservableOnSubscribe對象的subscribe方法被調(diào)用");
emitter.onNext("數(shù)據(jù)1");
emitter.onNext("數(shù)據(jù)2");
emitter.onNext("數(shù)據(jù)3");
emitter.onNext("數(shù)據(jù)4");
emitter.onError(new Exception());
emitter.onComplete();
}
});
2、Observable的create方法,該方法在這種情況下實(shí)際上返回的就是新new的 ObservableCreate對象,并且會將ObservableOnSubscribe對象賦值給其內(nèi)部的成員變量source
//Observable的create方法
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
//ObservableCreate類的成員變量及其構(gòu)造函數(shù) create傳入的參數(shù)ObservableOnSubscribe對象最終賦值給了source
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//RxJavaPlugins.onAssembly方法
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {//此時(shí) f == null 所以會返回source即傳入的參數(shù)不會有其他操作
return apply(f, source);
}
return source;
}
第一步結(jié)論:創(chuàng)建了一個(gè)ObservableCreate對象,并將決定訂閱時(shí)事件調(diào)動(dòng)順序的ObservableOnSubscribe對象賦值給其成員變量source。
第二步:創(chuàng)建觀察者Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("Create","Observer的onSubscribe方法");
}
@Override
public void onNext(String s) {
Log.e("Create","Observer的onNext方法 收到的參數(shù)為:"+s);
}
@Override
public void onError(Throwable e) {
Log.e("Create","Observer的onError方法");
}
@Override
public void onComplete() {
Log.e("Create","Observer的onComplete方法");
}
};
第二步結(jié)論:第二步就是單純的創(chuàng)建了一個(gè)觀察者,并覆寫如上所示的四個(gè)方法。
第三步:讓被觀察者(Observable)和觀察者(Observer)之間產(chǎn)生聯(lián)系
//觀察者訂閱被觀察者 兩者關(guān)系發(fā)生
observable.subscribe(observer);
1、訂閱之后立即調(diào)用被觀察者(observable)的subscribe方法。該方法中調(diào)用了抽象方法subscribeActual(observer);并將觀察者(observer)作為參數(shù)傳入傳入。
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned
a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe
for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//核心代碼是這句這個(gè)方法在Observable類中是個(gè)抽象方法第一步創(chuàng)建的ObservableCreate對象,覆寫了該方法,該方法的參數(shù)為
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
2、當(dāng)這個(gè)創(chuàng)建的Observable對象調(diào)用subscribe方法時(shí),subscribe方法會調(diào)用ObservableCreate類中覆寫的subscrubeActual方法,subscrubeActual的代碼如下:
protected void subscribeActual(Observer<? super T> observer) {
//將訂閱的Observer對象進(jìn)行封裝到CreateEmitter中,作為source調(diào)用subscribe方法時(shí)的參數(shù)
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
//source即為調(diào)用create操作符時(shí)傳入的用于定義調(diào)用順序的ObservableOnSubscribe對象,在此處調(diào)用了其subscribe方法
//傳入的參數(shù)是上面封裝了的CreateEmitter對象,所以我們在外面定義的調(diào)用順序就是調(diào)用的CreateEmitter的方法如onNext,
//onError,onComplete等方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
3、再看看CreateEmitter中對onNext,onError,onComplete的封裝。三個(gè)方法都調(diào)用了觀察者(observer)的相應(yīng)方法,但是在調(diào)用前都對其進(jìn)行了處理,判斷其是否被取消。若調(diào)用了onError或者onComplete中的任意一個(gè)方法時(shí),就會被取消,即用dispose()方法,這時(shí) !isDisposed())的值就為false了。所以后續(xù)的觀察者的其他方法都不可調(diào)用了。
//observer對象即為我們在第二步創(chuàng)建的觀察者(observer)
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally
not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not
allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
第三步總結(jié):被觀察者(observable)調(diào)用subscribe方法,此方法最終將觀察者(observer)對象封裝成Emitter并作為參數(shù),調(diào)用ObservableCreate中source即第一步創(chuàng)建時(shí)傳入的參數(shù)的subscribe方法。即實(shí)現(xiàn)了調(diào)用第一步時(shí)定義的調(diào)用順序。而Emitter的封裝使onError及onComplete的調(diào)用做了限制,保證了onError和onComplete方法只能調(diào)用其中一個(gè)。
最后總結(jié):
上述一系列操作,可歸結(jié)為以下偽代碼:
//規(guī)定調(diào)用順序
ObservableOnSubscribe<String> source = new ObservableOnSubscribe<String>(){
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.e("Create","ObservableOnSubscribe對象的subscribe方法被調(diào)用");
emitter.onNext("數(shù)據(jù)1");
emitter.onNext("數(shù)據(jù)2");
emitter.onNext("數(shù)據(jù)3");
emitter.onNext("數(shù)據(jù)4");
emitter.onError(new Exception());
emitter.onComplete();
}
};
//創(chuàng)建被觀察者
Observable<String> observable = new ObservableCreate<>(source);
observable.source = source;
//以上為第一步
//-------------------------------------------------------------------------
Observer<String> observer = new Observer<String>();
//創(chuàng)建觀察者這是第二步
//---------------------------------------------------------------------------
//調(diào)用observable.subscribe(observer)
Emitter<String> emitter = new EmitterCreate(observer);
observable.source.subscribe(emitter);
//第三步訂閱
//-----------------------------------------------------------------------------