上一篇:RxJava 源碼學(xué)習(xí)之過濾操作符
今天我們繼續(xù)學(xué)習(xí)RxJava的組合操作符。
StartWith
-
作用分析
StartWith操作符可以在發(fā)射一個Observable的數(shù)據(jù)之前先發(fā)射一個指定的數(shù)據(jù)序列。
操作符

-
示例代碼
可接受一個Iterable或者多個Observable作為函數(shù)的參數(shù)。
Javadoc: startWith(Iterable)
Javadoc: startWith(T) (最多接受九個參數(shù))
//測試代碼
Integer[] nums = {1,2,3,4};
Observable.from(nums)
.startWith(9,8)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {System.out.println("Next: " + item); }
@Override
public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
@Override
public void onCompleted() { System.out.println("Sequence complete."); }
});
//###########################################
輸出結(jié)果:
Next: 9
Next: 8
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
Merge
-
作用分析
merge可以將多個Observables的輸出合并,就好像它們是一個單個的Observable一樣。merge可能會讓合并的Observables發(fā)射的數(shù)據(jù)交錯(有一個類似的操作符 Concat不會讓數(shù)據(jù)交錯,它會按順序一個接著一個發(fā)射多個Observables的發(fā)射物)。
注意:如果傳遞給merge的任何一個的Observable發(fā)射了onError通知終止了,merge操作符生成的Observable也會立即以onError通知終止。如果你想讓它繼續(xù)發(fā)射數(shù)據(jù),在最后才報告錯誤,可以使用mergeDelayError。
-
示例代碼
Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler); // subscribeOn 切換線程
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds, evens)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
//###########################################
輸出結(jié)果:
Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.
ZIP
-
作用分析
Zip操作符返回一個Obversable,它使用特定函數(shù)按順序結(jié)合兩個或多個Observables發(fā)射的數(shù)據(jù)項,然后它發(fā)射這個函數(shù)返回的結(jié)果。它按照嚴格的順序應(yīng)用這個函數(shù)。它只發(fā)射與發(fā)射數(shù)據(jù)項最少的那個Observable一樣多的數(shù)據(jù)。
下圖:把 shape(形狀)、size(尺寸) 和 color(顏色) 合并后,發(fā)射出來。

-
示例代碼
Observable.zip(
Observable.just("a1","a2","a3","a4","a5"),
Observable.just(1,2,3,4),
Observable.just("b1","b2","b3","b4","b5","b6"),
new Func3<String,Integer,String,String>(){
@Override
public String call(String s, Integer integer, String s2) {
return s+"_"+integer+"_"+s2;
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() { System.out.println("onCompleted."); }
@Override
public void onError(Throwable e) { System.out.println("onError: " + e.getMessage()); }
@Override
public void onNext(String s) { System.out.println("onNext: " + s); }});
//############################################
輸出結(jié)果:
onNext: a1_1_b1
onNext: a2_2_b2
onNext: a3_3_b3
onNext: a4_4_b4
onCompleted.
Join
-
作用分析
任何時候,只要在另一個Observable發(fā)射的數(shù)據(jù)定義的時間窗口內(nèi),這個Observable發(fā)射了一條數(shù)據(jù),就結(jié)合兩個Observable發(fā)射的數(shù)據(jù)。
比如: ObservableA 在 5s內(nèi)發(fā)射一條數(shù)據(jù) dataA1, ObservableB 這時剛好也在發(fā)射數(shù)據(jù)dataB1,就把ObservableA 的數(shù)據(jù)dataA1和 ObservableB的數(shù)據(jù)dataB1合并一起發(fā)射;5s還沒結(jié)束,ObservableB又發(fā)射數(shù)據(jù)dataB2,就把ObservableA 的數(shù)據(jù)dataA1和 ObservableB的數(shù)據(jù)dataB2合并一起發(fā)射。

-
示例代碼
Javadoc: Join(Observable,Func1,Func1,Func2)
- 第二個Observable和源Observable結(jié)合。
- Func1參數(shù):在指定的由時間窗口定義時間間隔內(nèi),源Observable發(fā)射的數(shù)據(jù)和從第二個Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable。
- Func1參數(shù):在指定的由時間窗口定義時間間隔內(nèi),第二個Observable發(fā)射的數(shù)據(jù)和從源Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable。
- Func2參數(shù):定義已發(fā)射的數(shù)據(jù)如何與新發(fā)射的數(shù)據(jù)項相結(jié)合。
Observable<Integer> create1 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 6; i++) {
subscriber.onNext(i);
try {
Thread.sleep(600);
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}}).subscribeOn(Schedulers.newThread());
Observable<String> create2 = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 0; i < 4; i++) {
subscriber.onNext("hello_"+ i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}}).subscribeOn(Schedulers.newThread());
create1.join(create2,
new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(Integer integer) {
return Observable.timer(1000, TimeUnit.MILLISECONDS);
}
},
new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String s) {
return Observable.timer(1000, TimeUnit.MILLISECONDS);
}
},
new Func2<Integer, String, String>() {
@Override
public String call(Integer integer1, String s) {
return integer1 + "-" + s;
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() { System.out.println("onCompleted."); }
@Override
public void onError(Throwable e) { System.out.println("onError: " + e.getMessage()); }
@Override
public void onNext(String s) { System.out.println("onNext: " + s); }});
//##############################################
輸出結(jié)果:
onNext: 0-hello_0
onNext: 1-hello_0
onNext: 1-hello_1
onNext: 2-hello_1
onNext: 3-hello_1
onNext: 3-hello_2
onNext: 2-hello_2
onNext: 4-hello_2
onNext: 4-hello_3
onNext: 5-hello_3
Switch
有這樣一個復(fù)雜的場景就是在一個subscribe-unsubscribe的序列里我們能夠從一個Observable自動取消訂閱來訂閱一個新的Observable。
RxJava的switch(),正如定義的,將一個發(fā)射多個Observables的Observable轉(zhuǎn)換成另一個單獨的Observable,后者發(fā)射那些Observables最近發(fā)射的數(shù)據(jù)項。
給出一個發(fā)射多個Observables序列的源Observable,switch()訂閱到源Observable然后開始發(fā)射由第一個發(fā)射的Observable發(fā)射的一樣的數(shù)據(jù)。當源Observable發(fā)射一個新的Observable時,switch()立即取消訂閱前一個發(fā)射數(shù)據(jù)的Observable(因此打斷了從它那里發(fā)射的數(shù)據(jù)流)然后訂閱一個新的Observable,并開始發(fā)射它的數(shù)據(jù)。

結(jié)束語
ok,RxJava之組合操作符已經(jīng)學(xué)習(xí)完啦,當然這里都是分析一些常用的,想了解更多的操作符就去看RxJava官方文檔吧。
下一篇:RxJava 源碼學(xué)習(xí)之調(diào)度器Scheduler。