RXJava的使用與源碼解析

RxJava 到底是什么

一個詞:異步。

RxJava 在 GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫)。這就是 RxJava ,概括得非常精準。

先來看一段代碼

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

如果是rxJava的話,實現(xiàn)方法是這樣的

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

從上面的代碼中,我們可以看到,rxJava的代碼,要比New Thread的代碼要長。RXJava的代碼可讀性,要比new Thead要強很多

在rxJava中需要知道三個對象
1.Observer 觀察者
對于觀察者而言,是對事件做出處理后的回調(diào)。Observer回調(diào)包含onSubscribe注冊時回調(diào),onNext事件到達時回調(diào),onError拋出異常時回調(diào),onComplete事件完成時回調(diào)。
2.Observable 被觀察對象
由該對象開始訂閱,并對事件進行一系列的變換處理。最終該事件到達觀察者的回調(diào)中。
3.subscribe() 訂閱
觀察者訂閱被觀察者,訂閱的時候會執(zhí)行ObservableOnSubscribe中的subscribe訂閱函數(shù)。
ObservableEmitter 發(fā)射器
emitter.onNext();通過發(fā)射器發(fā)送一個事件

   // 1、創(chuàng)建觀察者
        final Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
              //被訂閱之后,會先執(zhí)行這個方法
                Log.e(TAG, "subscribe");
            }

            @Override
            public void onNext(Integer value) {
                //被觀察者調(diào)用onNext
                Log.e(TAG, "" + value);
            }

            @Override
            public void onError(Throwable e) {
          //被觀察者調(diào)用onError
                Log.e(TAG, "error");
            }

            @Override
            public void onComplete() {
//被觀察者調(diào)用onComplete
                Log.e(TAG, "complete");
            }
        };

        //2、創(chuàng)建被觀察者
        Observable<Integer> observable =  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                ///4、發(fā)射相應事件
                emitter.onNext(“hello”);
                emitter.onNext(2);
                emitter.onComplete();
            }
        });

        ///3、創(chuàng)建訂閱者,將觀察者與被觀察者關聯(lián)起來
        observable.subscribe(observer);

輸出為
subscribe
hello
2
complete

被觀察者Observable 的第二個用法just

Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

被觀察Observable 者的第三個用法from

String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

被觀察Observable 者的第四個用法fromArray

Observable observable = Observable.fromArray("Hello", "Hi", "Aloha");
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

被觀察Observable 者的第五個用法range

Observable<Integer> observable =  Observable.range(1,3);
// 將會依次調(diào)用:
// onNext(1);
// onNext(2);
// onNext(3);
// onCompleted();

被觀察Observable 者的還有第六個用法interval

    @CheckReturnValue
    @SchedulerSupport("io.reactivex:computation")
    public static Observable<Long> interval(long period, TimeUnit unit) {
        return interval(period, period, unit, Schedulers.computation());
    }
observable = Observable.interval(10, TimeUnit.SECONDS);//間隔10秒
輸出日志
16:03:31.359 3518-3518/com.example.testrxjava E/MainActivity: subscribe
16:03:41.363 3518-3538/com.example.testrxjava E/MainActivity: 0
16:03:51.361 3518-3538/com.example.testrxjava E/MainActivity: 1
16:04:01.361 3518-3538/com.example.testrxjava E/MainActivity: 2
16:04:11.361 3518-3538/com.example.testrxjava E/MainActivity: 3
16:04:21.362 3518-3538/com.example.testrxjava E/MainActivity: 4
16:04:31.361 3518-3538/com.example.testrxjava E/MainActivity: 5

被觀察Observable 者的還有第七個用法timer

observable = Observable.timer(10, TimeUnit.SECONDS);
輸出日志為
 16:02:18.312 3433-3433/com.example.testrxjava E/MainActivity: subscribe
 16:02:28.315 3433-3453/com.example.testrxjava E/MainActivity: 0
 16:02:28.316 3433-3453/com.example.testrxjava E/MainActivity: complete

Rxjava的Disposable

rxjava雖然好用,但是總所周知,容易遭層內(nèi)存泄漏。也就說在訂閱了事件后沒有及時取閱,導致在activity或者fragment銷毀后仍然占用著內(nèi)存,無法釋放。而disposable便是這個訂閱事件,可以用來取消訂閱。

