本文只分析 RxJava 的基本原理與流程,不深入探討具體操作符的實(shí)現(xiàn)細(xì)節(jié)。
背景
為什么使用 RxJava?
解決異步回調(diào)多重嵌套
比如類(lèi)似的代碼:
new Thread(new Runnable() {
@Override
public void run() {
//do something
handler.post(new Runnable() {
@Override
public void run() {
//getResult do something
}
});
}
}).start();
RxJava 的實(shí)現(xiàn)
Observable.create((ObservableOnSubscribe<String>) e -> {
//do something
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
//getResult do something
});
鏈?zhǔn)秸{(diào)用,解決代碼多層嵌套問(wèn)題,在邏輯越發(fā)復(fù)雜時(shí)優(yōu)勢(shì)也將越發(fā)明顯。
解決回調(diào)地獄
當(dāng)有倆個(gè)及以上要求順序訪問(wèn)的網(wǎng)絡(luò)請(qǐng)求,往往會(huì)出現(xiàn)下面的代碼:
Net.mockNet(TOKEN_URL, new Net.Callback() {
@Override
public void onResponse(String result) {
// getResult do something
Net.mockNet(CART_URL, new Net.Callback() {
@Override
public void onResponse(String result) {
// getResult do something
}
});
}
});
這種不優(yōu)雅的實(shí)現(xiàn),可以用 RxJava 解決:
Observable<String> o1 = Observable.create((ObservableOnSubscribe<String>) e -> e.onNext(Net.mockNet(TOKEN_URL, 2000)));
o1.concatMap((Function<String, ObservableSource<?>>) s -> {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//getResult do something
e.onNext(Net.mockNet(CART_URL, 2000));
}
});
})
.cast(String.class)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
//getResult do something
});
看似并不簡(jiǎn)潔,但是如果我們把創(chuàng)建 Observable 對(duì)象的過(guò)程(o1、o2)封裝為方法(暫不考慮封裝的實(shí)現(xiàn)),代碼就會(huì)變?yōu)槿缦滦Ч?/p>
askToken().concatMap(s -> askCart((String) s)) // getResult do something
//.concatMap(s -> askAnother((String) s)) // 你還可以接著續(xù)接
.cast(String.class)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s ->
// getResult do something
);
實(shí)際上使用 Retrofit 框架后,代碼就會(huì)變得像上述一樣簡(jiǎn)潔,而且就邏輯而言也更加清晰易懂。
其它
配合其它框架實(shí)現(xiàn)更多優(yōu)勢(shì)功能。
使用
RxJava 的使用教程網(wǎng)上有很多,這里不作具體說(shuō)明。
源碼分析
基本原理分析 - 1
Rxjava 2.0 和 1.0 的核心原理區(qū)別不大,都是基于觀察者模式。
關(guān)于觀察者模式的核心原理,可以參考下面的演示代碼(初級(jí) demo,僅用作啟發(fā)):
/**
* ?? 觀察者將實(shí)例交給被觀察者,并且當(dāng)被觀察者數(shù)據(jù)變化時(shí),被觀察者'主動(dòng)'去調(diào)用觀察者實(shí)例的方法。
*/
//被觀察者
public class MyObservable {
private MyObserver myObserver;
public void setObserver(MyObserver observer) {
this.myObserver = observer;
}
public void deleteObserver() {
this.myObserver = null;
}
protected void notifyData(Object obj) {
if (myObserver != null) {
myObserver.updateData(obj);
}
}
}
//觀察者
public interface MyObserver {
void updateData(Object obj);
}
首先我們從 RxJava 最基礎(chǔ)的實(shí)現(xiàn)開(kāi)始,研究其原理:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
// todo something
e.onNext("test");
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
//回調(diào)
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Observable 是被觀察者,Observer 是觀察者,根據(jù)觀察者模式原理,消息是從被觀察者發(fā)送給觀察者的,所以我們從 Observable 的代碼開(kāi)始分析,先看 create 方法:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
閱讀源碼注意不能被細(xì)節(jié)牽絆,要充分利用命名的意義。這里我們可以大致推斷出來(lái),我們得到的 Observable 就是 new ObservableCreate<T>(source),閱讀 RxJavaPlugins.onAssembly 源碼也能印證這一點(diǎn),這里就不深入討論,直接看 ObservableCreate 源碼。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
...
}
...
這里截取了類(lèi)的部分代碼,先分析它的關(guān)鍵方法 subscribeActual()。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
subscribeActual() 最終調(diào)用了 source.subscribe(parent);,從上下文可知 source 即我們重寫(xiě)并傳入的 ObservableOnSubscribe 的參數(shù),所以 subscribeActual() 最終就是執(zhí)行我們 Observable.create 實(shí)現(xiàn)的 subscribe() 方法。
而 parent,就是在 Observable.create 中調(diào)用 emitter.onNext() 的 ObservableEmitter 的具體實(shí)現(xiàn)。它的代碼也在上述源碼中:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
也就是說(shuō) Emitter 作為發(fā)射器,最終還是將 onNext 還是交給了觀察者 observer.onNext()。
至此下游鏈路通了,代碼的執(zhí)行順序?yàn)?ObservableCreate.subscribeActual() → ObservableOnSubscribe.subscribe() → CreateEmitter.onNext() → Observer.onNext()。
那么是什么時(shí)候調(diào)用的 subscribeActual() 的呢?
我們知道被觀察者通知觀察者需要持有觀察者的實(shí)例,所以在這個(gè)問(wèn)題之前,我們先分析觀察者 observer 的實(shí)例是多會(huì)交給被觀察者 Observable 的。
在最初的示例代碼中,Observable 在 create 之后,緊接著調(diào)用 subscribe(new Observer()) 將觀察者傳給被觀察者:
obervable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
//回調(diào)
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
閱讀 Observable.subscribe 源碼:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer); //??
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
就是說(shuō) Observable.subscribe() 直接調(diào)用了 Observable.subscribeActual();
所以對(duì)于這個(gè)簡(jiǎn)單模型,代碼執(zhí)行的完整順序?yàn)椋?/p>

