在之前有淺淺的分享了一下RxSwift簡單使用,
但是同樣的也有一些困惑伴隨著我,比如它是如何實現(xiàn),為什么所有的對象類都可以使用rx方法呢,再比如Timer實現(xiàn)方式為什么跟原生的又差別如此之大呢,帶著這些個疑問,就想著看一下這強大的庫是如何實現(xiàn)的,下面大概分享一下個人的拙見;
RxSwift本質(zhì)上就是信號的產(chǎn)生、訂閱、發(fā)送跟銷毀,核心邏輯就是產(chǎn)生、訂閱、發(fā)送三步曲:1、創(chuàng)建信號 2、訂閱信號 3、發(fā)送信號,下面就以一個最簡單信號創(chuàng)建訂閱流程來分析一下,它內(nèi)部是怎么實現(xiàn)的;
先創(chuàng)建Observable可觀察者對象,然后使用subscribe訂閱,最后第三步發(fā)送信號就是隱藏步驟,實際開發(fā)中,我們不需要去直接調(diào)用onNext、onError操作;
class ViewController: UIViewController {
let disposeBag = DisposeBag()
override func viewDidLoad() {
super.viewDidLoad()
// 1、創(chuàng)建信號
let ob = Observable<Any>.create { observer in
// 3、發(fā)送信號
observer.onNext("下一步")
// observer.onError(NSError.init(domain: "Chris's error", code: 10086, userInfo: nil))
observer.onCompleted()
return Disposables.create()
}
// 2、訂閱信號
let _ = ob.subscribe { text in
print("訂閱到了:\(text)")
} onError: { error in
print("error:\(error)")
} onCompleted: {
print("完成")
} onDisposed: {
print("銷毀")
}
.disposed(by: disposeBag)
}
}
一、創(chuàng)建信號
1、Observable<Any>.create
通過Observable<Any>.create創(chuàng)建一個可觀察對象,傳入一個尾隨閉包作為參數(shù),點進去create看一下其內(nèi)部實現(xiàn),發(fā)現(xiàn)create是ObservableType的一個擴展方法;

而ObservableType其實是一個協(xié)議,繼承自O(shè)bservableConvertibleType,Observable遵循了ObservableType協(xié)議,即Observable調(diào)用ObservableType協(xié)議里的create方法;

2、AnonymousObservable.subscribeHandler
create方法內(nèi)部就一句代碼 AnonymousObservable(subscribe),AnonymousObservable這個字面意思,是個匿名的可觀察者,把subscribe傳給AnonymousObservable,這個subscribe是我們創(chuàng)建的尾隨閉包,那么AnonymousObservable里面又做什么處理呢?只能繼續(xù)往下翻了;
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
AnonymousObservable(subscribe)
}
點進去AnonymousObservable看,發(fā)現(xiàn)AnonymousObservable給回調(diào)閉包取了個別名,又定義了一個全局變量subscribeHandler,保存我們外面?zhèn)鬟M來的閉包;

這里我們看到AnonymousObservable是繼承的Producer,而Producer點擊發(fā)現(xiàn),它其實是繼承自O(shè)bservable;

到這一步,其實我們創(chuàng)建訂閱信號的步驟就已經(jīng)完成了;可能有點繞,總結(jié)一下,本質(zhì)的思想就是通過父類Observable創(chuàng)建的訂閱信號閉包,交給子類AnonymousObservable去保存實現(xiàn);
二、訂閱信號
1、創(chuàng)建一個AnonymousObserver訂閱者
上面我們已經(jīng)說了,我們創(chuàng)建的ob對象,其實是AnonymousObserver對象,所以此處subscribe就是創(chuàng)建一個AnonymousObserver訂閱者;

2、AnonymousObserver初始化的時候保存eventHandler
前面的disposable這些銷毀對象的創(chuàng)建先不看,重點看return返回值,self.asObservable().subscribe(observer) 這個做為參數(shù)傳給可銷毀對象Disposables;
observer這個其實就是訂閱者,它是AnonymousObserver對象,繼承自O(shè)bserverBase,后面?zhèn)魅氲膮?shù)是個事件回調(diào)閉包,AnonymousObserver就是將eventHandler事件保存下來;注意到里面還有個onCore方法,里面是調(diào)用eventHandler執(zhí)行操作;

