解析RxSwift核心流程

RxSwift的核心流程可以簡化為三個步驟:

  • 創(chuàng)建序列
  • 訂閱序列
  • 發(fā)送信號
// 創(chuàng)建序列
Observable<Int>.create { (anyObserver) -> Disposable in
    // 發(fā)送信號
    anyObserver.onNext(2)
    return Disposables.create()
    }
    // 訂閱序列
    .subscribe(onNext: { (element) in
        print("訂閱到: \(element)")
    })
    .disposed(by: DisposeBag())

在執(zhí)行這行代碼得到的結(jié)果是: 訂閱到: 2 , 那么在RxSwift內(nèi)部是在什么時候開始發(fā)送信號(其實就是create(_ subscribe:)中的 subscribe閉包 什么時候執(zhí)行),又是什么時候訂閱到結(jié)果(就是subscribe(onNext:)中的 onNext閉包 什么時候執(zhí)行)。

創(chuàng)建序列

因為 Observable 繼承于 ObservableType, 所以點擊 create 方法可以看到 ObservableType 的擴展方法

extension ObservableType {
    public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        return AnonymousObservable(subscribe)
    }
}

create 方法中,其實是初始化了 AnonymousObservable 類的對象,而在初始化的時候保存了 SubscribeHandler 閉包。

訂閱序列

  • subscribe(onNext: onError: onCompleted: onDisposed:) 方法也是 ObservableType 的擴展方法,實現(xiàn)如下(忽略不關(guān)心的代碼):
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {

            let observer = AnonymousObserver<Element> { event in
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
  • 在這個方法中我們關(guān)注兩個方法:

    • 初始化 AnonymousObserver(匿名觀察者) 對象 observer,保存了 EventHandler 閉包
    • self.asObservable() 調(diào)用了 subscribe(observer)
  • 在前面創(chuàng)建序列分析 create 方法時知道這里的 self.asObservable() 其實就是 AnonymousObservable() 對象,那么self.asObservable().subscribe(observer) => AnonymousObservable().subscribe(observer)

  • 由于 AnonymousObservable 繼承于 Producer, 在 Producer 類中找到 subscribe(observer)的實現(xiàn)如下:

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // 省略部分代碼
            ...
            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                // 省略部分代碼
                ...
                return disposer
            }
        }
    }
  • subscribe(_ observer:) 方法中其實調(diào)用的就是 self.run(observer, ...), run(observer, ...) 的具體實現(xiàn)是在 AnonymousObservable 中:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
  • 我們可以看到在這里初始化了 AnonymousObservableSink 類的對象,保存在外面創(chuàng)建的屬于AnonymousObserver 類的 observer 對象。
  • sink.run 中的實現(xiàn)為:
typealias Parent = AnonymousObservable<Element>
func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
}
  • 這里的 parent 就是 AnonymousObservable()對象
  • 到此前面 AnonymousObservable(subscribe) 保存的那份閉包會開始執(zhí)行,也就是開始發(fā)送信號。
  • 捋一下整個流程為:
    其中 let ob = AnonymousObservable(subscribe)
    let sink = AnonymousObservableSink(observer: observer, ...),
    let observer = AnonymousObserver(event)

    self.asObservable().subscribe(observer) -> ob.subscribe(observer) -> ob.run(observer, ...) -> sink.run(ob) -> ob._subscribeHandler(AnyObserver(sink))

發(fā)送信號

  • 在前面執(zhí)行 AnonymousObservable(subscribe)._subscribeHandler(AnyObserver(sink)) 中傳遞 AnyObserver() 對象出去。
public struct AnyObserver<Element> : ObserverType {
    
    public typealias EventHandler = (Event<Element>) -> Void
    private let observer: EventHandler
    ...
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        self.observer = observer.on
    }
    
     public func on(_ event: Event<Element>) {
        return self.observer(event)
    }    
    ...
}
  • 從初始化方法中可以看到 AnyObserver(sink) 保存的 sink.on的閉包。
  • 當執(zhí)行 anyObserver.onNext(2) 時,因為 AnyObserverObserverType的類型,所以會走到 ObserverType 的擴展方法
extension ObserverType {
    public func onNext(_ element: Element) {
        self.on(.next(element))
    }    
    ...
  }
  • 繼續(xù)走到 AnyObserveron(_ event:) 方法,傳遞 .next 事件, 從上面的 on(_ event:) 方法中看到它會執(zhí)行之前保存的閉包,因為我們之前保存的是 sink.on 閉包,所以最終會走到 sink.on 方法里:
func on(_ event: Event<Element>) {
    ...
    switch event {
    case .next:
        if load(self._isStopped) == 1 {
            return
        }
        self.forwardOn(event)
    case .error, .completed:
        if fetchOr(self._isStopped, 1) == 0 {
            self.forwardOn(event)
            self.dispose()
        }
    }
}
  • 來到這里會找到父類的 forwardOn(_ event:) 方法:
final func forwardOn(_ event: Event<Observer.Element>) {
    ...
    self._observer.on(event)
}
  • 還記得之前在創(chuàng)建 AnonymousObservableSink 對象時,保存了 AnonymousObserver 對象嗎? self._observer.on(event) 執(zhí)行 AnonymousObserver 的父類 ObserverBaseon(event)方法:
func on(_ event: Event<Element>) {
    switch event {
    case .next:
        if load(self._isStopped) == 0 {
            self.onCore(event)
        }
    case .error, .completed:
        if fetchOr(self._isStopped, 1) == 0 {
            self.onCore(event)
        }
    }
}
  • onCore(_ event:) 方法:
override func onCore(_ event: Event<Element>) {
    return self._eventHandler(event)
}
  • 執(zhí)行了在創(chuàng)建 AnonymousObserver 對象時保存的閉包。

  • 到這里我們也繼續(xù)捋一下整個流程:
    let observer = AnonymousObserver(_ event:)

anyObserver.onNext(2) -> anyObserver.on(.next(value)) -> sink.on(.next(value)) -> sink.forwardOn(.next(value)) -> sink._observer.on(.next(value)) -> observer.on(.next(value)) -> observer.onCore(.next(value)) -> observer._eventHandler(.next(value)) -> onNext?(value)

最后附上一張相關(guān)類繼承圖以幫助分析
RxSwift.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容