本文首發(fā)于“隨手記技術(shù)團(tuán)隊”公眾號
大概從2015年開始,RxJava1.0開始快速流行起來,短短兩年時間,RxJava在Android開發(fā)中已經(jīng)算是無人不知無人不曉了,加之它與Retrofit等流行框架的完美結(jié)合,已經(jīng)成為Android項目開發(fā)的必備利器。隨手記作為一個大型項目,引入三方框架一直比較慎重,但也從今年初開始,正式引入了RxJava2.0,并配合Retrofit對項目的網(wǎng)絡(luò)框架和繁瑣的異步邏輯進(jìn)行重構(gòu)。RxJava雖然好用,但伴隨而來的是不可避免的學(xué)習(xí)成本,為了讓大家快速的了解RxJava的來龍去脈以及快速上手使用,特地總結(jié)該篇文章。本文將詳細(xì)講解如何快速理解RxJava的操作符,并從源碼角度來分析RxJava操作符的原理。
RxJava的優(yōu)點
簡單來講RxJava是一個簡化異步調(diào)用的庫,但其實它更是一種優(yōu)雅的編程方式和編程思想,當(dāng)你熟悉RxJava的使用方式之后,會很容易愛上它。
我總結(jié)它的優(yōu)點主要有兩個方面:
- 簡潔,免除傳統(tǒng)異步代碼邏輯中的callback hell
- 增加業(yè)務(wù)邏輯代碼的可讀性
關(guān)于第一點大家應(yīng)該都會認(rèn)同,關(guān)于第二點可能有人會有疑惑,因為很多人覺得RxJava大量不明所以的操作符會讓代碼的可讀性變得更差,其實產(chǎn)生這種印象恰恰就是因為沒有掌握RxJava操作符的使用和原理所導(dǎo)致的。
比如隨手記項目中綁定用戶QQ賬號的業(yè)務(wù)邏輯,這段邏輯的代碼涉及三個異步接口,兩個是QQ登錄SDK的,一個是隨手記后臺的,在使用RxJava重構(gòu)前,這段代碼使用了3個AsyncTask,也就是三個嵌套的回調(diào),代碼復(fù)雜,可讀性非常差。而改造之后,它變成了下面這樣子

如果你對這里面的幾個RxJava操作符比較熟悉的話,你會迅速了解我這段代碼做了什么事情,而且不用再去梳理一堆嵌套回調(diào)了,這就是RxJava帶來的可讀性。
所以,學(xué)習(xí)RxJava,理解和掌握操作符是不可避免的第一步。
RxJava2.0與RxJava1.0的關(guān)系
從RxJava1.0到RxJava2.0,基本思想沒有變化,但RxJava2.0按照Reactive-Streams規(guī)范對整個架構(gòu)進(jìn)行了重新設(shè)計,并變更了Maven倉庫依賴地址和包名。所以現(xiàn)在RxJava的github網(wǎng)站中,RxJava1.0和RxJava2.0是兩個獨立的分支,不相互兼容 ,也不能同時使用,而且RxJava1.0再過一段時間也將不再維護(hù)。所以,目前還使用RxJava1.0的,建議盡早切換到RxJava2.0,而如果沒有接觸過RxJava1.0,直接使用和學(xué)習(xí)RxJava2.0就可以了。如果想了解RxJava1.0和RxJava2.0的詳細(xì)區(qū)別,請參考官方文檔。
為行文方便,從此處開始,本文使用Rx來表示RxJava2.x。
Rx的操作符有哪些
剛接觸Rx的人面對一堆各式各樣的操作符會覺得不知如何去學(xué)習(xí)記憶,其實你只需要從整體上了解Rx操作符的類別和掌握一些使用頻率較高的操作符就足夠了,至于其他的操作符,你只需要知道它的使用場景和掌握如何快速理解一個操作符的方法,就可以在需要的時候快速拿來用了。
下圖是我根據(jù)官方文檔總結(jié)的Rx操作符的分類及每個類別下的代表性操作符

從上圖可以看出,Rx的操作符主要十個大類別,每個類別下常用的操作符也就三五個左右,所以只要掌握這些,你就可以應(yīng)付大部分的業(yè)務(wù)場景了。
如何快速理解一個Rx操作符
提到Rx操作符,相信很多人都會對描述Rx操作符的花花綠綠的寶石圖有很大印象。

