本文只分析 RxJava 的基本原理與流程,不深入探討具體操作符的實現(xiàn)細節(jié)。
背景
為什么使用 RxJava?
解決異步回調(diào)多重嵌套
比如類似的代碼:
new Thread(new Runnable() {
@Override
public void run() {
//do something
handler.post(new Runnable() {
@Override
public void run() {
//getResult do something
}
});
}
}).start();
RxJava 的實現(xiàn)
Observable.create((ObservableOnSubscribe<String>) e -> {
//do something
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
//getResult do something
});
鏈式調(diào)用,解決代碼多層嵌套問題,在邏輯越發(fā)復(fù)雜時優(yōu)勢也將越發(fā)明顯。
解決回調(diào)地獄
當(dāng)有倆個及以上要求順序訪問的網(wǎng)絡(luò)請求,往往會出現(xiàn)下面的代碼:
Net.mockNet(TOKEN_URL, new Net.Callback() {
@Override
public void onResponse(String result) {
// getResult do something
Net.mockNet(CART_URL, new Net.Callback() {
@Override
public void onResponse(String result) {
// getResult do something
}
});
}
});
這種不優(yōu)雅的實現(xiàn),可以用 RxJava 解決:
Observable<String> o1 = Observable.create((ObservableOnSubscribe<String>) e -> e.onNext(Net.mockNet(TOKEN_URL, 2000)));
o1.concatMap((Function<String, ObservableSource<?>>) s -> {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//getResult do something
e.onNext(Net.mockNet(CART_URL, 2000));
}
});
})
.cast(String.class)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
//getResult do something
});
看似并不簡潔,但是如果我們把創(chuàng)建 Observable 對象的過程(o1、o2)封裝為方法(暫不考慮封裝的實現(xiàn)),代碼就會變?yōu)槿缦滦Ч?/p>
askToken().concatMap(s -> askCart((String) s)) // getResult do something
//.concatMap(s -> askAnother((String) s)) // 你還可以接著續(xù)接
.cast(String.class)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s ->
// getResult do something
);
實際上使用 Retrofit 框架后,代碼就會變得像上述一樣簡潔,而且就邏輯而言也更加清晰易懂。
其它
配合其它框架實現(xiàn)更多優(yōu)勢功能。
使用
RxJava 的使用教程網(wǎng)上有很多,這里不作具體說明。
源碼分析
基本原理分析 - 1
Rxjava 2.0 和 1.0 的核心原理區(qū)別不大,都是基于觀察者模式。
關(guān)于觀察者模式的核心原理,可以參考下面的演示代碼(初級 demo,僅用作啟發(fā)):
/**
* ?? 觀察者將實例交給被觀察者,并且當(dāng)被觀察者數(shù)據(jù)變化時,被觀察者'主動'去調(diào)用觀察者實例的方法。
*/
//被觀察者
public class MyObservable {
private MyObserver myObserver;
public void setObserver(MyObserver observer) {
this.myObserver = observer;
}
public void deleteObserver() {
this.myObserver = null;
}
protected void notifyData(Object obj) {
if (myObserver != null) {
myObserver.updateData(obj);
}
}
}
//觀察者
public interface MyObserver {
void updateData(Object obj);
}
首先我們從 RxJava 最基礎(chǔ)的實現(xiàn)開始,研究其原理:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
// todo something
e.onNext("test");
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
//回調(diào)
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Observable 是被觀察者,Observer 是觀察者,根據(jù)觀察者模式原理,消息是從被觀察者發(fā)送給觀察者的,所以我們從 Observable 的代碼開始分析,先看 create 方法:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
閱讀源碼注意不能被細節(jié)牽絆,要充分利用命名的意義。這里我們可以大致推斷出來,我們得到的 Observable 就是 new ObservableCreate<T>(source),閱讀 RxJavaPlugins.onAssembly 源碼也能印證這一點,這里就不深入討論,直接看 ObservableCreate 源碼。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
...
}
...
這里截取了類的部分代碼,先分析它的關(guān)鍵方法 subscribeActual()。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
subscribeActual() 最終調(diào)用了 source.subscribe(parent);,從上下文可知 source 即我們重寫并傳入的 ObservableOnSubscribe 的參數(shù),所以 subscribeActual() 最終就是執(zhí)行我們 Observable.create 實現(xiàn)的 subscribe() 方法。
而 parent,就是在 Observable.create 中調(diào)用 emitter.onNext() 的 ObservableEmitter 的具體實現(xiàn)。它的代碼也在上述源碼中:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
也就是說 Emitter 作為發(fā)射器,最終還是將 onNext 還是交給了觀察者 observer.onNext()。
至此下游鏈路通了,代碼的執(zhí)行順序為 ObservableCreate.subscribeActual() → ObservableOnSubscribe.subscribe() → CreateEmitter.onNext() → Observer.onNext()。
那么是什么時候調(diào)用的 subscribeActual() 的呢?
我們知道被觀察者通知觀察者需要持有觀察者的實例,所以在這個問題之前,我們先分析觀察者 observer 的實例是多會交給被觀察者 Observable 的。
在最初的示例代碼中,Observable 在 create 之后,緊接著調(diào)用 subscribe(new Observer()) 將觀察者傳給被觀察者:
obervable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
//回調(diào)
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
閱讀 Observable.subscribe 源碼:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer); //??
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
就是說 Observable.subscribe() 直接調(diào)用了 Observable.subscribeActual();
所以對于這個簡單模型,代碼執(zhí)行的完整順序為:

