RxJava2框架源碼分析二(Create篇)

1.回顧

上篇已經(jīng)介紹了RxJava的基本概念以及用法 RxJava2基本框架分析一(基礎(chǔ)篇)

2.實(shí)例講解

       // RxJava的鏈?zhǔn)讲僮?        // 1. 創(chuàng)建被觀察者(Observable) & 定義需發(fā)送的事件
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });
        // 2. 創(chuàng)建觀察者(Observer) & 定義響應(yīng)事件的行為
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("開始采用subscribe連接");
            }
            // 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                System.out.println("對(duì)Next事件" + value + "作出響應(yīng)");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("對(duì)Error事件作出響應(yīng)");
            }

            @Override
            public void onComplete() {
                System.out.println("對(duì)Complete事件作出響應(yīng)");
            }

        };
        // 3. 通過訂閱(subscribe)連接觀察者和被觀察者
        observable.subscribe(observer);
  • 運(yùn)行結(jié)果


    示意圖

3. 源碼分析

下面,我講根據(jù) 使用步驟 進(jìn)行RxJava2的源碼進(jìn)行分析
步驟1:創(chuàng)建被觀察者(Observable)&定義需發(fā)送的事件
步驟2:創(chuàng)建觀察者(Observer)&定義響應(yīng)事件的行為
步驟3:通過訂閱(subscribe)連接觀察者和被觀察者

步驟一:創(chuàng)建被觀察者(Observable)

  • 源碼分析如下
// 1. 創(chuàng)建被觀察者(Observable) & 定義需發(fā)送的事件
 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });
 /**
  * 源碼分析 Observable.create(object : ObservableOnSubscribe<Int>{...])
  *  create 操作主要是創(chuàng)建了 ObservableCreate 對(duì)象并且返回出去
 */
    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //判斷source是否為空  
        ObjectHelper.requireNonNull(source, "source is null");
        //hook函數(shù):判斷是否需要再原對(duì)象加上一些代碼操作(暫時(shí)可以當(dāng)做返回對(duì)象本身)
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
  
  /**
   * 下面我們來看看 ObservableCreate 對(duì)象里面做了什么操作
   */
    public final class ObservableCreate<T> extends Observable<T> {
    // ObservableCreate 是Observable的子類
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        //構(gòu)造函數(shù)
        //傳入source對(duì)象,并且賦值全局 = 手動(dòng)創(chuàng)建的ObservableOnSubscribe匿名內(nèi)部類對(duì)象(Observable.create(new ObservableOnSubscribe<Integer>())
        this.source = source;
    }
  //這里需要留心關(guān)注subscribeActual方法后面會(huì)講到

  • 步驟1總結(jié):創(chuàng)建被觀察者的操作已經(jīng)完成了,調(diào)用 Observable.create()返回了一個(gè)ObservableCreate 對(duì)象。

步驟二創(chuàng)建觀察者(Observer)

  • 源碼分析
/** 
  * 使用步驟2:創(chuàng)建觀察者 & 定義響應(yīng)事件的行為(方法內(nèi)的創(chuàng)建對(duì)象代碼)
  **/
 // 2. 創(chuàng)建觀察者(Observer) & 定義響應(yīng)事件的行為
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("開始采用subscribe連接");
            }
            // 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                System.out.println("對(duì)Next事件" + value + "作出響應(yīng)");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("對(duì)Error事件作出響應(yīng)");
            }

            @Override
            public void onComplete() {
                System.out.println("對(duì)Complete事件作出響應(yīng)");
            }

        };
/** 
  * 源碼分析Observer類
  **/
     public interface Observer<T> {
        // 注:Observer本質(zhì) = 1個(gè)接口
        // 接口內(nèi)含4個(gè)方法,分別用于 響應(yīng) 對(duì)應(yīng)于被觀察者發(fā)送的不同事件
        void onSubscribe(@NonNull Disposable d); // 內(nèi)部參數(shù):Disposable 對(duì)象,可結(jié)束事件
        void onNext(@NonNull T t);
        void onError(@NonNull Throwable e);
        void onComplete();
    }
  • 步驟2總結(jié):創(chuàng)建觀察者的操作已經(jīng)完成了,通過new了一個(gè)Observer的匿名內(nèi)部類

步驟三:通過訂閱(subscribe)連接觀察者和被觀察者

  • 源碼分析
 // 3. 通過訂閱(subscribe)連接觀察者和被觀察者
        observable.subscribe(observer);

/** 
  * 源碼分析:Observable.subscribe(observer)
  * 說明:該方法屬于 Observable 類的方法(注:傳入1個(gè) Observer 對(duì)象)
  **/  
public abstract class Observable<T> implements ObservableSource<T> {
     ...
    // 僅貼出關(guān)鍵源碼
  @Override
  public final void subscribe(Observer<? super T> observer) {
         ...
         // 僅貼出關(guān)鍵源碼
        //可以看到調(diào)用的是本類的下面抽象方法
         subscribeActual(observer); 
   }
    //定義了一個(gè)抽象方法當(dāng)調(diào)用subscribe時(shí)會(huì)跟這個(gè)調(diào)用Observable子類的實(shí)現(xiàn)方法(就是調(diào)用者)
   protected abstract void subscribeActual(Observer<? super T> observer);
}