要快速理解Rx操作符,看懂寶石圖是個快捷有效的方式,現(xiàn)在我們就來詳細(xì)分析一下構(gòu)成寶石圖的各個主要元素。
首先,我們有必要回顧一下Rx中的幾個主要的基類
-
io.reactivex.Flowable: 事件源(0..N個元素), 支持 Reactive-Streams and 背壓 -
io.reactivex.Observable:事件源(0..N個元素), 不支持背壓 -
io.reactivex.Single: 僅發(fā)射一個元素或產(chǎn)生error的事件源, -
io.reactivex.Completable: 不發(fā)射任何元素,只產(chǎn)生completion或error的事件源 -
io.reactivex.Maybe: 不發(fā)射任何元素,或只發(fā)射一個元素,或產(chǎn)生error的事件源 -
Subject: 既是事件源,也是事件接受者
可以看到Rx中最重要的概念就是事件源了,基本上所有的操作符都是針對事件源來進(jìn)行一些轉(zhuǎn)換、組合等操作,而我們最常用的事件源就是Observable了。
本文中我們就以Observable事件源為例來講解Rx的操作符,Observable發(fā)射的事件我們統(tǒng)一稱之為item。首先我們需要詳細(xì)了解一下寶石圖中各個圖像元素的含義:
-
—>:Observable的時間線,從左至右流動 -
★:星星、圓、方塊等表示Observable發(fā)射的item -
|:時間線最后的小豎線表示Observable的事件流已經(jīng)成功發(fā)射完畢了 -
X:時間線最后的X符合表示由于某種原因Observable非正常終止發(fā)射,產(chǎn)生了error
上面幾種元素組合在一起代表一個完整的Observable,也可以稱為源Observable
-->方向朝下的虛線箭頭表示以及中間的長方框表示正在對上面的源Observable進(jìn)行某種轉(zhuǎn)換。長方框里的文字展示了轉(zhuǎn)換的性質(zhì)。下面的Observable是對上面的源Observable轉(zhuǎn)換后的結(jié)果。
掌握了寶石圖的含義,我們就可以根據(jù)某個操作符的寶石圖快速理解這個操作符了。舉幾個例子:
1. map

可以看到,這幅圖表達(dá)的意思是一個源Observable先后發(fā)射了1、2、3的三個item,而經(jīng)過map操作符一轉(zhuǎn)換,就變成了一個發(fā)射了10、20、30三個item的新的Observable。描述操作符的長方框中也清楚的說明了該map操作符進(jìn)行了何種具體的轉(zhuǎn)換操作(圖中的10*x只是一個例子,這個具體的轉(zhuǎn)換函數(shù)是可以自定義的)。
于是,我們就很快速地理解了map操作符的含義和用法,簡單來講,它就是通過一個函數(shù)將一個Observable發(fā)射的item逐個進(jìn)行某種轉(zhuǎn)換。
示例代碼:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return integer * 10;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer result) throws Exception {
Log.i(TAG, "accept : " + result +"\n" );
}
});
輸出結(jié)果:

2. zip

根據(jù)
zip的寶石圖,可以知道zip操作符的作用是把多個源Observable發(fā)射的item通過特定函數(shù)組合在一起,然后發(fā)射組合后的item。從圖中還可以看到一個重要的信息是,最終發(fā)射的item是對上面的兩個源Observable發(fā)射的item按照發(fā)射順序逐個組合的結(jié)果,而且最終發(fā)射的1A等item的發(fā)射時間是由組合它的1和A等item中發(fā)射時間較晚的那個item決定的,也正是如此,zip操作符經(jīng)??梢杂迷谛枰瑫r組合處理多個網(wǎng)絡(luò)請求的結(jié)果的業(yè)務(wù)場景中。示例代碼:
Observable.zip(Observable.just(1, 2, 3),
Observable.just("A", "B", "C"),
new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
return integer + s;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG, "zip : accept : " + s + "\n");
}
});
輸出結(jié)果:

3. concat

從寶石圖可以看出,
concat操作符的作用就是將兩個源Observable發(fā)射的item連接在一起發(fā)射出來。這里的連接指的是整體連接,被concat操作后產(chǎn)生的Observable會先發(fā)射第一個源Observable的所有item,然后緊接著再發(fā)射第二個源Observable的所有的item。示例代碼:
Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "concat : " + integer + "\n");
}
});
輸出結(jié)果:

