Android 淺析 RxJava (二) 原理

RxJava


前言

Linus Benedict Torvalds : RTFSC – Read The Fucking Source Code

概括

原理解析我們從三個部分入手:1、基本流程梳理,2、lift的實現(xiàn)原理,3、線程切換原理。

基本流程

Observable.create(new Observable.OnSubscribe<ArrayList<String>>() {
              @Override
              public void call(Subscriber<? super ArrayList<String>> subscriber) {
                  subscriber.onNext(T);
                  subscriber.onCompleted();
              }
          })
          .subscribe(new Subscriber<String>() {
              @Override
              public void onCompleted() {}
              @Override
              public void onError(Throwable e) {}
              @Override
              public void onNext(String s) {}
          });

這就是Rxjava的基本使用流程。我們從流程開始分析:

Observable.create()

// Step 1:
public final static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(hook.onCreate(f));
}

// Step 2:
protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

這兩步是創(chuàng)建了一個Observable對象和OnSubscribe對象。

Observable.subscribe()

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    // new Subscriber so onStart it
    subscriber.onStart();
    
    // allow the hook to intercept and/or decorate
    hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
}

這一步的操作實際上就是調(diào)用了 Observable.onSubscribe.call(subscriber);

整理下基本流程分兩步:
1、創(chuàng)建Observable, OnSubscribe, Subscriber 三個對象
2、調(diào)用Observable的subscribe方法

那么他們最終的程序調(diào)用順序是:
1、Observable.subscribe()
2、Observable.OnSubscribe.call()
3、Subscriber.onNext()

map操作符和lift原理

RxJava之所以好用是因為它提供了很多非常簡潔的操作符動作,利用lift原理可以很流暢的實現(xiàn)操作的順序過度。

Observable.create(new Observable.OnSubscribe<ArrayList<String>>() {
              @Override
              public void call(Subscriber<? super ArrayList<String>> subscriber) {
                  subscriber.onNext(T);
                  subscriber.onCompleted();
              }
          })
          .map(new Func1<String, String>() {
              @Override
              public String call(String s) {
                  return null;
              }
          })
          .subscribe(new Subscriber<String>() {
              @Override
              public void onCompleted() {}
              @Override
              public void onError(Throwable e) {}
              @Override
              public void onNext(String s) {}
          });

Observable.map()

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

其實可以看到我們的map函數(shù)的主要實現(xiàn)是調(diào)用了litf方法,并在里面?zhèn)魅氲氖荗peratorMap的對象。

Observable lift()

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            st.onStart();
            onSubscribe.call(st);
        }
    });
}
  1. 首先 new 了一個 Func函數(shù), 保存在了 OperatorMap.transform 中。
  2. new了一個新的 Observable. 這個 Observable 的構(gòu)造函數(shù)中, 傳入了一個新的 OnSubscribe。
    lift的重點(diǎn)就在這個OnSubscribe里面。

先來看下OperatorMap的代碼(只列出核心):

public final class OperatorMap<T, R> implements Operator<R, T> {
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {
            @Override
            public void onNext(T t) {
                o.onNext(transformer.call(t));
            }
        };
    }
}

我們看到從hook.onLift(operator).call(o);這里創(chuàng)建出來的Subscriber的call函數(shù)里主要做的就是transformer.call(t)也就是轉(zhuǎn)換的操作。然后再將其轉(zhuǎn)換的結(jié)果返回給onNext。

lift 流程總結(jié)

總結(jié)下整個回調(diào)流程:

  1. 調(diào)用Create函數(shù),創(chuàng)建一個新的Observable,并且創(chuàng)建一個OnSubscribe。
  2. 調(diào)用Map函數(shù),再創(chuàng)建一個新的Observable,同時也創(chuàng)建一個新的OnSubscribe。并且其中的OnSubscribe.call()方法調(diào)用了第一步的Observable.onSubscribe.call方法,最后將結(jié)果進(jìn)行transform 操作。
  3. 最后將拿到的transform 操作結(jié)果傳給我們最后的onNext()方法中作為結(jié)果。

subscribeOn()和observeOn()線程切換原理

Rxjava還有另外一個原因讓它在使用上非常簡潔,就是線程處理上。
首先來看下他們的作用域

Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新線程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 線程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主線程,由 observeOn() 指定

我們先上一個例子:

Observable.create(new Observable.OnSubscribe<ArrayList<String>>() {
              @Override
              public void call(Subscriber<? super ArrayList<String>> subscriber) {
                  subscriber.onNext(T);
                  subscriber.onCompleted();
              }
          })
          .subscribeOn(Schedulers.io())
          .observeOn(Schedulers.newThread())
          .subscribe(new Subscriber<String>() {
              @Override
              public void onCompleted() {}
              @Override
              public void onError(Throwable e) {}
              @Override
              public void onNext(String s) {}
          });

首先來看下observeOn()代碼:

public final Observable<T> observeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return lift(new OperatorObserveOn<T>(scheduler));
}
public Subscriber<? super T> call(Subscriber<? super T> child) {
    if (scheduler instanceof ImmediateScheduler) {
        return child;
    } else if (scheduler instanceof TrampolineScheduler) {
        return child;
    } else {
        ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
        parent.init();
        return parent;
    }
}

可以看到observeOn其實也是調(diào)用了OperatorObserveOn類來實現(xiàn)的,本意上所有的Rxjava都是以操作符的形式來調(diào)用。接下來就是OperatorObserveOn的call()方法,該方法回根據(jù)傳入的Subscriber類型決定返回的方式。
總結(jié)來說就是通過lift()方法將接下來傳入執(zhí)行的subscribe方法通過新的線程方式執(zhí)行,并且在結(jié)束后再接著往下走。

總結(jié)

Rxjava 的原理我們并沒有每條都往里深入,因為rxjava的代碼復(fù)雜度很高,需要跟著代碼有耐心的往下走才能看清,建議讀者可以根據(jù)源碼一步步跟進(jìn)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容