基本原理分析 - 2
上面理清了 RxJava 最基礎(chǔ)代碼的實(shí)現(xiàn)原理 -- 觀察者模式。下面稍加擴(kuò)展,通過(guò)分析 subscribeOn、concatMap 等操作符的基本實(shí)現(xiàn),研究整個(gè) Rx 鏈路的串行原理。這里不會(huì)分析 subscribeOn 如何跨線程、concatMap 如何分發(fā),有興趣可以自行研究。
subscribeOn 是指定被觀察者所在線程,看下源碼:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
這里,最終返回了 new ObservableSubscribeOn<T>(this, scheduler);
先來(lái)看下 ObservableSubscribeOn 以及它的參數(shù)究竟是誰(shuí)。
1.類(lèi) ObservableSubscribeOn
【ObservableSubscribeOn】 extends 【AbstractObservableWithUpstream】 extends 【Observable】 implements 【ObservableSource】
2.參數(shù) this
ObservableSource
3.參數(shù) scheduler
指定線程類(lèi)型,不具體分析。
subscribeOn() 的返回值 ObservableSubscribeOn 對(duì)象也是 Observable 類(lèi)型,且參數(shù)是 ObservableSource 類(lèi)型,而 Observable 實(shí)現(xiàn)了 ObservableSource 接口。
這種實(shí)現(xiàn)是不是很熟悉,類(lèi)似于代理模式,通過(guò)嵌套同一接口,實(shí)現(xiàn)功能的鏈?zhǔn)蒋B加。也就是說(shuō) subscribeOn() 方法返回了上層 Observable 的代理對(duì)象,我們看下 ObservableSubscribeOn 的源碼:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //2
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent); //1.
}
}));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
...
}
}
注釋 1 處,在線程調(diào)度之后,ObservableSubscribeOn 最終調(diào)用了上層的 ObservableSource(即原 Observable)。
同樣的,注釋 2 處,Observer 也使用了類(lèi)似的嵌套。
所以對(duì)于基礎(chǔ)模型上加 subscribeOn(),代碼執(zhí)行順序?yàn)椋?/p>
ObservableSubscribeOn.subscribe() → ObservableSubscribeOn.subscribeActual() → ObservableCreate.subscribe() → ObservableCreate.subscribeActual() → ObservableOnSubscribe.subscribe() 你的被觀察者實(shí)現(xiàn) → CreateEmitter.onNext() → SubscribeOnObserver.onNext() → Observer.onNext() 你的觀察者實(shí)現(xiàn)。
從上面分析可以推測(cè),正是這種層層代理實(shí)現(xiàn)了 Rxjava 的鏈?zhǔn)秸{(diào)用,實(shí)際代碼調(diào)用順序可以對(duì)比參考如下模型,在把握宏觀流程的情況下,有助于閱讀代碼細(xì)節(jié),如具體操作符的實(shí)現(xiàn)等。

簡(jiǎn)單看下 concatMap 的源碼:
public final class ObservableConcatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final int bufferSize;
final ErrorMode delayErrors;
public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
int bufferSize, ErrorMode delayErrors) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.bufferSize = Math.max(8, bufferSize);
}
@Override
public void subscribeActual(Observer<? super U> s) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) {
return;
}
if (delayErrors == ErrorMode.IMMEDIATE) {
SerializedObserver<U> serial = new SerializedObserver<U>(s);
source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
} else {
source.subscribe(new ConcatMapDelayErrorObserver<T, U>(s, mapper, bufferSize, delayErrors == ErrorMode.END));
}
}
...
可以看出大同小異遵循上述模式。其余細(xì)節(jié)有興趣可以自行研究。
總結(jié)
閱讀源碼旨在學(xué)習(xí)框架結(jié)構(gòu),從中獲得啟發(fā),細(xì)節(jié)實(shí)現(xiàn)并非研究重點(diǎn)。
源碼閱讀遵循從宏觀到微觀、從簡(jiǎn)單到復(fù)雜逐步拆解分析的原則,千萬(wàn)不要被細(xì)節(jié)所牽絆。
后面接著分析 Retrofit。