RxJava學(xué)習(xí)筆記(過(guò)濾Observables)

Filter

RxJava讓我們使用filter()方法來(lái)過(guò)濾觀測(cè)序列中我們不想要的值。

  • 先來(lái)個(gè)沒(méi)有帶過(guò)濾的

      public static void main(String... args) {
          Observable.create(new Observable.OnSubscribe<String>() {
              @Override
              public void call(Subscriber<? super String> subscriber) {
                  subscriber.onNext("張三");
                  subscriber.onNext("李四");
                  subscriber.onNext("王五");
              }
          }).subscribe(new Observer<String>() {
              @Override
              public void onCompleted() {
    
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onNext(String s) {
                  System.out.println("我是" + s);
              }
          });
      }
    

打印結(jié)果:</br>
我是張三</br>
我是李四</br>
我是王五</br>

  • 現(xiàn)在在創(chuàng)建Observable之后添加filter()方法。

      public static void main(String... args) {
          Observable.create(new Observable.OnSubscribe<String>() {
              @Override
              public void call(Subscriber<? super String> subscriber) {
                  subscriber.onNext("張三");
                  subscriber.onNext("李四");
                  subscriber.onNext("王五");
              }
          }).filter(new Func1<String, Boolean>() {
              @Override
              public Boolean call(String s) {
                  return s.startsWith("李");
              }
          }).subscribe(new Observer<String>() {
              @Override
              public void onCompleted() {
    
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onNext(String s) {
                  System.out.println("我是" + s);
              }
          });
      }
    
  • 可以看到添加的filter()方法,判斷字符串是否以"李"開(kāi)頭,返回一個(gè)布爾值,只要條件符合filter()函數(shù)就會(huì)返回true。此時(shí),該值就會(huì)發(fā)送出去。

  • 打印結(jié)果:</br>
    我是李四</br>

Take

許多時(shí)候,可能生產(chǎn)者(也就是被觀察者)訂閱了好幾個(gè)消費(fèi)者(也就是觀察者),以后就用生產(chǎn)者和消費(fèi)者來(lái)敘述,觀察者和被觀察者拗口得要命有木有!言歸正傳,生產(chǎn)者會(huì)產(chǎn)生一條數(shù)據(jù)流,而你消費(fèi)者可能僅僅只需要開(kāi)頭或者結(jié)尾的幾個(gè)元素,那么RxJava也為我們提供了take()和takeLast()方法來(lái)實(shí)現(xiàn)。

  • take(),如果我們只想要一個(gè)觀測(cè)序列中的前兩個(gè)元素,給take()傳入?yún)?shù):整數(shù)2,就能實(shí)現(xiàn)。

      public static void main(String... args) {
          Observable.create(new Observable.OnSubscribe<String>() {
              @Override
              public void call(Subscriber<? super String> subscriber) {
                  subscriber.onNext("張三");
                  subscriber.onNext("李四");
                  subscriber.onNext("王五");
              }
          })
                  .take(2)
                  .subscribe(new Observer<String>() {
                      @Override
                      public void onCompleted() {
    
                      }
    
                      @Override
                      public void onError(Throwable e) {
    
                      }
    
                      @Override
                      public void onNext(String s) {
                          System.out.println("我是" + s);
                      }
                  });
      }
    

打印結(jié)果:</br>
我是張三</br>
我是李四</br>
在這里的take(n),表示的是說(shuō)生產(chǎn)者發(fā)送前n個(gè)數(shù)據(jù),n = 2 也就發(fā)送前兩個(gè)數(shù)據(jù),并不是說(shuō)數(shù)據(jù)全部發(fā)完,截取前兩個(gè)數(shù)據(jù)。

  • takeLast()能夠讓我們發(fā)送后幾個(gè)數(shù)據(jù)元素。

      public static void main(String... args) {
          Observable.create(new Observable.OnSubscribe<String>() {
              @Override
              public void call(Subscriber<? super String> subscriber) {
                  subscriber.onNext("張三");
                  subscriber.onNext("李四");
                  subscriber.onNext("王五");
                  subscriber.onCompleted();
              }
          })
                  .takeLast(1)
                  .subscribe(new Observer<String>() {
                      @Override
                      public void onCompleted() {
      
                      }
      
                      @Override
                      public void onError(Throwable e) {
      
                      }
      
                      @Override
                      public void onNext(String s) {
                          System.out.println("我是" + s);
                      }
                  });
      }
    

