Android Develop——RxJava2(二) RxJava2牛X到不行的操作符

在RxJava2(一)教程中,已經(jīng)跟著大神們學(xué)習(xí)了RxJava2的基本使用,現(xiàn)在我們來(lái)學(xué)習(xí)一下RxJava2很強(qiáng)大的操作符

Android RxJava2操作符

Map

  • Map是RxJava中的一個(gè)變換操作符,它的作用就是對(duì)上游發(fā)送的每一個(gè)事件應(yīng)用一個(gè)函數(shù),使得每一個(gè)事件都按照指定的函數(shù)去變化。通過(guò)Map可以將上游發(fā)來(lái)的事件轉(zhuǎn)換為任意的類型,可以是一個(gè)Object也可以是一個(gè)集合,圖示表示如下:
map操作符示意
  • 代碼表示:
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "subscribe: ");
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                String mapStr = String.valueOf(integer + 1);
                return mapStr;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });

FlatMap

flatMap是一個(gè)非常強(qiáng)大的操作符,flatMap將一個(gè)發(fā)送事件的上游Observable變換為多個(gè)發(fā)送事件的Observables,然后將它們發(fā)射的事件合并后放進(jìn)一個(gè)單獨(dú)的Observable里。

  • 圖示:
flatMap

上游發(fā)送三個(gè)事件,分別是1,2,3注意它們的顏色,中間flatMap的作用是將圓形的事件轉(zhuǎn)換為一個(gè)發(fā)送矩形事件和三角形事件的新的上游Observable

flatmap分解動(dòng)作.jpg

上游每發(fā)送一個(gè)事件,flatMap都將創(chuàng)建一個(gè)新的水管,然后發(fā)送轉(zhuǎn)換之后的新的事件,下游接收到的就是這些新的水管發(fā)送的數(shù)據(jù)。flatMap不能保證事件的順序

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).flatMap(new Function<Integer, Observable<String>>() {
            @Override
            public Observable<String> apply(Integer integer) throws Exception {
                ArrayList<String> arrayList = new ArrayList<>();
                for (int i = 0; i < 5; i++) {
                    String iStr = "flatMap value" + integer;
                    arrayList.add(iStr);
                }
                return Observable.fromIterable(arrayList).delay(10, TimeUnit.MICROSECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });

concatMap

concatMap和flatMap的作用是一樣的,它的結(jié)果是嚴(yán)格按照上游發(fā)送的順序來(lái)發(fā)送的。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(11);
                e.onNext(111);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                ArrayList<String> arrayList = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    arrayList.add("concatMap value" + i);
                }
                return Observable.fromIterable(arrayList).delay(5, TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });
  • 運(yùn)行結(jié)果
09-11 03:21:32.970 25661-25678/? D/Rxjava2Lesson: accept: concatMap value0
09-11 03:21:32.970 25661-25678/? D/Rxjava2Lesson: accept: concatMap value1
09-11 03:21:32.970 25661-25678/? D/Rxjava2Lesson: accept: concatMap value2
09-11 03:21:32.981 25661-25680/? D/Rxjava2Lesson: accept: concatMap value0
09-11 03:21:32.981 25661-25680/? D/Rxjava2Lesson: accept: concatMap value1
09-11 03:21:32.981 25661-25680/? D/Rxjava2Lesson: accept: concatMap value2
09-11 03:21:32.988 25661-25681/? D/Rxjava2Lesson: accept: concatMap value0
09-11 03:21:32.988 25661-25681/? D/Rxjava2Lesson: accept: concatMap value1
09-11 03:21:32.988 25661-25681/? D/Rxjava2Lesson: accept: concatMap value2

Buffer

Buffer操作符會(huì)定期收集Observable的數(shù)據(jù)放進(jìn)一個(gè)數(shù)據(jù)包裹,然后發(fā)射這些包裹,并不是一次發(fā)射一個(gè)值
Buffer操作符將一個(gè)Observable變換為另一個(gè),原來(lái)的Observable正常發(fā)射數(shù)據(jù),變換產(chǎn)生的Observable發(fā)射這些數(shù)據(jù)的緩存集合。如果原來(lái)的Observable發(fā)射了一個(gè)onError通知,Buffer會(huì)立即傳遞這個(gè)通知,而不是首先發(fā)射緩存的數(shù)據(jù)。

Buffer變體

  • Buffer(count) 以列表List的形式發(fā)射非重疊的緩存,每一個(gè)緩存至多包含來(lái)自原始Observable的count項(xiàng)數(shù)據(jù)
  • Buffer(count,skip) 從原始Observable的第一項(xiàng)數(shù)據(jù)開(kāi)始創(chuàng)建新的緩存。每當(dāng)接收到skip數(shù)據(jù),用count項(xiàng)數(shù)據(jù)來(lái)填充‘

Scan

