前言
首先要感謝 Season_zlc 的一系列RxJava2的教程,關(guān)于上游、下游、水缸的類比,讓我對(duì)于整個(gè)RxJava2的基本思想有了更加清晰的認(rèn)識(shí)。大家有興趣的話一定要多看看,寫的通俗易懂,傳送門:給初學(xué)者的 RxJava 2.0 教程 (一) ,本文的思想都來源于它的一系列文章。
文章比較長,為了避免耽誤大家的時(shí)間,先列出需要介紹的知識(shí)點(diǎn):

一、RxJava2 的基本模型
1.1 使用實(shí)例
在開始學(xué)習(xí)之前,我們先看一下最簡(jiǎn)單的例子:
- 第一步:導(dǎo)入依賴包:
dependencies {
//在build.gradle中,導(dǎo)入依賴。
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
- 第二步:使用最基本的
Observable+Observer的最簡(jiǎn)單示例,這里我們?cè)谏嫌伟l(fā)送了四個(gè)onNext(String s)事件之后,最后發(fā)送了一個(gè)onComplete()事件。
public static void classicalSample() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("1");
observableEmitter.onNext("2");
observableEmitter.onNext("3");
observableEmitter.onNext("4");
observableEmitter.onComplete();
}
}).subscribe(new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "onSubscribe");
mDisposable = disposable;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext=" + s);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
- 第三步:運(yùn)行結(jié)果,訂閱成功之后,會(huì)依次回調(diào)以下三步操作:
onSubscribe;onNext;onComplete。
1.2 基本元素
在上面的例子中,涉及到了以下五個(gè)類:
-
Observable:上游。 -
ObservableOnSubscribe:上游的create方法所接收的參數(shù)。 -
ObservableEmitter:上游事件的發(fā)送者。 -
Observer:下游的接收者。 -
Disposable:用于維系上游、下游之間的聯(lián)系。
對(duì)于整個(gè)模型,可以總結(jié)為以下幾點(diǎn):
-
RxJava2簡(jiǎn)單的來說,就是一個(gè)發(fā)送事件、接收事件的過程,我們可以將發(fā)送事件方類比作上游,而接收事件方類比作下游。 - 上游每產(chǎn)生一個(gè)事件,下游就能收到事件,上游對(duì)應(yīng)
Observable,而下游對(duì)應(yīng)Observer。 - 只有當(dāng)上游和下游建立連接之后,上游才會(huì)開始發(fā)送事件,這一關(guān)系的建立是通過
subscribe方法。
各關(guān)鍵元素的UML圖如下:

1.3 ObservableEmitter
用于 發(fā)出事件,它可以分別發(fā)出onNext/onComplete/onError事件:
- 上游可以發(fā)送無限個(gè)
onNext,下游也可以接收無限個(gè)onNext。 - 當(dāng)上游發(fā)送了一個(gè)
onComplete/onError后,上游onComplete/onError后的事件將會(huì)繼續(xù)發(fā)送,但是下游在收到onComplete/onError事件后不再繼續(xù)接收事件。 - 上游可以不發(fā)送
onComplete或者onError事件。 - 調(diào)用
onError或者onComplete切斷了上游和下游的聯(lián)系,在聯(lián)系切斷后上游再發(fā)送onError事件就會(huì)報(bào)錯(cuò),onComplete和onError的調(diào)用情況有以下幾種:
(1)onComplete可以發(fā)送多次,但是只會(huì)收到一次回調(diào)。
(2)onError只可以發(fā)送一次,發(fā)送多次會(huì)報(bào)錯(cuò)。
(3)onComplete之后不可以發(fā)送onError,否則會(huì)報(bào)錯(cuò)。
(4)onError之后可以發(fā)送onComplete,但是只會(huì)收到onError事件。 -
onError的參數(shù)不允許為空。
其繼承關(guān)系如下圖所示:

1.4 Disposable
理解成為 水管的機(jī)關(guān),當(dāng)調(diào)用它的dispose方法時(shí),將會(huì)將上游和下游之間的管道切斷,從而導(dǎo)致 下游接收不到事件。
- 在
Observer的onSubscribe回調(diào)中,會(huì)傳入一個(gè)Disposable對(duì)象,下游可以通過該對(duì)象的dispose()方法主動(dòng)切斷和上游的聯(lián)系,在這之后上游的observableEmitter.isDisposed()方法將返回true。 - 當(dāng)上游和下游的聯(lián)系切斷之后,下游收不到包括
onComplete/onError在內(nèi)的任何事件,若此時(shí)上游再調(diào)用onError方法發(fā)送事件,那么將會(huì)報(bào)錯(cuò)。
我們來模擬一下,在下游收到2之后,通過Disposable來切斷上游和下游之間的聯(lián)系:
public static void classicalSample() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("1");
observableEmitter.onNext("2");
observableEmitter.onNext("3");
observableEmitter.onNext("4");
observableEmitter.onComplete();
}
}).subscribe(new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "onSubscribe");
mDisposable = disposable;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext=" + s);
if ("2".equals(s)) {
mDisposable.dispose();
}
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
最終的運(yùn)行結(jié)果為:

1.5 Subscribe 的重載方法
通過subscribe確定上游和下游的聯(lián)系有以下幾種方法:

可以看到,這里可以分為三類:
- 不帶參數(shù)
-
Consumer<T>類 -
Observer類 -
Action類
對(duì)于不使用Observer類作為形參的subscribe函數(shù),其實(shí)實(shí)現(xiàn)的功能和使用Observer類作為參數(shù)的方法相同,只不過它們是將Observer的四個(gè)回調(diào)分解成形參,有參數(shù)的回調(diào)用Consumer<T>代替,而沒有參數(shù)的則用Action代替。

二、線程切換
2.1 基本概念
- 當(dāng)我們?cè)谏嫌蝿?chuàng)建一個(gè)
Observable來發(fā)送事件,那么這個(gè)上游就默認(rèn)在主線程發(fā)送事件;而當(dāng)我們?cè)谙掠蝿?chuàng)建一個(gè)Observer來接收事件,那么這個(gè)下游就默認(rèn)在主線程中接收事件。 -
subscribeOn指定的是 上游發(fā)送事件 的線程,而observeOn指定的是 下游接收事件 的線程。 - 多次調(diào)用
subscribeOn只有第一次有效,而每調(diào)用一次observeOn,那么下游接收消息的線程就會(huì)切換一次。 -
CompositeDisposable可以用來容納Disposable對(duì)象,每當(dāng)我們得到一個(gè)Disposable對(duì)象時(shí),就通過add方法將它添加進(jìn)入容器,在退出的時(shí)候,調(diào)用clear方法,即可切斷所有的水管。
2.2 線程類型
-
Schedulers.io():代表IO操作,通常用于網(wǎng)絡(luò)請(qǐng)求、文件讀寫等IO密集型的操作。 -
Schedulers.computation():代表CPU密集型的操作,適用于大量計(jì)算。 -
Schedulers.newThread():創(chuàng)建新的常規(guī)線程。 -
AndroidSchedulers.mainThread():代表Android的主線程。
2.3 示例
在鏈?zhǔn)秸{(diào)用當(dāng)中,我們可以通過observeOn方法多次切換管道下游處理消息的線程,例如下面的代碼,我們對(duì)下游進(jìn)行了兩次線程的切換:
static void mapSample() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=true");
observableEmitter.onNext("true");
Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=false");
observableEmitter.onNext("false");
Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",onComplete");
observableEmitter.onComplete();
}
//1.指定了subscribe方法執(zhí)行的線程,并進(jìn)行第一次下游線程的切換,將其切換到新的子線程。
}).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).map(new Function<String, Boolean>() {
@Override
public Boolean apply(String s) throws Exception {
Log.d(TAG, "apply's thread=" + Thread.currentThread().getId() + ",s=" + s);
return "true".equals(s);
}
//2.進(jìn)行第二次下游線程的切換,將其切換到主線程。
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Boolean>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(Boolean aBoolean) {
Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",boolean=" + aBoolean);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",onComplete");
}
});
}
以上代碼的運(yùn)行的結(jié)果為:

三、Map 和 FlatMap 操作符
3.1 Map
-
Map操作符的作用是對(duì)上游發(fā)送的每一個(gè)事件應(yīng)用一個(gè)函數(shù),使得每個(gè)事件按照函數(shù)的邏輯進(jìn)行變換,通過Map就可以把上游發(fā)送的每一個(gè)事件,轉(zhuǎn)換成Object或者集合,其英文注釋為:
- 以下面使用
map的代碼為例,可以看到map接收一個(gè)Function類,它有兩個(gè)泛型變量,分別為調(diào)用map方法的Observable<T>的<T>泛型,和返回的Obervable<R>的<R>泛型。
public static void mapVerify() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
}
});
Observable<String> convertObservable = sourceObservable.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
Log.d(TAG, "sourceObservable=" + sourceObservable + "\n convertObservable=" + convertObservable);
}
Function為一個(gè)接口:

并且在
map函數(shù)調(diào)用完畢之后,將返回一個(gè)新的Observable,它的類型為ObservableMap:
3.2 FlatMap
-
FlatMap用于將一個(gè)發(fā)送事件的上游Observable變換成多個(gè)發(fā)送事件的Observable,然后將它們發(fā)送的事件合并,放進(jìn)一個(gè)單獨(dú)的Observable中,其注釋為:
- 上游每發(fā)送一個(gè)事件,就會(huì)針對(duì)該事件創(chuàng)建一個(gè)單獨(dú)的水管,然后發(fā)送轉(zhuǎn)換后的新的事件,下游接收到的就是這些新的水管發(fā)送的事件。
-
FlatMap不保證不同水管之間事件的順序,如果需要保證順序,則需要使用contactMap。
3.2.1 示例
static void flatMapSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
observableEmitter.onNext(1);
observableEmitter.onNext(2);
observableEmitter.onNext(3);
}
});
Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.fromArray("a value of " + integer + ",b value of " + integer);
}
});
flatObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
}
和map操作符類似,它也接收一個(gè)類型為Function的接口,只不過它的? extends R參數(shù)類型換成了? extends Observable<? extends R>。
3.2.2 FlatMap 不保證下游接收事件的順序
前面我們說到,flatMap操作符不會(huì)保證下游接收事件的順序,下面,我們就以一個(gè)例子來說明,在flatMap的apply函數(shù)中,我們將一個(gè)事件轉(zhuǎn)換成兩個(gè)Observable,并且加上了延時(shí):
static void flatMapOrderSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "flatMapOrderSample emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "flatMapOrderSample emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "flatMapOrderSample emit 3");
observableEmitter.onNext(3);
}
});
Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
Log.d(TAG, "flatMapOrderSample apply=" + integer);
long delay = (3 - integer) * 100;
return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
}
});
flatObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
}
可以看到,最終的輸出結(jié)果和flatMap收到事件的順序并不相同:

下面,還是同樣的場(chǎng)景,將
flatMap換成contactMap:
static void contactMapOrderSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "contactMapOrderSample emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "contactMapOrderSample emit 1");
observableEmitter.onNext(2);
Log.d(TAG, "contactMapOrderSample emit 1");
observableEmitter.onNext(3);
}
});
Observable<String> flatObservable = sourceObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
Log.d(TAG, "contactMapOrderSample apply=" + integer);
long delay = (3 - integer) * 100;
return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
}
});
flatObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});
}
最終的運(yùn)行結(jié)果為:

四、Zip 操作符
4.1 基本概念
-
Zip通過一個(gè)函數(shù)從多個(gè)Observable每次各取出一個(gè)事件,合并成一個(gè)新的事件發(fā)送給下游。 - 組合的順序是嚴(yán)格按照事件發(fā)送的順序來的。
- 最終下游收到的事件數(shù)量和上游中發(fā)送事件最少的那一根水管的事件數(shù)量相同。
4.1.1 兩個(gè) Observable 運(yùn)行在同一線程當(dāng)中
static void zipSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "sourceObservable emit 1");
observableEmitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "sourceObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "sourceObservable emit 3");
observableEmitter.onNext(3);
Log.d(TAG, "sourceObservable emit 4");
observableEmitter.onNext(4);
}
});
Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "otherObservable emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "otherObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "otherObservable emit 3");
observableEmitter.onNext(3);
}
});
Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "resultObservable onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "resultObservable onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "resultObservable onError");
}
@Override
public void onComplete() {
Log.d(TAG, "resultObservable onComplete");
}
});
}
此時(shí)的運(yùn)行結(jié)果為:

4.1.2 兩個(gè) Observable 運(yùn)行在不同的線程
static void zipSample() {
Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "sourceObservable emit 1");
observableEmitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "sourceObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "sourceObservable emit 3");
observableEmitter.onNext(3);
Log.d(TAG, "sourceObservable emit 4");
observableEmitter.onNext(4);
}
});
Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
Log.d(TAG, "otherObservable emit 1");
observableEmitter.onNext(1);
Log.d(TAG, "otherObservable emit 2");
observableEmitter.onNext(2);
Log.d(TAG, "otherObservable emit 3");
observableEmitter.onNext(3);
}
}).subscribeOn(Schedulers.io());
Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
Log.d(TAG, "resultObservable onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "resultObservable onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "resultObservable onError");
}
@Override
public void onComplete() {
Log.d(TAG, "resultObservable onComplete");
}
});
}
運(yùn)行結(jié)果為:

