一、RxJava操作符概述
- RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡(jiǎn)潔易懂。
操作符實(shí)質(zhì)上就是RxJava函數(shù)式編程模式的體現(xiàn),在上篇文章中,我詳細(xì)闡述了RxJava中觀察者模式、Iterator模式、函數(shù)式編程模式概念的解讀,詳情請(qǐng)戳→文章傳送門 。
Rxjava操作符的類型眾多,在本文中,我詳細(xì)解釋如圖1.1所示的9種操作符。


二、RxJava操作符詳解
1、創(chuàng)建操作符
創(chuàng)建操作符的分類如下圖所示,關(guān)于create操作符的詳細(xì)操作可在我的上篇文章中查看,在本文中不加以贅述,文章鏈接戳→文章傳送門 。在本文中從from操作符開始介紹。

- 注:在這里,我將請(qǐng)求的不完整回調(diào)在父類中進(jìn)行了封裝,具體代碼可查看GitHub的代碼鏈接。
①from操作符
在這里以發(fā)送數(shù)組為例,from操作符的使用代碼如下所示:
//from操作符,創(chuàng)建以數(shù)組內(nèi)容發(fā)送事件的Observable
String[] observableArr = new String[]{"Alex", "Payne"};
//onNextAction、onErrorAction提取到父類中,具體代碼可查看GitHub的代碼鏈接
Observable.from(observableArr).subscribe(onNextAction, onErrorAction);
首先,我們查看如下所示from操作符的結(jié)構(gòu)圖,可以看到它有多種實(shí)現(xiàn)方式,但是有一個(gè)共同點(diǎn),都會(huì)返回一個(gè)Observable對(duì)象。

進(jìn)一步查看源碼,可得知from操作符的作用:將一個(gè)Iterable、一個(gè)Future、 或者一個(gè)數(shù)組,內(nèi)部通過代理的方式轉(zhuǎn)換成一個(gè)Observable。
Future轉(zhuǎn)換為OnSubscribe是通過OnSubscribeToObservableFuture進(jìn)行的,Iterable轉(zhuǎn)換通過OnSubscribeFromIterable進(jìn)行。數(shù)組通過OnSubscribeFromArray轉(zhuǎn)換。
②just操作符
使用代碼如下所示:
//just操作符,創(chuàng)建將逐個(gè)內(nèi)容進(jìn)行發(fā)送的Observable,其內(nèi)部發(fā)送內(nèi)容在內(nèi)部以from的操作符的方式進(jìn)行轉(zhuǎn)換
Observable.just("Alex", "Payne").subscribe(onNextAction);

查看just操作符的結(jié)構(gòu)圖,結(jié)合源碼得知,just操作符將單個(gè)參數(shù)發(fā)送的內(nèi)容通過ScalarSynchronousObservable轉(zhuǎn)換為一個(gè)新的Observable對(duì)象,而將多個(gè)參數(shù)發(fā)送的內(nèi)容轉(zhuǎn)換為一個(gè)數(shù)組,然后將數(shù)組通過from操作符進(jìn)行發(fā)送。
③interval操作符
interval操作符使用代碼如下所示:
//interval操作符,創(chuàng)建以1秒為事件間隔發(fā)送整數(shù)序列的Observable
Observable.interval(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).subscribe(onNextAction);