public interface Disposable {
    void dispose();//中斷訂閱事件

    boolean isDisposed(); //判斷時間是否中斷的
}

關于Scheduler的線程操作

默認情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調(diào)用 subscribe(),就在哪個線程生產(chǎn)事件;在哪個線程生產(chǎn)事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler (調(diào)度器)。

Schedulers.immediate(): 直接在當前線程運行,相當于不指定線程。這是默認的 Scheduler。

Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作。

Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。

Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。

Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運行。

        observable.subscribeOn(Schedulers.io()); // 指定onSubscribe方法執(zhí)行的線程為 IO線程
        observable.observeOn(AndroidSchedulers.mainThread());// 指定observer回調(diào)方法執(zhí)行的線程為 android 主線程

觀察者的源碼分析

public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(@NonNull Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}

以上代碼可以看出觀察者實際上就是一個接口,接口的使用在上面已結介紹過了

接下來是被觀察者源碼分析

我們從被觀察者的create()方法看起

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");//判斷是不是空的source對象
 //傳入Observable和Function對象,返回ObservableMap對象(hook功能默認沒有開啟)
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

requireNonNull方法里就是判斷source是否為空,如果是空的就會拋出一個空指針異常,這個可以忽略,我們發(fā)現(xiàn),ObservableMap的subscribeActual直接調(diào)用了source的subscribe函數(shù),現(xiàn)在由兩個問題,第一是source是什么?答案就是我們在上一步(這里是第一步)傳入的ObservableCreate對象,就是說這里調(diào)用了ObservableCreate的subscribe函數(shù)。第二參數(shù)是什么呢,參數(shù)是MapObserver,它把我們原來的Observer對象t包裝成MapObserver對象,我們現(xiàn)在去看看ObservableCreate的subscribe,發(fā)現(xiàn)它并沒有實現(xiàn),而是復用了父類Observable的subscribe,就是我們上面看到的那一段代碼,所以直接看ObservableCreate的subscribeActual函數(shù)即可:


public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {//這個new ObservableCreate<T>(source)
        this.source = source;
    }
@Override
protected void subscribeActual(Observer<? super T> observer) {
    //創(chuàng)建了一個發(fā)射器對象
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //回調(diào)Observer實例的onSubscribe函數(shù)。這里是MapObserver
    observer.onSubscribe(parent);

    try {
        //回調(diào)訂閱,此處為ObservableCreate訂閱ObservableOnSubscribe
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

...
  }

這個對象的構造函數(shù)中,就是傳入了source這個參數(shù)

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

    static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
        try {
            return f.apply(t);
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }

 */
public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}

看到這里,感覺像是在onAssembly里直接把source變量直接給返回了
下面是我看別人寫的帖子中寫到的,應該是沒有什么代碼上的處理邏輯了
這是一個hook實現(xiàn),關于hook,可以理解為這是一個抽象代理,這個代理默認情況下不會對咱們的形參Observable做任何的處理,但是如果開發(fā)者想要對Observable做處理,可以調(diào)用RxJavaPlugins的SetonObservableAssembly()設置開發(fā)者自己實現(xiàn)的代理,從而替換原Observable,最后真正返回的是Observable的實現(xiàn)類ObservableCreate類的實例對象。在這里咱們沒做任何處理,所以返回默認的Observable實現(xiàn)類ObservableCreate。至此創(chuàng)建完了一個被觀察者對象Observable。

subscribe方法

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

ObjectHelper.requireNonNull(observer, "observer is null")與observer = RxJavaPlugins.onSubscribe(this, observer)這兩個方法與上面的被觀察者中的方法一樣就不在介紹
接下來我們看subscribeActual(observer);這個方法

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);//建立訂閱關系,此時在觀察者中會介紹到onSubscribe的消息

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
...

