上一篇:RxJava 源碼學(xué)習(xí)之創(chuàng)建操作符
本篇將通過源碼來學(xué)習(xí)下RxJava 的變換操作符,期待與大家一起探討學(xué)習(xí)。
Buffer
-
作用分析
Buffer 可義簡(jiǎn)單地理解為緩存,它定期從Observable收集數(shù)據(jù)到一個(gè)集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個(gè)。
-
代碼示例(一)
buffer(count)
以列表(List)的形式發(fā)射非重疊的緩存,每一個(gè)緩存至多包含來自原始Observable的count項(xiàng)數(shù)據(jù)(最后發(fā)射的列表數(shù)據(jù)可能少于count項(xiàng))

//測(cè)是代碼:
Integer[] nums = {1,2,3,4,5};
Observable.from(nums).buffer(2)
.subscribe(new Subscriber<List<Integer>>() {
@Override
public void onCompleted() { System.out.println("Sequence complete."); }
@Override
public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
@Override
public void onNext(List<Integer> integers) {
System.out.print("integers = ");
for (Integer i: integers) {
System.out.print(i +" ");
}
System.out.println();
}
});
###################################################
輸出結(jié)果:
integers = 1 2
integers = 3 4
integers = 5
Sequence complete.
-
代碼示例(二)
buffer(count,?skip)
從原始Observable的第一項(xiàng)數(shù)據(jù)開始創(chuàng)建新的緩存,此后每當(dāng)收到skip
項(xiàng)數(shù)據(jù),用count項(xiàng)數(shù)據(jù)填充緩存:開頭的一項(xiàng)和后續(xù)的count-1
項(xiàng),它以列表(List)的形式發(fā)射緩存,取決于count和skip的值,這些緩存可能會(huì)有重疊部分(比如skip < count時(shí)),也可能會(huì)有間隙(比如skip > count時(shí))。

Integer[] nums = {1,2,3,4,5,6,7,8,9};
Observable.from(nums).buffer(2,3)
.subscribe(new Subscriber<List<Integer>>() {
@Override
public void onCompleted() { System.out.println("Sequence complete."); }
@Override
public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
@Override
public void onNext(List<Integer> integers) {
System.out.print("integers = ");
for (Integer i: integers) {
System.out.print(i +" ");
}
System.out.println();
}
});
###################################################
輸出結(jié)果:
integers = 1 2
integers = 4 5
integers = 7 8
Sequence complete.
Map
-
作用分析
Map操作符對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)你選擇的函數(shù),然后返回一個(gè)發(fā)射這些結(jié)果的Observable。

-
代碼示例
//將Integer 轉(zhuǎn)成 String 輸出
Integer[] nums = {1,2,3,4};
Observable.from(nums).map(new Func1<Integer, String>() {
@Override
public String call(Integer i) { return "this is i="+i; }})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() { System.out.println("Sequence complete."); }
@Override
public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); }
@Override
public void onNext(String s) { System.out.println("onNext = " + s); }});
###################################
輸出結(jié)果:
onNext = this is i=1
onNext = this is i=2
onNext = this is i=3
onNext = this is i=4
Sequence complete.
FlatMap
-
作用分析
FlatMap操作符使用一個(gè)指定的函數(shù)對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)執(zhí)行變換操作,這個(gè)函數(shù)返回一個(gè)本身也發(fā)射數(shù)據(jù)的Observable,然后FlatMap合并這些Observables發(fā)射的數(shù)據(jù),最后將合并后的結(jié)果當(dāng)做它自己的數(shù)據(jù)序列發(fā)射。
注意:FlatMap對(duì)這些Observables發(fā)射的數(shù)據(jù)做的是合并(merge)操作,因此它們可能是交錯(cuò)的。如果想按順序輸出,可以用 concatMap
。
比較:RxJava Observable transformation: concatMap() vs. flatMap()
注意:如果任何一個(gè)通過這個(gè)flatMap操作產(chǎn)生的單獨(dú)的Observable調(diào)用onError異常終止了,這個(gè)Observable自身會(huì)立即調(diào)用onError并終止。

-
代碼示例
//測(cè)是代碼
Integer[] nums = {1,2,3,4};
Observable.from(nums).flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(Integer integer) {
return Observable.just("this is i="+integer.intValue());
}}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() { System.out.println("Sequence complete."); }
@Override
public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); }
@Override
public void onNext(String s) { System.out.println("onNext = " + s); }});
########################################
輸出結(jié)果:
onNext = this is i=1
onNext = this is i=2
onNext = this is i=3
onNext = this is i=4
Sequence complete.
Window
-
作用分析
Window和Buffer類似,但不是發(fā)射來自原始Observable的數(shù)據(jù)包,它發(fā)射的是Observables,這些Observables中的每一個(gè)都發(fā)射原始Observable數(shù)據(jù)的一個(gè)子集,最后發(fā)射一個(gè)onCompleted通知。

Scan
-
作用分析
Scan操作符對(duì)原始Observable發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后將那個(gè)函數(shù)的結(jié)果作為自己的第一項(xiàng)數(shù)據(jù)發(fā)射。它將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個(gè)函數(shù)來產(chǎn)生它自己的第二項(xiàng)數(shù)據(jù)。它持續(xù)進(jìn)行這個(gè)過程來產(chǎn)生剩余的數(shù)據(jù)序列。

-
代碼示例
//測(cè)是代碼
/**
* 累加操作
**/
Observable.just(1, 2, 3, 4, 5)
.scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer item) {
return sum + item;
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) { System.out.println("Next: " + item); }
@Override
public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
@Override
public void onCompleted() { System.out.println("Sequence complete."); }});
################################################
輸出結(jié)果:
Next: 1
Next: 3
Next: 6
Next: 10
Next: 15
Sequence complete.
參考文檔:
結(jié)束語
RxJava之變換操作符到此就學(xué)完啦,下一篇就開始學(xué)習(xí)RxJava之過濾操作符吧 :P