Android-RxJava源碼解析

RxJava3.0已經(jīng)發(fā)布了,但是這里還是以RxJava2.x來分析部分源碼。RxJava采用的是響應(yīng)式編程的原理,采用觀察者模式。

一、RxJava案例和流程

        Observable<String> observable = Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
                        emitter.onNext("hello");
                    }
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());

        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
RxJava調(diào)用流程圖.png

簡易的流程圖.png

這是一個(gè)標(biāo)注的RxJava使用例子,其實(shí)這個(gè)流程就是先從上往下一級一級的封裝對應(yīng)的Observable,然后再從下往上通過Observable.subscribe訂閱向上傳遞Observer觀察者,最后ObservableCreate中將Observer進(jìn)行封裝CreateEmitter,調(diào)用ObservableOnSubscribe.subscribe(emitter)用于業(yè)務(wù)中通知,CreateEmitter是ObservableEmitter子類實(shí)現(xiàn),是ObservableCreate的靜態(tài)內(nèi)部類

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public String toString() {
            return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
        }
    }

從CreateEmitter的內(nèi)部onNext方法可以看出,其內(nèi)部就是通過調(diào)用Observer.onNext通知觀察者結(jié)果。其實(shí)整個(gè)流程來看,Observable向下一層層進(jìn)行封裝,通過鉤子函數(shù)的方式,而Observer從下向上進(jìn)行封裝傳遞,而且業(yè)務(wù)都是在Observer中的onNext執(zhí)行的,比如.map()中定義的Function,其實(shí)就是在MapObserver中的onNext中執(zhí)行


RxJava處理S型結(jié)構(gòu).png

二、分析每步的源碼

RxJavaPlugins.onAssembly

在具體分析之前,首先先看RxJavaPlugins.onAssembly的具體實(shí)現(xiàn),RxJavaPlugins.onAssembly方法可以說貫穿整個(gè)RxJava流程,RxJavaPlugins.onAssembly方法目的就是作為一個(gè)鉤子函數(shù),將之前的Observable進(jìn)行封裝,變成一個(gè)新的Observable。
在上面的例子中,這里的f對象一直都是null。

    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

1.Observable.subscribe

所有的Observable以及子類調(diào)用subscribe方法時(shí),都是調(diào)用Observable.subscribe(),所以在上述流程中,第一個(gè)調(diào)用subscribe()方法的就是ObservableObServeOn這個(gè)Observable子類,ObservableObServeOn是在observeOn()方法中對上一個(gè)Observable進(jìn)行封裝創(chuàng)建的。

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

2.ObservableObserveOn

ObservableObserveOn對象其實(shí)就是在調(diào)用ObserveOn的時(shí)候創(chuàng)建的。封裝當(dāng)前的Observable即在observeOn方法調(diào)用之前最新創(chuàng)建的Observable

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
// 這里的source其實(shí)就是Observable實(shí)現(xiàn)類,在上面的例子中,其實(shí)就是ObservableSubscribeOn對象
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            // 封裝Observer,然后通過調(diào)用上一個(gè)Observable.subscribe,向上傳遞Observer觀察者
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    ...
}

在這里創(chuàng)建了一個(gè)Scheduler.Worker,Scheduler是一個(gè)線程調(diào)度類,Worker其實(shí)就是針對線程調(diào)用的工作者。而Scheduler會通過不同的子類實(shí)現(xiàn),將當(dāng)前observeOn定義的線程,即讓在observeOn之前封裝的Observer都在該線程之前(Observer是從下向上傳遞的)。
ObserveOn與SubscribeOn有點(diǎn)相反,ObserveOn針對的是Observer,即觀察者,觀察者是從下向上傳遞封裝的,而ObserverOn中接收到的觀察者,其實(shí)是其下游封裝之后的觀察者,所以O(shè)bserveOn針對的是其下游

具體分析observeOn(AndroidSchedulers.mainThread())
(1)AndroidSchedulers.mainThread()
public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
(2)AndroidSchedulers.MAIN_THREAD
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
        new Callable<Scheduler>() {
            @Override public Scheduler call() throws Exception {
                return MainHolder.DEFAULT;
            }
        });
