今天我們來看部分RxJava相關(guān)的知識,主要是上一篇RxJava內(nèi)存泄漏的一種解決方案提到的開源框架RxLifecycle里面會涉及到的知識點,有下面幾個:
1 Subject
2 takeUntil
3 filter
4 compose
1.Subject
從代碼可以看出來Subject既可以當觀察者也可以當被觀察者。
public abstract class Subject<T> extends Observable<T> implements Observer<T>
所以可以在生命周期中通過Subject發(fā)送事件然后又自己接收,從而根據(jù)事件類型做相應(yīng)的操作。
Subject總共有四種類型
1 AsyncSubject
2 BehaviorSubject
3 PublishSubject
4 ReplaySubject
今天我們就說下第二種類型BehaviorSubject,它可以給訂閱者發(fā)送訂閱前最近的事件和訂閱后發(fā)送的事件:
圖中橙色的就是訂閱前最近發(fā)送的事件,在訂閱后也可以收到。文字解釋始終太蒼白,我們來看下代碼:
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();
behaviorSubject.onNext(1);
behaviorSubject.onNext(2);
behaviorSubject.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Timber.tag(TAG).d("running num : " + integer);
}
});
behaviorSubject.onNext(3);
behaviorSubject.onNext(4);
上面代碼運行結(jié)果就是收到2, 3,4
2.takeUntil
這是一個操作符,可以這樣用
AObservable.takeUntil(BObservable)
可以AObservable監(jiān)聽另外一個BObservable,如果BObservable開始發(fā)送數(shù)據(jù),AObservable就不再發(fā)送數(shù)據(jù)。
看一下官方的圖片解釋,B發(fā)送0數(shù)據(jù)后,A就停止發(fā)送數(shù)據(jù)了。
talk is cheap, show me the code:
Observable.interval(1, TimeUnit.SECONDS).
subscribeOn(Schedulers.io()).
takeUntil(Observable.timer(5, TimeUnit.SECONDS)).
subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Timber.tag(TAG).d("running num : " + num);
}
});
上面代碼的意思就是從0開發(fā)每隔1秒發(fā)送一個數(shù)據(jù),5s時停止發(fā)送,看下運行結(jié)果,和我們的預(yù)期完美一致:
3.filter
filter操作符就是過濾的意思,只有事件滿足過濾條件時被觀察者才會發(fā)送給觀察者??聪鹿俜降慕忉寛D,很清晰明了我就不做解釋了哈。
看一下怎么用,這個代碼的意思還是每個1s發(fā)送數(shù)據(jù),但是會進行過濾只發(fā)送偶數(shù),也是5秒后停止發(fā)送:
Observable.interval(1, TimeUnit.SECONDS).
subscribeOn(Schedulers.io()).
filter(new Predicate<Long>() {
@Override
public boolean test(Long aLong) throws Exception {
return aLong % 2 == 0;
}
}).
takeUntil(Observable.timer(5, TimeUnit.SECONDS)).
subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Timber.tag(TAG).e("running num : " + num);
}
});
上面代碼的運行效果,確實是只收到了偶數(shù)。
4.compose
compose操作符是用來對Observable進行轉(zhuǎn)換操作的,并且可以保證調(diào)用鏈不被破壞。
比如我們經(jīng)常這樣用:
Observable.interval(1,TimeUnit.SECONDS)
.subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread());
這部分代碼經(jīng)常寫,怎么進行封裝呢?可能有的小伙伴立馬就想到下面的方法:
private Observable composeObservable(Observable observable){
return observable.subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread());
}
但是上面這樣用就破壞了調(diào)用鏈了,因為你肯定得這樣調(diào)用,這樣就會變得怪怪的,不是Observable開頭了,變成函數(shù)開頭。
composeObservable(Observable.interval(1,TimeUnit.SECONDS)).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
}
});
這個問題用compose就可以完美解決:
Observable.interval(1, TimeUnit.SECONDS).
compose(bindUntil(5)).
subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Timber.tag(TAG).d("running num : " + num);
}
});
private ObservableTransformer<Long, Long> bindUntil(final long deleyTime) {
return new ObservableTransformer<Long, Long>() {
@Override
public ObservableSource<Long> apply(Observable<Long> upstream) {
return upstream.subscribeOn(Schedulers.io()).takeUntil(Observable.timer(deleyTime, TimeUnit.SECONDS));
}
};
}
5.總結(jié)
上面的內(nèi)容是假定大家有一點點RxJava的知識的,沒有涉及到基本的使用。本次分享可能看起來毫無章法哈,其實還是有針對目的的,就是前面提到的開源框架RxLifecycle,這次分享就是針對里面用到的RxJava的一些知識點進行解析。RxJava的操作符挺多的,也不太可能也沒必要一個個進行分析,用到的時候進行查找官方文檔就可以了。
下面會用前面提到的這些知識點來自己實現(xiàn)一個類似于RxLifecycle的小Demo,歡迎大家關(guān)注和點贊哈。
最后感謝@右傾傾的理解和支持哈。
以上!
歡迎關(guān)注公眾號:JueCode