RxJava學習筆記(過濾Observables)

Filter

RxJava讓我們使用filter()方法來過濾觀測序列中我們不想要的值。

  • 先來個沒有帶過濾的

      public static void main(String... args) {
          Observable.create(new Observable.OnSubscribe<String>() {
              @Override
              public void call(Subscriber<? super String> subscriber) {
                  subscriber.onNext("張三");
                  subscriber.onNext("李四");
                  subscriber.onNext("王五");
              }
          }).subscribe(new Observer<String>() {
              @Override
              public void onCompleted() {
    
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onNext(String s) {
                  System.out.println("我是" + s);
              }
          });
      }
    

打印結(jié)果:</br>
我是張三</br>
我是李四</br>
我是王五</br>

  • 現(xiàn)在在創(chuàng)建Observable之后添加filter()方法。

      public static void main(String... args) {
          Observable.create(new Observable.OnSubscribe<String>() {
              @Override
              public void call(Subscriber<? super String> subscriber) {
                  subscriber.onNext("張三");
                  subscriber.onNext("李四");
                  subscriber.onNext("王五");
              }
          }).filter(new Func1<String, Boolean>() {
              @Override
              public Boolean call(String s) {
                  return s.startsWith("李");
              }
          }).subscribe(new Observer<String>() {
              @Override
              public void onCompleted() {
    
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onNext(String s) {
                  System.out.println("我是" + s);
              }
          });
      }
    
  • 可以看到添加的filter()方法,判斷字符串是否以"李"開頭,返回一個布爾值,只要條件符合filter()函數(shù)就會返回true。此時,該值就會發(fā)送出去。

  • 打印結(jié)果:</br>
    我是李四</br>

Take

許多時候,可能生產(chǎn)者(也就是被觀察者)訂閱了好幾個消費者(也就是觀察者),以后就用生產(chǎn)者和消費者來敘述,觀察者和被觀察者拗口得要命有木有!言歸正傳,生產(chǎn)者會產(chǎn)生一條數(shù)據(jù)流,而你消費者可能僅僅只需要開頭或者結(jié)尾的幾個元素,那么RxJava也為我們提供了take()和takeLast()方法來實現(xiàn)。

  • take(),如果我們只想要一個觀測序列中的前兩個元素,給take()傳入?yún)?shù):整數(shù)2,就能實現(xiàn)。

      public static void main(String... args) {
          Observable.create(new Observable.OnSubscribe<String>() {
              @Override
              public void call(Subscriber<? super String> subscriber) {
                  subscriber.onNext("張三");
                  subscriber.onNext("李四");
                  subscriber.onNext("王五");
              }
          })
                  .take(2)
                  .subscribe(new Observer<String>() {
                      @Override
                      public void onCompleted() {
    
                      }
    
                      @Override
                      public void onError(Throwable e) {
    
                      }
    
                      @Override
                      public void onNext(String s) {
                          System.out.println("我是" + s);
                      }
                  });
      }
    

打印結(jié)果:</br>
我是張三</br>
我是李四</br>
在這里的take(n),表示的是說生產(chǎn)者發(fā)送前n個數(shù)據(jù),n = 2 也就發(fā)送前兩個數(shù)據(jù),并不是說數(shù)據(jù)全部發(fā)完,截取前兩個數(shù)據(jù)。

  • takeLast()能夠讓我們發(fā)送后幾個數(shù)據(jù)元素。

      public static void main(String... args) {
          Observable.create(new Observable.OnSubscribe<String>() {
              @Override
              public void call(Subscriber<? super String> subscriber) {
                  subscriber.onNext("張三");
                  subscriber.onNext("李四");
                  subscriber.onNext("王五");
                  subscriber.onCompleted();
              }
          })
                  .takeLast(1)
                  .subscribe(new Observer<String>() {
                      @Override
                      public void onCompleted() {
      
                      }
      
                      @Override
                      public void onError(Throwable e) {
      
                      }
      
                      @Override
                      public void onNext(String s) {
                          System.out.println("我是" + s);
                      }
                  });
      }
    

同樣的,不能少了subscriber.onCompleted()。
打印結(jié)果:</br>
我是王五</br>

Distinct

distinct()作用于一個完整的序列,所有重復的數(shù)據(jù)項只會發(fā)射一次。

