優(yōu)美的異步 --- RxAndroid
這里和大家一起分享一下一個(gè)著名的Android異步庫(kù)RxAndroid。它應(yīng)該是2016年最流行的開(kāi)源庫(kù)之一。RxAndroid起源于RxJava,是一個(gè)專門針對(duì)Android版本的Rxjava庫(kù)。RxAndroid-Github 目前最新的版本是v2.0.x我們今天的分享也基于2.0版本的API。
響應(yīng)式編程
什么是響應(yīng)式編程?和平常經(jīng)常聽(tīng)說(shuō)的面向?qū)ο缶幊毯秃瘮?shù)式編程一樣,響應(yīng)式編程(Reactive Programming)就是一個(gè)編程范式,但是與其他編程范式不同的是它是基于數(shù)據(jù)流和變化傳播的。我們經(jīng)常在程序中這樣寫
A = B + C
A被賦值為B和C的值。這時(shí),如果我們改變B的值,A的值并不會(huì)隨之改變。而如果我們運(yùn)用一種機(jī)制,當(dāng)B或者C的值發(fā)現(xiàn)變化的時(shí)候,A的值也隨之改變,這樣就實(shí)現(xiàn)了響應(yīng)式。
而響應(yīng)式編程的提出,其目的就是簡(jiǎn)化類似的操作,因此它在用戶界面編程領(lǐng)域以及基于實(shí)時(shí)系統(tǒng)的動(dòng)畫(huà)方面都有廣泛的應(yīng)用。另一方面,在處理嵌套回調(diào)的異步事件,復(fù)雜的列表過(guò)濾和變換的時(shí)候也都有良好的表現(xiàn)。
RxAndroid其實(shí)是一個(gè)響應(yīng)式編程思想的實(shí)現(xiàn)庫(kù)。也因?yàn)檫@樣的思想,是它在一些方面表現(xiàn)的異常優(yōu)秀。下面我將先用一個(gè)簡(jiǎn)單的例子,讓大家直觀的感受一下的樣子。
網(wǎng)絡(luò)加載圖片顯示
Observable.just(getDrawableFromNet())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Drawable>() {
@Override
public void accept(Drawable drawable) throws Exception {
((ImageView)findViewById(R.id.imageView)).setImageDrawable(drawable);
}
});
環(huán)境搭建
RxAndroid環(huán)境只需求要引入如下項(xiàng)目即可,我們不但需要RxAndroid項(xiàng)目還需要RxJava項(xiàng)目。
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.5'
基礎(chǔ)知識(shí)
RxAndroid的核心就是“異步”兩個(gè)字,其最關(guān)鍵的東西就是三個(gè):
Observable(被觀察者)
Observer(觀察者)
Subscriber (訂閱)
Observable可以理解為事件的發(fā)送者,就好像快遞的寄出者,而這些事件就好比快遞
Observer可以理解為事件的接收者,就好像快遞的接收者
Subscriber 綁定兩者
Observable可以發(fā)出一系列的 事件,這里的事件可以是任何東西,例如網(wǎng)絡(luò)請(qǐng)求、復(fù)雜計(jì)算處理、數(shù)據(jù)庫(kù)操作、文件操作等等,事件執(zhí)行結(jié)束后交給 Observer回調(diào)處理。
那他們之間是如何進(jìn)行聯(lián)系的呢?答案就是通過(guò)subscribe()方法。
下面我們通過(guò)一個(gè)HelloDemo來(lái)看看Observable與Observer進(jìn)行關(guān)聯(lián)的典型方式,
private void test_1() {
Observable<String> oble = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("hello");
e.onComplete();
e.onNext("hello2");
}
});
Observer<String> oser = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.w("kaelpu","onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.w("kaelpu","onNext = "+s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.w("kaelpu","onError" + e);
}
@Override
public void onComplete() {
Log.w("kaelpu","onComplete");
}
};
Log.w("kaelpu","subscribe");
oble.subscribe(oser);
}
10-21 01:28:01.600 11386-11386/? W/kaelpu: subscribe
10-21 01:28:01.600 11386-11386/? W/kaelpu: onSubscribe
10-21 01:28:01.600 11386-11386/? W/kaelpu: onNext = hello
10-21 01:28:01.600 11386-11386/? W/kaelpu: onComplete
其實(shí)這段代碼干了三件事:
- 創(chuàng)建被觀察者對(duì)象oble
- 創(chuàng)建觀察者oser
- 連接觀察者和被觀察者
被觀察者通過(guò)onNext函數(shù)給觀察者通知結(jié)果
被貫徹者onComplete函數(shù)通知觀察者執(zhí)行結(jié)束
連接觀察者和被觀察者我們使用subscribe函數(shù)
- 通過(guò)打印的log我們可以看到觀察者函數(shù)調(diào)用情況,調(diào)用subscribe函數(shù)去綁定觀察者和被觀察者時(shí)候,觀察者的onSubscribe函數(shù)會(huì)被回調(diào)表示建立關(guān)聯(lián)。
- 接著每當(dāng)被觀察者調(diào)用onNext給觀察者發(fā)送數(shù)據(jù)時(shí)候,觀察者的onNext 會(huì)收到回調(diào),并且得到所發(fā)送的數(shù)據(jù)。
- 當(dāng)被觀察者調(diào)用onComplete函數(shù)時(shí)候,代表著完成,觀察者的onComplete回調(diào)會(huì)被觸發(fā),并且斷開(kāi)了兩者的關(guān)聯(lián),這時(shí)被觀察者再發(fā)送數(shù)據(jù),觀察者也不會(huì)收到。
當(dāng)然我們注意到觀察者還有一個(gè)onError函數(shù)沒(méi)有被觸發(fā)過(guò),那么該怎么觸發(fā)呢,又代表著什么意思呢?我們來(lái)改變一下代碼:
private String error() throws Exception {
throw new Exception();
}
添加一個(gè)函數(shù),名字隨便但是返回值是String,這里我們叫做error函數(shù)。函數(shù)很簡(jiǎn)單就是拋出一個(gè)異常。然后我們繼續(xù)修改被觀察者的代碼如下:
Observable<String> oble = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("hello");
e.onNext(error());
e.onNext("hello1");
e.onComplete();
e.onNext("hello2");
}
});
其實(shí)我們就添加了兩行,添加了一個(gè)e.onNext(error()) 并且在之后還添加了一個(gè)e.onNext("hello1") 運(yùn)行一下我們看看
W/kaelpu: subscribe
W/kaelpu: onSubscribe
W/kaelpu: onNexthello
W/kaelpu: onErrorjava.lang.Exception
折斷l(xiāng)og說(shuō)明三個(gè)問(wèn)題:
- 被觀察者onNext中是可以運(yùn)行函數(shù)的
- 如果運(yùn)行的函數(shù)報(bào)錯(cuò),則會(huì)調(diào)用我們觀察者的onError函數(shù)
- 當(dāng)調(diào)用onError函數(shù)時(shí)候,也會(huì)斷開(kāi)關(guān)聯(lián),被觀察者收不到后面的數(shù)據(jù),但是觀察者依然會(huì)繼續(xù)發(fā)送。
最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個(gè)onComplete, 也不能發(fā)多個(gè)onError, 也不能先發(fā)一個(gè)onComplete, 然后再發(fā)一個(gè)onError, 反之亦然。
關(guān)于onComplete和onError唯一并且互斥這一點(diǎn), 是需要自行在代碼中進(jìn)行控制, 如果你的代碼邏輯中違背了這個(gè)規(guī)則, 并不一定會(huì)導(dǎo)致程序崩潰. 比如發(fā)送多個(gè)onComplete是可以正常運(yùn)行的, 依然是收到第一個(gè)onComplete就不再接收了, 但若是發(fā)送多個(gè)onError, 則收到第二個(gè)onError事件會(huì)導(dǎo)致程序會(huì)崩潰.當(dāng)我們寫多個(gè)onComplete時(shí),不會(huì)報(bào)錯(cuò)。
除了被觀察者能斷開(kāi)關(guān)聯(lián),觀察者也能主動(dòng)斷開(kāi)連接,調(diào)用onSubscribe函數(shù)中傳入的對(duì)象Disposable的dispose()函數(shù)即可完成斷開(kāi)連接,同樣關(guān)聯(lián)斷開(kāi)后,被觀察者依然會(huì)繼續(xù)發(fā)送數(shù)據(jù)
**講到這里第一感覺(jué)是不是?**