(3)AndroidSchedulers.MainHolder.DEFAULT
private static final class MainHolder {
    static final Scheduler DEFAULT
        = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}

所以AndroidSchedulers.mainThread()的線程切換到主線程,其實(shí)就是交由Handler來實(shí)現(xiàn)。HandlerScheduler其實(shí)是Scheduler的子類,依然是用來創(chuàng)建Scheduler.Worker實(shí)例,然后通過Worker.schedule方法進(jìn)行線程切換,將之前的線程切換到主線程。

(4)observeOn對應(yīng)的Observable->ObservableObserveOn

而ObservableObserveOn中也有一個(gè)Scheduler實(shí)例,這個(gè)實(shí)例其實(shí)就是HandlerScheduler對象。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    ...
}
(5)Scheduler.Worker w = scheduler.createWorker();

這里其實(shí)就是調(diào)用了HandlerScheduler這個(gè)Scheduler的createWorkder()方法

// HandlerScheduler.createWorker()
@Override
public Worker createWorker() {
    return new HandlerWorker(handler, async);
}
(6)線程切換

而ObserveOnObserver中的線程切換,其實(shí)就是調(diào)用ObserveOnObserver的schedule()方法實(shí)現(xiàn)的,而ObserveOnObserver中的Scheduler.Worker worker對象,是一個(gè)HandlerWorker對象。

// ObserveOnObserver類中的部分方法

@Override
public void run() {
    if (outputFused) {
        // 直接執(zhí)行下游Observer.onNext方法。
        drainFused();
    } else {
        // 從隊(duì)列中取任務(wù)
        drainNormal();
    }
}

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

這里的this,其實(shí)就是ObserveOnObserver對象,而在ObserveOnObserver中,都會將要執(zhí)行的task放到隊(duì)列中。而ObserveOnObserver本身就是一個(gè)Runnable

(7)HandlerSchedule.HandlerWorker.schedule
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }

    // 這里的run,其實(shí)就是ObserveOnObserver對象
    run = RxJavaPlugins.onSchedule(run);

    // 封裝ObserveOnObserver對象,ObserveOnObserver本身就是一個(gè)Runnable
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    if (async) {
        message.setAsynchronous(true);
    }

    // 通過Handler執(zhí)行消息,進(jìn)而達(dá)到切換到主線程的目的
    handler.sendMessageDelayed(message, unit.toMillis(delay));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

所以從上面的代碼可以看出,其實(shí)ObserveOn的切換任務(wù),首先會接收到會被后一個(gè)Observer調(diào)用onNext觸發(fā)調(diào)用ObserveOnObserver的onNext()方法(Observer從下向上封裝,所以下面是前一個(gè),上面是后一個(gè))然后就會調(diào)用ObserveOnObserver的schedule()方法,觸發(fā)Handler同步執(zhí)行任務(wù),而封裝的Runnable其實(shí)就是在其run()方法中調(diào)用了ObserveOnObserver的run()方法調(diào)用了前一個(gè)(下游)的Observer的onNext()方法,將結(jié)果轉(zhuǎn)換線程回調(diào)給了觀察者
所以是針對ObserveOn下游代碼的。

3.ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    ...
}

我們知道,subscribeOn只有設(shè)置第一次有效,這其實(shí)是因?yàn)镾ubscribeOnObserver.setDisposable方法中,調(diào)用的是DisposableHelper.setOnce(this, d);方法

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;

        final AtomicReference<Disposable> upstream;

        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }
        ...

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

而DisposableHelper.setOnce方法的實(shí)現(xiàn)如下:

    public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        ObjectHelper.requireNonNull(d, "d is null");
        if (!field.compareAndSet(null, d)) {
            d.dispose();
            if (field.get() != DISPOSED) {
                reportDisposableSet();
            }
            return false;
        }
        return true;
    }

