一個(gè)簡單的實(shí)例:
//傳參數(shù)url,獲取gif的基礎(chǔ)信息,并傳遞給下游做處理
private Observable<Bundle> showGifDialog(String url) {
return Observable.create(new ObservableOnSubscribe<Bundle>() {
@Override
public void subscribe(ObservableEmitter<Bundle> e) throws Exception {
e.onNext(BitmapUtils.getGifBitmapInfo(url));
}
}).subscribeOn(Schedulers.io());
}
//拿到gif信息后,展示一個(gè)自定義播放gif的Dialog
Disposable disposable = showGifDialog(picUrl)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Bundle>() {
@Override
public void accept(Bundle bundle) throws Exception {
//
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
//
}
});
我們從Observable的創(chuàng)建開始,分析源碼:
public static <T> Observable<T> create(ObservableCreate<T>(source)<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));//創(chuàng)建+封裝操作
}
? Observable.create()方法傳入一個(gè)ObservableOnSubscribe對象作為參數(shù),返回一個(gè)Observable對象,其內(nèi)部核心執(zhí)行的是ObservableCreate<T>(source)。源碼如下:
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
ObservableCreate是Observable的實(shí)現(xiàn)類,ObservableCreate<T>(source)將傳入的參數(shù)-----ObservableCreate對象賦值給ObservableCreate.source。后續(xù)的一操作是以該source為基礎(chǔ)執(zhí)行的。
? 除去線程切換的部分,Observable創(chuàng)建完,我們需要調(diào)用Observable.subscribe()方法將觀察者訂閱給Observable。
在subscribe()方法內(nèi)部完成真正訂閱之前,Consume將被new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);轉(zhuǎn)換成一個(gè)Observer對象。
? Observable.subscribe()方法如下,它傳入?yún)?shù)的是我們定義的下游觀察者,真正完成訂閱的是subscribeActual(observer)方法,將上游的被觀察者和觀察者結(jié)合起來,這是訂閱的核心步驟。
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
? 在Observable中,subscribeActual(observer)是一個(gè)抽象方法。我們之前提到,Observable.create()方法內(nèi)部實(shí)際創(chuàng)建的是一個(gè)ObservableCreate對象,所以,在ObservableCreate中找到真正執(zhí)行的subscribeActual方法,如下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);//發(fā)射器對象的實(shí)現(xiàn)類
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
? 在subscribeActual()方法中,主要執(zhí)行兩個(gè)操作:
observer.onSubscribe(parent); // 觸發(fā) Observer#onSubscribe(Disposable)
-
source.subscribe(parent);
傳入事件發(fā)射器執(zhí)行發(fā)射事件,發(fā)射傳給下游觀察者的數(shù)據(jù),即我們例子中的:
public void subscribe(ObservableEmitter<Bundle> e) throws Exception { e.onNext(BitmapUtils.getGifBitmapInfo(url)); }我們重點(diǎn)關(guān)注的是實(shí)際進(jìn)行發(fā)射事件的CreateEmitter.onNext()方法,它的實(shí)現(xiàn)如下:
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
? 很簡單,它調(diào)用observer.onNext()方法將我們傳入的Value,傳遞給下游觀察者。而這個(gè)observer,正是Observeable.subscribe(Observer<? super T> observer)方法調(diào)用subscribeActual(observer);時(shí)傳入的。
下一篇:RxJava:線程切換