Filter
RxJava讓我們使用filter()方法來(lái)過(guò)濾觀測(cè)序列中我們不想要的值。
-
先來(lái)個(gè)沒(méi)有帶過(guò)濾的
public static void main(String... args) { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("張三"); subscriber.onNext("李四"); subscriber.onNext("王五"); } }).subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("我是" + s); } }); }
打印結(jié)果:</br>
我是張三</br>
我是李四</br>
我是王五</br>
-
現(xiàn)在在創(chuàng)建Observable之后添加filter()方法。
public static void main(String... args) { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("張三"); subscriber.onNext("李四"); subscriber.onNext("王五"); } }).filter(new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s.startsWith("李"); } }).subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("我是" + s); } }); } 可以看到添加的filter()方法,判斷字符串是否以"李"開(kāi)頭,返回一個(gè)布爾值,只要條件符合filter()函數(shù)就會(huì)返回true。此時(shí),該值就會(huì)發(fā)送出去。
打印結(jié)果:</br>
我是李四</br>
Take
許多時(shí)候,可能生產(chǎn)者(也就是被觀察者)訂閱了好幾個(gè)消費(fèi)者(也就是觀察者),以后就用生產(chǎn)者和消費(fèi)者來(lái)敘述,觀察者和被觀察者拗口得要命有木有!言歸正傳,生產(chǎn)者會(huì)產(chǎn)生一條數(shù)據(jù)流,而你消費(fèi)者可能僅僅只需要開(kāi)頭或者結(jié)尾的幾個(gè)元素,那么RxJava也為我們提供了take()和takeLast()方法來(lái)實(shí)現(xiàn)。
-
take(),如果我們只想要一個(gè)觀測(cè)序列中的前兩個(gè)元素,給take()傳入?yún)?shù):整數(shù)2,就能實(shí)現(xiàn)。
public static void main(String... args) { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("張三"); subscriber.onNext("李四"); subscriber.onNext("王五"); } }) .take(2) .subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("我是" + s); } }); }
打印結(jié)果:</br>
我是張三</br>
我是李四</br>
在這里的take(n),表示的是說(shuō)生產(chǎn)者發(fā)送前n個(gè)數(shù)據(jù),n = 2 也就發(fā)送前兩個(gè)數(shù)據(jù),并不是說(shuō)數(shù)據(jù)全部發(fā)完,截取前兩個(gè)數(shù)據(jù)。
-
takeLast()能夠讓我們發(fā)送后幾個(gè)數(shù)據(jù)元素。
public static void main(String... args) { Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("張三"); subscriber.onNext("李四"); subscriber.onNext("王五"); subscriber.onCompleted(); } }) .takeLast(1) .subscribe(new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println("我是" + s); } }); }
同樣的,不能少了subscriber.onCompleted()。
打印結(jié)果:</br>
我是王五</br>
Distinct
distinct()作用于一個(gè)完整的序列,所有重復(fù)的數(shù)據(jù)項(xiàng)只會(huì)發(fā)射一次。
public static void main(String... args) {
Observable.just(1,2,1,2)
.distinct()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 1</br>
i = 2</br>
DistinctUntilChanged
distinctUntilChanged()與distinct()相類(lèi)似,不過(guò)distinctUntilChanged()是判斷當(dāng)前發(fā)射的值與前一個(gè)數(shù)據(jù)是否相同,在實(shí)際中,可以假設(shè)情形比如說(shuō)UI根據(jù)獲取到的數(shù)據(jù)不同更新自身UI,但是如果數(shù)據(jù)內(nèi)容并沒(méi)有發(fā)生改變,出于不浪費(fèi)資源的目的,就不要發(fā)射數(shù)據(jù)。
public static void main(String... args) {
Observable.just(1,2,2)
.distinct()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 1</br>
i = 2</br>
First
first()從Observable中只發(fā)射第一個(gè)元素,或者添加參數(shù)first(Fun1)只發(fā)送符合條件的第一個(gè)數(shù)據(jù)項(xiàng)。
public static void main(String... args) {
Observable.just(1,2,2)
.first()
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 1
public static void main(String... args) {
Observable.just(1,2,2)
.first(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer == 2;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 2</br>
Last
first()從Observable中只發(fā)射最后一個(gè)元素,或者添加參數(shù)first(Fun1)只發(fā)送符合條件的最后一個(gè)數(shù)據(jù)項(xiàng)。代碼參考First。
Skip
skip(int)可以讓我們忽略O(shè)bservable前n個(gè)元素,而直接跳過(guò)這n個(gè)元素發(fā)射后面的元素。
public static void main(String... args) {
Observable.just(1,2,2)
.skip(2)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 2</br>
SkipLast
skipLast(int)則是忽略后n個(gè)元素的發(fā)射。
ElementAt
現(xiàn)在我們有了控制前后的元素過(guò)濾規(guī)則,那么自然會(huì)有一個(gè)問(wèn)題,如果我只想要觀測(cè)序列其中的一個(gè)元素該怎么辦呢,那么elementAt(int)就能實(shí)現(xiàn)。elementAt(int)用來(lái)獲取元素Observable發(fā)射的事件序列中的第n項(xiàng)數(shù)據(jù),并當(dāng)做唯一的數(shù)據(jù)發(fā)射出去。
public static void main(String... args) {
Observable.just(1,2,3)
.elementAt(2)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 3</br>
同時(shí)還有一個(gè)拓展方法,如果想查找第六個(gè)元素,但是可觀測(cè)序列只有三個(gè)元素怎么辦,可以用elementAtOrDefault(int index, T defaultValue),在第二個(gè)參數(shù)傳入一個(gè)默認(rèn)值。
Sample
假如我們有一個(gè)溫度傳感器,每秒鐘都會(huì)發(fā)射一次室內(nèi)溫度,然后UI根據(jù)溫度變化而更新,但是有一個(gè)問(wèn)題,我們認(rèn)為每秒鐘就獲取一次數(shù)據(jù)并更新相當(dāng)?shù)睦速M(fèi)資源,再說(shuō)溫度也不一定變化這么快,那么我們就需要一個(gè)小小的發(fā)射間隔。sample()就能幫我們做到這一點(diǎn),在Observable后面加一個(gè)sample(),將創(chuàng)建一個(gè)新的觀測(cè)序列,并且它會(huì)在指定的時(shí)間間隔里由Observable發(fā)射最近的一次數(shù)值。
public static void main(String... args) {
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i <= 50; i++) {
if (i % 10 ==0){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onNext(i);
}
}
});
observable.sample(2, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
先打印結(jié)果:</br>
i = 9</br>
i = 19</br>
i = 29</br>
i = 39</br>
i = 49</br>
分析:這里我們先創(chuàng)建了一個(gè)Observable,主要就是一個(gè)for循環(huán),依次發(fā)射0~50,為了驗(yàn)證方便呢,就加了一個(gè)判斷,如果發(fā)射的是整數(shù)就線程休眠5s,為什么要這樣干呢,你想cpu多快啊,才50個(gè)數(shù)不是一滋溜就發(fā)射完了么,那之后通過(guò)sample(2, TimeUnit.SECONDS)設(shè)置的2s發(fā)射一個(gè)最近的值不是只有最后的一個(gè)值了么,打印的結(jié)果也就達(dá)不到驗(yàn)證的目的了呀。這里再附上一張圖:

如果我們想讓它定時(shí)發(fā)射第一個(gè)元素而不是最近的一個(gè)元素,我們可以使用throttleFirst()。
Timeout
有的時(shí)候我們?cè)谝?guī)定的時(shí)間內(nèi)必須要有一個(gè)數(shù)據(jù),就上文的溫度傳感器來(lái)說(shuō),我們想讓它每隔兩秒至少發(fā)射一個(gè),那么我們就可以用timeout函數(shù)來(lái)監(jiān)聽(tīng)觀測(cè)序列,如果在我們?cè)O(shè)定的時(shí)間內(nèi)沒(méi)有得到一個(gè)值就發(fā)射一個(gè)錯(cuò)誤。
public static void main(String... args) {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 50; i++) {
if (i % 10 == 0){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onNext(i);
}
}
})
.timeout(2, TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
System.out.println("Timeout error");
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
Timeout error</br>
可以看到,我們通過(guò)timeout(2, TimeUnit.SECONDS)設(shè)置了2s的時(shí)間限制,而在Observable中讓線程休眠了2s,那么觸發(fā)了Timeout,發(fā)射了一個(gè)錯(cuò)誤。
Debounce
debounce()過(guò)濾掉了由Observable發(fā)射的速率過(guò)快的數(shù)據(jù),如果在一個(gè)指定的時(shí)間間隔過(guò)去了仍舊沒(méi)有發(fā)射一個(gè),那么它將發(fā)射最后的那個(gè)。
public static void main(String... args) {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i <= 50; i++) {
if (i % 10 == 0){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
subscriber.onNext(i);
}
}
})
.debounce(2 , TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
System.out.println("i = " + integer);
}
});
}
打印結(jié)果:</br>
i = 9</br>
i = 19</br>
i = 29</br>
i = 39</br>
i = 49</br>
從打印的結(jié)果來(lái)看,與之前的sample一般無(wú)二,但是要理解意義的不同,sample是在一條可觀測(cè)序列中,選擇指定時(shí)間段要發(fā)射的元素發(fā)射出來(lái),而debounce是指一段時(shí)間內(nèi)沒(méi)有新數(shù)據(jù)發(fā)射,那么就發(fā)射最后的那一個(gè)。