我們可以看到代碼中創(chuàng)建了一個類似發(fā)射器的東西CreateEmitter,CreateEmitter是在ObservableCreate類中的一個內(nèi)部靜態(tài)類,他實現(xiàn)了ObservableEmitter的方法
source.subscribe(parent)中source就是在創(chuàng)建被觀察者對象時傳入的ObservableOnSubscribe對象實例,調(diào)用其subscribe方法,將上游事件發(fā)送對象(ObservableOnSubscribe)和下游接收對象(Observer)關聯(lián)起來。
大致流程是這樣的:
(1)通過Observable.create創(chuàng)建了一個ObservableCreate對象,這個對象保存了我們實現(xiàn)的匿名實現(xiàn)類ObservableOnSubscribe。
(2)通過map,創(chuàng)建了一個ObservableMap對象,這個對象保存了(1)中的ObservableCreate對象。
(3)通過subscribe,實現(xiàn)了ObservableMap對Observer的訂閱(不要奇怪是不是反了,這樣是為了連續(xù)性)。
(4)在ObservableMap實現(xiàn)了對Observer的訂閱時,內(nèi)部會調(diào)用保存的ObservableCreate對象對MapObserver對象進行訂閱(構造MapObserver對象,會將Observer保存在MapObserver的成員變量actual中)
(5)在ObservableCreate對象實現(xiàn)了對MapObserver的訂閱時,ObservableCreate保存的ObservableOnSubscribe對象會對CreateEmitter對象進行了訂閱(構造CreateEmitter對象時,保存了MapObserver)

RXjava線程調(diào)度

下面是我從別的地方盜過來的一張圖,忘了是從哪找的了,完美解釋線程的切換


2814211-cde17013310e2674.png

我們來看切換線程用的方法subscribeOn
observable.subscribeOn(AndroidSchedulers.mainThread())

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

又是熟悉的兩個方法,我們忽略,看new ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
...

這里是不是也是很熟悉,根據(jù)上面的subscribe方法的經(jīng)驗,最后肯定也會執(zhí)行到subscribeActual方法,所以我們直接看subscribeActual方法
我們來看parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));這句

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
...
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
...

SubscribeTask 是ObservableSubscribeOn的內(nèi)部類 里面的run方法,執(zhí)行了source.subscribe(parent)訂閱操作,source為ObservableSubscribeOn執(zhí)行構造方法是傳入
接著我們來看scheduleDirect方法

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

  接著查看scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);方法
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

createWorker()是一個抽象方法
于是搜索creatteWorker方法結果如下


搜索createWorker.png

可以看到里面有IoScheduler 有newThreadScheduler有ImmediateHginScheduler等類,這些在Scheduler類中都有似曾相識的感覺,于是我猜測在public final Observable<T> subscribeOn(Scheduler scheduler) 方法中
scheduler變量,比如傳入的是Scheduler.IO就會調(diào)用IOScheduler的createWorker方法

public final class IoScheduler extends Scheduler {
...
final AtomicReference<CachedWorkerPool> pool;
...
    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
...
 static final class CachedWorkerPool implements Runnable {
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        final CompositeDisposable allWorkers;
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;
        private final ThreadFactory threadFactory;

        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }
...
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
    static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }

        public long getExpirationTime() {
            return expirationTime;
        }

        public void setExpirationTime(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }
}

這里就是核心所在啦,步驟如下:
(1)創(chuàng)建了一個Worker對象。
(2)沒有hook,返回原Runnable對象,就是我們上面的SubscribeTask對象。
(3)將Worker對象和Runnable對象封裝到DisposeTask中。
(4)調(diào)用worker對象的schedule函數(shù)。
重點就在于threadWorker.scheduleActual,threadWorker通過CachedWorkerPool的get函數(shù)獲?。?br> ···

ThreadWorker get() {
     if (allWorkers.isDisposed()) {
         return SHUTDOWN_THREAD_WORKER;
     }

    //當緩存不為空時,優(yōu)先從緩存中獲取
     while (!expiringWorkerQueue.isEmpty()) {
         ThreadWorker threadWorker = expiringWorkerQueue.poll();
         if (threadWorker != null) {
             return threadWorker;
         }
     }

     // 沒有緩存,重新構建
     ThreadWorker w = new ThreadWorker(threadFactory);
     allWorkers.add(w);
     return w;
}