AnonymousObserver的父類ObserverBase,它里面有個on方法,里面就一個switch方法,Event是個枚舉類,往下看就能看到我們熟悉next\error\completed,再往后就是調(diào)用onCore方法,這個調(diào)用的時機后面具體分析;


3、self.asObservable().subscribe(observer) 方法調(diào)用
接下來看,self上面也說了,是AnonymousObservable對象,asObservable也不用過多關(guān)心,其實就是類似OC里面的多態(tài),強制性返回Observable Class,重點看subscribe方法調(diào)用,傳入的observer參數(shù),上面步驟2已經(jīng)說過了;

4、producer.subscribe
subscribe方法點擊jump發(fā)現(xiàn)好多地方都有該方法,如上文所說,ob本質(zhì)是AnonymousObservable對象,他們的繼承鏈關(guān)系A(chǔ)nonymousObservable->Producer->Observable;Observable又遵循ObservableType協(xié)議,上訴幾個類都有實現(xiàn)subscribe方法,具體也不知道要執(zhí)行哪一個方法,所以這個時候找起來就比較麻煩了,我這個比較懶,不想挨個去找了,此時最簡單的方法其實就是查看調(diào)用堆棧了;

這里可以發(fā)現(xiàn)asObservable().subscribe是執(zhí)行的父類Producer里面的subscribe方法,其實仔細查看源碼也會發(fā)現(xiàn),AnonymousObservable沒有實現(xiàn)subscribe方法,而Observable里面只是調(diào)用rxAbstractMethod()構(gòu)造方法而已,做一些錯誤處理,也沒有具體實現(xiàn),所以關(guān)鍵代碼還是在Producer里;
分析Producer里面的方法,主要就是通過不同條件,執(zhí)行不同代碼,其實本質(zhì)執(zhí)行的都是執(zhí)行下面三行代碼;
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
這個schedule就先不深究了,調(diào)用之后還是會執(zhí)行action方法,就是剛才subscribe里面的閉包函數(shù);

5、AnonymousObservable.run
重點還是看一下上面run方法;調(diào)用的AnonymousObservable對象本身的run方法,run里面又調(diào)用AnonymousObservableSink的run方法,這個AnonymousObservableSink又是啥子?xùn)|西???只能接著往下看了

6、AnonymousObservableSink.run
AnonymousObservableSink是繼承自Sink,Sink是什么先不深究,里面定義了一些方法實現(xiàn),里面保存了observe、cancel對象;

7、parent.subscribeHandler(AnyObserver(self))
看當前的這個run方法,里面就一句代碼,parent就是步驟5傳入的self,即AnonymousObservable對象,即表示AnonymousObservable調(diào)用subscribeHandler方法;

這個時候就串起來了,前文創(chuàng)建信號的時候講過創(chuàng)建Observable的訂閱信號的時候,交給子類AnonymousObservable去保存,這個時候就是調(diào)用之前保存的閉包回調(diào)了,就會來到下面這一步;

三、發(fā)送信號
1、 AnonymousObserver.onNext執(zhí)行
根據(jù)上面的一系列操作,我們已經(jīng)可以執(zhí)行create里面的回調(diào)了,這一步我們開頭的時候也說了,實際開發(fā)中不需要我們手動去調(diào)用onNext、onError等方法,但是既然我們是探究他的原理,那就繼續(xù)往下看;
這個observer是個什么東西?為什么能調(diào)用onNext等方法呢?
上面步驟7有提到,subscribeHandler.(AnyObserver(self)),我們可以得出observer == AnyObserver(self),那我們點開AnyObserver,發(fā)現(xiàn)它其實就是個結(jié)構(gòu)體,遵循了ObserverType協(xié)議,用observer保存了AnonymousObservableSink的on方法,AnonymousObservableSink我們在訂閱信號的步驟6有提到,截圖中有被收納起來的on方法;
所以create中的observer =AnyObserver(self),里面AnyObserver所持有的對象self.observer = AnonymousObservableSink.on;

2、ObserverType.onNext
到了這一步,我們還是不知道onNext怎么來的,既然它自身沒有實現(xiàn),那么只能去看他的協(xié)議方法了,果不其然,在協(xié)議擴展方法里面實現(xiàn)了onNext,往下看,其實他是調(diào)用當前的on方法,傳入.next枚舉外帶value值;

