RxJava常用操作符

  • 創(chuàng)建

    • unfaseCreate(create)
      創(chuàng)建一個(gè)Observable(被觀察者),當(dāng)被觀察者(Observer)/訂閱者(Subscriber)
      subscribe(訂閱)的時(shí)候就會(huì)依次發(fā)出三條字符串?dāng)?shù)據(jù)("Hello","RxJava","Nice to meet you")
      最終onComplete
Observable.unsafeCreate(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello");
                subscriber.onNext("RxJava");
                subscriber.onNext("Nice to meet you");
                subscriber.onCompleted();
            }
        });
  • just
    作用同上,訂閱時(shí)依次發(fā)出三條數(shù)據(jù),不過此方法參數(shù)可以有1-9條
Observable.just("Hello", "RxJava", "Nice to meet you")
  • from
    作用同just不過是把參數(shù)封裝成數(shù)組或者可迭代的集合在依次發(fā)送出來,突破了just9個(gè)參數(shù)的限制
String[] strings = {"Hello", "RxJava", "Nice to meet you"};
Observable.from(strings)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("onNext--> " + s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
  • defer
    顧名思義,延遲創(chuàng)建
private String[] strings1 = {"Hello", "World"};
    private String[] strings2 = {"Hello", "RxJava"};

    private void test() {
        Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
            @Override
            public Observable<String> call() {
                return Observable.from(strings1);
            }
        });

        strings1 = strings2; //訂閱前把strings給改了
        observable.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("onNext--> " + s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
    }

我們發(fā)現(xiàn)數(shù)據(jù)結(jié)果變成這樣了

onNext--> Hello
onNext--> RxJava
onComplete

由此可以證明defer操作符起到的不過是一個(gè)“預(yù)創(chuàng)建”的作用,真正創(chuàng)建是發(fā)生在訂閱的時(shí)候

  • empty
    創(chuàng)建一個(gè)空的,不會(huì)發(fā)射任何事件(數(shù)據(jù))的Observable
