Rxjava2.0 AndroidSchedulers. mainThread() 主線程調(diào)度分析

這篇文章探究一下rxjava安卓主線程是怎么實現(xiàn)的,上代碼:

public final class AndroidSchedulers {

    public static Scheduler mainThread() {
        //老樣式,這個方法實際直接返回入?yún)AIN_THREAD,追著他看就行
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
  //靜態(tài)變量,initMainThreadScheduler()返回 MainHolder.DEFAULT,追著看MainHolder.DEFAULT
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });
    //靜態(tài)內(nèi)部類MainHolder ,DEFAULT 在這里初始化,HandlerScheduler就是我們要的主線程scheduler
    private static final class MainHolder {
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

最終得到一個HandlerScheduler對象,這個就是安卓主線程的Scheduler了,看他的構(gòu)造方法,傳入了一個new Handler(Looper.getMainLooper()),很熟悉吧。
了解這個類之前,先把它在哪里調(diào)用的代碼放出來,在上上篇線程切換的文章里講到過的:

  @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //這個scheduler就是HandlerScheduler,createWorker()方法跟上篇的io線程方法步驟調(diào)用差不多
            Scheduler.Worker w = scheduler.createWorker();
             //最終這個 worker放入了ObserveOnObserver,最終執(zhí)行schdule()方法來執(zhí)行onNext()操作,達到切換線程的目的。
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

既然調(diào)用步驟知道了,就帶著這些方法來看看HandlerScheduler類:

final class HandlerScheduler extends Scheduler {
    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
        return scheduled;
    }
    //直接new了一個HandlerWorker
    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
    //HandlerWorker是哥靜態(tài)內(nèi)部類,看來各種Scheduler的結(jié)構(gòu)都是相似的
    private static final class HandlerWorker extends Worker {
        private final Handler handler;

        private volatile boolean disposed;
        //構(gòu)造方法,傳入了那個有主線程looper的handler,
        HandlerWorker(Handler handler) {
            this.handler = handler;
        }
        //最終執(zhí)行線程任務(wù)的地方
        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            //handler的消息機制,obtainf()方法把scheduled 這個raunnable給了massage的callback
          
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
             //發(fā)送出去了,最終這個message會被主線程的Handler劫持,然后因為msg的callback不為空,
            //最終它會自己執(zhí)行run()方法,這樣就達到了在主線程執(zhí)行下游onNext()的目的,
            //不熟悉handler的要再去看看它的源碼咯
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
    //封裝的一個代理類,方便管理生命周期
    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        private volatile boolean disposed;

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
}
總結(jié):

rxjava切換到android主線程的關(guān)鍵就是那個獲取了mainLooper的handler,利用message和handler的機制達到了切換的目的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容