RxSwift - 核心流程源碼解析

由于項目的需要,筆者最近在學習swift,為了更加高效的擼代碼,研究了RxSwift這個牛逼的庫,對于它的核心流程,進行了一番探索(實際上耗費了畢生心血),對于下面的敘述,如果錯誤,還望大家多多留言指正,謝謝了??????

核心代碼塊

    override func viewDidLoad() {
        super.viewDidLoad()
        //1、創(chuàng)建序列
        let ob = Observable<Any>.create { (obserber) -> Disposable in
            //3、發(fā)送信號
            obserber.onNext("佐籩")
            obserber.onError(NSError.init(domain: "1234", code: 10086, userInfo: nil))
            obserber.onCompleted()
            return Disposables.create()
        }
        
        //2、訂閱信號
        let _ = ob.subscribe(onNext: { (text) in
            print("訂閱到:\(text)")
        }, onError: { (error) in
            print("錯誤:\(error)")
        }, onCompleted: {
            print("完成")
        }) {
            print("銷毀")
        }
    }

運行代碼結果如下:

image.png

核心代碼如上代碼所示,共三步

  • 創(chuàng)建序列
  • 訂閱信號
  • 發(fā)送信號

那么對于上面的流程中提出一下幾個問題:

  • 序列是如何創(chuàng)建?
  • 序列是如何接受到發(fā)送的信號的?
  • 整個流程又是如何實現(xiàn)的?

下面筆者會跟著大家一起進入源碼,一探究竟!
想要具體了解原理的小伙伴,最好帶著源碼進行閱讀下面的內容,便于你的理解

源碼解析

創(chuàng)建序列

序列的創(chuàng)建很簡單,就是下面一行代碼

let ob = Observable<Any>.create { 閉包A }

進入查看create,這里的create只能在create.swift文件里查看,不然跟蹤不到這個方法??

可是,為神馬會走到這里來呢?

進入對象Observable查看,會得到下面的繼承關系

image.png

所以在ObservableType的擴展中查看create方法,具體實現(xiàn)如下

    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }

可以看到,返回了一個AnonymousObservable對象,并且傳入了一個參數(shù)subscribe,通過創(chuàng)建的代碼可以知道,這里的subscribe就是外面我們定義的閉包,后面文章中筆者會稱為它為閉包A,也就是發(fā)送信號的部分。

進入AnonymousObservable類的查看源碼,

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

這個類的實現(xiàn)很簡單,通過init初始化方法可以看到,閉包A被保存在了_subscribeHandler屬性中。 同時這里還提供了一個非常重要run方法,后面會走到這里,所以這里先強調一下。

到此,序列的創(chuàng)建就結束了,下面在看訂閱信號的解析

訂閱信號

        //2、訂閱信號
        let _ = ob.subscribe(onNext: { (text) in
            print("訂閱到:\(text)")
        }, onError: { (error) in
            print("錯誤:\(error)")
        }, onCompleted: {
            print("完成")
        }) {
            print("銷毀")
        }

從上面代碼可以知道,重點解析在于方法subscribe,進入源碼,過濾掉其他無關代碼如下

image.png

嗯,不錯,就這兩行代碼,信號的訂閱到發(fā)送,就結束了。

此刻請允許筆者通過下面內容來表達一下此刻的內心

image.png

嗯。。。耐下心來,慢慢琢磨可以知道,想要突破,那么關鍵代碼就在下面這一行

self.asObservable().subscribe(observer)

通過斷點調試,可以知道這里的self表示著AnonymousObservable,或者通過我們寫的核心代碼塊也可以知道,首先創(chuàng)建了一個序列ob,它的類型上面說過了是AnonymousObservable類型,接著訂閱信號是直接用ob調用的方法subscribe,所以這里的self表示著AnonymousObservable,它就是創(chuàng)建序列時返回的對象。

但是在上面對AnonymousObservable解析中,并沒有發(fā)現(xiàn)它擁有一個叫做subscribe的方法,再次回到AnonymousObservable類的源碼中,可以發(fā)現(xiàn),它是繼承自Producer類,不錯,在它的父類Producer中我們發(fā)現(xiàn)了subscribe方法。

這里停一下,因為此時傳了一個參數(shù)observer,它也是一個AnonymousObserver對象,同時帶著一個尾隨閉包,暫且稱這個參數(shù)為閉包B,以便后面使用它的時候,我們可以容易想起來。

接著上面的思路,進入Producer類中查看,在subscribe方法中,調用了一個run方法,本類中,并沒有對這個方法進行具體實現(xiàn),而是分發(fā)給了它的子類去實現(xiàn),那么這里又需要回到AnonymousObservable類中,在上面的序列創(chuàng)建的內容中,筆者也就和大家說了這里run方法的重要,下面具體分析。