而當(dāng)前subscribeOn對應(yīng)的Observable即ObservableSubscribeOn中的subscribeActual方法中,針對之前傳進(jìn)來的Observer做了封裝,而這里的source其實(shí)就是在subscribeOn之前的Observable,所以在subscribeTask中的run方法中調(diào)用source.subscribe(parent);其實(shí)就是將subscribeOn之前的邏輯運(yùn)行的線程切換到了subscribeOn指定的線程。而subscribeOn后面部分的代碼,如果沒有指定線程切換,都是在subscribeActual中調(diào)用source.subscribe()的,所以并不會在subscribeOn指定的線程中執(zhí)行

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

如果調(diào)用多次的subscribeOn,其實(shí)每次的線程切換都會生效,但是最終只有第一個(gè)調(diào)用的subscribeOn會生效的,這個(gè)原因其實(shí)就是subscribeOn是切換其上游的線程,而subscribeOn線程切換,其實(shí)切換的就是source.subscribe(parent)所在的線程,如果create().subscribeOn,那么subscribeOn切換的就是create()中的source(即ObservableCreate)所在線程。subscribeOn創(chuàng)建對應(yīng)的ObservableSubscribeOn這個(gè)Observable是從上向下的,但是調(diào)用subscribeActual,封裝Observer是從下向上的,所以就算多次使用subscribeOn進(jìn)行線程的切換,最終只有第一個(gè)subscribeOn生效,即最后被調(diào)用subscribeActual的ObservableSubscribeOn生效了。

subscribeOn與ObserveOn不同的是,ObserveOn切換的是Observer的線程,而subscribeOn切換的是Observable的線程。所以,subscribeOn是影響上游的操作,而observeOn影響的是其下游的操作

image.png

看這個(gè)圖,其實(shí)ObservableSubscribeOn就是subscribeOn創(chuàng)建的對應(yīng)的Observable,在這個(gè)Observable的subscribeActual方法中,其實(shí)就是線程切換執(zhí)行任務(wù),而對應(yīng)的Runnable.run方法中調(diào)用的其實(shí)就是source.subscribe(),即ObservableSubscribeOn內(nèi)部封裝的Observable.subscribe線程做了切換調(diào)度,而ObservableSubscribeOn內(nèi)部封裝的Observable是subscribeOn上游創(chuàng)建的Observable
image.png

針對parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));做分析
(1)Scheduler.scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
(2)Scheduler.scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // 創(chuàng)建工作對象,內(nèi)部通過線程池執(zhí)行runable任務(wù),這個(gè)任務(wù)其實(shí)就是SubscribeTask
    // 創(chuàng)建Scheduler.Worker實(shí)例,其目的就是為了創(chuàng)建EventLoopWorker實(shí)例以及其內(nèi)部的ThreadWorker實(shí)例
    // 最終的線程池調(diào)用就是通過ThreadWorker內(nèi)部的線程池來進(jìn)行,這樣就將任務(wù)交給了ThreadWorker中的線程池
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    // 封裝Runable,其實(shí)還是一個(gè)Runbale,只不過多實(shí)現(xiàn)了Disposable接口
    // 這是為了可以在中斷任務(wù)的時(shí)候,將這個(gè)異步執(zhí)行的任務(wù)中斷
    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}

這里的createWorker()方法,其實(shí)是調(diào)用的IoScheduler.createWorker方法。

(3)IoScheduler.createWorker
@NonNull
@Override
public Worker createWorker() {
    // 創(chuàng)建Scheduler.Worker實(shí)例對象,這時(shí)會構(gòu)造器內(nèi)部創(chuàng)建ThreadWorker實(shí)例
    // ThreadWorker實(shí)例是從CachedWorkerPool實(shí)例調(diào)用get()方法創(chuàng)建的
    return new EventLoopWorker(pool.get());
}

這里的pool是一個(gè)AtomicReference<CachedWorkerPool>對象,get()得到的是一個(gè)線程池包裝類。

        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            // 如果cached中沒有ThreadWorker,則會創(chuàng)建一個(gè),并且在ThreadWorker構(gòu)造器中會創(chuàng)建線程池ScheduledExecutorService executor;
            // 這個(gè)是因?yàn)門hreadWorker構(gòu)造器執(zhí)行父類構(gòu)造器的時(shí)候創(chuàng)建的
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
...
}
(4)第三步里的w.schedule其實(shí)就是調(diào)用IoScheduler中的內(nèi)部類EventLoopWorker.schedule
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
    }

    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

