RxJava 2 源碼分析(一)

在分析之前呢,首先需要大家打開(kāi)源碼對(duì)照分析。效果更加??!

首先我們看一下RxJava 2 三步曲的一個(gè)基本實(shí)現(xiàn):
1 創(chuàng)建被觀察者(也叫數(shù)據(jù)發(fā)射者)
2 創(chuàng)建觀察者(也叫數(shù)據(jù)消費(fèi)者)
3 建立訂閱關(guān)系

代碼如下:

    //第一步 創(chuàng)建被觀察者
    Observable<String> observable = Observable.create(
            new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {

                    e.onNext("發(fā)射數(shù)據(jù)");
                    e.onComplete();
                }
            }
    );

    //第二步 創(chuàng)建觀察者
    Observer<String> observer = new Observer<String>() {

        private Disposable disposable;

        @Override
        public void onSubscribe(Disposable d) {
            disposable = d;
        }

        @Override
        public void onNext(String value) {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    };

    //第三步 建立訂閱關(guān)系
    observable.subscribe(observer);

新記:
我們來(lái)看一下創(chuàng)建Observable的這個(gè)靜態(tài)方法create.實(shí)際上這個(gè)是RxJava大量的操作符中的一個(gè),create方法會(huì)返回一個(gè)Observable實(shí)例。
create方法的參數(shù)是一個(gè)實(shí)現(xiàn)了ObservableOnSubscribe接口的對(duì)象實(shí)例,該接口提供了發(fā)射數(shù)據(jù)的回調(diào)subscribe()方法,回調(diào)回來(lái)的ObservableEmitter實(shí)例就可以看成是數(shù)據(jù)發(fā)射器,用來(lái)發(fā)射數(shù)據(jù)。

我們來(lái)看看Observable.create()的內(nèi)部實(shí)現(xiàn):

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

首先是創(chuàng)建了一個(gè)ObservableCreate實(shí)例,是Observable的子類,把之前參數(shù)中創(chuàng)建的ObservableOnSubscribe實(shí)例直接傳了進(jìn)去,作了件什么事呢?實(shí)際上實(shí)現(xiàn)了一個(gè)代理的作用,代理的是誰(shuí)?是Observer,后面我們?cè)僭敿?xì)分析,我們可以確定,ObservableCreate就是一個(gè)Observable。
我們?cè)倏碦xJavaPlugins.onAssembly().這個(gè)方法拿了ObservableCreate(Observable)實(shí)例去做了什么:

public static <T> Observable<T> onAssembly(Observable<T> source) {
    Function<Observable, Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

RxJavaPlugins這個(gè)類是一個(gè)鉤子函數(shù)集合類,為RxJava中大量的操作符提供鉤子函數(shù)的注入。這里的鉤子函數(shù)會(huì)對(duì)Observable實(shí)例按照鉤子函數(shù)實(shí)際提供的功能進(jìn)行加工處理,然后返回一個(gè)處理過(guò)的Observable。實(shí)際上在我們上面所寫的這個(gè)代碼示例中,這里的鉤子函數(shù)為null,并沒(méi)有對(duì)Observable做任何處理就直接返回了。

實(shí)際上我們示例代碼中整個(gè)被觀察者Observable的創(chuàng)建,實(shí)際創(chuàng)建的是一個(gè)ObservableCreate實(shí)例,該實(shí)例提供了回調(diào)方法subscribe(),當(dāng)發(fā)生訂閱行為時(shí)會(huì)回調(diào),也就是示例代碼中執(zhí)行observable.subscribe(),訂閱后就可以發(fā)射數(shù)據(jù)了。通過(guò)數(shù)據(jù)發(fā)射器ObservableEmitter來(lái)進(jìn)行數(shù)據(jù)發(fā)射。

像這種訂閱后才開(kāi)始發(fā)射數(shù)據(jù)的,我們稱為Cold Observable; 另外一種稱作Hot Observable,這種是不管有沒(méi)有觀察者來(lái)訂閱都會(huì)不斷地發(fā)射數(shù)據(jù)。

我們?cè)賮?lái)看看observable.subscribe()訂閱的內(nèi)部代碼:

  public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    ...//這里把不關(guān)心的代碼省略掉了
}

首先是代碼中的RxJavaPlugins.onSubscribe()的調(diào)用與上面講的鉤子方法是一樣的,這里是通過(guò)鉤子方法對(duì)observer作了某種處理。示例中實(shí)際上也并未調(diào)用實(shí)際的鉤子方法。

最后就執(zhí)行到了subscribeActual()方法,我們前面講過(guò)我們的Observable是一個(gè)ObservableCreate實(shí)例,subscribeActual方法在Observable中是一個(gè)虛方法,真正的實(shí)現(xiàn)是在ObservableCreate中,代碼如下:

protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

代碼中的CreateEmitter是ObservableCreate的內(nèi)部類,實(shí)現(xiàn)了對(duì)observer的代理,CreateEmitter同時(shí)也實(shí)現(xiàn)了Disposable接口,該接口提供了dispose方法,可以用來(lái)停止對(duì)數(shù)據(jù)的接收。
我們接下來(lái)再看后面的幾行代碼就很明了了,首先是進(jìn)行了observer的onSubscribe回調(diào),然后是調(diào)用了observable的subscribe回調(diào),回調(diào)后就執(zhí)行數(shù)據(jù)發(fā)射操作。
整個(gè)過(guò)程就這樣。

最后,我們來(lái)看一下執(zhí)行的順序:

整個(gè)執(zhí)行順序
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應(yīng)式編程作為結(jié)合使用的,對(duì)什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,983評(píng)論 0 10
  • 注:只包含標(biāo)準(zhǔn)包中的操作符,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,376評(píng)論 2 8
  • 我從去年開(kāi)始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy閱讀 5,763評(píng)論 7 62
  • 參考:給 Android 開(kāi)發(fā)者的 RxJava 詳解-扔物線深入淺出RxJava 基礎(chǔ) "a library f...
    Vincen1024閱讀 576評(píng)論 0 1
  • 作者寄語(yǔ) 很久之前就想寫一個(gè)專題,專寫Android開(kāi)發(fā)框架,專題的名字叫 XXX 從入門到放棄 ,沉淀了這么久,...
    戴定康閱讀 7,735評(píng)論 13 85

友情鏈接更多精彩內(nèi)容