查看interval的結(jié)構(gòu)圖,其只能發(fā)送Long類型的數(shù),實(shí)質(zhì)上其作用為:創(chuàng)建一個(gè)按固定時(shí)間間隔發(fā)射整數(shù)序列的Observable,這個(gè)序列為一個(gè)無限遞增的整數(shù)序列。
需要注意的是:interval默認(rèn)在computation調(diào)度器上執(zhí)行。你也可以傳遞一個(gè)可選的Scheduler參數(shù)來指定調(diào)度器。
④range操作符
range操作符使用的代碼如下所示:
//range操作符,創(chuàng)建以發(fā)送范圍內(nèi)的整數(shù)序列的Observable
Observable.range(0, 3).subscribe(onNextAction);
range操作符發(fā)射一個(gè)范圍內(nèi)的有序整數(shù)序列,并且我們可以指定范圍的起始和長(zhǎng)度
⑤repeat操作符
repeat操作符使用的代碼如下所示:
//repeat操作符,創(chuàng)建一個(gè)以N次重復(fù)發(fā)送數(shù)據(jù)的Observable
Observable.range(0, 3).repeat(2).subscribe(onNextAction);
在這里需要強(qiáng)調(diào)一下,它不是創(chuàng)建一個(gè)Observable,而是重復(fù)發(fā)射原始Observable的數(shù)據(jù)序列,這個(gè)序列或者是無限的,或者通過repeat(n)指定重復(fù)次數(shù)。
2、變換操作符
在這里我介紹如下圖所示7種變換操作符,變換操作符的作用是將源Observable發(fā)送的數(shù)據(jù)進(jìn)行變換。
①map操作符
map操作符使用的代碼如下所示:
//map操作符,通過指定一個(gè)Func,將Observable轉(zhuǎn)換為另一個(gè)Observable對(duì)象并發(fā)送
Observable.just("Alex_Payne")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return "My Name is" + s;
}
}).subscribe(onNextAction);
map操作符將源Observable發(fā)送的數(shù)據(jù)轉(zhuǎn)換為一個(gè)新的Observable對(duì)象。
在這里,F(xiàn)unc1和Action的區(qū)別在于,F(xiàn)unc1包裝的是有返回值的方法。另外,和ActionX 一樣,F(xiàn)uncX 也有多個(gè),用于不同參數(shù)個(gè)數(shù)的方法。
FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。
②flatMap操作符
flatMap操作符使用的代碼如下所示:
//flatMap操作符,將Observable發(fā)送的數(shù)據(jù)集合轉(zhuǎn)換為Observable集合
//flatMap的合并運(yùn)行允許交叉,允許交錯(cuò)的發(fā)送事件
String[] observableArr = {"Alex", "Max", "Bruce", "Frank", "Tom"};
Observable.from(observableArr).flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable.just("My Name is:" + s);
}
}).subscribe(onNextAction);
源Observable通過flatMap操作符轉(zhuǎn)換為包含源Observable發(fā)送的所有子條目的Observable集合,可見下圖的示意圖,然后從Observable集合中逐個(gè)取出轉(zhuǎn)化為單個(gè)Observable對(duì)象進(jìn)行發(fā)送。不同于map操作符的一點(diǎn)就是一對(duì)多的轉(zhuǎn)化。

注意:FlatMap對(duì)這些Observables發(fā)射的數(shù)據(jù)做的是合并(merge)操作,因此它們可能是交錯(cuò)的。
③concatMap操作符
concatMap操作符使用的代碼如下所示:
//concatMap操作符,將Observable發(fā)送的數(shù)據(jù)集合轉(zhuǎn)換為Observable集合
//解決了flatMap的交叉問題,將發(fā)送的數(shù)據(jù)連接發(fā)送
String[] observableArr = {"Alex", "Max", "Bruce", "Frank", "Tom"};
Observable.from(observableArr).concatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable.just("My Name is:" + s);
}
}).subscribe(onNextAction);
concatMap操作符類似于flatMap操作符,不同的一點(diǎn)是它按次序連接。
④cast操作符
cast操作符使用的代碼如下所示:
//cast操作符,將類對(duì)象進(jìn)行轉(zhuǎn)換
Object[] objectsArr = {"1", "2", "3"};
Observable.from(objectsArr).cast(String.class).subscribe(onNextAction);
cast操作符將源Observable發(fā)送的數(shù)據(jù)都強(qiáng)制轉(zhuǎn)換為一個(gè)指定的類型,然后再發(fā)射數(shù)據(jù)。
需強(qiáng)調(diào)的一點(diǎn)是只能由父類對(duì)象轉(zhuǎn)換為子類對(duì)象,否則會(huì)報(bào)錯(cuò)。
⑤flatMapIterable操作符
flatMapIterable操作符使用的代碼如下所示:
//將數(shù)據(jù)集合轉(zhuǎn)換為Iterable,在Iterable中對(duì)數(shù)據(jù)進(jìn)行處理
Observable.just(1, 2, 3).flatMapIterable(new Func1<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> call(Integer number) {
ArrayList<Integer> mList = new ArrayList<>();
mList.add(1000 + number);
return mList;
}
}).subscribe(onNextAction);
flatMapIterable相當(dāng)于是flatMap的變體,直接在內(nèi)部以Iterable接口將集合數(shù)據(jù)進(jìn)行接收,示意圖如下所示:

