最近復(fù)習(xí)RxJava的時(shí)候發(fā)現(xiàn)解說RxJava的原理不多,所以,來機(jī)智的騙心心來了。
依賴:
implementation "io.reactivex.rxjava2:rxjava:2.2.4"
簡(jiǎn)單使用
待會(huì)兒講解的原理呢,是由這個(gè)使用 demo 來講解。
Observable.create(new ObservableOnSubscribe<Integer>() { // 創(chuàng)建被觀察者
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1); // 發(fā)射器,發(fā)射一個(gè)事件
emitter.onNext(2);
}
}).subscribe(new Consumer<Integer>() { // 觀察者
@Override
public void accept(Integer integer) throws Exception { // 用于處理 onNext 事件
Log.d(TAG, "accept: "+integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "accept: "+throwable.getMessage());
}
});
原理解析
Observable.create(ObservableOnSubscribe<T>)
在上面的使用demo中,這是整個(gè)訂閱過程的開始:創(chuàng)建一個(gè)被觀察者。來,我們看一下我們的被觀察者是什么。
Observable.class
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null"); // 判空
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); // 默認(rèn)直接返回這個(gè)new出來的ObservableCreate
}
這個(gè)ObjectHelper.requireNonNull怎么判空?。?/p>
public static <T> T requireNonNull(T object, String message) {
if (object == null) {
throw new NullPointerException(message);
}
return object;
}
簡(jiǎn)直不要太簡(jiǎn)單,就是判斷輸入的object是不是null,不是就返回,是就拋出異常。
那這個(gè)RxJavaPlugins又是干嘛的呢?用過的都知道,沒用過推薦看一下給初學(xué)者的RxJava2.0教程(十) 。其實(shí)我們沒有對(duì)這個(gè) RxJavaPlugins 做設(shè)置的話,就是返回我們傳進(jìn)來的這個(gè)參數(shù)。后面我們看到這個(gè)RxJavaPlugins就可以默認(rèn)為直接返回參數(shù)。
書接上文,那調(diào)用 Observable.create(ObservableOnSubscribe) ,豈不是就是把自己在外部實(shí)現(xiàn)的 ObservableOnSubscribe 匿名內(nèi)部類包裝到 ObservableCreate 中去羅。
public final class ObservableCreate<T> extends Observable<T> { // 注意繼承關(guān)系哦
final ObservableOnSubscribe<T> source; // 最原始的目標(biāo):被觀察者
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source; // 傳進(jìn)來的是外面new的匿名內(nèi)部類ObservableOnSubscribe
}
...
}
看到這里,emmm,nice,保存住了被觀察者,返回了ObservableCreate 對(duì)象。注意哦,這個(gè)時(shí)候后面一個(gè) . 操作的就是這個(gè) ObservableCreate對(duì)象(換言之,后面點(diǎn)的就是ObservableCreate中的方法)。
好的,下面就是 .subscribe
.subscribe
這個(gè)方法就很有意思了,實(shí)現(xiàn)了訂閱這個(gè)操作,或者說,是觸發(fā)了onSuscribe、onNext 等操作。
先看一下 subscribe 這個(gè)方法的重載:
public final Disposable subscribe() {} // 注意哦,這里的都是final方法
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
有點(diǎn)多哈,但是實(shí)現(xiàn)呢?很簡(jiǎn)單,大家都調(diào)用最后一個(gè)處理,沒有傳參的都添上默認(rèn)的 Custom 就可以了。什么叫默認(rèn)實(shí)現(xiàn),就是實(shí)現(xiàn)了接口,但是默認(rèn)不處理,比如默認(rèn)的Action,還有就是簡(jiǎn)單處理,比如onError,但是都不是很重要,我們主要關(guān)注我們自己實(shí)現(xiàn)的東西嘛。
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null"); // 挨個(gè)判空
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); // 用一個(gè)類來包裹
subscribe(ls); // 包裹類的訂閱
return ls;
}
那么這個(gè)用來包裹的 LambdaObserver 又是什么鬼呢?
public final class LambdaObserver<T> extends AtomicReference<Disposable>
implements Observer<T>, Disposable, LambdaConsumerIntrospection {
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
...
}
就是將四個(gè)類型(onNext,onError,onComplete,onSubscribe)封裝成一個(gè)一個(gè)對(duì)象,方便對(duì)整個(gè)流程的調(diào)用。
將四個(gè)對(duì)象封裝在一起了過后,就是應(yīng)該是訂閱了把?
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer); // 檢查需要執(zhí)行預(yù)操作不,默認(rèn)返回 observer
// 判空
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer); // 真正處理的函數(shù)
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e); // 錯(cuò)誤的話,捕獲,并交給RxJavaPlugins#onError來處理,沒有交給我們自定義的onError,莫看錯(cuò)啦
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
真正有用的就是 subscribeActual(observer);這一句,吐血哦,還不訂閱,我都快等不及了。好嘛,大神就是大神,一看,自己沒實(shí)現(xiàn)。后面一想,就是牛逼。
/**
* Operator implementations (both source and intermediate) should implement this method that
* performs the necessary business logic and handles the incoming {@link Observer}s.
* <p>There is no need to call any of the plugin hooks on the current {@code Observable} instance or
* the {@code Observer}; all hooks and basic safeguards have been
* applied by {@link #subscribe(Observer)} before this method gets called.
* @param observer the incoming Observer, never null
*/
protected abstract void subscribeActual(Observer<? super T> observer);
所有的操作符都會(huì)重寫這個(gè)方法,我是不是暴露了什么,emm,所有運(yùn)算符返回都是 Observable 的子類,就比如說 create 返回的就是 Observable 的子類 ObservableCreate,后面就是在這基礎(chǔ)上調(diào)用了。這里我們可以看到,其實(shí)所有的 subscribe 我們都重寫不了,唯一能重寫且必須重寫的就是這個(gè) subscribeActual。這就意味著,這個(gè)方法是我子類實(shí)現(xiàn)真正訂閱的入口。
前面講解我們知道,現(xiàn)在在demo中調(diào)用的是 ObservableCreate 的 subscribe,那么事情就變得很簡(jiǎn)單,我們直接找到 ObservableCreate#subscribeActual(observer) 進(jìn)行分析就好。在分析之前,先用圖總結(jié)下前面的東西:

好的嘞,那我們接下來就看一下核心的 subscribeActual 方法如何實(shí)現(xiàn)的?
@Override
protected void subscribeActual(Observer<? super T> observer) { //傳進(jìn)來的就是前面的 LambdaObserver
CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 發(fā)射器,每一個(gè)操作符對(duì)應(yīng)的類內(nèi)部都會(huì)自己實(shí)現(xiàn),因?yàn)樘幚磉壿嫴灰粯影? observer.onSubscribe(parent); // 回調(diào)了自己實(shí)現(xiàn)的 onSubscribe 對(duì)用的Consumer
try {
source.subscribe(parent); // 這個(gè)source就是初始化類的時(shí)候傳進(jìn)來的被觀察者,這里將發(fā)送器給了它
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex); // 這里捕獲的異常才交給自己寫的Consumer處理
}
}
這里代碼不多,干的事還不少,首先是new出我們的發(fā)射器,然后觸發(fā)被觀察者的訂閱回調(diào),然后再,執(zhí)行被觀察者的subscribe方法,如果捕獲到異常就交給自定義的 onError 處理。
有一個(gè)概念可以先了解,無論以后把這個(gè)對(duì)象怎么包裹,傳遞,只有這里才調(diào)用了被觀察者 Observable 的 subscribe 方法。
接下來我們來詳解一下,這個(gè)回調(diào)流程。
observer.onSubscribe(parent); 這里的 observer 是什么?傳進(jìn)來的 包裝有自定義的觀察者的 LambdaObserver ,他是 Observable 的子類。接下來看一下他的調(diào)用:
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) { // 設(shè)置并判斷是否是第一次
try {
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose(); // 出現(xiàn)異常,解除訂閱
onError(ex); // 并向下傳遞
}
}
}
@Override
public void onError(Throwable t) {
if (!isDisposed()) { // 判斷是否斷開連接
lazySet(DisposableHelper.DISPOSED);
try {
onError.accept(t); //交給自定義的消費(fèi)者
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
} else {
RxJavaPlugins.onError(t);
}
}
很簡(jiǎn)單,對(duì)不對(duì),就是直接調(diào)用了 onNext 消費(fèi)者的 accept 方法(自定義或者默認(rèn))。出現(xiàn)異常,如果沒有斷開連接,且是第一次接收到,就交給 onError (自定義或者默認(rèn))處理。
那么 source.subscribe(parent);不會(huì)也這么簡(jiǎn)單把?
source 就是傳進(jìn)來的 ObservableOnSubscribe,自定義的被觀察者
parent 就是發(fā)射器。這一調(diào)用,我們自己寫的被觀察者的邏輯就巴拉巴拉的執(zhí)行了
這里就是直接就回調(diào)了。嚇人。。
那 parent.onError(ex); 怎么實(shí)現(xiàn)的?這就要真正說道說道這個(gè)發(fā)射器了。
發(fā)送器是什么狗東西呢?它是ObservableCreate的靜態(tài)內(nèi)部類:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer; // 觀察者,就是傳進(jìn)來的 LambdaObserver
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) { // 是否已經(jīng)斷開連接
observer.onNext(t); // 調(diào)用自定義的Custom回調(diào)處理
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) { // 交給觀察者
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) { //沒斷開,不為null
try {
observer.onError(t);
} finally {
dispose(); //失敗直接斷開連接
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete(); // 交給觀察者
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) { DisposableHelper.set(this, d); }
@Override
public void dispose() { DisposableHelper.dispose(this); }
@Override
public boolean isDisposed() { return DisposableHelper.isDisposed(get()); }
}
咦,這結(jié)構(gòu)簡(jiǎn)直不要太簡(jiǎn)單,就相當(dāng)于一次中轉(zhuǎn),只不過是加入了一些容錯(cuò)機(jī)制。到這里整個(gè)流程要用到的類就結(jié)束了,當(dāng)我們?cè)跁鴮懕挥^察者的時(shí)候,使用這個(gè)發(fā)射器提交東西,就是直接調(diào)用這個(gè)東西來調(diào)用消費(fèi)者的對(duì)應(yīng)回調(diào)。
千言不如一圖

不對(duì),放錯(cuò)了

- 創(chuàng)建真正的被觀察者包裹對(duì)象(繼承于 Observable),并將自己寫的真正被觀察者包裹起來
- 調(diào)用被觀察者的subscribe方法,將自己創(chuàng)建的觀察者包裹起來
- 作為參數(shù),傳入ObservableCreate 對(duì)方實(shí)現(xiàn)的 subscribeActual(Observer) 中。
- 在 subscribeActual 方法中生成 emitter,并且回調(diào)被觀察者的onSubscribe,確定連接關(guān)系。
- 自己寫的被觀察者的 subscribe 調(diào)用,我們可以使用 emitter 提交東西,一提交就將提交的東西交給 被觀察者對(duì)應(yīng)的方法執(zhí)行