/**
*  現(xiàn)在我們回到先前創(chuàng)建的被觀察者中 ObservableCreate類 
**/
public final class ObservableCreate<T> extends Observable<T> {
 // ObservableCreate 是Observable的子類
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        //構(gòu)造函數(shù)
        //傳入source對(duì)象,并且賦值全局 = 手動(dòng)創(chuàng)建的ObservableOnSubscribe匿名內(nèi)部類對(duì)象(Observable.create(new ObservableOnSubscribe<Integer>())
        this.source = source;
    }

   /** 
      * 重點(diǎn)關(guān)注:復(fù)寫了subscribeActual()
      * 作用:訂閱時(shí),通過接口回調(diào) 調(diào)用被觀察者(Observerable) 與 觀察者(Observer)的方法
      **/
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
      //1. 創(chuàng)建1個(gè)CreateEmitter對(duì)象(封裝成一個(gè)Disposable對(duì)象)
      //作用:發(fā)射事件
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
      //2. 調(diào)用觀察者(Observer)的onSubscribe()
     // onSubscribe()的實(shí)現(xiàn) = 使用步驟2(創(chuàng)建觀察者(Observer))時(shí)復(fù)寫的onSubscribe()
       //將Disposable(CreateEmitter) 傳到觀察者onSubscribe(Disposable d) 參數(shù)中,使之可以解除訂閱
        observer.onSubscribe(parent);

        try {
            //3.調(diào)用source對(duì)象的subscribe()方法
            // source對(duì)象 = 使用步驟1(創(chuàng)建被觀察者(Observable))中創(chuàng)建的ObservableOnSubscribe對(duì)象
            //subscribe()的實(shí)現(xiàn) = 使用步驟1(創(chuàng)建被觀察者(Observable))中復(fù)寫的subscribe()
            //將CreateEmitter對(duì)象傳遞給被觀察者進(jìn)行對(duì)象方法的調(diào)用(onNext(),onComplete()...)
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

  /** 
    * 分析2:emitter.onNext("1");
    * 此處僅講解subscribe()實(shí)現(xiàn)中的onNext()
    * onError()、onComplete()類似,此處不作過多描述
    **/
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            //初始化講觀察者賦值到全局變量observer
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
          //當(dāng)被觀察者調(diào)用onNext()方法時(shí),回調(diào)此方法(步驟一中創(chuàng)建Observable.create()匿名內(nèi)部類中的onNext())
            //發(fā)送的事件不能為null
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
          //判斷是否斷開連接(調(diào)用Disposable.dispose())
          //沒有斷開的話,則調(diào)用觀察者中的onNext()方法
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
}

步驟3總結(jié):當(dāng)被觀察者訂閱觀察者的時(shí)候,會(huì)調(diào)用被觀察者ObservablesubscribeActual()抽象方法,回調(diào)其子類重新的subscribeActual()方法。這方法里面有三個(gè)步驟:

  • 創(chuàng)建1個(gè)CreateEmitter對(duì)象(封裝成一個(gè)Disposable對(duì)象)
  • 調(diào)用觀察者(Observer)的onSubscribe(CreateEmitter parent ) 使其可以取消訂閱
  • 調(diào)用source對(duì)象的subscribe(CreateEmitter parent)方法,通過 parent發(fā)送事件回調(diào)

4. 源碼總結(jié)

  • 在步驟1(創(chuàng)建被觀察者(Observable))、步驟2(創(chuàng)建觀察者(Observer))時(shí),僅僅只是定義了發(fā)送的事件 & 響應(yīng)事件的行為;
  • 只有在步驟3(訂閱時(shí)),才開始發(fā)送事件 & 響應(yīng)事件,真正連接了被觀察者 & 觀察者
  • 具體源碼總結(jié)如下


    總結(jié)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • RxJava——目前最熱門的響應(yīng)式函數(shù)編程框架。筆者也是初涉Rx,所以打算通過這篇文章來理解Rx的操作流程,加深自...
    Robin_Lrange閱讀 11,202評(píng)論 10 44
  • 序言 RxJava是現(xiàn)在最流行的響應(yīng)式函數(shù)編程框架,之前的項(xiàng)目中一直使用RxJava,結(jié)合Retrofit+OkH...
    左大人閱讀 3,318評(píng)論 5 16
  • 前言:學(xué)習(xí)了這么多天的RxJava系列文章,雖然會(huì)用了,但是確不懂的具體是怎么回事,所以說會(huì)用的話還是不行,要去了...
    六_六閱讀 318評(píng)論 0 0
  • 第四次拿起綠底白邊的殘次品瓷杯大口大口灌可樂時(shí),像煎鐵板魷魚一樣滋滋作響的氣泡又把我的眼淚嗆出來。 是的沒錯(cuò),我不...
    南逢酒館閱讀 606評(píng)論 0 1
  • 今天沒有加班,和老公相約在四惠地鐵一起回家,在外面吃了飯,為了少長點(diǎn)肉,我倆又商量去超市溜溜食,一路上聊聊這聊...
    鳳_b89a閱讀 206評(píng)論 0 1

友情鏈接更多精彩內(nèi)容