⑥buffer操作符
buffer操作符使用的代碼如下所示:
//buffer操作符,將原有Observable轉(zhuǎn)換為一個(gè)新的Observable,這個(gè)新的Observable每次發(fā)送一組值,而不是一個(gè)個(gè)進(jìn)行發(fā)送
Observable.just(1, 2, 3, 4, 5, 6)
.buffer(3).subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> mList) {
for (Integer i : mList) {
Toast.makeText(getActivity(), "new Number i is:" + i, Toast.LENGTH_SHORT).show();
}
Toast.makeText(getActivity(), "Another request is called", Toast.LENGTH_SHORT).show();
}
});
buffer操作符將原有Observable轉(zhuǎn)換為一個(gè)新的Observable,這個(gè)新的Observable每次發(fā)送一組值,而不是一個(gè)個(gè)進(jìn)行發(fā)送,我們可以定義這個(gè)新的Observable存放幾個(gè)原有的Observable對(duì)象。

⑦groupBy操作符
groupBy操作符使用的代碼如下所示:
//groupBy操作符,可以做分組操作
Observable.range(0, 10).groupBy(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer num) {
return num % 3;
}
}).subscribe(new Action1<GroupedObservable<Integer, Integer>>() {
@Override
public void call(final GroupedObservable<Integer, Integer> groupedObservable) {
groupedObservable.subscribe(new Action1<Integer>() {
@Override
public void call(Integer num) {
Toast.makeText(getActivity(), "當(dāng)前的組別是:" + groupedObservable.getKey() + "組別內(nèi)的數(shù)字是:" + num, Toast.LENGTH_SHORT).show();
}
});
}
});
groupBy操作符,將原有的Observable對(duì)象轉(zhuǎn)換為發(fā)送一組Observable集合的GroupedObservable對(duì)象,可以做分組操作,GroupedObservable將分組完畢的Observable對(duì)象可以繼續(xù)發(fā)送。
注意:groupBy將原始Observable分解為一個(gè)發(fā)射多個(gè)GroupedObservable的Observable,一旦有訂閱,每個(gè)GroupedObservable就開始緩存數(shù)據(jù)。因此,如果你忽略這些GroupedObservable中的任何一個(gè),這個(gè)緩存可能形成一個(gè)潛在的 內(nèi)存泄露 。因此,如果你不想觀察,也不要忽略GroupedObservable。你應(yīng)該使用像take(0)這樣會(huì)丟棄自己的緩存的操作符。
3、過濾操作符
過濾操作符用于從Observable發(fā)射的數(shù)據(jù)中進(jìn)行選擇,在這里介紹如下圖所示的8種。
①filter操作符
filter操作符使用的代碼如下所示:
//filter過濾操作符,對(duì)Observable發(fā)送的內(nèi)容根據(jù)自定義的規(guī)則進(jìn)行過濾
Observable.range(0, 5).filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer num) {
return num > 2;//自定義的條件,只有符合條件的結(jié)果才會(huì)提交給觀察者
}
}).subscribe(onNextAction);
filter默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。
②elementAt操作符
elementAt操作符使用的代碼如下所示:
//elementAt操作符,用于返回指定位置后一位的數(shù)據(jù),即腳標(biāo)+1的數(shù)據(jù)
//在這里發(fā)送0、1、2、3、4,腳標(biāo)為3的數(shù)據(jù)為2,發(fā)送其后一位數(shù)據(jù)3
Observable.range(0, 5).elementAt(3).subscribe(onNextAction);
elementAt操作符獲取原始Observable發(fā)射的數(shù)據(jù)序列指定索引位置的數(shù)據(jù)項(xiàng),然后當(dāng)做自己的唯一數(shù)據(jù)發(fā)射。對(duì)應(yīng)示意圖如下:
③distinct操作符
distinct操作符使用的代碼如下所示:
//distinct操作符,用于Observable發(fā)送的元素的去重
Observable.just(1, 1, 2, 2, 2, 3).distinct().subscribe(onNextAction);
在這里需要強(qiáng)調(diào)一點(diǎn):distinct操作符只允許還沒有發(fā)射過的數(shù)據(jù)項(xiàng)通過。
④skip操作符
skip操作符使用的代碼如下所示:
//skip操作符,用于Observable發(fā)送的元素前N項(xiàng)去除掉
Observable.range(0, 5).skip(2).subscribe(onNextAction);
skip操作符抑制Observable發(fā)射的前N項(xiàng)數(shù)據(jù),只發(fā)送后N項(xiàng)數(shù)據(jù)
⑤take操作符
//take操作符,用于Observable發(fā)送的元素只取前N項(xiàng)
Observable.range(0, 5).take(2).subscribe(onNextAction);

