終于到了分析源碼的部分了。很多朋友在使用過RxJava之后都會覺得這個庫很玄妙,竟然能把事件發(fā)生的源不停的通過不同的操作符改變。比如說這次介紹的map就是,在抽象的概念上,我們經(jīng)常要求使用者要把map操作符當(dāng)成改變源stream的一個方法,也就是說map把整個事件的發(fā)射流重新構(gòu)造了一次。

但是其實map操作符真的創(chuàng)造了一個新的流么?

這個答案是非常明顯的,如果RxJava的操作符都是這樣要等源發(fā)射流全部分析完再重新構(gòu)造的話(一個map重新遍歷一次的話),效率上肯定說不過去。所以我們可以肯定一點,即使再多個map(),我們肯定也是一邊發(fā)射源stream的元素一邊進(jìn)行map()里面的轉(zhuǎn)換的。RxJava并不是一個新的語言,java沒有的它也沒有,一切你們看到的操作符,只是對java現(xiàn)有的一些常用api的整合而已。尤其是我們經(jīng)常用的subscribeOn,observeOn,map,等等,都只是把引用,線程池,ExecutorService玩耍出來的組合。
當(dāng)然,我的意思不是說RxJava很簡單。。。。只是作者們在Java的基礎(chǔ)上,非常優(yōu)雅的把java現(xiàn)有的東西完美的結(jié)合起來,使得這個庫使用起來就像函數(shù)式編程的感覺。
那RxJava到底是怎么優(yōu)雅的實現(xiàn)的呢?
在我們深入代碼之前,我們先來補(bǔ)充一些細(xì)節(jié):
1.使用鏈?zhǔn)秸{(diào)用產(chǎn)生新對象,優(yōu)雅的讓新對象持有源對象的引用的。
比如 下面一個例子:

大家可以看到line 13,這個方法generateNext()返回了一個新的Ob對象,但是把自己-this,傳給了新生成的對象,新的對象就獲取了源對象的引用了。于是在我不斷的鏈?zhǔn)秸{(diào)用方法generateNext()的時候,雖然最后獲取的是最后一個Ob對象,但是我的新對象已經(jīng)擁有一條完整的,通向源對象的鏈了。不同于以往我們一般的setNext()這種方法,generateNext()返回一個新的Ob對象,所以程序可以以一個鏈?zhǔn)降慕Y(jié)構(gòu)寫出來。
這里可以提前告訴大家,RxJava里面的Observable就是這樣的,每一次鏈?zhǔn)降恼{(diào)用操作符,比如map,返回的都是一個新的Observable,而新的Observable是持有舊的Observable的引用的(其實也不是整個Observable,是Observable的一個成員對象)。
2.一個Observable所需要的元素
一個Observable需要兩個最重要的元素,一個是OnSubscribe對象,一個是Subscriber。前者負(fù)責(zé)定義整個Observable所要執(zhí)行的action,后者則是定義怎樣處理每一個發(fā)射的元素,一般來說后者都是用戶自己實現(xiàn)的。
比如以下例子:

這個例子很簡單,我們創(chuàng)建了一個發(fā)射數(shù)字1和2的流,在create方法里面,我們創(chuàng)建了一個新的對象OnSubscribe,它定義了我們的事件發(fā)生的順序的元素的個數(shù)種類。而subscribe()方法里面我們創(chuàng)建了一個新的Subscriber對象。讓我們看看create()方法做了些啥。


僅僅是return了一個Observable,然后把OnSubscribe對象的引用拿到手而已。
然后每次我們call subscribe()方法的時候發(fā)生了啥呢?我們把代碼跟蹤到subscribe(),


看到了嘛?每次在subscribe()的時候,我們會調(diào)用我們Observable里面的onSubscribe對象的call方法,參數(shù)就是我們傳進(jìn)subscribe()的subscriber對象(當(dāng)然嚴(yán)格意義上來講并不是同一個subscriber,subscribe()方法在前面一點的地方稍微包裝了一下我們傳進(jìn)的subscriber,但是我們可以理解為同一個)。

所以總結(jié)一句,每一個要發(fā)射元素的Observable,都必須會有一個OnSubscribe對象和Subscriber。前者定義執(zhí)行的順序、事件,后者處理。
3.分析!
ok,在了解了這些先決知識之后,我們可以深入分析map到底是怎么工作的了。


讓我們重點看看圖2,lift()方法生成了一個新的Observable,新的Observable是干嘛的?
1.新的Observable的OnSubscribe對象的call()方法里面,我們把用戶定義的Subscriber對象,line 156-158 ,通過我們的轉(zhuǎn)換操作符map,轉(zhuǎn)換成了一個新的subscriber
2.生成新的subscriber,傳給上一個Observable的onSubscribe對象執(zhí)行call(). 大家一定要仔細(xì)看清楚,line 162的這個OnSubscribe,是新的Observable的對象還是舊的Observable的對象。
簡單的用圖來解釋一下可能更清楚:

新生成的Observable會在用戶call subscribe()的時候,把原有的,用戶自己定義的subscriber修改了之后(lift()里面做的),傳給上一個Observable的OnSubscribe對象處理。
那么在第一步的時候,我們對用戶原有的subscriber動了什么手腳?
這是第二重要的點,用戶原有的subscriber會在lift()里面,被map這個Operator改造

這個Operator就是OperatorMap。讓我們看看這個Operator的call方法究竟做了些啥,讓原有的subscriber變成一個怎樣的新的subscriber。

乍一看!好簡單。
的確,map這個操作符的確很簡單,讓我們來看看它把原來的,用戶定義的subscriber怎么改造了。
1.line 39, 我們可以看到call()方法返回一個新的subscriber
2.line 54, 這個o對象是啥?沒錯,就是原來舊的subscriber,用戶自己創(chuàng)建的subscriber。
3.line 54, ?transformer就是我們轉(zhuǎn)換的方法,也就是map()里面的Func1對象。
也就是說,新的subscriber每次在執(zhí)行onNext的時候,其實是把上一層Observable發(fā)射的元素先用map里面在轉(zhuǎn)換方程轉(zhuǎn)換成新的類型,然后再用下一層的subscriber去發(fā)射新的類型的元素。
比如我們把一組Integer轉(zhuǎn)換成String對象,在上圖里面的新的Subscriber的泛型T就是Integer,舊的Susbcriber(用戶自己定義的Subscriber)的泛型就是R。
這里就很有意思了,這個過程里面發(fā)生了很多有趣的事情。
我們在subscribe()之后,會把原有的subscriber不停的包裝,每map()一次就會包裝一次,一路傳給最頂端,最原始的Observable,然后在發(fā)射元素的時候,會不停的調(diào)用轉(zhuǎn)化方程,發(fā)送給下一級的subscriber。
為什么說這個過程很有趣呢?有趣的地方在于,這個過程像在爬山一樣,最開始我們要一路往上走,最下面的Observable持有上一層的Observable的引用,而在最頂端的原始Observable要發(fā)射元素的時候,subscriber已經(jīng)是包裝過好多層的subscriber了,執(zhí)行onNext() 的時候,會一路往下調(diào)用下一層的subscriber的onNext().,所以回到我們在文章最開頭講的,每一次一個元素在發(fā)射之后,我們會即刻執(zhí)行該元素的轉(zhuǎn)換(也就是不停的執(zhí)行下一級的onNext()),而不是重新構(gòu)造一個新的流。
換句話說,Observable的執(zhí)行鏈一路向上,Subscriber的執(zhí)行鏈一路向下,他們產(chǎn)生鏈的方向恰好是相反的。
不過想想也是合情合理的,因為從邏輯上來講,我們create的一個Observable的是一個原始的Observable,只擁有原始數(shù)據(jù),但是用戶卻是提供的一個不同參數(shù)類型的,最終類型版本的Subscriber,原本方向就是相反的。
讓我們再把思維拓展一下,發(fā)生多層map的過程是咋樣的:

再看完整篇文章之后,再來體會一下這張圖,是不是豁然開朗了呢?產(chǎn)生新的Observable向下,產(chǎn)生新的Subscriber向上,執(zhí)行onNext向下。
三個字總結(jié),
下上下!

創(chuàng)建新的Observable一路向下,創(chuàng)建新的Subscriber一路向上,最后每次發(fā)射元素的時候一路向下直到到達(dá)用戶創(chuàng)建的subscriber為止。
希望大家能好好理解一下這里設(shè)計的精妙之處。下周會帶來subscribeOn()操作符的源碼講解。
