一篇不太一樣的RxJava介紹

距離上篇文章已有半年的時(shí)間,雖然這期間沒(méi)什么輸出,但是還是關(guān)注著RxJava和國(guó)內(nèi)一些動(dòng)向/文章等等,感覺(jué)很多人對(duì)RxJava還有些許誤會(huì)和“錯(cuò)誤”的理解。所以今天我們從最基礎(chǔ)的開(kāi)始,來(lái)了解一下RxJava。

我們先退回一步,忘了RxJava,討論一個(gè)我們Android開(kāi)發(fā),甚至很多開(kāi)發(fā)都會(huì)遇到的非常棘手的問(wèn)題,異步問(wèn)題。
舉個(gè)例子,我們需要向 MVP/MVVM中的 Model層 取到一組數(shù)據(jù),來(lái)做我們的首頁(yè)展示,比如這樣的:

dribbble.jpg

一個(gè)簡(jiǎn)單的Dribbble頁(yè)面,由于圖片很多,我們不希望在第一次請(qǐng)求就獲得所有的圖片,我們更希望先獲得一些圖片的MetaData,比如url等等。然后在分別異步加載,來(lái)實(shí)現(xiàn)一個(gè)比較好的用戶體驗(yàn)。這里我們先不考慮RecyclerView加Glide的組合,用簡(jiǎn)單的偽代碼來(lái)顯示。

我們的 Model 層需要兩個(gè)方法,一個(gè)是獲取到我們首頁(yè)圖片的MetaData,一個(gè)是根據(jù)MetaData獲取Bitmap。

如果萬(wàn)物皆可同步,那么代碼非常簡(jiǎn)單:

interface Model{
    fun getList() : List<MetaData>

    fun getBitmap(metaData : MetaData) : Bitmap
}

很多同學(xué)喜歡RxJava都是因?yàn)?strong>“鏈?zhǔn)秸{(diào)用”看起來(lái)非常舒服,而鏈?zhǔn)秸{(diào)用或者說(shuō)高階函數(shù)或者操作符并不是RxJava的專利,Java8的 Stream API和Kotlin都有相關(guān)的操作,比如我們的代碼在kotlin中可以這樣調(diào)用

    model.getList()
        .map { model.getBitmap(it) }
        .forEach { showBitMap(it) }

是不是看起來(lái)和你們所謂的優(yōu)雅,簡(jiǎn)潔的RxJava鏈?zhǔn)秸{(diào)用一樣呢?

但是同步意味著阻塞,而網(wǎng)絡(luò)加載Bitmap大家都知道是非常耗時(shí)的。為了不阻塞用戶界面(UI線程),我們希望他在后臺(tái)異步執(zhí)行,執(zhí)行后再輸出到前臺(tái)。
所以我們Android中最簡(jiǎn)單直接的方法就是加入CallBack,來(lái)實(shí)現(xiàn)異步通信。我們的代碼就變成這樣

//定義CallBack
interface CallBack<T> {
    fun onSuccess(t:T)

    fun onError(error:Error)
}

interface Model{
    fun getList(callback:CallBack<List<MetaData>>)

    fun getBitmap(metaData:MetaData, callback:Callback<MetaData>)
}

看過(guò)很多RxJava教程的同學(xué)肯定覺(jué)得這里我要講Callback Hell(回調(diào)地獄)了,然后開(kāi)始展示代碼RxJava來(lái)解決回調(diào)地獄的問(wèn)題,但如果這樣我這篇文章也沒(méi)什么意義了,豈不是和很多入門文章都一樣了?

我們先來(lái)看看為什么我們會(huì)出現(xiàn)回調(diào)地獄?而在同步的時(shí)候卻可以保持我們喜歡的“鏈?zhǔn)秸{(diào)用”
我們?cè)谕降臅r(shí)候,我們做的事情可以簡(jiǎn)化成這樣:
進(jìn)入主界面 -> 通過(guò)getList方法獲取 List<MetaData> -> 根據(jù)list逐一操作獲取bitmap -> 顯示bitmap
可以看到,我們確實(shí)是一條鏈,所以很簡(jiǎn)單的通過(guò)stream api來(lái)實(shí)現(xiàn)“鏈?zhǔn)秸{(diào)用”。

但是異步的時(shí)候呢?
進(jìn)入主界面 -> getList(callback:CallBack<List<MetaData>>)方法將我們的CallBack傳給后臺(tái) -> 等待后臺(tái)回調(diào)我們的CallBack

重點(diǎn)來(lái)了,與同步的不同,我們這里不是直接獲得了我們的List。而是在等待著異步的另一方通知我們。
同步的時(shí)候,我們直接拉取數(shù)據(jù) :

1.png

而異步的時(shí)候,直觀的看我們應(yīng)該是在“等待”數(shù)據(jù),異步對(duì)象向我們推送數(shù)據(jù)。

2.png

3.png

所以在我們的角度,我們是被動(dòng)的,也就是英語(yǔ)中的reactive ,也就是所謂的響應(yīng)式

我們回到我們的例子:

同步的時(shí)候,我們是這樣的

interface Model{
    fun getList() : List<MetaData>

    fun getBitmap(metaData : MetaData) : Bitmap
}

而異步的時(shí)候,我們的方法沒(méi)有了返回值,多了個(gè)參數(shù),所以不能使用漂亮的“鏈?zhǔn)秸{(diào)用”。
這是因?yàn)長(zhǎng)ist 本身,就是一種同步的類型。我們每次操作List,都是對(duì)List來(lái)拉取數(shù)據(jù)。不信?我們來(lái)看下:

大家都知道List并不是最基礎(chǔ)的集合,常用的集合還有HashMap,Set,Table,Vector等等等等。他們都有一個(gè)共同的父類: Iterable<T>

interface Iterable<out T> {
    fun iterator(): Iterator<T>
} 

這里的iterator就是迭代器,他是這個(gè)樣子的

interface Iterator<out T> {

    fun next(): T
    
    fun hasNext(): Boolean
}

使用的時(shí)候也就是我們最麻煩的迭代方式:

    val i = iterator()
    while(i.hasNext()){
        val value = i.next()
    }

所以我們?cè)贘ava中有了foreach,以及后面的stream api等等語(yǔ)法糖。
這里我們看到了,我們每次確實(shí)首先詢問(wèn)List,有沒(méi)有值,如果有我們獲取這個(gè)值,如果沒(méi)有,跳出循環(huán),對(duì)List的操作結(jié)束。讀取完畢。

想象一下,如果我們有一種 AsyncList,對(duì)他的讀取都是AsyncList來(lái)通知我們,然后再和同步的時(shí)候一樣使用高階函數(shù)比如map/foreach等等該多好。比如

interface Model{
    fun getList() : AsyncList<MetaData>

    fun getBitmap(metaData : MetaData) : Bitmap
}

我們就可以像同步一樣,

        model.getList()
        .map { model.getBitmap(it) }
        .forEach { showBitMap(it) }

現(xiàn)在我們來(lái)根據(jù)Iterable設(shè)計(jì)我們的 AsyncList,上面我們知道了Iterable是同步的,是拉取數(shù)據(jù),我們需要的AsyncList是異步的,是他推送數(shù)據(jù)給我們。
我們和List一樣,給所有的異步集合來(lái)一個(gè)父類,來(lái)設(shè)計(jì)一個(gè)AsyncIterable,我們知道Iterable提供Iterator通過(guò)我們主動(dòng)詢問(wèn)Iteratornext,hasNext等方法我們主動(dòng)拉取數(shù)據(jù)。
所以我們的AsyncIterable理論上來(lái)說(shuō),應(yīng)該是我們通過(guò)注冊(cè)AsyncIterator的方式,將我們的AsyncIterator傳遞給AsyncIterable,讓他來(lái)通知我們,實(shí)現(xiàn)異步和推送數(shù)據(jù)。
所以我們的AsyncIterable的實(shí)現(xiàn)應(yīng)該是這樣的

interface AsyncIterable<T> {
    fun iterator(iterator : AsyncIterator<T>) 
} 

(看起來(lái)好像有點(diǎn)眼熟?)

我們?cè)賮?lái)設(shè)計(jì)AsyncIterator,同步的方式兩個(gè)方法,一個(gè)是hasNext,也就是我們主動(dòng)詢問(wèn)iterable接下來(lái)之后還有沒(méi)有值的過(guò)程,如果是異步的方式,這應(yīng)該是我們的AsyncIterable,來(lái)通知我們,他接下來(lái)以后還有沒(méi)有值。
所以變成了這樣:

    fun hasNext(has : Boolean) 

