1.startWith:給你被觀察者的數(shù)據(jù)流前再增加一點(diǎn)同類型的數(shù)據(jù)或者增加一個(gè)數(shù)據(jù)流

Observable.just("a", "b", "c").startWith("呵呵", "哦哦")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("Tag", "call: " + s);
}
});
or
Observable.just("a", "b", "c")
.startWith(Observable.from(new String[]{"呵呵", "哦哦"}))
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("Tag", "call: " + s);
}
});
輸出日志:
07-19 15:50:50.023 27691-27691/com.example.audiotalk E/Tag: call: 呵呵
07-19 15:50:50.023 27691-27691/com.example.audiotalk E/Tag: call: 哦哦
07-19 15:50:50.023 27691-27691/com.example.audiotalk E/Tag: call: a
07-19 15:50:50.023 27691-27691/com.example.audiotalk E/Tag: call: b
07-19 15:50:50.023 27691-27691/com.example.audiotalk E/Tag: call: c
2.merge:把多個(gè)被觀察者合并到一個(gè)被觀察者身上輸出,但是可能會(huì)讓合并的被觀察者發(fā)射的數(shù)據(jù)交錯(cuò)。

Observable.merge(Observable.just("a", "b", "c"), Observable.just("d", "e", "f"),
Observable.just("g", "h", "i"))
.subscribe(s -> Log.e("Tag", "merge: " + s));
輸出日志:
07-19 15:55:51.304 29657-29657/com.example.audiotalk E/Tag: merge: a
07-19 15:55:51.304 29657-29657/com.example.audiotalk E/Tag: merge: b
07-19 15:55:51.304 29657-29657/com.example.audiotalk E/Tag: merge: c
07-19 15:55:51.305 29657-29657/com.example.audiotalk E/Tag: merge: d
07-19 15:55:51.305 29657-29657/com.example.audiotalk E/Tag: merge: e
07-19 15:55:51.305 29657-29657/com.example.audiotalk E/Tag: merge: f
07-19 15:55:51.305 29657-29657/com.example.audiotalk E/Tag: merge: g
07-19 15:55:51.305 29657-29657/com.example.audiotalk E/Tag: merge: h
07-19 15:55:51.305 29657-29657/com.example.audiotalk E/Tag: merge: i
3.concat:同是合并數(shù)據(jù),但是嚴(yán)格按照順序發(fā)射,一個(gè)被觀察者數(shù)據(jù)發(fā)送完前不會(huì)發(fā)送后一個(gè)被觀察者的數(shù)據(jù)。所以說(shuō)concat肯定是不會(huì)發(fā)生數(shù)據(jù)交錯(cuò)發(fā)射的情況。

Observable.concat(Observable.just("a", "b", "c"), Observable.just("d", "e", "f"),
Observable.just("g", "h","i"))
.subscribe(s -> Log.e("concat", "initTest: " + s));
輸出日志:
07-19 15:59:42.885 30429-30429/com.example.audiotalk E/concat: initTest: a
07-19 15:59:42.885 30429-30429/com.example.audiotalk E/concat: initTest: b
07-19 15:59:42.885 30429-30429/com.example.audiotalk E/concat: initTest: c
07-19 15:59:42.885 30429-30429/com.example.audiotalk E/concat: initTest: d
07-19 15:59:42.885 30429-30429/com.example.audiotalk E/concat: initTest: e
07-19 15:59:42.886 30429-30429/com.example.audiotalk E/concat: initTest: f
07-19 15:59:42.886 30429-30429/com.example.audiotalk E/concat: initTest: g
07-19 15:59:42.886 30429-30429/com.example.audiotalk E/concat: initTest: h
07-19 15:59:42.886 30429-30429/com.example.audiotalk E/concat: initTest: i
4.zip:合并多個(gè)被觀察者發(fā)出的數(shù)據(jù)項(xiàng),但是如果一個(gè)被觀察者的數(shù)據(jù)更多,多出來(lái)的那部分不會(huì)被發(fā)送。(zipWith和zip類似)
此操作符適用情況:比如說(shuō)我們?cè)谡?qǐng)求第三個(gè)接口的時(shí)候需要用到前兩個(gè)接口請(qǐng)求成功之后返回的數(shù)據(jù),這時(shí)候我們可能會(huì)想到使用嵌套請(qǐng)求,在第一個(gè)接口請(qǐng)求成功之后,再請(qǐng)求第二個(gè)接口,第二個(gè)接口請(qǐng)求成功之后,再請(qǐng)求第三個(gè)接口。但是這樣嵌套起來(lái)代碼可讀性非常差。使用zip操作符就清晰明朗很多。
zip

Observable.zip(Observable.just(1), Observable.just(2), new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer+integer2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
// TODO: 2018/7/19 0019 do next request
}
});
zipWith
Observable.just(1).zipWith( Observable.just(2), new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer+integer2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
// TODO: 2018/7/19 0020 do next request
}
});
5.combineLatest:他可以組合兩個(gè)Observable,進(jìn)行一定的操作之后,再次發(fā)射下去
它繼續(xù)發(fā)射的前提是:其中的一個(gè)Observable還有數(shù)據(jù)沒(méi)有發(fā)射,那么,他講兩個(gè)Observable目前最新發(fā)射的數(shù)據(jù)組合在一起,比如上面,第一個(gè)Observable最新的數(shù)據(jù)是8,然后第二個(gè)的依次在變,然后再把他們組合在一起。8&&10 8&&11 ...

Observable.combineLatest(Observable.range(7, 2),
Observable.range(10, 4), new Func2<Integer, Integer, String>() {
@Override
public String call(Integer integer, Integer integer2) {
return integer + "&&" + integer2;
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("combineLatest", s);
}
});
輸出日志:
07-19 17:08:56.778 6545-6545/com.example.audiotalk E/combineLatest: 8&&10
07-19 17:08:56.778 6545-6545/com.example.audiotalk E/combineLatest: 8&&11
07-19 17:08:56.778 6545-6545/com.example.audiotalk E/combineLatest: 8&&12
07-19 17:08:56.778 6545-6545/com.example.audiotalk E/combineLatest: 8&&13
6.retryWhen:請(qǐng)求失敗重試
下面是我自定義的一個(gè)請(qǐng)求失敗重試的類,供參考(可根據(jù)請(qǐng)求失敗返回的異常信息,自定義是否需要重試請(qǐng)求)

使用方法:
Observable.just(2)//這里換成自己的接口請(qǐng)求
.retryWhen(new RetryWhenProcess(5, 3))//請(qǐng)求失敗之后延時(shí)5,秒重試,總共重試次數(shù)3次
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});
封裝的請(qǐng)求失敗重試類
public static class RetryWhenProcess implements Func1<Observable<? extends Throwable>, Observable<?>> {
private long mInterval;//請(qǐng)求失敗延時(shí)幾秒之后重試
private int times;//重試次數(shù)
private int count;
public RetryWhenProcess(long interval, int times) {
mInterval = interval;
this.times = times;
}
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.flatMap((Func1<Throwable, Observable<?>>) throwable -> {
if (throwable instanceof UnknownHostException/*||throwable instanceof SocketTimeoutException*/) {
return Observable.error(throwable);
}
if (this.times <= count) {
return Observable.error(throwable);
}
count++;
return Observable.just(Observable.timer((long) Math.pow(mInterval, 1), TimeUnit.SECONDS));
});
}
}