RxSwift源碼分析(二)-Observable和AnonymousObservableSink解析

上一篇文章中,主要描述了RxSwift的核心邏輯,也就是一個序列從創(chuàng)建到訂閱然后從發(fā)送消息到接收消息的整個流程是怎樣串聯(lián)起來的。還不太理解的同學(xué)可以移步到上一篇文章了解一下。

這篇文章主要來分析一下RxSwift的幾個核心類和協(xié)議的實現(xiàn)和設(shè)計。

Observable類解析

Observable是可觀察序列,是所有可觀察序列的基類,我們不會直接使用Observable這個類,一般都是使用子類。Observable也可以理解成抽象類,實際上不是抽象類,因為可觀察序列最重要的一個訂閱序列的方法subscribe必須在其子類中重寫。

我們先來看看Observable的源碼:

public class Observable<Element> : ObservableType {
    /// Type of elements in sequence.
    public typealias E = Element
    
    init() {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
    }
    
    public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<E> {
        return self
    }
    
    deinit {
#if TRACE_RESOURCES
        _ = Resources.decrementTotal()
#endif
    }
    internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
        return _map(source: self, transform: transform)
    }
}
  • Observable實現(xiàn)了一個協(xié)議ObservableType,而且ObservableType協(xié)議繼承自ObservableConvertibleType協(xié)議,所以在Observable中實現(xiàn)了兩個協(xié)議方法:subscribeasObservable。
  • subscribe方法沒有具體實現(xiàn)的邏輯,需要子類去實現(xiàn)。
  • asObservable方法返回的是self,看似用處不大,其實不是這樣的。asObservable是非常有用的,如果一類是Observable的子類,我們可以直接返回self,如果不是Observable的子類,我們可以通過重寫這個協(xié)議方法來返回一個Observable對象,這樣保證了協(xié)議的一致性。在使用的時候我們可以直接寫類似self.asObservable().subscribe(observer)這樣的代碼,有利于保持代碼的簡潔性,是良好的封裝性的體現(xiàn)。所以我覺得這個設(shè)計非常的好,在我們?nèi)粘i_發(fā)中也可以借鑒。
  • _ = Resources.incrementTotal()_ = Resources.decrementTotal()這兩行代碼其實是RxSwift內(nèi)部實現(xiàn)的一個引用計數(shù)。這部分內(nèi)容我會在后面的文章中再詳解。
  • composeMap<R>優(yōu)化map的一個函數(shù),不太理解用處。
  • Observable子類非常多,這里不一一去看,主要區(qū)別在于對subscribe方法的實現(xiàn)不一樣。

AnonymousObservableSink類解析

final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
    typealias E = O.E
    typealias Parent = AnonymousObservable<E>

    // state
    private let _isStopped = AtomicInt(0)

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

    override init(observer: O, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

    func run(_ parent: Parent) -> Disposable {
        return parent._subscribeHandler(AnyObserver(self))
    }
}
  • AnonymousObservableSinkSink的子類,AnonymousObservableSink本身遵守ObseverType協(xié)議,與此同時實現(xiàn)了run方法,雖然沒有實現(xiàn)subscribe方法,但是已經(jīng)足夠了,這樣AnonymousObservableSink從某種程度來說也是Observable。
  • AnonymousObservableSink是Observer和Observable的銜接的橋梁,也可以理解成管道。它存儲了_observer和銷毀者_cancel。通過sink就可以完成從Observable到Obsever的轉(zhuǎn)變。
  • run方法中的這行代碼parent._subscribeHandler(AnyObserver(self)),其中parent是一個AnonymousObservable對象。_subscribeHandler這個block調(diào)用,代碼會執(zhí)行到創(chuàng)建序列時的block。然后會調(diào)用發(fā)送信號的代碼obserber.onNext("發(fā)送信號"),然后代碼會經(jīng)過幾個中間步驟會來到AnonymousObservableSink類的on方法。

有問題或者建議和意見,歡迎大家評論或者私信。
喜歡的朋友可以點下關(guān)注和喜歡,后續(xù)會持續(xù)更新文章。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容