對(duì)的,通過(guò)這種類似CallBack的方式,通知我們有沒(méi)有值。true就是還有值,一旦接收到false,就代表迭代結(jié)束,我們的AsyncIterable已經(jīng)遍歷完成了。
另一個(gè)方法 next() 就是我們來(lái)主動(dòng)詢問(wèn),當(dāng)前的值是什么。所以我們的AsyncIterable就是通過(guò)這個(gè)方法,來(lái)通知我們當(dāng)前的值是什么,依然還是通過(guò)類似CallBack的方式:

    fun onNext(current:T)

(是不是有些眼熟?(手動(dòng)滑稽))

這里有兩個(gè)問(wèn)題:
第一個(gè)問(wèn)題:我們?cè)谶@里隱藏了一個(gè)錯(cuò)誤,因?yàn)閔asNext()方法返回 false的時(shí)候不一定是沒(méi)有接下來(lái)的值了,也有可能是處理當(dāng)前值的時(shí)候出現(xiàn)了某些個(gè)錯(cuò)誤或者異常,這樣他就不能處理接下來(lái)的值,這時(shí)候我們的app就會(huì)崩潰。所以在異步的時(shí)候,我們希望我們的AsyncIterable在出錯(cuò)的時(shí)候,可以通知我們他出錯(cuò)了,我們也就不進(jìn)行接下來(lái)的處理了。所以我們有了:

    fun onError(e:Throwable)

(是不是也有些眼熟?(手動(dòng)滑稽))

第二個(gè)問(wèn)題,在hasNext方法顯然有些過(guò)于多余,因?yàn)樵谕降臅r(shí)候,我們并不知道他究竟接下來(lái)有沒(méi)有值,所以我們每次訪問(wèn)List的時(shí)候,要詢問(wèn)還有沒(méi)有接下來(lái)的值,我們?cè)龠M(jìn)行下一步。而異步的時(shí)候,我們的AsyncIterable肯定知道他自己接下來(lái)有沒(méi)有值了,我們只希望在最后他沒(méi)有值的時(shí)候通知我們結(jié)束了即可,也就是說(shuō)我們之前的 hasNext(true)都是多余的。我們其實(shí)只關(guān)心hasNext(true)被調(diào)用的時(shí)候。所以我們把他簡(jiǎn)化成只有最后結(jié)束的時(shí)候才調(diào)用的方法:

    fun onComplete()

這樣,我們有了我們的AsyncIterator

interface AsyncIterator<T> {

    fun onNext(current:T): 
    
    fun onComplete()

    fun onError(e:Throwable)
}

對(duì)的,他就是我們RxJava中的 Observer,而我們的 Asynciterable 就對(duì)應(yīng)著我們的Observable。

interface Observable<T> {
    fun subscribe(observer : Observer<T>) 
} 

由此,我給Observable下一個(gè)的定義:

Observable 是一組異步數(shù)據(jù)的集合

對(duì)的,他就是一個(gè)集合,和List,Set,Vector一樣。他是一組數(shù)據(jù),Collection可以包含0,1很多甚至無(wú)限個(gè)數(shù)據(jù)。所以Observable也可以包含0,1,n,甚至無(wú)限個(gè)數(shù)據(jù)。

當(dāng)我們?cè)谔幚鞢ollection出現(xiàn)異常時(shí)(比如NullPointerException),我們的程序會(huì)崩潰,不會(huì)有接下來(lái)的處理。所以我們的Observable在收到onError之后,也不會(huì)再有數(shù)據(jù)推送給我們。

Collection可以通過(guò)高階函數(shù)(High Oroder Function)進(jìn)行組合,變換等等,所以作為集合之一的Observable也可以進(jìn)行組合,變換。

對(duì)Iterable進(jìn)行操作,我們是通過(guò)getIterator方法,來(lái)獲得Iterator來(lái)進(jìn)行主動(dòng)詢問(wèn),拉取數(shù)據(jù)實(shí)現(xiàn)迭代。
對(duì)Observable進(jìn)行操作,我們是通過(guò)subscribe方法,來(lái)注冊(cè)Observer來(lái)進(jìn)行被動(dòng)獲取數(shù)據(jù),由Obseravble來(lái)推送數(shù)據(jù),實(shí)現(xiàn)迭代。