⑥ignoreElements操作符
//ignoreElements操作符,忽略掉源Observable發(fā)送的結(jié)果,只把Observable的onCompleted或onError發(fā)送
Observable.range(0, 5).ignoreElements().subscribe(onNextAction, onErrorAction, onCompletedAction);
IgnoreElements操作符抑制原始Observable發(fā)射的所有數(shù)據(jù),只允許它的終止通知(onError或onCompleted)進(jìn)行發(fā)送。
⑦throttleFirst操作符
//throttleFirst操作符,會(huì)定期發(fā)送這個(gè)時(shí)間段里源Observable發(fā)送的第一個(gè)數(shù)據(jù)
//throttleFirst操作符默認(rèn)在computaioin調(diào)度器上執(zhí)行,其他的數(shù)據(jù)都會(huì)被過濾掉
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
//線程休眠100毫秒
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
})
.throttleFirst(200, TimeUnit.MILLISECONDS)
.subscribe(onNextAction);
throttleFirst操作符會(huì)按照固定的時(shí)間間隔將信息進(jìn)行發(fā)送。在這里我設(shè)置的事件間隔為200毫秒,其中每發(fā)送一個(gè)數(shù)據(jù)線程休眠100毫秒,所以最后會(huì)顯示的數(shù)據(jù)為0,示意圖如下:
注:throttleFirst操作符默認(rèn)在computation調(diào)度器上執(zhí)行,但是你可以使用第三個(gè)參數(shù)指定其它的調(diào)度器。
⑧throttleWithTimeOut操作符
//throttleWithTimeout操作符
//源發(fā)射數(shù)據(jù)時(shí),如果兩次數(shù)據(jù)的發(fā)射間隔小于指定時(shí)間,就會(huì)丟棄前一次的數(shù)據(jù),直到指定時(shí)間內(nèi)都沒有新數(shù)據(jù)發(fā)射時(shí)才進(jìn)行發(fā)射
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(2);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(3);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
subscriber.onNext(4);
subscriber.onNext(5);
subscriber.onCompleted();
}
})
.throttleWithTimeout(800, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNextAction);
在這里,我設(shè)置的時(shí)間間隔指定為800毫秒,所以最后顯示的數(shù)據(jù)是有3、4、5。
4、組合操作符
組合操作符用于將多個(gè)Observable組合成一個(gè)單一的Observable,在這里我介紹如下圖所示5種操作符:

①startWith組合操作符
startWith組合操作符使用的代碼如下所示:
//startWith操作符,會(huì)在發(fā)送的數(shù)據(jù)之前插入數(shù)據(jù)
Observable.range(3, 5).startWith(0, 10086).subscribe(onNextAction);
很簡(jiǎn)單,會(huì)在發(fā)送的數(shù)據(jù)序列前插入數(shù)據(jù)序列,并且會(huì)發(fā)送插入的數(shù)據(jù)序列。
②merge組合操作符
merge組合操作符使用的代碼如下所示:
//merge操作符,會(huì)將多個(gè)Observable對(duì)象合并到一個(gè)Observable對(duì)象中進(jìn)行發(fā)送
Observable<Integer> firstObservable = Observable.just(0, 1, 2).subscribeOn(Schedulers.io());
Observable<Integer> secondObservable = Observable.just(3, 4, 5);
Observable.merge(firstObservable, secondObservable).subscribe(onNextAction, onErrorAction);
如下圖所示,merge操作符會(huì)將多個(gè)Observable對(duì)象進(jìn)行合并。

