之前沒有了解過Rxjava的童鞋,建議先閱讀《Rxjava 從入門到開發(fā)》這篇文章,對入門比較有幫助。
StartWith操作符
作用:在數(shù)據(jù)序列的開頭增加一項(xiàng)數(shù)據(jù)

Observable.just(1,2).startWith(Observable.just(-1)).observeOn(Schedulers.io()).subscribe(new Action1<Integer>() {
@Override
public void call(Integer aLong) {
Logger.i(String.valueOf(aLong));
}
});
輸出結(jié)果:

例子說明:在Startwith的作用下-1輸出在結(jié)果的最前面。
CombineLatest操作符
作用:當(dāng)兩個Observables中的任何一個發(fā)射了數(shù)據(jù)時,使用一個函數(shù)結(jié)合每個Observable發(fā)射的最近數(shù)據(jù)項(xiàng),并且基于這個函數(shù)的結(jié)果發(fā)射數(shù)據(jù)。

//產(chǎn)生0,5,10數(shù)列
Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(3);
//產(chǎn)生0,10,20數(shù)列
Observable<Long> observable2 = Observable.interval(500, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(3);
subscription=Observable.combineLatest(observable1, observable2, new Func2<Long, Long, Long>() {
@Override
public Long call(Long aLong, Long aLong2) {
System.out.println("aLong: " + aLong+" aLong2: "+aLong2);
return aLong+aLong2;
}
}).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next: " + aLong);
}
});
輸出結(jié)果:

例子說明:observable1輸出的是0,5,10;observable2輸出的是0,10,20,輸出結(jié)果的關(guān)系如下圖:

數(shù)字的間隔就是時間差。
Join操作符
作用:join操作符把類似于combineLatest操作符,也是兩個Observable產(chǎn)生的結(jié)果進(jìn)行合并,合并的結(jié)果組成一個新的Observable,但是join操作符可以控制每個Observable產(chǎn)生結(jié)果的生命周期,在每個結(jié)果的生命周期內(nèi),可以與另一個Observable產(chǎn)生的結(jié)果按照一定的規(guī)則進(jìn)行合并。
join方法有四個參數(shù),具體定義如下:
observableA.join(observableB, observableA產(chǎn)生結(jié)果生命周期控制函數(shù), observableB產(chǎn)生結(jié)果生命周期控制函數(shù), observableA產(chǎn)生的結(jié)果與observableB產(chǎn)生的結(jié)果的合并規(guī)則)

//產(chǎn)生0,5數(shù)列
Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(3);
//產(chǎn)生0,10,20數(shù)列
Observable<Long> observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(3);
subscription=observable1.join(observable2, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.just(aLong).delay(500, TimeUnit.MILLISECONDS);
}
}, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.just(aLong).delay(1000, TimeUnit.MILLISECONDS);
}
}, new Func2<Long, Long, Long>() {
@Override
public Long call(Long aLong, Long aLong2) {
System.out.println("aLong: " + aLong+" aLong2: "+aLong2);
return aLong + aLong2;
}
}).subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
}
@Override
public void onNext(Long aLong) {
System.out.println("Next: " + aLong);
}
});
輸出結(jié)果:

例子說明:其實(shí)輸出排列方式和combineLatest差不多,主要差別在通過join可以進(jìn)一步控制observable結(jié)果的輸出時間周期,另外順便一提的是如果observable1輸出的數(shù)據(jù)只有兩個,那么即使observable2輸出3個數(shù)據(jù),但最終輸出的也僅有兩個。
Merge操作符
作用:將多個Observable合并為一個。

//產(chǎn)生0,5數(shù)列
Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
}).take(2);
//產(chǎn)生0,10,20數(shù)列
Observable<Long> observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(3);
subscription=Observable.merge(observable1,observable2).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("out: " + aLong);
}
});
輸出結(jié)果:

例子說明:把observable1,observable2的數(shù)據(jù)合并輸出。
switchOnNext操作符
作用:把一組Observable轉(zhuǎn)換成一個Observable,轉(zhuǎn)換規(guī)則為:對于這組Observable中的每一個Observable所產(chǎn)生的結(jié)果,如果在同一個時間內(nèi)存在兩個或多個Observable提交的結(jié)果,只取最后一個Observable提交的結(jié)果給訂閱者。

Observable<Observable<Long>> observable = Observable.interval(500, TimeUnit.MILLISECONDS).map(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
//每隔200毫秒產(chǎn)生一組數(shù)據(jù)(0,10,20,30,40)
return Observable.interval(200, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 10;
}
}).take(3);
}
}).take(2);
subscription=Observable.switchOnNext(observable).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("Next:" + aLong);
}
});
輸出結(jié)果:

例子說明:沒人Switch的作用下,輸出結(jié)果為“0,10,20,0,10,20”,但是在Switch的作用下,結(jié)果為“0,10,0,10,20”,少了“20”,這就是switch的作用。
zip操作符
作用:把兩個observable提交的結(jié)果,嚴(yán)格按照順序進(jìn)行合并。

Observable<Long> observable1= Observable.interval(1, TimeUnit.SECONDS).take(3);
Observable<Long> observable2= Observable.interval(1, TimeUnit.SECONDS).take(4);
subscription=Observable.zip(observable1, observable2, new Func2<Long, Long, Integer>() {
@Override
public Integer call(Long aLong, Long aLong2) {
System.out.println("aLong:" + aLong+" aLong2: "+aLong2);
return (int) (aLong+aLong);
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("result:" + integer);
}
});
輸出結(jié)果:

例子說明:最終輸出為“0,2,4”,兩個Observable輸出的數(shù)據(jù)一一按順序?qū)?yīng),無法對應(yīng)的數(shù)據(jù)就拋棄,例子中的observable2本應(yīng)該輸出4個數(shù)據(jù)take(4)但是由于操作符zip的作用下,最終只輸出3個數(shù)據(jù)。
以上便是結(jié)合操作符的主要內(nèi)容了。rxjava的主要操作符講了差不多,有啥問題歡迎大家留言交流下??,歡迎關(guān)注。
附錄:
文章demo