對于Rxjava想必大家都很熟悉了,這里不再贅述什么是Rxjava。
今天的主題是:從源碼角度(2.0)分析,Rxjava是如何做到事件分發(fā)的??
以下是今天學(xué)習(xí)筆記的目錄:
- 關(guān)鍵類及方法簡要說明
- 分析源碼,查看事件是如何傳遞的
關(guān)鍵類及方法說明
首先我們知道Rxjava是一個擴(kuò)展的“觀察者”模式,既然是“觀察者”模式,那么不可避免的會涉及到:被觀察者,觀察者,訂閱操作,事件。那么Rxjava中哪些類是這些身份的扮演者呢?先上一下基本類圖。
類圖:
基本使用code:
結(jié)合圖-1和圖2,我們可以知道"被觀察者"以及"觀察者"是誰了。 那"ObservableOnSubscribe","ObservableEmitter","Disposable"又是什么? 接下來我們就從"Observer"開始逐一梳理下他們是什么,以及如何工作的。
Observer:觀察者
從上圖中我們可以知道 “Observer” 是Rxjava 中 觀察者的身份。我們簡單介紹下Observer中各個方法的作用。
onSubscribe(Disposable d):
1:當(dāng) Observable向 Observer發(fā)送事件前,調(diào)用此方法(具體在哪調(diào)用的,后續(xù)會分
析到,這里先給出結(jié)論)。
2:可以調(diào)用 Disposable(為了便于理解,我們可以把它想象成 “消息控制開關(guān)”,因為它就像我們平常接觸的各種開關(guān)一樣,來控制消息是否分發(fā)給 Observer)的dispose()來告知 Observable,當(dāng)前的 Observer是否需要接受事件。
onNext(T value):
1:該方法就是 Observable傳遞訂閱事件給Observer的回調(diào)方法,其中value就是 Observer向Observable訂閱的要監(jiān)聽的事件。
2:如方法注釋所說的那樣,該方法可以調(diào)用多次(想想也是,觀察者訂閱的事件當(dāng)然可能發(fā)生多次了)。
3:當(dāng)調(diào)用onComplete()/onError()之后就算再調(diào)用onNext()事件也不會發(fā)送至 Observer。
onComplete():
1:該方法標(biāo)記之后的任何事件將不會發(fā)送給 Observer
2:當(dāng)調(diào)用了onComplete()之后再調(diào)用onError()事件是不會發(fā)送給 Observer。
onError(Throwable e):
1:該方法通知觀察者出現(xiàn)了錯誤情況。
2:當(dāng)調(diào)用了onError()之后,再調(diào)用onNext()/onComplete()事件也不會發(fā)送給 Observer。
問題:
1:在實際應(yīng)用時發(fā)現(xiàn),“當(dāng)調(diào)用onComplete()之后再調(diào)用onError() app會拋出異常并crash”。
2:當(dāng)設(shè)置 observeOn(Scheduler s)之后,會導(dǎo)致 “onError()之前的事件接受不到或者丟失部分事件”。
以上兩個問題會在稍后給出原因。
Observable:被觀察者
首先,我們知道它是Rxjava中 “被觀察者” 的具體實現(xiàn)。 其次,我們要創(chuàng)建一個被觀察者可以通過Observable提供的N多靜態(tài)方法"去new一個出來"。我們拿"create()"舉個例子。
create():
create()方法其實做的事情并不多。
- 判斷傳入的ObservableOnSubscribe是否為null。
- 創(chuàng)建ObservableCreate(該類繼承Observable)并把ObservableOnSubscribe添加到 ObservableCreate,并返回ObservableCreate。
既然創(chuàng)建好了 被觀察者,那么接下來就需要 "觀察者 訂閱 被觀察者,讓被觀察者時刻保持警惕,當(dāng)有我要的事件發(fā)生時,記得通知我",那"訂閱"這個動作 Rxjava是如何實現(xiàn)的呢?
subscribe():訂閱操作
我們都知道 標(biāo)準(zhǔn)觀察者模式中,實現(xiàn)"訂閱"操作的話應(yīng)該是 "觀察者.訂閱(被觀察者)"這種寫法才對,就拿給View設(shè)置點擊事件來說應(yīng)該是"View.setOnClickListener()"這種。那為啥Rxjava中要反其道而行要采用"被觀察者.訂閱(觀察者)"這種方式呢? 這是因為Rxjava采用了“流式api”調(diào)用策略,這樣寫可以使代碼更簡潔,有種一氣呵成的感覺,所以就把 "訂閱"動作放到了Observable中。廢話不多說,我們來看下subscribe()都做了些什么事情吧。
首先,通過ObjectHelper.requireNonNull()判斷傳入的Observer是否為null。
其次,調(diào)用RxJavaPlugins.onSubscribe()返回當(dāng)前Observer(具體該方法會在后續(xù)章節(jié)分析)。
再次,調(diào)用ObjectHelper.requireNonNull()判斷傳入的Observer是否為null。
最后,調(diào)用subscribeActual()進(jìn)行 訂閱操作。
subscribeActual():
通過該方法的解釋,我們可以知道此方法是 "實際訂閱動作的發(fā)生地"。
到此“觀察者模式”中的三大主角,“觀察者”和“被觀察者”以及“訂閱操作”就簡單介紹完了。那么問題來了,接下來“事件”需要怎么發(fā)出呢?好,我們繼續(xù)往下看。
事件:
ObservableOnSubscribe:
首先從該接口的定義中我們知道,此接口是 “事件產(chǎn)生和推送的地方”。具體的事件的產(chǎn)生和發(fā)送是在subscribe()中實現(xiàn)的。我們來舉個例子,聲情并茂的說明下吧。 我們大都玩過cs,cf之類的射擊游戲,假如我是警,對面人物是匪,此時我們相遇了(我沒看到他),一根無形的線把我們“栓”在了一起。此時對面哥們一看,有敵人,二話不說舉槍就開始射擊,不過還好我穿了防彈衣,再加上反映比較快馬上就找好了掩體進(jìn)行了反擊。
對于這樣一個場景來說,假如我是Observer,那么對面那哥們就是Observable,那根“無形的線”就可以理解為subscribe動作,槍對于我們來說就是“事件的產(chǎn)生者或者容器”那么“子彈”可以理解為具體要發(fā)送的“事件”了。
這里的“槍”指的就是“ObservableOnSubscribe”。
ObservableEmitter:
繼續(xù)沿用上邊的例子,我們說到了可以用“槍”來比喻成“ObservableOnSubscribe”來理解它的定義。那么,槍要想發(fā)射子彈的話,人得扣動扳機才行,對吧,那我們就可以把“ObservableEmitter”理解成“槍的扳機”,用于控制事件的“發(fā)射”。
看一下類圖吧。
通過類圖我們不難發(fā)現(xiàn),ObservableEmitter實現(xiàn)了Emitter接口,Emitter接口中定義了onNext(),onError(),onComplete()等三個方法,是不是感覺在哪見過?沒錯,這三個方法正好對應(yīng)的是Observer中的onNext(),onError(),onComplete()。通過Emitter調(diào)用這三個方法,則會分別回調(diào)Observer對應(yīng)的方法(具體實現(xiàn)我們在稍后會給出)。那么調(diào)用Emitter的三個方法后Observer會收到消息,事件不就是從 被觀察者向觀察者傳遞了嗎?
Disposable
最后再來說一說,Disposable,老規(guī)矩,上圖。
我們可以把"Disposable"理解為“消息控制開關(guān)”,就像電燈的開關(guān)一樣,它控制了是否消息可以送達(dá)至Observer處。
而且細(xì)心的同學(xué)觀察 圖-2以及Observer接口的定義代碼可以發(fā)現(xiàn),Observer的onSubscribe(Disposable d)把該開關(guān)返回給了 Observer,為什么要這么設(shè)計呢?
原因應(yīng)該有2個:
1:當(dāng)消息發(fā)送前,首先回調(diào)Observer的onSubscribe()告知觀察者我要開始給你發(fā)消息了,你先做做準(zhǔn)備,這樣的 話我們可以在第一條消息未發(fā)送之前在該方法中做一下準(zhǔn)備工作之類的。
2:再調(diào)該方法時,如果觀察者不想被觀察者發(fā)送事件,可能我還沒做好準(zhǔn)備,或者我改變注意我不想接收 被觀察的發(fā)送的事件了,可以調(diào)用Disposable 的dispose(),這樣當(dāng) 被觀察發(fā)送事件的時候,就會判斷,觀察者是不是需要我的事件,如果不需要我就不發(fā)了(實際代碼內(nèi)部也是這樣處理的,看過源碼的東西大概都清楚這些事,這里就稍微介紹一下)。
簡單介紹就到這里了,接下來我們就從源碼角度來看看,事件是怎樣從 Disposable 傳遞到 Observer的。
源碼分析
看過Rxjava源碼的同學(xué)都知道,Rxjava的代碼量還是挺多的,我們不可能事無巨細(xì)。那么如何閱讀其中的代碼比較好呢?我覺得應(yīng)該遵從這樣一個原則:掌握大方向,梳理脈絡(luò)。細(xì)化小方面,深入理解。
首先,我們需要了解這個功能,是怎樣是實現(xiàn)的,用到了哪些類,接口等,最好列出來,畫一畫類圖,流程圖,要做到心中有數(shù),不要過分追究細(xì)節(jié),只要知道這個功能的每一步對應(yīng)的是哪些類就行了。
最后,當(dāng)上步我們完成之后,就可以對存在疑慮的地方做進(jìn)一步研究,比如某個知識點可能不大清楚,這時就需要花事件和心思搞明白了。如此這般流程走下來,我相信閱讀源碼并不只是枯燥乏味的事情,這其中定會充斥著很多歡樂。廢話不多說了,接下來我們就開始閱讀源碼吧。
大方向:
Observable創(chuàng)建:
在“關(guān)鍵類及方法說明”我們從create()方法入手簡要分析了下Observable是如何創(chuàng)建的。通過代碼我們知道,create()方法最終創(chuàng)建了一個名為“ObservableCreate”的Observable,并把“ObservableOnSubscribe”存儲到ObservableCreate中。
Observer創(chuàng)建:
Observer的創(chuàng)建也可以參考“關(guān)鍵類及方法說明”中關(guān)于Observer的介紹,這里不再說了。
接下來我們主要看一下,執(zhí)行“訂閱”操作之后事件是如何傳遞的。
事件的傳遞:
通過“關(guān)鍵類及方法說明”中關(guān)于subscribe()發(fā)生后的介紹,我們知道“實際訂閱”操作都是發(fā)生在subscribeActual()該方法中的,又因為該方法是抽象方法,所以我們直接進(jìn)入“ObservableCreate”類查看其對subscribeActual()方法的實現(xiàn)吧。
這是ObservableCreate中subscribeActual()方法的具體實現(xiàn)。接下來我們具體分析下。
1:創(chuàng)建CreateEmitter
在“關(guān)鍵類及方法說明”中我們介紹了什么是“ObservableEmitter”,這里不再贅述。閱讀源碼我們發(fā)現(xiàn),通過調(diào)用CreateEmitter的構(gòu)造方法,把“Observer”對象保存至到了CreateEmitter中。 問題:為什么此Emitter中要保留一份外部Observer的引用呢? 稍后我們給出原因。
2:回調(diào)Observer的onSubscribe(Disposable d)
因為CreateEmitter也實現(xiàn)了Disposable,所以就可以把CreateEmitter回調(diào)給 Observer了。此時觀察者就可以在onSubscribe()中做一些事件發(fā)送前的準(zhǔn)備工作什么的。
3:發(fā)送事件
首先,回調(diào)ObservableOnSubscribe的subscribe()并把新創(chuàng)建的CreateEmitter返回去。此時我們就可以用該Emitter發(fā)送事件了(這就是Observable產(chǎn)生以及發(fā)送事件的地方)。例如:
3.1:OnNext()
首先,判斷傳遞的“事件”是否為null。
其次,再調(diào)用“isDisposed()”判斷“消息開關(guān)”是否已經(jīng)關(guān)閉了(這個稍后分析)。
最后,如果沒關(guān)閉,則回調(diào)Observer的onNext()方法回調(diào)事件。還記在分析ObservableEmitter時引入的那個問題嗎?在此,就知道原因了吧。如果不給我一個觀察者的引用,我把事件回調(diào)給誰呢,是吧。
3.2:onComplete()
首先,調(diào)用“isDisposed()”判斷“消息開關(guān)”是否已經(jīng)關(guān)閉了,如果沒關(guān)閉和繼續(xù)。
其次,調(diào)用Observer的onComplete()回調(diào)Complete事件。
最后,調(diào)用dispose()關(guān)閉“消息控制開關(guān)”。
3.3:onError()
首先,判斷傳入的Throwable是否為null。
其次,調(diào)用“isDisposed()”判斷“消息開關(guān)”是否已經(jīng)關(guān)閉了,如果沒關(guān)閉和繼續(xù)。如果已經(jīng)關(guān)閉了,則調(diào)用RxJavaPlugins.onError(t),該方法稍后再解釋。
最后,調(diào)用Observer的onError()回調(diào)error事件。
小方面
自此我們結(jié)合源碼,大體捋清楚了,Rxjava中事件傳遞的一個過程,這就是前邊提到的“掌握大方向,梳理脈絡(luò)”。接下來我們對前面遺留的諸多問題進(jìn)行一一深入理解,這部分也就是“細(xì)化小方面,深入理解”。
首先我們在創(chuàng)建CreateEmitter的時候,發(fā)現(xiàn),它既實現(xiàn)了Emitter接口,又實現(xiàn)了Disposable接口,所以說它既是“消息發(fā)射器”又是“消息控制開關(guān)”。而且我們在分析onNext(),onComplete(),onError()方法時都發(fā)現(xiàn),這些方法內(nèi)部都先調(diào)用了"isDisposed()"判斷“消息開關(guān)”是否關(guān)閉了,接下來我們就從“消息發(fā)射器”角度來捋一捋這個方法是怎么實現(xiàn)的。
isDisposed()
我們知道CreateEmitter是繼承自AtomicReference(這是專門采用原子操作,進(jìn)行更新操作對象的一個原子類)。 首先,通過get()獲取AtomicReference中的value的值,默認(rèn)值為null。 其次,調(diào)用DisposableHelper的isDisposed()把get()獲取的值傳入,并與DisposableHelper.DISPOSED進(jìn)行比較,判斷不是DisposableHelper.DISPOSED則返回false,如果是的話則返回true。
setDisposable()
做為“消息發(fā)射器”CreateEmitter還必須實現(xiàn)此方法,我們看看一下這個方法都干了些什么吧。
1:把CreateEmitter的當(dāng)前實例和Disposable傳入RxJavaPlugins的set()方法中。
2:CreateEmitter做為AtomicReference,獲取當(dāng)前CreateEmitter的value并賦值給“current”。
2.1:如果 “current ==DISPOSED”,且傳入的Disposed不為null,則調(diào)用傳入的Disposable的dispose()并跳出當(dāng)前方法。
2.2:如果 “current !=DISPOSED”,執(zhí)行AtomicReference的compareAndSet() 給 CreateEmitter的value 設(shè)置為傳入的Disposable。如果current不為null的話,執(zhí)行dispose()方法。
至此我們發(fā)現(xiàn)setDisposable()就是把傳入的“Disposable”保存起來,等到調(diào)用“isDisposed等方法”來判斷“消息開關(guān)是否關(guān)閉了”。 接下來我們做個“猜想”: **如果在消息發(fā)送前設(shè)置Disposable為DisposableHelper.DISPOSED的話,消息是會繼續(xù)傳遞的,如果設(shè)置的是自定義的Disposable的話,消息則不會被傳遞。 ** 下面我們就實際測試下,這個“猜想”是否成立吧。
case1:在ObservableOnSubscribe的subscribe()中設(shè)置Disposable為DisposableHelper的唯一實例。
log信息為下圖。
case2:在ObservableOnSubscribe的subscribe()中設(shè)置Disposable為自定義的Dispaseable。
根據(jù)以上測試結(jié)果確實和我們的“猜想”一致為什么呢?此時我們回過頭來看看上邊關(guān)于** isDisposed() 以及 onComplete()的分析,發(fā)現(xiàn) 當(dāng)繼續(xù)執(zhí)行CreateEmitter的onComplete時,此時的Dispaseable如果為DisposableHelper的DISPOSED實例,isDisposed() 就會返回true,所以 后續(xù)的Complate會回傳給Observer了。如果為false,當(dāng)然就不會走了唄。**
CreateEmitter做為“消息發(fā)射器”的角色的責(zé)任分析完畢后,接下來我們分析下其做為“消息開關(guān)”又能干些什么吧。
當(dāng)CreateEmitter做為“消息開關(guān)”時,它自身有兩個方法需要實現(xiàn),它們分別是:isDisposed()和dispose()。isDisposed()以及分析過了,接下來只需要分析下dispose()就行了。
dispose():
通過閱讀源碼我們知道,當(dāng)CreateEmitter中的value對應(yīng)的不是DisposableHelper的DISPOSED實例的話,就會把DisposableHelper的DISPOSED保存至CreateEmitter中。當(dāng)通過Emitter發(fā)送事件時,就會先調(diào)用isDisposed()來判斷“消息開關(guān)”是否關(guān)閉了,如果關(guān)閉了,則中斷事件的傳遞。
到此RxJava的關(guān)于事件分發(fā)這塊到底是如何做的已經(jīng)分析完了,如果大家看完感覺有幫助的話歡迎點擊收藏和喜歡。如果有錯誤之處,還望各位指出,我會盡快改正的。后續(xù)我會繼續(xù)分享關(guān)于,線程調(diào)度,常用操作符以及2.0關(guān)于背壓這3個方面的介紹,謝謝大家。