在這里我將firstObservable指定在IO線程中進(jìn)行發(fā)送,secondObservable沒有指定線程,兩者合并然后發(fā)送數(shù)據(jù)時(shí)便會(huì)產(chǎn)生數(shù)據(jù)交錯(cuò)的現(xiàn)象。
③concat組合操作符
concat組合操作符使用的代碼如下所示:
//concat操作符,會(huì)將多個(gè)Observable對(duì)象合并到一個(gè)Observable對(duì)象中進(jìn)行發(fā)送,嚴(yán)格按照順序進(jìn)行發(fā)送
Observable<Integer> firstObservable = Observable.just(0, 1, 2).subscribeOn(Schedulers.io());
Observable<Integer> secondObservable = Observable.just(3, 4, 5);
Observable.concat(firstObservable, secondObservable)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNextAction);
concat操作符不同于merge操作符的區(qū)別就是:會(huì)將多個(gè)Observable對(duì)象合并到一個(gè)Observable對(duì)象中進(jìn)行發(fā)送,嚴(yán)格按照順序進(jìn)行發(fā)送。如下圖所示,直到第一個(gè)Observable發(fā)送完畢數(shù)據(jù)后,第二個(gè)Observable才會(huì)進(jìn)行數(shù)據(jù)的發(fā)送。

④zip組合操作符
zip組合操作符使用的代碼如下所示:
//zip操作符,會(huì)將多個(gè)Observable對(duì)象轉(zhuǎn)換成一個(gè)Observable對(duì)象然后進(jìn)行發(fā)送,轉(zhuǎn)換關(guān)系可根據(jù)需求自定義
Observable<Integer> integerObservable = Observable.range(0, 4);
Observable<String> stringObservable = Observable.just("a", "b", "c", "d");
Observable.zip(integerObservable, stringObservable, new Func2<Integer, String, String>() {
@Override
public String call(Integer num, String info) {
//在這里的轉(zhuǎn)換關(guān)系為將數(shù)字與字串內(nèi)容進(jìn)行拼接
return "數(shù)字為:" + num + "……字符為:" + info;
}
}).subscribe(onNextAction);

zip操作符返回一個(gè)Obversable,它使用這個(gè)函數(shù)按順序結(jié)合兩個(gè)或多個(gè)Observables發(fā)射的數(shù)據(jù)項(xiàng),然后它發(fā)射這個(gè)函數(shù)返回的結(jié)果。
它按照嚴(yán)格的順序進(jìn)行數(shù)據(jù)發(fā)送。它只發(fā)射與發(fā)射數(shù)據(jù)項(xiàng)最少的那個(gè)Observable一樣多的數(shù)據(jù)。
⑤combineLastest組合操作符
combineLastest組合操作符使用的代碼如下所示:
//combineLastest操作符,會(huì)將多個(gè)Observable對(duì)象轉(zhuǎn)換一個(gè)Observable對(duì)象然后進(jìn)行發(fā)送,轉(zhuǎn)換關(guān)系可以根據(jù)需求自定義
//不同于zip操作符的是,會(huì)將最新發(fā)送的數(shù)據(jù)組合到一起
integerObservable = Observable.just(1, 2, 3);
stringObservable = Observable.just("a", "b", "c");
Observable.combineLatest(integerObservable, stringObservable, new Func2<Integer, String, String>() {
@Override
public String call(Integer num, String info) {
//在這里的轉(zhuǎn)換關(guān)系為將數(shù)字與字串內(nèi)容進(jìn)行拼接
return "數(shù)字為:" + num + "……字符為:" + info;
}
}).subscribe(onNextAction);
當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了數(shù)據(jù)時(shí),使用一個(gè)函數(shù)結(jié)合每個(gè)Observable發(fā)射的最近數(shù)據(jù)項(xiàng),并且基于這個(gè)函數(shù)的結(jié)果發(fā)射數(shù)據(jù)。可在本案例代碼中進(jìn)行驗(yàn)證。
5、輔助操作符
輔助操作符就是處理Observable的幫助動(dòng)作,在這里介紹如下5種輔助操作符。
①delay操作符
delay操作符使用的代碼如下所示:
//delay操作符可以讓源Observable對(duì)象發(fā)送數(shù)據(jù)之前暫停一段制定的時(shí)間
Observable.just(1, 2, 3)
.delay(2, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNextAction);
在這里我將延時(shí)時(shí)間設(shè)置為2秒,延遲指定的時(shí)間后發(fā)射源Observable中的數(shù)據(jù)。
②do操作符
do操作符使用的代碼如下所示:
//doOnNext是do操作符中的一種
Observable.range(0, 3).doOnNext(onNextAction).subscribe(onNextAction);
do操作符,其下細(xì)分有很多內(nèi)容,以doOnNext為例,其作用就是為源Observable對(duì)象發(fā)送數(shù)據(jù)后,當(dāng)Subscriber接收到數(shù)據(jù)時(shí),即當(dāng)Subscriber的onNext方法被調(diào)用時(shí),提供回調(diào)相應(yīng)數(shù)據(jù)。
③subscribeOn輔助操作符
④observeOn輔助操作符
subscribeOn、observeOn操作符使用的代碼如下所示:
Observable.just("當(dāng)前的線程ID為" + Thread.currentThread().getName())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNextAction);
- subscribeOn操作符,指定subscribe()所發(fā)生的線程,即Observable.OnSubscribe被激活時(shí)所處的線程?;蛘呓凶鍪录a(chǎn)生的線程。
- observeOn操作符,指定Subscriber所運(yùn)行的線程?;蛘呓凶鍪录M(fèi)的線程。
上篇文章中我提到Schedulers可以使得RxJava實(shí)現(xiàn)線程切換,實(shí)質(zhì)上就是借助于lift變換方法進(jìn)行轉(zhuǎn)換,subscribeOn發(fā)生在下圖的通知過程,observeOn發(fā)生在下圖中的發(fā)送過程。