所以這里調(diào)用的是ThreadWorker的scheduleActual,但是它本身并沒有實現(xiàn)而是其父類NewThreadWorker實現(xiàn)的

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    //沒有hook,返回原Runnable對象
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //封裝Runnable對象和訂閱容器對象
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    ...

    Future<?> f;
    try {
        //重點
        if (delayTime <= 0) {
            //沒有延時
            f = executor.submit((Callable<Object>)sr);
        } else {
            //延時
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

我們看看executor是啥,原來是線程池的實現(xiàn)類啊,現(xiàn)在有了線程池實例,有了Runnable,不用問,肯定是將Runnable添加到線程池的工作隊列中執(zhí)行,即會調(diào)用Runnable的run方法?;氐缴厦?,去掉層層包裝,我們看下最初始的那個Runnable:

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        //調(diào)用source的訂閱方法,此處source為上游的ObservableCreate對象
        source.subscribe(parent);
    }
}

到這里一個subscribeOn的流程已經(jīng)完成了。

接下來我們看下observeOn(Scheduler scheduler)方法

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

看著與之前的套路一樣,我們接著看ObservableObserveOn類


public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
 
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //不會走這里
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
           //創(chuàng)建Worker
            Scheduler.Worker w = scheduler.createWorker();
           //注釋1
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
...
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable upstream;

        Throwable error;
        volatile boolean done;

        volatile boolean disposed;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
...
        @Override
        public void onNext(T t) {
            if (done) {//控制不會二次執(zhí)行
                return;
            }
 //默認是0,不等于 QueueDisposable.ASYNC,所以將發(fā)射的T對象入隊
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        @Override
        public void onError(Throwable t) {
            if (done) {//控制不會二次執(zhí)行
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {//控制不會二次執(zhí)行
                return;
            }
            done = true;
            schedule();
        }
...
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
...
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
...

在上面代碼中,在onNext,onError,onComplete方法中都調(diào)用了schedule()方法,在這些方法中都沒有做什么特殊處理。里面習性worker.schedule(this)。代碼與上面的subscribeOn中類似,同樣都是通過Work進行調(diào)度與切換。
我們這里傳入的是AndroidSchedulers.mainThread(),它的本質(zhì)就是一個HandlerScheduler,我們就看HandleerScheduler中的schedule方法

@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    ...
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, unit.toMillis(delay));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

看著這里明白了,原來是通過handle發(fā)送到android主線程。
接下來我們繼續(xù)回到ObservableObserveOn類中繼續(xù)看他的run方法

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        //走這里
        drainNormal();
    }
}

我們發(fā)現(xiàn)drainNormal就是從隊列中取出參數(shù)T,然后做了一些檢查,最后調(diào)用其onNext,而此時,因為經(jīng)由Hanlder回調(diào)在主線程了。不過你可能會感到奇怪,這里并沒有onError和onComplete的邏輯啊,不要急,我們回想一下剛才ObserveOnObserver的onComplete方法,它里面主要做了兩個操作:
(1)將done置為true
(2)同樣回調(diào)schedule
接下里我們看這里

if (checkTerminated(done, q.isEmpty(), a)) {
    return;
}

boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (cancelled) {
                queue.clear();
                return true;
            }
            //d為done,即true
            if (d) {
                Throwable e = error;
                //delayError為false
                if (delayError) {
                    if (empty) {
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    //此處我們回調(diào)的是onComplete,所以e為null
                    if (e != null) {
                        queue.clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        //因為此時onNext任務隊列為空,所以走到這
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
            return false;
        }

到現(xiàn)在已經(jīng)很明朗了,執(zhí)行完全部的onNext,在回調(diào)onComplete時,返回true,所以drainNormal后面相關代碼就不再執(zhí)行,因為已經(jīng)return了。onError也是同樣的道理。同時,我們在上面代碼中還發(fā)現(xiàn)一個問題,那就是我一回調(diào)完onComplete就把worker給dispose了,所以后面如果我們繼續(xù)調(diào)用onError就不會繼續(xù)執(zhí)行了,因為已經(jīng)停止訂閱。
到這里我們就可以總結一下subscribeOn和observerOn的使用:
(1)subscribeOn只對上游有效,因為是在訂閱過程中傳遞的,如果有多個,那么只有第一個”生效”(其實對于傳遞訂閱關系都生效了,只是最終事件發(fā)射只體現(xiàn)出了最上游subscribeOn的作用)
(2)observerOn只對下游有效,因為它是在事件發(fā)射出來之后,回調(diào)事件的過程中生效的

參考自

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容