3、AnonymousObservableSink.on
繼續(xù)走會發(fā)現(xiàn),調(diào)用了AnyObserver本身的on方法,實現(xiàn)就一行代碼;
self.observer(event)
上面已經(jīng)分析出了observer == AnonymousObservableSink.on,其實這里就是調(diào)用AnonymousObservableSink的on方法,繼續(xù)往下看看sink.on里面做了什么操作;

下面是AnonymousObservableSink的on方法,這個方法很眼熟,跟前文中提到的ObserverBase的on方法很類似,只是ObserverBase的on最后是調(diào)用的onCore方法,這邊調(diào)用的是forwardOn;

4、Sink.forwardOn
forwardOn當前類沒有實現(xiàn),只能去它的父類找,繼續(xù)網(wǎng)上找,發(fā)現(xiàn)它是調(diào)用的self.observer.on方法,這個self.observer之前訂閱信號的步驟6有提到過,sink保存observer跟cancel對象用來后續(xù)處理,唉,這邊就用到了;

5、ObserverBase.on
這個self.observer打印發(fā)現(xiàn)它其實是AnonymousObserver類,繼承至ObserverBase,所以到這一步,還是調(diào)用我們前文提到的ObserverBase的on方法,在往下執(zhí)行onCore;

6、AnonymousObserver.onCore->self.eventHandler()
上一步驟的self其實是AnonymousObserver,那么onCore往下執(zhí)行就到了我們訂閱信號的步驟2提到的,執(zhí)行AnonymousObserver保存下來eventHandler事件;

這個eventHandler事件,在訂閱信號創(chuàng)建的時候,被我隱藏了,沒有展開講,這邊展開來看一下,里面到底有什么秘密;
7、onNext、onError、onCompleted、dispose執(zhí)行

點進去發(fā)現(xiàn)這個閉包就是響應(yīng)event事件,對不同事件執(zhí)行不同方法,onNext就是我們subscribe傳入的閉包回調(diào),value就是我們onNext傳入的值;
public func onNext(_ element: Element) {
self.on(.next(element))
}

到這里我們整個創(chuàng)建、訂閱、發(fā)送信號整個過程就已經(jīng)分析完了,前前后后執(zhí)行了二十多個方法;里面的涉及了很多的繼承、擴展、協(xié)議等等,可能有點繞;
四、小結(jié):
總的流程有點繞,下面簡單梳理一下:
首先是帶able結(jié)尾的信號生產(chǎn)者繼承鏈關(guān)系:

然后是帶observer結(jié)尾的信號訂閱者繼承鏈關(guān)系:

整體核心流程如下:

從圖中也可以看出,Observable、Observer分工明確,sink是起到承上啟下的作用,同時保存了observer、cancel,訂閱信號的run方法,發(fā)送信號的on方法,都是從這邊調(diào)用的,這個類就相當于一個業(yè)務(wù)中間層,所有業(yè)務(wù)邏輯都在這里處理,通過這個中間件可以串聯(lián)Observable信號生產(chǎn)者跟Observer信號訂閱者;
RxSwift使用了大量的繼承、協(xié)議、擴展,實現(xiàn)了接口分離,模塊分工清晰,里面設(shè)計雖然很復(fù)雜,但是暴露API都是很簡單的,讓開發(fā)者只想關(guān)心當前業(yè)務(wù)的開發(fā),無需關(guān)心業(yè)務(wù)直接的調(diào)度,這種設(shè)計模式可以很好的實現(xiàn)業(yè)務(wù)分離。
我最近做的音視頻開發(fā)的模塊,視頻的渲染跟解碼就是通過這種方式使之分離,誰要渲染直接訂閱當前暴露的接口就可以,調(diào)度者就根據(jù)訂閱的信號去返回AVframe就可以,也無需關(guān)心是誰訂閱的,同樣的,音頻幀處理也是這樣。其實我們平時開發(fā)中也可以借鑒這種設(shè)計模式,分析框架最主要的就是學(xué)習(xí)其優(yōu)秀的設(shè)計模式,為我所用;
以上,就是個人理解的RxSwift大體實現(xiàn)流程,如有不對,歡迎指正!?。?/p>
CSDN地址:https://blog.csdn.net/weixin_37498529/article/details/126560786
知乎地址:https://zhuanlan.zhihu.com/p/560495890