大部分操作符都配有這樣的寶石圖,通過官方文檔或者直接在Rx源碼中查看JavaDoc就可以找到,不再過多舉例。你也可以在rxmarbles這樣的網(wǎng)站上查看更多可以動態(tài)交互的寶石圖。
Rx操作符的原理
要了解操作符的原理,肯定要從源碼入手嘍。所以我們先來簡單擼一遍Rx的最基本的Create操作符的源碼。
Rx的源碼目錄結(jié)構(gòu)是比較清晰的,我們先從Observable.create方法來分析
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("s");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// 創(chuàng)建的Observer中多了一個回調(diào)方法onSubscribe,傳遞參數(shù)為Disposable ,Disposable相當(dāng)于RxJava1.x中的Subscription,用于解除訂閱。
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
create方法如下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
代碼很簡單,第一行判空不用管,第二行調(diào)用RxJavaPlugins的方法是為了實現(xiàn)Rx的hook功能,我們暫時也無需關(guān)注,在一般情況下,第二行代碼會直接返回它的入?yún)⒓?code>ObservableCreate對象,ObservableCreate是Observable的子類,實現(xiàn)了Observable的一些抽象方法比如subscribeActual。事實上Rx的每個操作符都對應(yīng)Observable的一個子類。
這里create方法接受的是一個ObservableOnSubscribe的接口實現(xiàn)類:
/**
* A functional interface that has a {@code subscribe()} method that receives
* an instance of an {@link ObservableEmitter} instance that allows pushing
* events in a cancellation-safe manner.
*
* @param <T> the value type pushed
*/
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
通過注釋可以知道這個接口的作用是通過一個subscribe方法接受一個ObservableEmitter類型的實例,俗稱發(fā)射器。
Observable.create方法執(zhí)行時,我們傳入的就是一個ObservableOnSubscribe類型的匿名內(nèi)部類,并實現(xiàn)了它的subscribe方法,然后它又被傳入create方法的返回對象ObservableCreate,最終成為ObservableCreate的成員source
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
...
接著我們來看Observable的subscribe方法,它的入?yún)⑹且粋€Observer(即觀察者,也就是事件接收者)
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
最終它會調(diào)用它的子類ObservableCreate的subscribeActual方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
在subscribeActual里首先創(chuàng)建了用于發(fā)射事件的CreateEmitter對象parent,CreateEmitter實現(xiàn)了接口Emitter和Disposable,并持有observer。
這段代碼的關(guān)鍵語句是source.subscribe(parent),這行代碼執(zhí)行后,就會觸發(fā)事件源進(jìn)行發(fā)射事件,即e.onNext("s")會被調(diào)用。細(xì)心的同學(xué)也會注意到這行代碼之前,parent先被傳入了observer的onSubscribe()方法,而在上面我們說過,observer的onSubscribe()方法接受一個Disposable類型的參數(shù),可以用于解除訂閱,之所以能夠解除訂閱,正是因為在觸發(fā)事件發(fā)射之前調(diào)用了observer的onSubscribe(),給了我們調(diào)用CreateEmitter的解除訂閱的方法dispose()的機會。
繼續(xù)來看CreateEmitter的onNext()方法,它最終是通過調(diào)用observer的onNext()方法將事件發(fā)射出去的
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 在真正發(fā)射之前,會先判斷該CreateEmitter是否已經(jīng)解除訂閱
if (!isDisposed()) {
observer.onNext(t);
}
}
...
}
至此,Rx事件源的創(chuàng)建和訂閱的流程就走通了。
下面我們從map操作符來入手看一下Rx操作符的原理,map方法如下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
map方法接受一個Function類型的參數(shù)mapper,返回了一個ObservableMap對象,它也是繼承自Observable,而mapper被傳給了ObservableMap的成員function,同時當(dāng)前的源Observable被傳給ObservableMap的成員source,進(jìn)入ObservableMap類
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
可以看到這里用到了裝飾者模式,ObservableMap持有來自它上游的事件源source,MapObserver持有來自它下游的事件接收者和我們實現(xiàn)的轉(zhuǎn)換方法function,在subscribeActual()方法中完成ObservableMap對source的訂閱,觸發(fā)MapObserver的onNext()方法,繼而將來自source的原始數(shù)據(jù)經(jīng)過函數(shù)mapper轉(zhuǎn)換后再發(fā)射給下游的事件接收者,從而實現(xiàn)map這一功能。
現(xiàn)在我們終于能夠來總結(jié)一下包含多個操作符時的訂閱流程了,以下面這段代碼為例
Observable.
create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("holen");
}
})
.map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
return s.length();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
執(zhí)行代碼時,自上而下每一步操作符都會創(chuàng)建一個新的Observable(均為Observable的子類,對應(yīng)不同的操作符),當(dāng)執(zhí)行create時,創(chuàng)建并返回了ObservableCreate,當(dāng)執(zhí)行map時,創(chuàng)建并返回了ObservableMap,并且每一個新的Observable都持有它上游的源Observable(即source)及當(dāng)前涉及到的操作函數(shù)function。當(dāng)最后一步執(zhí)行訂閱方法subscribe時會觸發(fā)ObservableMap的subscribeActual()方法,并將最下游的Observer包裝成MapObserver,同時該方法又會繼續(xù)調(diào)用它所持有ObservableCreate的訂閱方法(即執(zhí)行source.subscribe),由此也會觸發(fā)ObservableCreate的subscribeActual()方法,此時我們的發(fā)射器CreateEmitter才會調(diào)用它的onNext()方法發(fā)射事件,再依次調(diào)用MapObserver的操作函數(shù)mapper和onNext()方法,最終將事件傳遞給了最下游的Observer的onNext()方法。
我簡單的將這段邏輯用下面這幅圖來表示

