RxJava 相信各位已經(jīng)使用了很久,大部分人在剛學(xué)習(xí) RxJava 感嘆切換線程的方便,調(diào)用邏輯清晰的同時(shí),并不知道其中的原理,主要是靠記住運(yùn)行的順序。
隨著我們?cè)O(shè)計(jì)出的 RxJava流 越來越復(fù)雜,一些復(fù)雜的問題并不能靠著記住的運(yùn)行順序就能解決。
下面,就通過最常用的操作符的源碼來看看所謂的流是什么運(yùn)行的。
首先我們用Single舉例,設(shè)計(jì)一個(gè)最基本的 RxJava 流,只有一個(gè) Observable(ColdObservable) 和Obsever:
Disposable disposable = Single.just("wtf")
.subscribe(it -> Log.i("subscribe", it));
上游發(fā)送一個(gè)"wtf" ,下游接受時(shí)將其打印出來。上游發(fā)送端使用 Single.just 作為創(chuàng)建方法,
看一下 just() 方法里做了什么。
public static <T> Single<T> just(final T item) {
ObjectHelper.requireNonNull(item, "value is null");
return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
}
public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
其中 ObjectHelper.requireNonNull 只是空檢查。
RxJavaPlugins.onAssembly 方法,這個(gè)方法其實(shí)就是通過一個(gè)全局的變量 onSingleAssembly 來對(duì)方法進(jìn)行 Hook ,這一系列xxxAssembly全局變量默認(rèn)為空,所以實(shí)際上當(dāng)我們沒有設(shè)置的時(shí)候其實(shí) just 方法是直接返回了一個(gè) 新實(shí)例化的SingleJust對(duì)象。
再看看SingleJust內(nèi)部:
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
}
實(shí)例化的時(shí)候只是將值保存了下來,沒有其它操作。
下一步調(diào)用subscribe()來啟動(dòng)這個(gè)流(ColdObservable),然后看看subscribe中做了什么:
public final void subscribe(SingleObserver<? super T> subscriber) {
ObjectHelper.requireNonNull(subscriber, "subscriber is null");
subscriber = RxJavaPlugins.onSubscribe(this, subscriber);
ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");
try {
//核心邏輯
subscribeActual(subscriber);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
同樣 RxJavaPlugins.onSubscribe 默認(rèn)沒有作用,實(shí)際的核心邏輯是調(diào)用了subscribeActual(SingleObserver)。
對(duì)于我們上面設(shè)計(jì)的流,則是調(diào)用了 SingleJust 中的 subscribeActual(SingleObserver)
回顧上面 SingleJust 中 subscribeActual(SingleObserver) 的實(shí)現(xiàn):
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
得到兩個(gè)信息
- 首先調(diào)用下游觀察者
SingleObserver的OnSubscribe方法并傳遞用于取消操作的Disposable - 調(diào)用
OnSuccess方法并傳遞之前保存下來的value
Map 操作符
現(xiàn)在我們加入一個(gè)常用且重要的Map操作到流中
Disposable disposable = Single.just("wtf")
.map(it-> 0)
.subscribe(it -> Log.i("subscribe", String.of(it)));
上面這個(gè)流包括了三種典型的操作 創(chuàng)建Creation 操作符Transformation和 訂閱Subscribe。
依然先檢查map() 方法,可以看到其中實(shí)例化了一個(gè)SingleMap
public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
}
再看看 SingleMap
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
}
類中信息稍微復(fù)雜一些:
- 首先我們關(guān)注在
SingleMap實(shí)例化的時(shí)候也是只做了保存數(shù)據(jù)的操作,而沒有實(shí)際邏輯:將流的上游保存為source將數(shù)據(jù)轉(zhuǎn)換的方法保存為mapper - 第二步我們知道下游觀察者
SingleObserver會(huì)調(diào)用核心邏輯subscribeActual方法來啟動(dòng)流 - 在這里的
subscribeActual方法中可以看到幾個(gè)重要的信息-
MapSingleObserver是一個(gè)觀察者 -
MapSingleObserver保存了下游的觀察者SingleObserver以及mapper - 上游
source被MapSingleObserver訂閱
-
由此可以看出在SingleMap被下游觀察者訂閱了之后,實(shí)例化了一個(gè)新的觀察者MapSingleObserver并保存下游觀察者SingleObserver的信息,再去訂閱上游SingleJust。
這種模式創(chuàng)建了一個(gè)裝飾類,用來包裝原有的類,并在保持類方法簽名完整性的前提下,提供了額外的功能的設(shè)計(jì)模式稱為裝飾者模式。
總結(jié)上面的執(zhí)行順序:
- 在
Rx流的最后一步調(diào)用subscribe啟動(dòng)流(ColdObservable) - 首先執(zhí)行
SingleMap中的subscribeActual方法,其中包括生成新的MapSingleObserver并訂閱SingleJust - 執(zhí)行
SingleJust中的subscribeActual:調(diào)用下游MapSingleObserver的onSubscribeonSuccess方法 -
MapSingleObserver中的onSubsribeonSuccess方法也很簡單,分別調(diào)用下游Observer的onSubsribe``onSuccess(異常時(shí) onError)方法
observeOn 操作符
Rxjava首先被大家津津樂道之處是可以方便的切換線程,避免Callback Hell,現(xiàn)在來看看線程切換操作符。
我們加入線程切換操作符 observeOn
Disposable disposable = Single.just("wtf")
.map(it-> 0)
.observeOn(Schedulers.io())
.subscribe(it -> Log.i("subscribe", String.of(it)));
同樣的,在 observeOn方法中實(shí)例化了一個(gè)SingleObserveOn
public final Single<T> observeOn(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleObserveOn<T>(this, scheduler));
}
繼續(xù)看SingleObserveOn類中信息
public final class SingleObserveOn<T> extends Single<T> {
final SingleSource<T> source;
final Scheduler scheduler;
public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {
source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
}
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 3528003840217436037L;
final SingleObserver<? super T> actual;
final Scheduler scheduler;
T value;
Throwable error;
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.actual = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
actual.onSubscribe(this);
}
}
@Override
public void onSuccess(T value) {
this.value = value;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void onError(Throwable e) {
this.error = e;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run() {
Throwable ex = error;
if (ex != null) {
actual.onError(ex);
} else {
actual.onSuccess(value);
}
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
}
類似的
- 構(gòu)造函數(shù)中保存了上游和線程切換的信息
-
subscribeActual實(shí)例化了一個(gè)新的觀察者ObserveOnSingleObserver
不同的
-
ObserveOnSingleObserver還繼承了AtomicReference<Disposable>、實(shí)現(xiàn)了Disposable``Runnable接口 -
onSuccess``onError中都沒有直接調(diào)用下游的onSuccessonError方法,而是調(diào)用了Disposable d = scheduler.scheduleDirect(this);來執(zhí)行run方法中的邏輯,而run方法中的邏輯則是調(diào)用下游的onSuccessonError方法
查看schedulerDirect內(nèi)部信息
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
創(chuàng)建了一個(gè)對(duì)應(yīng)線程的Worker和一個(gè)可用于取消的DisposeTask并執(zhí)行,對(duì)于IoScheduler則是創(chuàng)建了EventLoopWorker,再看看EventLoopWorker中的信息。
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
EventLoopWorker中則是維護(hù)了一套包含相應(yīng)的線程池、可取消的CompositeDisposable、以及用于運(yùn)行Runable的ThreadWorker??偟膩碚f就是一套可以在相應(yīng)線程運(yùn)行且可取消的類和邏輯。
- 上面則解釋了為什么
observeOn可以切換下游的線程(onSuccessonError) - 同樣解釋了為什么不會(huì)改變
onSubsribe的調(diào)用線程,因?yàn)榭梢钥吹?code>onSubscribe方法中直接調(diào)用了下游的onSucsribe,并沒有受到線程切換的影響。
SubscribeOn
實(shí)際上,subsribeOn 是 RxJava2.x 中比較復(fù)雜也是相較于 RxJava1.x 改動(dòng)比較大的一個(gè)操作符,它甚至?xí)绊懥鞯膱?zhí)行順序。(可以參見唐雪茂寫的 Rxjava流的設(shè)計(jì) 中的1 2兩個(gè)流)
我們現(xiàn)在設(shè)計(jì)兩個(gè)Rx流
Disposable disposable = Single.just("wtf")
.doOnSubsribe(it-> Log.i("doOnSubsribe", 0)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 1)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 2)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 3)
.subscribe(it -> Log.i("subscribe", 4);
Disposable disposable2 = Single.just("wtf")
.doOnSubsribe(it-> Log.i("doOnSubsribe", 0)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 1)
.subscribeOn(Schedulers.io())
.doOnSubsribe(it-> Log.i("doOnSubsribe", 2)
.doOnSubsribe(it-> Log.i("doOnSubsribe", 3)
.subscribe(it -> Log.i("subscribe", 4);
你可能已經(jīng)知道并記住了兩個(gè)流的打印的順序分別是 01234 23014,但是為什么doOnSubsribe方法和RxJava1中調(diào)用順序完全不一樣,為什么通過subscribeOn切換線程會(huì)影響執(zhí)行順序?
先找到 SingleSubscribeOn 類
public final class SingleSubscribeOn<T> extends Single<T> {
final SingleSource<? extends T> source;
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
//直接調(diào)用下游 onSubscribe
s.onSubscribe(parent);
//再執(zhí)行訂閱上游的方法
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
}
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 7000911171163930287L;
final SingleObserver<? super T> actual;
final SequentialDisposable task;
final SingleSource<? extends T> source;
SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
this.actual = actual;
this.source = source;
this.task = new SequentialDisposable();
}
@Override
public void onSubscribe(Disposable d) {
//沒有繼續(xù)調(diào)用下游的 onSubscribe 方法
DisposableHelper.setOnce(this, d);
}
@Override
public void onSuccess(T value) {
actual.onSuccess(value);
}
@Override
public void onError(Throwable e) {
actual.onError(e);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
task.dispose();
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public void run() {
source.subscribe(this);
}
}
}
同樣的直接看subscribeActual方法及onSubscribe方法,發(fā)現(xiàn)事情并沒有那么簡單,和之前的操作符的邏輯區(qū)別很大:
-
SubscribeOnObserver同樣還繼承了AtomicReference<Disposable>,實(shí)現(xiàn)了Disposable``Runnable接口 - 并沒有直接調(diào)用
subscribe訂閱上游,而是執(zhí)行了其它操作符在onSubscribe中訂閱下游的操作 - 然后再結(jié)合
Disposable f = scheduler.scheduleDirect(parent);和run方法可以知道在新的線程中執(zhí)行了訂閱上游的操作source.subscribe(this); -
onSubsribe中并沒有再繼續(xù)調(diào)用下游的onSubsribe
綜合起來可以知道,本來應(yīng)該在整個(gè)流從下至上訂閱完成后按照從上至下的順序執(zhí)行 onSubscribe的流,在使用subsribeOn操作符的后,在訂閱的時(shí)(執(zhí)行subscribeActual),就開始執(zhí)行下游的onSubscribe且在當(dāng)前線程!然后才在指定的io線程執(zhí)行之下而上的操作,這也是為什么subsribeOn影響的是上游的線程。
小結(jié):
我認(rèn)為實(shí)際上 Rx 使用了很多優(yōu)秀的設(shè)計(jì)將我們各種常用的操作進(jìn)行了封裝,讓我們自由組合使用,其本身并沒有用什么黑科技。例如切換線程本質(zhì)上則是幫我們啟用了一個(gè)新的線程并把接下來的代碼放進(jìn)去執(zhí)行。
當(dāng)然,其中還有很多更深入的內(nèi)容需要我們繼續(xù)發(fā)現(xiàn)和學(xué)習(xí)。