RxJava原理學(xué)習(xí)

今天項目終于告一段落了,也有機會研究一下源碼,鑒于對RxJava的理解一直停留在三條流的階段,但是尚未真正明白所謂的三條流的實現(xiàn)思想,所以翻出源碼來了解一下,特此記錄,方便后期回顧。

首先,寫一個比較常見的RxJava調(diào)用方式如下:

 Observable.create(ObservableOnSubscribe<Int> { emitter ->
            emitter?.onNext(1)
            emitter?.onNext(2)
            emitter?.onNext(3)
        }).map { t -> t?.toString() }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(object : Observer<String?> {
                override fun onSubscribe(d: Disposable?) {
                    Log.i("onSubscribe==========>", d?.javaClass!!.name)
                }

                override fun onNext(t: String?) {
                    Log.i("onNext==========>", t!!)
                }

                override fun onError(e: Throwable?) {
                    Log.i("onError==========>", e?.message!!)

                }

                override fun onComplete() {
                    Log.i("onComplete==========>", "onComplete")
                }

            })

1.構(gòu)建流

首先進入 Observable.create方法查看可以看到方法如下:

public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
    }

可以看到該方法有一個返回值RxJavaPlugins.onAssembly(new ObservableCreate<>(source));進入``可以看到如下方法:

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

最終還是返回了我們傳入的source,即Observable的子類ObservableCreate,故而不需要關(guān)注RxJavaPlugins.onAssembly這個方法。
接下來只需要關(guān)注ObservableCreate<>(source),這個對象在我們的示例中接下來將會調(diào)用.map方法,跟著方法進入:

public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
    }

這個方法和create方法一樣都是返回一個Observable的子類,這個方法返回了 ObservableMap
同理接下來進入.subscribeOn.observeOn可以分別看到如下方法:

 public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
    }
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
    }

可以知道分別返回了Observable的子類ObservableSubscribeOnObservableObserveOn
接下來進入subscribe方法可以看到如下代碼:

public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

其中RxJavaPlugins.onSubscribe(this, observer);僅僅是返回一個observer,故而也不需要關(guān)注,那么訂閱的重點應(yīng)該也就是subscribeActual(observer);這個方法上。

2.訂閱流

進入subscribeActual方法,可以看到該方法是一個抽象方法:

 protected abstract void subscribeActual(@NonNull Observer<? super T> observer);

那么它的實現(xiàn)也就是在上游ObservableObserveOn中,可以看到如下代碼:

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }

其中的source是上游的ObservableSubscribeOn類,那么這里其實就是調(diào)用了ObservableSubscribeOn.subscribe()方法,由于這些類都是Observable的子類,所以最終還是會調(diào)用subscribeActual()方法,同時new ObserveOnObserver<>(observer, w, delayError, bufferSize))會創(chuàng)建一個新的Observer對象傳給上游。
對于ObservableSubscribeOn方法中的subscribeActual()方法:

 @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

observer.onSubscribe(parent);會先保證調(diào)用下游ObserveronSubscribe
然后看scheduler.scheduleDirect()方法:

 public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

其中的createWorker()是抽象方法:

 public abstract Worker createWorker();

然后示例中我們使用的是Schedulers.io(),只需要去IoScheduler這個類找createWorker的實現(xiàn)就好了:

 @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

繼續(xù)找w.schedule(task, delay, unit);schedule的實現(xiàn):

 if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }

看到這里就知道利用線程池切換了線程,到這里也就差不多了。
最后看SubscribeTaskrun()方法:

        @Override
        public void run() {
            source.subscribe(parent);
        }

同理還是會調(diào)用上一層的subscribeActual()方法并且會把這層的新的觀察者對象new SubscribeOnObserver<>(observer);傳給上游。
ObservableMapsubscribeActual()方法如下:

public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

和上文類似就不展開分析了。
接下來進入ObservableCreate中``subscribeActual()`方法是這樣的:

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

可以看到先是創(chuàng)建了一個CreateEmitter對象,然后和ObservableSubscribeOn類很像,最終還是調(diào)用了上游即ObservableOnSubscribe對象的subscribe方法,按照我們的示例來說,就會執(zhí)行:

 emitter?.onNext(1)
 emitter?.onNext(2)
 emitter?.onNext(3)

到這里訂閱流也就結(jié)束了。

3.觀察者回調(diào)流

onNext方法是一個抽象函數(shù),只需要看它的實現(xiàn)子類CreateEmitter即可:

@Override
        public void onNext(T t) {
            if (t == null) {
                onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

可以看到最終調(diào)用的還是下游observer對象的onNext方法
接著進入下游MapObserver和下游的下游SubscribeOnObserver中:

public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
@Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

可以看到調(diào)用的均是下游的onNext方法。接下來看ObserveOnObserver類中onNext是怎么發(fā)生線程切換的:

@Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

進入schedule()方法:

void schedule() {
      if (getAndIncrement() == 0) {
             worker.schedule(this);
         }
      }

同樣worker是抽象方法,這里我們示例中使用的線程是AndroidSchedulers.mainThread(),只需要找到createWorker的實現(xiàn)方法以及對應(yīng)Workerschedule方法即可:

@Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }
@Override
        @SuppressLint("NewApi") // Async will only be true when the API is available to call.
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposable.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            if (async) {
                message.setAsynchronous(true);
            }

            handler.sendMessageDelayed(message, unit.toMillis(delay));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposable.disposed();
            }

            return scheduled;
        }

很明顯使用了Handler切換線程。
同時在

void schedule() {
      if (getAndIncrement() == 0) {
             worker.schedule(this);
         }
      }

ObserveOnObserver實現(xiàn)了Runnable接口,也說明了在接下來的線程做了什么:

 @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

分別進入兩個方法的關(guān)鍵看看:

void drainFused() {
            int missed = 1;

            for (;;) {
                if (disposed) {
                    return;
                }

                boolean d = done;
                Throwable ex = error;

                if (!delayError && d && ex != null) {
                    disposed = true;
                    downstream.onError(error);
                    worker.dispose();
                    return;
                }

                downstream.onNext(null);

                if (d) {
                    disposed = true;
                    ex = error;
                    if (ex != null) {
                        downstream.onError(ex);
                    } else {
                        downstream.onComplete();
                    }
                    worker.dispose();
                    return;
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
     void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

可以發(fā)現(xiàn)都調(diào)用了下游的onNext()方法。
那么接下來也就會調(diào)用最下游的onNext()方法了,即我們示例中的:

 override fun onNext(t: String?) {
                    Log.i("onNext==========>", t!!)
                }

同時打印結(jié)果如下:

I/onSubscribe==========>: io.reactivex.rxjava3.internal.operators.observable.ObservableObserveOn$ObserveOnObserver
I/onNext==========>: 1
I/onNext==========>: 2
I/onNext==========>: 3

可以看到onSubscribe打印出來的是上游對應(yīng)的observer對象,onNext()也打印出了符合我們預(yù)期的結(jié)果。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容