Rxjava2-二、操作符

Rxjava記錄總結(jié)操作符:創(chuàng)建操作符、轉(zhuǎn)換操作符、合并操作符、過濾操作符、其他操作符、條件操作符.

創(chuàng)建操作符

just
自動一次發(fā)送事件序列

Observable .just("1", "2", "3", "4", "5", "6", "7", "8", "9", "10") 
依次發(fā)送調(diào)用onNext(),最后默認(rèn)調(diào)用complete()

create
手動創(chuàng)建事件序列,返回一個可自由操作的emitter,有點是自用控制事件流程.

emitter.onNext(); 
emitter.onError(); 
emitter.onComplete();

fromIterable
傳入數(shù)組并按角標(biāo)一次發(fā)送事件

Observable.fromIterable(list),每次接收單個元素。

fromArray
傳入數(shù)組一次性發(fā)送,一次接收所有元素。

timer
延時發(fā)送事件

Observable .timer(2, TimeUnit.SECONDS)

interval
可取代CountDownTimer、Handler,5秒發(fā)送一次事件: 倒計時操作

Observable .interval(5, TimeUnit.SECONDS)

intervalRange
給事件更多的時間控制:
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
參數(shù)1:起始發(fā)送值
參數(shù)2:發(fā)送數(shù)量
參數(shù)3:首次發(fā)送延遲事件
參數(shù)4:每次發(fā)送事件間隔
參數(shù)5:時間單位

Range
依次發(fā)送范圍內(nèi)的事件

Observable.range(2, 6),接收類型Integer

轉(zhuǎn)換操作符

map
實現(xiàn)單個數(shù)據(jù)的轉(zhuǎn)換
實例:把網(wǎng)絡(luò)中ResponseBody用Gson轉(zhuǎn)換為相對應(yīng)的數(shù)據(jù)實體再下發(fā)給子類。

.map(new Function<Response, MobileAddress.class>() {
           @Override
           public MobileAddress apply(@NonNull Response response) throws Exception {
            if (response.isSuccessful()) {
                ResponseBody body = response.body();
                if (body != null) {
                    Log.e(TAG, "map:轉(zhuǎn)換前:" + response.body());
                    return new Gson().fromJson(body.string(), MobileAddress.class);
                }
            }
                    return null;
                }
            }).observeOn(AndroidSchedulers.mainThread())
            .doOnNext(new Consumer<MobileAddress>() {
             @Override
             public void accept(@NonNull MobileAddress s) throws Exception {
                 Log.e(TAG, "doOnNext: Number:" + s.getNumbser() + "\n");
             }
         })

flatMap和concatMap
兩個都可以實現(xiàn)數(shù)據(jù)集合中一對多事件的轉(zhuǎn)換,concatMap會按發(fā)送的順序獲取接收結(jié)果,flatMap可能是亂序接收(不確定哪個事件先完成)
一對多事假轉(zhuǎn)換:在flatMap集合中例如可以操作一個公司實體,并轉(zhuǎn)換為單個部門實體,返回在后續(xù)的accept中,又可以使用單個部門實體對每個成員進(jìn)行邏輯處理.
實例:

Observable.fromArray(1,2,3,4,5)
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(@NonNull Integer integer) throws Exception {

                        int delay = 0;
                        if(integer == 3){
                            delay = 500;//延遲500ms
                        }
                        return Observable.just(integer *10).delay(delay, TimeUnit.MILLISECONDS);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e("tag","accept:"+integer);
            }
        });

使用flatMap結(jié)果:10,20,40,30,50
使用contactMap結(jié)果:10,20,30,40,50

buffer
分批發(fā)送事件

Observable .just(1, 2, 3, 4, 5, 6) .buffer(2) 
發(fā)送1,2;發(fā)送3,4;在發(fā)送5,6

合并操作符

merge和contat
兩者都可以合并多個Observable事件,前者發(fā)送順序不確定(并行無序),后者按順序發(fā)送(串行有序)。
mergeArray和concatArray效果相同,適用于大于4個事件的情況。
實例:
定義cache和network兩個事件,先查看緩存是否有數(shù)據(jù),有即onNext去刷新頁面,沒有則onComplete讀取網(wǎng)絡(luò)數(shù)據(jù)。

 Observable.concat(cache,network)

concatDelayError和 mergeDelayError
兩者都可以在merge和contat操作中出現(xiàn)錯誤時停止發(fā)送當(dāng)前事件集合,但不影響合并中的另一個事件集合發(fā)送

zip

zip 操作符可以將多個 Observable 的數(shù)據(jù)結(jié)合為一個數(shù)據(jù)源再發(fā)射出去
實例:分別請求生日、地址、性別等信息后,將多個請求結(jié)果合成一個,再進(jìn)行UI更新。

....分別請求生日、地址...
Observable.zip(observable1, observable2, new BiFunction<Birth, Address, String>() {
            @Override
            public String apply(@NonNull Birth birth, @NonNull Address address) throws Exception {
                return "合并后的數(shù)據(jù)為 Birth:"+birth.getResult()+" Address:"+address.getResult();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        Log.e(TAG, "accept: 成功:" + s+"\n");
                    }
                });