而threadWorker其實(shí)就是通過CachedWorkerPool.get()得到的。
而threadWorker其實(shí)就是ThreadWorker,是IoScheduler的內(nèi)部類,是NewThreadWorker的子類實(shí)現(xiàn)。所以這里調(diào)用scheduleActual,其實(shí)就是調(diào)用NewThreadWorker.scheduleActual

(5)NewThreadWorker.scheduleActual
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        // executor其實(shí)就是線程池對象。sr就是要執(zhí)行的任務(wù),
        // executor這個(gè)線程池是在創(chuàng)建ThreadWorker的時(shí)候初始化創(chuàng)建的
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

而任務(wù)中,其實(shí)就是ObservableSubscribeOn的靜態(tài)內(nèi)部類實(shí)現(xiàn)對象,SubscribeTask。而SubscribeTask這個(gè)Runnable的run方法中是調(diào)用了source.subscribe(),這個(gè)source其實(shí)就是在subscribeOn之前封裝的Observable實(shí)例,所以這里的線程池異步調(diào)用的時(shí)候,其實(shí)就是執(zhí)行subscribeOn之前的Observable.subscribe過程,所以subscribeOn是針對代碼上游的線程切換。
但是Observer.onSubscribe并不會為異步,subscribeOn的線程切換不會針對onSubscribe,調(diào)用subscribe的Observable在什么線程,則onSubscribe就在什么線程中執(zhí)行。而onError和onComplete都是會因?yàn)榫€程切換而影響。

4.ObservableMap

其實(shí)map操作符,內(nèi)部創(chuàng)建的是ObservableMap,會傳入Function對象,而Function對象是被封裝在MapObserver這個(gè)Observer中的,當(dāng)從上游向下調(diào)用發(fā)送Observer.onNext的時(shí)候,就會在MapObserver中觸發(fā)Function中的操作。

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

        ...
    }

5.ObservableCreate

這里就看關(guān)鍵部分的源碼,即subscribeActual的實(shí)現(xiàn)

    public final class ObservableCreate<T> extends Observable<T> {
  // ObservableCreate類 = Observable的子類 

      ...
      // 僅貼出關(guān)鍵源碼

        final ObservableOnSubscribe<T> source;

        // 構(gòu)造函數(shù)
      // 傳入了傳入source對象 = 手動(dòng)創(chuàng)建的ObservableOnSubscribe對象
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
        
    /** 
      * 重點(diǎn)關(guān)注:復(fù)寫了subscribeActual()
      * 作用:訂閱時(shí),通過接口回調(diào) 調(diào)用被觀察者(Observerable) 與 觀察者(Observer)的方法
      * 該方法,是在被觀察者調(diào)用subscribe()方法與觀察者綁定的時(shí)候,調(diào)用的。
      **/
        @Override
        protected void subscribeActual(Observer<? super T> observer) {

            // 1. 創(chuàng)建1個(gè)CreateEmitter對象(封裝成1個(gè)Disposable對象)
            // 作用:發(fā)射事件
            // CreateEmitter類中是對觀察者的一個(gè)封裝類,用于被觀察者變化時(shí)向觀察者分發(fā)事件
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);

            // 2. 調(diào)用觀察者(Observer)的onSubscribe()
            // onSubscribe()的實(shí)現(xiàn) = 使用步驟2(創(chuàng)建觀察者(Observer))時(shí)復(fù)寫的onSubscribe()
            // Observer對象的onSubscribe方法實(shí)現(xiàn)
            observer.onSubscribe(parent);

            try {
                // 3. 調(diào)用source對象的subscribe()
                // source對象 = 使用步驟1(創(chuàng)建被觀察者(Observable))中創(chuàng)建的ObservableOnSubscribe對象 
                // subscribe()的實(shí)現(xiàn) = 使用步驟1(創(chuàng)建被觀察者(Observable))中復(fù)寫的subscribe()->>分析2
                // 這里調(diào)用的,其實(shí)就是在Observable.create方法中,
                // 實(shí)現(xiàn)ObservableOnSubscribe接口的時(shí)候,實(shí)現(xiàn)的subscrebe方法
                source.subscribe(parent);

            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
    }

從這部分代碼可以看出,在ObservableCreate.subscribeActual內(nèi)部會調(diào)用ObservableOnSubscribe.subscribe,但是在調(diào)用這個(gè)方法之前,會調(diào)用觀察者的onSubscribe()方法,其實(shí)就是事件開始。

三、操作符分析

1.分析map和flatMap的區(qū)別

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

要分析兩個(gè)操作符的區(qū)別,首先看源碼。
比如在map和flatMap兩個(gè)操作符中,分別做網(wǎng)絡(luò)請求,那么在map中,就會有問題。
因?yàn)槿绻趂latMap中執(zhí)行Retrofit請求接口,返回的是一個(gè)Observable<T>,那么可以看出,如果是使用flatMap的話,則flatMap中的泛型<R>其實(shí)是接口請求的返回值的Observable<T>的泛型T,而如果是使用map的話,那么map中的泛型<R>其實(shí)就是接口請求返回值Observable<T>,那么map的返回值就會變成Observable<Observable<T>>,這結(jié)構(gòu)出現(xiàn)了變化
所以map主要是用來做數(shù)據(jù)類型的轉(zhuǎn)換的,從一個(gè)數(shù)據(jù)類型轉(zhuǎn)為另外一個(gè)數(shù)據(jù)類型,而不會影響這個(gè)數(shù)據(jù)類型在Observable的,比如轉(zhuǎn)換之前是Observable<T>,轉(zhuǎn)換之后變成Observable<R>,依然是Observable結(jié)構(gòu)的,轉(zhuǎn)換的只是Observable中的泛型的類型。
而flatMap的話,可以使用Observable的泛型類型數(shù)據(jù),得到一個(gè)新的Observable,然后使用這個(gè)新的Observable替代了舊的Observable