五、背壓
“背壓”其實(shí)就是一種用于解決問題的工具,那么我們的問題又是什么呢?
- 問題:當(dāng)上游發(fā)送事件的速度很快,下游消費(fèi)事件的速度又很慢,而系統(tǒng)又必須緩存這些上游發(fā)送的消息以便下游處理,那么就會(huì)導(dǎo)致系統(tǒng)中堆積了很多的資源。
- 工具:下游告知上游目前自己的處理能力,上游根據(jù)下游的處理能力,進(jìn)行適當(dāng)?shù)恼{(diào)整。
想必大家在很多文章中都聽過這個(gè)一句話:在RxJava2中,Observable不支持“背壓”,而Flowable支持背壓。
5.1 不支持背壓的 Observable
關(guān)于Observable不支持背壓,我們應(yīng)當(dāng)從兩種情況去考慮,即上游、下游是否位于相同的線程。
5.1.1 Observable 之上游、下游位于相同線程
首先,我們不調(diào)用observeOn和subscribeOn方法來改變上游、下游的工作線程,這樣,上游和下游就位于同一線程,同時(shí),我們?cè)谙掠蔚奶幚砗瘮?shù)中,每收到一個(gè)消息就休眠2000ms,以模擬上游處理速度大于下游的場(chǎng)景。
static void oomSample() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
for (int i = 0; i < 1000; i++) {
Log.d(TAG, "observableEmitter=" + i);
observableEmitter.onNext(i);
}
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.d(TAG, "accept=" + integer);
}
});
}
從下面的打印結(jié)果可以看到,當(dāng)“使用 Observable,并且上游、下游位于相同線程”時(shí),并不會(huì)出現(xiàn)消息堆積的情況,因?yàn)樯嫌伟l(fā)射完一條消息后,必須要等到下游處理完該消息,才會(huì)發(fā)射一條新的消息。

5.1.2 Observable 之上游、下游位于不同線程
接著,我們采用subscribeOn和observeOn來使得上游和下游位于不同的工作線程,其它均和2.2中相同。
static void oomSample() {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
for (int i = 0; i < 1000; i++) {
Log.d(TAG, "observableEmitter=" + i);
observableEmitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.d(TAG, "accept=" + integer);
}
});
}
和2.2中不同,當(dāng)上游和下游位于不同的工作線程,那么上游發(fā)送消息時(shí),不會(huì)考慮下游是否已經(jīng)處理了之前的消息,它會(huì)直接發(fā)送,而這些發(fā)送的消息被存放在水缸當(dāng)中,下游每處理完一條消息,就去水缸中取下一條數(shù)據(jù),那么隨著水缸中數(shù)據(jù)越來越多,那么系統(tǒng)中的無用資源就會(huì)急劇增加。

5.1.3 關(guān)于 Observable 不支持背壓的小結(jié)
我們之所以說Observable不支持“背壓”,就是在2.1介紹的整個(gè)族譜中,沒有一個(gè)類,一種方法能讓下游通知上游說:不要再發(fā)消息到水缸里了,我已經(jīng)處理不過來了!
那是不是說Flowable支持“背壓”,而Observable不支持,那么Observable就要被取代了呢,其實(shí)不然,Flowable對(duì)于“背壓”的支持是以性能為代價(jià)的,我們應(yīng)當(dāng)只在有可能出現(xiàn)2.3中上游下游速率不匹配的問題時(shí),才去使用Flowable,否則就應(yīng)當(dāng)使用Observable,也就是滿足兩點(diǎn)條件:
- 上游和下游位于不同的工作線程
- 上游發(fā)送消息的速度,要遠(yuǎn)遠(yuǎn)大于下游處理消息的速度,有可能造成消息的堆積。
5.2 支持背壓的 Flowable
5.2.1 基本概念
-
Flowable和Subscriber分別對(duì)應(yīng)于之前討論的Observable和Observer,它們直接的連接仍然是通過subscribe方法。 -
Flowable在設(shè)計(jì)的時(shí)候采用了 響應(yīng)式拉取 的思想,當(dāng)下游調(diào)用了Subscription的request方法時(shí),就表明了下游處理事件的能力,這樣上游就可以根據(jù)這個(gè)值來控制事件發(fā)送的頻率,避免出現(xiàn)前面談到的上游發(fā)送太快,而下游處理太慢從而導(dǎo)致OOM的發(fā)生。 - 只有上游根據(jù)下游的處理能力來發(fā)送事件,才能達(dá)到理想的效果。
5.2.2 基本使用
static void flowSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR);
sourceFlow.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
其類結(jié)構(gòu)圖和Observable幾乎完全一致:

5.3 Flowable 支持背壓的策略
從上面的類圖可以看出,Flowable和Observable最大的不同,就是在create方法中,需要傳入額外的參數(shù),它表示的是“背壓”的策略,這里可選的值包括:
ERRORBUFFERDROPLATEST
5.3.1 使用 ERROR 的策略
- 當(dāng)上游和下游位于同一個(gè)線程時(shí),如果上游發(fā)送的事件超過了下游聲明的
request(n)的值,那么會(huì)拋出MissingBackpressureException異常。 - 當(dāng)上游和下游位于不同線程時(shí),如果上游發(fā)送的事件超過了下游的聲明,事件會(huì)被放在水缸當(dāng)中,這個(gè)水缸默認(rèn)的大小是
128,只有當(dāng)下游調(diào)用request時(shí),才從水缸中取出事件發(fā)送給下游,如果水缸中事件的個(gè)數(shù)超過了128,那么也會(huì)拋出MissingBackpressureException異常。
下面這段代碼,我們先將三個(gè)事件放入到水缸當(dāng)中,之后每次調(diào)用request方法就會(huì)從水缸當(dāng)中取出一個(gè)事件發(fā)送給下游。
static void flowSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
static void clickSubscription() {
if (sSubscription != null) {
sSubscription.request(1);
}
}
當(dāng)上游和下游位于不同的線程,每次通過Subscription調(diào)用request就會(huì)從水缸中取出一個(gè)事件,發(fā)送給下游:

5.3.2 BUFFER 策略
- 使用
BUFFER策略時(shí),相當(dāng)于在上游放置了一個(gè)容量無限大的水缸,所有下游暫時(shí)無法處理的消息都放在水缸當(dāng)中,這里不再像ERROR策略一樣,區(qū)分上游和下游是否位于同一線程。 - 因此,如果下游一直沒有處理消息,那么將會(huì)導(dǎo)致內(nèi)存一直增長,從而引起
OOM。
static void clickSubscription() {
if (sSubscription != null) {
sSubscription.request(10);
}
}
static void flowBufferSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10000;i ++) {
emitter.onNext(i);
}
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
在上面的例子中,我們先把10000條消息放入到水缸當(dāng)中,之后通過Subscription每次從水缸中取出10條消息發(fā)送給下游,演示結(jié)果為:

5.3.3 DROP 策略
- 使用
DROP策略時(shí),會(huì)把水缸無法存放的事件丟棄掉,這里同樣不會(huì)受到下游和下游是否處于同一個(gè)線程的限制。
static void flowDropSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 130; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
我們先往水缸中放入130條消息,之后每次通過Subscription取出60條消息發(fā)送給下游,可以看到,最后最多只取到了第128條消息,第129/130條消息被丟棄了。

5.3.4 LATEST 策略
- 和
DROP類似,當(dāng)水缸無法容納下消息時(shí),會(huì)將它丟棄,但是除此之外,上游還會(huì)緩存最新的一條消息,實(shí)例如下:
static void flowLatestSample() {
Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 130; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());
sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription subscription) {
Log.d(TAG, "onSubscribe");
sSubscription = subscription;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext=" + integer);
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
從下面的運(yùn)行結(jié)果可以看出,當(dāng)取出最后一批數(shù)據(jù)的時(shí)候,上游除了收到存儲(chǔ)在水缸當(dāng)中的數(shù)據(jù),還額外收到了最后一條消息,也就是第130條數(shù)據(jù),這就是DROP策略和LATEST策略的區(qū)別:

更多文章,歡迎訪問我的 Android 知識(shí)梳理系列:
- Android 知識(shí)梳理目錄:http://www.itdecent.cn/p/fd82d18994ce
- 個(gè)人主頁:http://lizejun.cn
- 個(gè)人知識(shí)總結(jié)目錄:http://lizejun.cn/categories/