基本原理分析 - 2
上面理清了 RxJava 最基礎(chǔ)代碼的實現(xiàn)原理 -- 觀察者模式。下面稍加擴展,通過分析 subscribeOn、concatMap 等操作符的基本實現(xiàn),研究整個 Rx 鏈路的串行原理。這里不會分析 subscribeOn 如何跨線程、concatMap 如何分發(fā),有興趣可以自行研究。
subscribeOn 是指定被觀察者所在線程,看下源碼:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
這里,最終返回了 new ObservableSubscribeOn<T>(this, scheduler);
先來看下 ObservableSubscribeOn 以及它的參數(shù)究竟是誰。
1.類 ObservableSubscribeOn
【ObservableSubscribeOn】 extends 【AbstractObservableWithUpstream】 extends 【Observable】 implements 【ObservableSource】
2.參數(shù) this
ObservableSource
3.參數(shù) scheduler
指定線程類型,不具體分析。
subscribeOn() 的返回值 ObservableSubscribeOn 對象也是 Observable 類型,且參數(shù)是 ObservableSource 類型,而 Observable 實現(xiàn)了 ObservableSource 接口。
這種實現(xiàn)是不是很熟悉,類似于代理模式,通過嵌套同一接口,實現(xiàn)功能的鏈式疊加。也就是說 subscribeOn() 方法返回了上層 Observable 的代理對象,我們看下 ObservableSubscribeOn 的源碼:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //2
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent); //1.
}
}));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
...
}
}
注釋 1 處,在線程調(diào)度之后,ObservableSubscribeOn 最終調(diào)用了上層的 ObservableSource(即原 Observable)。
同樣的,注釋 2 處,Observer 也使用了類似的嵌套。
所以對于基礎(chǔ)模型上加 subscribeOn(),代碼執(zhí)行順序為:
ObservableSubscribeOn.subscribe() → ObservableSubscribeOn.subscribeActual() → ObservableCreate.subscribe() → ObservableCreate.subscribeActual() → ObservableOnSubscribe.subscribe() 你的被觀察者實現(xiàn) → CreateEmitter.onNext() → SubscribeOnObserver.onNext() → Observer.onNext() 你的觀察者實現(xiàn)。
從上面分析可以推測,正是這種層層代理實現(xiàn)了 Rxjava 的鏈式調(diào)用,實際代碼調(diào)用順序可以對比參考如下模型,在把握宏觀流程的情況下,有助于閱讀代碼細節(jié),如具體操作符的實現(xiàn)等。

簡單看下 concatMap 的源碼:
public final class ObservableConcatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final int bufferSize;
final ErrorMode delayErrors;
public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
int bufferSize, ErrorMode delayErrors) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.bufferSize = Math.max(8, bufferSize);
}
@Override
public void subscribeActual(Observer<? super U> s) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) {
return;
}
if (delayErrors == ErrorMode.IMMEDIATE) {
SerializedObserver<U> serial = new SerializedObserver<U>(s);
source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
} else {
source.subscribe(new ConcatMapDelayErrorObserver<T, U>(s, mapper, bufferSize, delayErrors == ErrorMode.END));
}
}
...
可以看出大同小異遵循上述模式。其余細節(jié)有興趣可以自行研究。
總結(jié)
閱讀源碼旨在學(xué)習(xí)框架結(jié)構(gòu),從中獲得啟發(fā),細節(jié)實現(xiàn)并非研究重點。
源碼閱讀遵循從宏觀到微觀、從簡單到復(fù)雜逐步拆解分析的原則,千萬不要被細節(jié)所牽絆。
后面接著分析 Retrofit。