
都知道觀察模式吧?
在開始之前讓我們簡單了解一下觀察模式,就是某對象A的變化引起其他多個(gè)對象B變化,但是前提是你需要去訂閱我,打個(gè)比方:就是我的狀態(tài)發(fā)生了改變,那我怎么通知你呢?所以我需要知道的如何去通知其他對象說我這里已經(jīng)改變了,你看看那需不需要做出改變。就比如微信的訂閱號,如果你不訂閱,那該訂閱號在發(fā)布內(nèi)容也不會(huì)通知,這里的訂閱號就是被觀察者,而用戶就是觀察者。那怎么說讓這兩者關(guān)聯(lián)來呢?前面說的訂閱號是要提供一個(gè)接口,允許用戶去訂閱的,所以最后就是被觀察者和觀察者兩個(gè)都得提供接口,訂閱號提供的接口讓用戶去訂閱類比微信號,當(dāng)訂閱號發(fā)布內(nèi)容,就通過這個(gè)微信號通知觀察者,所以訂閱就是這兩者的關(guān)聯(lián)點(diǎn)。
開始之前的兩個(gè)重要的類或接口:Observable 和 Observer
- Observable 它實(shí)現(xiàn)ObservableSource接口,通俗來講Observable就是一個(gè)被觀察者也有人叫可觀察的資源,這里就叫被觀察者;
-
Observer 觀察者;
涉及的類:
RxJava2.png
訂閱流程分析
開始RxJava的訂閱流程分析之前,來個(gè)簡單的栗子,代碼如下:
Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("發(fā)射 subscribe");
emitter.onComplete();
}
});//ObservableCreate
Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io());//1
observableSubscribeOn.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
showLog("onSubscribe");
}
@Override
public void onNext(String s) {
showLog("onNext");
}
@Override
public void onError(Throwable e) {
showLog("onError");
}
@Override
public void onComplete() {
showLog("onComplete");
}
});
日志結(jié)果:
onSubscribe ,Thread: main
onNext ,Thread: RxNewThreadScheduler-3
onComplete ,Thread: RxNewThreadScheduler-3
如上代碼,之所以分開來寫是為了更清晰的去理解每一步RxJava生成的相關(guān)類。
如果你認(rèn)真看前面的內(nèi)容,你一下就明白Observable.subscribe()方法也就是訂閱的意思,是 Observable 和 Observer 的關(guān)聯(lián)點(diǎn),也就是被觀察者和觀察者的關(guān)聯(lián)點(diǎn),所以我們的分析就從Observable.subscribe(Observer observer)方法開始代碼如下:
public final void subscribe(Observer<? super T> observer) {
try {
// .....此處省略幾億代碼....
//此方法在Observable類是中是抽象的,注定是子類實(shí)現(xiàn)
subscribeActual(observer);
// .....此處省略幾億代碼....
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
// .....此處省略幾億代碼....
}
}
- 上面代碼不難理解在subscribe方法中直接就調(diào)用了subscribeActual(observer)方法,我可以翻譯為
實(shí)際訂閱; - subscribe方法是
Observable類的方法,他是抽象類,傳入了一個(gè) Observer 對象,開始的時(shí)候栗子我們可以知道Observable是通過我們調(diào)用Observable.create(ObservableOnSubscribe) 所創(chuàng)建出來的; - 那subscribeActual在Observable中是抽象方法,肯定是子類去實(shí)現(xiàn)了該方法,從第二點(diǎn)知道子類肯定是在Observable.create(ObservableOnSubscribe)中給new出來的,那么接下我們看看Observable.create(ObservableOnSubscribe)方法的實(shí)現(xiàn);
// Observable.create(ObservableOnSubscribe)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// .....此處省略幾億代碼....
// 直接就創(chuàng)建了ObservableCreate,并把source作為參數(shù)傳進(jìn)去
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
// onAssembly
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
我們從上面代碼我們知道在Observable.create(ObservableOnSubscribe)中直接就創(chuàng)建了ObservableCreate,而ObservableCreate是Observable的子類,并把source作為參數(shù)傳進(jìn)去,最后調(diào)用RxJavaPlugins.onAssembly方法,我們默認(rèn)返回ObservableCreate實(shí)例,所以O(shè)bservable.create方法最后返回的是ObservableCreate實(shí)例,所以就驗(yàn)證了上面的第三點(diǎn)實(shí)際調(diào)用的是ObservableCreate.subscribeActual(observer)方法,這是在不考慮其他變換和線程切換的情況,那我們就來看看ObservableCreate.subscribeActual(observer)方法的實(shí)現(xiàn),代碼如下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
//事件發(fā)射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//直接回調(diào)Observer的onSubscribe方法,這個(gè)方法是和線程切換無關(guān),只在當(dāng)前的線程中執(zhí)行
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
代碼不多,也很好理解:
- 首先調(diào)用observer.onSubscribe(parent)方法通知Observer已經(jīng)訂閱成功了。
- 最后調(diào)用source.subscribe(parent)方法完成訂閱,source又是什么呢?我們知道在ObservableCreate是在Observable.create方法時(shí)創(chuàng)建的,并把ObservableOnSubscribe傳進(jìn)來,所以source就是ObservableOnSubscribe,直接回調(diào)ObservableOnSubscribe.subscribe方法并把CreateEmitter作為參數(shù)傳遞進(jìn)去,之后再我們是栗子中通過這個(gè)對象調(diào)研onNext方法或者onComplete方法發(fā)射事件;
看一下CreateEmitter的實(shí)現(xiàn),代碼如下:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
//通過構(gòu)造方法注入 觀察者實(shí)例
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
// .....此處省略幾億代碼....
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
// .....此處省略幾億代碼....
}
為了簡介清晰我刪掉很多無關(guān)代碼,只保留onNext等這些相關(guān)的方法。
- 其實(shí)CreateEmitter是Observable的靜態(tài)內(nèi)部類,
- 在上面我們知道Observable.subscribeActual方法中創(chuàng)建了CreateEmitter實(shí)例并將Observer作為參數(shù)通過構(gòu)造方法注入Observer實(shí)例,作為CreateEmitter的成員變量;
- 之后在subscribeActual方法中調(diào)用ObservableOnSubscribe.subscribe的方法并把CreateEmitter實(shí)例作為方法參數(shù)傳遞進(jìn)去;
- 簡單來說CreateEmitter的作用就是發(fā)射事件,里面分裝了Observer實(shí)例,發(fā)射事件就回調(diào)到Obsever中的方法,如onNext等方法;
有沒有發(fā)現(xiàn)從一開始我們就僅僅講了從Obsevable的創(chuàng)建到訂閱,這是比較漢理解的,如果我增加一個(gè)map或線程切換呢?這里暫時(shí)不展開講線程切換。
重新把栗子的代碼在貼一遍:
Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("發(fā)射 subscribe");
emitter.onComplete();
}
});//ObservableCreate
Observable observableSubscribeOn = observableSubscribeOn.map(new Function<String, Object>() {
@Override
public Object apply(String s) throws Exception {
Log.e("tag", "map");
return "aa";
}
})
observableSubscribeOn.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
showLog("onSubscribe");
}
@Override
public void onNext(String s) {
showLog("onNext");
}
@Override
public void onError(Throwable e) {
showLog("onError");
}
@Override
public void onComplete() {
showLog("onComplete");
}
});
如上代碼,訂閱流程會(huì)和之前的有什么不一樣呢?那么我們看個(gè)究竟,就從 Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io())開始,代碼如下:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper){
ObjectHelper.requireNonNull(mapper, "mapper is null");
//這里把上游this傳進(jìn)去也就是source,以便調(diào)用上游的subscribe方法
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
從上面代碼看,我們知道在map方法中創(chuàng)建了ObservableMap并把上游的Observable參進(jìn)去了,而我們知道從Observable.subscribe方法開始訂閱就會(huì)調(diào)用 subscribeActual(observer)方法,所以在Observable.subscribe之后就會(huì)調(diào)用ObservableMap的subscribbeActual方法,代碼如下:
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
在ObservableMap的subscribbeActual方法中,直接調(diào)用傳進(jìn)來的Observable的subscribe方法又間接調(diào)用subscribbeActual方法沒所以,訂閱的過程實(shí)際上是一樣的。
總結(jié)
- Observable是由上游往下游傳遞的,并且每個(gè)操作符都會(huì)創(chuàng)建新的Observable對象包裹上游的實(shí)例;
- Observer是由下游往上游傳遞的,也就是從Observable.subscribe方法開始。
流程圖:


上圖包括訂閱流程、線程切換 以及 事件發(fā)布流傳的過程,非常詳細(xì)。
