Rxjava2源碼學(xué)習(xí)

RxJava2源碼學(xué)習(xí)

Rxjava最引以為傲的鏈?zhǔn)讲僮?,每個方法都是產(chǎn)生一個Obserable,這樣才能鏈?zhǔn)秸{(diào)用。每個方法產(chǎn)生的Obserable內(nèi)部都有三個東西,代理Observer,下游Observer,上游Obserable。

1.事件的創(chuàng)建:

這是一段沒有任何操作符和線程調(diào)度的代碼:

    Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
            }
    }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onNext(@NonNull Integer integer) {
            }

            @Override
            public void onError(@NonNull Throwable e) {
            }

            @Override
            public void onComplete() {
            }
    });
        

進去看看create操作符到底干了什么,怎么創(chuàng)建的一個Obserable。

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    //檢查是否為null
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

--->onAssembly,這里創(chuàng)建了一個ObservableCreate傳了進去

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        //f默認為null
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

這個方法接受一個Obserable,方法里的f默認為null,需要我們預(yù)先設(shè)置,這個有關(guān)hook,一般用不到,后面所有有關(guān)hook的代碼都先忽略。所以實際上就是返回了我們剛剛傳進來的suorce。這樣create方法實際返回了ObservableCreate對象,就是我們需要的Obserable了。

--->ObservableCreate,構(gòu)造方法的代碼,可以看到繼承自O(shè)bserable

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        //構(gòu)造方法傳進來了我們在外面的回調(diào)
        this.source = source;
    }
}

創(chuàng)建完成了,再看訂閱:

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //有關(guān)hook,忽略
            observer = RxJavaPlugins.onSubscribe(this, observer);
            //檢查observer是否為null
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            //這是實際訂閱方法
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

--->subscribeActual,這是個Obserable的抽象方法,需要子類具體Obserable去實現(xiàn)。我們?nèi)倓偸褂胏reate方法創(chuàng)建的ObservableCreate看看實現(xiàn)。

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //創(chuàng)建一個發(fā)射器,同時也是一個開關(guān),數(shù)據(jù)將由它源源不斷的發(fā)射到下游
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        
        //調(diào)用下游觀察者的onSubscribe將開關(guān)傳遞下去,用來控制事件發(fā)射
        observer.onSubscribe(parent);

        try {
        //suorce就是我們的回調(diào)類,在subscribe里面我們操縱e.onNext(),e.onComplete(),e.onError()發(fā)射事件
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

這里可以看到上游的Obserable已經(jīng)可以將事件發(fā)射出去了,那到底怎么傳遞到下游的,既然是通過e.onNext(),e.onComplete(),e.onError()發(fā)射出去的,看看唄

//繼承了發(fā)射器ObservableEmitter,實現(xiàn)了Disposable開關(guān),同時還發(fā)現(xiàn)它還繼承了AtomicReference<Disposable>,AtomicXXX系列的類是線程安全的原子操作類,不用加鎖,Rxjava里面的Disposable開關(guān)的控制就是通過它來保證線程安全的。
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        //構(gòu)造方法,將下游Observer保存
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        
        //重點,數(shù)據(jù)傳遞
        @Override
        public void onNext(T t) {
        //null判斷,因此Rxjava2不能發(fā)射null了。
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            
            //檢查控制訂閱關(guān)系的開關(guān)Dispose,不為false才發(fā)送數(shù)據(jù)
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        //異常事件
        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            
            //同樣檢查開關(guān)
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    //這是finally塊,因為產(chǎn)生onError或complete,就意味著訂閱關(guān)系已終止,必須解除
                    dispose();
                }
            } else {
                
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            //同樣檢查開關(guān)
            if (!isDisposed()) {
                try {
                    //這是finally塊,因為產(chǎn)生onError或complete,就意味著訂閱關(guān)系已終止,必須解除
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }


        //解除訂閱關(guān)系
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        //是否已解除訂閱關(guān)系
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }

2.操作符map:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //RxJavaPlugins.onAssembly有關(guān)hook,實際就是返回了ObservableMap對象,上面講過
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

--->ObservableMap

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        //保存上游Obserable
        super(source);
        //保存提供實際轉(zhuǎn)換操作的外部回調(diào)對象
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        //這里創(chuàng)建了一個Observer,用于訂閱上游,這樣數(shù)據(jù)才能鏈?zhǔn)絺鬟f,但是它只是一個中間代理,用于接受上游數(shù)據(jù),但是還需要轉(zhuǎn)換并且傳遞到下游。所以傳進去下游Observer t和回調(diào)的 function對象。
        source.subscribe(new MapObserver<T, U>(t, function));
    }
}

