概述
在我的上一篇文章 《理解RxJava(二)操作符流程原理分析》 中,分析了依靠多個操作符鏈?zhǔn)秸{(diào)用的原理。
簡單總結(jié)如下:
1.創(chuàng)建:訂閱前,每一步都生成對應(yīng)的Observable對象,中間的每一步都將上游的Observable存儲;
2.訂閱: 每一步都會生成對應(yīng)的Observer對上一步生成并存儲的Observable進(jìn)行訂閱。訂閱的執(zhí)行順序是由下到上的。
3.執(zhí)行:先執(zhí)行每一步傳入的函數(shù)操作,然后將操作后的數(shù)據(jù)交給下游的Observer繼續(xù)處理。 數(shù)據(jù)的傳遞和處理順序是由上到下的。
本文我將嘗試對RxJava最核心的 線程調(diào)度 的原理進(jìn)行分析。
基本代碼
來看一下基本代碼:
@Test
public void test() throws Exception {
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
e.onComplete();
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(i -> System.out.println("onNext : i= " + i));
}
很簡單,即訂閱時將task交給子線程去做,而數(shù)據(jù)的回調(diào)則在Android主線程中執(zhí)行。
一、subscribeOn()
點(diǎn)擊查看源碼:
public final Observable<T> subscribeOn(Scheduler scheduler) {
//非空判斷和hook
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
有了前兩篇文章的基礎(chǔ),我們很清楚,排除掉非空判斷和hook相關(guān)邏輯,實(shí)際上這個方法返回了一個ObservableSubscribeOn對象。
我們有理由猜測這個ObservableSubscribeOn應(yīng)該和上文的ObservableMap及ObservableDoOnEach相似,都是Observable的一個包裝類(裝飾器):
//1.和上文基本一樣,ObservableSubscribeOn也是Observable的一個裝飾器
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//2.存儲上游的ObservableSource和調(diào)度器
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//3.new 一個SubscribeOnObserver
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//4.回調(diào)方法,這說明下游的onSubscribe回調(diào)方法所在線程和線程調(diào)度無關(guān)
// 是訂閱時所在的線程
s.onSubscribe(parent);
//5.立即執(zhí)行線程調(diào)度
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
前兩步我們不需要 再多解釋,直接看第三點(diǎn),我們看看SubscribeOnObserver這個類:
SubscribeOnObserver
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
//下游的Observer
final Observer<? super T> actual;
//保存上游的Disposable,自身dispose時,連同上游一起dispose
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
類似Observable和ObservableMap,SubscribeOnObserver同樣是Disposable和Observer的一個裝飾器,提供了對下游數(shù)據(jù)的傳遞,以及將task dispose的接口。
第4步我們之前就講過了,直接看第5步:
//5.立即執(zhí)行線程調(diào)度
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
我們看看SubscribeTask這個類:
SubscribeTask
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
難以置信的簡單,SubscribeTask 僅僅是一個Runnable 接口的實(shí)現(xiàn)類而已,通過將SubscribeOnObserver作為參數(shù)存起來,在run()方法中添加了上游Observable的被訂閱事件,就沒有了別的操作,
接下來我們看一下scheduler.scheduleDirect(SubscribeTask)中的代碼:
public abstract class Scheduler {
//...
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// Worker 本身就是Disposable 的實(shí)現(xiàn)類
// 請注意, createWorker()所創(chuàng)建的worker,
// 實(shí)際就是Schdulers.io()所提供的IoScheduler所創(chuàng)建的worker
final Worker w = createWorker();
//hook相關(guān)
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
//即 worker.schedule(task, 0, TimeUnit.NANOSECONDS): 立即執(zhí)行task
w.schedule(task, delay, unit);
return task;
}
//...
}
我們不要追究過深,我們看一下這個createWorker方法的注釋說明:
/**
* Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
* 檢索或創(chuàng)建一個新的{@link Scheduler.Worker}表示一系列的action
*
* When work is completed it should be unsubscribed using {@link Scheduler.Worker#dispose()}.
* 當(dāng)work完成后,應(yīng)使用{@link Scheduler.Worker#dispose()}取消訂閱。
*
* Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
* {@link Scheduler.Worker} 上面的work保證是順序執(zhí)行的
*/
現(xiàn)在我們知道了:
我們通過調(diào)用subscribeOn()傳入Scheduler,當(dāng)下游ObservableSource被訂閱時(請注意,訂閱順序是由下到上的),距離最近的線程調(diào)度subscribeOn()方法中,保存的Scheduler會創(chuàng)建一個worker(對應(yīng)相應(yīng)的線程,本文中為IoScheduler),在其對應(yīng)的線程中,立即執(zhí)行task
關(guān)于對應(yīng)的Worker相關(guān)和IoScheduler相關(guān),篇幅所限,本文不做細(xì)講,有興趣的同學(xué)可以自行研究。
多次subscribeOn()
現(xiàn)在考慮一個問題,假如在我們的代碼中,多次使用了subscribeOn()代碼,到線程會怎么處理呢?
上文已經(jīng)講到了,不管我們怎么通過subscribeOn()方法切換線程,由于訂閱執(zhí)行順序是由下到上,因此當(dāng)最上游的ObservableSource被訂閱時,所在線程當(dāng)然是距離上游最近的subscribeOn()所提供的線程,即最終Observable總是在第一個subscribeOn()所在的線程中執(zhí)行。
二、observeOn()
先看observeOn()內(nèi)部,果然是hook+Observable的包裝類:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//實(shí)例化ObservableObserveOn對象并返回
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
再看ObservableObserveOn:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
//1.相關(guān)依賴注入
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//2.創(chuàng)建主線程的worker
Scheduler.Worker w = scheduler.createWorker();
//3.上游數(shù)據(jù)源被訂閱
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
和subscribeOn()不同的是,我們并不是立即在對應(yīng)的線程執(zhí)行task,而是將對應(yīng)的線程(實(shí)際上是worker)作為參數(shù),實(shí)例化ObserveOnObserver并存儲起來。
當(dāng)上游的數(shù)據(jù)傳遞過來時,ObserveOnObserver執(zhí)行對應(yīng)的方法,比如onNext(T),再切換到對應(yīng)線程中,并交由下游的Observer去接收:
ObserveOnObserver
ObserveOnObserver中代碼極多,我們簡單了解原理后,以onNext(T)為例:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//...省略其他代碼
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
//隊(duì)列
SimpleQueue<T> queue;
@Override
public void onNext(T t) {
if (done) {
return;
}
//將數(shù)據(jù)存入隊(duì)列
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
//對應(yīng)線程取出數(shù)據(jù)并交由下游的Observer
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
//...省略其他代碼
}
多次observerOn()
由上文得知,與subscribeOn()相反,observerOn()操作會將切換到對應(yīng)的線程,然后交由下游的Observer處理,因此observerOn()僅對下游的Observer生效,并且,如果多次調(diào)用,observerOn()的線程調(diào)度會持續(xù)到下一個observerOn()操作之前。
我在《RxJava(11-線程調(diào)度Scheduler)》 @open-Xu文章中看到了這張圖片,完美詮釋了observerOn()的原理:
注意:
接下來的內(nèi)容都是@open-Xu在他的文章《RxJava(11-線程調(diào)度Scheduler)》中總結(jié)的筆記,為了方便以后翻閱,特此轉(zhuǎn)載過來,下文的轉(zhuǎn)載請@原作者open-Xu并注明原文地址,謝謝!
調(diào)度器的種類
| 調(diào)度器類型 | 效果 |
|---|---|
| Schedulers.computation(?) | 用于計(jì)算任務(wù),如事件循環(huán)或和回調(diào)處理,不要用于IO操作(IO操作請使用Schedulers.io());默認(rèn)線程數(shù)等于處理器的數(shù)量 |
| Schedulers.from(executor) | 使用指定的Executor作為調(diào)度器 |
| Schedulers.immediate(?) | 在當(dāng)前線程立即開始執(zhí)行任務(wù) |
| Schedulers.newThread(?) | 為每個任務(wù)創(chuàng)建一個新線程 |
| Schedulers.io(?) | 用于IO密集型任務(wù),如異步阻塞IO操作,這個調(diào)度器的線程池會根據(jù)需要增長;對于普通的計(jì)算任務(wù),請使用Schedulers.computation();Schedulers.io(?)默認(rèn)是一個CachedThreadScheduler,很像一個有線程緩存的新線程調(diào)度器 |
| Schedulers.trampoline(?) | 當(dāng)其它排隊(duì)的任務(wù)完成后,在當(dāng)前線程排隊(duì)開始執(zhí)行 |
| AndroidSchedulers.mainThread(?) | 主線程,UI線程,可以用于更新界面 |
總結(jié)
subscribeOn()
- 訂閱順序當(dāng)從下到上,上游的ObservableSource被訂閱時,先切換線程,然后立即執(zhí)行task;
- 當(dāng)存在多個subscribeOn()方法時,僅第一個subscribeOn()有效。
observerOn()
- 訂閱順序當(dāng)從下到上,上游的ObservableSource被訂閱時,會將對應(yīng)的worker創(chuàng)建并作為構(gòu)造參數(shù)存儲在Observer的裝飾器中,并不會立即切換線程;
- 當(dāng)數(shù)據(jù)由上游發(fā)送過來時,先將數(shù)據(jù)存儲到隊(duì)列中,然后切換線程,然后在新的線程中將數(shù)據(jù)發(fā)送給下游的Observer;
- 當(dāng)存在多個observerOn()方法時,僅對距下游下一個observerOn()之前的observer有效