在此之前,需要注意的是,參數(shù)閉包B一直在傳遞。

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }

創(chuàng)建了一個實例對象sink,然后調用了一個run方法。這里看不出來什么東西,進入AnonymousObservableSink查看源碼。

可以發(fā)現(xiàn)下面幾點

  • 第一,重寫了父類的init方法,而傳進來的閉包B,傳遞給了父類Sink,同時賦值給了父類的_observer屬性。
  • 第二,方法on,參數(shù)為event
  • 第三,方法run,參數(shù)parent

OK,AnonymousObservableSink類的內容,差不多就這么多,回到之前的代碼中

let subscription = sink.run(self)

這行代碼里的self表示類AnonymousObservable,我想大家應該都沒有意見,那么具體看下這個方法的實現(xiàn)。

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }

轉化一下代碼AnonymousObservable._subscribeHandler(AnyObserver(self)),這樣看的話,我相信大家應該非常的熟悉了。

AnonymousObservable是什么? 它是在我們創(chuàng)建序列時的返回的那個對象。
_subscribeHandler是什么? 它是在我們創(chuàng)建序列時傳的參數(shù)閉包A。

那么這句話的意思就是執(zhí)行閉包A,即發(fā)送信號

發(fā)送信號

分兩步來分析AnonymousObservable._subscribeHandler(AnyObserver(self))這行代碼;

1、發(fā)送信號的閉包

回到發(fā)送信號的閉包中,

obserber.onNext("佐籩")

這里只分析onNext(),因為其他原理一樣,就不過多描述了啊!

按住command,追蹤onNext()方法,可以跳轉到ObserverType擴展中

    public func onNext(_ element: Element) {
        self.on(.next(element))
    }

可以通過斷點調試知道,這里的self指的是AnyObserver類,或者可以回到create.swift文件中的create方法中查看

image.png

這里已經把對應的obserber的類型已經描述的很清晰了。obserberAnyObserver類型,且遵循ObserverType協(xié)議,調用的onNext()方法就是ObserverType協(xié)議里的方法(查看一下AnyObserver類的源碼便知)。

回到AnyObserver類中,查看on方法

    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }

可以斷點看一下,這里已經把信號傳了進來

image.png

那么這里的self.observer()具體是什么呢?請看下面的分析

2、解析 AnyObserver(self)

通過上面流程可以知道,關鍵代碼在于AnyObserver(self),通過進入源碼查看

    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }

在初始化代碼里有這么一個操作,observer.on賦值的操作,這里的observer是通過初始化構造方法傳來的,回顧上面的流程可以知道,它是AnonymousObservableSink類。那么它執(zhí)行on方法,跟著代碼走,會調用方法forwardOn,最后會來到Sink類里的forwardOn方法,源碼如下

    final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }

關鍵代碼在于self._observer.on(event),在訂閱信號的分析中,特別說明了_observer是閉包B,如果這里有疑問的小伙伴,可會回到前面看一下。

閉包B的具體結構如下(部分代碼省略):

AnonymousObserver<Element> { event in
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }

那么對于self._observer.on(event),可以理解為類AnonymousObserver調用方法on,進入源碼查看,AnonymousObserver類中并無該方法,然后通過它的父類ObserverBase可以找到該方法,具體實現(xiàn)如下:

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

緊接著,執(zhí)行self.onCore(event),同樣的,該方法,父類ObserverBase并沒有具體實現(xiàn),而是分發(fā)給子類去實現(xiàn),回到類AnonymousObserver,查看該方法的實現(xiàn)代碼

    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }

那么對于self._eventHandler()是個什么東西,可以具體分析一下,在當前類AnonymousObserver中,

    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler

追溯到枚舉Event

public enum Event<Element> {
    /// Next element is produced.
    case next(Element)

    /// Sequence terminated with an error.
    case error(Swift.Error)

    /// Sequence completed successfully.
    case completed
}

到這里,我想大家應該可以知道下面我們應該走到哪里了,沒錯,就是剛剛說的閉包B,根據(jù)event執(zhí)行不同的方法,比如這里所說的onNext方法,那么就會打印輸出訂閱到:佐籩

總結

OK,到這里關于RxSwift的核心流程的源碼分析已經結束了,了解了其中的原理,筆者相信,對于RxSwift的運用會更加得心應手。

好難,筆者在整理這份筆記的時候,確實花費了不少時間,本想畫個流程圖來幫助大家去理解,奈何時間有限,后續(xù)筆者會整理一份思路較清晰的流程圖;

上述的分析流程,如果有什么錯誤,希望大家給筆者留言指正一下,謝謝?。?!

持續(xù)學習,希望大家持續(xù)關注?。?!

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容