⑤timeout輔助操作符
timeout操作符使用的代碼如下所示:
//timeout操作符,如果源Observable對(duì)象過了一段時(shí)間沒有發(fā)送數(shù)據(jù),timeout會(huì)以onError通知終止這個(gè)Observable
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(i * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(i);
}
}
}).timeout(200, TimeUnit.MILLISECONDS, Observable.just(100, 200))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNextAction);
需要強(qiáng)調(diào)的一點(diǎn)是,在這里timeout(long timeout, TimeUnit timeUnit, Observable<? extends T> other)是timeout其中的一種,它在超時(shí)的時(shí)候會(huì)將源Observable轉(zhuǎn)換為備用的Observable對(duì)象進(jìn)行發(fā)送。
6、錯(cuò)誤操作符

①catch操作符
實(shí)質(zhì)上在這里catch操作符細(xì)分有三種實(shí)現(xiàn)方案:onErrorReturn、onErrorResumeNext、onExceptionResumeNext。
- 首先分析onErrorReturn的代碼:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
if (i > 3) {
subscriber.onError(new Throwable("User Alex Defined Error"));
}
subscriber.onNext(i);
}
}
}).onErrorReturn(new Func1<Throwable, Integer>() {
@Override
public Integer call(Throwable throwable) {
return 404;
}
}).subscribe(onNextAction, onErrorAction, onCompletedAction);
onErrorReturn操作符,會(huì)在遇到錯(cuò)誤時(shí),停止源Observable的,并調(diào)用用戶自定義的返回請(qǐng)求,實(shí)質(zhì)上就是調(diào)用一次OnNext方法進(jìn)行內(nèi)容發(fā)送后,停止消息發(fā)送。
- 然后分析onErrorResumeNext的代碼:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
if (i > 3) {
subscriber.onError(new Throwable("User Alex Defined Error"));
}
subscriber.onNext(i);
}
}
}).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
@Override
public Observable<? extends Integer> call(Throwable throwable) {
return Observable.just(100,101,102);
}
}).subscribe(onNextAction,onErrorAction,onCompletedAction);
onErrorResumeNext操作符,會(huì)在源Observable遇到錯(cuò)誤時(shí),立即停止源Observable的數(shù)據(jù)發(fā)送,并取用新的Observable對(duì)象進(jìn)行新的數(shù)據(jù)發(fā)送。
- 最后,分析onExceptionResumeNext的代碼:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
if (i > 3) {
subscriber.onError(new Throwable("User Alex Defined Error"));
}
subscriber.onNext(i);
}
}
}).onExceptionResumeNext(Observable.just(100,101,102)).subscribe(onNextAction,onErrorAction,onCompletedAction);
onExceptionResumeNext,會(huì)將錯(cuò)誤發(fā)給Observer,而不會(huì)調(diào)用備用的Observable
②retry操作符
retry操作符實(shí)現(xiàn)的代碼如下所示:
//retry操作符,當(dāng)遇到exception時(shí)會(huì)進(jìn)行重試,重試次數(shù)可以由用戶進(jìn)行定義
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
if (i > 1) {
subscriber.onError(new Throwable("User Alex Defined Error"));
}
subscriber.onNext(i);
}
}
}).retry(2).subscribe(onNextAction,onErrorAction,onCompletedAction);
retry操作符不會(huì)將原始Observable的onError通知傳遞給觀察者,它會(huì)重新訂閱這個(gè)Observable。

