簡介
這篇RxJava源碼分析非常精簡,僅通過最簡單的調(diào)用方式對RxJava的整個調(diào)用流程做深入剖析,未貼大量源碼,需要結合源碼,對比流程分析,跟上思路。
先看下簡單使用
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
emitter.onNext("發(fā)射一條消息");
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
調(diào)用Observable的create時,返回一個ObservableCreate并傳入ObservableOnSubscribe(即source)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
鏈式調(diào)用Observable的訂閱方法subscribe,傳入觀察者Observer
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
再調(diào)用ObservableCreate的subscribeActual方法,可以看到創(chuàng)建了一個發(fā)射器其入?yún)⑹怯^察者,然后立馬調(diào)用了觀察者的onSubscribe方法,最終再調(diào)用我們最初傳入的source的subscribe方法。
整體流程詳盡分析
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(99);
}
})
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
if (integer == 99){
return "一切正常";
}
return "";
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
通過這段調(diào)用代碼,對源碼調(diào)用流程進行深入分析,精心繪制流程圖如下:

整體流程概述,大概可分為三個階段:
- 創(chuàng)建階段(從上向下創(chuàng)建):從Observable.create開始,創(chuàng)建一個ObservableCreate(傳入一個發(fā)射消息源source即為ObservableOnSubscribe)返回-->然后再通過返回的ObservableCreate的.map創(chuàng)建一個ObserverableMap(并將自己傳入作為ObserverableMap的source),返回一個ObserverableMap。
- 訂閱階段(從下向上訂閱):這時候來到最后的.subscribe(傳入終點觀察者Observer)(即ObserverableMap的subscribe())時,會進而調(diào)用它的subscribeActual(),其內(nèi)部則又會創(chuàng)建一個MapObserver(并傳入終點觀察者Observer),調(diào)用創(chuàng)建階段傳入的source的.subscribe(傳入MapObserver),而這個source即為ObservableCreate --> 接著ObservableCreate也一樣調(diào)用它的subscribeActual(),內(nèi)部創(chuàng)建一個發(fā)射器CreateEmitter(并傳入MapObserver),再調(diào)用之前傳入的source的.subscribe(傳入發(fā)射器),這個source其實就是最初通過Observable.create傳入ObservableOnSubscribe
- 發(fā)射階段(從上向下分發(fā)):又回到了開始調(diào)用ObservableOnSubscribe.subscribe(),內(nèi)部調(diào)用了emitter.onNext();這個CreateEmitter就是剛剛我們訂閱時在ObservableCreate中創(chuàng)建的,它的onNext中又會調(diào)用我們創(chuàng)建發(fā)射器時傳入的observer(即為MapObserver)的onNext(),而MapObserver.onNext()內(nèi)部會通過創(chuàng)建ObserverableMap時傳入的function進行轉(zhuǎn)換,最后再調(diào)用真正的observer.onNext(),即最終的Observer。
如果上面明白了,加入線程切換其實就非常簡單了,根據(jù)整體調(diào)用流向加入中間層。
.subscribeOn(Schedulers.io())
//Schedulers.io()返回一個線程池
.observeOn(AndroidSchedulers.mainThread())
//AndroidSchedulers.mainThread()返回一個持有主線程Handler
如上.subscribeOn()也只是包裹了一層Observerable而已,當向上訂閱調(diào)用上層的source.subscribe()時,會把這句放入子線程執(zhí)行,所以從此后向上訂閱以及再向下分發(fā)的部分都是在子線程;
而.observeOn同樣也只是包裝了一層Observerable,當向下發(fā)射調(diào)用到observer.onNext時會通過Handler切換到主線程,后續(xù)的分發(fā)就都切換到主線程了。