我們費(fèi)了這么大力氣,終于抽象出來(lái)一個(gè)異步的集合。那么他的好處是什么呢?

  1. 首先,這種推送數(shù)據(jù)的方式才是我們直觀的,異步操作方法,我們?cè)谏衔牧私饬?,異步操作的時(shí)候,作為接收方。我們是被動(dòng)的,我們沒(méi)辦法詢問(wèn)生產(chǎn)方到底有沒(méi)有完成異步任務(wù)。只有生產(chǎn)方自己才知道他有沒(méi)有完成生產(chǎn),所以他在完成生產(chǎn)后通知我們,并把結(jié)果交給我們這是一種直觀的解決方案。而Java或者其他高級(jí)語(yǔ)言沒(méi)有提供這一方案。我們才自定義CallBack來(lái)實(shí)現(xiàn)回調(diào)。

  2. 在使用CallBack方案的時(shí)候,你知道的信息太多了。舉個(gè)例子,我們上文中的

    fun getList(callback:CallBack<List<MetaData>>)

這個(gè)方法。我們通過(guò)callback知道了,這應(yīng)該是一個(gè)異步操作。可能是耗時(shí)的,所以我們可能需要一個(gè)線程來(lái)執(zhí)行他,執(zhí)行之后,他又會(huì)給我一個(gè)List<MetaData>,而這個(gè)list卻又是同步的。你需要關(guān)心的事情太多了。
俗話說(shuō),把握現(xiàn)在 展望未來(lái)!
我們能處理好現(xiàn)在的事情就已經(jīng)很不錯(cuò)了,Observable則解決了這一問(wèn)題。我們上面的方法改完之后應(yīng)該是這樣的

    fun getList() : Observable<List<MetaData>>

最正確的可能應(yīng)該是這樣的:

    fun getList() : Observable<MetaData>

對(duì)的,因?yàn)镺bservable本身就是個(gè)集合,無(wú)需再和同步的List嵌套使用。但是由于服務(wù)器設(shè)計(jì)原因, Observable<List<T>>這種使用方式還是很常見(jiàn)的。
在Observable我們無(wú)需關(guān)心這個(gè)方法究竟是怎么生成的。我們像往常一樣迭代數(shù)據(jù),我們只需要知道,他生產(chǎn)出數(shù)據(jù)之后,會(huì)通知我即可。
至于你到底怎么生產(chǎn)數(shù)據(jù)給我?在什么線程?是同步的異步的?有沒(méi)有阻塞?

I don't really care!

  1. 操作符,對(duì)的因?yàn)镺bservable是一個(gè)數(shù)學(xué)上的集合。集合就可以進(jìn)行一系列的變換,通過(guò)我們定義的高階函數(shù),比如map,filter等等。這些操作符不是RxJava的專利,他是我們對(duì)集合的一些常見(jiàn)操作。我們對(duì)List,Vector等等也應(yīng)該可以進(jìn)行這些操作,而Java本身沒(méi)有提供這些。在Java 8后通過(guò)stream API補(bǔ)充了這些方法,而RxJava的一大優(yōu)勢(shì)就是不僅僅提供了這個(gè)異步的集合類Observable。還提供了上百個(gè)常用的操作符。

總結(jié)

通過(guò)這篇文章,我的目的是讓你理解究竟什么是Observable,為什么Observable是這么設(shè)計(jì)的,好處是什么,解決了什么問(wèn)題。
而答案也很明顯。
Observable是一組異步數(shù)據(jù)的集合,因?yàn)楫惒讲僮骱屯讲僮饔兄举|(zhì)上的區(qū)別(推送數(shù)據(jù)和拉取數(shù)據(jù))所以我們根據(jù)iterable反過(guò)來(lái)設(shè)計(jì)observable。
好處是保持了數(shù)學(xué)上的集合定義,擺脫了Callback,通過(guò)操作符(高階函數(shù))可以對(duì)集合實(shí)現(xiàn)一些變換操作。解決了通常情況異步操作不直觀,復(fù)雜,回調(diào)地獄等等問(wèn)題。

參考文獻(xiàn)(部分鏈接可能需要梯子)

  1. A Playful Introduction to Rx by Erik Meijer
  2. Expert to Expert: Brian Beckman and Erik Meijer - Inside the .NET Reactive Framework (Rx)
  3. ReactiveX
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容