RxAndroid常用操作符

RxAndroid入門一文中,我們可以知道,RxJava主要構(gòu)建了一個主要應(yīng)用于異步場景、通過觀察者模式及使用響應(yīng)式編程和函數(shù)式編程規(guī)范實現(xiàn)的生產(chǎn)者-消費者模型:

  1. 事件生產(chǎn):生產(chǎn)者是Observable,Observable對象的創(chuàng)建除了最基礎(chǔ)的create方法之外,還可以使用justfrom、zip等其他多個方法。RxJava中提供了大量創(chuàng)建Observable的工廠方法,按需取用。
  2. 事件消費:消費者是Observer及其拓展,使用subcribe關(guān)聯(lián)生產(chǎn)者。
  3. 事件加工:這是我們在RxAndroid入門一文中沒有提到的,RxJava提供了大量的方法(如map、filter)對事件進行加工。

事件生產(chǎn)和加工的方法一般也被稱為操作符,RxJava提供的大量操作符不可能全部記住,因此只需要記住它們的作用,具體應(yīng)用時再查閱代碼或相關(guān)資料。接著,我們來看下一些常見的操作符。

創(chuàng)建Observable

create

方法原型如下:

static <T> Observable<T> create(ObservableOnSubscribe<T> source);

這個我們在入門時已經(jīng)使用過了,它是創(chuàng)建Observable對象最基礎(chǔ)的方法,需要實現(xiàn)ObservableOnSubscribe接口,然后在ObservableOnSubscribe.subscribe中生產(chǎn)和交付事件。

just

justcreate的簡化操作,它直接將需要由Emitter發(fā)射出去的數(shù)據(jù)在初始化時傳入,有Observer訂閱后,Observable會自動依次發(fā)射這些數(shù)據(jù)并最后調(diào)用onComplete。示例如下:

Observable.just(1, 2, 3).subscribe(
        (Integer i) -> log("onNext:" + i),
        (Throwable e) -> e.printStackTrace(),
        () -> log("onComplete"));

輸出日志如下:

onNext:1
onNext:2
onNext:3
onComplete

amb

方法原型如下:

static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources);
static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources);

所有Observable中,只有最早生產(chǎn)出事件的那個Observable的事件能夠被Observer消費。即消費者選定最優(yōu)的生產(chǎn)者,拋棄其他生產(chǎn)者,判定條件是事件的生產(chǎn)順序。

Observable.ambArray(
        Observable.create((ObservableEmitter<String> emitter) -> emitter.onNext("1")).delay(1, TimeUnit.SECONDS),
        Observable.create((ObservableEmitter<String> emitter) -> emitter.onNext("2"))
).subscribe((String s) -> log("onNext:" + s));

上述例子中的日志輸出結(jié)果是onNext:2,如果刪除第一個Observabledelay調(diào)用,那么輸出就是onNext:1。此外,還有一個成員方法abmWith可以在創(chuàng)建出Observable之后再使用,用來動態(tài)添加source。

使用場景:

同時向多個ip發(fā)起請求,最快響應(yīng)的那個服務(wù)器將作為后續(xù)訪問的節(jié)點。

concat

方法原型如下(方法有點多,參數(shù)數(shù)量不一樣但功能完全一樣的只列出其中一個):

static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources);
static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch);
static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources);
static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources);
static <T> Observable<T> concatArrayEager(int maxConcurrency, int prefetch, ObservableSource<? extends T>... sources);
static <T> Observable<T> concatArrayEagerDelayError(int maxConcurrency, int prefetch, ObservableSource<? extends T>... sources);
static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch, boolean tillTheEnd);
static <T> Observable<T> concatEager(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int prefetch);

使用concat連接的所有Observable會串行執(zhí)行,當上一個Observable執(zhí)行結(jié)束后,即觸發(fā)了onCompleteonError(如果可能觸發(fā)onError記得設(shè)置異常處理器)后,下一個Observable才開始執(zhí)行(是否立即執(zhí)行取決于是否設(shè)置了delay)。還記得入門時講解響應(yīng)式編程中提到的例子嗎(不記得的話再翻一下),用RxJava實現(xiàn)如下:

Observable.concat(
        Observable.create((ObservableEmitter<Runnable> emitter) -> {
            emitter.onNext(new TaskA());
            emitter.onComplete();
        }),
        Observable.create((ObservableEmitter<Runnable> emitter) -> {
            emitter.onNext(new TaskB());
            emitter.onComplete();
        }).delay(1, TimeUnit.SECONDS),
        Observable.create((ObservableEmitter<Runnable> emitter) -> {
            emitter.onNext(new TaskC());
            emitter.onComplete();
        }).delay(1, TimeUnit.SECONDS)
).subscribe((Runnable runnable) -> runnable.run(), (Throwable e) -> e.printStackTrace());

應(yīng)用場景:
concat的應(yīng)用場景非常廣,實際項目中,任務(wù)(或業(yè)務(wù))之間的線性依賴關(guān)系很普遍。

merge

mergeconcat用法類似(方法原型這里就不貼了),差別在于concat是串行的,而merge是并行的。所有Observable并行運行,直到它們?nèi)坑|發(fā)onComplete后,Observer才會觸發(fā)onComplete,示例如下:

Observable.merge(
        Observable.create((ObservableEmitter<String> emitter) -> {
            emitter.onNext("A");
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
            }
            emitter.onNext("B");
            emitter.onComplete();
        }).subscribeOn(Schedulers.newThread()),
        Observable.create((ObservableEmitter<String> emitter) -> {
            try {
                Thread.sleep(500);
            } catch (Exception e) {
            }
            emitter.onNext("5");
            emitter.onComplete();
        }).subscribeOn(Schedulers.newThread())
).subscribe(
        (String s) -> log("onNext:" + s),
        (Throwable e) -> e.printStackTrace(),
        () -> log("onComplete"));

日志輸出如下:

onNext:A
onNext:5
onNext:B
onComplete

從日志中可以看到2個Observable是并行運行的(需要手動設(shè)置不同的調(diào)度線程),并且Observer只回調(diào)了一次onComplete,且是在2個Observable都觸發(fā)了onComplete之后回調(diào)。

使用場景:

mergeconcat應(yīng)該是使用場景最廣泛的兩種操作了。當一個任務(wù)(假設(shè)為C)依賴于其它多個任務(wù)時(假設(shè)為A、B),而A、B之間又沒有相互依賴關(guān)系,為了保證效率,A、B顯然需要并行運行,等到A、B都運行結(jié)束了就運行C。

zip

方法原型如下(方法有點多,參數(shù)數(shù)量不一樣但功能完全一樣的只列出其中一個):

static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(
            ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, ObservableSource<? extends T3> source3,
            ObservableSource<? extends T4> source4, ObservableSource<? extends T5> source5, ObservableSource<? extends T6> source6,
            ObservableSource<? extends T7> source7, ObservableSource<? extends T8> source8, ObservableSource<? extends T9> source9,
            Function9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipper);
static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper,
            boolean delayError, int bufferSize, ObservableSource<? extends T>... sources);
static <T, R> Observable<R> zipIterable(Iterable<? extends ObservableSource<? extends T>> sources,
            Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize);     

zip將多個Observable的事件組合在一起,然后再依次發(fā)射。這話不是很容易理解,我們先來看下例子:

Observable.zip(
        Observable.create((ObservableEmitter<String> emitter) -> {
            emitter.onNext("A");
            emitter.onNext("B");
            emitter.onNext("C");
        }),
        Observable.create((ObservableEmitter<Integer> emitter) -> {
            emitter.onNext(1);
            emitter.onNext(2);
        }),
        (String t1, Integer t2) -> new Pair<String, Integer>(t1, t2)
).subscribe((Pair<String, Integer> result) -> log("onNext:" + result.first + result.second));

日志輸出如下:

onNext:A1
onNext:B2

這里我們使用的是3個參數(shù)的zip方法,前面2個參數(shù)傳入Observable實例,以下簡稱O1和O2,第3個參數(shù)傳入BiFunction實例。BiFunction只有一個apply方法,apply方法攔截O1、O2的事件發(fā)射,參數(shù)1表示O1發(fā)射的事件,參數(shù)2表示O2發(fā)射的事件,返回值是兩個事件的打包(打包類型自已定),apply方法的作用就是進行事件打包。打包后的事件會依次發(fā)射給Observer,且打包必須依次一一對應(yīng),如果一方發(fā)射了m個事件,另一個方只發(fā)射了n個事件(n<m),那么最終發(fā)射給Observer的事件將只有n個。

