RxJava2 源碼分析(二)

概述

上一節(jié)我們分析了最簡單的Rxjava的例子,了解了Rxjava是如何創(chuàng)建事件源,如何發(fā)射事件,何時發(fā)射事件,也清楚了上游和下游是如何關(guān)聯(lián)起來的。
這一節(jié)我們著重來分析下Rxjava強大的線程調(diào)度是如何實現(xiàn)的。

簡單的例子

private void doSomeWork() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                Log.i("lx", " subscribe: " + Thread.currentThread().getName());
                Thread.sleep(2000);
                e.onNext("a");
                e.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
            }
            @Override
            public void onNext(String str) {
                Log.i("lx", " onNext: " + Thread.currentThread().getName());
            }
            @Override
            public void onError(Throwable e) {
                Log.i("lx", " onError: " + Thread.currentThread().getName());
            }
            @Override
            public void onComplete() {
                Log.i("lx", " onComplete: " + Thread.currentThread().getName());
            }
        });
    }

運行結(jié)果:

com.rxjava2.android.samples I/lx:  onSubscribe: main
com.rxjava2.android.samples I/lx:  subscribe: main
com.rxjava2.android.samples I/lx:  onNext: main
com.rxjava2.android.samples I/lx:  onComplete: main

因為此方法筆者是在main線程中調(diào)用的,所以沒有進行線程調(diào)度的情況下,所有方法都運行在main線程中。但我們知道Android的UI線程是不能做網(wǎng)絡(luò)操作,也不能做耗時操作,所以一般我們把網(wǎng)絡(luò)或耗時操作都放在非UI線程中執(zhí)行。接下來我們就來感受下Rxjava強大的線程調(diào)度能力。

private void doSomeWork() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                Log.i("lx", " subscribe: " + Thread.currentThread().getName());
                Thread.sleep(2000);
                e.onNext("a");
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io()) //增加了這一句
          .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
            }
            @Override
            public void onNext(String str) {
                Log.i("lx", " onNext: " + Thread.currentThread().getName());
            }
            @Override
            public void onError(Throwable e) {
                Log.i("lx", " onError: " + Thread.currentThread().getName());
            }
            @Override
            public void onComplete() {
                Log.i("lx", " onComplete: " + Thread.currentThread().getName());
            }
        });
    }

運行結(jié)果:

com.rxjava2.android.samples I/lx:  onSubscribe: main
com.rxjava2.android.samples I/lx:  subscribe: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx:  onNext: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx:  onComplete: RxCachedThreadScheduler-1

只增加了subscribeOn這一句代碼, 就發(fā)生如此神奇的現(xiàn)象,除了onSubscribe方法還運行在main線程(訂閱發(fā)生的線程)其它方法全部都運行在一個名為RxCachedThreadScheduler-1的線程中。我們來看看rxjava是怎么完成這個線程調(diào)度的。

線程調(diào)度subscribeOn

首先我們先分析下Schedulers.io()這個東東。

    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO); // hook function
        // 等價于
        return IO;
    }

再看看IO是什么, IO是個static變量,初始化的地方是

IO = RxJavaPlugins.initIoScheduler(new IOTask()); // 又是hook function
// 等價于
IO = callRequireNonNull(new IOTask());
// 等價于
IO = new IOTask().call();

繼續(xù)看看IOTask

static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
            // 等價于
            return new IoScheduler();
        }
    }

代碼層次很深,為了便于記憶,我們再回顧一下:

Schedulers.io()等價于 new IoScheduler()

    // Schedulers.io()等價于
    @NonNull
    public static Scheduler io() {
        return new IoScheduler();
    }

好了,排除了其他干擾代碼,接下來看看IoScheduler()是什么東東了
IoScheduler看名稱就知道是個IO線程調(diào)度器,根據(jù)代碼注釋得知,它就是一個用來創(chuàng)建和緩存線程的線程池。看到這個豁然開朗了,原來Rxjava就是通過這個調(diào)度器來調(diào)度線程的,至于具體怎么實現(xiàn)我們接著往下看

    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }
    
    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

    @Override
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
    
    CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }

從上面的代碼可以看出,new IoScheduler()后Rxjava會創(chuàng)建CachedWorkerPool的線程池,同時也創(chuàng)建并運行了一個名為RxCachedWorkerPoolEvictor的清除線程,主要作用是清除不再使用的一些線程。

但目前只創(chuàng)建了線程池并沒有實際的thread,所以Schedulers.io()相當(dāng)于只做了線程調(diào)度的前期準(zhǔn)備。