同樣的,不能少了subscriber.onCompleted()。
打印結(jié)果:</br>
我是王五</br>

Distinct

distinct()作用于一個(gè)完整的序列,所有重復(fù)的數(shù)據(jù)項(xiàng)只會(huì)發(fā)射一次。

public static void main(String... args) {
    Observable.just(1,2,1,2)
            .distinct()
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
i = 1</br>
i = 2</br>

DistinctUntilChanged

distinctUntilChanged()與distinct()相類(lèi)似,不過(guò)distinctUntilChanged()是判斷當(dāng)前發(fā)射的值與前一個(gè)數(shù)據(jù)是否相同,在實(shí)際中,可以假設(shè)情形比如說(shuō)UI根據(jù)獲取到的數(shù)據(jù)不同更新自身UI,但是如果數(shù)據(jù)內(nèi)容并沒(méi)有發(fā)生改變,出于不浪費(fèi)資源的目的,就不要發(fā)射數(shù)據(jù)。

public static void main(String... args) {
    Observable.just(1,2,2)
            .distinct()
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
i = 1</br>
i = 2</br>

First

first()從Observable中只發(fā)射第一個(gè)元素,或者添加參數(shù)first(Fun1)只發(fā)送符合條件的第一個(gè)數(shù)據(jù)項(xiàng)。

public static void main(String... args) {
    Observable.just(1,2,2)
            .first()
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
i = 1

public static void main(String... args) {
    Observable.just(1,2,2)
            .first(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer == 2;
                }
            })
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
i = 2</br>

Last

first()從Observable中只發(fā)射最后一個(gè)元素,或者添加參數(shù)first(Fun1)只發(fā)送符合條件的最后一個(gè)數(shù)據(jù)項(xiàng)。代碼參考First。

Skip

skip(int)可以讓我們忽略O(shè)bservable前n個(gè)元素,而直接跳過(guò)這n個(gè)元素發(fā)射后面的元素。

public static void main(String... args) {
    Observable.just(1,2,2)
            .skip(2)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
i = 2</br>

SkipLast

skipLast(int)則是忽略后n個(gè)元素的發(fā)射。

ElementAt

現(xiàn)在我們有了控制前后的元素過(guò)濾規(guī)則,那么自然會(huì)有一個(gè)問(wèn)題,如果我只想要觀測(cè)序列其中的一個(gè)元素該怎么辦呢,那么elementAt(int)就能實(shí)現(xiàn)。elementAt(int)用來(lái)獲取元素Observable發(fā)射的事件序列中的第n項(xiàng)數(shù)據(jù),并當(dāng)做唯一的數(shù)據(jù)發(fā)射出去。

public static void main(String... args) {
    Observable.just(1,2,3)
            .elementAt(2)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
i = 3</br>
同時(shí)還有一個(gè)拓展方法,如果想查找第六個(gè)元素,但是可觀測(cè)序列只有三個(gè)元素怎么辦,可以用elementAtOrDefault(int index, T defaultValue),在第二個(gè)參數(shù)傳入一個(gè)默認(rèn)值。

Sample

假如我們有一個(gè)溫度傳感器,每秒鐘都會(huì)發(fā)射一次室內(nèi)溫度,然后UI根據(jù)溫度變化而更新,但是有一個(gè)問(wèn)題,我們認(rèn)為每秒鐘就獲取一次數(shù)據(jù)并更新相當(dāng)?shù)睦速M(fèi)資源,再說(shuō)溫度也不一定變化這么快,那么我們就需要一個(gè)小小的發(fā)射間隔。sample()就能幫我們做到這一點(diǎn),在Observable后面加一個(gè)sample(),將創(chuàng)建一個(gè)新的觀測(cè)序列,并且它會(huì)在指定的時(shí)間間隔里由Observable發(fā)射最近的一次數(shù)值。

public static void main(String... args) {
    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            for (int i = 0; i <= 50; i++) {
                if (i % 10 ==0){
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                subscriber.onNext(i);
            }
        }
    });
    observable.sample(2, TimeUnit.SECONDS)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

先打印結(jié)果:</br>
i = 9</br>
i = 19</br>
i = 29</br>
i = 39</br>
i = 49</br>

分析:這里我們先創(chuàng)建了一個(gè)Observable,主要就是一個(gè)for循環(huán),依次發(fā)射0~50,為了驗(yàn)證方便呢,就加了一個(gè)判斷,如果發(fā)射的是整數(shù)就線程休眠5s,為什么要這樣干呢,你想cpu多快啊,才50個(gè)數(shù)不是一滋溜就發(fā)射完了么,那之后通過(guò)sample(2, TimeUnit.SECONDS)設(shè)置的2s發(fā)射一個(gè)最近的值不是只有最后的一個(gè)值了么,打印的結(jié)果也就達(dá)不到驗(yàn)證的目的了呀。這里再附上一張圖:

sample.jpg

如果我們想讓它定時(shí)發(fā)射第一個(gè)元素而不是最近的一個(gè)元素,我們可以使用throttleFirst()。

Timeout

有的時(shí)候我們?cè)谝?guī)定的時(shí)間內(nèi)必須要有一個(gè)數(shù)據(jù),就上文的溫度傳感器來(lái)說(shuō),我們想讓它每隔兩秒至少發(fā)射一個(gè),那么我們就可以用timeout函數(shù)來(lái)監(jiān)聽(tīng)觀測(cè)序列,如果在我們?cè)O(shè)定的時(shí)間內(nèi)沒(méi)有得到一個(gè)值就發(fā)射一個(gè)錯(cuò)誤。

