Rxjava記錄總結(jié)操作符:創(chuàng)建操作符、轉(zhuǎn)換操作符、合并操作符、過濾操作符、其他操作符、條件操作符.
創(chuàng)建操作符
just
自動一次發(fā)送事件序列
Observable .just("1", "2", "3", "4", "5", "6", "7", "8", "9", "10")
依次發(fā)送調(diào)用onNext(),最后默認(rèn)調(diào)用complete()
create
手動創(chuàng)建事件序列,返回一個可自由操作的emitter,有點是自用控制事件流程.
emitter.onNext();
emitter.onError();
emitter.onComplete();
fromIterable
傳入數(shù)組并按角標(biāo)一次發(fā)送事件
Observable.fromIterable(list),每次接收單個元素。
fromArray
傳入數(shù)組一次性發(fā)送,一次接收所有元素。
timer
延時發(fā)送事件
Observable .timer(2, TimeUnit.SECONDS)
interval
可取代CountDownTimer、Handler,5秒發(fā)送一次事件: 倒計時操作
Observable .interval(5, TimeUnit.SECONDS)
intervalRange
給事件更多的時間控制:
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
參數(shù)1:起始發(fā)送值
參數(shù)2:發(fā)送數(shù)量
參數(shù)3:首次發(fā)送延遲事件
參數(shù)4:每次發(fā)送事件間隔
參數(shù)5:時間單位
Range
依次發(fā)送范圍內(nèi)的事件
Observable.range(2, 6),接收類型Integer
轉(zhuǎn)換操作符
map
實現(xiàn)單個數(shù)據(jù)的轉(zhuǎn)換
實例:把網(wǎng)絡(luò)中ResponseBody用Gson轉(zhuǎn)換為相對應(yīng)的數(shù)據(jù)實體再下發(fā)給子類。
.map(new Function<Response, MobileAddress.class>() {
@Override
public MobileAddress apply(@NonNull Response response) throws Exception {
if (response.isSuccessful()) {
ResponseBody body = response.body();
if (body != null) {
Log.e(TAG, "map:轉(zhuǎn)換前:" + response.body());
return new Gson().fromJson(body.string(), MobileAddress.class);
}
}
return null;
}
}).observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<MobileAddress>() {
@Override
public void accept(@NonNull MobileAddress s) throws Exception {
Log.e(TAG, "doOnNext: Number:" + s.getNumbser() + "\n");
}
})
flatMap和concatMap
兩個都可以實現(xiàn)數(shù)據(jù)集合中一對多事件的轉(zhuǎn)換,concatMap會按發(fā)送的順序獲取接收結(jié)果,flatMap可能是亂序接收(不確定哪個事件先完成)
一對多事假轉(zhuǎn)換:在flatMap集合中例如可以操作一個公司實體,并轉(zhuǎn)換為單個部門實體,返回在后續(xù)的accept中,又可以使用單個部門實體對每個成員進(jìn)行邏輯處理.
實例:
Observable.fromArray(1,2,3,4,5)
.flatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(@NonNull Integer integer) throws Exception {
int delay = 0;
if(integer == 3){
delay = 500;//延遲500ms
}
return Observable.just(integer *10).delay(delay, TimeUnit.MILLISECONDS);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e("tag","accept:"+integer);
}
});
使用flatMap結(jié)果:10,20,40,30,50
使用contactMap結(jié)果:10,20,30,40,50
buffer
分批發(fā)送事件
Observable .just(1, 2, 3, 4, 5, 6) .buffer(2)
發(fā)送1,2;發(fā)送3,4;在發(fā)送5,6
合并操作符
merge和contat
兩者都可以合并多個Observable事件,前者發(fā)送順序不確定(并行無序),后者按順序發(fā)送(串行有序)。
mergeArray和concatArray效果相同,適用于大于4個事件的情況。
實例:
定義cache和network兩個事件,先查看緩存是否有數(shù)據(jù),有即onNext去刷新頁面,沒有則onComplete讀取網(wǎng)絡(luò)數(shù)據(jù)。
Observable.concat(cache,network)
concatDelayError和 mergeDelayError
兩者都可以在merge和contat操作中出現(xiàn)錯誤時停止發(fā)送當(dāng)前事件集合,但不影響合并中的另一個事件集合發(fā)送
zip
zip 操作符可以將多個 Observable 的數(shù)據(jù)結(jié)合為一個數(shù)據(jù)源再發(fā)射出去
實例:分別請求生日、地址、性別等信息后,將多個請求結(jié)果合成一個,再進(jìn)行UI更新。
....分別請求生日、地址...
Observable.zip(observable1, observable2, new BiFunction<Birth, Address, String>() {
@Override
public String apply(@NonNull Birth birth, @NonNull Address address) throws Exception {
return "合并后的數(shù)據(jù)為 Birth:"+birth.getResult()+" Address:"+address.getResult();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.e(TAG, "accept: 成功:" + s+"\n");
}
});
過濾操作符
filter: 自定義篩選條件,返回boolean
distinct : 去重
distinctUntilChanged: 過濾連續(xù)相同事件
skip,skipLast:跳過前n個事件或最后n個
take和takeLast:只接收前n個事件或最后n個
elementAt和elementAtOrError:前者只發(fā)送第n個,可設(shè)置默認(rèn)值,不拋異常;后者越界拋異常。
ignoreElements:只接收完成和報錯信息
distinct:去重
ofType:指定接收數(shù)據(jù)類型
throttleFirst/throttleLast:只接收指定時間內(nèi)第一個或最后一個事件
其他操作符
do
doOnEach() :當(dāng)Observable每發(fā)送一次事件就會調(diào)用一次(包含onNext(),onError(),onComplete())
doOnNext(): 執(zhí)行 onNext()前調(diào)用
doAfterNext(): 執(zhí)行onNext()后調(diào)用
doOnComplete():執(zhí)行onComplete()前調(diào)用
doOnError():執(zhí)行 onError()前調(diào)用
doOnTerminate(): 執(zhí)行終止(無論正常發(fā)送完畢/異常終止)
doFinally(): 最后執(zhí)行
doOnSubscribe():觀察者訂閱是調(diào)用
doOnUnScbscribe(): 觀察者取消訂閱時調(diào)用
onErrorReturn
捕獲錯誤返回,不發(fā)送后續(xù)事件
onExceptionResumeNext/onErrorResumeNext
捕獲錯誤跳過當(dāng)前事件同時不中斷發(fā)送后續(xù)事件。
retry
retry() : 出現(xiàn)錯誤時,讓被觀察者重新發(fā)送數(shù)據(jù)。若錯誤一直發(fā)生,則一直重新發(fā)送
retry(long time):與retry不同的書,若錯誤一直發(fā)生,被觀察者則一直重新發(fā)送數(shù)據(jù),但這持續(xù)重新發(fā)送有次數(shù)限制
retry(Predicate predicate) : 出現(xiàn)錯誤時,根據(jù)指定邏輯(可以捕獲到發(fā)生的錯誤)決定是否讓被觀察者重新發(fā)送數(shù)據(jù)
retry(new BiPredicate<Integer, Throwable>):出現(xiàn)錯誤時,根據(jù)指定邏輯(可以捕獲重發(fā)的次數(shù)和發(fā)生的錯誤)決定是否讓被觀察者重新發(fā)送數(shù)據(jù)
retry(long time,Predicate predicate) : 出現(xiàn)錯誤時,根據(jù)指定邏輯(可以捕獲到發(fā)生的錯誤)決定是否讓被觀察者重新發(fā)送數(shù)據(jù)。并且有持續(xù)重發(fā)的次數(shù)限制
retryUntil
遇到錯誤時根據(jù)制定規(guī)則選擇是否重發(fā)
retryWhen
遇到錯誤時,將發(fā)生的錯誤傳遞給一個新的被觀察者(Observable),并決定是否需要重新訂閱原始被觀察者(Observable)
repeat和repeatWhen
repeat重復(fù)發(fā)射 observable的數(shù)據(jù)序列,可以使無限次也可以是指定次數(shù).不傳時為重復(fù)無限次。
repeatWhen遇到錯誤選擇返回object給新觀察者或中止事件
返回參數(shù)選擇:
Observable.empty();
發(fā)送Complete事件,但不會回調(diào)觀察者的Complete()
onComplete()
直接完成。
Observable.error(new Throwable("不再重新訂閱事件"));
Observable.just(1);
繼續(xù)發(fā)送事件。
debounce
一定的時間內(nèi)沒有操作就會發(fā)送事件(只會發(fā)送最后一次操作的事件)
實例:
Observable.intervalRange(1, 2, 3, 4, TimeUnit.SECONDS)
.debounce(2, TimeUnit.SECONDS)
只有最后一個4的事件會被發(fā)送(2秒后)
條件操作符
all:判斷被觀察者所有事件是否滿足某個事件,如果全部滿足則返回true,都在返回false
takeUntil:
當(dāng)事件滿足設(shè)定的條件時,該事件的下一個事件不會被發(fā)送了。包含超過臨界條件的第一個事件
takeWhile:
當(dāng)事件滿足設(shè)定的條件時,發(fā)送事件
skipUntil
直到設(shè)定的條件事件發(fā)出之后,開始發(fā)送原始事件。
skipWhile
跳過while范圍內(nèi)事件
amb
多個Observable序列中,只發(fā)送第一個
contains
是否存在特定元素
exists
是否滿足特定條件
DefaultIfEmpty
如果沒有正常結(jié)束事件(onComlete執(zhí)行),返回默認(rèn)值
SequenceEqual
判斷兩個事件序列是否是相同的數(shù)據(jù),相同的順序,相同的終止?fàn)顟B(tài)
相似操作符對比
timer():用于創(chuàng)建Observable,延遲發(fā)送一次。
interval():用于創(chuàng)建Observable,跟TimerTask類似,用于周期性發(fā)送。
delay():用于事件流中,可以延遲發(fā)送事件流中的某一次發(fā)送。