flatMap在實(shí)際應(yīng)用場景中,可能會出現(xiàn)一個(gè)接口的請求你數(shù)據(jù)需要借助于前一個(gè)接口,這樣的接口多層嵌套的情況,在這樣的情況下,可以借助于flatMap來簡化嵌套層次,在flatMap中還可以借助于Observable.fromIterable實(shí)現(xiàn)一個(gè)發(fā)射器功能,即遍歷一個(gè)數(shù)組或者集合,然后按集合的長度進(jìn)行遍歷發(fā)射,這樣在這個(gè)flatMap的后面的觀察者就會執(zhí)行多次。

2.doOnNext

使用doOnNext代替subscribe,使用doOnNext在兩個(gè)請求的中間進(jìn)行一次UI更新操作

MyRetrofit.createRetrofit().create(TestApi.class)
        .register("947674559qq.com", "123456", "123456")
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnNext(new Consumer<LoginResponse>() {
            @Override
            public void accept(LoginResponse loginResponse) throws Exception {
                // todo 注冊完成之后更新ui
            }
        })
        .observeOn(Schedulers.io())
        .flatMap(new Function<LoginResponse, ObservableSource<LoginResponse>>() {
            @Override
            public ObservableSource<LoginResponse> apply(LoginResponse loginResponse) throws Exception {
                Observable<LoginResponse> observable = MyRetrofit.createRetrofit().create(TestApi.class)
                        .loginWanAndroid(loginResponse.getData().getUsername(), loginResponse.getData().getPassword());
                return observable;
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<LoginResponse>() {
            @Override
            public void onSubscribe(Disposable d) {
                // todo 如果是使用MVP模式,可以在BaseModel中定義CompositeDisposable,用來保存Disposable
                // todo 然后BaseModel實(shí)現(xiàn)LifecycleObserver接口,這樣在BaseModel中就可以根據(jù)注解回調(diào)到activity的生命周期onDestroy
                // todo 然后在BaseModel對生命周期的回調(diào)中mCompositeDisposable?.dispose()
                // todo 顯示加載中的dialog
            }

            @Override
            public void onNext(LoginResponse loginResponse) {
                // todo 登錄完成之后更新UI
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                // todo 登錄完成之后才會回調(diào)complete,關(guān)閉dialog
            }
        });
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容