7、布爾操作符
布爾操作符根據(jù)給定規(guī)則進(jìn)行判斷,是否符合規(guī)則然后返回布爾值。布爾操作符意義簡(jiǎn)單操作簡(jiǎn)便在這里介紹如下5種:

①all操作符
all操作符實(shí)現(xiàn)的代碼如下所示:
Observable.just(1, 2, 3, 4).all(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer num) {
return num > 3;
}
}).subscribe(onNextAction, onErrorAction, onCompletedAction);
all操作符,對(duì)源Observable發(fā)送的每一個(gè)數(shù)據(jù)根據(jù)給定的條件進(jìn)行判斷。如果全部符合條件,返回true,否則返回false。
②contains操作符
contains操作符實(shí)現(xiàn)的代碼如下所示:
Observable.just(1, 2, 3, 4).contains(2).subscribe(onNextAction, onErrorAction, onCompletedAction);
contains操作符,對(duì)源Observable發(fā)送的數(shù)據(jù)是否包含定義的選項(xiàng)進(jìn)行判斷。如果包含返回true,否則返回false。
③isEmpty操作符
isEmpty操作符實(shí)現(xiàn)的代碼如下所示:
Observable.just(1, 2, 3, 4).isEmpty().subscribe(onNextAction, onErrorAction, onCompletedAction);
isEmpty操作符,對(duì)源Observable發(fā)送的數(shù)據(jù)是否為空進(jìn)行判斷。如果源Observable發(fā)送的數(shù)據(jù)為空返回true,否則返回false。
④exists操作符
exists操作符實(shí)現(xiàn)的代碼如下所示:
Observable.just(1, 2, 3, 4).exists(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer num) {
return num > 3;
}
}).subscribe(onNextAction, onErrorAction, onCompletedAction);
exists操作符,對(duì)源Observable發(fā)送的單獨(dú)一個(gè)數(shù)據(jù)根據(jù)給定的條件進(jìn)行判斷。如果有一個(gè)數(shù)據(jù)符合條件,返回true,否則返回false。
⑤sequenceEqual操作符
sequenceEqual操作符實(shí)現(xiàn)的代碼如下所示:
Observable.sequenceEqual(Observable.just(1, 2, 3, 4), Observable.just(1))
.subscribe(onNextAction, onErrorAction, onCompletedAction);
sequenceEqual操作符,對(duì)兩個(gè)Observable進(jìn)行判斷,兩個(gè)Observable相同時(shí)返回true,否則返回false。這里包含兩個(gè)Observable的數(shù)據(jù),發(fā)射順序,終止?fàn)顟B(tài)是否相同。
8、條件操作符