public static void main(String... args) {
    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            for (int i = 0; i < 50; i++) {
                if (i % 10 == 0){
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                subscriber.onNext(i);
            }
        }
    })
            .timeout(2, TimeUnit.SECONDS)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("Timeout error");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("i = " + integer);
                }
            });
}

打印結(jié)果:</br>
Timeout error</br>
可以看到,我們通過(guò)timeout(2, TimeUnit.SECONDS)設(shè)置了2s的時(shí)間限制,而在Observable中讓線程休眠了2s,那么觸發(fā)了Timeout,發(fā)射了一個(gè)錯(cuò)誤。

Debounce

debounce()過(guò)濾掉了由Observable發(fā)射的速率過(guò)快的數(shù)據(jù),如果在一個(gè)指定的時(shí)間間隔過(guò)去了仍舊沒(méi)有發(fā)射一個(gè),那么它將發(fā)射最后的那個(gè)。

public static void main(String... args) {
    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            for (int i = 0; i <= 50; i++) {
                if (i % 10 == 0){
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                subscriber.onNext(i);
            }
        }
    })
    .debounce(2 , TimeUnit.SECONDS)
    .subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("i = " + integer);
        }
    });
}

打印結(jié)果:</br>
i = 9</br>
i = 19</br>
i = 29</br>
i = 39</br>
i = 49</br>
從打印的結(jié)果來(lái)看,與之前的sample一般無(wú)二,但是要理解意義的不同,sample是在一條可觀測(cè)序列中,選擇指定時(shí)間段要發(fā)射的元素發(fā)射出來(lái),而debounce是指一段時(shí)間內(nèi)沒(méi)有新數(shù)據(jù)發(fā)射,那么就發(fā)射最后的那一個(gè)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 注:只包含標(biāo)準(zhǔn)包中的操作符,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,366評(píng)論 2 8
  • 注:只包含標(biāo)準(zhǔn)包中的操作符,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 1,053評(píng)論 0 3
  • 作者: maplejaw本篇只解析標(biāo)準(zhǔn)包中的操作符。對(duì)于擴(kuò)展包,由于使用率較低,如有需求,請(qǐng)讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 46,188評(píng)論 8 93
  • 版權(quán)聲明:本文為小斑馬偉原創(chuàng)文章,轉(zhuǎn)載請(qǐng)注明出處! 上篇簡(jiǎn)單的闡述了響應(yīng)式編程的基本理論。這篇主要對(duì)響應(yīng)編程進(jìn)行詳...
    ZebraWei閱讀 3,220評(píng)論 0 2
  • 【12.28】超級(jí)個(gè)體—彭加輝 每日打卡50/100 【三件事】 1.詳情頁(yè)的制作,第三天。(昨天基本已大致完成,...
    純簡(jiǎn)之佳閱讀 261評(píng)論 0 1

友情鏈接更多精彩內(nèi)容