就輸出個(gè)數(shù)字就這么麻煩,完全沒(méi)看出哪里方便了!別著急我剛開(kāi)始看RxAndroid文章也是這樣的感覺(jué),而且很多網(wǎng)上的文章都沒(méi)有解釋這個(gè)問(wèn)題。所以看一會(huì)你就更暈了。別著急我一起深呼吸,來(lái)看看如何簡(jiǎn)化操作
你可能覺(jué)得,我就打印幾個(gè)數(shù),還要把Observable寫的那么麻煩,能不能簡(jiǎn)便一點(diǎn)呢?答案是肯定的,RxAndroid內(nèi)置了很多簡(jiǎn)化創(chuàng)建Observable對(duì)象的函數(shù),比如Observable.just就是用來(lái)創(chuàng)建只發(fā)出一個(gè)事件就結(jié)束的Observable對(duì)象,上面創(chuàng)建Observable對(duì)象的代碼可以簡(jiǎn)化為一行
Observable<String> observable = Observable.just("hello");
同樣對(duì)于Observer,這個(gè)例子中,我們其實(shí)并不關(guān)心OnComplete和OnError,我們只需要在onNext的時(shí)候做一些處理,這時(shí)候就可以使用Consumer類。
Observable<String> observable = Observable.just("hello");
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
};
observable.subscribe(consumer);
其實(shí)在RxAndroid中,我們可以為 Observer中的三種狀態(tài)根據(jù)自身需要分別創(chuàng)建一個(gè)回調(diào)動(dòng)作,通過(guò)Action 來(lái)替代onComplete():,通過(guò)Consumer來(lái)替代 onError(Throwable t)和onNext(T t)
Observable<String> observable = Observable.just("hello");
Action onCompleteAction = new Action() {
@Override
public void run() throws Exception {
Log.i("kaelpu", "complete");
}
};
Consumer<String> onNextConsumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("kaelpu", s);
}
};
Consumer<Throwable> onErrorConsumer = new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.i("kaelpu", "error");
}
};
observable.subscribe(onNextConsumer, onErrorConsumer, onCompleteAction);
}
subscribe()有多個(gè)重載的方法:
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
不帶任何參數(shù)的subscribe() 表示Observer不關(guān)心任何事件,Observable發(fā)送什么數(shù)據(jù)都隨你
帶有一個(gè)Consumer參數(shù)的方法表示Observer只關(guān)心onNext事件, 其他的事件我假裝沒(méi)看見(jiàn), 因此我們?nèi)绻恍枰猳nNext事件可以這么寫
只要我們?cè)俦竟?jié)中能明白觀察者和被觀察者之間是如何工作關(guān)聯(lián)的就可以
線程調(diào)度
關(guān)鍵的章節(jié)來(lái)了,看完上面的基礎(chǔ)知識(shí),很多人都會(huì)感覺(jué)就一個(gè)發(fā)送,一個(gè)接收,不就是個(gè)觀察者模式嘛,感覺(jué)一點(diǎn)卵用都沒(méi)有,還寫這么多回調(diào)方法!完全沒(méi)有看出什么優(yōu)點(diǎn)。那么這一節(jié)就讓你看到RxAndroid真正厲害的地方。
正常情況下, Observer和Observable是工作在同一個(gè)線程中的, 也就是說(shuō)Observable在哪個(gè)線程發(fā)事件, Observer就在哪個(gè)線程接收事件.
RxAndroid中, 當(dāng)我們?cè)谥骶€程中去創(chuàng)建一個(gè)Observable來(lái)發(fā)送事件, 則這個(gè)Observable默認(rèn)就在主線程發(fā)送事件.
當(dāng)我們?cè)谥骶€程去創(chuàng)建一個(gè)Observer來(lái)接收事件, 則這個(gè)Observer默認(rèn)就在主線程中接收事件,但其實(shí)在現(xiàn)實(shí)工作中我們更多的是需要進(jìn)行線程切換的,最常見(jiàn)的例子就是在子線程中請(qǐng)求網(wǎng)絡(luò)數(shù)據(jù),在主線程中進(jìn)行展示
要達(dá)到這個(gè)目的, 我們需要先改變Observable發(fā)送事件的線程, 讓它去子線程中發(fā)送事件, 然后再改變Observer的線程, 讓它去主線程接收事件. 通過(guò)RxAndroid內(nèi)置的線程調(diào)度器可以很輕松的做到這一點(diǎn). 接下來(lái)看一段代碼:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("kaelpu", "Observable thread is : " + Thread.currentThread().getName());
Log.d("kaelpu", "emitter 1");
emitter.onNext(1);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("kaelpu", "Observer thread is :" + Thread.currentThread().getName());
Log.d("kaelpu", "onNext: " + integer);
}
};
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
}
Observable thread is : RxNewThreadScheduler-1
emitter 1
Observer thread is :main
onNext: 1
可以看到, observable發(fā)送事件的線程的確改變了, 是在一個(gè)叫 RxNewThreadScheduler-1的線程中發(fā)送的事件, 而consumer 仍然在主線程中接收事件, 這說(shuō)明我們的目的達(dá)成了, 接下來(lái)看看是如何做到的.
這段代碼只不過(guò)是增加了兩行代碼:
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
簡(jiǎn)單的來(lái)說(shuō), subscribeOn() 指定的是Observable發(fā)送事件的線程, observeOn() 指定的是Observer接收事件的線程.
多次指定Observable的線程只有第一次指定的有效, 也就是說(shuō)多次調(diào)用subscribeOn() 只有第一次的有效, 其余的會(huì)被忽略.
多次指定Observer的線程是可以的, 也就是說(shuō)每調(diào)用一次observeOn() , Observer的線程就會(huì)切換一次.例如:
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(consumer);
Observable thread is : RxNewThreadScheduler-1
emitter 1
Observer thread is :RxCachedThreadScheduler-2
onNext: 1
可以看到, Observable雖然指定了兩次線程, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler線程中, 而Observer則跑到了RxCachedThreadScheduler 中, 這個(gè)CacheThread其實(shí)就是IO線程池中的一個(gè).
在 RxAndroid 中,提供了一個(gè)名為 Scheduler 的線程調(diào)度器,RxAndroid 內(nèi)部提供了4個(gè)調(diào)度器,分別是:
- Schedulers.io(): I/O 操作(讀寫文件、數(shù)據(jù)庫(kù)、網(wǎng)絡(luò)請(qǐng)求等),與newThread()差不多,區(qū)別在于io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無(wú)數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要進(jìn)行大量的計(jì)算,以免產(chǎn)生不必要的線程;
- Schedulers.newThread(): 開(kāi)啟新線程操作;
- Schedulers.immediate(): 默認(rèn)指定的線程,也就是當(dāng)前線程;
- Schedulers.computation():計(jì)算所使用的調(diào)度器。這個(gè)計(jì)算指的是 CPU 密集型計(jì)算,即不會(huì)被 I/O等操作限制性能的操作,例如圖形的計(jì)算。這個(gè) Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。值得注意的是,不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU;
- AndroidSchedulers.mainThread(): Rxndroid 擴(kuò)展的 Android 主線程;
這些內(nèi)置的Scheduler已經(jīng)足夠滿足我們開(kāi)發(fā)的需求, 因此我們應(yīng)該使用內(nèi)置的這些選項(xiàng), 在RxAndroid內(nèi)部使用的是線程池來(lái)維護(hù)這些線程, 所有效率也比較高。
對(duì)于線程還需要注意
- create() , just() , from() 等 --- 事件產(chǎn)生
- map() , flapMap() , scan() , filter() 等 -- 事件加工
- subscribe() -- 事件消費(fèi)
事件產(chǎn)生:默認(rèn)運(yùn)行在當(dāng)前線程,可以由 subscribeOn() 自定義線程
事件加工:默認(rèn)跟事件產(chǎn)生的線程保持一致, 可由 observeOn() 自定義線程
事件消費(fèi):默認(rèn)運(yùn)行在當(dāng)前線程,可以有observeOn() 自定義
好了說(shuō)了這么多了,我們來(lái)寫個(gè)簡(jiǎn)單的異步的例子,看看實(shí)際效果。我們這個(gè)例子就以加載網(wǎng)絡(luò)圖片并顯示為例:
首先我們寫一個(gè)耗時(shí)函數(shù),用來(lái)模擬圖片請(qǐng)求
// 模擬網(wǎng)絡(luò)請(qǐng)求圖片
private Drawable getDrawableFromUrl(String url){
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return getResources().getDrawable(R.drawable.baidu);
}
代碼很簡(jiǎn)答,就是線程sleep 6秒,然后返回一張圖片,如果運(yùn)行在主線程那就會(huì)NAR,然后我么來(lái)用RxAndroid寫一下這個(gè)異步拉去圖片并顯是的操作!
Observable.just(getDrawableFromNet("http://www.baidu.com/icon.png"))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Drawable>() {
@Override
public void accept(Drawable drawable) throws Exception {
((ImageView)findViewById(R.id.imageView)).setImageDrawable(drawable);
}
});
就這幾行代碼就搞定了!自己結(jié)合上面講的理解一下~這里就不做解釋了!因?yàn)槲覀冞€有更重要的一個(gè)環(huán)節(jié),這個(gè)環(huán)節(jié)堪稱RxAndroid的精髓!
操作符的使用
在了解基本知識(shí)和線程調(diào)度后,我們來(lái)學(xué)習(xí)一下RxAndroid各種神奇的操作符
Map
Map是RxAndroid中最簡(jiǎn)單的一個(gè)變換操作符了, 它的作用就是對(duì)Observable發(fā)送的每一個(gè)事件應(yīng)用一個(gè)函數(shù), 使得每一個(gè)事件都按照指定的函數(shù)去變化。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("kaelpu", s);
}
});
This is result 1
This is result 2
This is result 3
通過(guò)Map, 可以將Observable發(fā)來(lái)的事件轉(zhuǎn)換為任意的類型, 可以是一個(gè)Object, 也可以是一個(gè)集合,功能非常強(qiáng)大
例子:還是以圖片加載的例子,我們傳進(jìn)來(lái)一個(gè)圖片的路徑,然后通過(guò)Map把drawble轉(zhuǎn)換成bitmap再發(fā)送給觀察者
Observable.just(getDrawableFromNet())
.map(new Function<Drawable, Bitmap>() {
@Override
public Bitmap apply(@NonNull Drawable drawable) throws Exception {
BitmapDrawable bt = (BitmapDrawable)drawable;
return bt.getBitmap();
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
}
});
Observable –> map變換 –> Observable
url -> drawable -> bitmap
不用到處調(diào)代碼,直接一個(gè)鏈?zhǔn)讲僮?.. 是不是感覺(jué)很爽!
ZIP
Zip通過(guò)一個(gè)函數(shù)將多個(gè)Observable發(fā)送的事件結(jié)合到一起,然后發(fā)送這些組合到一起的事件. 它按照嚴(yán)格的順序應(yīng)用這個(gè)函數(shù)。它只發(fā)射與發(fā)射數(shù)據(jù)項(xiàng)最少的那個(gè)Observable一樣多的數(shù)據(jù)。
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emitter 1");
emitter.onNext(1);
Log.d(TAG, "emitter 2");
emitter.onNext(2);
Log.d(TAG, "emitter 3");
emitter.onNext(3);
Log.d(TAG, "emitter 4");
emitter.onNext(4);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
});
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emitter A");
emitter.onNext("A");
Log.d(TAG, "emitter B");
emitter.onNext("B");
Log.d(TAG, "emitter C");
emitter.onNext("C");
Log.d(TAG, "emitter complete2");
emitter.onComplete();
}
});
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
我們分別創(chuàng)建了observable, 一個(gè)發(fā)送1,2,3,4,Complete, 另一個(gè)發(fā)送A,B,C,Complete, 接著用Zip把發(fā)出的事件組合, 來(lái)看看運(yùn)行結(jié)果吧:
onSubscribe
emitter 1
emitter 2
emitter 3
emitter 4
emit complete1
emitter A
onNext: 1A
emitter B
onNext: 2B
emitter C
onNext: 3C
emitter complete2
onComplete
觀察發(fā)現(xiàn)observable1發(fā)送事件后,observable2才發(fā)送
這是因?yàn)槲覀儍蓚€(gè)observable都是運(yùn)行在同一個(gè)線程里, 同一個(gè)線程里執(zhí)行代碼肯定有先后順序呀.
from
在Rxndroid的from操作符到2.0已經(jīng)被拆分成了3個(gè),fromArray, fromIterable, fromFuture接收一個(gè)集合作為輸入,然后每次輸出一個(gè)元素給subscriber。
Observable.fromArray(new Integer[]{1, 2, 3, 4, 5}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "number:" + integer);
}
});
number:1
number:2
number:3
number:4
number:5
注意:如果from()里面執(zhí)行了耗時(shí)操作,即使使用了subscribeOn(Schedulers.io()),仍然是在主線程執(zhí)行,可能會(huì)造成界面卡頓甚至崩潰,所以耗時(shí)操作還是使用Observable.create(…);
filter
條件過(guò)濾,去除不符合某些條件的事件。舉個(gè)栗子:
Observable.fromArray(new Integer[]{1, 2, 3, 4, 5})
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
// 偶數(shù)返回true,則表示剔除奇數(shù),留下偶數(shù)
return integer % 2 == 0;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "number:" + integer);
}
});
number:2
number:4
take
最多保留的事件數(shù)。
Observable.just("1", "2", "6", "3", "4", "5").take(2).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
Log.d(TAG,value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
1
2
可以發(fā)現(xiàn)我們發(fā)送了6個(gè)String,最后只打印了前兩個(gè),這就是take過(guò)濾掉的結(jié)果
doOnNext
如果你想在處理下一個(gè)事件之前做某些事,就可以調(diào)用該方法
Observable.fromArray(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
// 偶數(shù)返回true,則表示剔除奇數(shù)
return integer % 2 == 0;
}
})// 最多保留三個(gè),也就是最后剩三個(gè)偶數(shù)
.take(3).doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// 在輸出偶數(shù)之前輸出它的hashCode
Log.i(TAG, "hahcode = " + integer.hashCode() + "");
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.i(TAG, "number = " + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
hahcode = 2
number = 2
hahcode = 4
number = 4
hahcode = 6
number = 6
針對(duì)Android的一些擴(kuò)展
RxAndroid是RxJava的一個(gè)針對(duì)Android平臺(tái)的擴(kuò)展。它包含了一些能夠簡(jiǎn)化Android開(kāi)發(fā)的工具。
首先,AndroidSchedulers提供了針對(duì)Android的線程系統(tǒng)的調(diào)度器。需要在UI線程中運(yùn)行某些代碼?很簡(jiǎn)單,只需要使用AndroidSchedulers.mainThread():
retrofitService.getImage(url)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap));
接著要介紹的就是AndroidObservable,它提供了跟多的功能來(lái)配合Android的生命周期。bindActivity()和bindFragment()方法默認(rèn)使用AndroidSchedulers.mainThread()來(lái)執(zhí)行觀察者代碼,這兩個(gè)方法會(huì)在Activity或者Fragment結(jié)束的時(shí)候通知被觀察者停止發(fā)出新的消息。
AndroidObservable.bindActivity(this, retrofitService.getImage(url))
.subscribeOn(Schedulers.io())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap);
我自己也很喜歡AndroidObservable.fromBroadcast()方法,它允許你創(chuàng)建一個(gè)類似BroadcastReceiver的Observable對(duì)象。下面的例子展示了如何在網(wǎng)絡(luò)變化的時(shí)候被通知到:
IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
AndroidObservable.fromBroadcast(context, filter)
.subscribe(intent -> handleConnectivityChange(intent));
最后要介紹的是ViewObservable,使用它可以給View添加了一些綁定。如果你想在每次點(diǎn)擊view的時(shí)候都收到一個(gè)事件,可以使用ViewObservable.clicks(),或者你想監(jiān)聽(tīng)TextView的內(nèi)容變化,可以使用ViewObservable.text()
ViewObservable.clicks(mCardNameEditText, false)
.subscribe(view -> handleClick(view));
RxAndroid的一些使用場(chǎng)景
這里總結(jié)了一些很合適使用RxAndroid的場(chǎng)景,供大家打開(kāi)腦洞~分享時(shí)候有時(shí)間給大家看看demo
- 界面需要等到多個(gè)接口并發(fā)取完數(shù)據(jù),再更新
Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("haha");
}
}).subscribeOn(Schedulers.newThread());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hehe");
}
}).subscribeOn(Schedulers.newThread());
Observable.merge(observable1, observable2)
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
Log.d(TAG,value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
- 界面按鈕需要防止連續(xù)點(diǎn)擊的情況
RxView.clicks(button)
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
Log.i(TAG, "do clicked!");
}
});
- 響應(yīng)式的界面 比如勾選了某個(gè)checkbox,自動(dòng)更新對(duì)應(yīng)的preference
SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(context);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);
Preference<String> username = rxPreferences.getString("username");
Preference<Boolean> showWhatsNew = rxPreferences.getBoolean("show-whats-new", true);
username.asObservable().subscribe(new Action1<String>() {
@Override public void call(String username) {
Log.d(TAG, "Username: " + username); 讀取到當(dāng)前值
}
}
RxCompoundButton.checks(showWhatsNewView)
.subscribe(showWhatsNew.asAction());
最后的話
通過(guò)本篇文章,大家應(yīng)該對(duì)RxAndroid有個(gè)大體的認(rèn)識(shí)了,也應(yīng)該體會(huì)到它在異步操作,代碼鏈?zhǔn)綍?shū)寫等方面的優(yōu)勢(shì)了。需要注意的是由于RxJava存在理解的門檻,貿(mào)然引入項(xiàng)目要確保協(xié)同開(kāi)發(fā)的人員也都對(duì)Rxjava有所了解~
[參考資料]