Outline
[TOC]
前言
好久不見朋友們,最近一段時(shí)間在忙工作上的事情,沒(méi)來(lái)得及寫文章,這兩天正好有點(diǎn)時(shí)間,趕緊寫下了這篇教程,免得大家說(shuō)我太監(jiān)了。
正題
先來(lái)回顧一下上上節(jié),我們講Flowable的時(shí)候,說(shuō)它采用了響應(yīng)式拉的方式,我們還舉了個(gè)葉問(wèn)打小日本的例子,再來(lái)回顧一下吧,我們說(shuō)把上游看成小日本, 把下游當(dāng)作葉問(wèn), 當(dāng)調(diào)用Subscription.request(1)時(shí), 葉問(wèn)就說(shuō)我要打一個(gè)! 然后小日本就拿出一個(gè)鬼子給葉問(wèn), 讓他打, 等葉問(wèn)打死這個(gè)鬼子之后, 再次調(diào)用request(10), 葉問(wèn)就又說(shuō)我要打十個(gè)! 然后小日本又派出十個(gè)鬼子給葉問(wèn), 然后就在邊上看熱鬧, 看葉問(wèn)能不能打死十個(gè)鬼子, 等葉問(wèn)打死十個(gè)鬼子后再繼續(xù)要鬼子接著打。
但是不知道大家有沒(méi)有發(fā)現(xiàn),在我們前兩節(jié)中的例子中,我們口中聲稱的響應(yīng)式拉并沒(méi)有完全體現(xiàn)出來(lái),比如這個(gè)例子:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(1);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
mSubscription.request(1);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
雖然我們?cè)谙掠沃惺敲看翁幚淼袅艘粋€(gè)事件之后才調(diào)用request(1)去請(qǐng)求下一個(gè)事件,也就是說(shuō)葉問(wèn)的確是在打死了一個(gè)鬼子之后才繼續(xù)打下一個(gè)鬼子,可是上游呢?上游真的是每次當(dāng)下游請(qǐng)求一個(gè)才拿出一個(gè)嗎?從上上篇文章中我們知道并不是這樣的,上游仍然是一開始就發(fā)送了所有的事件,也就是說(shuō)小日本并沒(méi)有等葉問(wèn)打死一個(gè)才拿出一個(gè),而是一開始就拿出了所有的鬼子,這些鬼子從一開始就在這兒排隊(duì)等著被打死。
有個(gè)故事是這么說(shuō)的:
楚人有賣盾與矛者,先譽(yù)其盾之堅(jiān),曰:“吾盾之堅(jiān),物莫能陷也。”俄而又譽(yù)其矛之利,曰:“吾矛之利,萬(wàn)物莫不陷也。”市人詰之曰:"以子之矛陷子之盾,何如?”其人弗能應(yīng)也。眾皆笑之。
沒(méi)錯(cuò),我們前后所說(shuō)的就是自相矛盾了,這說(shuō)明了什么呢,說(shuō)明我們的實(shí)現(xiàn)并不是一個(gè)完整的實(shí)現(xiàn),那么,究竟怎樣的實(shí)現(xiàn)才是完整的呢?
我們先自己來(lái)想一想,在下游中調(diào)用Subscription.request(n)就可以告訴上游,下游能夠處理多少個(gè)事件,那么上游要根據(jù)下游的處理能力正確的去發(fā)送事件,那么上游是不是應(yīng)該知道下游的處理能力是多少啊,對(duì)吧,不然,一個(gè)巴掌拍不響啊,這種事情得你情我愿才行。
那么上游從哪里得知下游的處理能力呢?我們來(lái)看看上游最重要的部分,肯定就是FlowableEmitter了啊,我們就是通過(guò)它來(lái)發(fā)送事件的啊,來(lái)看看它的源碼吧(別緊張,它的代碼灰常簡(jiǎn)單):
public interface FlowableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable s);
void setCancellable(Cancellable c);
/**
* The current outstanding request amount.
* <p>This method is thread-safe.
* @return the current outstanding request amount
*/
long requested();
boolean isCancelled();
FlowableEmitter<T> serialize();
}
FlowableEmitter是個(gè)接口,繼承Emitter,Emitter里面就是我們的onNext(),onComplete()和onError()三個(gè)方法。我們看到FlowableEmitter中有這么一個(gè)方法:
long requested();
方法注釋的意思就是當(dāng)前外部請(qǐng)求的數(shù)量,哇哦,這好像就是我們要找的答案呢. 我們還是實(shí)際驗(yàn)證一下吧.
先來(lái)看同步的情況吧:
public static void demo1() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "current requested: " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
這個(gè)例子中,我們?cè)谏嫌沃写蛴〕霎?dāng)前的request數(shù)量,下游什么也不做。
我們先猜測(cè)一下結(jié)果,下游沒(méi)有調(diào)用request(),說(shuō)明當(dāng)前下游的處理能力為0,那么上游得到的requested也應(yīng)該是0,是不是呢?
來(lái)看看運(yùn)行結(jié)果:
D/TAG: onSubscribe
D/TAG: current requested: 0
哈哈,結(jié)果果然是0,說(shuō)明我們的結(jié)論基本上是對(duì)的。
那下游要是調(diào)用了request()呢,來(lái)看看:
public static void demo1() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "current requested: " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(10); //我要打十個(gè)!
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
這次在下游中調(diào)用了request(10),告訴上游我要打十個(gè),看看運(yùn)行結(jié)果:
D/TAG: onSubscribe
D/TAG: current requested: 10
果然!上游的requested的確是根據(jù)下游的請(qǐng)求來(lái)決定的,那要是下游多次請(qǐng)求呢?比如這樣:
public static void demo1() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "current requested: " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(10); //我要打十個(gè)!
s.request(100); //再給我一百個(gè)!
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
下游先調(diào)用了request(10), 然后又調(diào)用了request(100),來(lái)看看運(yùn)行結(jié)果:
D/TAG: onSubscribe
D/TAG: current requested: 110
看來(lái)多次調(diào)用也沒(méi)問(wèn)題,做了加法。
誒加法?對(duì)哦,只是做加法,那什么時(shí)候做減法呢?
當(dāng)然是發(fā)送事件啦!
來(lái)看個(gè)例子吧:
public static void demo2() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(final FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "before emit, requested = " + emitter.requested());
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "after emit 1, requested = " + emitter.requested());
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "after emit 2, requested = " + emitter.requested());
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "after emit 3, requested = " + emitter.requested());
Log.d(TAG, "emit complete");
emitter.onComplete();
Log.d(TAG, "after emit complete, requested = " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(10); //request 10
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
代碼很簡(jiǎn)單,來(lái)看看運(yùn)行結(jié)果:
D/TAG: onSubscribe
D/TAG: before emit, requested = 10
D/TAG: emit 1
D/TAG: onNext: 1
D/TAG: after emit 1, requested = 9
D/TAG: emit 2
D/TAG: onNext: 2
D/TAG: after emit 2, requested = 8
D/TAG: emit 3
D/TAG: onNext: 3
D/TAG: after emit 3, requested = 7
D/TAG: emit complete
D/TAG: onComplete
D/TAG: after emit complete, requested = 7
大家應(yīng)該能看出端倪了吧,下游調(diào)用request(n) 告訴上游它的處理能力,上游每發(fā)送一個(gè)next事件之后,requested就減一,注意是next事件,complete和error事件不會(huì)消耗requested,當(dāng)減到0時(shí),則代表下游沒(méi)有處理能力了,這個(gè)時(shí)候你如果繼續(xù)發(fā)送事件,會(huì)發(fā)生什么后果呢?當(dāng)然是MissingBackpressureException啦,試一試:
public static void demo2() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(final FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "before emit, requested = " + emitter.requested());
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "after emit 1, requested = " + emitter.requested());
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "after emit 2, requested = " + emitter.requested());
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "after emit 3, requested = " + emitter.requested());
Log.d(TAG, "emit complete");
emitter.onComplete();
Log.d(TAG, "after emit complete, requested = " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(2); //request 2
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
還是這個(gè)例子,只不過(guò)這次只request(2), 看看運(yùn)行結(jié)果:
D/TAG: onSubscribe
D/TAG: before emit, requested = 2
D/TAG: emit 1
D/TAG: onNext: 1
D/TAG: after emit 1, requested = 1
D/TAG: emit 2
D/TAG: onNext: 2
D/TAG: after emit 2, requested = 0
D/TAG: emit 3
W/TAG: onError: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
at zlc.season.rxjava2demo.demo.ChapterNine$4.subscribe(ChapterNine.java:80)
at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
at io.reactivex.Flowable.subscribe(Flowable.java:12218)
at zlc.season.rxjava2demo.demo.ChapterNine.demo2(ChapterNine.java:89)
at zlc.season.rxjava2demo.MainActivity$2.onClick(MainActivity.java:36)
at android.view.View.performClick(View.java:4780)
at android.view.View$PerformClick.run(View.java:19866)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:135)
at android.app.ActivityThread.main(ActivityThread.java:5254)
at java.lang.reflect.Method.invoke(Native Method)
at java.lang.reflect.Method.invoke(Method.java:372)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:903)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:698)
D/TAG: after emit 3, requested = 0
D/TAG: emit complete
D/TAG: after emit complete, requested = 0
到目前為止我們一直在說(shuō)同步的訂閱,現(xiàn)在同步說(shuō)完了,我們先用一張圖來(lái)總結(jié)一下同步的情況:

這張圖的意思就是當(dāng)上下游在同一個(gè)線程中的時(shí)候,在下游調(diào)用request(n)就會(huì)直接改變上游中的requested的值,多次調(diào)用便會(huì)疊加這個(gè)值,而上游每發(fā)送一個(gè)事件之后便會(huì)去減少這個(gè)值,當(dāng)這個(gè)值減少至0的時(shí)候,繼續(xù)發(fā)送事件便會(huì)拋異常了。
我們?cè)賮?lái)說(shuō)說(shuō)異步的情況,異步和同步會(huì)有區(qū)別嗎?會(huì)有什么區(qū)別呢?帶著這個(gè)疑問(wèn)我們繼續(xù)來(lái)探究。
同樣的先來(lái)看一個(gè)基本的例子:
public static void demo3() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "current requested: " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
這次是異步的情況,上游啥也不做,下游也啥也不做,來(lái)看看運(yùn)行結(jié)果:
D/TAG: onSubscribe
D/TAG: current requested: 128
哈哈,又是128,看了我前幾篇文章的朋友肯定很熟悉這個(gè)數(shù)字啊!這個(gè)數(shù)字為什么和我們之前所說(shuō)的默認(rèn)的水缸大小一樣啊,莫非?
帶著這個(gè)疑問(wèn)我們繼續(xù)來(lái)研究一下:
public static void demo3() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "current requested: " + emitter.requested());
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(1000); //我要打1000個(gè)!!
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
這次我們?cè)谙掠握{(diào)用了request(1000)告訴上游我要打1000個(gè),按照之前我們說(shuō)的,這次的運(yùn)行結(jié)果應(yīng)該是1000,來(lái)看看運(yùn)行結(jié)果:
D/TAG: onSubscribe
D/TAG: current requested: 128
臥槽,你確定你沒(méi)貼錯(cuò)代碼?
是的,真相就是這樣,就是128,蜜汁128。。。


為了答疑解惑,我就直接上圖了:

可以看到,當(dāng)上下游工作在不同的線程里時(shí),每一個(gè)線程里都有一個(gè)requested,而我們調(diào)用request(1000)時(shí),實(shí)際上改變的是下游主線程中的requested,而上游中的requested的值是由RxJava內(nèi)部調(diào)用request(n)去設(shè)置的,這個(gè)調(diào)用會(huì)在合適的時(shí)候自動(dòng)觸發(fā)。
現(xiàn)在我們就能理解為什么沒(méi)有調(diào)用request,上游中的值是128了,因?yàn)橄掠卧?code>一開始就在內(nèi)部調(diào)用了request(128)去設(shè)置了上游中的值,因此即使下游沒(méi)有調(diào)用request(),上游也能發(fā)送128個(gè)事件,這也可以解釋之前我們?yōu)槭裁凑f(shuō)Flowable中默認(rèn)的水缸大小是128,其實(shí)就是這里設(shè)置的。
剛才同步的時(shí)候我們說(shuō)了,上游每發(fā)送一個(gè)事件,requested的值便會(huì)減一,對(duì)于異步來(lái)說(shuō)同樣如此,那有人肯定有疑問(wèn)了,一開始上游的requested的值是128,那這128個(gè)事件發(fā)送完了不就不能繼續(xù)發(fā)送了嗎?
剛剛說(shuō)了,設(shè)置上游requested的值的這個(gè)內(nèi)部調(diào)用會(huì)在合適的時(shí)候自動(dòng)觸發(fā),那到底什么時(shí)候是合適的時(shí)候呢?是發(fā)完128個(gè)事件才去調(diào)用嗎?還是發(fā)送了一半才去調(diào)用呢?
帶著這個(gè)疑問(wèn)我們來(lái)看下一段代碼:
public static void request() {
mSubscription.request(96); //請(qǐng)求96個(gè)事件
}
public static void demo4() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "First requested = " + emitter.requested());
boolean flag;
for (int i = 0; ; i++) {
flag = false;
while (emitter.requested() == 0) {
if (!flag) {
Log.d(TAG, "Oh no! I can't emit value!");
flag = true;
}
}
emitter.onNext(i);
Log.d(TAG, "emit " + i + " , requested = " + emitter.requested());
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
這次的上游稍微復(fù)雜了一點(diǎn)點(diǎn),首先仍然是個(gè)無(wú)限循環(huán)發(fā)事件,但是是有條件的,只有當(dāng)上游的requested != 0的時(shí)候才會(huì)發(fā)事件,然后我們調(diào)用request(96)去消費(fèi)96個(gè)事件(為什么是96而不是其他的數(shù)字先不要管),來(lái)看看運(yùn)行結(jié)果吧:
D/TAG: onSubscribe
D/TAG: First requested = 128
D/TAG: emit 0 , requested = 127
D/TAG: emit 1 , requested = 126
D/TAG: emit 2 , requested = 125
...
D/TAG: emit 124 , requested = 3
D/TAG: emit 125 , requested = 2
D/TAG: emit 126 , requested = 1
D/TAG: emit 127 , requested = 0
D/TAG: Oh no! I can't emit value!
首先運(yùn)行之后上游便會(huì)發(fā)送完128個(gè)事件,之后便不做任何事情,從打印的結(jié)果中我們也可以看出這一點(diǎn)。
然后我們調(diào)用request(96),這會(huì)讓下游去消費(fèi)96個(gè)事件,來(lái)看看運(yùn)行結(jié)果吧:
D/TAG: onNext: 0
D/TAG: onNext: 1
...
D/TAG: onNext: 92
D/TAG: onNext: 93
D/TAG: onNext: 94
D/TAG: onNext: 95
D/TAG: emit 128 , requested = 95
D/TAG: emit 129 , requested = 94
D/TAG: emit 130 , requested = 93
D/TAG: emit 131 , requested = 92
...
D/TAG: emit 219 , requested = 4
D/TAG: emit 220 , requested = 3
D/TAG: emit 221 , requested = 2
D/TAG: emit 222 , requested = 1
D/TAG: emit 223 , requested = 0
D/TAG: Oh no! I can't emit value!
可以看到,當(dāng)下游消費(fèi)掉第96個(gè)事件之后,上游又開始發(fā)事件了,而且可以看到當(dāng)前上游的requested的值是96(打印出來(lái)的95是已經(jīng)發(fā)送了一個(gè)事件減一之后的值),最終發(fā)出了第223個(gè)事件之后又進(jìn)入了等待區(qū),而223-127 正好等于 96。
這是不是說(shuō)明當(dāng)下游每消費(fèi)96個(gè)事件便會(huì)自動(dòng)觸發(fā)內(nèi)部的request()去設(shè)置上游的requested的值??!沒(méi)錯(cuò),就是這樣,而這個(gè)新的值就是96。
朋友們可以手動(dòng)試試請(qǐng)求95個(gè)事件,上游是不會(huì)繼續(xù)發(fā)送事件的。
至于這個(gè)96是怎么得出來(lái)的(肯定不是我猜的蒙的啊),感興趣的朋友可以自行閱讀源碼尋找答案,對(duì)于初學(xué)者而言應(yīng)該沒(méi)什么必要,管它內(nèi)部怎么實(shí)現(xiàn)的呢對(duì)吧。
好了今天的教程就到這里了!通過(guò)本節(jié)的學(xué)習(xí),大家應(yīng)該知道如何正確的去實(shí)現(xiàn)一個(gè)完整的響應(yīng)式拉取了,在某一些場(chǎng)景下,可以在發(fā)送事件前先判斷當(dāng)前的requested的值是否大于0,若等于0則說(shuō)明下游處理不過(guò)來(lái)了,則需要等待,例如下面這個(gè)例子。
實(shí)踐
這個(gè)例子是讀取一個(gè)文本文件,需要一行一行讀取,然后處理并輸出,如果文本文件很大的時(shí)候,比如幾十M的時(shí)候,全部先讀入內(nèi)存肯定不是明智的做法,因此我們可以一邊讀取一邊處理,實(shí)現(xiàn)的代碼如下:
public static void main(String[] args) {
practice1();
try {
Thread.sleep(10000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void practice1() {
Flowable
.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
try {
FileReader reader = new FileReader("test.txt");
BufferedReader br = new BufferedReader(reader);
String str;
while ((str = br.readLine()) != null && !emitter.isCancelled()) {
while (emitter.requested() == 0) {
if (emitter.isCancelled()) {
break;
}
}
emitter.onNext(str);
}
br.close();
reader.close();
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
mSubscription = s;
s.request(1);
}
@Override
public void onNext(String string) {
System.out.println(string);
try {
Thread.sleep(2000);
mSubscription.request(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
System.out.println(t);
}
@Override
public void onComplete() {
}
});
}
運(yùn)行的結(jié)果便是:

好了,本次的教程就到這里了,謝謝大家捧場(chǎng)!下節(jié)見,敬請(qǐng)期待!
(PS: 我這么用心的寫文章, 你們也不給個(gè)贊嗎?)