我十分推薦在項目中使用RxJava 2,它通過使用可觀察序列來編寫異步程序和切換線程,提供了響應(yīng)式的變成風(fēng)格,相比android推出的handler和asynctask,RxJava 2可以讓代碼更加簡潔明晰。
官方網(wǎng)站上對RxJava的定義是:
A library for composing asynchronous and event-based programs using observable sequences for the Java VM.
強調(diào)了“異步”,貌似跟多線程沒有關(guān)系哦!的確是這樣的,RxJava 2默認(rèn)并不支持多線程。好,用代碼說服你:
Observable.just(1, 6, 9)
.doOnNext(object : Consumer<Int> {
@Throws(Exception::class)
override fun accept(integer: Int?) {
Log.i(TAG, "Emitting item on: " + currentThread().name + ", value: $integer")
}
})
.map(object : Function<Int, Int> {
override fun apply(p0: Int): Int {
Log.i(TAG,"Processing item on: " + currentThread().name + ", value: $p0")
return p0!! * 2
}
})
.subscribeWith(object : DisposableObserver<Int>() {
override fun onNext(@NonNull integer: Int) {
Log.i(TAG,"Consuming item on: " + currentThread().name + ", value: $integer")
}
override fun onError(@NonNull e: Throwable) {}
override fun onComplete() {}
})
/*
Emitting item on: main, value: 1
Processing item on: main, value: 1
Consuming item on: main, value: 2
Emitting item on: main, value: 6
Processing item on: main, value: 6
Consuming item on: main, value: 12
Emitting item on: main, value: 9
Processing item on: main, value: 9
Consuming item on: main, value: 18
*/
根據(jù)注釋里的輸出結(jié)果可以推斷,RxJava 2的操作是運行在當(dāng)前的主線程的,是會被阻塞的。
你可能會問doOnNext()是作什么用的。它只是一個提供副作用的操作符,可以脫離observable鏈而執(zhí)行一些不純的操作。
小試牛刀:多線程
為了理解RxJava 2如何切換線程,你必須對RxJava 2的三個重要的操作符熟悉:Schedulers、observeOn和subscribeOn。
下面舉一個多線程的例子:加入從網(wǎng)絡(luò)上獲取一個書籍Book的列表信息,并在UI線程先是出來,進了RxJava 2坑的同學(xué)很快都會寫出下面的代碼:
getBooks().subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(object : DisposableObserver<Int>() {
override fun onNext(@NonNull integer: Int) {
// You can access your Book objects here
}
override fun onError(@NonNull e: Throwable) {
// Handler errors here
}
override fun onComplete() {
// All your book objects have been fetched. Done!
}
})
有沒有很簡潔?getBooks()方法執(zhí)行網(wǎng)絡(luò)操作并返回一個Book的列表。網(wǎng)絡(luò)操作是一個很耗時的操作,我們使用subscribeOn()方法使網(wǎng)絡(luò)操作在Schedulers.io()線程中進行,然后使用observeOn()方法指定消費者在Schedulers.mainThread()主線程執(zhí)行操作。
擁抱調(diào)度器:Schedulers
你可以把Schedulers認(rèn)為是一個執(zhí)行不同任務(wù)的線程池。如果你想在一個線程中執(zhí)行一個任務(wù),那么需要挑選一個合適的調(diào)度器Schedulers。RxJava 2提供了幾種不同類型的調(diào)度器Schedulers,如果你挑選了不合適的Schedulers,那么你的代碼就不是最優(yōu)的,下面看下這幾個調(diào)度器Schedulers:
- Schedulers.io。通常用來執(zhí)行一些非即CPU密集型的操作,比如讀寫文件、網(wǎng)絡(luò)操作、讀寫數(shù)據(jù)庫等等。這個調(diào)度器沒有上限,為了滿足需要,它的線程池的數(shù)量可以增加。
- Schedulers.computation()。這個調(diào)度器常用來執(zhí)行CPU密集型的操作,比如大量數(shù)據(jù)集的計算、圖片處理等等。它的線程池的數(shù)量是有線的。由于此調(diào)度器只適合于CPU密集型任務(wù),所以我們希望限制線程的數(shù)量,這樣它們就不會在CPU時間之間相互爭斗,從而使自己餓死。
- Schedulers.newThread()。每次使用這個調(diào)度器的時候,都會完全新建一個線程來執(zhí)行分配的任務(wù)。它不會使用線程池,當(dāng)然也不會享受線程池帶來的好處。線程的創(chuàng)建和銷毀都很昂貴,因此您應(yīng)該非常小心,不要濫用過多的線程生成導(dǎo)致嚴(yán)重的系統(tǒng)減速和內(nèi)存錯誤。理想情況下,您將很少使用此調(diào)度器,主要用于啟動一個完全獨立的線程來執(zhí)行長時間運行、隔離的任務(wù)。
- Schedulers.single()。這個是RxJava 2新引進的調(diào)度器,在RxJava 1中不存在的。它是一個單獨的線程,使用順序的方式執(zhí)行任務(wù)。如果你在后臺有很多任務(wù)要執(zhí)行,但是一次只能執(zhí)行一個,這種情況下使用這個調(diào)度器是最合適的。
- Schedulers.from(Executor executor)。這個方法可以讓你用自己的Executor來創(chuàng)建自定義的Scheduler。 假設(shè),你想限制并行網(wǎng)絡(luò)操作在你的應(yīng)用程序被調(diào)用的數(shù)量,你可以創(chuàng)建一個定制的調(diào)度器并固定線程池大小,Scheduler.from(Executors.newFixedThreadPool(n)),并在代碼中網(wǎng)絡(luò)相關(guān)的Observables使用它。
- AndroidSchedulers.mainThread()。這是一個特殊的調(diào)度器,在標(biāo)準(zhǔn)的Rxjava庫中找不到它,它存在于RxAndroid庫。它專門為android程序設(shè)計,在UI主線程執(zhí)行UI相關(guān)的操作。默認(rèn)情況下,它會在與應(yīng)用程序主線程關(guān)聯(lián)的looper中執(zhí)行隊列任務(wù),但是還有其他的特殊情況,允許我們使用像AndroidSchedulers.from(Looper looper)這樣的api來使用任何Looper。
注意:在使用由無邊界限制的線程池(如Schedulers.io())支持的調(diào)度程序時要小心,因為總是存在無限增長線程池和大量線程泛濫的風(fēng)險。
理解observeOn和subscribeOn
相信你對Rxjava提供的幾種不同的調(diào)度器已經(jīng)有所理解,那么就要理解下observeOn和subscribeOn這兩個重要的操作符了。
subscribeOn
這個操作符指定了上游的源觀察者釋放元素操作所在的線程。如果有一串觀察者,那么源觀察者總是位于頂部,在這里元素被生成。在前面章節(jié)中的第一個代碼段中我們沒有使用observeOn,你清楚地看到釋放操作是在UI主線程中進行的。如果observeOn指定了Schedulers.computation()調(diào)度器,那么上游的操作都會在comoutation線程執(zhí)行,如下面的代碼:
Observable.just(2, 3)
.doOnNext { Log.i(TAG,"Emitting item " + it + " on: " + currentThread().getName()) }
.subscribeOn(Schedulers.computation())
.map {
Log.i(TAG,"Mapping item " + it + " on: " + currentThread().getName())
it * it
}
.filter {
Log.i(TAG,"Filtering item " + it + " on: " + currentThread().getName())
it % 2 == 0
}
.subscribe { Log.i(TAG,"Consuming item " + it + " on: " + currentThread().getName()) }
/*
Emitting item 2 on: RxComputationThreadPool-1
Mapping item 2 on: RxComputationThreadPool-1
Filtering item 4 on: RxComputationThreadPool-1
Consuming item 4 on: RxComputationThreadPool-1
Emitting item 3 on: RxComputationThreadPool-1
Mapping item 3 on: RxComputationThreadPool-1
Filtering item 9 on: RxComputationThreadPool-1
* */
這段代碼中,我們沒有使用完整的DisposableSubscriber,因為我們不需要onError()和onComplete(),只處理onNext()一個單獨的消費者就足夠了。
在觀察鏈中,observeOn的位置沒有任何影響,如果你很好奇,你可以自己試下把上面代碼中的observeOn方法放到末尾(但是必須要在消費者即subscribe的前面)。
另一個重要的點是你不能在觀察鏈中多次使用observeOn,如果你那樣作的話,其實只有第一個observeOn即最接近源Observable的才會生效。
Observable.just(1, 2, 3, 4, 5, 6)
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.subscribeOn(Schedulers.newThread())
.doOnNext { it -> Log.i(TAG,"Emitting item " + it + " on: " + currentThread().getName()) }
.subscribe { it -> Log.i(TAG,"Consuming item " + it + " on: " + currentThread().getName()) }
/*
Emitting item 1 on: RxCachedThreadScheduler-1
Consuming item 1 on: RxCachedThreadScheduler-1
Emitting item 2 on: RxCachedThreadScheduler-1
Consuming item 2 on: RxCachedThreadScheduler-1
Emitting item 3 on: RxCachedThreadScheduler-1
Consuming item 3 on: RxCachedThreadScheduler-1
Emitting item 4 on: RxCachedThreadScheduler-1
Consuming item 4 on: RxCachedThreadScheduler-1
Emitting item 5 on: RxCachedThreadScheduler-1
Consuming item 5 on: RxCachedThreadScheduler-1
Emitting item 6 on: RxCachedThreadScheduler-1
Consuming item 6 on: RxCachedThreadScheduler-1
* */
通過看注釋,你已經(jīng)知道上面代碼中只有subscribeOn(Schedulers.io())是生效的,其余無效。為什么呢?我簡單暴力地說:Rxjava是鏈?zhǔn)讲僮?,自上而下,下游的調(diào)度器是是上游訂閱者指定的,那么要找到這個調(diào)度器就要自下而上回溯,自然就找到了距離源Observable最近的subscribeOn指定的調(diào)度器才真正起作用。不知道你有沒有搞懂。
observeOn()
observeOn可以很方便地切換線程,指定了消費者所在的線程。
Observable.just(2, 3)
.doOnNext { Log.i(TAG,"Emitting item " + it + " on: " + currentThread().getName()) }
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map {
Log.i(TAG,"Mapping item " + it + " on: " + currentThread().getName())
it * it
}
.observeOn(Schedulers.newThread())
.filter {
Log.i(TAG,"Filtering item " + it + " on: " + currentThread().getName())
it % 2 == 0
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG,"Consuming item " + it + " on: " + currentThread().getName()) }
/*
Emitting item 2 on: RxCachedThreadScheduler-1
Emitting item 3 on: RxCachedThreadScheduler-1
Mapping item 2 on: RxComputationThreadPool-1
Mapping item 3 on: RxComputationThreadPool-1
Filtering item 4 on: RxNewThreadScheduler-1
Filtering item 9 on: RxNewThreadScheduler-1
Consuming item 4 on: main
* */
通過上面的代碼,你應(yīng)該知道可以多次調(diào)用observeOn操作符,指定后面的操作所在的線程。這段代碼還有個不同的地方是Emitting、Mapping和Filtering使用不同的調(diào)度器,那么總是先執(zhí)行完Emitting,再執(zhí)行完Mapping,然后執(zhí)行完Filtering,最后執(zhí)行Consuming,而前面的代碼中Emitting、Mapping和Filtering使用相同的調(diào)度器,那么總是執(zhí)行完一個完整的事件(即Emitting、Mapping、Filtering和Consuming),再執(zhí)行下一個完整的事件。
Rxjava有很多的東西可講的,我也在不斷學(xué)習(xí)中。希望多多交流,有錯誤的地方也希望留言指正,我的聯(lián)系方式:owl@violetpersimmon.com