從第一個創(chuàng)建的Obserable說起,它要做的工作是調(diào)用下游的onNext等等方法傳遞事件。那么這需要一個Observer對象,但是現(xiàn)在我們做了中間操作,事件需要經(jīng)過處理,因此就需要在本節(jié)點Obserable內(nèi)部維護一個代理Observer用于訂閱上游的事件,然后完成特定的操作如map數(shù)據(jù)類型轉(zhuǎn)換。再繼續(xù)調(diào)用下游的Observer.onNext等方法,將事件傳遞下去。

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            //保存下游observer
            super(actual);
            //保存外部轉(zhuǎn)換操作的回調(diào)對象
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            //當(dāng)onComplete或onError事件發(fā)生后,done為true,是開關(guān)
            if (done) {
                return;
            }

            //sourceMode默認為NONE
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                //數(shù)據(jù)轉(zhuǎn)換
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //將數(shù)據(jù)傳遞給調(diào)用下游
            actual.onNext(v);
        }

    }

在中間插了個map操作符的Rx鏈子:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
            }
        }).map(new Function<Integer, String>() {

            @Override
            public String apply(@NonNull Integer integer) throws Exception {
                return null;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onNext(@NonNull String integer) {
            }

            @Override
            public void onError(@NonNull Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

Obserable--->map--->Observer

Obserable鏈子的產(chǎn)生是從上游到下游,每個方法都是產(chǎn)生一個Obserable,每個下游的Obserable在創(chuàng)建時就保存了上游的Obserable。事件訂閱動作肯定是發(fā)生在最后一個Obserable。每次Obserable的subcribe動作都是直接調(diào)用的subscribeActual方法

map Obserable訂閱observer:

ObservableMap.subcribe(observer);

再到-->

@Override
    public void subscribeActual(Observer<? super U> t) {
        //這個suorce哪來的,不就是ObservableMap創(chuàng)建時傳進來的上游Obserable嗎
        source.subscribe(new MapObserver<T, U>(t, function));
    }

訂閱到這里還不夠啊,因為數(shù)據(jù)源在最頂部d的Obserable,于是必須要創(chuàng)建中間代理Observer訂閱上游Obserable,接受上游的事件。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        //將下游Observer保存
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        
        //開關(guān)也是這里傳遞下去的
        observer.onSubscribe(parent);

        try {
            //最終訂閱到了這里,就是我們數(shù)據(jù)發(fā)射的源頭,外部回調(diào)對象
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

于是就這樣,訂閱動作從鏈子最底部傳到了最頂部的Obserable。接著畫風(fēng)一轉(zhuǎn),訂閱流程結(jié)束,開始事件發(fā)射流程:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                //數(shù)據(jù)發(fā)射
                e.onNext(1);
            }
        }).

而在發(fā)射器的onNext方法里面:

if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                //開始向下游傳遞事件了,這個Observer顯然就是ObservableMap里面的代理Observer。
                observer.onNext(t);
            }

--->ObservableMap.onNext,事件已經(jīng)從上一個Obserable傳遞到了ObservableMap

        
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                //做完數(shù)據(jù)類型轉(zhuǎn)換
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //繼續(xù)調(diào)用下游onNext向下游發(fā)送事件,這里顯然就是最終的Observer
            actual.onNext(v);
        }

總結(jié):先從下往上訂閱,再從上往下發(fā)送。

3.線程調(diào)度

  • subcribeOn():
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //創(chuàng)建一個代理Observer,同時又是一個Disposed開關(guān)
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //開關(guān)傳遞到下游
        s.onSubscribe(parent);

        //將線程調(diào)度返回一個Disposed開關(guān),方便對線程進行控制管理
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        
        //這里好像并沒有suorce.subcribe(s);
    }
}

--->SubscribeTask

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //訂閱在這里,注意,是將subscrible操作放到了線程當(dāng)中哦,想想后面哪些操作在這個線程中。
            source.subscribe(parent);
        }
    }
}

--->scheduleDirect

        @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
    再到
    
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        //這里由選擇的線程決定,會獲得不同的Worker,subscribeOn(Schedulers.io())
        final Worker w = createWorker();

        //先不管
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        //將線程加入Disposed控制,以便在取消訂閱時,能及時關(guān)閉線程
        DisposeTask task = new DisposeTask(decoratedRun, w);

        //任務(wù)開始執(zhí)行
        w.schedule(task, delay, unit);

        return task;
    }    

--->DisposeTask

    //就是個包裝過后的任務(wù),實現(xiàn)了Disposed接口,實現(xiàn)了開關(guān)管理的控制
    static final class DisposeTask implements Runnable, Disposable {
        
        //要實施的任務(wù)
        final Runnable decoratedRun;
        
        //線程工作
        final Worker w;

        Thread runner;

        DisposeTask(Runnable decoratedRun, Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                //運行完了必須切斷
                dispose();
                runner = null;
            }
        }

        //關(guān)閉線程
        @Override
        public void dispose() {
            if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
                ((NewThreadWorker)w).shutdown();
            } else {
                w.dispose();
            }
        }

        @Override
        public boolean isDisposed() {
            return w.isDisposed();
        }
    }

選中Worker,Ctrl+H查看他的繼承樹。隨便找個具體看看。


看看NewThreadWorker關(guān)鍵源碼:

    //線程調(diào)度的最終方法
    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        //包裝任務(wù)
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        //一般為null,忽略
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            //加入線程池
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

    //記得之前在創(chuàng)建worker時就將它加入了Disposed管理。這里控制了線程調(diào)度
    @Override
    public void dispose() {
        if (!disposed) {
            disposed = true;
            executor.shutdownNow();
        }
    }

