相關(guān)概念
Android多線程編程的原則:
- 不要阻塞UI線程;
- 不要在UI線程之外訪問(wèn)UI組件;
ReactiveX
- Reactive Extensions的縮寫,一般簡(jiǎn)寫為Rx;
- 是一個(gè)使用可觀察數(shù)據(jù)流進(jìn)行異步編程的編程接口,ReactiveX結(jié)合了觀察者模式、迭代器模式和函數(shù)式編程的精華;
RxJava
- Reactive Extensions for the JVM:,RxJava就是ReactiveX在JVM平臺(tái)的實(shí)現(xiàn);
- 基于事件流的鏈?zhǔn)秸{(diào)用,進(jìn)行耗時(shí)任務(wù),線程切換,其本質(zhì)是一個(gè)異步操作庫(kù);
原理
- 基于觀察者模式, 一個(gè)方面的操作依賴于另一個(gè)方面的狀態(tài)變化,當(dāng)一個(gè)對(duì)象必須通知其他對(duì)象,又希望這個(gè)對(duì)象和其他被通知的對(duì)象是松散耦合的;
三個(gè)要素
- 被觀察者(Observable),觀察者(Subscriber),訂閱(subscribe);
被觀察者
1. Observable
- 可多次發(fā)送事件(onNext),直到 onComplete 或 onError 被調(diào)用結(jié)束訂閱;
- 不支持背壓:當(dāng)被觀察者快速發(fā)送大量數(shù)據(jù)時(shí),下游不會(huì)做其他處理,即使數(shù)據(jù)大量堆積,調(diào)用鏈也不會(huì)報(bào)MissingBackpressureException,消耗內(nèi)存過(guò)大只會(huì)OOM。(官方給出以1000個(gè)事件為分界線作為參考);
val observable = Observable.create(ObservableOnSubscribe<Int> {
it.onNext(1)
it.onNext(3)
it.onNext(5)
it.onComplete()
LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
})
//Disposable通過(guò) dispose() ?方法來(lái)讓上游停?止?工作,達(dá)到「丟棄」的效果
var disposable: Disposable? = null
val observer = object : Observer<String> {
override fun onSubscribe(d: Disposable) {
//訂閱后發(fā)送數(shù)據(jù)之前, 回調(diào)這個(gè)方法,Disposable可用于取消訂閱
disposable = d
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onNext(t: String) {
LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
override fun onComplete() {
LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
}
}
//rxJava的整體結(jié)構(gòu)是一條鏈,鏈的最上游是被觀察者observable,最下游是觀察者observer,
//鏈的中間各個(gè)節(jié)點(diǎn),既是其下游的observable,又是其上游的observer
observable
//subscribeOn: 切換起源 Observable 的線程,
// 當(dāng)多次調(diào)?用 subscribeOn() 的時(shí)候,只有最上?面的會(huì)對(duì)起源 Observable 起作?用,
// 原因subscribeOn底層通過(guò)新建observable實(shí)現(xiàn)
.subscribeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
//observeOn指定的是它之后的操作所在的線程,通過(guò)observeOn的多次調(diào)用,程序?qū)崿F(xiàn)了線程的多次切換
//影響范圍observeOn是它下面的每個(gè)observer,除非又遇到新的observeOn
.observeOn(Schedulers.io())
.map {
LjyLogUtil.d("map:${Thread.currentThread().name}")
"num_$it"
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
2. Flowable:
- 和 Observable 一樣, 且支持Reactive-Streams和背壓;
- 上游的被觀察者會(huì)響應(yīng)下游觀察者的數(shù)據(jù)請(qǐng)求,下游調(diào)用request(n)來(lái)告訴上游發(fā)送多少個(gè)數(shù)據(jù)。這樣避免了大量數(shù)據(jù)堆積在調(diào)用鏈上,使內(nèi)存一直處于較低水平;
var sub: Subscription? = null
Flowable.create(FlowableOnSubscribe<Int> {
it.onNext(1)
it.onNext(3)
it.onNext(5)
it.onComplete()
LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
//使用create創(chuàng)建Flowable,需要指定背壓策略
}, BackpressureStrategy.BUFFER)
Flowable.range(0, 5)
.subscribe(object : Subscriber<Int> {
override fun onSubscribe(s: Subscription?) {
//當(dāng)訂閱后,會(huì)首先調(diào)用這個(gè)方法,其實(shí)就相當(dāng)于onStart(),
//傳入的Subscription s參數(shù)可以用于請(qǐng)求數(shù)據(jù)或者取消訂閱
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe start")
sub = s
sub?.request(1)
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe end")
}
override fun onNext(t: Int?) {
LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
sub?.request(1)
}
override fun onError(t: Throwable?) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${t?.message}")
}
override fun onComplete() {
LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
}
})
3. Single
- 單次發(fā)送事件(onSuccess、onError),發(fā)完即結(jié)束訂閱;
- 總是只發(fā)射一個(gè)值,或者一個(gè)錯(cuò)誤通知,而不是發(fā)射一系列的值(當(dāng)然就不存在背壓?jiǎn)栴});
- 一般一個(gè)接口只是一次請(qǐng)求一次返回,所以使用Single 與 Retrofit 配合是更為合理;
Single.just(1)
.map { "num_$it" }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { num ->
LjyLogUtil.d(num)
}
Single.create(SingleOnSubscribe<String> { emitter ->
LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
emitter.onSuccess("str1")
emitter.onSuccess("str2")//錯(cuò)誤寫法,重復(fù)調(diào)用也不會(huì)處理,因?yàn)橹粫?huì)調(diào)用一次
}).subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onSuccess(t: String) {
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
})
4. Completable
- 單次發(fā)送事件(onError、onComplete),發(fā)完即結(jié)束訂閱;
- 如果你的觀察者連onNext事件都不關(guān)心,可以使用Completable,它只有onComplete和onError兩個(gè)事件,要轉(zhuǎn)換成其他類型的被觀察者,也是可以使用toFlowable()、toObservable()等方法去轉(zhuǎn)換;
Completable.create { emitter ->
LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
emitter.onComplete()//單一onComplete或者onError
}.subscribe(object : CompletableObserver {
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onComplete() {
LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
})
5. Maybe
- 單次發(fā)送事件(onSuccess、onError、onComplete),發(fā)完即結(jié)束訂閱。相當(dāng)于 Completable 和 Single 結(jié)合;
- 如果可能發(fā)送一個(gè)數(shù)據(jù)或者不會(huì)發(fā)送任何數(shù)據(jù),這時(shí)候你就需要Maybe,它類似于Single和Completable的混合體;
- onSuccess和onComplete是互斥的存在;
Maybe.create(MaybeOnSubscribe<String> {
it.onSuccess("str1")
it.onComplete()
}).subscribe(object : MaybeObserver<String> {
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onSuccess(t: String) {
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
override fun onComplete() {
LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
}
})
- 五種被觀察者可通過(guò)toObservable,toFlowable,toSingle,toCompletable,toMaybe相互轉(zhuǎn)換;
觀察者
val observer = object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onNext(t: Int) {
LjyLogUtil.d("${Thread.currentThread().name}_onNext:$t")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
override fun onComplete() {
LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
}
}
線程調(diào)度器(Schedulers)
- 指定 被觀察者(Observable)/ 觀察者(Observer)的工作線程,簡(jiǎn)化了異步操作;
- 默認(rèn)在創(chuàng)建自身的線程;
- 配合操作符 subscribeOn , observeOn 使用;
AndroidSchedulers.mainThread()
- 需要引用RxAndroid, 切換到UI線程(Android的主線程), 為Android開發(fā)定制;
Schedulers.io()
- 用于IO密集型任務(wù),如讀寫SD卡文件,查詢數(shù)據(jù)庫(kù),訪問(wèn)網(wǎng)絡(luò)等;
- 具有線程緩存機(jī)制,默認(rèn)是一個(gè)CacheThreadScheduler;
Schedulers.newThread()
- 為每一個(gè)任務(wù)創(chuàng)建一個(gè)新線程;
- 不具有線程緩存機(jī)制,雖然使用Schedulers.io的地方,都可以使用Schedulers.newThread,但是,Schedulers.newThread的效率沒(méi)有Schedulers.io高;
Schedulers.computation()
- 用于CPU 密集型計(jì)算任務(wù),即不會(huì)被 I/O 等操作限制性能的耗時(shí)操作,例如xml,json文件的解析,Bitmap圖片的壓縮取樣等,具有固定的線程池,大小為CPU的核數(shù)。不可以用于I/O操作,因?yàn)镮/O操作的等待時(shí)間會(huì)浪費(fèi)CPU。
Schedulers.trampoline()
- 在當(dāng)前線程立即執(zhí)行任務(wù),如果當(dāng)前線程有任務(wù)在執(zhí)行,則會(huì)將其暫停,等插入進(jìn)來(lái)的任務(wù)執(zhí)行完之后,再將未完成的任務(wù)接著執(zhí)行;
Schedulers.single()
- 擁有一個(gè)線程單例,所有的任務(wù)都在這一個(gè)線程中執(zhí)行,當(dāng)此線程中有任務(wù)執(zhí)行時(shí),其他任務(wù)將會(huì)按照先進(jìn)先出的順序依次執(zhí)行;
Scheduler.from(executor)
- 指定一個(gè)線程調(diào)度器,由此調(diào)度器來(lái)控制任務(wù)的執(zhí)行策略;
背壓(Backpressure)
- 背壓是指在異步場(chǎng)景中,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略;
- 支持背壓的被觀察者為Flowable;
- Android中很少用到,除非在線視頻流,直播等場(chǎng)景,當(dāng)畫面卡頓已取得的數(shù)據(jù)失效了,需要拋棄等;
背壓策略模式
BackpressureStrategy.MISSING
- 在此策略下,通過(guò)Create方法創(chuàng)建的Flowable相當(dāng)于沒(méi)有指定背壓策略,不會(huì)對(duì)通過(guò)onNext發(fā)射的數(shù)據(jù)做緩存或丟棄處理,需要下游通過(guò)背壓操作符處理
BackpressureStrategy.ERROR:
- 在此策略下,如果放入Flowable的異步緩存池中的數(shù)據(jù)超限了,則會(huì)拋出MissingBackpressureException異常;
BackpressureStrategy.BUFFER:
- 內(nèi)部維護(hù)了一個(gè)緩存池SpscLinkedArrayQueue,其大小不限,此策略下,如果Flowable默認(rèn)的異步緩存池滿了,會(huì)通過(guò)此緩存池暫存數(shù)據(jù),它與Observable的異步緩存池一樣,可以無(wú)限制向里添加數(shù)據(jù),不會(huì)拋出MissingBackpressureException異常,但會(huì)導(dǎo)致OOM;
- 當(dāng)緩存區(qū)大小存滿(默認(rèn)緩存區(qū)大小 = 128)、被觀察者仍然繼續(xù)發(fā)送下1個(gè)事件時(shí),將緩存區(qū)大小設(shè)置成無(wú)限大,被觀察者可無(wú)限發(fā)送事件 觀察者,但實(shí)際上是存放在緩存區(qū),但要注意內(nèi)存情況,防止出現(xiàn)OOM;
BackpressureStrategy.DROP
- 在此策略下,如果Flowable的異步緩存池滿了,會(huì)丟掉上游發(fā)送的數(shù)據(jù);
BackpressureStrategy.LATEST
- 與Drop策略一樣,如果緩存池滿了,會(huì)丟掉將要放入緩存池中的數(shù)據(jù),不同的是,不管緩存池的狀態(tài)如何,LATEST都會(huì)將最后一條數(shù)據(jù)強(qiáng)行放入緩存池中,來(lái)保證觀察者在接收到完成通知之前,能夠接收到Flowable最新發(fā)射的一條數(shù)據(jù);
- 即如果發(fā)送了150個(gè)事件,緩存區(qū)里會(huì)保存129個(gè)事件(第1-第128 + 第150事件);
RxJava 2.0內(nèi)部提供 封裝了背壓策略模式的方法
- 默認(rèn)采用BackpressureStrategy.ERROR模式;
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()
Flowable.interval(1, TimeUnit.MILLISECONDS)
//添加背壓策略封裝好的方法,此處選擇Buffer模式,即緩存區(qū)大小無(wú)限制
.onBackpressureBuffer()
.observeOn(Schedulers.newThread())
.subscribe(subscriber)
冷熱流
Cold Observable
- subscribe時(shí)才會(huì)發(fā)射數(shù)據(jù);
- 常見(jiàn)的工廠方法提供的都是ColdObservable,包括just(),fromXX,create(),interval(),defer();
- 當(dāng)你有多個(gè)Subscriber的時(shí)候,他們的事件是獨(dú)立的,示例代碼如下:
val interval = Observable.interval(1, TimeUnit.SECONDS)
var disposable1: Disposable? = null
val observer1 = object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
disposable1 = d
}
override fun onNext(t: Long) {
LjyLogUtil.d("觀察者1:$t")
}
override fun onError(e: Throwable) {
}
override fun onComplete() {
}
}
var disposable2: Disposable? = null
val observer2 = object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
disposable1 = d
}
override fun onNext(t: Long) {
LjyLogUtil.d("觀察者2:$t")
}
override fun onError(e: Throwable) {
}
override fun onComplete() {
}
}
findViewById<Button>(R.id.btn_subscribe1)
.clicks().subscribe {
interval.subscribe(observer1)
}
findViewById<Button>(R.id.btn_dispose1)
.clicks().subscribe {
disposable1?.dispose()
}
findViewById<Button>(R.id.btn_subscribe2)
.clicks().subscribe {
interval.subscribe(observer2)
}
findViewById<Button>(R.id.btn_dispose2)
.clicks().subscribe {
disposable2?.dispose()
}
Hot Observable
- 對(duì)于Hot Observable的所有subscriber,他們會(huì)在同一時(shí)刻收到相同的數(shù)據(jù);
- 通常使用publish()操作符來(lái)將ColdObservable變?yōu)镠ot?;蛘?使用 Subjects 也是Hot Observable;
- 如果他開始傳輸數(shù)據(jù),你不主動(dòng)喊停(dispose()/cancel()),那么他就不會(huì)停,一直發(fā)射數(shù)據(jù),即使他已經(jīng)沒(méi)有Subscriber了;
val interval = Observable.interval(1, TimeUnit.SECONDS).publish()
var disposable: Disposable? = null
findViewById<Button>(R.id.btn_connect)
.clicks().subscribe {
disposable = interval.connect()
}
findViewById<Button>(R.id.btn_dispose)
.clicks().subscribe {
disposable?.dispose()
}
findViewById<Button>(R.id.btn_subscribe3)
.clicks().subscribe {
interval.subscribe {
LjyLogUtil.d("觀察者3:$it")
}
}
findViewById<Button>(R.id.btn_subscribe4)
.clicks().subscribe {
interval.subscribe {
LjyLogUtil.d("觀察者4:$it")
}
}
操作符
- 操作符很多,不用完全背下來(lái),瀏覽一遍,用時(shí)知道在哪找(關(guān)注+收藏本文[狗頭])即可
線程切換
- subscribeOn:指定被觀察者Observable的工作線程;
- observeOn:指定觀察者的observer工作線程;
observable
//subscribeOn: 切換起源 Observable 的線程,
// 當(dāng)多次調(diào)?用 subscribeOn() 的時(shí)候,只有最上?面的會(huì)對(duì)起源 Observable 起作?用,
// 原因subscribeOn底層通過(guò)新建observable實(shí)現(xiàn)
.subscribeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
//observeOn指定的是它之后的操作所在的線程,通過(guò)observeOn的多次調(diào)用,程序?qū)崿F(xiàn)了線程的多次切換
//影響范圍observeOn是它下面的每個(gè)observer,除非又遇到新的observeOn
.observeOn(Schedulers.io())
.map {
LjyLogUtil.d("map:${Thread.currentThread().name}")
"num_$it"
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
創(chuàng)建操作
基本創(chuàng)建
create
- 通過(guò)調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)Observable;
Observable.create(ObservableOnSubscribe<Int> {
LjyLogUtil.d("${Thread.currentThread().name}_subscribe")
it.onNext(1)
it.onNext(3)
it.onNext(5)
it.onComplete()
})
快速創(chuàng)建
just
- 將對(duì)象或者對(duì)象集合轉(zhuǎn)換為一個(gè)會(huì)發(fā)射這些對(duì)象的Observable;
Observable.just(1, 2, 3, 4)
fromArray & fromIterable
- 將其它的對(duì)象或數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observable;
Observable.fromArray(arrayOf(1,2,3))
Observable.fromIterable(listOf(4,5,6))
never
- 創(chuàng)建的被觀察者不發(fā)送任何事件, 觀察者接收后什么都不調(diào)用;
Observable.never<Int>().subscribe(observer)
empty
- 創(chuàng)建的被觀察者僅發(fā)送Complete事件,直接通知完成, 觀察者接收后會(huì)直接調(diào)用onCompleted;
Observable.empty<Int>().subscribe(observer)
error
- 創(chuàng)建的被觀察者僅發(fā)送Error事件,直接通知異常, 觀察者接收后會(huì)直接調(diào)用onError;
Observable.error<Int>(RuntimeException()).subscribe(observer)
延遲創(chuàng)建
defer:
- 在觀察者訂閱之前不創(chuàng)建這個(gè)Observable,為每一個(gè)觀察者創(chuàng)建一個(gè)新的Observable;
var num=1
val observable=Observable.defer {Observable.just(num)}
num=2
observable.subscribe(observer)
timer
- 創(chuàng)建在一個(gè)指定的延遲之后發(fā)射單個(gè)數(shù)據(jù)的Observable;
//延時(shí)3秒后,發(fā)送一個(gè)整數(shù)0
Observable.timer(3, TimeUnit.SECONDS)
interval & intervalRange
- 創(chuàng)建一個(gè)定時(shí)發(fā)射整數(shù)序列的Observable;
//初始延時(shí)1秒,每3秒發(fā)一個(gè)自增整數(shù)
Observable.interval(1, 3, TimeUnit.SECONDS)
//初始延時(shí)2秒,后每1秒發(fā)一個(gè)從10開始的整數(shù),發(fā)5個(gè)(發(fā)到14)停止
Observable.intervalRange(10, 5, 2, 1, TimeUnit.SECONDS);
range & rangeLong
- 創(chuàng)建發(fā)射指定范圍的整數(shù)序列的Observable;
Observable.range(0, 5)
Observable.rangeLong(0, 5)
Repeat
- 創(chuàng)建重復(fù)發(fā)射特定的數(shù)據(jù)或數(shù)據(jù)序列的Observable;
//一直重復(fù)
Observable.fromArray(1, 2, 3, 4).repeat()
//重復(fù)發(fā)送5次
Observable.fromArray(1, 2, 3, 4).repeat(5)
//重復(fù)發(fā)送直到符合條件時(shí)停止重復(fù)
Observable.fromArray(1, 2, 3, 4).repeatUntil { false }
//
Observable.just(1, 2, 3, 4)
.repeatWhen {
it.flatMap { obj ->
if (obj is NumberFormatException) {
Observable.error(Throwable("repeatWhen終止"))
} else {
Observable.just(5, 6, 7)
}
}
}.subscribe(observer)
2. 變換操作:
map
- 映射,通過(guò)對(duì)序列的每一項(xiàng)都應(yīng)用一個(gè)函數(shù)變換Observable發(fā)射的數(shù)據(jù),實(shí)質(zhì)是對(duì)序列中的每一項(xiàng)執(zhí)行一個(gè)函數(shù),函數(shù)的參數(shù)就是這個(gè)數(shù)據(jù)項(xiàng);
Observable.just("1", "2", "3").map { it.toInt() }.subscribe(observer)
flatMap & concatMap
- 扁平映射,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個(gè)單獨(dú)的Observable,可以認(rèn)為是一個(gè)將嵌套的數(shù)據(jù)結(jié)構(gòu)展開的過(guò)程;
//concatMap與flatMap的區(qū)別: concatMap是有序的,flatMap是無(wú)序的
Observable.just("A", "B", "C")
.flatMap { x ->
Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS).map { y ->
"($x,$y)"
}
}
Observable.just("A", "B", "C")
.concatMap { m ->
Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS).map { n ->
"($m,$n)"
}
}
groupBy
- 分組,將原來(lái)的Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個(gè)Observable發(fā)射一組不同的數(shù)據(jù);
Observable.just(
"Tiger",
"Elephant",
"Cat",
"Chameleon",
"Frog",
"Fish",
"Turtle",
"Flamingo"
)
.groupBy { it[0].uppercaseChar() }
.concatMapSingle { it.toList() }
.subscribe(observer4)
scan
- 掃描,對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后按順序依次發(fā)射這些值;
- 對(duì)發(fā)射的數(shù)據(jù)和上一輪發(fā)射的數(shù)據(jù)進(jìn)行函數(shù)處理,并返回的數(shù)據(jù)供下一輪使用,持續(xù)這個(gè)過(guò)程來(lái)產(chǎn)生剩余的數(shù)據(jù)流。其應(yīng)用場(chǎng)景有簡(jiǎn)單的累加計(jì)算,判斷所有數(shù)據(jù)的最小值等;
Observable.just(1, 2, 3, 4)
.scan { t1, t2 -> t1 + t2 }
.subscribe(observer)
buffer
- 緩存,定期從Observable收集數(shù)據(jù)到一個(gè)集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個(gè)
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.buffer(3)
.subscribe(observer5)
Window
- 窗口,定期將來(lái)自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口,而不是每次發(fā)射一項(xiàng); 類似于Buffer,但Buffer發(fā)射的是數(shù)據(jù),Window發(fā)射的是Observable,每一個(gè)Observable發(fā)射原始Observable的數(shù)據(jù)的一個(gè)子集;
//window操作符和buffer操作符在功能上實(shí)現(xiàn)的效果是一樣的,但window操作符最大區(qū)別在于同樣是緩存一定數(shù)量的數(shù)據(jù)項(xiàng),
// window操作符最終發(fā)射出來(lái)的是新的事件流integerObservable,而buffer操作符發(fā)射出來(lái)的是新的數(shù)據(jù)流,
// 也就是說(shuō),window操作符發(fā)射出來(lái)新的事件流中的數(shù)據(jù)項(xiàng),還可以經(jīng)過(guò)Rxjava其他操作符進(jìn)行處理。
Observable.just(1, 2, 3, 4)
.window(2, 1)
.subscribe(observer6)
3. 過(guò)濾操作:
Filter
- 過(guò)濾,過(guò)濾掉沒(méi)有通過(guò)謂詞測(cè)試的數(shù)據(jù)項(xiàng),只發(fā)射通過(guò)測(cè)試的;
Observable.just(1, 2, 3, 4, 5)
.filter {
it % 2 == 0
}.subscribe(observer)
ofType
- 過(guò)濾特定類型的數(shù)據(jù);
Observable.just(1, 2.1, "3", 4L, 5.0)
.ofType(Int::class.java)
.subscribe(observer)
distinct & distinctUntilChanged
- 去重,過(guò)濾掉重復(fù)數(shù)據(jù)項(xiàng);
Observable.just(1, 2, 3, 4, 3, 2, 1)
.distinct()
// .distinctUntilChanged()//去掉相鄰連續(xù)重復(fù)數(shù)據(jù)
.subscribe(observer)
skip & skipLast
val observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
// 跳過(guò), 跳過(guò)前面的若干項(xiàng)數(shù)據(jù);
observable.skip(4).subscribe(observer)
//skipLast:跳過(guò)后面的若干項(xiàng)數(shù)據(jù)
observable.skipLast(4).subscribe(observer)
take & takeLast
val observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
//take:只保留前面的若干項(xiàng)數(shù)據(jù)
observable.take(4).subscribe(observer)
//takeLast:只保留后面的若干項(xiàng)數(shù)據(jù)
observable.takeLast(4).subscribe(observer)
debounce
- 只有在空閑了一段時(shí)間后才發(fā)射數(shù)據(jù),通俗的說(shuō),就是如果一段時(shí)間沒(méi)有操作,就執(zhí)行一次操作;
val observable2 = Observable.create<Int> {
it.onNext(1)
Thread.sleep(400)
it.onNext(2)
Thread.sleep(1200)
it.onNext(3)
Thread.sleep(1000)
it.onNext(4)
Thread.sleep(800)
it.onNext(5)
Thread.sleep(2000)
it.onNext(6)
}
observable2
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.debounce(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
sample
- 取樣,定期發(fā)射最新的數(shù)據(jù),等于是數(shù)據(jù)抽樣;
//sample:與debounce的區(qū)別是,sample是以時(shí)間為周期的發(fā)射,一秒又一秒內(nèi)的最新數(shù)據(jù)。而debounce是最后一個(gè)有效數(shù)據(jù)開始
observable2
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.sample(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
throttleFirst & throttleLast & throttleWithTimeout & throttleLatest
- throttleFirst是指定周期內(nèi)第一個(gè)數(shù)據(jù),throttleLast與sample一致。throttleWithTimeout與debounce一致;
observable2
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.throttleFirst(1, TimeUnit.SECONDS)
// .throttleLast(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
timeout
- 后一個(gè)數(shù)據(jù)發(fā)射未在前一個(gè)元素發(fā)射后規(guī)定時(shí)間內(nèi)發(fā)射則返回超時(shí)異常;
observable2
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.timeout(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer)
elementAt
- 取值,取特定位置的數(shù)據(jù)項(xiàng);
Observable.just(4, 3, 2, 1)
.elementAt(1)
//出現(xiàn)越界時(shí),拋出異常
// .elementAtOrError(1)
.subscribe(maybeObserver)
first
- 首項(xiàng),只發(fā)射滿足條件的第一條數(shù)據(jù);
Observable.just(1, 2, 3, 4, 5)
// .first(-1)
// .firstOrError()
.firstElement()
.subscribe(maybeObserver)
last
- 末項(xiàng),只發(fā)射最后一條數(shù)據(jù);
Observable.just(1, 2, 3, 4, 5)
// .last(-1)
// .lastOrError()
.lastElement()
.subscribe(maybeObserver)
IgnoreElements
- 忽略所有的數(shù)據(jù),只保留終止通知(onError或onCompleted),ignoreElements 作用于Flowable、Observable。ignoreElement作用于Maybe、Single;
Observable.just(1, 2, 3, 4, 5)
.ignoreElements()
.subscribe(object : CompletableObserver {
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onComplete() {
LjyLogUtil.d("${Thread.currentThread().name}_onComplete")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
})
4. 組合操作:
concat & concatArray
- 不交錯(cuò)的連接多個(gè)Observable的數(shù)據(jù);
- 組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù),合并后 按發(fā)送順序串行執(zhí)行;
val just1 = Observable.just(1, 2, 3)
val just2 = Observable.just("A", "B", "C")
val just3 = Observable.just(4, 5, 6)
Observable.concat(just1, just2).subscribe(observer3)
//concat組合被觀察者數(shù)量<=4個(gè),而concatArray則可>4個(gè)
Observable.concatArray(
Observable.just(1, 2, 3),
Observable.just(4, 5, 6),
Observable.just(7, 8, 9)
).subscribe(observer3)
merge & mergeArray
- 組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù),合并后 按時(shí)間線并行執(zhí)行;
- merge和concat的區(qū)別: merge合并后發(fā)射的數(shù)據(jù)項(xiàng)是并行無(wú)序的,concat合并后發(fā)射的數(shù)據(jù)項(xiàng)是串行有序的;
Observable.merge(just1, just2).subscribe(observer3)
//mergeWith
just1.mergeWith(just3).subscribe(observer3)
zip
- 打包,使用一個(gè)指定的函數(shù)將多個(gè)Observable發(fā)射的數(shù)據(jù)組合在一起,然后將這個(gè)函數(shù)的結(jié)果作為單項(xiàng)數(shù)據(jù)發(fā)射;
//zip操作符是將兩個(gè)數(shù)據(jù)流進(jìn)行指定的函數(shù)規(guī)則合并
Observable.zip(just1, just2, { t1, t2 -> "${t1}_${t2}" })
.subscribe(observer3)
//zipWith
just1.zipWith(just2, { t1, t2 -> "${t1}_${t2}" })
.subscribe(observer3)
startWith & startWithArray
- 在發(fā)射原來(lái)的Observable的數(shù)據(jù)序列之前,先發(fā)射一個(gè)指定的數(shù)據(jù)序列或數(shù)據(jù)項(xiàng);
Observable.just(1,2,3)
.startWith(Observable.just(4,5,6))
.startWithArray(7,8,9)
.subscribe(observer3)
join
- 無(wú)論何時(shí),如果一個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)項(xiàng),只要在另一個(gè)Observable發(fā)射的數(shù)據(jù)項(xiàng)定義的時(shí)間窗口內(nèi),就將兩個(gè)Observable發(fā)射的數(shù)據(jù)合并發(fā)射;
just1.join(just2,
//規(guī)定just2的過(guò)期期限
{ Observable.timer(3, TimeUnit.SECONDS) },
//規(guī)定just1的過(guò)期期限
{ Observable.timer(8, TimeUnit.SECONDS) },
//規(guī)定just1和just2的合并規(guī)則
{ t1, t2 -> "${t1}_${t2}" })
.subscribe(observer3)
combineLatest
- 當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了一個(gè)數(shù)據(jù)時(shí),通過(guò)一個(gè)指定的函數(shù)組合每個(gè)Observable發(fā)射的最新數(shù)據(jù)(一共兩個(gè)數(shù)據(jù)),然后發(fā)射這個(gè)函數(shù)的結(jié)果;
Observable.combineLatest(just1, just2,
{ t1, t2 -> "${t1}_${t2}" }).subscribe(observer3)
switch
- 將一個(gè)發(fā)射Observable序列的Observable轉(zhuǎn)換為這樣一個(gè)Observable:它逐個(gè)發(fā)射那些Observable最近發(fā)射的數(shù)據(jù);
Observable.switchOnNext(ObservableSource<Observable<Int>> { }, 12)
collect
- 將被觀察者Observable發(fā)送的數(shù)據(jù)事件收集到一個(gè)數(shù)據(jù)結(jié)構(gòu)里;
Observable.just(1, 2, 3, 4, 5, 6)
.collect(
{ ArrayList() },
BiConsumer<ArrayList<Int?>, Int> { t1, t2 -> t1.add(t2) }
).subscribe(Consumer { LjyLogUtil.d("num: $it") })
count
- 計(jì)算Observable發(fā)射的數(shù)據(jù)個(gè)數(shù),然后發(fā)射這個(gè)結(jié)果;
Observable.just(1, 2, 3)
.count()
.subscribe(object : SingleObserver<Long>{
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onSuccess(t: Long) {
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
})
5. 錯(cuò)誤處理:
cast
- 將數(shù)據(jù)元素轉(zhuǎn)型成其他類型,轉(zhuǎn)型失敗會(huì)拋出異常;
Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5)
.cast(Int::class.java)
.subscribe(observer)
onErrorReturn
- 調(diào)用數(shù)據(jù)源的onError函數(shù)后會(huì)回到該函數(shù),可對(duì)錯(cuò)誤進(jìn)行處理,然后返回值,會(huì)調(diào)用觀察者onNext()繼續(xù)執(zhí)行,執(zhí)行完調(diào)用onComplete()函數(shù)結(jié)束所有事件的發(fā)射;
Observable.just(1, 2, 3.2, 4)
.map { it.toInt() }
.onErrorReturn {
if (it is NumberFormatException) {
0
} else {
throw IllegalArgumentException()
}
}.subscribe(observer)
onErrorReturnItem
- 與onErrorReturn類似,onErrorReturnItem不對(duì)錯(cuò)誤進(jìn)行處理,直接返回一個(gè)值;
Observable.just(1, 2, 3.2, 4)
.map { it.toInt() }
.onErrorReturnItem(0)
.subscribe(observer)
onErrorResumeNext & onExceptionResumeNext
- 遇到錯(cuò)誤時(shí),發(fā)送1個(gè)新的Observable;
Observable.just(1, 2, 3.2, 4)
.map { it.toInt() }
.onErrorResumeNext { Observable.just(5,6,7) }
.subscribe(observer)
retry
- 重試,如果Observable發(fā)射了一個(gè)錯(cuò)誤通知,重新訂閱它,期待它正常終止;
//retry:當(dāng)發(fā)生錯(cuò)誤時(shí),數(shù)據(jù)源重復(fù)發(fā)射item,直到?jīng)]有異常或者達(dá)到所指定的次數(shù)
Observable.just(1, 2, 3, 4)
.retry(3)
.subscribe(observer)
retryUntil
- 發(fā)生異常時(shí),返回值是false表示繼續(xù)執(zhí)行(重復(fù)發(fā)射數(shù)據(jù)),true不再執(zhí)行,但會(huì)調(diào)用onError方法;
var temp = 0
Observable.just(1, 2, 3, 4)
.map {
temp = it
it
}
.retryUntil {
temp > 3
}
.subscribe(observer)
retryWhen
- 遇到錯(cuò)誤時(shí),將發(fā)生的錯(cuò)誤傳遞給一個(gè)新的被觀察者(Observable),并決定是否需要重新訂閱原始被觀察者(Observable)& 發(fā)送事件;
Observable.just(1, 2, 3.2, 4)
.map { it.toInt() }
.retryWhen {
it.flatMap { throwable ->
if (throwable is NumberFormatException) {
Observable.error(Throwable("retryWhen終止"))
} else {
Observable.just(5, 6, 7)
}
}
}
.subscribe(observer)
6. 輔助操作:
delay
- 延遲一段時(shí)間發(fā)射結(jié)果數(shù)據(jù);
Observable.just(1, 2, 3)
.delay(1,TimeUnit.SECONDS)
.subscribe(observer)
do
- 在某個(gè)事件的生命周期中調(diào)用;
- doOnEach:數(shù)據(jù)源(Observable)每發(fā)送一次數(shù)據(jù),就調(diào)用一次;
- doOnNext:數(shù)據(jù)源每次調(diào)用onNext() 之前都會(huì)先回調(diào)該方法;
- doOnError:數(shù)據(jù)源每次調(diào)用onError() 之前會(huì)回調(diào)該方法;
- doOnComplete:數(shù)據(jù)源每次調(diào)用onComplete() 之前會(huì)回調(diào)該方法;
- doOnSubscribe:數(shù)據(jù)源每次調(diào)用onSubscribe() 之后會(huì)回調(diào)該方法;
- doOnDispose:數(shù)據(jù)源每次調(diào)用dispose() 之后會(huì)回調(diào)該方法;
Observable.just(1, 2, 3, 4)
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.doOnSubscribe { LjyLogUtil.d("${Thread.currentThread().name}_doOnSubscribe") }
.doOnEach { LjyLogUtil.d("${Thread.currentThread().name}_doOnEach:$it") }
.doOnNext { LjyLogUtil.d("${Thread.currentThread().name}_doOnNext:$it") }
.doOnError { LjyLogUtil.d("${Thread.currentThread().name}_doOnError:${it.localizedMessage}") }
.doOnComplete { LjyLogUtil.d("${Thread.currentThread().name}_doOnComplete") }
.doOnDispose { LjyLogUtil.d("${Thread.currentThread().name}_doOnDispose") }
.subscribe(observer)
7. 條件和布爾操作:
all
- 判斷Observable發(fā)射的所有的數(shù)據(jù)項(xiàng)是否都滿足某個(gè)條件;
Observable.just(1, -2, 3)
.all {
it > 0
}.subscribe { it ->
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
takeWhile & skipWhile & takeUntil & skipUntil
// 從0開始每1s發(fā)送1個(gè)數(shù)據(jù)
Observable.interval(1, TimeUnit.SECONDS)
//條件滿足時(shí),取數(shù)據(jù)
.takeWhile {
it<10
}
Observable.interval(1, TimeUnit.SECONDS)
//滿足條件時(shí),跳過(guò)數(shù)據(jù)
.skipWhile {
it<10
}
Observable.interval(1, TimeUnit.SECONDS)
//取數(shù)據(jù),直到滿足條件
.takeUntil {
it>10
}
Observable.interval(1, TimeUnit.SECONDS)
//等待直到傳入的Observable開始發(fā)送數(shù)據(jù)
.skipUntil (Observable.timer(5, TimeUnit.SECONDS))
SequenceEqual:
- 判斷兩個(gè)Observable是否按相同的數(shù)據(jù)序列;
Observable.sequenceEqual(
Observable.just(4, 5, 6),
Observable.just(4, 5, 6)
).subscribe { it ->
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
contains
- 判斷Observable是否會(huì)發(fā)射一個(gè)指定的數(shù)據(jù)項(xiàng);
Observable.just(1, -2, 3)
.contains(2)
.subscribe { it ->
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
isEmpty
- 判斷發(fā)送的數(shù)據(jù)是否為空;
Observable.just(1, -2, 3)
.isEmpty()
.subscribe { it ->
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
amb
-給定多個(gè)Observable,只讓第一個(gè)發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù);
val list: MutableList<ObservableSource<Int>> = ArrayList()
list.add(Observable.just(1, 2, 3).delay(1, TimeUnit.SECONDS))
list.add(Observable.just(4, 5, 6))
Observable.amb(list).subscribe{
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
defaultIfEmpty
- 發(fā)射來(lái)自原始Observable的數(shù)據(jù),如果原始Observable沒(méi)有發(fā)射數(shù)據(jù),就發(fā)射一個(gè)默認(rèn)數(shù)據(jù);
//在不發(fā)送onNext事件, 僅發(fā)送onComplete事件
Observable.empty<Int>()
.defaultIfEmpty(-1)
.subscribe { it ->
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
- SkipUntil:丟棄原始Observable發(fā)射的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù),然后發(fā)射原始Observable的剩余數(shù)據(jù);
實(shí)戰(zhàn)
結(jié)合 RxBinding 使用
- RxBinding是對(duì) Android View 事件的擴(kuò)展, 它使得開發(fā)者可以對(duì) View 事件使用 RxJava 的各種操作;
1. 添加依賴
//RxBinding
implementation 'com.jakewharton.rxbinding4:rxbinding:4.0.0'
//Google 'material' library bindings:
implementation 'com.jakewharton.rxbinding4:rxbinding-material:4.0.0'
//AndroidX library bindings:
implementation 'com.jakewharton.rxbinding4:rxbinding-core:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-appcompat:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-drawerlayout:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-leanback:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-recyclerview:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-slidingpanelayout:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-swiperefreshlayout:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-viewpager:4.0.0'
implementation 'com.jakewharton.rxbinding4:rxbinding-viewpager2:4.0.0'
2. 按鈕防抖
val btn1 = findViewById<Button>(R.id.btn_1)
btn1.clicks()
.throttleFirst(2, TimeUnit.SECONDS)
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe {
LjyLogUtil.d("點(diǎn)擊按鈕")
}
3. editText輸入監(jiān)聽(tīng)
//也可用作聯(lián)想搜索優(yōu)化
val et1 = findViewById<EditText>(R.id.et_1)
et1.textChanges()
.debounce(1, TimeUnit.SECONDS)
//跳過(guò)第1次請(qǐng)求 因?yàn)槌跏驾斎肟虻目兆址麪顟B(tài)
.skip(1)
.observeOn(AndroidSchedulers.mainThread())
.subscribe{
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$it")
}
4. 聯(lián)合/表單判斷
val etName = findViewById<EditText>(R.id.et_name)
val etPwd = findViewById<EditText>(R.id.et_pwd)
val obName = etName.textChanges()
val obPwd = etPwd.textChanges()
Observable.combineLatest(
obName, obPwd, { name, pwd -> name == "ljy" && pwd == "123" })
//跳過(guò)第1次請(qǐng)求 因?yàn)槌跏驾斎肟虻目兆址麪顟B(tài)
.skip(1)
.observeOn(AndroidSchedulers.mainThread())
.subscribe { isLogin -> LjyLogUtil.d(if (isLogin) "登錄成功" else "登錄失敗") }
5. 定時(shí)器任務(wù)
val time = 10L
val btnLogin = findViewById<Button>(R.id.btn_login)
btnLogin.clicks()
.throttleFirst(time, TimeUnit.SECONDS)
.subscribeOn(AndroidSchedulers.mainThread())
.doOnNext { btnLogin.isEnabled = false }
.subscribe {
LjyLogUtil.d("點(diǎn)擊登錄")
Observable.intervalRange(
0, time, 0, 1,
TimeUnit.SECONDS, AndroidSchedulers.mainThread()
)
.subscribe(
{ btnLogin.text = "剩余${time - it}秒" },
{ LjyLogUtil.e(it.message) },
{
btnLogin.text = "獲取驗(yàn)證碼"
btnLogin.isEnabled = true
})
}
利用RxLifecycle解決內(nèi)存泄漏問(wèn)題
1. 添加依賴
//RxLifecycle
implementation 'com.trello.rxlifecycle4:rxlifecycle:4.0.2'
// If you want to bind to Android-specific lifecycles
implementation 'com.trello.rxlifecycle4:rxlifecycle-android:4.0.2'
// If you want pre-written Activities and Fragments you can subclass as providers
implementation 'com.trello.rxlifecycle4:rxlifecycle-components:4.0.2'
// If you want pre-written support preference Fragments you can subclass as providers
implementation 'com.trello.rxlifecycle4:rxlifecycle-components-preference:4.0.2'
// If you want to use Android Lifecycle for providers
implementation 'com.trello.rxlifecycle4:rxlifecycle-android-lifecycle:4.0.2'
// If you want to use Kotlin syntax
implementation 'com.trello.rxlifecycle4:rxlifecycle-kotlin:4.0.2'
// If you want to use Kotlin syntax with Android Lifecycle
implementation 'com.trello.rxlifecycle4:rxlifecycle-android-lifecycle-kotlin:4.0.2'
2. 綁定
Observable.intervalRange(0, 60, 0, 1, TimeUnit.SECONDS)
//利用RxLifecycle來(lái)解決內(nèi)存泄漏問(wèn)題
//bindToLifecycle的自動(dòng)取消訂閱示例
//.compose(bindToLifecycle())
//手動(dòng)設(shè)置在activity onPause的時(shí)候取消訂閱
.compose(this.bindUntilEvent(ActivityEvent.PAUSE))
.subscribe(
{ LjyLogUtil.d("剩余${60 - it}秒") },
{ LjyLogUtil.e(it.message) },
{ LjyLogUtil.d("完成") }
)
網(wǎng)絡(luò)請(qǐng)求
1. 網(wǎng)絡(luò)請(qǐng)求嵌套回調(diào)
//使用observeOn多次切換線程
apiService.searchRepo(emptyMap()) // 請(qǐng)求搜索列表
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
//展示列表
}
.observeOn(Schedulers.io())
.flatMap {
Observable.fromIterable(it.items).map { repo -> repo.id }
}
.flatMap {
apiService.getItem(it) // 請(qǐng)求詳情
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
//展示詳情
}
2. 網(wǎng)絡(luò)請(qǐng)求輪詢
//每60秒一次,無(wú)限輪詢
Observable.interval(60, TimeUnit.SECONDS)
.doOnNext {
apiService.searchRepo(emptyMap())
.subscribe(Consumer { LjyLogUtil.d("accept: ${it.items}") })
}.subscribe { LjyLogUtil.d("第 $it 次輪詢") }
//有條件輪詢
var count = 0
apiService.searchRepo(emptyMap())
.repeatWhen {
it.flatMap {
if (count > 3) {
Observable.error(Throwable("輪詢結(jié)束"))
} else {
LjyLogUtil.d("第 $count 次輪詢")
Observable.just(1).delay(60, TimeUnit.SECONDS)
}
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
LjyLogUtil.d("accept: ${it.items}")
count++
}, {
LjyLogUtil.d(it.message)
})
3. 網(wǎng)絡(luò)請(qǐng)求出錯(cuò)重連
//最大重試次數(shù)
val maxConnectCount = 10
//已重試次數(shù)
var currentRetryCount = 0
apiService.searchRepo(emptyMap())
.retryWhen {
it.flatMap { throwable ->
//根據(jù)異常類型選擇是否重試
if (throwable is IOException) {
if (currentRetryCount < maxConnectCount) {
currentRetryCount++
//遇到的異常越多,重試延遲間隔時(shí)間越長(zhǎng)
Observable.just(1).delay(currentRetryCount * 60L, TimeUnit.SECONDS)
} else {
Observable.error(Throwable("重試結(jié)束"))
}
} else {
Observable.error(Throwable("發(fā)生了非網(wǎng)絡(luò)異常(非I/O異常)"))
}
}
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
LjyLogUtil.d("accept: ${it.items}")
}, {
LjyLogUtil.d(it.message)
})
4. 內(nèi)存,磁盤,網(wǎng)絡(luò) 三級(jí)緩存
var memoryCache: String? = null
var diskCache: String? = null
private fun test10() {
val memory = Observable.create<String> {
if (memoryCache != null) {
it.onNext(memoryCache)
} else {
it.onComplete()
}
}
val disk = Observable.create<String> {
if (diskCache != null) {
it.onNext(diskCache)
} else {
it.onComplete()
}
}.doOnNext {
memoryCache = "內(nèi)存數(shù)據(jù)"
}
val net = Observable.just("網(wǎng)絡(luò)數(shù)據(jù)")
.doOnNext {
// memoryCache="內(nèi)存數(shù)據(jù)"
diskCache = "磁盤數(shù)據(jù)"
}
//通過(guò)concat()合并memory、disk、network 3個(gè)被觀察者的事件(即檢查內(nèi)存緩存、磁盤緩存 & 發(fā)送網(wǎng)絡(luò)請(qǐng)求)
Observable.concat(memory, disk, net)
//通過(guò)firstElement(),從串聯(lián)隊(duì)列中取出并發(fā)送第1個(gè)有效事件(Next事件),即依次判斷檢查memory、disk、network
.firstElement()
.subscribe {
LjyLogUtil.d("accept: $it")
}
}
5. 合并數(shù)據(jù)源 & 同時(shí)展示
var result = "數(shù)據(jù)來(lái)自:"
val net = Observable.just("網(wǎng)絡(luò)")
val disk = Observable.just("磁盤")
//使用merge,從網(wǎng)絡(luò)和本地獲取數(shù)據(jù)并展示
Observable.merge(net, disk)
.subscribe({
result += "$it, "
}, {
}, {
LjyLogUtil.d("result: $result")
})
//使用zip,合并2個(gè)網(wǎng)絡(luò)請(qǐng)求向獲取數(shù)據(jù)并展示
val repo1 = apiService.getItem(1001).subscribeOn(Schedulers.io())
val repo2 = apiService.getItem(1002).subscribeOn(Schedulers.io())
Observable.zip(
repo1, repo2, { data1, data2 ->
val repoList = ArrayList<RepoDetail>()
repoList.add(data1)
repoList.add(data2)
repoList
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
for (repoDetail in it) {
LjyLogUtil.d("result: ${repoDetail.name}")
}
}
源碼解析
入口
- 以下面代碼為源碼閱讀入口,Single是最簡(jiǎn)單的被觀察者;
Single.just(1)
.subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
LjyLogUtil.d("${Thread.currentThread().name}_onSubscribe")
}
override fun onSuccess(t: String) {
LjyLogUtil.d("${Thread.currentThread().name}_onSuccess:$t")
}
override fun onError(e: Throwable) {
LjyLogUtil.d("${Thread.currentThread().name}_onError:${e.message}")
}
})
被觀察者的創(chuàng)建
- 先看上面代碼中被觀察者的創(chuàng)建方法:Single.just();
public static <@NonNull T> Single<T> just(T item) {
Objects.requireNonNull(item, "item is null");//判空
return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}
- 其中第一行為判空,第二行RxJavaPlugins.onAssembly為一個(gè)鉤子方法,方便添加一些額外操作,代碼如下:
@Nullable
static volatile Function<? super Single, ? extends Single> onSingleAssembly;
public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
- 其中onSingleAssembly默認(rèn)為空,所以實(shí)際默認(rèn)是直接返回source,也就是SingleJust;
- 所以Single.just就是直接創(chuàng)建了一個(gè)被觀察者的實(shí)現(xiàn)類SingleJust的實(shí)例并返回,那么我們看一下SingleJust的代碼;
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
}
- 很簡(jiǎn)單吧,繼承了Single,并重寫了Single唯一的抽象方法subscribeActual,有個(gè)泛型變量value;
- Single的代碼很多,基本都是上面介紹的操作符方法的實(shí)現(xiàn),用到時(shí)可以點(diǎn)進(jìn)源碼看一看;
public abstract class Single<@NonNull T> implements SingleSource<T> {
...
protected abstract void subscribeActual(@NonNull SingleObserver<? super T> observer);
...
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
...
}
}
- Single實(shí)現(xiàn)了SingleSource接口,并實(shí)現(xiàn)subscribe方法;
public interface SingleSource<@NonNull T> {
/**
* Subscribes the given {@link SingleObserver} to this {@link SingleSource} instance.
* @param observer the {@code SingleObserver}, not {@code null}
* @throws NullPointerException if {@code observer} is {@code null}
*/
void subscribe(@NonNull SingleObserver<? super T> observer);
}
訂閱
- 下面來(lái)看看開頭例子中的第二行的subscribe方法:Single.subscribe();
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
//判空
Objects.requireNonNull(observer, "observer is null");
//鉤子方法,默認(rèn)還是入?yún)⒌腟ingleObserver
observer = RxJavaPlugins.onSubscribe(this, observer);
//判空
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
try {
subscribeActual(observer);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
- 可以看到訂閱方法subscribe中最有用的一行就是調(diào)用了subscribeActual,而這里的subscribeActual的實(shí)現(xiàn)正式上面的SingleJust中的實(shí)現(xiàn);
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
- 可以看到SingleJust中的subscribeActual直接連著調(diào)用了observer的onSubscribe和onSuccess方法,并且onSubscribe的入?yún)橐粋€(gè)disposed的Disposable,是個(gè)已取消的Disposable,因?yàn)閖ust是沒(méi)有延遲的,無(wú)需取消和修改,而且onError是不會(huì)被調(diào)用的,因?yàn)樗舶l(fā)一個(gè)也不會(huì)有出錯(cuò)的可能;
操作符實(shí)現(xiàn)
map
- 以map為例,對(duì)之前的示例擴(kuò)展一下,增加個(gè)map操作符對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換,map搞懂了,其他操作符的主要思路也是一樣的;
Single.just(1)
.map { "num_$it" }
.subscribe { num ->
LjyLogUtil.d(num)
}
- 那么看一下map的實(shí)現(xiàn):
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
- map中也是有判空和鉤子,創(chuàng)建了一個(gè)SingleMap并返回,其入?yún)⒌谝淮螀?shù)為this,也就是just返回的SingleJust實(shí)例,第二個(gè)參數(shù)為數(shù)據(jù)轉(zhuǎn)換的轉(zhuǎn)換器;
- 那么來(lái)看一下SingleMap:
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
...//省略內(nèi)部類MapSingleObserver
}
- 可以看到SingleMap的subscribeActual中只有一行代碼,就是調(diào)用source的訂閱方法subscribe,并傳入一個(gè)觀察者M(jìn)apSingleObserver實(shí)例和SingleMap持有的數(shù)據(jù)轉(zhuǎn)換器,其中source也就是前一步的被觀察者SingleJust實(shí)例;
- 而觀察者M(jìn)apSingleObserver則是負(fù)責(zé)對(duì)上下游數(shù)據(jù)進(jìn)行轉(zhuǎn)換 和傳遞,其入?yún)⑾掠蝧ubscribe傳入的觀察者SingleObserver實(shí)例,來(lái)看一下它的實(shí)現(xiàn):
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
//mapper.apply數(shù)據(jù)轉(zhuǎn)換器對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換
v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
- 可以看到MapSingleObserver的onSuccess中調(diào)用數(shù)據(jù)轉(zhuǎn)換器mapper的apply方法對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換,
并調(diào)用構(gòu)造函數(shù)傳入的SingleObserver的onSuccess將轉(zhuǎn)換后的數(shù)據(jù)傳遞過(guò)去,而onSubscribe,onError方法則是直接調(diào)用了SingleObserver的方法
小結(jié)
- 那么示例代碼中map前后整個(gè)流程串起來(lái)就是:
- Single.just創(chuàng)建了一個(gè)被觀察者SingleJust,負(fù)責(zé)發(fā)送原始數(shù)據(jù);
- map創(chuàng)建了一個(gè)被觀察者SingleMap,構(gòu)造函數(shù)入?yún)⑹巧弦徊降腟ingleJust和數(shù)據(jù)轉(zhuǎn)換器,也就是對(duì)SingleJust進(jìn)行了包裝,或者說(shuō)接管,如何接管的呢,SingleMap中調(diào)用了SingleJust的訂閱方法subscribe,并傳入觀察者M(jìn)apSingleObserver,也就是對(duì)SingleJust說(shuō),小j啊,你這項(xiàng)目給我(SingleMap)了,后面的客戶你不用跟了,把你手里的數(shù)據(jù)給我小弟MapSingleObserver就行了,以后需要啥他找你要;
- 后面再調(diào)用subscribe呢,實(shí)際是調(diào)用SingleMap的subscribe,也就是后面再有客戶其實(shí)都是跟SingleMap簽的單子了
- SingleMap收到單子呢轉(zhuǎn)手就給小弟MapSingleObserver了,反正SingleJust的數(shù)據(jù)也在你手里呢你看看咋給客戶服務(wù)吧
- MapSingleObserver就拿著SingleJust的數(shù)據(jù)進(jìn)行轉(zhuǎn)換,然后調(diào)用客戶(下游subscribe傳入的觀察者)的回調(diào)方法進(jìn)行服務(wù)
- ps:
- MapSingleObserver:同樣是觀察者,我也太累了,又負(fù)責(zé)接收上游的數(shù)據(jù),又要進(jìn)行處理,處理完還得反饋給下游的客戶;
- SingleJust:你累?同樣是被觀察者,我辛辛苦苦做的項(xiàng)目,還不是被SingleMap拿走了,還讓你監(jiān)視(觀察)我,這點(diǎn)業(yè)績(jī)都被內(nèi)部消化了啊,而且項(xiàng)目開始了還把我工號(hào)留給客戶( t.onSubscribe(d)),那客戶是說(shuō)停就停啊,我這出錯(cuò)了你也是轉(zhuǎn)頭就給客戶說(shuō)?。?t.onError(e));
- 這哪是RxJava,這分明是職場(chǎng)啊;
subscribeOn & observeOn
- 在對(duì)之前的示例擴(kuò)展一下,加入線程切換,那就是用到了subscribeOn & observeOn 操作符了;
Single.just(1)
.map { "num_$it" }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { num ->
LjyLogUtil.d(num)
}
subscribeOn
- 先來(lái)看看subscribeOn的入?yún)chedulers.io():
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
static final class IOTask implements Supplier<Scheduler> {
@Override
public Scheduler get() {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
public final class IoScheduler extends Scheduler {
...
}
- 可以看到最后返回的是Scheduler的實(shí)現(xiàn)類IoScheduler實(shí)例;
- 再來(lái)看看subscribeOn方法的實(shí)現(xiàn):
public final Single<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler));
}
- 還是判空和鉤子,然后創(chuàng)建一個(gè)SingleSubscribeOn并返回,入?yún)⑹巧嫌蔚谋挥^察者(this),和線程調(diào)度器Scheduler;
public final class SingleSubscribeOn<T> extends Single<T> {
final SingleSource<? extends T> source;
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
//創(chuàng)建一個(gè)觀察者SubscribeOnObserver的實(shí)例,傳入上游的被觀察者source
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
//調(diào)用下游的觀察者observer的onSubscribe,通知其開始訂閱了,取消的話可以找SubscribeOnObserver
observer.onSubscribe(parent);
//調(diào)用IoScheduler的scheduleDirect方法切換線程
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
}
...//省略內(nèi)部類SubscribeOnObserver
}
- 上面subscribeActual中創(chuàng)建了一個(gè)被觀察者SubscribeOnObserver,并且調(diào)用scheduler.scheduleDirect切換線程;
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
//其中DisposeTask為Scheduler的內(nèi)部類
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
...
DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
try {
decoratedRun.run();
} catch (Throwable ex) {
// Exceptions.throwIfFatal(e); nowhere to go
RxJavaPlugins.onError(ex);
throw ex;
}
} finally {
dispose();
runner = null;
}
}
...
}
- 其中scheduleDirect的入?yún)⑹莻€(gè)Runnable,而SubscribeOnObserver實(shí)現(xiàn)了Runnable接口和SingleObserver接口,構(gòu)造方法入?yún)⑹窍掠蔚挠^察者和上游的被觀察者,在自己的onSuccess,onError中直接傳給下游的觀察者,在run方法中訂閱上游的被觀察者;
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
...
SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
this.downstream = actual;
this.source = source;
this.task = new SequentialDisposable();
}
...
@Override
public void onSuccess(T value) {
downstream.onSuccess(value);
}
@Override
public void onError(Throwable e) {
downstream.onError(e);
}
...
@Override
public void run() {
source.subscribe(this);
}
}
observeOn
- 先來(lái)看看Android專用的線程調(diào)度器AndroidSchedulers.mainThread():
//通過(guò)靜態(tài)內(nèi)部類提供HandlerScheduler的單例
public final class AndroidSchedulers {
//開放的入口方法
public static Scheduler mainThread() {
//鉤子
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
//私有的靜態(tài)變量
private static final Scheduler MAIN_THREAD =
RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
//私有的靜態(tài)內(nèi)部類
private static final class MainHolder {
//本質(zhì)還說(shuō)離不開Handler,通過(guò)Looper.getMainLooper()切換到UI線程
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
}
//私有的構(gòu)造方法,并拋出異常,因?yàn)檫@里是創(chuàng)建HandlerScheduler的單例,而不是AndroidSchedulers本身
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
final class HandlerScheduler extends Scheduler {
private final Handler handler;
private final boolean async;
HandlerScheduler(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
...//省略判空和鉤子方法
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
if (async) {
message.setAsynchronous(true);
}
//通過(guò)主線程的handler發(fā)送消息
handler.sendMessageDelayed(message, unit.toMillis(delay));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;
private volatile boolean disposed;
HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...
if (disposed) {
return Disposable.disposed();
}
...
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this;
...
handler.sendMessageDelayed(message, unit.toMillis(delay));
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposable.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
@Override
public void dispose() {
handler.removeCallbacks(this);
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
- 然后來(lái)看observeOn方法:
public final Single<T> observeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler));
}
- 本質(zhì)還是創(chuàng)建了一個(gè)被觀察者:
public final class SingleObserveOn<T> extends Single<T> {
final SingleSource<T> source;
final Scheduler scheduler;
public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
//調(diào)用上游被觀察者source的訂閱方法subscribe,傳入一個(gè)觀察者ObserveOnSingleObserver,其入?yún)⑹窍掠蔚挠^察者observer
source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
}
...//省略內(nèi)部類ObserveOnSingleObserver
}
- ObserveOnSingleObserver代碼如下, 在自己的onSubscribe調(diào)用下游觀察者的onSubscribe,在自己的onSuccess和onError中調(diào)用scheduler.scheduleDirect切換線程,scheduleDirect的入?yún)⑹荝unnable,會(huì)調(diào)用自己的run方法,而在run方法中調(diào)用了下游觀察者的onSuccess和onError傳遞結(jié)果,以此實(shí)現(xiàn)改變下游觀察者的線程;
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
...
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
downstream.onSubscribe(this);
}
}
@Override
public void onSuccess(T value) {
this.value = value;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void onError(Throwable e) {
this.error = e;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run() {
Throwable ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onSuccess(value);
}
}
...
}
番外:Single.create
- 看了上面的Single.just, 可能有種被忽悠的感覺(jué),也太low了吧,這么簡(jiǎn)單么,那么我們不妨再來(lái)看看Single.create:
public static <@NonNull T> Single<T> create(@NonNull SingleOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new SingleCreate<>(source));
}
- 同樣的create方法中創(chuàng)建了一個(gè)被觀察者SingleCreate并返回:
public final class SingleCreate<T> extends Single<T> {
final SingleOnSubscribe<T> source;
public SingleCreate(SingleOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
Emitter<T> parent = new Emitter<>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
...//省略了內(nèi)部類Emitter
}
- 看了上面代碼,create和just有什么區(qū)別呢,just{}中的代碼是直接賦值給value的,不需要下游subscribe就會(huì)執(zhí)行,而create是通過(guò)匿名內(nèi)部類傳入一個(gè)SingleOnSubscribe的實(shí)例,并在實(shí)現(xiàn)的subscribe方法中產(chǎn)生數(shù)據(jù),在SingleCreate.subscribeActual中調(diào)用SingleOnSubscribe.subscribe,SingleCreate.subscribeActual則是在下游subscribe時(shí)才會(huì)執(zhí)行,也就是Single.create在下游訂閱后才開始產(chǎn)生數(shù)據(jù);那么顯然,Observable.just()不適合封裝網(wǎng)絡(luò)數(shù)據(jù),因?yàn)槲覀兺ǔ2幌朐趕ubscribe之前做網(wǎng)絡(luò)請(qǐng)求。
參考
- Rxjava的源碼
- 給 Android 開發(fā)者的 RxJava 詳解
- 扔物線.henCoder-RxJava原理完全解析
- Carson帶你學(xué)Android:RxJava
- 擁抱 RxJava(三):關(guān)于 Observable 的冷熱,常見(jiàn)的封裝方式以及誤區(qū)
- Rxjava3文檔級(jí)教程