OK,終于可以開始分析Rxjava是如何實現(xiàn)線程調(diào)度的?;氐紻emo來看subscribeOn()方法的內(nèi)部實現(xiàn):

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

很熟悉的代碼RxJavaPlugins.onAssembly,上一篇已經(jīng)分析過這個方法,就是個hook function, 等價于直接return new ObservableSubscribeOn<T>(this, scheduler);, 現(xiàn)在知道了這里的scheduler其實就是IoScheduler。

跟蹤代碼進入ObservableSubscribeOn,
可以看到這個ObservableSubscribeOn 繼承自O(shè)bservable,并且擴展了一些屬性,增加了scheduler。 各位看官,這不就是典型的裝飾模式嘛,Rxjava中大量用到了裝飾模式,后面還會經(jīng)常看到這種wrap類。

上篇文章我們已經(jīng)知道了Observable.subscribe()方法最終都是調(diào)用了對應(yīng)的實現(xiàn)類的subscribeActual方法。我們重點分析下subscribeActual:

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        // 沒有任何線程調(diào)度,直接調(diào)用的,所以下游的onSubscribe方法沒有切換線程, 
        //本文demo中下游就是觀察者,所以我們明白了為什么只有onSubscribe還運行在main線程
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

SubscribeOnObserver也是裝飾模式的體現(xiàn), 是對下游observer的一個wrap,只是添加了Disposable的管理。

接下來分析最重要的scheduler.scheduleDirect(new SubscribeTask(parent))

// 這個類很簡單,就是一個Runnable,最終運行上游的subscribe方法
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // IoSchedular 中的createWorker()
        final Worker w = createWorker();
        // hook decoratedRun=run;
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // decoratedRun的wrap,增加了Dispose的管理
        DisposeTask task = new DisposeTask(decoratedRun, w);
        // 線程調(diào)度
        w.schedule(task, delay, unit);

        return task;
    }

回到IoSchedular

    public Worker createWorker() {
        // 工作線程是在此時創(chuàng)建的
        return new EventLoopWorker(pool.get());
    }
    
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            // action 中就包含上游subscribe的runnable
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    

最終線程是在這個方法內(nèi)調(diào)度并執(zhí)行的。

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        // decoratedRun = run, 包含上游subscribe方法的runnable
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        // decoratedRun的wrap,增加了dispose的管理
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        // 最終decoratedRun被調(diào)度到之前創(chuàng)建或從線程池中取出的線程,
        // 也就是說在RxCachedThreadScheduler-x運行
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

至此我們終于明白了Rxjava是如何調(diào)度線程并執(zhí)行的,通過subscribeOn方法將上游生產(chǎn)事件的方法運行在指定的調(diào)度線程中。

com.rxjava2.android.samples I/lx:  onSubscribe: main
com.rxjava2.android.samples I/lx:  subscribe: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx:  onNext: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx:  onComplete: RxCachedThreadScheduler-1

從上面的運行結(jié)果來看,因為上游生產(chǎn)者已被調(diào)度到RxCachedThreadScheduler-1線程中,同時發(fā)射事件并沒有切換線程,所以發(fā)射后消費事件的onNext onErro onComplete也在RxCachedThreadScheduler-1線程中。

總結(jié)

  1. Schedulers.io()等價于 new IoScheduler()
  2. new IoScheduler() Rxjava創(chuàng)建了線程池,為后續(xù)創(chuàng)建線程做準(zhǔn)備,同時創(chuàng)建并運行了一個清理線程RxCachedWorkerPoolEvictor,定期執(zhí)行清理任務(wù)。
  3. subscribeOn()返回一個ObservableSubscribeOn對象,它是Observable的一個裝飾類,增加了scheduler
  4. 調(diào)用subscribe()方法,在這個方法調(diào)用后,subscribeActual()被調(diào)用,才真正執(zhí)行了IoSchduler中的createWorker()創(chuàng)建線程并運行,最終將上游Observablesubscribe()方法調(diào)度到新創(chuàng)建的線程中運行。

現(xiàn)在我們知道了被觀察者(事件上游)執(zhí)行線程是如何被調(diào)度到指定線程中執(zhí)行的,但很多情況下,我們希望觀察者(事件下游)處理事件最好在UI線程執(zhí)行,比如更新UI操作等。但下游何時調(diào)度,如何調(diào)度由于篇幅問題,將放到下節(jié)繼續(xù)分析。敬請期待。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容