事件加工

上面列舉了創(chuàng)建Observable的幾種方式,事實上遠不止以上幾種,但以上幾種是比較常見,接下來我們來看下常見的事件加工。

filter

filter是事件加工中最簡易懂的操作了,它攔截Observable發(fā)射出來的事件,并將其中不符合要求的事件濾掉,只發(fā)射滿足要求的事件給Observer。看個例子:

Observable.create((ObservableEmitter<Integer> emitter) -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
}).filter((Integer i) -> i > 1)
        .subscribe((Integer i) -> log("onNext:" + i));

日志輸出如下:

onNext:2
onNext:3

map

map攔截Observable發(fā)射出來的事件,將事件轉(zhuǎn)化為其他數(shù)據(jù)后再發(fā)射給Observer,示例如下:

Observable.create((ObservableEmitter<Integer> emitter) -> {
    emitter.onNext(1);
    emitter.onNext(2);
}).map((Integer i) -> "this is " + i)
        .subscribe((String s) -> log("onNext:" + s));

日志輸出如下:

onNext:this is 1
onNext:this is 2

flatMap

flatMapmap類似,都是進行數(shù)據(jù)的轉(zhuǎn)化,差別在于flatMap將數(shù)據(jù)轉(zhuǎn)化為Observable對象,這些Observable取代原有的Observable作為生產(chǎn)者提供數(shù)據(jù),使用map中的示例修改后代碼如下:

Observable.just(1, 2)
        .flatMap((Integer i) -> Observable.just("this is " + i))
        .subscribe((String s) -> log("onNext:" + s));

日志輸出如下:

onNext:this is 1
onNext:this is 2

僅通過上面的例子很難了解flatMap的用處,因為map也可以做到,其實flatMap適合處理更為復(fù)雜的數(shù)據(jù),如多重列表。假設(shè)有一群小朋友聚在一起玩耍,每個人都需要拿出自己的玩具,然后我們需要統(tǒng)計下這些玩具,代碼如下:

class Toy {
    String name;
    Toy(String name) {
        this.name = name;
    }
}

class Kid {
    List<Toy> toys;
}

List<Kid> kids() {
    List<Kid> kids = new LinkedList<>();
    Kid kid = new Kid();
    kid.toys = new LinkedList<>();
    kid.toys.add(new Toy("熊大"));
    kid.toys.add(new Toy("熊二"));
    kids.add(kid);
    kid = new Kid();
    kid.toys = new LinkedList<>();
    kid.toys.add(new Toy("水槍"));
    kids.add(kid);
    kid = new Kid();
    kid.toys = new LinkedList<>();
    kid.toys.add(new Toy("足球"));
    kid.toys.add(new Toy("扭扭車"));
    kids.add(kid);
    return kids;
}

void test() {
    Observable.fromIterable(kids())
            .flatMap((Kid kid) -> Observable.fromIterable(kid.toys))
            .subscribe((Toy toy) -> log("onNext:" + toy.name));
}

執(zhí)行test方法輸出日志如下:

onNext:熊大
onNext:熊二
onNext:水槍
onNext:足球
onNext:扭扭車

concatMap

concatMapflatMap的作用類似,差別在于前者輸出的數(shù)據(jù)順序與原始數(shù)據(jù)保持一致(使用concat),而后者不保證(使用merge),我們來看個例子:

Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5)
        .flatMap((Integer i) -> {
            if(i == 3) {
                return Observable.just(i).delay(1, TimeUnit.SECONDS);
            } else {
                return Observable.just(i);
            }
        });
Observable<Integer> o2 = Observable.just(-1, -2, -3, -4, -5)
        .concatMap((Integer i) -> {
            if(i == -3) {
                return Observable.just(i).delay(1, TimeUnit.SECONDS);
            } else {
                return Observable.just(i);
            }
        });
Observable.concat(o1, o2).subscribe((Integer i) -> log("onNext:" + i));

日志輸出如下:

onNext:1
onNext:2
onNext:4
onNext:5
onNext:3
onNext:-1
onNext:-2
onNext:-3
onNext:-4
onNext:-5
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容