Rxjava加載流程淺析

基本用法

         Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(123);
                emitter.onComplete();
            }
        });
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                LogUtils.d("onSubscribe");
            }

            @Override
            public void onNext(@NonNull Integer o) {
                System.out.println(o);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {
                LogUtils.d("onComplete");
            }
        };
        observable.subscribe(observer); 

我們以這段簡單的代碼為基礎(chǔ),講解下貫穿整個ReactiveX設(shè)計的四個概念:觀察者,被觀察者,事件,訂閱

  • 觀察者
    對事件進行響應(yīng)的對象,也可以稱作消費者,在上述的代碼中,subscirbe方法有很多個重載,其參數(shù)要么是一個Consumer對象,要么是一個Observer對象,Consumer對象后續(xù)也會被包裝成一個LambdaObserver對象。因此可以說subscribe方法的參數(shù)即為Observer(觀察者)。
  • 被觀察者
    產(chǎn)生事件的對象,也可以稱作生產(chǎn)者,在上述代碼中,Observable.create(...)返回的是一個Observable對象,即為這段程序的被觀察者(生產(chǎn)者)。
  • 事件
    RxJava中存在四種事件流:onSubscribe(訂閱事件),onNext(正常事件),onError(異常事件),onComplete(完成事件)。在上述代碼中,是將一個整型元素作為onNext事件中的數(shù)據(jù)進行發(fā)送。
  • 訂閱
    創(chuàng)建觀察者與被觀察者之間觀察關(guān)系,對應(yīng)著上述代碼中的subscribe()方法。RxJava的事件驅(qū)動模型是一種“拉模型”,在觀察者沒有進行事件訂閱之前是不會有事件產(chǎn)生的,只有觀察者進行訂閱后,才會觸發(fā)被觀察者生產(chǎn)事件。

集大成者Observable

在整個數(shù)據(jù)處理的過程中,Observable可以說是最重要的一個對象??蛻舳耍ㄏ⒌纳a(chǎn)者或者消費者)只和Observable進行交互,觀察者和被觀察者之間關(guān)系的創(chuàng)建也是由Observable去實現(xiàn),而不用我們顯示的編碼實現(xiàn),這大大降低了我們使用觀察者模式的成本。

image

從圖中我們可以看出:

  • Observable實現(xiàn)了ObservableSource接口,該接口中只有一個方法:subscribe()。從字面意思就可以理解,這是一個提供觀察能力的接口,所以O(shè)bservable的一大能力是供觀察者進行事件訂閱,而進行事件訂閱的方法實現(xiàn)就是調(diào)用Observable的subscribe()方法

  • Observable是一個抽象類,它提供了subscribeActual模板方法供子類實現(xiàn),從源碼中可以看出,Observable的subscribe()方法最終會委托子類的subscribeActual()方法實現(xiàn),這個方法會建立生產(chǎn)者與消費者之間的關(guān)聯(lián)關(guān)系。

  • 除此之外,Observable還是一個工廠類,它提供了靜態(tài)方法fromArray()、create()等用來創(chuàng)建具體的可觀察對象,同時還提供了flatMap()、concatMap()等操作方法對可觀察對象進行包裝。

Observable的存在讓生產(chǎn)者和消費者完全的解耦了,生產(chǎn)者只需關(guān)注自己生成何種Observable對象,而消費者也只需關(guān)注自己觀察的是哪種Observable。

在實際的應(yīng)用中,Rxjava已經(jīng)提供了各種各樣的操作符供我們使用,生產(chǎn)者只需要調(diào)用Observable中相應(yīng)的方法即可以生成所需的可觀察對象,供消費者進行事件訂閱。消費者只需調(diào)用可觀察對象的subscribe()方法即可與生產(chǎn)者建立觀察關(guān)系,極其方便。

真實的觀察

觀察者模式是RxJava設(shè)計的核心思想,在觀察者模式中總是存在觀察的對象和被觀察的對象,從上文的解析中也可以看出Observable更多的是一個控制器的作用,而并非真正的事件的來源。那么在RxJava中,什么才是真正的生產(chǎn)者,什么才是真正的消費者呢。

我們來分析下以下三種常見的Observable:

Observable arrayObservable = Observable.fromArray(1,2,3,4,5);
Observable createObservable = Observable.create(emmit->emmit.onNext(1));
Observable justObservable = Observable.just(1);

先簡單介紹下這幾個Observable的作用,fromArray的作用是將數(shù)組中的元素作為onNext事件發(fā)送,create的作用是發(fā)送自定義事件,just的作用是發(fā)送單個事件。

上一小節(jié)有講到實際的訂閱行為是由各個Observable類中subscribeActual()方法實現(xiàn)的,我們來看下這三個類的subscribeActual()方法。


image

除去細枝末節(jié),這三個方法都可以分成以下三步

1.創(chuàng)建被觀察者對象,并傳入觀察者observer,建立兩者的關(guān)聯(lián)關(guān)系;
2.觸發(fā)onSubscribe事件,觀察者響應(yīng)該事件;
3.進行事件的拉取,我們可以進入到d.run()source.subscribe(parent),sd.run()這些方法的內(nèi)部看一些,可以看到這些方法就是在發(fā)送onNext(),onError(),onComplete()等事件。

下圖是整個流程中的相關(guān)類圖。實際事件的發(fā)送者是FromArrayDisposable等對象,而實際的觀察者,則是一個實現(xiàn)了Observer接口的實體類。如果我們在subscribe時傳入的是一個lambda表達式,之后會被包裝成一個默認(rèn)的LambdaObserver對象,進行事件消費。

image

包裝

RxJava中提供了豐富的操作符,比如flatMap,concatMap等可以對事件轉(zhuǎn)換,subscribeOn,observableOn等可以對生產(chǎn)和消費的線程進行控制。這些操作符實際上調(diào)用了Observable中的包裝方法對原有的可觀察對象進行包裝,返回了一個增強了的可觀察對象。

操作符種類繁多,在這就不一一舉例,我們以flatMap為例,分析一下這些操作符是如何工作的。

首先,flatMap操作會返回一個ObservableFlatMap對象,在創(chuàng)建這個對象時,會將原始的Observable對象作為構(gòu)造函數(shù)的參數(shù)傳入。

查看其核心方法subscribeActual,

@Override
    public void subscribeActual(Observer<? super U> t) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }

        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

可以看到這一類對象的subscribeActual方法和上一節(jié)中的方法不太一樣,這里面并沒有去實際的創(chuàng)建觀察關(guān)系,而是做了兩件事:

1.對觀察者進行增強,將其包裝成為MergeObserver對象
2.再調(diào)用source的subscribe方法,這里source就是前面構(gòu)造函數(shù)中傳入的Observable對象,由其再進行觀察關(guān)系的建立。

下圖是RxJava中裝飾器模式的相關(guān)類圖:所有的包裝類都繼承了AbstractObservableWithUpstream類,該抽象類有一個類型為ObservableSource的成員函數(shù),用來持有被裝飾的對象。


image

Observable是支持鏈?zhǔn)讲僮鞯?,就和Java 8中的Stream一樣,我們來考慮這樣一行代碼。

        Observable.fromArray(1,2,3,4,5).flatMap(num->Observable.just(num)).observeOn(Schedulers.newThread()).subscribe(num-> System.out.println(num));

我們在分析上面這串代碼時,一定會凌亂非常,在看源碼時也會看到前面忘掉后面,但是如果我們對RxJava的包裝流程足夠了解的話,就可以很輕松的對上述代碼進行分析。


image

流程淺析

