RxJava2.X源碼解析(四)

更多分享:http://www.cherylgood.cn

一、前言

  • 基于RxJava2.1.1
  • 我們在前面的 RxJava2.0使用詳解(一)初步分析了RxJava從創(chuàng)建到執(zhí)行的流程。RxJava2.0使用詳解(二) 中分析了RxJava的隨意終止Reactive流的能力的來源;也明白了RxJavaonComplete();onError(t);只有一個會被執(zhí)行的秘密。RxJava2.X 源碼分析(三)中探索了RxJava2調(diào)用subscribeOn切換被觀察者線程的原理。
  • 本次我們將繼續(xù)探索RxJava2.x切換觀察者的原理,分析observeOnsubscribeOn的不同之處。繼續(xù)實現(xiàn)我們在第一篇中定下的小目標(biāo)

二、從Demo到原理

  • OK,我們的Demo還是上次的demo,忘記了的小伙伴可以點擊RxJava2.X 源碼分析(三),這里就不再重復(fù)了哦,我們直接進入正題。
  • Ok,按照套路,我們從observeOn方法入手。
  • Ok,我點~_
 @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> observeOn(Scheduler scheduler) {
      //false為默認(rèn)無延遲發(fā)送錯誤,bufferSize為緩沖區(qū)大小
      return observeOn(scheduler, false, bufferSize());
  }
  • 我們繼續(xù)往下看,我猜套路跟subscribeOn的逃不多,也是采用裝飾者模式,wrapper我們的ObservableObserver產(chǎn)生一個中間被觀察者和觀察中,通過中間被觀察者訂閱上游被觀察者,通過中間觀察者接收上游被觀察者下發(fā)的數(shù)據(jù),然后通過線程切換將數(shù)據(jù)傳遞給下游觀察者。
  • Ok,我們來驗證下才想。我覺得就是沒完全猜對,也能猜對其中的大部分。
 @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
      ObjectHelper.requireNonNull(scheduler, "scheduler is null");
      ObjectHelper.verifyPositive(bufferSize, "bufferSize");
      return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
  }
  • Ok,熟悉的RxJavaPlugins.onAssemblyhook處理,略過,直接看new ObservableObserveOn(this, scheduler, delayError, bufferSize)這句

      public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
          final Scheduler scheduler;
          final boolean delayError;
          final int bufferSize;
          public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
              super(source);
              this.scheduler = scheduler;
              this.delayError = delayError;
              this.bufferSize = bufferSize;
          }
    
          @Override
        protected void subscribeActual(Observersuper T> observer) {
             //1、在當(dāng)前線程調(diào)度,但不是立即執(zhí)行,放入隊列中
              if (scheduler instanceof TrampolineScheduler) {
                  source.subscribe(observer);
              } else {
               //2、本次走的是這里
                  Scheduler.Worker w = scheduler.createWorker();
                //3
                  source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
              }
          }
    
  • Ok,果然,熟悉的模式,對我們上游的Observable,下游的Observerwrapper一次。
    1、ObservableObserveOn繼承了AbstractObservableWithUpstream
    2、source保存上游的Observable
    3、scheduler為本次的調(diào)度器
    4、在下游調(diào)用subscribe訂閱時觸發(fā)->subscribeActual->Wrapper了下游的Observer觀察者
  • 3處:source為游Observable,下游Observer被wrapper到ObserveOnObserver,發(fā)生訂閱數(shù)件,上游Observable開始執(zhí)行subscribeActual,調(diào)用ObserveOnObserver的onSubscribe以及onNext、onError、onComplete等

  • OK,我們接著看Observer被包裝進 ObserveOnObserver的樣子,代碼有點多,我們分段講解
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        //下游的Observer
        final Observersuper T> actual;
        //調(diào)度工作者
        final Scheduler.Worker worker;
        //是否延遲錯誤,默認(rèn)false
        final boolean delayError;
        //隊列大小
        final int bufferSize;
        //存儲上游Observable下發(fā)的數(shù)據(jù)隊列
        SimpleQueue<T> queue;
        //存儲下游Observer的Disposable
        Disposable s;
        //存儲錯誤信息
        Throwable error;
        //校驗是否完畢
        volatile boolean done;
        //是否被取消
        volatile boolean cancelled;
        //存儲執(zhí)行模式,同步或者異步 同步
        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observersuper T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
      public void onSubscribe(Disposable s) {

            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                  //1、判斷執(zhí)行模式并調(diào)用onSubscribe傳遞給下游Observer
                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        //true 后面的onXX方法都不會被調(diào)用
                        done = true;
                        actual.onSubscribe(this);
                        //2、同步模式下,直接調(diào)用schedule
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        //2、異步模式下,等待schedule
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //判斷執(zhí)行模式并調(diào)用onSubscribe傳遞給下游Observer
                actual.onSubscribe(this);
            }
        }
  • OK,執(zhí)行玩這里之后,就到我們的onXX方法了
  • 首先可無限調(diào)用的onNext
 @Override
  public void onNext(T t) {
       //3、數(shù)據(jù)源是同步模式或者執(zhí)行過error / complete 會是true
      if (done) {
          return;
      }
      //如果數(shù)據(jù)源不是異步類型,
      if (sourceMode != QueueDisposable.ASYNC) {
          //4、上游Observable下發(fā)的數(shù)據(jù)壓入queue
          queue.offer(t);
      }
      //5、開始調(diào)度
      schedule();
  }
  • 其次只能觸發(fā)一次的onError,基本差不多
 @Override
    public void onError(Throwable t) {
        if (done) {
            //6、已完成再執(zhí)行會拋一場
            RxJavaPlugins.onError(t);
            return;
        }
        //7、記錄錯誤信息
        error = t;
        //8、標(biāo)識已完成
        done = true;
        //9、開始調(diào)度
        schedule();
    }
  • 同樣是只能觸發(fā)一次的onComplete,同樣的套路,就不說了
 @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        schedule();
    }
  • 然后就是我們的關(guān)鍵點schedule();
 //關(guān)鍵點就是直接、簡單、里面線程調(diào)度工作者調(diào)用schedule(this),傳入了this
    void schedule() {
           //getAndIncrement很關(guān)鍵,他原子性的保證了worker.schedule(this);在調(diào)度完之前不會被再次調(diào)度
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
  • 什么?傳入了this?那么說明什么呢?( ̄? ̄)

  • 嗯?this是個runnable,沒錯,我們的ObserveOnObserver實現(xiàn)了Runnable接口

  • 那么,接下來自然是調(diào)用run方法

    @Override
    public void run() {
          //outputFused一般是false
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
  • 好吧,在看drainNormal前,我們先看一個函數(shù)
 //從名字看是檢測是否已終止
    boolean checkTerminated(boolean d, boolean empty, Observersuper T> a) {
        //1、訂閱已取消
        if (cancelled) {
            //清空隊列
            queue.clear();
            return true;
        }
        //2、d其實是done,
        if (d) {
            //done==ture可能的情況onNext剛被調(diào)度完,onError或者onCompele被調(diào)用,
            Throwable e = error;
            if (delayError) {
                //delayError==true時等到隊列為空才調(diào)用
                if (empty) {
                    if (e != null) {
                        a.onError(e);
                    } else {
                        a.onComplete();
                    }
                    worker.dispose();
                    return true;
                }
            } else {
                //否則直接調(diào)用
                if (e != null) {
                    queue.clear();
                    a.onError(e);
                    worker.dispose();
                    return true;
                } else
     if (empty) {
                    a.onComplete();
                    worker.dispose();
                    return true;
                }
            }
        }
        //否則未終結(jié)
        return false;
    }
  • true:1、訂閱被取消cancelled==true,2、done==true onNext剛被調(diào)度完,onError或者onCompele被調(diào)用

  • 繼續(xù)看drainNormal

void drainNormal() {
      int missed = 1;
      final SimpleQueue<T> q = queue;
      final Observersuper T> a = actual;
      //Ok,死循環(huán),我們來看下有哪些出口
      for (;;) {
      //Ok,出口,該方法前面分析的
      if (checkTerminated(done, q.isEmpty(), a)) {
              return;
          }

          //在此死循環(huán)
          for (;;) {
              boolean d = done;
              T v;
              try {
                  //分發(fā)數(shù)據(jù)出隊列
                  v = q.poll();
              } catch (Throwable ex) {
                  //有異常時終止退出
                  Exceptions.throwIfFatal(ex);
                  s.dispose();
                  q.clear();
                  a.onError(ex);
                  //停止worker(線程)
                  worker.dispose();
                  return;
              }
              boolean empty = v == null;
              //判斷隊列是否為空
              if (checkTerminated(d, empty, a)) {
                  return;
              }
               //沒數(shù)據(jù)退出
              if (empty) {
                  break;
              }
              //數(shù)據(jù)下發(fā)給下游Obsever,這里支付者onNext,onComplete和onError主要放在了checkTerminated里面回調(diào)
              a.onNext(v);
          }
       //保證此時確實有一個 worker.schedule(this);正在被執(zhí)行,
          missed = addAndGet(-missed);
       //為何要這樣做呢?我的理解是保證drainNormal方法被原子性調(diào)用,如果執(zhí)行了addAndGet之后getAndIncrement() == 0就成立了,此時又一個worker.schedule(this);被調(diào)用了,那么就不能執(zhí)行break了
          if (missed == 0) {
              break;
          }
      }
  }

總結(jié)

  • Ok,看到這里我們基本了解了observeOn的實現(xiàn)流程,同樣是老套路,使用裝飾者模式,中間Wrapper了我們的Observable和Observer,通過中間增加一個Observable和Observer來實現(xiàn)線程的切換。
  • 喜歡就給我留言哦

相關(guān)文章

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容