再來看看:

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
        @Override
        public void onNext(T t) {
            //代碼執(zhí)行到這里,一定是在某個調(diào)度的線程當(dāng)中,但是不一定就是在這次調(diào)度的線程,因為它不一定就是離頂部最近的subscribeOn,因為線程任務(wù)都會加入Disposed管理,因此這里不需要判斷了
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

前面知道了整個事件鏈子是先從下往上訂閱,再從上往下發(fā)射。source.subscribe(parent);這句代碼發(fā)生在調(diào)度線程中。因此在后面的所有操作都是發(fā)生在了這個調(diào)度線程當(dāng)總中,這也就解釋了為什么多個subscribeOn()只有第一個有效,因為subscribeOn()是在訂閱的鏈子上,所有的發(fā)射操作都是在訂閱的后面,自然發(fā)射操作也就只受離頂部最近的subscribeOn的影響了。而下面的subscribeOn只是影響了一些訂閱操作而已,但是這我們察覺不出來,并不關(guān)心。

  • ObserveOn():
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
        //這里好像沒有RxJavaPlugins.onAssembly()那家伙了。。。bufferSize()是一個常量,緩沖池的大小
        return observeOn(scheduler, false, bufferSize());
}

再看

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        //又來了。。。
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        //保存上游Obserable
        super(source);
        //保存Scheduler
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //特殊調(diào)度器,暫不做考慮
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //直接創(chuàng)建Worker
            Scheduler.Worker w = scheduler.createWorker();
            //訂閱
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

--->ObserveOnObserver

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        //緩沖隊列,上游發(fā)射過來的事件都會先存到這里,然后在這里取事件發(fā)射給下游
        SimpleQueue<T> queue;
        Disposable s;
        //存儲異常
        Throwable error;
        //是否已完成
        volatile boolean done;
        volatile boolean cancelled;
        int sourceMode;
        boolean outputFused;
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onNext(T t) {
        //判斷是否終止
            if (done) {
                return;
            }

            //true
            if (sourceMode != QueueDisposable.ASYNC) {
                //將事件加入隊列
                queue.offer(t);
            }
            //調(diào)度線程,注意啊,onNext方法是在發(fā)射鏈子上的,因此可以想到,ObserveOn()影響發(fā)射過程,且只影響后面的發(fā)射操作
            schedule();
        }

        void schedule() {
            //原子操作,自增然后返回原值
            if (getAndIncrement() == 0) {
                //把自己當(dāng)做任務(wù),傳了進去
                worker.schedule(this);
            }
        }
}

--->run

    @Override
        public void run() {
            //默認false
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
    }

--->drainNormal

void drainNormal() {
            
            int missed = 1;
            //事件緩沖隊列
            final SimpleQueue<T> q = queue;
            //下游
            final Observer<? super T> a = actual;

            for (;;) {
                //檢查是否已終結(jié)
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        //事件出列
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    //再次檢查是否終結(jié)
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }
                    //將事件發(fā)射給下游
                    a.onNext(v);
                }
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

總結(jié):subscribeOn在訂閱鏈子上執(zhí)行,observeOn在發(fā)射鏈子上執(zhí)行,影響的操作


但是有些操作或方法比較特別點:doOnSubscribe()和onSubscribe()
doOnSubscribe是指在onSubscribe()發(fā)生之前調(diào)用。

看看ObserableCreate:

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //在這里將Dispose開關(guān)傳遞下去,
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

一般的操作符也是規(guī)規(guī)矩矩的將這個Dispose開關(guān)一樣的傳下去,例如ObserableMap里面的代理Observer:

    @SuppressWarnings("unchecked")
    @Override
    public final void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {

            this.s = s;
            if (s instanceof QueueDisposable) {
                this.qs = (QueueDisposable<T>)s;
            }

            if (beforeDownstream()) {

                //直接傳遞
                actual.onSubscribe(this);

                afterDownstream();
            }

        }
    }

但是,看看subscribeOn里面的代理Observer的代碼:

        @Override
        public void onSubscribe(Disposable s) {
            //沒有直接傳遞dispose開關(guān),只是對上游的開關(guān)做了設(shè)置
            DisposableHelper.setOnce(this.s, s);
        }

那么我們的下游需要的開關(guān)在哪里呢?
ObservableSubscribeOn

@Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //自己創(chuàng)建了一個dispose傳給了下游,并且在下面的線程調(diào)度之前執(zhí)行
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

可以看到,在訂閱階段,并且在線程調(diào)度之前執(zhí)行。

因此我們可以得出結(jié)論:
如果有subscribeOn在doOnSubscribe()的下面,那么doOnSubscribe()和onSubscribe()都執(zhí)行在下面最近的subscribeOn指定的線程里,否則執(zhí)行在默認線程里面。ObserveOn不對doOnSubscribe()和onSubscribe()造成任何影響,因為前面說過,ObserveOn只對訂閱之后的發(fā)射階段可能造成影響。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容