Observable.empty()
        .subscribe(new Action1<Object>() {
            @Override
            public void call(Object o) {
                System.out.println("onNext--> " + o);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
onComplete
  • never
    創(chuàng)建一個(gè)不會(huì)發(fā)出任何事件也不會(huì)結(jié)束的Observable
Observable.never()
        .subscribe(new Action1<Object>() {
            @Override
            public void call(Object o) {
                System.out.println("onNext--> " + o);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
······
  • error
    創(chuàng)建一個(gè)會(huì)發(fā)出一個(gè)error事件的Observable
Observable.error(new RuntimeException("fuck!"))
        .subscribe(new Action1<Object>() {
            @Override
            public void call(Object o) {
                System.out.println("onNext--> " + o);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
onError--> fuck!
  • range
    創(chuàng)建一個(gè)發(fā)射一組整數(shù)序列的Observable
Observable.range(3, 8)
        .subscribe(new Action1<Object>() {
            @Override
            public void call(Object o) {
                System.out.println("onNext--> " + o);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
onNext--> 3
onNext--> 4
onNext--> 5
onNext--> 6
onNext--> 7
onNext--> 8
onNext--> 9
onNext--> 10
onComplete
  • interval
    創(chuàng)建一個(gè)無限的計(jì)時(shí)序列,每隔一段時(shí)間發(fā)射一個(gè)數(shù)字(從0開始)的Observable
   Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(new Action1<Object>() {
            @Override
            public void call(Object o) {
                System.out.println("onNext--> " + o);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });

   System.in.read();//阻塞當(dāng)前線程,防止JVM結(jié)束程序
onNext--> 0
onNext--> 1
onNext--> 2
onNext--> 3
onNext--> 4
onNext--> 5
onNext--> 6
...
  • 轉(zhuǎn)換

    • buffer(int count)
      將原發(fā)射出來的數(shù)據(jù)已count為單元打包之后在分別發(fā)射出來
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .buffer(3)
        .subscribe(new Action1<Object>() {
            @Override
            public void call(Object o) {
                System.out.println("onNext--> " + o);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
onNext--> [1, 2, 3]
onNext--> [4, 5, 6]
onNext--> [7, 8, 9]
onNext--> [10]
onComplete
  • map
    將原Observable發(fā)射出來的數(shù)據(jù)轉(zhuǎn)換為另外一種類型的數(shù)據(jù)
Observable.just("Hello", "RxJava", "Nice to meet you")
        .map(new Func1<String, Integer>() { //泛型第一個(gè)類型是原數(shù)據(jù)類型,第二個(gè)類型是想要變換的數(shù)據(jù)類型
            @Override
            public Integer call(String s) {
                // 這是轉(zhuǎn)換成了Student類型
                // Student student = new Student();
                // student.setName(s);
                // return student;
                return s.hashCode();        //將數(shù)據(jù)轉(zhuǎn)換為了int(取得其hashCode值)
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer o) {
                System.out.println("onNext--> " + o);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
onNext--> 69609650
onNext--> -1834252888
onNext--> -1230747480
onComplete
  • flatMap
    作用類似于map又比map強(qiáng)大,map是單純的數(shù)據(jù)類型的轉(zhuǎn)換,而flapMap可以將原數(shù)據(jù)新的Observables,再將這些Observables的數(shù)據(jù)順序緩存到一個(gè)新的隊(duì)列中,在統(tǒng)一發(fā)射出來
Observable.just("Hello", "RxJava", "Nice to meet you")
        .flatMap(new Func1<String, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(String s) {
                return Observable.just(s.hashCode());
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer o) {
                System.out.println("onNext--> " + o);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
    }
onNext--> 69609650
onNext--> -1834252888
onNext--> -1230747480
onComplete

這里雖然結(jié)果和map是相同的,但是過程卻是不同的。flatMap是先將原來的三個(gè)字符串("Hello","RxJava","Nice to meet you")依次取其hashCode,在利用Observable.just將轉(zhuǎn)換之后的int類型的值在發(fā)射出來。map只是單穿的轉(zhuǎn)換了數(shù)據(jù)類型,而flapMap是轉(zhuǎn)換成了新的Observable了,這在開發(fā)過程中遇到嵌套網(wǎng)絡(luò)請求的時(shí)候十分方便。

  • window
    看到網(wǎng)上其他人說他的總用類似于buffer,不過我倒是認(rèn)為他更像flatMap,區(qū)別的是flapMap在轉(zhuǎn)換之后形成新的Observable會(huì)再將新的數(shù)據(jù)發(fā)射出來,不過window就僅僅只轉(zhuǎn)換成了發(fā)射新的數(shù)據(jù)類型的Observable,有點(diǎn)像是flatMap在干活時(shí)半途而廢的意思。

  • 過濾

    • filter
      對發(fā)射的數(shù)據(jù)做一個(gè)限制,只有滿足條件的數(shù)據(jù)才會(huì)被發(fā)射
Observable.just("Hello", "RxJava", "Nice to meet you")
        .filter(new Func1<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                //這里的顯示條件是s的長度大于5,而Hello的長度剛好是5
                //所以不能滿足條件
                return s.length() > 5;
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("onNext--> " + s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
onNext--> RxJava
onNext--> Nice to meet you
onComplete
  • take
    只發(fā)射前N項(xiàng)的數(shù)據(jù)(takeLast與take想反,只取最后N項(xiàng)數(shù)據(jù)
Observable.just("Hello", "RxJava", "Nice to meet you")
        .take(2)
        //.taktLast(2)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("onNext--> " + s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
    }
onNext--> Hello
onNext--> RxJava
onComplete
  • skip
    發(fā)射數(shù)據(jù)時(shí)忽略前N項(xiàng)數(shù)據(jù)(skpiLast忽略后N項(xiàng)數(shù)據(jù)
Observable.just("Hello", "RxJava", "Nice to meet you")
        .skip(2)
        //.skipLast(2)
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("onNext--> " + s);
            }
       }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
    }
onNext--> Nice to meet you
onComplete
  • elementAt
    獲取原數(shù)據(jù)的第N項(xiàng)數(shù)據(jù)作為唯一的數(shù)據(jù)發(fā)射出去(elementAtOrDefault會(huì)在index超出范圍時(shí),給出一個(gè)默認(rèn)值發(fā)射出來
Observable.just("Hello", "RxJava", "Nice to meet you")
        .elementAtOrDefault(1, "Great")
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("onNext--> " + s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
onNext--> RxJava
onComplete
  • distinct
    過濾掉重復(fù)項(xiàng)
Observable.just("Hello", "Hello", "Hello", "RxJava", "Nice to meet you")
        .distinct()
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("onNext--> " + s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
onNext--> Hello
onNext--> RxJava
onNext--> Nice to meet you
onComplete
  • 組合

    • startWith
      在發(fā)射一組數(shù)據(jù)之前先發(fā)射另一組數(shù)據(jù)
Observable.just("Hello", "RxJava", "Nice to meet you")
        .startWith("One", "Two", "Three")
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("onNext--> " + s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
onNext--> One
onNext--> Two
onNext--> Three
onNext--> Hello
onNext--> RxJava
onNext--> Nice to meet you
onComplete
  • merge
    將多個(gè)Observables發(fā)射的數(shù)據(jù)合并后在發(fā)射
Observable.merge(Observable.just(1, 2, 3), Observable.just(4, 5),
        Observable.just(6, 7), Observable.just(8, 9, 10))
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer s) {
                System.out.println("onNext--> " + s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
onNext--> 1
onNext--> 2
onNext--> 3
onNext--> 4
onNext--> 5
onNext--> 6
onNext--> 7
onNext--> 8
onNext--> 9
onNext--> 10
onComplete
  • zip
    按照自己的規(guī)則發(fā)射與發(fā)射數(shù)據(jù)項(xiàng)最少的相同的數(shù)據(jù)
Observable.zip(Observable.just(1, 8, 7), Observable.just(2, 5),
        Observable.just(3, 6), Observable.just(4, 9, 0), new Func4<Integer, Integer, Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2, Integer integer3, Integer integer4) {
                return integer < integer2 ? integer3 : integer4;
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer s) {
                System.out.println("onNext--> " + s);
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("onError--> " + throwable.getMessage());
            }
        }, new Action0() {
            @Override
            public void call() {
                System.out.println("onComplete");
            }
        });
onNext--> 3
onNext--> 9
onComplete

通過觀察以上例子可以發(fā)現(xiàn)我們的發(fā)射規(guī)則是如果發(fā)射的第一個(gè)數(shù)據(jù)小于第二個(gè)數(shù)則發(fā)射第三個(gè)數(shù)據(jù),否則發(fā)射第四個(gè)數(shù)據(jù)(我們來驗(yàn)證一下,1確實(shí)是小于2的,隨意發(fā)射的是3;8并不小于5,所以發(fā)射的是9,又因?yàn)樗膫€(gè)發(fā)射箱,最少的之后兩項(xiàng),所以最后只發(fā)射了兩項(xiàng)數(shù)據(jù))

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容