過濾操作符

filter: 自定義篩選條件,返回boolean
distinct : 去重
distinctUntilChanged: 過濾連續(xù)相同事件
skip,skipLast:跳過前n個事件或最后n個
take和takeLast:只接收前n個事件或最后n個
elementAt和elementAtOrError:前者只發(fā)送第n個,可設(shè)置默認(rèn)值,不拋異常;后者越界拋異常。
ignoreElements:只接收完成和報錯信息
distinct:去重
ofType:指定接收數(shù)據(jù)類型
throttleFirst/throttleLast:只接收指定時間內(nèi)第一個或最后一個事件

其他操作符

do

doOnEach() :當(dāng)Observable每發(fā)送一次事件就會調(diào)用一次(包含onNext(),onError(),onComplete())
doOnNext(): 執(zhí)行 onNext()前調(diào)用
doAfterNext(): 執(zhí)行onNext()后調(diào)用
doOnComplete():執(zhí)行onComplete()前調(diào)用
doOnError():執(zhí)行 onError()前調(diào)用
doOnTerminate(): 執(zhí)行終止(無論正常發(fā)送完畢/異常終止)
doFinally(): 最后執(zhí)行
doOnSubscribe():觀察者訂閱是調(diào)用
doOnUnScbscribe(): 觀察者取消訂閱時調(diào)用

onErrorReturn

捕獲錯誤返回,不發(fā)送后續(xù)事件

onExceptionResumeNext/onErrorResumeNext

捕獲錯誤跳過當(dāng)前事件同時不中斷發(fā)送后續(xù)事件。

retry

retry() : 出現(xiàn)錯誤時,讓被觀察者重新發(fā)送數(shù)據(jù)。若錯誤一直發(fā)生,則一直重新發(fā)送
retry(long time):與retry不同的書,若錯誤一直發(fā)生,被觀察者則一直重新發(fā)送數(shù)據(jù),但這持續(xù)重新發(fā)送有次數(shù)限制
retry(Predicate predicate) : 出現(xiàn)錯誤時,根據(jù)指定邏輯(可以捕獲到發(fā)生的錯誤)決定是否讓被觀察者重新發(fā)送數(shù)據(jù)
retry(new BiPredicate<Integer, Throwable>):出現(xiàn)錯誤時,根據(jù)指定邏輯(可以捕獲重發(fā)的次數(shù)和發(fā)生的錯誤)決定是否讓被觀察者重新發(fā)送數(shù)據(jù)
retry(long time,Predicate predicate) : 出現(xiàn)錯誤時,根據(jù)指定邏輯(可以捕獲到發(fā)生的錯誤)決定是否讓被觀察者重新發(fā)送數(shù)據(jù)。并且有持續(xù)重發(fā)的次數(shù)限制

retryUntil

遇到錯誤時根據(jù)制定規(guī)則選擇是否重發(fā)

retryWhen

遇到錯誤時,將發(fā)生的錯誤傳遞給一個新的被觀察者(Observable),并決定是否需要重新訂閱原始被觀察者(Observable)

repeat和repeatWhen

repeat重復(fù)發(fā)射 observable的數(shù)據(jù)序列,可以使無限次也可以是指定次數(shù).不傳時為重復(fù)無限次。
repeatWhen遇到錯誤選擇返回object給新觀察者或中止事件
返回參數(shù)選擇:
Observable.empty();
發(fā)送Complete事件,但不會回調(diào)觀察者的Complete()
onComplete()
直接完成。
Observable.error(new Throwable("不再重新訂閱事件"));
Observable.just(1);
繼續(xù)發(fā)送事件。

debounce

一定的時間內(nèi)沒有操作就會發(fā)送事件(只會發(fā)送最后一次操作的事件)
實例:

Observable.intervalRange(1, 2, 3, 4, TimeUnit.SECONDS) 
.debounce(2, TimeUnit.SECONDS) 
只有最后一個4的事件會被發(fā)送(2秒后)

條件操作符

all:判斷被觀察者所有事件是否滿足某個事件,如果全部滿足則返回true,都在返回false
takeUntil:
當(dāng)事件滿足設(shè)定的條件時,該事件的下一個事件不會被發(fā)送了。包含超過臨界條件的第一個事件
takeWhile:
當(dāng)事件滿足設(shè)定的條件時,發(fā)送事件
skipUntil
直到設(shè)定的條件事件發(fā)出之后,開始發(fā)送原始事件。
skipWhile
跳過while范圍內(nèi)事件
amb
多個Observable序列中,只發(fā)送第一個
contains
是否存在特定元素
exists
是否滿足特定條件
DefaultIfEmpty
如果沒有正常結(jié)束事件(onComlete執(zhí)行),返回默認(rèn)值
SequenceEqual
判斷兩個事件序列是否是相同的數(shù)據(jù),相同的順序,相同的終止?fàn)顟B(tài)

相似操作符對比

timer():用于創(chuàng)建Observable,延遲發(fā)送一次。
interval():用于創(chuàng)建Observable,跟TimerTask類似,用于周期性發(fā)送。
delay():用于事件流中,可以延遲發(fā)送事件流中的某一次發(fā)送。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容