透視RxSwift核心邏輯

透視RxSwift核心邏輯

篇幅稍微有點長,了解程度不同,可以跳過某些部分。

  1. 如果對源碼比較熟悉的,建議直接看圖就行了,時序圖更加清晰。第一次摸索有必要閱讀文字內(nèi)容。
  2. 貼出來的代碼省略了不必要的部分,用省略號代替。

示例

RxSwift的基礎(chǔ)用法就是很簡單的幾步

  1. 創(chuàng)建可觀察序列
  2. 監(jiān)聽序列(訂閱信號)
  3. 銷毀序列
//創(chuàng)建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
    //發(fā)送信號
    observer.onNext("今日份麻醬涼皮")
    observer.onCompleted()

    return Disposables.create()
}
//訂閱信號
let _ = ob.subscribe(onNext: { (text) in
    print("訂閱到:\(text)")
}, onError: { (error) in
    print(error)
}, onCompleted: {
    print("完成")
}) {
    print("銷毀")
}

控制臺輸出:

訂閱到:今日份麻醬涼皮
完成
銷毀

探究

在看源碼之前,應(yīng)該對接觸到的類和協(xié)議有些認(rèn)識,方便之后的理解。下面的關(guān)系圖在需要的時候回頭熟悉一下就行:

類關(guān)系圖.png

到底是什么在支撐如此便捷的調(diào)用?

第一句Observable<Any>.create創(chuàng)建了一個可觀察序列Observable對象,第二句就是這個Observable序列對象訂閱了消息。

從輸出可以看出,都是訂閱到的消息。那么訂閱時傳入subscribe的閉包是什么時候調(diào)用的呢?

單從現(xiàn)在的幾句代碼,也能猜出是第一句代碼的閉包中的observer.onNext的調(diào)用引起的。但是,我們也沒有看到這個create函數(shù)中的閉包是在哪里執(zhí)行的?

為了能夠清晰的描述,暫且稱第一句create中的閉包為create閉包,第二句subscribe中的幾個閉包為subscribe閉包。

外面看不出來,那我們只能進(jìn)去RxSwift里面探索下createsubscribe到底做了什么?

create函數(shù)的實現(xiàn)

extension ObservableType {
    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }
}

原來這函數(shù)內(nèi)部實際上是創(chuàng)建了一個AnonymousObservable匿名可觀察序列對象。而之前的閉包也是給AnonymousObservable對象初始化用了。

AnonymousObservable

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

這里在初始化的時候把create閉包賦值給了_subscribeHandler屬性。

到此為止,Observable<Any>.create函數(shù)實際上創(chuàng)建了一個AnonymousObservable匿名可觀察序列對象,并保存了create閉包

一圖概千言1.png

。。。貌似這條不是主線??!沒有找到任何一個問題的答案。

再來翻翻subscribe函數(shù)

extension ObservableType {
    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            ......
            
            let observer = AnonymousObserver<E> { event in
                ......
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
}

這也是對定義在ObservableType協(xié)議中的函數(shù)的實現(xiàn),返回了一個Disposable。這個Disposable就是用來管理訂閱的生命周期的,示例代碼中并沒有體現(xiàn)出來,實際是在訂閱信號的內(nèi)部處理的。前面都沒什么,后面創(chuàng)建了AnonymousObserver,并且在AnonymousObserver初始化時傳入了閉包,并賦值給_eventHandler屬性。

final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
    typealias Element = ElementType
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
        self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
}

之前,AnonymousObservable匿名序列對象,保存了create閉包。
此時,創(chuàng)建了AnonymousObserver匿名觀察者對象,保存了對subscribe閉包的回調(diào)執(zhí)行的EventHandler閉包。

又一條支線。。。一路走來,都是在創(chuàng)建對象,保存閉包。兩個主線疑問還是無跡可尋。難道一開始就走上了歧路?非也!繼續(xù)看下去就明白了什么叫「柳暗花明又一村」。

AnonymousObservablesubscribe函數(shù)中,在創(chuàng)建了AnonymousObserver對象后,還返回了一個新建的Disposable對象。重點就在這里的第一個參數(shù):self.asObservable().subscribe(observer)中。asObservable還是返回了self,后面貼上的ObserverType中可以看到。剩下的就是AnonymousObservable的父類Producer中的subscribe了:

class Producer<Element> : Observable<Element> {
    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            ......
        } else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }
}

由于是在當(dāng)前線程中執(zhí)行的,只看else那部分。disposer相關(guān)的不用關(guān)心。關(guān)鍵是observer參數(shù),這個參數(shù)中有對subscribe閉包的處理的EventHandler閉包。observer傳入了self.run(observer, cancel)。所以,還要回頭再看看AnonymousObservable類中的run函數(shù)

override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
    let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
    let subscription = sink.run(self)
    return (sink: sink, subscription: subscription)
}

這里又創(chuàng)建了一個AnonymousObservableSink對象,observercancel繼續(xù)往初始化函數(shù)中丟:

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    typealias Parent = AnonymousObservable<E>
    // state
    private let _isStopped = AtomicInt(0)
    
    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }
    
    func on(_ event: Event<E>) {
        ......
        
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}

前面sink.run(self)self傳了進(jìn)來,又對AnonymousObservable起了別名Parent。parent._subscribeHandler不就是AnonymousObservable在調(diào)用它最開始保存的那個create閉包么?AnyObserver(self)則把AnonymousObservableSink作為AnyObserver初始化的參數(shù)。

public struct AnyObserver<Element> : ObserverType {
    public typealias E = Element
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
    public func asObserver() -> AnyObserver<E> {
        return self
    }
}

其中還把AnonymousObservableSinkon函數(shù)賦值給了AnyObserver的屬性observer,observer就是EventHandler。這個EventHandlercreate閉包中會用到。

這不就是第二個主線疑問(create函數(shù)中的閉包何時調(diào)用)的答案么!

整理一下:

  • subscribe(onNext, onError, onCompleted, onDisposed) -> Disposable函數(shù)中創(chuàng)建Disposable開始
  • AnonymousObservable調(diào)用subscribe(observer)
  • AnonymousObservable調(diào)用run(observer, cancel)
  • 創(chuàng)建AnonymousObservableSink(observer: observer, cancel: cancel),并且sink.run(self)
  • parent._subscribeHandler(AnyObserver(self))

這是一條從subscribe閉包-->create閉包的線。

一圖概千言2.png

還沒完,還有個create閉包中怎么觸發(fā)subscribe閉包的?

又臭又長的寫了這么多,這里就只看observer.onNext("今日份麻醬涼皮")吧。點進(jìn)去看:
observer.onNext("今日份麻醬涼皮")

extension ObserverType {
    public func onNext(_ element: E) {
        self.on(.next(element))
    }
    
    public func onCompleted() {
        self.on(.completed)
    }
    
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

onNext中調(diào)用了on,在前面AnyObserver結(jié)構(gòu)體定義中可以看出,on函數(shù)中返回了self.observer(event),之前是把AnonymousObservableSinkon賦值給了這個self.observer,所以,此時會走到AnonymousObservableSinkon函數(shù)中。這里面又調(diào)用了self.forwardOn(event),看下AnonymousObservableSink的父類Sink中定義的forwardOn

class Sink<O : ObserverType> : Disposable {
    fileprivate let _observer: O
    fileprivate let _cancel: Cancelable
    fileprivate let _disposed = AtomicInt(0)

    ......

    init(observer: O, cancel: Cancelable) {
        ......
        
        self._observer = observer
        self._cancel = cancel
    }

    final func forwardOn(_ event: Event<O.E>) {
        ......
        
        self._observer.on(event)
    }
}

forwardOn中走了一句self._observer.on(event)。這里的_observer屬性不就是AnonymousObservableSink初始化時傳入的AnonymousObserver對象么!

繼續(xù)跟AnonymousObserveron

final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
    typealias Element = ElementType
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
        ......
        self._eventHandler = eventHandler
    }

    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
}

這里沒有on,看父類ObserverBase:

class ObserverBase<ElementType> : Disposable, ObserverType {
    typealias E = ElementType
    
    private let _isStopped = AtomicInt(0)

    func on(_ event: Event<E>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }
}

這里的on函數(shù)中,有個.next分支,調(diào)用了self.onCore(event)。子類AnonymousObserver實現(xiàn)的onCore中又調(diào)用了self._eventHandler(event)。