public static void main(String... args) {
    Observable.just(1,2,1,2)
            .distinct()
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
i = 1</br>
i = 2</br>

DistinctUntilChanged

distinctUntilChanged()與distinct()相類似,不過distinctUntilChanged()是判斷當前發(fā)射的值與前一個數(shù)據(jù)是否相同,在實際中,可以假設(shè)情形比如說UI根據(jù)獲取到的數(shù)據(jù)不同更新自身UI,但是如果數(shù)據(jù)內(nèi)容并沒有發(fā)生改變,出于不浪費資源的目的,就不要發(fā)射數(shù)據(jù)。

public static void main(String... args) {
    Observable.just(1,2,2)
            .distinct()
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
i = 1</br>
i = 2</br>

First

first()從Observable中只發(fā)射第一個元素,或者添加參數(shù)first(Fun1)只發(fā)送符合條件的第一個數(shù)據(jù)項。

public static void main(String... args) {
    Observable.just(1,2,2)
            .first()
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
i = 1

public static void main(String... args) {
    Observable.just(1,2,2)
            .first(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer == 2;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
i = 2</br>

Last

first()從Observable中只發(fā)射最后一個元素,或者添加參數(shù)first(Fun1)只發(fā)送符合條件的最后一個數(shù)據(jù)項。代碼參考First。

Skip

skip(int)可以讓我們忽略O(shè)bservable前n個元素,而直接跳過這n個元素發(fā)射后面的元素。

public static void main(String... args) {
    Observable.just(1,2,2)
            .skip(2)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
i = 2</br>

SkipLast

skipLast(int)則是忽略后n個元素的發(fā)射。

ElementAt

現(xiàn)在我們有了控制前后的元素過濾規(guī)則,那么自然會有一個問題,如果我只想要觀測序列其中的一個元素該怎么辦呢,那么elementAt(int)就能實現(xiàn)。elementAt(int)用來獲取元素Observable發(fā)射的事件序列中的第n項數(shù)據(jù),并當做唯一的數(shù)據(jù)發(fā)射出去。

public static void main(String... args) {
    Observable.just(1,2,3)
            .elementAt(2)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
i = 3</br>
同時還有一個拓展方法,如果想查找第六個元素,但是可觀測序列只有三個元素怎么辦,可以用elementAtOrDefault(int index, T defaultValue),在第二個參數(shù)傳入一個默認值。

Sample

假如我們有一個溫度傳感器,每秒鐘都會發(fā)射一次室內(nèi)溫度,然后UI根據(jù)溫度變化而更新,但是有一個問題,我們認為每秒鐘就獲取一次數(shù)據(jù)并更新相當?shù)睦速M資源,再說溫度也不一定變化這么快,那么我們就需要一個小小的發(fā)射間隔。sample()就能幫我們做到這一點,在Observable后面加一個sample(),將創(chuàng)建一個新的觀測序列,并且它會在指定的時間間隔里由Observable發(fā)射最近的一次數(shù)值。

public static void main(String... args) {
    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            for (int i = 0; i <= 50; i++) {
                if (i % 10 ==0){
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                subscriber.onNext(i);
            }
        }
    });
    observable.sample(2, TimeUnit.SECONDS)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

先打印結(jié)果:</br>
i = 9</br>
i = 19</br>
i = 29</br>
i = 39</br>
i = 49</br>

分析:這里我們先創(chuàng)建了一個Observable,主要就是一個for循環(huán),依次發(fā)射0~50,為了驗證方便呢,就加了一個判斷,如果發(fā)射的是整數(shù)就線程休眠5s,為什么要這樣干呢,你想cpu多快啊,才50個數(shù)不是一滋溜就發(fā)射完了么,那之后通過sample(2, TimeUnit.SECONDS)設(shè)置的2s發(fā)射一個最近的值不是只有最后的一個值了么,打印的結(jié)果也就達不到驗證的目的了呀。這里再附上一張圖:

sample.jpg

如果我們想讓它定時發(fā)射第一個元素而不是最近的一個元素,我們可以使用throttleFirst()。

Timeout

有的時候我們在規(guī)定的時間內(nèi)必須要有一個數(shù)據(jù),就上文的溫度傳感器來說,我們想讓它每隔兩秒至少發(fā)射一個,那么我們就可以用timeout函數(shù)來監(jiān)聽觀測序列,如果在我們設(shè)定的時間內(nèi)沒有得到一個值就發(fā)射一個錯誤。

public static void main(String... args) {
    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            for (int i = 0; i < 50; i++) {
                if (i % 10 == 0){
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                subscriber.onNext(i);
            }
        }
    })
            .timeout(2, TimeUnit.SECONDS)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("Timeout error");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
Timeout error</br>
可以看到,我們通過timeout(2, TimeUnit.SECONDS)設(shè)置了2s的時間限制,而在Observable中讓線程休眠了2s,那么觸發(fā)了Timeout,發(fā)射了一個錯誤。

Debounce

debounce()過濾掉了由Observable發(fā)射的速率過快的數(shù)據(jù),如果在一個指定的時間間隔過去了仍舊沒有發(fā)射一個,那么它將發(fā)射最后的那個。

public static void main(String... args) {
    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            for (int i = 0; i <= 50; i++) {
                if (i % 10 == 0){
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                subscriber.onNext(i);
            }
        }
    })
    .debounce(2 , TimeUnit.SECONDS)
    .subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("i = " + integer);
        }
    });
}

打印結(jié)果:</br>
i = 9</br>
i = 19</br>
i = 29</br>
i = 39</br>
i = 49</br>
從打印的結(jié)果來看,與之前的sample一般無二,但是要理解意義的不同,sample是在一條可觀測序列中,選擇指定時間段要發(fā)射的元素發(fā)射出來,而debounce是指一段時間內(nèi)沒有新數(shù)據(jù)發(fā)射,那么就發(fā)射最后的那一個。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容