? RxAndroid學習筆記--2019-1-31
? 原文鏈接: http://www.itdecent.cn/p/0cd258eecf60
? https://juejin.im/entry/5993a80cf265da249150e93c
-
配置
implementation 'io.reactivex.rxjava2:rxjava:2.2.6' implementation 'io.reactivex.rxjava2:rxandroid:2.1.0' -
基礎(chǔ)用法
Observable被觀察者,即發(fā)射器(上游事件)
Observer 觀察者,即接收器(下游事件)
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("測試1");
emitter.onNext("測試2");
emitter.onNext("測試3");
emitter.onNext("測試4");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
運行程序
com.dxl.myapplication D/dxl: 測試1
com.dxl.myapplication D/dxl: 測試1
com.dxl.myapplication D/dxl: 測試2
com.dxl.myapplication D/dxl: 測試3
com.dxl.myapplication D/dxl: 測試4
com.dxl.myapplication D/dxl: onComplete
- 在發(fā)射事件過程中,我們調(diào)用了onComplete后,接收事件將停止,但是發(fā)射事件仍將繼續(xù):
例如:
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("測試1");
Log.d(TAG, "emitter.onNext(\"測試1\")");
emitter.onNext("測試2");
Log.d(TAG, "emitter.onNext(\"測試2\")");
emitter.onNext("測試3");
Log.d(TAG, "emitter.onNext(\"測試3\")");
emitter.onComplete();
Log.d(TAG, "emitter.onComplete()");
emitter.onNext("測試4");
Log.d(TAG, "emitter.onNext(\"測試4\")");
}
})
測試3發(fā)送完成后,調(diào)用了onComplete方法后,測試4仍然會發(fā)送,但是無法接收到
-
Disposable概念,可以切斷接收。當它的isDisposed為false時,可以繼續(xù)接收到事件。如果為true,將不再接收事件。使用方法:
... }).subscribe(new Observer<String>() { private Disposable mDisposable; @Override public void onSubscribe(Disposable d) { mDisposable = d; } @Override public void onNext(String s) { if (s.equals("測試3")) { mDisposable.dispose(); } Log.d(TAG, s); } ...當接收到測試3 后,切斷接收事件。后續(xù)測試4 將不會再接收到。
-
Map
它的作用是對發(fā)射時間發(fā)送的每一個事件應(yīng)用一個函數(shù),每一個事件都按照指定的函數(shù)去變化.
舉例:發(fā)送的是1,2,3,我們對發(fā)送的數(shù)字做*10處理。
Observable.just(1, 2, 3).map(new Function<Integer, Integer>() { @Override public Integer apply(Integer integer) throws Exception { return integer * 10; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, integer + ""); } });log輸出:
01-31 09:56:29.435 20363-20363/com.dxl.myapplication D/dxl: 10 01-31 09:56:29.435 20363-20363/com.dxl.myapplication D/dxl: 20 01-31 09:56:29.435 20363-20363/com.dxl.myapplication D/dxl: 30 -
ZIP
專用于合并事件,該合并不是連接(連接操作符后面會說),而是兩兩配對,也就意味著,最終配對出的
Observable發(fā)射事件數(shù)目只和少的那個相同。Observable.zip(Observable.just(1, 2, 3), Observable.just("one", "two"), new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s) throws Exception { return integer + s; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, s); } });日志輸出:
01-31 10:49:59.577 30063-30063/com.dxl.myapplication D/dxl: 1one 01-31 10:49:59.577 30063-30063/com.dxl.myapplication D/dxl: 2two -
Concat
兩個發(fā)射器連接成一個發(fā)射器
Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6)) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.e(TAG, "concat : "+ integer + "\n" ); } });注意,concat必須是第一個發(fā)射器執(zhí)行完complete之后,才會去執(zhí)行第二個。如果第一個發(fā)射器沒有執(zhí)行onComplete,那么第二個將不會被執(zhí)行。
比如:
Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1"); emitter.onNext("2"); emitter.onNext("3"); //沒有調(diào)用onComplete,observable2將不會被執(zhí)行 // emitter.onComplete(); } }); Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("4"); emitter.onNext("5"); emitter.onNext("6"); emitter.onComplete(); } }); Observable.concat(observable1, observable2).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { Log.d(TAG, s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.d(TAG, "onComplete"); } });這時候輸出的結(jié)果為:
02-01 08:26:45.516 30613-30613/com.dxl.myapplication D/dxl: 1 02-01 08:26:45.516 30613-30613/com.dxl.myapplication D/dxl: 2 02-01 08:26:45.516 30613-30613/com.dxl.myapplication D/dxl: 3如果我們把注釋打開:此時observable2就會被執(zhí)行了。
02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 1 02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 2 02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 3 02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 4 02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 5 02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 6 02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: onComplete如果把
concat改為merge, 則observable1和observable2將都會被執(zhí)行。用途舉例,比如有些時候,對數(shù)據(jù)不太敏感時,我們需要先從緩存中讀取數(shù)據(jù),如果緩存中沒有數(shù)據(jù)的話,再去讀取網(wǎng)絡(luò)數(shù)據(jù)。
這時候可以分別定義緩存的observable和在線的observable,當成功從緩存中讀取數(shù)據(jù)時,調(diào)用onNext,如果緩存獲取不到,直接調(diào)用onComplete去執(zhí)行在線獲取的observable。
-
FlatMap
將一個發(fā)射器Observable轉(zhuǎn)換為多個發(fā)射器Observables,然后再將多個發(fā)射器裝入一個單一的發(fā)射器Observable。
有個需要注意的是,
flatMap并不能保證事件的順序,如果需要保證,需要用到我們下面要講的ConcatMap。Observable.just(1,2,3).flatMap(new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { List<String> list = new ArrayList<>(); for (int i = 0; i < integer; i++) { list.add("integer:" + integer + "--" + i + ""); } return Observable.fromIterable(list); } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, s); } });輸出結(jié)果:
01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:1--0 01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:2--0 01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:2--1 01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:3--0 01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:3--1 01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:3--2 -
concatMap
concatMap與FlatMap的唯一區(qū)別就是concatMap保證了順序,其他使用是一樣的。 -
distinct
去重。例如
Observable.just(1,1,2,3,3)輸出結(jié)果為1,2 ,3 -
Fliter
過濾器。接收一個參數(shù),過濾掉不需要的結(jié)果。
例如:Observable.just(1, 2, 3, 4, 5).filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { //不滿足此條件的將被過濾 return integer > 3; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, integer + ""); } });輸出結(jié)果:
01-31 11:52:09.085 2005-2005/com.dxl.myapplication D/dxl: 4
01-31 11:52:09.085 2005-2005/com.dxl.myapplication D/dxl: 5
-
timer
相當于一個定時任務(wù),默認在新線程。
如:Log.d(TAG, "定時開始"); Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "定時結(jié)束"); } });
aLong暫時沒有意義。都是0
01-31 13:10:47.518 8435-8435/com.dxl.myapplication D/dxl: 定時開始
01-31 13:10:49.560 8435-8505/com.dxl.myapplication D/dxl: 定時結(jié)束
-
interval
interval操作符用于間隔時間執(zhí)行某個操作,其接受三個參數(shù),分別是第一次發(fā)送延遲,間隔時間,時間單位。返回值是Disposable,可以利用用于取消事件。
Log.d(TAG, "定時開始"); mDisposable = Observable.interval(1, 2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "aLong = " + aLong); if (aLong >= 5) { mDisposable.dispose(); } } }); @Override protected void onDestroy() { super.onDestroy(); if (mDisposable != null && !mDisposable.isDisposed()) { mDisposable.dispose(); } }
輸出:
01-31 13:17:01.063 9118-9118/com.dxl.myapplication D/dxl: 定時開始
01-31 13:17:02.104 9118-9143/com.dxl.myapplication D/dxl: aLong = 0
01-31 13:17:04.096 9118-9143/com.dxl.myapplication D/dxl: aLong = 1
01-31 13:17:06.098 9118-9143/com.dxl.myapplication D/dxl: aLong = 2
01-31 13:17:08.100 9118-9143/com.dxl.myapplication D/dxl: aLong = 3
01-31 13:17:10.102 9118-9143/com.dxl.myapplication D/dxl: aLong = 4
01-31 13:17:12.104 9118-9143/com.dxl.myapplication D/dxl: aLong = 5
倒計時:
/**
* 倒計時方法
* @param time
* @return
*/
private Flowable<Long> countDown(final int time) {
return Flowable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long, Long>() {
@Override
public Long apply(Long aLong) throws Exception {
return time - aLong;
}
}).take(time + 1);
}
mDisposable = countDown(5).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
textview.setText(aLong + "");
}
});
@Override
protected void onDestroy() {
super.onDestroy();
if (mDisposable != null && !mDisposable.isDisposed()) {
mDisposable.dispose();
}
}
-
delay
延時發(fā)送數(shù)據(jù)。
mDisposable = Observable.just(1).delay(2, TimeUnit.SECONDS) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, System.currentTimeMillis() + " " + " integer = " + integer); } });
-
背壓BackPressure
背壓產(chǎn)生的原因: 被觀察者發(fā)送消息太快以至于它的操作符或者訂閱者不能及時處理相關(guān)的消息
為了解決這個問題,在RxJava2里,引入了
Flowable這個類:Observable不包含 backpressure 處理,而 Flowable 包含。下面我們來模擬一個觸發(fā)背壓的實例 , 發(fā)射器每1毫秒發(fā)射一個數(shù)據(jù),接收器每一秒處理一個數(shù)據(jù)。數(shù)據(jù)產(chǎn)生是數(shù)據(jù)處理的1000 倍。
首先用 RxJava 2.x 版本的 Observable 來實現(xiàn)。
Observable.interval(1, TimeUnit.MILLISECONDS) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Thread.sleep(1000); Log.e("zhao", "onNext: " + aLong); } });經(jīng)過測試,app 很健壯,沒有發(fā)生崩潰,日志每1秒打印一次。在上面我們說到 2.x 版本中 Observable 不再支持背壓,發(fā)神器生成的數(shù)據(jù)全部緩存在內(nèi)存中。
Observable :
- 不支持 backpressure 處理,不會發(fā)生 MissingBackpressureException 異常。
- 所有沒有處理的數(shù)據(jù)都緩存在內(nèi)存中,等待被訂閱者處理。
- 壞處是:當產(chǎn)生的數(shù)據(jù)過快,內(nèi)存中緩存的數(shù)據(jù)越來越多,占用大量內(nèi)存。
然后用 RxJava 2.x 版本的 Flowable 來實現(xiàn)。
Flowable.interval(1, TimeUnit.MILLISECONDS) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Thread.sleep(1000); Log.e("zhao", "onNext: " + aLong); } });運行起來發(fā)生崩潰,崩潰日志如下:
io.reactivex.exceptions.OnErrorNotImplementedException: Can't deliver value 128 due to lack of requests ... ... Caused by: io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests很明顯發(fā)生了 MissingBackpressureException 異常 , 128 代表是 Flowable 最多緩存 128 個數(shù)據(jù),緩存次超過 128 個數(shù)據(jù),就會報錯。可喜的是,Rxjava 已經(jīng)給我們提供了解決背壓的策略。
onBackpressureDrop
onBackpressureDrop() :當緩沖區(qū)數(shù)據(jù)滿 128 個時候,再新來的數(shù)據(jù)就會被丟棄,如果此時有數(shù)據(jù)被消費了,那么就會把當前最新產(chǎn)生的數(shù)據(jù),放到緩沖區(qū)。簡單來說 Drop 就是直接把存不下的事件丟棄。
onBackpressureDrop 測試
Flowable.interval( 1 , TimeUnit.MILLISECONDS) .onBackpressureDrop() //onBackpressureDrop 一定要放在 interval 后面否則不會生效 .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Thread.sleep(1000); Log.e("zhao", "onNext: " + aLong); } });效果如下:
E/zhao: onNext: 0 E/zhao: onNext: 1 ... E/zhao: onNext: 126 E/zhao: onNext: 127 E/zhao: onNext: 96129 E/zhao: onNext: 96130 E/zhao: onNext: 96131從日志上分析來看,發(fā)射器發(fā)射的 0 ~ 127 總共 128 個數(shù)據(jù)是連續(xù)的,下一個數(shù)據(jù)就是 96129 , 128 ~ 96128 的數(shù)據(jù)被丟棄了。
注意事項
1、onBackpressureDrop 一定要放在 interval 后面否則不會生效
onBackpressureLatest
onBackpressureLatest 就是只保留最新的事件。
onBackpressureBuffer
- onBackpressureBuffer:默認情況下緩存所有的數(shù)據(jù),不會丟棄數(shù)據(jù),這個方法可以解決背壓問題,但是它有像 Observable 一樣的缺點,緩存數(shù)據(jù)太多,占用太多內(nèi)存。
- onBackpressureBuffer(int capacity) :設(shè)置緩存隊列大小,但是如果緩沖數(shù)據(jù)超過了設(shè)置的值,就會報錯,發(fā)生崩潰。
onBackpressureBuffer(int capacity) 測試
Flowable.interval( 1 , TimeUnit.MILLISECONDS) .onBackpressureBuffer( 1000 ) //設(shè)置緩沖隊列大小為 1000 .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Thread.sleep(1000); Log.e("zhao", "onNext: " + aLong); } });運行起來后,過了幾秒鐘,發(fā)生崩潰,日志如下:
io.reactivex.exceptions.OnErrorNotImplementedException: Buffer is full ··· Caused by: io.reactivex.exceptions.MissingBackpressureException: Buffer is full通過日志可以看出,緩沖區(qū)已經(jīng)滿了。
-
doOnNext
可以讓訂閱者在接收到數(shù)據(jù)之前做一些操作,比如把數(shù)據(jù)進行保存。
Observable.just(1,2,3).doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "doOnNext-" + integer); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, integer + ""); } });輸出結(jié)果:
01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: doOnNext-1 01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: 1 01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: doOnNext-2 01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: 2 01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: doOnNext-3 01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: 3 -
skip
跳過count個數(shù)目開始接收。
Observable.just(1,2,3,4,5) .skip(2) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.d(TAG, "skip : "+integer + "\n"); } }); -
take
接受一個 long 型參數(shù) count ,代表至多接收 count 個數(shù)據(jù)。
Observable.just(1,2,3,4,5) .take(2) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.d(TAG, "skip : "+integer + "\n"); } }); -
just
簡單的一個發(fā)射器,依次調(diào)用next方法
-
Single
只會接收一個參數(shù),而SingleObserver只會調(diào)用onError()或者onSuccess()Single.just(new Random().nextInt(10)) .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "dispose"); } @Override public void onSuccess(Integer integer) { Log.d(TAG, integer + ""); } @Override public void onError(Throwable e) { Log.d(TAG, e.getMessage()); } });輸出結(jié)果:
01-31 14:47:59.949 22093-22093/com.dxl.myapplication D/dxl: dispose 01-31 14:47:59.949 22093-22093/com.dxl.myapplication D/dxl: 5
-
debounce
去除發(fā)送頻率過快的項Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { // send events with simulated time wait emitter.onNext(1); // skip Thread.sleep(400); emitter.onNext(2); // deliver Thread.sleep(505); emitter.onNext(3); // skip Thread.sleep(100); emitter.onNext(4); // deliver Thread.sleep(605); emitter.onNext(5); // deliver Thread.sleep(510); emitter.onComplete(); } }).debounce(500, TimeUnit.MILLISECONDS) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, integer + ""); } });設(shè)置的時間間隔是500ms, 發(fā)送1之后,400ms后發(fā)送2,所以1被舍棄。依次類推。
最后輸出結(jié)果:
01-31 14:53:50.290 22568-22601/com.dxl.myapplication D/dxl: 2 01-31 14:53:50.901 22568-22601/com.dxl.myapplication D/dxl: 4 01-31 14:53:51.502 22568-22601/com.dxl.myapplication D/dxl: 5 -
defer
簡單地時候就是每次訂閱都會創(chuàng)建一個新的Observable,并且如果沒有被訂閱,就不會產(chǎn)生新的Observable。Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() { @Override public ObservableSource<Integer> call() throws Exception { return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); Log.d(TAG, "1"); emitter.onNext(2); Log.d(TAG, "2"); emitter.onNext(3); Log.d(TAG, "3"); emitter.onNext(4); Log.d(TAG, "4"); emitter.onComplete(); } }); } }); observable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "accept : " + integer); } });輸出結(jié)果:
01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: accept : 1 01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: 1 01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: accept : 2 01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: 2 01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: accept : 3 01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: 3 01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: accept : 4 01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: 4 -
last
僅取出可觀察到的最后一個值,或者是滿足某些條件的最后一項。(參數(shù)表示默認值,如果沒有發(fā)送的數(shù)據(jù),取默認值)Observable.just(1,2,3,4).last(2).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, integer + ""); } });輸出結(jié)果為4.
如果改為:
Observable.just(1,2,3,4).skip(5).last(2).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, integer + ""); } });這時候全部跳過,沒有要發(fā)送的數(shù)據(jù),返回默認值2
-
merge
作用是把多個Observable結(jié)合起來,接受可變參數(shù),也支持迭代器集合。注意它和concat的區(qū)別在于,不用等到 發(fā)射器 A 發(fā)送完所有的事件再進行發(fā)射器 B 的發(fā)送。
操作符每次用一個方法處理一個值
例如Observable.just(1, 2, 3, 4, 5).reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { Log.d(TAG, "integer : " + integer + ", integer2 : " + integer2); return integer + integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, integer + ""); } });第一次先取1,2,進行求和得到3,第二次利用求和得到的3與下一個3進行運算,得到6,依次類推。
最后輸出結(jié)果:
01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: integer : 1, integer2 : 2 01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: integer : 3, integer2 : 3 01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: integer : 6, integer2 : 4 01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: integer : 10, integer2 : 5 01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: 15 -
scan
和reduce相似,但是reduce只輸出最后的結(jié)果,scan會輸出過程。例如上面的代碼reduce改為scan,輸出結(jié)果如下:
01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 1 01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: integer : 1, integer2 : 2 01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 3 01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: integer : 3, integer2 : 3 01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 6 01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: integer : 6, integer2 : 4 01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 10 01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: integer : 10, integer2 : 5 01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 15 -
實例
Observable.create(new ObservableOnSubscribe<Response>() {
@Override
public void subscribe(ObservableEmitter<Response> emitter) throws Exception {
Log.d(TAG, "create : " + Thread.currentThread().getName());
Request request = new Request.Builder().url("http://gank.io/api/xiandu/categories").build();
Response response = new OkHttpClient().newCall(request).execute();
if (response.isSuccessful()) {
emitter.onNext(response);
} else {
emitter.onError(new Exception(response.message()));
}
emitter.onComplete();
}
})
//指定map的操作線程
.observeOn(Schedulers.computation())
.map(new Function<Response, Category>() {
@Override
public Category apply(Response response) throws Exception {
Log.d(TAG, "map : " + Thread.currentThread().getName());
ResponseBody responseBody = response.body();
Category category = new Gson().fromJson(responseBody.string(), Category.class);
return category;
}
})
//指定doOnNext的線程
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Category>() {
@Override
public void accept(Category category) throws Exception {
Log.d(TAG, "doOnNext1 : " + Thread.currentThread().getName());
}
})
//指定第二次doOnNext的線程
.observeOn(Schedulers.io())
.doOnNext(new Consumer<Category>() {
@Override
public void accept(Category category) throws Exception {
Log.d(TAG, "doOnNext2 : " + Thread.currentThread().getName());
}
})
//指定事件產(chǎn)生的線程
.subscribeOn(Schedulers.io())
//指定事件消費線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Category>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Category category) {
Log.d(TAG, "subscribe : " + Thread.currentThread().getName());
Log.d(TAG, category.toString());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, e.toString());
}
@Override
public void onComplete() {
}
});
/**
* 實體類
* @author dxl
* @date 2019/1/31 15:57
*/
public class Category {
private boolean error;
public List<Results> results;
public class Results {
public String _id;
public String en_name;
public String name;
public int rank;
@Override
public String toString() {
return "Results{" +
"_id='" + _id + '\'' +
", en_name='" + en_name + '\'' +
", name='" + name + '\'' +
", rank=" + rank +
'}';
}
}
@Override
public String toString() {
return "Category{" +
"error=" + error +
", results=" + results.toString() +
'}';
}
}
輸出結(jié)果:
01-31 16:51:19.458 6151-6235/com.dxl.myapplication D/dxl: create : RxCachedThreadScheduler-1
01-31 16:51:19.738 6151-6151/com.dxl.myapplication D/dxl: map : main
01-31 16:51:19.778 6151-6151/com.dxl.myapplication D/dxl: doOnNext1 : main
01-31 16:51:19.788 6151-6282/com.dxl.myapplication D/dxl: doOnNext2 : RxCachedThreadScheduler-2
01-31 16:51:19.788 6151-6151/com.dxl.myapplication D/dxl: subscribe : main
01-31 16:51:19.788 6151-6151/com.dxl.myapplication D/dxl: Category{error=false, results=[Results{_id='57c83777421aa97cbd81c74d', en_name='wow', name='科技資訊', rank=1}, Results{_id='57c83577421aa97cb162d8b1', en_name='apps', name='趣味軟件/游戲', rank=5}, Results{_id='57c83627421aa97cbd81c74b', en_name='imrich', name='裝備黨', rank=50}, Results{_id='57c836b4421aa97cbd81c74c', en_name='funny', name='草根新聞', rank=100}, Results{_id='5827dc81421aa911e32d87cc', en_name='android', name='Android', rank=300}, Results{_id='582c5346421aa95002741a8e', en_name='diediedie', name='創(chuàng)業(yè)新聞', rank=340}, Results{_id='5829c2bc421aa911e32d87e7', en_name='thinking', name='獨立思想', rank=400}, Results{_id='5827dd7b421aa911d3bb7eca', en_name='iOS', name='iOS', rank=500}, Results{_id='5829b881421aa911dbc9156b', en_name='teamblog', name='團隊博客', rank=600}]}
- subscribeOn事件產(chǎn)生的線程只能指定一次, observeOn可以指定多次。
-
線程調(diào)度
subScribeOn
subscribeOn用于指定subscribe()時所發(fā)生的線程observeOn
observeOn方法用于指定下游Observer回調(diào)發(fā)生的線程。線程切換需要注意的
RxJava 內(nèi)置的線程調(diào)度器的確可以讓我們的線程切換得心應(yīng)手,但其中也有些需要注意的地方。
- 簡單地說,
subscribeOn()指定的就是發(fā)射事件的線程,observerOn指定的就是訂閱者接收事件的線程。 - 多次指定發(fā)射事件的線程只有第一次指定的有效,也就是說多次調(diào)用
subscribeOn()只有第一次的有效,其余的會被忽略。 - 但多次指定訂閱者接收線程是可以的,也就是說每調(diào)用一次
observerOn(),下游的線程就會切換一次。
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName()); e.onNext(1); e.onComplete(); } }).subscribeOn(Schedulers.newThread()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName()); } }) .observeOn(Schedulers.io()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName()); } });07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1 07-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main 07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2實例代碼中,分別用
Schedulers.newThread()和Schedulers.io()對發(fā)射線程進行切換,并采用observeOn(AndroidSchedulers.mainThread()和Schedulers.io()進行了接收線程的切換??梢钥吹捷敵鲋邪l(fā)射線程僅僅響應(yīng)了第一個newThread,但每調(diào)用一次observeOn(),線程便會切換一次,因此如果我們有類似的需求時,便知道如何處理了。RxJava 中,已經(jīng)內(nèi)置了很多線程選項供我們選擇,例如有:
-
Schedulers.io()代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作; -
Schedulers.computation()代表CPU計算密集型的操作, 例如需要大量計算的操作; -
Schedulers.newThread()代表一個常規(guī)的新線程; -
AndroidSchedulers.mainThread()代表Android的主線程
這些內(nèi)置的
Scheduler已經(jīng)足夠滿足我們開發(fā)的需求,因此我們應(yīng)該使用內(nèi)置的這些選項,而 RxJava 內(nèi)部使用的是線程池來維護這些線程,所以效率也比較高。 - 簡單地說,