這個_eventHandler是什么?不就是AnonymousObserver初始化時保存的對subscribe閉包處理的閉包么!所以create閉包中的observer.onNext("今日份麻醬涼皮")就能觸發(fā)subscribe閉包了。

這是一條從AnonymousObservable調(diào)用_subscribeHandler(也就是create閉包)時的參數(shù) AnyObserver-->subscribe閉包的線。

一圖概千言3.png

現(xiàn)在我們看清楚了響應(yīng)式的數(shù)據(jù)流:

  1. 在訂閱信號時創(chuàng)建了observer并執(zhí)行創(chuàng)建序列時的閉包
  2. 在創(chuàng)建序列的閉包中有回調(diào)observer,監(jiān)聽序列的變動而觸發(fā)訂閱信號的閉包

圖解

看清楚了么?好像清楚的不太明顯。畢竟好幾個類、協(xié)議,又那么多函數(shù)調(diào)來調(diào)去的。加把勁再擼一擼。既然這么五花八門的調(diào)用流程搞清楚了,那就來弄清楚它主要都做了什么?

一圖概千言,既然畫了圖,就少敲點鍵盤吧!

RxSwift核心邏輯圖解四.jpg

仔細(xì)看了流程圖就會發(fā)現(xiàn),出現(xiàn)的幾個類中,干活的主要是Anonymous開頭的那三個類。我們在外面調(diào)用的操作,其實是在使用RxSwift內(nèi)部封裝的一些類。

  • 創(chuàng)建可觀察序列AnonymousObservable,并保存create閉包
  • 訂閱信號
    • 首先創(chuàng)建了一個AnonymousObserver,并且把對subscribe閉包的操作封裝成了EventHandler
    • 苦力活還是AnonymousObservable來干。
      • 在創(chuàng)建返回值Disposable中,由subscribe(observer),把AnonymousObserver傳給了AnonymousObservableSink
        • AnonymousObservableSink才是信息處理的核心,因為他知道的太多了
        • AnonymousObservableSinkAnonymousObserver觀察者,AnonymousObserver持有著EventHandler。
        • AnonymousObservableSink在調(diào)用run函數(shù)時也傳入了AnonymousObservable序列,AnonymousObservable就是create閉包的持有者。
        • AnonymousObservableSink初始化的時候,除了觀察者外,還有個管理序列生命周期的Disposable。
      • AnonymousObservableSink作為一個內(nèi)部類,在被create閉包當(dāng)做參數(shù)回調(diào)給外界時需要轉(zhuǎn)換為AnyObserver,在這里AnyObserver則是以閉包屬性的形式保留了AnonymousObservableSinkon函數(shù)
      • 后面在信號發(fā)生改變時就可以讓AnyObserver通過這個屬性值聯(lián)系到AnonymousObservableSink

訂閱信號:Observable-->AnonymousObservable-->AnonymousObserver-->AnonymousObservableSink-->AnyObserver-->create閉包

  • 發(fā)出信號
    • 這個過程基本就是和訂閱信號時相反的
    • create閉包中調(diào)用AnyObserveronNext開始
    • 通過AnyObserver.observer訪問閉包中的AnonymousObservableSink
    • AnonymousObservableSink擁有AnonymousObserver
    • AnonymousObserver掌控EventHandler
    • 句號

發(fā)出信號:create閉包-->AnyObserver-->AnonymousObservableSink-->AnonymousObserver-->subscribe閉包

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 發(fā)現(xiàn) 關(guān)注 消息 RxSwift入坑解讀-你所需要知道的各種概念 沸沸騰關(guān)注 2016.11.27 19:11*字...
    楓葉1234閱讀 2,935評論 0 2
  • 轉(zhuǎn)載自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657閱讀 2,106評論 1 9
  • RxSwift 核心原理解析 角色定位 觀察者(Observer) 被觀察者(Observable) 訂閱者(Su...
    狼性刀鋒閱讀 4,488評論 0 13
  • 轉(zhuǎn)一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong閱讀 1,028評論 0 2
  • 這段代碼中,request.GET['q']是中文,其在網(wǎng)頁中顯示的效果則是中文的編碼。要想顯示中文,就得把req...
    提莫總隊長閱讀 233評論 0 0

友情鏈接更多精彩內(nèi)容