版權(quán)聲明,本文來(lái)自門心叼龍的博客,屬于原創(chuàng)內(nèi)容,轉(zhuǎn)載請(qǐng)注明出處。https://blog.csdn.net/geduo_83/article/details/89929354
在前面兩篇文章中我們主要講了RxJava在實(shí)戰(zhàn)開(kāi)發(fā)中的一些應(yīng)用案例,這些案例都是公司項(xiàng)目中的實(shí)戰(zhàn)應(yīng)用,項(xiàng)目在五一的時(shí)候已經(jīng)上線了,通過(guò)先入為主的方式直接上案例,然后在結(jié)合理論在進(jìn)行學(xué)習(xí),這樣我們的理解就會(huì)更加的深刻,通過(guò)前面兩篇文章的介紹,大家已經(jīng)掌握了RxJava的一些基本使用,這篇文章我們主要介紹RxJava的高級(jí)應(yīng)用: 線程調(diào)度。
我們知道,響應(yīng)式編程是通過(guò)異步和數(shù)據(jù)流來(lái)構(gòu)建事物關(guān)系的編程模型,后臺(tái)處理數(shù)據(jù),前臺(tái)響應(yīng)數(shù)據(jù),而RxJava是響應(yīng)式編程在Java語(yǔ)言中拓展庫(kù),它以觀察者模式為核心,通過(guò)強(qiáng)大的操作符,對(duì)事件中的信息進(jìn)行操作轉(zhuǎn)換,并可以靈活的實(shí)現(xiàn)線程調(diào)度的一個(gè)框架,隨著業(yè)務(wù)邏輯的越來(lái)越復(fù)雜,使我們的代碼依然保持簡(jiǎn)潔,正是由于這些優(yōu)勢(shì),深受廣大開(kāi)發(fā)者的追捧。
RxJava的最大優(yōu)勢(shì)之一是它能夠輕松地在各種線程上安排工作和處理結(jié)果,我們?cè)趯?shí)際開(kāi)發(fā)中,一個(gè)RxJava調(diào)用能不能多次操作observeOn和subscribeOn?observeOn和subscribeOn的先后調(diào)用順序?qū)€程切換有沒(méi)有影響?Schedulers.io()和Schedulers.newThread()到底有何區(qū)別?通過(guò)本文的學(xué)習(xí)這些問(wèn)題將逐一揭開(kāi)他們的神秘面紗,避免線程調(diào)度中出現(xiàn)的一些常見(jiàn)問(wèn)題,因?yàn)榕c線程相關(guān)的錯(cuò)誤非常難以追蹤。
RxJava線程調(diào)度
RxJava中的線程調(diào)度是在Scheduler的幫助下完成的。Scheduler可以被認(rèn)為是管理1個(gè)或多個(gè)線程的線程池。每當(dāng)Scheduler需要執(zhí)行任務(wù)時(shí),它將從其池中獲取一個(gè)線程并在該線程中運(yùn)行該任務(wù)。
讓我們總結(jié)一下可用的Scheduler類型及其常見(jiàn)用途:
1.Schedulers.io()由無(wú)限制的線程池支持。它用于非CPU密集型I / O類型工作,包括與文件系統(tǒng)的交互,執(zhí)行網(wǎng)絡(luò)調(diào)用,數(shù)據(jù)庫(kù)交互等。此線程池旨在用于異步執(zhí)行阻塞IO,該線程池中的線程可以重復(fù)使用的。
2.Schedulers.computation()由有限線程池支持,其大小可達(dá)處理器的數(shù)量。它用于計(jì)算或CPU密集型工作,例如調(diào)整圖像大小,處理大型數(shù)據(jù)集等。
3.Schedulers.newThread()為每個(gè)安排的工作任務(wù)創(chuàng)建一個(gè)新線程,這個(gè)調(diào)度程序很昂貴,因?yàn)槊看味紩?huì)生成新線程,并且不會(huì)重復(fù)使用。
4.Schedulers.from(Executor executor)創(chuàng)建并返回由指定執(zhí)行程序支持的自定義調(diào)度程序。要限制線程池中同時(shí)線程的數(shù)量,請(qǐng)使用Scheduler.from(Executors.newFixedThreadPool(n))。這保證了如果在所有線程都被占用時(shí)調(diào)度任務(wù),它將排隊(duì)。池中的線程將一直存在,直到它被明確關(guān)閉。
5.主線程或AndroidSchedulers.mainThread()由RxAndroid擴(kuò)展庫(kù)提供給RxJava。主線程(也稱為UI線程)是用戶交互發(fā)生的地方。應(yīng)該注意不要重載此線程以防止應(yīng)用程序無(wú)響應(yīng)(ANR)對(duì)話框。
6.Schedulers.single() 是RxJava 2中的新增功能。此調(diào)度程序由單個(gè)線程支持,該線程按請(qǐng)求的順序依次執(zhí)行任務(wù)。
7.Schedulers.trampoline()由參與的工作線程之一以FIFO(先進(jìn)先出)方式執(zhí)行任務(wù)。它通常在實(shí)現(xiàn)遞歸時(shí)使用,以避免增加調(diào)用堆棧。
警告:小心使用無(wú)界線程調(diào)度:如Schedulers.io()和Schedulers.newThread()
簡(jiǎn)單的例子
讓我們從一個(gè)基本的RxJava代碼開(kāi)始,其中Observable<String>發(fā)出一個(gè)字符串并計(jì)算每個(gè)字符串的長(zhǎng)度,我們將在以下部分中以此示例為基礎(chǔ)。
Observable.just("long", "longer", "longest")
.map(String::length)
.subscribe(length -> System.out.println("item length " + length));
執(zhí)行時(shí),將打?。?/p>
item length 4
item length 6
item length 7
現(xiàn)在,讓我們通過(guò)在doOnNext()中打印出線程信息來(lái)看看這項(xiàng)工作正在進(jìn)行的線程,這是一個(gè)為每個(gè)發(fā)出的的數(shù)據(jù)執(zhí)行的操作。
Observable.just("long", "longer", "longest")
.doOnNext(c -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
.map(String::length)
.subscribe(length -> System.out.println("item length " + length));
執(zhí)行時(shí),將打?。?/p>
processing item on thread main
item length 4
processing item on thread main
item length 6
processing item on thread main
item length 7
所以這個(gè)流是在主線程上發(fā)出和處理的,因?yàn)樯厦娴拇a塊位于我的類的main方法中
在后臺(tái)線程上工作
通常將某些工作委托給后臺(tái)線程是有意義的,
subscribeOn運(yùn)算符告訴源Observable發(fā)出哪個(gè)線程并將項(xiàng)目一直推送到Observer,因此它會(huì)影響上游和下游的所有操作, 將subscribeOn放在Observable操作符鏈中的哪個(gè)位置其實(shí)并不重要。
關(guān)于我們的Observable要記住的事情是:
subscribeOn()這僅影響Observable訂閱時(shí)使用的線程,并且它將保留在下游
- 1.Observable它的工作是被動(dòng)的,只有訂閱subscribe發(fā)生后它才開(kāi)始工作,并且每訂閱一次,工作就再次進(jìn)行一次。
- 2.subscribeOn()指定工作執(zhí)行所在的線程池,它的位置無(wú)關(guān)緊要,它可以在流的任何位置,如果流中有多個(gè)實(shí)例subscribeOn,則只有第一個(gè)具有實(shí)際效果。
- 3.數(shù)據(jù)的處理工作和數(shù)據(jù)的接收工作是在同一線程中完成的,可以使用observeOn()進(jìn)行更改線程。
使用subscribeOn()
Observable.just("long", "longer", "longest")
.doOnNext(c -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread())
.map(String::length)
.subscribe(length -> System.out.println("item length " + length));
讓我們?cè)趍ain方法中運(yùn)行更新的代碼示例,執(zhí)行時(shí),這將不會(huì)打印任何內(nèi)容
這是因?yàn)閙ain方法在后臺(tái)線程返回結(jié)果之前完成執(zhí)行。為了解決這個(gè)問(wèn)題,我們將主方法保持活動(dòng)3秒鐘?以便有足夠長(zhǎng)的時(shí)間讓我們有機(jī)會(huì)在后臺(tái)線程上處理數(shù)據(jù)。
public static void main(String[] args) throws InterruptedException {
Observable.just("long", "longer", "longest")
.doOnNext(c -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread())
.map(String::length)
.subscribe(length -> System.out.println("item length " + length + "received on " + Thread.currentThread().getName()));
Thread.sleep(3000);
}
執(zhí)行時(shí),將打?。?/p>
processing item on thread RxNewThreadScheduler-1
item length 4 received on RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 6 received on RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 7 received on RxNewThreadScheduler-1
后臺(tái)線程工作的結(jié)果在同一個(gè)線程RxNewThreadScheduler-1上返回。
執(zhí)行網(wǎng)絡(luò)/ IO /計(jì)算任務(wù)時(shí),使用后臺(tái)調(diào)度程序至關(guān)重要。如果沒(méi)有subscribeOn(),您的代碼將使用調(diào)用程序線程來(lái)執(zhí)行操作,從而導(dǎo)致Observable阻塞
理解observeOn()
observeOn()指定下游運(yùn)算所在的線程
正如我們?cè)谏厦婵吹降哪菢?,subscribeOn()指示源Observable從哪個(gè)線程發(fā)出數(shù)據(jù),這個(gè)線程將把數(shù)據(jù)流一直推到我們的Observer。但是如果遇到observeOn()鏈中的任何位置,它將使用observeOn所指定的線程來(lái)操作的后續(xù)切換和數(shù)據(jù)流推送。
通常Android中的觀察線程是主UI線程AndroidSchedulers.mainThread()。這需要RxAndroid擴(kuò)展庫(kù)到RxJava。
讓我們修改我們的示例代碼以執(zhí)行后臺(tái)工作,Schedulers.newThread()然后切換到AndroidSchedulers.mainThread()。
使用observeOn()
Observable.just("long", "longer", "longest")
.doOnNext(c -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread())
.map(String::length)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(length -> System.out.println("item length " + length + " received on thread " + Thread.currentThread().getName()));
執(zhí)行時(shí),我們將看到主線程現(xiàn)在收到結(jié)果
processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-1
item length 4 received on thread main
item length 6 received on thread main
item length 7 received on thread main
異步工作
雖然RxJava被稱為使用可觀察序列組成異步和基于事件的程序的庫(kù),但它可以同步執(zhí)行許多有用的任務(wù)。例如,map(String::length)上面使用相同的線程RxNewThreadScheduler-1處理每個(gè)項(xiàng)目,順序保留相同的順序。
簡(jiǎn)單地subscribeOn()在Observable鏈的開(kāi)頭使用意味著該過(guò)程仍然在單個(gè)線程上運(yùn)行并且在下游同步發(fā)送事件。然而,當(dāng)你開(kāi)始在不同的線程或使用操作符,如不同的流相結(jié)合observeOn(),interval(),delay(),你可觀察鏈不再是同步的。
現(xiàn)在,讓我們看看如何修改上面的示例,以便每個(gè)發(fā)出的事件同時(shí)由一個(gè)單獨(dú)的線程處理。
介紹flatMap()
flatMap()包裝了被發(fā)射的每一個(gè)項(xiàng)目Observable,讓你自己RxJava操作符,包括使用分配一個(gè)新的計(jì)劃subscribeOn()來(lái)處理這些操作。一旦所有的項(xiàng)目中flatMap()都已經(jīng)被處理,個(gè)別Observables的然后合并到一個(gè)單一的Observable沒(méi)有特定的順序。
為了使事情變得更加真實(shí),讓我們假裝每個(gè)項(xiàng)目的轉(zhuǎn)換最多需要3秒鐘才能完成。以下兩件事應(yīng)該成立:
- 1.每個(gè)項(xiàng)目都由自己的線程處理
- 2.由于處理每個(gè)項(xiàng)目所需的隨機(jī)時(shí)間,因此無(wú)法保證完成的項(xiàng)目的順序
使用 flatMap()
public static void main(String[] args) throws InterruptedException {
Observable.just("long", "longer", "longest")
.flatMap(v ->
performLongOperation(v)
.doOnNext(s -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread()))
.subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));
Thread.sleep(10000);
}
/**
* Returns length of each param wrapped into an Observable.
*/
protected static Observable<Integer> performLongOperation(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
這將導(dǎo)致以下輸出:
processing item on thread RxNewThreadScheduler-3
processing item on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-2
received item length 7 on thread RxNewThreadScheduler-3
received item length 4 on thread RxNewThreadScheduler-1
received item length 6 on thread RxNewThreadScheduler-2
請(qǐng)注意:
- 1.每個(gè)項(xiàng)目由一個(gè)單獨(dú)的線程處理。
- 2.轉(zhuǎn)換后元素的順序是隨機(jī)的。
如果您需要保留結(jié)果項(xiàng)的順序怎么辦?
介紹concatMap()
concatMap()類似flatMap()但保證處理的項(xiàng)目的順序與原始排放的順序相同。
使用concatMap()
public static void main(String[] args) throws InterruptedException {
Observable.just("long", "longer", "longest")
.concatMap(v ->
performLongOperation(v)
.doOnNext(s -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread()))
.subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));
Thread.sleep(10000);
}
這導(dǎo)致以下輸出:
processing item on thread RxNewThreadScheduler-1
received item length 4 on thread RxNewThreadScheduler-1
processing item on thread RxNewThreadScheduler-2
received item length 6 on thread RxNewThreadScheduler-2
processing item on thread RxNewThreadScheduler-3
received item length 7 on thread RxNewThreadScheduler-3
請(qǐng)注意,項(xiàng)目的返回順序與原始流中的順序相同
subscribeOn()的陷阱
如上所示,subscribeOn()更改我們Observable發(fā)出和轉(zhuǎn)換的線程。在沒(méi)有observeOn()的情況下,流處理的結(jié)果被發(fā)送到完成工作的線程。例如,如果我們有subscribeOn(Schedulers.computation())和observeOn()沒(méi)有指定,結(jié)果也會(huì)被分派到Computation線程。
將subscribeOn()操作符放在鏈中的位置并不重要,它仍然表示將在其上發(fā)出Observable的線程。
如果subscribeOn()在鏈中指定了多個(gè)RxJava操作符,則只使用第一個(gè)運(yùn)算符,除非在flatMap()中使用subscribeOn(),否則將忽略后面的運(yùn)算符,如上所示。
這是一個(gè)例子:
使用多個(gè)subscribeOn()
Observable.just("long", "longer", "longest")
.doOnNext(c -> System.out.println("processing item on thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.newThread())
.map(String::length)
.subscribe(length -> System.out.println("item length " + length + " received on " + Thread.currentThread().getName()));
這將導(dǎo)致以下輸出:
processing item on thread RxComputationThreadPool-1
item length 4 received on RxComputationThreadPool-1
processing item on thread RxComputationThreadPool-1
item length 6 received on RxComputationThreadPool-1
processing item on thread RxComputationThreadPool-1
item length 7 received on RxComputationThreadPool-1
請(qǐng)注意,Schedulers.computation()上面的線程池Schedulers.newThread()在從未使用過(guò)的情況下完成了工作。這是因?yàn)橛?jì)算調(diào)度程序首先列出,所有后續(xù)subscribeOn()運(yùn)算符都被忽略。
默認(rèn)調(diào)度程序
一些庫(kù)在subscribeOn()內(nèi)部指定以強(qiáng)制執(zhí)行后臺(tái)工作的線程。例如,默認(rèn)情況下,Observable.delay()RxJava庫(kù)將在Computation上發(fā)出Scheduler。subscribeOn()您在其上指定的任何內(nèi)容都將無(wú)效。但是您可以為該運(yùn)算符使用重載版本的工廠方法,而不是傳遞您選擇的自定義Scheduler。
@SchedulerSupport("io.reactivex:computation")
public final Observable<T> delay(long delay, TimeUnit unit) {
return this.delay(delay, unit, Schedulers.computation(), false);
}
@SchedulerSupport("custom")
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
return this.delay(delay, unit, scheduler, false);
}
提示: 當(dāng)您使用運(yùn)算符時(shí),編輯器可以向您發(fā)出警告,例如delay()不覆蓋其默認(rèn)調(diào)度程序
這也意味著當(dāng)您在使用時(shí)使用依賴于調(diào)度程序的運(yùn)算符(如delay(),interval()等)時(shí)subscribeOn(),您可能會(huì)在沒(méi)有意識(shí)到的情況下生成(但不使用)線程。始終查看這些運(yùn)算符的Javadoc以確保最佳使用。特別要注意@SchedulerSupport注釋
onError()
最后,當(dāng)subscribeOn()使用但沒(méi)有使用onError(),如果發(fā)生錯(cuò)誤,它將被拋出訂閱的調(diào)度程序線程,但錯(cuò)誤堆棧跟蹤將沒(méi)有引用您訂閱的位置。這將使調(diào)試非常困難。要避免此問(wèn)題,請(qǐng)使用onError()。
observeOn()的陷阱
重要的是要記住,與事物的subscribeOn()放置不同observeOn()。切換調(diào)度程序observeOn()適用于所有下游操作符。
例如,在以下示例中由于observeOn()放置map(String::length)而filter(length -> length == 6)將在主線程上執(zhí)行。這真的是有意的嗎?
Observable.just("long", "longer", "longest")
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.map(String::length)
.filter(length -> length == 6)
.subscribe(length -> System.out.println("item length " + length));
放置observeOn()操作符的位置要小心,因?yàn)樗鼤?huì)更改執(zhí)行工作的調(diào)度程序,在大多數(shù)情況下,您可能希望延遲切換到觀察線程,直到Rx鏈的最后。
例如,讓我們看看下面的RxJava鏈,該鏈進(jìn)行HTTP網(wǎng)絡(luò)調(diào)用:
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(response -> response.body().string())
沒(méi)有理由在observeOn()操作符上方應(yīng)用map()操作員。實(shí)際上,這段代碼會(huì)導(dǎo)致NetworkOnMainThreadException!我們不想在主線程上讀取HTTP響應(yīng) - 它應(yīng)該在我們切換回主線程之前完成:
.subscribeOn(Schedulers.io())
.map(response -> response.body().string())
.observeOn(AndroidSchedulers.mainThread())
警惕多重 observeOn()
您可以擁有多個(gè)observeOn()操作符。當(dāng)在下游執(zhí)行時(shí),observeOn()下面的每個(gè)都將覆蓋上面的操作符。
這是一個(gè)例子:
使用多個(gè) observeOn()
Observable.just("long", "longer", "longest")
.doOnNext(s -> System.out.println("first doOnNext: processing item on thread " + Thread.currentThread().getName()))
.observeOn(Schedulers.computation())
.map(String::toString)
.doOnNext(s -> System.out.println("second doOnNext: processing item on thread " + Thread.currentThread().getName()))
.observeOn(Schedulers.io())
.map(String::toString)
.subscribeOn(Schedulers.newThread())
.map(String::length)
.subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));
輸出如下:
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
received item length 4 on thread RxCachedThreadScheduler-1
received item length 6 on thread RxCachedThreadScheduler-1
received item length 7 on thread RxCachedThreadScheduler-1
最后,我建議你盡可能避免這種復(fù)雜性。這樣做將使將來(lái)調(diào)試和維護(hù)此代碼變得更加容易。