前言
上一節(jié)我們學習了使用Observable解決上、下游發(fā)射事件速度不平衡的問題,之所以學習 Observable,是因為Observable有很多的使用場景,而這一節(jié)我們要學習一個新的操作符 —— Flowable操作符,這個操作符可以解決絕大部分的問題,但是Observable、Flowable各有應用場景,也各有優(yōu)勢和缺點。
1. Flowable
Flowable如下圖所示:

- Observable的上、下游分別是Observable、Observer,建立連接是subscribe();
- Flowable的上、下游分別是Flowable、Subscriber,建立連接是subscribe();
Flowable最基本用法如下:
/**
* Flowable最基本用法
*/
public static void demo1(){
// 創(chuàng)建一個上游:Flowable
Flowable<Integer> upStream = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.e("TAG" , "emit 1") ;
emitter.onNext(1);
Log.e("TAG" , "emit 2") ;
emitter.onNext(2);
Log.e("TAG" , "emit 3") ;
emitter.onNext(3);
Log.e("TAG" , "emit complete") ;
emitter.onComplete();
} // 參數(shù)BackpressureStrategy.ERROR作用:
// 用來選擇背壓,用于解決上下游發(fā)射數(shù)據(jù)速度不平衡問題,如果速度不一致,
// 直接拋異常MissingBackpressureException
} , BackpressureStrategy.ERROR) ;
// 創(chuàng)建一個下游:Subscriber
Subscriber<Integer> downStream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e("TAG" , "subscribe") ;
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.e("TAG" , "next -> " + integer) ;
}
@Override
public void onError(Throwable t) {
Log.e("TAG" , "error -> " + t) ;
}
@Override
public void onComplete() {
Log.e("TAG" , "complete") ;
}
} ;
// 建立連接
upStream.subscribe(downStream) ;
}
運行結果如下:
TAG: subscribe
TAG: emit 1
TAG: next -> 1
TAG: emit 2
TAG: next -> 2
TAG: emit 3
TAG: next -> 3
TAG: emit complete
TAG: complete
Flowable寫法與Observable稍有區(qū)別:
- 第一個區(qū)別是:Flowable上游中新增一個參數(shù),BackpressureStrategy.ERROR,用來選擇背壓,用于是解決上下游發(fā)射數(shù)據(jù)速度不平衡問題,如果速度不一致,直接拋出異常MissingBackpressureException;
- 第二個區(qū)別是:在下游的onSubscribe()方法中,接收的是Subscription ,而不是Observable中的Disposable,這兩個作用都是用于切斷水管,調用Subscription.cancel()、調用Disposable.dispose(),不同的是在 Subscription中需要調用 request(),即就是代碼中的
s.request(Long.MAX_VALUE);
- 如果不添加 s.request(Long.MAX_VALUE); ,直接報錯MissingBackpressureException,示例代碼如下:
/**
* Flowable用法:在onSubscribe()中不加 s.request(Long.MAX_VALUE);
*/
public static void demo2(){
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.e("TAG" , "emit 1") ;
emitter.onNext(1);
Log.e("TAG" , "emit 2") ;
emitter.onNext(2);
Log.e("TAG" , "emit 3") ;
emitter.onNext(3);
Log.e("TAG" , "emit complete") ;
emitter.onComplete();
}
} , BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e("TAG" , "subscribe") ;
}
@Override
public void onNext(Integer integer) {
Log.e("TAG" , "next -> " + integer) ;
}
@Override
public void onError(Throwable t) {
Log.e("TAG" , "error -> " + t) ;
}
@Override
public void onComplete() {
Log.e("TAG" , "complete") ;
}
});
}

可以看到,在上游發(fā)送第一個事件后,下游直接拋出著名異常MissingBackpressureException異常,且下游并沒有收到任何其余的事件,上、下游在同一個線程,是同步關系,按道理來講,上游每次發(fā)送事件都應該等下游處理完事件后,才會繼續(xù)發(fā)送事件,下邊我們看異步線程的情況。
2. Flowable讓上下游在異步線程中執(zhí)行
代碼如下:
/**
* Flowable:
* 讓上、下游處于異步線程中,也就是說讓上游在子線程中執(zhí)行,下游在主線程中執(zhí)行
*/
public static void demo3(){
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.e("TAG" , "emit 1") ;
emitter.onNext(1);
Log.e("TAG" , "emit 2") ;
emitter.onNext(2);
Log.e("TAG" , "emit 3") ;
emitter.onNext(3);
Log.e("TAG" , "emit complete") ;
emitter.onComplete();
}
} , BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e("TAG" , "subscribe") ;
mSubscription = s ;
}
@Override
public void onNext(Integer integer) {
Log.e("TAG" , "next -> " + integer) ;
}
@Override
public void onError(Throwable t) {
Log.e("TAG" , "error -> " + t) ;
}
@Override
public void onComplete() {
Log.e("TAG" , "complete") ;
}
}) ;
}
運行結果如下:
TAG: subscribe
TAG: emit 1
TAG: emit 2
TAG: emit 3
TAG: emit complete
可以看到,上游發(fā)送所有事件,但是下游一個都沒有收到
- 在onSubscribe()方法中調用的s.request(Long.MAX_VALUE); 可以把request()方法當做一種能力,當成下游處理事件的能力,下游能處理幾個,就告訴上游我要幾個,可以解決上游一次性發(fā)射全部數(shù)據(jù)到下游,導致OOM,可以解決上一節(jié)學習的兩種處理方式的缺陷:
1>:取樣事件會導致事件丟失;
2>:減速導致性能丟失;
而調用s.request(Long.MAX_VALUE)方法 這種 方式既解決了事件丟失又解決了性能丟失;
3. 認識 request()方法
1>:對于上、下游二者在同一個線程中,上游發(fā)射第一個事件,下游直接拋出異常MissingBackpressureException異常,是因為下游沒有調用request()方法,表示上游認為下游沒有處理事件的能力,上游發(fā)送事件,下游必須在onSubscribe()方法中調用request()方法,表示下游有處理事件的能力,比如 request(3)或者request(Long.MAX_VALUE)都是可以的;
2>:對于上、下游二者沒有在同一個線程中,即異步操作,上游可以正確發(fā)送所有事件,因為在 Flowable中默認有一個 128的水缸,當上下游在不同的線程中工作時,上游會先把事件發(fā)送到這個水缸中,所以說,即使下游沒有調用 request()方法,但是水缸中保存著上游發(fā)射的事件,只有當下游調用request()方法時,才從水缸中取出事件發(fā)送給 下游;
注意:
1>:Flowable中的 水缸大小只有 128,如果是129,直接拋MissingBackpressureException異常;
2>:我們這里把上游發(fā)射的事件全部存儲到 水缸中,下游一個都沒有消費,只要下游快速消費一個,就不會OOM,如果下游10秒之后再消費也會OOM;