本篇文章已授權(quán)微信公眾號(hào) YYGeeker 獨(dú)家發(fā)布轉(zhuǎn)載請(qǐng)標(biāo)明出處
CSDN學(xué)院課程地址
- RxJava2從入門到精通-初級(jí)篇:https://edu.csdn.net/course/detail/10036
- RxJava2從入門到精通-中級(jí)篇:https://edu.csdn.net/course/detail/10037
- RxJava2從入門到精通-進(jìn)階篇:https://edu.csdn.net/course/detail/10038
- RxJava2從入門到精通-源碼分析篇:https://edu.csdn.net/course/detail/10138
5. RxJava背壓策略(BackpressureStrategy)
5.1 背壓是什么
背壓的概念是在平時(shí)業(yè)務(wù)開發(fā)時(shí)較為常見,大多數(shù)是針對(duì)高并發(fā)的業(yè)務(wù),背壓是必須考慮的因素之一。在異步場(chǎng)景中,由于數(shù)據(jù)流的發(fā)射速度高于數(shù)據(jù)流的接收速度,就會(huì)導(dǎo)致數(shù)據(jù)不能及時(shí)處理,從而導(dǎo)致數(shù)據(jù)流的阻塞。背壓所要做的事情就是主動(dòng)控制數(shù)據(jù)流發(fā)射的速度
在RxJava2.0中,推出了Flowable用來支持背壓,去除了Observable對(duì)背壓的支持,下面在背壓策略的講解中,我們都使用Flowable作為我們的響應(yīng)類型。在使用背壓時(shí),只需要在create()方法中第二個(gè)參數(shù)添加背壓策略即可
- 在訂閱的時(shí)候如果使用
FlowableSubscriber,那么需要通過s.request(Long.MAX_VALUE)去主動(dòng)請(qǐng)求上游的數(shù)據(jù)項(xiàng)。如果遇到背壓報(bào)錯(cuò)的時(shí)候,FlowableSubscriber默認(rèn)已經(jīng)將錯(cuò)誤try-catch,并通過onError()進(jìn)行回調(diào),程序并不會(huì)崩潰 - 在訂閱的時(shí)候如果使用
Consumer,那么不需要主動(dòng)去請(qǐng)求上游數(shù)據(jù),默認(rèn)已經(jīng)調(diào)用了s.request(Long.MAX_VALUE)。如果遇到背壓報(bào)錯(cuò)、且對(duì)Throwable的Consumer沒有new出來,則程序直接崩潰 - 背壓策略的上游的默認(rèn)緩存池是128
public abstract class Flowable<T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
}
5.2 MISSING
MISSING表示OnNext事件沒有任何緩存和丟棄,下游要處理任何溢出,可以理解為相當(dāng)于沒有指定背壓策略。Flowable相當(dāng)于沒有指定背壓策略可以將下游要處理任何溢出理解為,上游發(fā)射的數(shù)據(jù)未得到處理,就會(huì)緩存起來,當(dāng)緩存容量達(dá)到128時(shí),再增加一個(gè)未處理的數(shù)據(jù)項(xiàng),就會(huì)拋出MissingBackpressureException,且?guī)в嘘?duì)列已經(jīng)滿了的友好提示。這里就好比一個(gè)大水缸,當(dāng)水注滿的時(shí)候,它就會(huì)把蓋子蓋上,不讓你再繼續(xù)注水了
這里我們模擬上游發(fā)送速度高于下游數(shù)據(jù)流的處理速度,在數(shù)據(jù)處理的時(shí)候加上
Thread.sleep(1000)
public void missing() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 129; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.MISSING)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
輸出
io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
5.3 ERROR
ERROR表示在下游無法跟上時(shí),會(huì)拋出MissingBackpressureException??梢詫⑾掠螣o法跟上理解為,上游發(fā)射的數(shù)據(jù)未得到處理,就會(huì)緩存起來,當(dāng)緩存容量達(dá)到128時(shí),再增加一個(gè)未處理的數(shù)據(jù)項(xiàng),就會(huì)拋出MissingBackpressureException。這里好比一個(gè)大水缸,當(dāng)水注滿的時(shí)候,它會(huì)把水缸撐破了,直接破裂
public void error() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 129; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
輸出
io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
5.4 BUFFER
上游不斷的發(fā)出onNext請(qǐng)求,直到下游處理完,上游發(fā)射的數(shù)據(jù)項(xiàng)的緩存池是無限大的,程序也不會(huì)拋出錯(cuò)誤,但是要注意程序OOM的現(xiàn)象,因?yàn)榫彺嬖酱?,占用的?nèi)存就越多。例子中發(fā)射129個(gè)數(shù)據(jù)項(xiàng),然而程序并沒有崩潰,只會(huì)一直讀取緩存池的數(shù)據(jù)項(xiàng),直到數(shù)據(jù)項(xiàng)被處理完。這里就是一個(gè)無限大的水缸
背壓策略除了BUFFER策略的緩存池是無限大之外,其他默認(rèn)的緩存池都是128
public void buffer() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
輸出
onNext=0
onNext=1
onNext=2
......
onNext=998
onNext=999
5.5 DROP
會(huì)在下游跟不上速度時(shí),把onNext的值丟棄,簡(jiǎn)單的說就是,超過緩存區(qū)大?。?28)的數(shù)據(jù)項(xiàng)都會(huì)被丟棄。例子中通過發(fā)射800個(gè)數(shù)據(jù)項(xiàng),那么我們只會(huì)收到0-127的數(shù)據(jù)項(xiàng)。如果我們?cè)俅握{(diào)用request(),這時(shí)候取到的數(shù)據(jù)就是上一次request()后的128個(gè)數(shù)據(jù)。這里好比一個(gè)大水缸,當(dāng)水注滿的時(shí)候,水還是在繼續(xù)的流,一旦有request調(diào)用的時(shí)候,它就會(huì)去取出水缸里的所有水,這時(shí)候水缸就是空的,但水一直在流,所以水缸馬上又會(huì)被注滿,這個(gè)時(shí)候就要等request再次取出水缸里的水
public void drop() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
輸出
onNext=0
onNext=1
onNext=2
......
onNext=127
5.6 LATEST
LATEST與Drop策略一樣,如果超過緩存池容量大小的數(shù)據(jù)項(xiàng)都會(huì)被丟棄。不同的是,不管緩存池的狀態(tài)如何,LATEST都會(huì)將最后一條數(shù)據(jù)強(qiáng)行放入緩存池中。這里的水缸容納下了最后一滴水
public void latest() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("TAG", "onNext=" + integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
}
});
}
輸出
onNext=0
onNext=1
......
onNext=126
onNext=127
onNext=999
5.7 小結(jié)
- MISSING:沒有任何緩存和丟棄,下游要處理任何溢出
- ERROR:下游的處理速度無法跟上上游的發(fā)射速度時(shí)報(bào)錯(cuò)
- BUFFER:數(shù)據(jù)項(xiàng)的緩存池?zé)o限大
- DROP:下游的處理速度無法跟上上游的發(fā)射速度時(shí)丟棄
- LATEST:最后一條數(shù)據(jù)項(xiàng)被強(qiáng)行放入緩存池