①amb操作符
amb操作符實(shí)現(xiàn)的代碼如下所示:
//給定多個(gè)Observable,只讓第一個(gè)發(fā)送數(shù)據(jù)的Observable發(fā)送數(shù)據(jù)
Observable
.amb(Observable.range(0,3).delay(2000, TimeUnit.MILLISECONDS),Observable.range(100,3))
.subscribe(onNextAction);
如下圖所示,首先發(fā)送通知給Amb的那個(gè),不管發(fā)射的是一項(xiàng)數(shù)據(jù)還是一個(gè)onError或onCompleted通知。Amb將忽略和丟棄其它所有Observables的發(fā)射物。
②defaultIfEmpty操作符
amb操作符實(shí)現(xiàn)的代碼如下所示:
//如果源Observable沒有發(fā)送數(shù)據(jù),則發(fā)送一個(gè)默認(rèn)數(shù)據(jù)
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onCompleted();
}
}).defaultIfEmpty(404).subscribe(onNextAction,onErrorAction,onCompletedAction);
9、轉(zhuǎn)換操作符
轉(zhuǎn)換操作符可以將Observable轉(zhuǎn)換為其它的對(duì)象或數(shù)據(jù)結(jié)構(gòu)。在這里介紹如下所示三種轉(zhuǎn)換操作符:
①toList操作符
toList操作符實(shí)現(xiàn)的代碼如下所示:
//toList操作符,將源Observable發(fā)送的數(shù)據(jù)組合為一個(gè)List集合
//然后再次在onNext方法中將轉(zhuǎn)換完的List集合進(jìn)行傳遞
Observable.just(1, 2, 3).toList().subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> numList) {
for (Integer i : numList) {
Toast.makeText(getActivity(), "i:" + i, Toast.LENGTH_SHORT).show();
}
}
});
通常,發(fā)射多項(xiàng)數(shù)據(jù)的Observable會(huì)為每一項(xiàng)數(shù)據(jù)調(diào)用onNext方法。你可以用toList操作符改變這個(gè)行為,讓Observable將多項(xiàng)數(shù)據(jù)組合成一個(gè)List,然后調(diào)用一次onNext方法傳遞整個(gè)列表。
②toSortedList操作符
toSortedList操作符實(shí)現(xiàn)的代碼如下所示:
//toSortedList操作符,會(huì)將源Observable發(fā)送的數(shù)據(jù)組合為一個(gè)List集合,并會(huì)按照升序的方式進(jìn)行排序
//然后再次在onNext方法中將轉(zhuǎn)換完的List集合進(jìn)行傳遞
Observable.just(40, 10, 80, 30).toSortedList().subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> numList) {
for (Integer i : numList) {
Toast.makeText(getActivity(), "i:" + i, Toast.LENGTH_SHORT).show();
}
}
});
不同于toList操作符的是,它會(huì)對(duì)產(chǎn)生的列表排序,默認(rèn)是自然升序。
③toMap操作符
toMap操作符實(shí)現(xiàn)的代碼如下所示:
//toMap操作符,將源Observable發(fā)送的數(shù)據(jù)作為Map集合中的值,需要值進(jìn)行鍵的定義
//將轉(zhuǎn)換完畢的Map集合在onNext方法中進(jìn)行發(fā)送
Observable.just("Alex","Payne").toMap(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return s.equals("Alex")?0:1;
}
}).subscribe(new Action1<Map<Integer, String>>() {
@Override
public void call(Map<Integer, String> convertMap) {
for (int i = 0; i < convertMap.size(); i++) {
Toast.makeText(getActivity(), convertMap.get(i), Toast.LENGTH_SHORT).show();
}
}
});
源Observable發(fā)送的數(shù)據(jù)作為鍵值對(duì)中的值,我們可以提供一個(gè)用于生成Map的Key的函數(shù),然后不同的鍵存儲(chǔ)源Observable發(fā)送的不同的值。
toMap默認(rèn)不在任何特定的調(diào)度器上執(zhí)行。

三、總結(jié)
1、在本文中,我結(jié)合項(xiàng)目代碼詳細(xì)介紹了部分RxJava的操作符,局部參照:RxJava中文翻譯文檔。
2、本文中的案例代碼已上傳Github,歡迎大家star、fork。詳情戳→GitHub案例代碼。
3、操作符實(shí)質(zhì)上就是RxJava函數(shù)式編程模式的體現(xiàn),Lambda表達(dá)式并且可以進(jìn)一步優(yōu)化RxJava。
4、在下篇文章中我會(huì)對(duì)于RxJava進(jìn)行深層次的剖析,還有RxJava結(jié)合例如Retrofit、RxBus等開源框架的內(nèi)容,希望本文對(duì)你在學(xué)習(xí)RxJava的路上有所啟發(fā)。