我們以一段常見的代碼分析事件產(chǎn)生和消費的流程

 Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@io.reactivex.annotations.NonNull Disposable d) {
System.out.println("onSubscribe");
            }

            @Override
            public void onNext(@io.reactivex.annotations.NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@io.reactivex.annotations.NonNull Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

在分析之前,我們先定義幾個概念,source代表著被觀察者;downStream代表著觀察者;它們在下面的代碼將會重復(fù)出現(xiàn)。

Observable.create(ObservableOnSubscribe source)

Observable.create(ObservableOnSubscribe source)創(chuàng)建了一個ObservableCreate(ObservableOnSubscribe<T> source)對象,其內(nèi)部持有了真實的被觀察者ObservableOnSubscribe,我們將這個ObservableOnSubscribe稱為原始被觀察者,它是一個接口,內(nèi)部定義了subscribe()方法:

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

小結(jié):

ObservableCreate持有的sourceObservableOnSubscribe

ObservableCreate.subscribeOn(Schedulers.IO)

接著我們調(diào)用了ObservableCreate對象的subscribeOn,并傳入了IO線程調(diào)度器,subscribeOn()方法的實現(xiàn)在Observable類,ObservableCreate繼承了Observable類

#Observable
public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

其實就是創(chuàng)建了ObservableSubscribeOn對象,并傳入了ObservableCreate對象和IO線程調(diào)度器。查看ObservableSubscribeOn的構(gòu)造函數(shù):

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

可以看到ObservableSubscribeOn把source傳給了父類,
ObservableSubscribeOn繼承了AbstractObservableWithUpstream,因此AbstractObservableWithUpstream持有了ObservableCreate對象,但我們可以理解為ObservableSubscribeOn持有了ObservableCreate對象。

小結(jié):

ObservableSubscribeOn持有了sourceObservableCreate

ObservableSubscribeOn.observerOn(AndroidSchedulers.mainThread())

observerOn()方法的實現(xiàn)同樣是在Observable類中:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

可以看到也只是創(chuàng)建了一個ObservableObserveOn對象,并傳入了ObservableObserveOn對象和主線程調(diào)度器,這里的delayErrorbufferSize傳入的是默認(rèn)值,暫且不表,查看ObservableObserveOn的構(gòu)造函數(shù):

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

這里同樣的是把source傳給了父類,ObservableObserveOn同樣也是繼承了AbstractObservableWithUpstream

小結(jié):

ObservableObserveOn持有了sourceObservableSubscribeOn

ObservableObserveOn<T>.subscribe(observer)

接著我們調(diào)用了ObservableObserveOn對象的subscribe()方法,并傳入了observer,該observer是最外層的Observer<T>的實現(xiàn)類:

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}

subscribe(observer)方法的實現(xiàn)是在Observable類中,其內(nèi)部調(diào)用了subscribeActual(observer),subscribeActual(observer)是抽象方法,具體實現(xiàn)是在子類中,查看ObservableObserveOn類的subscribeActual(observer)

# ObservableObserveOn
@Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

1.創(chuàng)建一個任務(wù),scheduler為AndroidSchedulers(主線程);

createWorker()的實現(xiàn)是在HandlerScheduler中,該方法創(chuàng)建了一個HandlerWorker對象,它ji'c持有一個主線程關(guān)聯(lián)的handler(new Handler(Looper.getMainLooper()));

2.創(chuàng)建一個Observer觀察者 => ObserveOnObserver,持有一個傳進來的observer和worker,傳進來的observer是最開始最外面的Observer<T> 接口實現(xiàn)類,woker是HandlerWorker對象。

 ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

3.為被觀察者和觀察者之間建立訂閱關(guān)系

這里的sourceObservableSubscribeOn

小結(jié):

ObserveOnObserver持有了observerinterface Observer<T>

ObservableSubscribeOn.subscribe(observer)

接著查看ObservableSubscribeOnsubscribeActual(observer)方法,源碼如下:

# ObservableSubscribeOn.java
@Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

1.創(chuàng)建一個Observer觀察者 =>SubscribeOnObserver<T>

SubscribeOnObserver<T>持有一個傳進來的observer,這個傳進來的observer是我們上一步創(chuàng)建的ObserveOnObserver對象

2.調(diào)用ObserveOnObserver.onSubscribe(parent)

查看源碼:

# ObserveOnObserver.java
@Override
        public void onSubscribe(Disposable d) {
              // ...
                downstream.onSubscribe(this);
            }
        }

這里的downstream下游即我們傳進來的觀察者observer,也就是ObserveOnObserver所持有的interface Observer<T>的實現(xiàn)了,也就是最外層的代碼:

@Override
            public void onSubscribe(@io.reactivex.annotations.NonNull Disposable d) {
                System.out.println("onSubscribe");
            }
  1. scheduler.scheduleDirect(new SubscribeTask(parent))其實就是執(zhí)行線程,scheduler就是我們傳進來的線程調(diào)度器Schedulers.IO,
    SubscribeTask實現(xiàn)了Runnable,查看它的run方法:
final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

它的run方法只有一句代碼:source.subscribe(parent);parent是SubscribeOnObserver對象,source是傳遞給 ObservableSubscribeOn的source,也就是最開始的ObservableCreate對象。因此實際上是調(diào)用了ObservableCreatesubscribe()方法,并傳入了SubscribeOnObserver對象;

小結(jié)

SubscribeOnObserver持有了observerObserveOnObserver
onSubscribe方法是最先執(zhí)行的方法,并且它的線程和當(dāng)前線程一樣,因為此時還沒有執(zhí)行線程切換。
ObservableCreate.subscribeActual(observer)方法是運行在子線程的。

ObservableCreate.subscribe(observer)

查看ObservableCreatesubscribeActual方法源碼如下:

# ObservableCreate.java
@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
       source.subscribe(parent);
    }
  1. 創(chuàng)建了一個事件發(fā)射器CreateEmitter,并傳入了observer;

這里的observerSubscribeOnObserver對象

2.SubscribeOnObserveronSubscribe()方法只是為了設(shè)置Dispoable的值,不影響流程;
3.為被觀察者和觀察者之間建立訂閱關(guān)系

這里的sourceObservableOnSubscribe

這里開始執(zhí)行,外層頁面代碼的subscribe部分,開始發(fā)射數(shù)據(jù)

@Override
 public void subscribe(@io.reactivex.annotations.NonNull ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
 }

小結(jié)

emitter 發(fā)射器發(fā)送數(shù)據(jù)是在子線程。

CreateEmitter.onNext()

上一步的emitter是CreateEmitter<T>,createEmitter的onNext()源碼如下:

# CreateEmitter.java
public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

這里的observerSubscribeOnObserver,查看SubscribeOnObserver的onNext()

# SubscribeOnObserver.java
final Observer<? super T> downstream;
SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }
@Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

這里的downStreamSubscribeOnObserver的構(gòu)造函數(shù)傳進來,SubscribeOnObserver是在ObservableSubscribeOn.subscribeActual創(chuàng)建的,downStream就是ObserveOnObserver對象,查看ObserveOnObserver的onNext:

#ObserveOnObserver.java
 @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

最終是調(diào)用了worker執(zhí)行任務(wù),ObserveOnObserver本身實現(xiàn)了Runnable,它的run()方法中調(diào)用了ObserveOnObserver所持有的observeronNext方法,ObservableObserveOn所持有的正是最外層的Observer<T> 接口實現(xiàn)類,因此最后執(zhí)行了

@Override
            public void onNext(@io.reactivex.annotations.NonNull Integer integer) {
                System.out.println(integer);
            }

小結(jié)

worker.schedule(this);這里又切換了線程到主線程,worker正是之前AndroidThread創(chuàng)建的HandlerWorker,其內(nèi)部持有主線程關(guān)聯(lián)的handler,因此最后的onNext是在主線程執(zhí)行。
b784f4ad31217d419faedc0d0efc595f.png

只需要理解,每次 observerOn 和 subscribeOn 的時候,內(nèi)部都會創(chuàng)建一個新的 observable 和 observer。

新創(chuàng)建的 observable 會引用前面的 observable,就是代碼中我們分析的 source 變量。
新創(chuàng)建的 observer 會引用前面的 observer,就是代碼中我們分析的 observer 變量。

最后我們 subscribe 的時候,是調(diào)用的最后創(chuàng)建的 observable 的方法。而每個 observable 內(nèi)部又調(diào)用了 source 的 subscribe 方法,這樣就形成了一層一層往前傳遞的調(diào)用鏈。當(dāng)調(diào)用到最前面的一個 observable 的時候,就是我們自己創(chuàng)建的 observable,在這里我們需要手動觸發(fā)與該 observable 對應(yīng)的 observer 對象的 onNext 方法。而 observer 的 onNext 方法的內(nèi)部又調(diào)用了 downstream 的 onNext 方法,這樣就形成了一層一層往后傳遞的調(diào)用鏈。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容