Scan連續(xù)地對(duì)數(shù)據(jù)序列的每一項(xiàng)應(yīng)用一個(gè)函數(shù),然后連續(xù)發(fā)射結(jié)果
Scan操作符對(duì)原始Observable發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后將這個(gè)函數(shù)的結(jié)果作為自己的第一項(xiàng)數(shù)據(jù)發(fā)射。將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個(gè)函數(shù)來(lái)產(chǎn)生自己的第二項(xiàng)數(shù)據(jù)。持續(xù)進(jìn)行這個(gè)過(guò)程來(lái)產(chǎn)生剩余的數(shù)據(jù)序列。

scan.jpg
 Observable.just(1, 2, 3, 4, 5)
                .scan(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {

                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        });

Window

Window定期將來(lái)自原始Observable的數(shù)據(jù)分解為一個(gè)Observable窗口,發(fā)射這些窗口而不是每次發(fā)射一項(xiàng)數(shù)據(jù)

window.jpg

window和Buffer類似,但不是發(fā)射來(lái)自原始Observable的數(shù)據(jù)包,發(fā)射的是Observables,這些Observables中的每一個(gè)都發(fā)射原始Observable數(shù)據(jù)的一個(gè)子集,最后發(fā)射一個(gè)onComplete通知。

   Observable.range(1, 10).window(new Observable<Integer>() {
            @Override
            protected void subscribeActual(Observer<? super Integer> observer) {
                Log.d(TAG, "subscribeActual: ");
                observer.onNext(1);
                observer.onNext(1);
                observer.onNext(1);
            }
        }).subscribe(new Consumer<Observable<Integer>>() {
            @Override
            public void accept(Observable<Integer> integerObservable) throws Exception {
                integerObservable.subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + integer);
                    }
                });
            }
        });

ZIP操作符

ZIP通過(guò)一個(gè)函數(shù)將多個(gè)Observable發(fā)送的事件結(jié)合到一起,然后發(fā)送這些組合到一起的事件。按照嚴(yán)格的順序應(yīng)用這個(gè)函數(shù),只發(fā)射與發(fā)射項(xiàng)最少的那個(gè)Observable一樣多的數(shù)據(jù)

zip.jpg

從圖中看到,有兩個(gè)上游的水管,通過(guò)ZIP操作符,使得兩個(gè)事件合并為了一個(gè)事件

  • 分解動(dòng)作
zip分解動(dòng)作.jpg
  • 組合的過(guò)程是分別從 兩根水管里各取出一個(gè)事件 來(lái)進(jìn)行組合, 并且一個(gè)事件只能被使用一次, 組合的順序是嚴(yán)格按照事件發(fā)送的順利 來(lái)進(jìn)行的, 也就是說(shuō)不會(huì)出現(xiàn)圓形1 事件和三角形B 事件進(jìn)行合并, 也不可能出現(xiàn)圓形2 和三角形A 進(jìn)行合并的情況.

  • 最終下游收到的事件數(shù)量 是和上游中發(fā)送事件最少的那一根水管的事件數(shù)量 相同. 這個(gè)也很好理解, 因?yàn)槭菑拿恳桓?里取一個(gè)事件來(lái)進(jìn)行合并, 最少的 那個(gè)肯定就最先取完 , 這個(gè)時(shí)候其他的水管盡管還有事件 , 但是已經(jīng)沒(méi)有足夠的事件來(lái)組合了, 因此下游就不會(huì)收到剩余的事件了.

  //上游水管第一個(gè)事件
        Observable<Integer> observable1 = Observable.range(1, 5);
        //上游水管第二個(gè)事件
        Observable<Integer> observable2 = Observable.range(6, 10);
        //合并事件
        Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
            @Override
            public String apply(Integer integer, Integer integer2) throws Exception {
                return String.valueOf(integer + integer2);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });

實(shí)踐

  • zip在Android中的使用,可以適用于如下場(chǎng)景,一個(gè)界面需要展示用戶的一些信息,這些信息分別要從兩個(gè)服務(wù)器接口中獲取,只有當(dāng)兩個(gè)數(shù)據(jù)都獲取后才能進(jìn)行展示。這類同時(shí)的信息請(qǐng)求比較適用zip
public interface Api {
    @GET
    Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);

    @GET
    Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);

}

zip打包

Observable<UserBaseInfoResponse> observable1 =                                            
        api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());      

Observable<UserExtraInfoResponse> observable2 =                                           
        api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());    

Observable.zip(observable1, observable2,                                                  
        new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {         
            @Override                                                                     
            public UserInfo apply(UserBaseInfoResponse baseInfo,                          
                                  UserExtraInfoResponse extraInfo) throws Exception {     
                return new UserInfo(baseInfo, extraInfo);                                 
            }                                                                             
        }).observeOn(AndroidSchedulers.mainThread())                                      
        .subscribe(new Consumer<UserInfo>() {                                             
            @Override                                                                     
            public void accept(UserInfo userInfo) throws Exception {                      
                //do something;                                                           
            }                                                                             
        });

基本的操作符就是這些了,以后再學(xué)習(xí)到其它的運(yùn)算符再繼續(xù)補(bǔ)充

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容