操作符lift和compose
lift和compose在?Rx中是兩個比較特殊的操作符。
lift讓我們可以對Observer進(jìn)行封裝,在RxJava1.0中大部分變換都基于lift這個神奇的操作符。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> lift(ObservableOperator<? extends R, ? super T> lifter) {
ObjectHelper.requireNonNull(lifter, "onLift is null");
return RxJavaPlugins.onAssembly(new ObservableLift<R, T>(this, lifter));
}
lift操作符接受一個ObservableOperator對象
/**
* Interface to map/wrap a downstream observer to an upstream observer.
*
* @param <Downstream> the value type of the downstream
* @param <Upstream> the value type of the upstream
*/
public interface ObservableOperator<Downstream, Upstream> {
/**
* Applies a function to the child Observer and returns a new parent Observer.
* @param observer the child Observer instance
* @return the parent Observer instance
* @throws Exception on failure
*/
@NonNull
Observer<? super Upstream> apply(@NonNull Observer<? super Downstream> observer) throws Exception;
}
看注釋可以知道,這是一個將下游訂閱者包裝成一個上游訂閱者的接口。類似Map操作符中的MapObserver。
而compose操作符讓我們可以對Observable進(jìn)行封裝
@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}
wrap方法如下,僅僅是走了RxJavaPlugins的流程
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> wrap(ObservableSource<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
if (source instanceof Observable) {
return RxJavaPlugins.onAssembly((Observable<T>)source);
}
return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}
compose方法接受一個ObservableTransformer對象
/**
* Interface to compose Observables.
*
* @param <Upstream> the upstream value type
* @param <Downstream> the downstream value type
*/
public interface ObservableTransformer<Upstream, Downstream> {
/**
* Applies a function to the upstream Observable and returns an ObservableSource with
* optionally different element type.
* @param upstream the upstream Observable instance
* @return the transformed ObservableSource instance
*/
@NonNull
ObservableSource<Downstream> apply(@NonNull Observable<Upstream> upstream);
}
ObservableSource即為我們的基類Observable繼承的唯一接口??醋⑨尶梢灾?,ObservableTransformer是一個組合多個Observable的接口,它通過一個apply()方法接收上游的Observable,進(jìn)行一些操作后,返回新的Observable。
這里組合多個Observable的意思其實就是組合多個操作符,比如我們經(jīng)常會需要在使用Rx進(jìn)行網(wǎng)絡(luò)異步請求時進(jìn)行線程變化,這個操作一般都是差不多的,每次都寫會比較煩,這時我們就可以使用compose把常用的線程變換的幾個操作符組合起來
private final ObservableTransformer schedulersObservable = new ObservableTransformer() {
@Override
public ObservableSource apply(Observable upstream) {
return upstream.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
protected void testCompose() {
getNetObservable()
.compose(schedulersObservable)
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append(s);
}
});
}
關(guān)于compose的典型應(yīng)用,大家有興趣還可以去看一下開源項目RxLifecycle,它就是巧妙地利用compose操作符來解決了使用Rx可能會出現(xiàn)的內(nèi)存泄露問題。
Rx操作符的應(yīng)用場景
說了這么多,其實我們最關(guān)心的還是Rx操作符的應(yīng)用場景。其實只要存在異步的地方,都可以優(yōu)雅地使用Rx操作符。比如很多流行的Rx周邊開源項目

而針對自己想要實現(xiàn)的功能情景,如何去選擇特定的操作符,官網(wǎng)的文檔中也列出了一些指導(dǎo)——Rx操作符決策樹。
當(dāng)然除了這些,我們在開發(fā)項目時,還會有各種具體的業(yè)務(wù)場景需要選擇合適的操作符,這里我總結(jié)了一些經(jīng)常遇到的場景以及適合它們的操作符


只要我們理解了Rx操作符的原理,熟練掌握了一些使用頻率較高的操作符,就能夠在以上場景中輕松地使用,不再讓自己的代碼被復(fù)雜的業(yè)務(wù)邏輯搞得混亂。
以上就是本文的全部內(nèi)容,關(guān)于Rx還有很多東西值得深入地學(xué)習(xí)研究,后續(xù)有機會再跟大家分享更多Rx的使用心得。