前言
通過(guò)前一篇的從觀察者模式出發(fā),聊聊RxJava,我們大致理解了RxJava的實(shí)現(xiàn)原理,在RxJava中可以非常方便的實(shí)現(xiàn)不同線(xiàn)程間的切換。subscribeOn 用于指定上游線(xiàn)程,observeOn 用于指定下游線(xiàn)程,多次用 subscribeOn 指定上游線(xiàn)程只有第一次有效,多次用 observeOn 指定下次線(xiàn)程,每次都有效;簡(jiǎn)直太方便了,比直接使用Handler省了不少力氣,同時(shí)也不用去關(guān)注內(nèi)存泄漏的問(wèn)題了。本篇就來(lái)看看在RxJava中上游是如何實(shí)現(xiàn)線(xiàn)程切換。
RxJava 基礎(chǔ)原理
為了方便后面的敘述,這里通過(guò)下面的UML圖簡(jiǎn)單回顧一下上一篇的內(nèi)容。

此圖并沒(méi)有完整的展現(xiàn)圖中各個(gè)接口和類(lèi)之間的各種關(guān)系,因?yàn)槟菢訒?huì)導(dǎo)致整個(gè)圖錯(cuò)綜復(fù)雜,不便于查看,這里只繪制出了RxJava各個(gè)類(lèi)之間核心關(guān)系網(wǎng)絡(luò)
從上面的UML圖中可以看出,具體的實(shí)現(xiàn)類(lèi)只有ObservableCreate和CreateEmitter。CreateEmitter是ObservableCreate的內(nèi)部類(lèi)(PlantUML 怎么繪制內(nèi)部類(lèi),沒(méi)搞懂,玩的轉(zhuǎn)的同學(xué)請(qǐng)賜教呀(▽))。
上篇說(shuō)過(guò)Observable創(chuàng)建的過(guò)程,可以簡(jiǎn)化如下:
Observable mObservable=newObservableCreate(newObservableOnSubscribe())
結(jié)合圖可以更直觀的體現(xiàn)出這一點(diǎn)。ObservableCreate 內(nèi)部持有ObservableOnSubscribe的引用。
當(dāng)觀察者訂閱主題后:
mObservable.subscribe(mObserver);
ObservableCreate 中的subscribeActual()方法就會(huì)執(zhí)行,
protectedvoidsubscribeActual(Observer observer){? ? ? ? CreateEmitter parent =newCreateEmitter(observer);? ? ? ? observer.onSubscribe(parent);try{? ? ? ? ? ? source.subscribe(parent);? ? ? ? }catch(Throwable ex) {? ? ? ? ? ? Exceptions.throwIfFatal(ex);? ? ? ? ? ? parent.onError(ex);? ? ? ? }? ? }
在這個(gè)過(guò)程中會(huì)創(chuàng)建CreateEmitter 的實(shí)例,而這個(gè)CreateEmitter實(shí)現(xiàn)了Emitter和Disposable接口,同時(shí)又持有Observer的引用(當(dāng)然這個(gè)引用是ObservableCreate傳遞給他的)。接著就會(huì)執(zhí)行ObservableOnSubscribe的subscribe 方法,方法的參數(shù)即為剛剛創(chuàng)建的CreateEmitter 的實(shí)例,接著一系列連鎖反應(yīng),Emitter 接口中的方法(onNext,onComplete等)開(kāi)始執(zhí)行,在CreateEmitter內(nèi)部,Observer接口中對(duì)應(yīng)的方法依次執(zhí)行,這樣就實(shí)現(xiàn)了一次從主題(上游)到觀察者(下游)的事件傳遞。
source.subscribe(parent)
這里的 source 是ObservableOnSubscribe的實(shí)例,parent是CreateEmitter的實(shí)例。上面加粗文本敘述的內(nèi)容,就是這行代碼,可以說(shuō)這是整個(gè)訂閱過(guò)程最核心的實(shí)現(xiàn)。
好了,回顧完基礎(chǔ)知識(shí)后,馬上進(jìn)入正題,看看RxJava是如何實(shí)現(xiàn)線(xiàn)程切換的。
RxJava 之 subscribeOn
我們知道正常情況下,所有的內(nèi)容都是在主線(xiàn)程執(zhí)行,既然這里提到了線(xiàn)程切換,那么必然是切換到了子線(xiàn)程,因此,這里需要關(guān)注線(xiàn)程的問(wèn)題,我們就帶著下面這幾個(gè)問(wèn)題去閱讀代碼。
1.是哪個(gè)對(duì)象在什么時(shí)候創(chuàng)建了子線(xiàn)程,是一種怎樣的方式創(chuàng)建的?
2.子線(xiàn)程又是如何啟動(dòng)的?
3.上游事件是怎么跑到子線(xiàn)程里執(zhí)行的?
4.多次用 subscribeOn 指定上游線(xiàn)程為什么只有第一次有效 ?
示例
首先看一下,日常開(kāi)發(fā)中實(shí)現(xiàn)線(xiàn)程切換的具體實(shí)現(xiàn)
privatevoidmultiThread(){? ? ? ? Observable.create(newObservableOnSubscribe() {@Overridepublicvoidsubscribe(ObservableEmitter e)throwsException{? ? ? ? ? ? ? ? e.onNext("This msg from work thread :"+ Thread.currentThread().getName());? ? ? ? ? ? ? ? sb.append("\nsubscribe: currentThreadName=="+ Thread.currentThread().getName());? ? ? ? ? ? }? ? ? ? })? ? ? ? ? ? ? ? .subscribeOn(Schedulers.newThread())? ? ? ? ? ? ? ? .observeOn(AndroidSchedulers.mainThread())? ? ? ? ? ? ? ? .subscribe(newConsumer() {@Overridepublicvoidaccept(String s)throwsException{? ? ? ? ? ? ? ? ? ? ? ? Log.e(TAG,"accept: s= "+ s);? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? });? ? }
這段代碼,使用過(guò)RxJava的同學(xué)再熟悉不過(guò)了,上游事件會(huì)在一個(gè)名為 RxNewThreadScheduler-1 的線(xiàn)程執(zhí)行,下游線(xiàn)程會(huì)切換回我們熟悉的Android UI線(xiàn)程。
我們就從subscribeOn(Schedulers.newThread()) 出發(fā),看看這個(gè)代碼的背后,到底發(fā)生了什么。

subscribeOn
這里我們先不管Schedulers.newThread() 是什么鬼,首先看看這個(gè)subscribeOn()方法。
Observable.java---? subscribeOn(Scheduler scheduler)
publicfinalObservablesubscribeOn(Scheduler scheduler){? ? ? ? ObjectHelper.requireNonNull(scheduler,"scheduler is null");returnRxJavaPlugins.onAssembly(newObservableSubscribeOn(this, scheduler));? ? }
可以看到,這個(gè)方法需要一個(gè)Scheduler 類(lèi)型的參數(shù)。
RxJavaPlugins.java--- onAssembly(@NonNull Observable source)
publicstaticObservableonAssembly(@NonNull Observable source){? ? ? ? Function f = onObservableAssembly;if(f !=null) {returnapply(f, source);? ? ? ? }returnsource;? ? }
O(∩_∩)O哈哈~,是不是覺(jué)得似曾相識(shí),和create操作符一個(gè)套路呀。因此,observeOn也可以簡(jiǎn)化如下:
newObservableSubscribeOn(this, Schedulers.newThread());
這里你也許會(huì)有疑問(wèn),這個(gè)this是什么呢?其實(shí)這個(gè)this就是Observable,具體到上面的代碼來(lái)說(shuō)就是ObservableCreate,總之就是一個(gè)具體的Observable。
接著看ObservableSubscribeOn 這個(gè)類(lèi)
publicfinalclassObservableSubscribeOnextendsAbstractObservableWithUpstream{}
看一下AbstractObservableWithUpstream.java
abstractclassAbstractObservableWithUpstreamextendsObservableimplementsHasUpstreamObservableSource{/** The source consumable Observable. */protectedfinalObservableSource source;? ? AbstractObservableWithUpstream(ObservableSource source) {this.source = source;? ? }@OverridepublicfinalObservableSourcesource(){returnsource;? ? }}
再看一下HasUpstreamObservableSource.java
/** * Interface indicating the implementor has an upstream ObservableSource-like source available * via {@link#source()} method. * *@param the value type */publicinterfaceHasUpstreamObservableSource{/**? ? * Returns the upstream source of this Observable.? ? *
Allows discovering the chain of observables.? ? *@returnthe source ObservableSource? ? */ObservableSourcesource();}
饒了半天,ObservableSubscribeOn 原來(lái)和上一篇說(shuō)的ObservableCreate一樣,也是Observable的一個(gè)子類(lèi)。只不過(guò)比ObservableCreate多實(shí)現(xiàn)了一個(gè)接口HasUpstreamObservableSource,這個(gè)接口很有意思,他的source()方法返回類(lèi)型是ObservableSource(還記得這個(gè)類(lèi)的角色嗎?)。也就是說(shuō)ObservableSubscribeOn這個(gè)Observable是一個(gè)擁有上游的Observable。他有一個(gè)非常關(guān)鍵的屬性source,這個(gè)source就代表了他的上游。
我們接著看ObservableSubscribeOn的具體實(shí)現(xiàn)。
publicfinalclassObservableSubscribeOnextendsAbstractObservableWithUpstream{finalScheduler scheduler;publicObservableSubscribeOn(ObservableSource source, Scheduler scheduler){super(source);this.scheduler = scheduler;? ? }@OverridepublicvoidsubscribeActual(finalObserver s){finalSubscribeOnObserver parent =newSubscribeOnObserver(s);// observer 調(diào)用onSubscribe方法,獲取上游的控制權(quán)s.onSubscribe(parent);? ? ? ? parent.setDisposable(scheduler.scheduleDirect(newSubscribeTask(parent)));? ? }}
首先看他的構(gòu)造函數(shù),參數(shù)source就是我們之前提到過(guò)的this,scheduler就是Schedulers.newThread()。同時(shí)調(diào)用了父類(lèi)AbstractObservableWithUpstream的構(gòu)造函數(shù),這里結(jié)合之前的結(jié)論,我們可以確定通過(guò)這個(gè)構(gòu)造函數(shù),就創(chuàng)建出來(lái)了一個(gè)包含上游的ObservableSubscribeOn實(shí)例。
再看實(shí)現(xiàn)訂閱關(guān)系的關(guān)鍵方法subscribeActual(),在這里創(chuàng)建了一個(gè)SubscribeOnObserver的實(shí)例,SubscribeOnObserver 是AtomicReference的子類(lèi)(保證原子性),同時(shí)實(shí)現(xiàn)了 Observer接口 和 Disposable 接口;你可以把他理解成一個(gè)Observer。
我們之前說(shuō)過(guò),subscribeActual()是實(shí)現(xiàn)上下游之間訂閱關(guān)系的重要方法。因?yàn)橹挥姓嬲龑?shí)現(xiàn)了訂閱關(guān)系,上下游之間才能連接起來(lái)。我們看這個(gè)方法的最后一句代碼。
parent.setDisposable(scheduler.scheduleDirect(newSubscribeTask(parent)));
這句代碼,可以說(shuō)就是非常關(guān)鍵,因?yàn)閺倪@里開(kāi)始了一系列的連鎖反應(yīng)。首先看一下SubscribeTask
finalclassSubscribeTaskimplementsRunnable{privatefinalSubscribeOnObserver parent;? ? ? ? SubscribeTask(SubscribeOnObserver parent) {this.parent = parent;? ? ? ? }@Overridepublicvoidrun(){? ? ? ? ? ? source.subscribe(parent);? ? ? ? }? ? }
看到這句source.subscribe(parent),是不是覺(jué)得似曾相識(shí)呢?
SubscribeTask 實(shí)現(xiàn)了是Runnable接口,在其run方法中,定義了一個(gè)需要在線(xiàn)程中執(zhí)行的任務(wù)。按照類(lèi)的繼承關(guān)系,很明顯source 就是ObservableSubscribeOn 的上游Observable,parent是一個(gè)Observer。也就是說(shuō)這個(gè)run方法要執(zhí)行的內(nèi)容就是實(shí)現(xiàn)ObservableSubscribeOn的上游和Observer的訂閱。一旦某個(gè)線(xiàn)程執(zhí)行了這個(gè)Runnable(SubscribeTask),就會(huì)觸發(fā)了這個(gè)run方法,從而實(shí)現(xiàn)訂閱,而一旦這個(gè)訂閱實(shí)現(xiàn),那么后面的流程就是上節(jié)所說(shuō)的事情了。
這里可以解答第三個(gè)問(wèn)題了,上游事件是怎么給弄到子線(xiàn)程里去的,這里很明顯了,就是直接把訂閱方法放在了一個(gè)Runnable中去執(zhí)行,這樣就一旦這個(gè)Runnable在某個(gè)子線(xiàn)程執(zhí)行,那么上游所有事件只能在這個(gè)子線(xiàn)程中執(zhí)行了。
好了,線(xiàn)程要執(zhí)行的任務(wù)似乎創(chuàng)建完了,下面就接著找看看子線(xiàn)程是怎么創(chuàng)建的。回過(guò)頭繼續(xù)看剛才的方法,
scheduler.scheduleDirect(newSubscribeTask(parent))
Scheduler.java----scheduleDirect
publicDisposablescheduleDirect(@NonNull Runnable run){returnscheduleDirect(run,0L, TimeUnit.NANOSECONDS);? ? }publicDisposablescheduleDirect(@NonNull Runnable run,longdelay, @NonNull TimeUnit unit){finalWorker w = createWorker();// 對(duì)run進(jìn)行了一次裝飾finalRunnable decoratedRun = RxJavaPlugins.onSchedule(run);? ? ? ? DisposeTask task =newDisposeTask(decoratedRun, w);? ? ? ? w.schedule(task, delay, unit);returntask;? ? }@NonNull// 抽象方法publicabstractWorkercreateWorker();
首先看一下Worker類(lèi)
/**? ? * Sequential Scheduler for executing actions on a single thread or event loop.? ? *
? ? * Disposing the {@linkWorker} cancels all outstanding work and allows resource cleanup.? ? */publicabstractstaticclassWorkerimplementsDisposable{@NonNullpublicDisposableschedule(@NonNull Runnable run){returnschedule(run,0L, TimeUnit.NANOSECONDS);? ? ? ? }@NonNullpublicabstractDisposableschedule(@NonNull Runnable run,longdelay, @NonNull TimeUnit unit);? ? ? ? ? ? }
Worker是Scheduler內(nèi)部的一個(gè)靜態(tài)抽象類(lèi),實(shí)現(xiàn)了Disposable接口,其schedule()方法也是抽象的。
再看一下DisposeTask
staticfinalclassDisposeTaskimplementsRunnable,Disposable{finalRunnable decoratedRun;finalWorker w;? ? ? ? Thread runner;? ? ? ? DisposeTask(Runnable decoratedRun, Worker w) {this.decoratedRun = decoratedRun;this.w = w;? ? ? ? }@Overridepublicvoidrun(){? ? ? ? ? ? runner = Thread.currentThread();try{? ? ? ? ? ? ? ? decoratedRun.run();? ? ? ? ? ? }finally{? ? ? ? ? ? ? ? dispose();? ? ? ? ? ? ? ? runner =null;? ? ? ? ? ? }? ? ? ? }@Overridepublicvoiddispose(){if(runner == Thread.currentThread() && winstanceofNewThreadWorker) {? ? ? ? ? ? ? ? ((NewThreadWorker)w).shutdown();? ? ? ? ? ? }else{? ? ? ? ? ? ? ? w.dispose();? ? ? ? ? ? }? ? ? ? }@OverridepublicbooleanisDisposed(){returnw.isDisposed();? ? ? ? }? ? }
DisposeTask 又是一個(gè)Runnable,同時(shí)也實(shí)現(xiàn)了Disposable接口。可以看到在他的run方法中會(huì)執(zhí)行decoratedRun的run方法,這個(gè)decoratedRun其實(shí)就是參數(shù)中傳遞進(jìn)來(lái)的run,也就是說(shuō),執(zhí)行了這個(gè)DisposeTask的run方法,就會(huì)觸發(fā)SubscribeTask中的run方法,因此,我們就要關(guān)注是誰(shuí)執(zhí)行了這個(gè)DisposeTask。
回到scheduleDirect()方法
publicDisposablescheduleDirect(@NonNull Runnable run,longdelay, @NonNull TimeUnit unit){finalWorker w = createWorker();// 對(duì)run進(jìn)行了一次裝飾finalRunnable decoratedRun = RxJavaPlugins.onSchedule(run);? ? ? ? DisposeTask task =newDisposeTask(decoratedRun, w);? ? ? ? w.schedule(task, delay, unit);returntask;? ? }
scheduleDirect()方法的實(shí)現(xiàn)我們總結(jié)一下:
創(chuàng)建一個(gè)Worker對(duì)象w,而在Scheduler類(lèi)中createWorker()方法被定義為抽象方法,因此我們需要去Scheduler的具體實(shí)現(xiàn)中了解這個(gè)Worker的具體實(shí)現(xiàn)。
對(duì)參數(shù)run通過(guò)RxJavaPlugins進(jìn)行一次裝飾,生成一個(gè)decoratedRun的Runnable(通過(guò)源碼可以發(fā)現(xiàn),其實(shí)什么也沒(méi)干,就是原樣返回)
通過(guò)decoratedRun和w生成一個(gè)DisposeTask對(duì)象task
通過(guò)Worker的schedule方法開(kāi)始執(zhí)行這個(gè)task。
ε=(′ο`*)))唉,說(shuō)了這么久,子線(xiàn)程是如何創(chuàng)建的依然不清楚,無(wú)論是SubscribeTask還是DisposeTask只是定義會(huì)在某個(gè)子線(xiàn)程中執(zhí)行的任務(wù),并不代表子線(xiàn)程已被創(chuàng)建。但是通過(guò)以上代碼,我們也可以收獲一些有價(jià)值的結(jié)論:
最終的Runnable任務(wù),將由某個(gè)具體的Worker對(duì)象的scheduler()方法執(zhí)行。
這個(gè)scheduleDirect會(huì)返回一個(gè)Disposable對(duì)象,這樣我們就可以通過(guò)Observer去控制整個(gè)上游的執(zhí)行了。
好了,到這里對(duì)于subscribeOn()方法的分析已經(jīng)到了盡頭,我們找了最終需要運(yùn)行子任務(wù)的對(duì)象Worker,而這個(gè)Worker是個(gè)抽象類(lèi),因此我們需要關(guān)注Worker的具體實(shí)現(xiàn)了。
下面我們就從剛才丟下的Schedulers.newThread() 換個(gè)角度來(lái)分析,看看能不能找到這個(gè)Worker的具體實(shí)現(xiàn)。
Schedulers.newThread()
前面說(shuō)了subscribeOn()方法需要一個(gè)Scheduler 類(lèi)型的參數(shù),然而通過(guò)前面的分析我們知道Scheduler是個(gè)抽象類(lèi),是無(wú)法被實(shí)例化的。因此,這里就從Schedulers類(lèi)出發(fā)。
/**
* Static factory methods for returning standard Scheduler instances.
*/publicfinalclassSchedulers{}
注釋很清楚,這個(gè)Schedulers就是一個(gè)用于生成Scheduler實(shí)例的靜態(tài)工廠。
下面我們就來(lái)看看,在這個(gè)工廠中newThread() 生成了一個(gè)什么樣的Scheduler實(shí)例。
@NonNullpublicstaticSchedulernewThread(){returnRxJavaPlugins.onNewThreadScheduler(NEW_THREAD);? ? }? ? NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(newNewThreadTask());staticfinalclassNewThreadTaskimplementsCallable{@OverridepublicSchedulercall()throwsException{returnNewThreadHolder.DEFAULT;? ? ? ? }? ? }staticfinalclassNewThreadHolder{staticfinalScheduler DEFAULT =newNewThreadScheduler();? ? }
newThread() 方法經(jīng)過(guò)層層委托處理(最終的創(chuàng)建方式,有點(diǎn)單例模式的意味),最終我們需要的就是一個(gè)NewThreadScheduler的實(shí)例。
NewThreadScheduler.java
publicfinalclassNewThreadSchedulerextendsScheduler{finalThreadFactory threadFactory;privatestaticfinalString THREAD_NAME_PREFIX ="RxNewThreadScheduler";privatestaticfinalRxThreadFactory THREAD_FACTORY;/** The name of the system property for setting the thread priority for this Scheduler. */privatestaticfinalString KEY_NEWTHREAD_PRIORITY ="rx2.newthread-priority";static{intpriority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,? ? ? ? ? ? ? ? Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));? ? ? ? THREAD_FACTORY =newRxThreadFactory(THREAD_NAME_PREFIX, priority);? ? }publicNewThreadScheduler(){this(THREAD_FACTORY);? ? }publicNewThreadScheduler(ThreadFactory threadFactory){this.threadFactory = threadFactory;? ? }@NonNull@OverridepublicWorkercreateWorker(){returnnewNewThreadWorker(threadFactory);? ? }}
不出所料NewThreadScheduler 是Scheduler的一個(gè)子類(lèi),在他的靜態(tài)代碼塊中構(gòu)造了一個(gè)Priority=5的線(xiàn)程工廠。而在我們最最關(guān)注的createWorker()方法中他又用這個(gè)線(xiàn)程工廠創(chuàng)建了一個(gè)NewThreadWorker 的實(shí)例。下面就讓我們看看最終的NewThreadWorker 做了些什么工作。
NewThreadWorker.java(節(jié)選關(guān)鍵內(nèi)容)
publicclassNewThreadWorkerextendsScheduler.WorkerimplementsDisposable{privatefinalScheduledExecutorService executor;volatilebooleandisposed;publicNewThreadWorker(ThreadFactory threadFactory){? ? ? ? executor = SchedulerPoolFactory.create(threadFactory);? ? }@NonNull@OverridepublicDisposableschedule(@NonNullfinalRunnable run){returnschedule(run,0,null);? ? }@Overridepublicvoiddispose(){if(!disposed) {? ? ? ? ? ? disposed =true;? ? ? ? ? ? executor.shutdownNow();? ? ? ? }? ? }}
眾里尋他千百度,終于找到了Worker的實(shí)現(xiàn)了,同時(shí)再一次不出所料的又一次實(shí)現(xiàn)了Disposable接口,o(╥﹏╥)o。
在其構(gòu)造函數(shù)中,通過(guò)NewThreadScheduler中提供的線(xiàn)程工廠threadFactory創(chuàng)建了一個(gè)ScheduledExecutorService。
ScheduledExecutorService.java ---create
publicstaticScheduledExecutorServicecreate(ThreadFactory factory){finalScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);if(PURGE_ENABLED && execinstanceofScheduledThreadPoolExecutor) {? ? ? ? ? ? ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;? ? ? ? ? ? POOLS.put(e, exec);? ? ? ? }returnexec;? ? }
用大名鼎鼎的Executors(Executor的工具類(lèi)),創(chuàng)建了一個(gè)核心線(xiàn)程為1的線(xiàn)程。
至此,我們終于找到了第一個(gè)問(wèn)題的答案,子線(xiàn)程是誰(shuí)如何創(chuàng)建的;在NewThreadScheduler的createWorker()方法中,通過(guò)其構(gòu)建好的線(xiàn)程工廠,在Worker實(shí)現(xiàn)類(lèi)的構(gòu)造函數(shù)中創(chuàng)建了一個(gè)ScheduledExecutorService的實(shí)例,是通過(guò)SchedulerPoolFactory創(chuàng)建的。
同時(shí)可以看到,通過(guò)執(zhí)行dispose 方法,可以使用ScheduledExecutorService的shutdown()方法,停止線(xiàn)程的執(zhí)行。
線(xiàn)程已經(jīng)創(chuàng)建好了,下面就來(lái)看看到底是誰(shuí)啟動(dòng)了這個(gè)線(xiàn)程。前面我們說(shuō)過(guò),Worker的schedule()方法如果執(zhí)行了,就會(huì)執(zhí)行我們定義好的Runnable,通過(guò)這個(gè)Runnable中run方法的執(zhí)行,就可以實(shí)現(xiàn)上下游訂閱關(guān)系。下面就來(lái)看看這個(gè)scheduler()方法。
@NonNull@OverridepublicDisposableschedule(@NonNullfinalRunnable action,longdelayTime, @NonNull TimeUnit unit){if(disposed) {returnEmptyDisposable.INSTANCE;? ? ? ? }returnscheduleActual(action, delayTime, unit,null);? ? }@NonNullpublicScheduledRunnablescheduleActual(finalRunnable run,longdelayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent){? ? ? ? Runnable decoratedRun = RxJavaPlugins.onSchedule(run);? ? ? ? ScheduledRunnable sr =newScheduledRunnable(decoratedRun, parent);if(parent !=null) {if(!parent.add(sr)) {returnsr;? ? ? ? ? ? }? ? ? ? }? ? ? ? Future f;try{if(delayTime <=0) {? ? ? ? ? ? ? ? f = executor.submit((Callable)sr);? ? ? ? ? ? }else{? ? ? ? ? ? ? ? f = executor.schedule((Callable)sr, delayTime, unit);? ? ? ? ? ? }? ? ? ? ? ? sr.setFuture(f);? ? ? ? }catch(RejectedExecutionException ex) {if(parent !=null) {? ? ? ? ? ? ? ? parent.remove(sr);? ? ? ? ? ? }? ? ? ? ? ? RxJavaPlugins.onError(ex);? ? ? ? }returnsr;? ? }
到這里,已經(jīng)很明顯了,在schedulerActual方法中,會(huì)通過(guò)剛才創(chuàng)建好的子線(xiàn)程對(duì)象executor通過(guò)submit或schedule執(zhí)行一個(gè)Runnable任務(wù)(雖然這個(gè)Runnable對(duì)象再一次經(jīng)過(guò)了各種裝飾和包裝,但其本質(zhì)沒(méi)有發(fā)生變化),并將執(zhí)行結(jié)果封裝后返回。而這個(gè)Runnable對(duì)象追根溯源來(lái)說(shuō),就是我們?cè)贠bservableSubscribeOn類(lèi)中創(chuàng)建的一個(gè)SubscribeTask對(duì)象。因此,當(dāng)這個(gè)子線(xiàn)程開(kāi)始運(yùn)行的時(shí)候就是執(zhí)行SubscribeTask中run()方法的時(shí)機(jī);一旦這個(gè)run方法執(zhí)行,那么
source.subscribe(parent)
這句最關(guān)鍵的代碼就開(kāi)始執(zhí)行了,一切的一切又回到了我們上一篇那熟悉的流程了。
好了,按照上面的流程捋下來(lái),感覺(jué)還是有點(diǎn)分散,那么就用UML圖看看整體的結(jié)構(gòu)。

我們看最下面的ObservableSubscribeOn,他是subscribeOn 返回的Observable對(duì)象,他持有一個(gè)Scheduler 實(shí)例的引用,而這個(gè)Scheduler實(shí)例就是NewThreadScheduler(即Schedulers.newThreade())的一個(gè)實(shí)例。ObservableSubscribeOn 的subscribeActual方法,會(huì)觸發(fā)NewThreadScheduler去執(zhí)行SubscribeTask中定義的任務(wù),而這個(gè)具體的任務(wù)又將由Worker類(lèi)創(chuàng)建的子線(xiàn)程去執(zhí)行。這樣就把上游事件放到了一個(gè)子線(xiàn)程中實(shí)現(xiàn)。
至于最后一個(gè)問(wèn)題,多次用 subscribeOn 指定上游線(xiàn)程為什么只有第一次有效?,看完通篇其實(shí)也很好理解了,因?yàn)樯嫌蜲bservable只有一個(gè)任務(wù),就是subscribe(準(zhǔn)確的來(lái)說(shuō)是subscribeActual()),而subscribeOn 要做的事情就是把上游任務(wù)切換到一個(gè)指定線(xiàn)程里,那么一旦被切換到了某個(gè)指定的線(xiàn)程里,后面的切換不就是沒(méi)有意義了嗎。
好了,至此上游事件切換到子線(xiàn)程的過(guò)程我們就明白了。下游事件又是如何切換的且聽(tīng)下回分解,本來(lái)想一篇寫(xiě)完的,結(jié)果發(fā)現(xiàn)越寫(xiě)越多,只能分成兩篇了?。?!o(╯□╰)o。
寫(xiě)在后面的話(huà)
關(guān)于Disposable
在RxJava的分析中,我們經(jīng)常會(huì)遇到Disposable這個(gè)單詞,確切的說(shuō)是接口,這里簡(jiǎn)單說(shuō)一說(shuō)這個(gè)接口。
/**
* Represents a disposable resource.
*/publicinterfaceDisposable{voiddispose();booleanisDisposed();}
我們知道,在Java中,類(lèi)實(shí)現(xiàn)某個(gè)接口,通俗來(lái)說(shuō)就是代表這個(gè)類(lèi)多了一項(xiàng)功能,比如一個(gè)類(lèi)實(shí)現(xiàn)Serializable接口,代表這個(gè)類(lèi)是可以序列化的。這里Disposable也是代表一種能力,這個(gè)能力就是Disposable,就是代表一次性的,用后就丟棄的,比如一次性筷子,還有那啥。
在RxJava中很多類(lèi)都實(shí)現(xiàn)了這個(gè)接口,這個(gè)接口有兩個(gè)方法,isDisposed()顧名思義返回當(dāng)前類(lèi)是否被拋棄,dispose()就是主動(dòng)拋棄。因此,所有實(shí)現(xiàn)了這個(gè)接口的類(lèi),都擁有了這樣一種能力,就是可以判斷自己是否被拋棄,同時(shí)也可以主動(dòng)拋棄自己。
上一篇我們說(shuō)了,Observer通過(guò)onSubscribe(@NonNull Disposable d),會(huì)獲得一個(gè)Disposable,這樣就有能力控制上游的事件發(fā)送了。這樣,我們就不難理解,為什么那么多類(lèi)實(shí)現(xiàn)了這個(gè)接口,因?yàn)橄掠潍@取到的是一個(gè)擁有Disposable的對(duì)象,而一旦擁有了一個(gè)這樣的對(duì)象,那么就可以通過(guò)下游控制上游了??梢哉f(shuō),這是RxJava對(duì)常規(guī)的觀察者模式所做的最給力的改變。
關(guān)于各種ObservableXXX ,subscribeXXX,ObserverXXX
在查看RxJava的源碼時(shí),可能很多人都和我一樣,有一個(gè)巨大的困擾,就是這些類(lèi)的名字好他媽難記,感覺(jué)長(zhǎng)得都差不多,關(guān)鍵念起來(lái)好像也差不多。但其實(shí)本質(zhì)上來(lái)說(shuō),RxJava對(duì)類(lèi)的命名還是非常規(guī)范的,只是我們不太習(xí)慣而已。按照英文單詞翻譯:
Observable 可觀察的
Observer? 觀察者
Subscribe? 訂閱
其實(shí)就這么三個(gè)主語(yǔ),其他的什么ObservableCreate,ObservableSubscribeOn,AbstractObservableWithUpstream,還有上面提到的Disposable,都是對(duì)各種各樣的Observable和Observer的變形和修飾結(jié)果,只要理解這個(gè)類(lèi)的核心含義是什么,就不會(huì)被這些名字搞暈了。
RxJava 可以說(shuō)是博大精深,以上所有分析完全是個(gè)人平時(shí)使用時(shí)的總結(jié)與感悟,有任何錯(cuò)誤之處,還望各位讀者提出,共同進(jìn)步。
關(guān)于RxJava 這里墻裂推薦一篇文章一篇不太一樣的RxJava介紹,感覺(jué)是自扔物線(xiàn)那篇之后,對(duì)RxJava思想感悟最深的一篇了。對(duì)RxJava 有興趣的同學(xué),可以多度幾遍,每次都會(huì)有收獲!!
作者:IAM四十二
鏈接:http://www.itdecent.cn/p/3dd582bb10cc
來(lái)源:簡(jiǎn)書(shū)
簡(jiǎn)書(shū)著作權(quán)歸作者所有,任何形式的轉(zhuǎn)載都請(qǐng)聯(lián)系作者獲得授權(quán)并注明出處。