從RxAndroid入門一文中,我們可以知道,RxJava主要構(gòu)建了一個主要應(yīng)用于異步場景、通過觀察者模式及使用響應(yīng)式編程和函數(shù)式編程規(guī)范實現(xiàn)的生產(chǎn)者-消費者模型:
- 事件生產(chǎn):生產(chǎn)者是
Observable,Observable對象的創(chuàng)建除了最基礎(chǔ)的create方法之外,還可以使用just、from、zip等其他多個方法。RxJava中提供了大量創(chuàng)建Observable的工廠方法,按需取用。 - 事件消費:消費者是
Observer及其拓展,使用subcribe關(guān)聯(lián)生產(chǎn)者。 - 事件加工:這是我們在RxAndroid入門一文中沒有提到的,RxJava提供了大量的方法(如
map、filter)對事件進行加工。
事件生產(chǎn)和加工的方法一般也被稱為操作符,RxJava提供的大量操作符不可能全部記住,因此只需要記住它們的作用,具體應(yīng)用時再查閱代碼或相關(guān)資料。接著,我們來看下一些常見的操作符。
創(chuàng)建Observable
create
方法原型如下:
static <T> Observable<T> create(ObservableOnSubscribe<T> source);
這個我們在入門時已經(jīng)使用過了,它是創(chuàng)建Observable對象最基礎(chǔ)的方法,需要實現(xiàn)ObservableOnSubscribe接口,然后在ObservableOnSubscribe.subscribe中生產(chǎn)和交付事件。
just
just是create的簡化操作,它直接將需要由Emitter發(fā)射出去的數(shù)據(jù)在初始化時傳入,有Observer訂閱后,Observable會自動依次發(fā)射這些數(shù)據(jù)并最后調(diào)用onComplete。示例如下:
Observable.just(1, 2, 3).subscribe(
(Integer i) -> log("onNext:" + i),
(Throwable e) -> e.printStackTrace(),
() -> log("onComplete"));
輸出日志如下:
onNext:1
onNext:2
onNext:3
onComplete
amb
方法原型如下:
static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources);
static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources);
所有Observable中,只有最早生產(chǎn)出事件的那個Observable的事件能夠被Observer消費。即消費者選定最優(yōu)的生產(chǎn)者,拋棄其他生產(chǎn)者,判定條件是事件的生產(chǎn)順序。
Observable.ambArray(
Observable.create((ObservableEmitter<String> emitter) -> emitter.onNext("1")).delay(1, TimeUnit.SECONDS),
Observable.create((ObservableEmitter<String> emitter) -> emitter.onNext("2"))
).subscribe((String s) -> log("onNext:" + s));
上述例子中的日志輸出結(jié)果是onNext:2,如果刪除第一個Observable的delay調(diào)用,那么輸出就是onNext:1。此外,還有一個成員方法abmWith可以在創(chuàng)建出Observable之后再使用,用來動態(tài)添加source。
使用場景:
同時向多個ip發(fā)起請求,最快響應(yīng)的那個服務(wù)器將作為后續(xù)訪問的節(jié)點。
concat
方法原型如下(方法有點多,參數(shù)數(shù)量不一樣但功能完全一樣的只列出其中一個):
static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources);
static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch);
static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources);
static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources);
static <T> Observable<T> concatArrayEager(int maxConcurrency, int prefetch, ObservableSource<? extends T>... sources);
static <T> Observable<T> concatArrayEagerDelayError(int maxConcurrency, int prefetch, ObservableSource<? extends T>... sources);
static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch, boolean tillTheEnd);
static <T> Observable<T> concatEager(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int prefetch);
使用concat連接的所有Observable會串行執(zhí)行,當上一個Observable執(zhí)行結(jié)束后,即觸發(fā)了onComplete或onError(如果可能觸發(fā)onError記得設(shè)置異常處理器)后,下一個Observable才開始執(zhí)行(是否立即執(zhí)行取決于是否設(shè)置了delay)。還記得入門時講解響應(yīng)式編程中提到的例子嗎(不記得的話再翻一下),用RxJava實現(xiàn)如下:
Observable.concat(
Observable.create((ObservableEmitter<Runnable> emitter) -> {
emitter.onNext(new TaskA());
emitter.onComplete();
}),
Observable.create((ObservableEmitter<Runnable> emitter) -> {
emitter.onNext(new TaskB());
emitter.onComplete();
}).delay(1, TimeUnit.SECONDS),
Observable.create((ObservableEmitter<Runnable> emitter) -> {
emitter.onNext(new TaskC());
emitter.onComplete();
}).delay(1, TimeUnit.SECONDS)
).subscribe((Runnable runnable) -> runnable.run(), (Throwable e) -> e.printStackTrace());
應(yīng)用場景:
concat的應(yīng)用場景非常廣,實際項目中,任務(wù)(或業(yè)務(wù))之間的線性依賴關(guān)系很普遍。
merge
merge與concat用法類似(方法原型這里就不貼了),差別在于concat是串行的,而merge是并行的。所有Observable并行運行,直到它們?nèi)坑|發(fā)onComplete后,Observer才會觸發(fā)onComplete,示例如下:
Observable.merge(
Observable.create((ObservableEmitter<String> emitter) -> {
emitter.onNext("A");
try {
Thread.sleep(1000);
} catch (Exception e) {
}
emitter.onNext("B");
emitter.onComplete();
}).subscribeOn(Schedulers.newThread()),
Observable.create((ObservableEmitter<String> emitter) -> {
try {
Thread.sleep(500);
} catch (Exception e) {
}
emitter.onNext("5");
emitter.onComplete();
}).subscribeOn(Schedulers.newThread())
).subscribe(
(String s) -> log("onNext:" + s),
(Throwable e) -> e.printStackTrace(),
() -> log("onComplete"));
日志輸出如下:
onNext:A
onNext:5
onNext:B
onComplete
從日志中可以看到2個Observable是并行運行的(需要手動設(shè)置不同的調(diào)度線程),并且Observer只回調(diào)了一次onComplete,且是在2個Observable都觸發(fā)了onComplete之后回調(diào)。
使用場景:
merge及concat應(yīng)該是使用場景最廣泛的兩種操作了。當一個任務(wù)(假設(shè)為C)依賴于其它多個任務(wù)時(假設(shè)為A、B),而A、B之間又沒有相互依賴關(guān)系,為了保證效率,A、B顯然需要并行運行,等到A、B都運行結(jié)束了就運行C。
zip
方法原型如下(方法有點多,參數(shù)數(shù)量不一樣但功能完全一樣的只列出其中一個):
static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(
ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, ObservableSource<? extends T3> source3,
ObservableSource<? extends T4> source4, ObservableSource<? extends T5> source5, ObservableSource<? extends T6> source6,
ObservableSource<? extends T7> source7, ObservableSource<? extends T8> source8, ObservableSource<? extends T9> source9,
Function9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipper);
static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper,
boolean delayError, int bufferSize, ObservableSource<? extends T>... sources);
static <T, R> Observable<R> zipIterable(Iterable<? extends ObservableSource<? extends T>> sources,
Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize);
zip將多個Observable的事件組合在一起,然后再依次發(fā)射。這話不是很容易理解,我們先來看下例子:
Observable.zip(
Observable.create((ObservableEmitter<String> emitter) -> {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
}),
Observable.create((ObservableEmitter<Integer> emitter) -> {
emitter.onNext(1);
emitter.onNext(2);
}),
(String t1, Integer t2) -> new Pair<String, Integer>(t1, t2)
).subscribe((Pair<String, Integer> result) -> log("onNext:" + result.first + result.second));
日志輸出如下:
onNext:A1
onNext:B2
這里我們使用的是3個參數(shù)的zip方法,前面2個參數(shù)傳入Observable實例,以下簡稱O1和O2,第3個參數(shù)傳入BiFunction實例。BiFunction只有一個apply方法,apply方法攔截O1、O2的事件發(fā)射,參數(shù)1表示O1發(fā)射的事件,參數(shù)2表示O2發(fā)射的事件,返回值是兩個事件的打包(打包類型自已定),apply方法的作用就是進行事件打包。打包后的事件會依次發(fā)射給Observer,且打包必須依次一一對應(yīng),如果一方發(fā)射了m個事件,另一個方只發(fā)射了n個事件(n<m),那么最終發(fā)射給Observer的事件將只有n個。
事件加工
上面列舉了創(chuàng)建Observable的幾種方式,事實上遠不止以上幾種,但以上幾種是比較常見,接下來我們來看下常見的事件加工。
filter
filter是事件加工中最簡易懂的操作了,它攔截Observable發(fā)射出來的事件,并將其中不符合要求的事件濾掉,只發(fā)射滿足要求的事件給Observer。看個例子:
Observable.create((ObservableEmitter<Integer> emitter) -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}).filter((Integer i) -> i > 1)
.subscribe((Integer i) -> log("onNext:" + i));
日志輸出如下:
onNext:2
onNext:3
map
map攔截Observable發(fā)射出來的事件,將事件轉(zhuǎn)化為其他數(shù)據(jù)后再發(fā)射給Observer,示例如下:
Observable.create((ObservableEmitter<Integer> emitter) -> {
emitter.onNext(1);
emitter.onNext(2);
}).map((Integer i) -> "this is " + i)
.subscribe((String s) -> log("onNext:" + s));
日志輸出如下:
onNext:this is 1
onNext:this is 2
flatMap
flatMap與map類似,都是進行數(shù)據(jù)的轉(zhuǎn)化,差別在于flatMap將數(shù)據(jù)轉(zhuǎn)化為Observable對象,這些Observable取代原有的Observable作為生產(chǎn)者提供數(shù)據(jù),使用map中的示例修改后代碼如下:
Observable.just(1, 2)
.flatMap((Integer i) -> Observable.just("this is " + i))
.subscribe((String s) -> log("onNext:" + s));
日志輸出如下:
onNext:this is 1
onNext:this is 2
僅通過上面的例子很難了解flatMap的用處,因為map也可以做到,其實flatMap適合處理更為復(fù)雜的數(shù)據(jù),如多重列表。假設(shè)有一群小朋友聚在一起玩耍,每個人都需要拿出自己的玩具,然后我們需要統(tǒng)計下這些玩具,代碼如下:
class Toy {
String name;
Toy(String name) {
this.name = name;
}
}
class Kid {
List<Toy> toys;
}
List<Kid> kids() {
List<Kid> kids = new LinkedList<>();
Kid kid = new Kid();
kid.toys = new LinkedList<>();
kid.toys.add(new Toy("熊大"));
kid.toys.add(new Toy("熊二"));
kids.add(kid);
kid = new Kid();
kid.toys = new LinkedList<>();
kid.toys.add(new Toy("水槍"));
kids.add(kid);
kid = new Kid();
kid.toys = new LinkedList<>();
kid.toys.add(new Toy("足球"));
kid.toys.add(new Toy("扭扭車"));
kids.add(kid);
return kids;
}
void test() {
Observable.fromIterable(kids())
.flatMap((Kid kid) -> Observable.fromIterable(kid.toys))
.subscribe((Toy toy) -> log("onNext:" + toy.name));
}
執(zhí)行test方法輸出日志如下:
onNext:熊大
onNext:熊二
onNext:水槍
onNext:足球
onNext:扭扭車
concatMap
concatMap與flatMap的作用類似,差別在于前者輸出的數(shù)據(jù)順序與原始數(shù)據(jù)保持一致(使用concat),而后者不保證(使用merge),我們來看個例子:
Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5)
.flatMap((Integer i) -> {
if(i == 3) {
return Observable.just(i).delay(1, TimeUnit.SECONDS);
} else {
return Observable.just(i);
}
});
Observable<Integer> o2 = Observable.just(-1, -2, -3, -4, -5)
.concatMap((Integer i) -> {
if(i == -3) {
return Observable.just(i).delay(1, TimeUnit.SECONDS);
} else {
return Observable.just(i);
}
});
Observable.concat(o1, o2).subscribe((Integer i) -> log("onNext:" + i));
日志輸出如下:
onNext:1
onNext:2
onNext:4
onNext:5
onNext:3
onNext:-1
onNext:-2
onNext:-3
onNext:-4
onNext:-5