RxJava強(qiáng)大的地方之一是他的鏈?zhǔn)秸{(diào)用,輕松地在線程之間進(jìn)行切換。這幾天也大概分析了一下RxJava的線程切換的主流程于是打算寫一篇文章及記錄一下。
我們使用RxJava進(jìn)行線程切換的場景很多時(shí)候都是在進(jìn)行網(wǎng)絡(luò)請求的時(shí)候,在IO線程進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)的請求處理,最后在Android的主線程進(jìn)行請求數(shù)據(jù)的結(jié)果處理。
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
當(dāng)然因?yàn)檫@段代碼的使用場景太多我們還可以利用ObservableTransformer操作符對其進(jìn)行簡化
public static <T>ObservableTransformer<T,T> io_main()
{
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
};
}
這樣我們在使用的時(shí)候就是這樣的:
.compose(RxTransformUtil.<Object>io_main())
是不是感覺方便了一丟丟
好了扯遠(yuǎn)了,現(xiàn)在來分析一下RxJava是如何做到線程的輕松調(diào)度的。
首先有幾個(gè)概念是非常重要的:
Scheduler官方的解釋是這樣的
A Scheduler is an object that specifies an API for scheduling units of work with or without delays or periodically.
初步看來Scheduler就是一個(gè)任務(wù)調(diào)度器相當(dāng)于就是一個(gè)調(diào)度中心的指揮者。當(dāng)然它是一個(gè)抽象類就肯定了Scheduler有很多具體的實(shí)現(xiàn)類,例如IO線程的具體調(diào)度器就是IoScheduler。就像調(diào)度中心指揮者有客運(yùn)中心的指揮者,有機(jī)場中心的指揮者一樣分別有不同的實(shí)現(xiàn)類。
當(dāng)然現(xiàn)在只有指揮者是肯定不行的,光頭司令怎么得行?這個(gè)時(shí)候關(guān)鍵的Worker類出現(xiàn)了,Worker官方的解釋是這樣的
Sequential Scheduler for executing actions on a single thread or event loop.
Disposing the Scheduler.Worker cancels all outstanding work and allows resource cleanup.
可以看到Worker就是線程任務(wù)的具體執(zhí)行者了。和Scheduler一樣Worker同樣也是抽象類,在不同的Scheduler具體實(shí)現(xiàn)類里面Worker也有自己的具體實(shí)現(xiàn)類,例如在IoScheduler類里面,Worker的具體實(shí)現(xiàn)類就是EventLoopWorker,它負(fù)責(zé)管理IO線程的具體操作,接下來我們就找到切入點(diǎn)看一看RxJava源碼里面都做了什么。
這里我們就以最典型的IO線程和主線程之間的切換為例來分析,線程切換的代碼就是上面的代碼。
Scheduler是以工廠方法對外提供它具體的實(shí)現(xiàn)類的。Schedulers.io()可以提供一個(gè)IoScheduler的對象。你可以往里面看最后源碼是如何進(jìn)行IoScheduler的創(chuàng)建的
//創(chuàng)建IoScheduler
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
//接著就行了IoScheduler的一系列初始化,CachedWorkerPool地初始化 ,并由RxThreadFactory進(jìn)行線程地創(chuàng)建,線程優(yōu)先級別設(shè)置,是否是守護(hù)進(jìn)程等等
現(xiàn)在IoScheduler有了,我們就看subscribe里面到底做了什么
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
Hook我們不用管,可以看到是把當(dāng)前ObservableCreater對象和IoScheduler一起傳進(jìn)了ObservableSubscribeoOn的構(gòu)造函數(shù)里面。進(jìn)入到ObservableSubscribeOn里面看看。
//AbstractObservableWithUpstream只是用來保存上游的源事件流的,就是保存剛剛傳入進(jìn)來的ObservableCreater
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//裝飾模式 把下游的Observer裝飾成SubscribeOnObserver
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //執(zhí)行下游Observer的onSubscribe(Disposable disposabel)方法,當(dāng)前線程是上游的執(zhí)行線程
s.onSubscribe(parent);
//開啟的子線程最終是以帶Disposable的返回值返回的
//在這里是將子線程加入管理,因?yàn)檫@里是并發(fā)操作所以使用了AtomicReference<Object>的院子操作類,是一種效率高于synchronized的樂觀鎖,感興趣的可以自行上網(wǎng)搜索
//我們只用知道這里加入管理了以后方便在以后我們切斷上下游的時(shí)候可以將我們的子線程一同dispose().
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
//這中間的代碼和最基本的鏈?zhǔn)秸{(diào)用關(guān)系是一樣的,只不過在onNext、onError、onComplete中實(shí)際上還是調(diào)用的下游真正的onNext、onError、onComplete
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
//這就是實(shí)際執(zhí)行的Runnable 會把其傳入IoScheduler中供Worker使用。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
/*看到了吧,SubscribeOnObserver 作用其實(shí)就是將源事件流發(fā)生的地點(diǎn)和下游的事件流處理的地點(diǎn)訂閱在了子線程中進(jìn)行處理。
這樣上游發(fā)送事件流的地方就被切換到了子線程中。*/
source.subscribe(parent);
}
}
}
接下來我們仔細(xì)看一下上面代碼的這一段:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
//這里scheduler.schedlerDirect非常的重要,可以看到RxJava把剛剛包裝好的Runnable對象傳入了方法里
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
我們跟進(jìn)去看一下里面的具體實(shí)現(xiàn)
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
//實(shí)際上是調(diào)用的下面3個(gè)參數(shù)的方法,延遲時(shí)間為0
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//創(chuàng)建具體的Worker類
final Worker w = createWorker();
//hook函數(shù)我們不用管,只要沒有設(shè)置依舊返回的是傳入的Runnable
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//將runnable和worker封裝到DisposeTask中
DisposeTask task = new DisposeTask(decoratedRun, w);
//執(zhí)行Worker的schedule方法具體的就是EventLoopWorker里面的schedule方法
w.schedule(task, delay, unit);
return task;
}
接下來我們來看一下EventLoopWorker里面的schedule方法是怎么實(shí)現(xiàn)的
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
//判斷是否解除訂閱
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
可以看到這里如果沒有被解除訂閱的話又會執(zhí)行到NewThreadWorker的scheduleActual方法里面。
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//hook函數(shù)我們這里不用管decoratedRun依然是傳進(jìn)來的Runnable對象run
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//ScheduledRunnable是一個(gè)即實(shí)現(xiàn)了Runnable接口又實(shí)現(xiàn)了Callable接口的對象,為了后面能成功加入到線程池當(dāng)中
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
//將sr加入到CompositeDisposable中,方便管理
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
//將sr加入到線程池當(dāng)中 并將線程的執(zhí)行結(jié)果返回給 Future<?> f
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);//對運(yùn)行結(jié)果進(jìn)行處理
} catch (RejectedExecutionException ex) {
if (parent != null) {
//在CompositeDisposable中一處剛剛加入的sr
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
接下來看一下ScheduledRunnable是如何對返回的結(jié)果進(jìn)行處理的
public void setFuture(Future<?> f) {
//一個(gè)死循環(huán)會一直判斷返回回來的結(jié)果 因?yàn)槠鋵?shí)原子操作類,樂觀鎖的機(jī)制決定了如果不是想要的結(jié)果的話會重新執(zhí)行一次
for (;;) {
Object o = get(FUTURE_INDEX);
if (o == DONE) {
//完成直接return
return;
}
//如果取消訂閱了則直接取消線程任務(wù)
if (o == DISPOSED) {
f.cancel(get(THREAD_INDEX) != Thread.currentThread());
return;
}
//前兩者都不滿足的話 就將future的值存下來
if (compareAndSet(FUTURE_INDEX, o, f)) {
return;
}
}
}
到現(xiàn)在為止上游的線程切換大體的流程就分析的差不多了,我們從源碼中也可以分析出很多網(wǎng)上經(jīng)常說的一些結(jié)論,最經(jīng)典的一條就是上游切換線程只有第一次生效,后面的線程切換都不起作用了,其實(shí)分析這點(diǎn)最重要的就是理解 ObservableSubscribeOn類里面下面的這段代碼了
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
再結(jié)合RxJava的鏈?zhǔn)讲僮鳎幚頂?shù)據(jù)的時(shí)候是自下而上,而發(fā)射數(shù)據(jù)的時(shí)候是自上而下(這句話網(wǎng)上說的太多了,我最開始也是不理解,只有自己真正看過源碼分析了,自己Debug一邊才能真正地理解)。
好了先寫到這里了,剩下的內(nèi)容我會放到另外一篇博客里面,感覺文章太長不利于閱讀。
這篇文章也是我第一次試著去分析源碼最后寫出的,很多都是我自己的理解,所以肯定有不妥當(dāng)或者錯(cuò)誤的地方希望大家看到了以后能給我指出來,我一定改正!
最后
沒有最后了 大家再見~~~