【領(lǐng)略RxSwift源碼】- 主題類族(Subject)

(一)前言

前兩篇文章分析了RxSwift的整個(gè)基礎(chǔ)的訂閱流程以及變換操作(Operators)的概念實(shí)現(xiàn),有興趣的讀者可以點(diǎn)擊以下鏈接。

【領(lǐng)略RxSwift源碼】- 訂閱的工作流(Subscribing)
【領(lǐng)略RxSwift源碼】- 變換操作(Operators)

本篇文章將闡述Subject的概念以及在RxSwift當(dāng)中的具體實(shí)現(xiàn),在分析源碼的過程中,我們或許會(huì)發(fā)現(xiàn)一個(gè)不一樣的世界,或許我們會(huì)看到平時(shí)看不到的風(fēng)景。

(二)SubjectType

ReactiveX的世界中,一共定義了4種不同的Subject,分別是AsyncSubject、BehaviorSubject、PublishSubjectReplaySubject。無一例外,這四種Subject都實(shí)現(xiàn)了SubjectType協(xié)議,當(dāng)然這也是非常樸素的面向協(xié)議了??。

我們來看看SubjectType協(xié)議:

/// Represents an object that is both an observable sequence as well as an observer.
public protocol SubjectType : ObservableType {
    /// The type of the observer that represents this subject.
    ///
    /// Usually this type is type of subject itself, but it doesn't have to be.
    associatedtype SubjectObserverType : ObserverType

    /// Returns observer interface for subject.
    ///
    /// - returns: Observer interface for subject.
    func asObserver() -> SubjectObserverType
    
}

我們可以看到,在SubjectType中定義了一個(gè)ObserverType類型的associatedtype以及一個(gè)func asObserver() -> SubjectObserverType的方法。于此同時(shí),它也是繼承自Observable。也就是說,一個(gè)SubjectType既是一個(gè)觀察者Observer,又是一個(gè)可觀察序列(Observable)。

(三)Subject的實(shí)現(xiàn)細(xì)節(jié)

AsyncSubject

問:AsyncSubject 是一個(gè)具有什么樣特性的Subject?

答:簡(jiǎn)單的來說,當(dāng)AsyncSubject被訂閱的時(shí)候,如果AsyncSubject已經(jīng)發(fā)送過了.complete事件,那么訂閱者只能得到最后一個(gè)序列的值(如果沒有發(fā)送過序列那么不觸發(fā)任何訂閱)。如果沒有發(fā)送.complete事件,那么訂閱者一直都不會(huì)訂閱到值,直到AsyncSubject發(fā)送了.complete事件。

我們可以先來看一看AsyncSubject的繼承和協(xié)議:

public final class AsyncSubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , SynchronizedUnsubscribeType {
...

SubjectType剛剛我們已經(jīng)看過它的定義了,而ObserverType也在之前的文章中有過認(rèn)識(shí),那么只剩下SynchronizedUnsubscribeType是還沒有見到過的一個(gè)協(xié)議,看它的名字貌似是“同步取消訂閱者”的一個(gè)協(xié)議,我們來看一下具體的定義:

protocol SynchronizedUnsubscribeType : class {
    associatedtype DisposeKey

    func synchronizedUnsubscribe(_ disposeKey: DisposeKey)
}

emmm...看樣子是定義一個(gè)DisposeKey,然后可以通過這個(gè)DisposeKey來同步取消訂閱。而這個(gè)DisposeKey其實(shí)就是一個(gè)BagKey的結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體只有一個(gè)UInt64類型的存儲(chǔ)屬性rawValue,如下:

struct BagKey {
    fileprivate let rawValue: UInt64
}

那么既然是移除所有的訂閱者,那么這些訂閱者被存儲(chǔ)在哪里呢?

在RxSwift中定義了一個(gè)數(shù)據(jù)結(jié)構(gòu)叫做Bag,它是一個(gè)用來存儲(chǔ)少量元素的高效容器,它的插入刪除時(shí)間復(fù)雜度為O(n)。

struct Bag<T> : CustomDebugStringConvertible { ... }

值得一提的是,在Bag的內(nèi)部,真正存儲(chǔ)元素的容器并不是我們常用的Array類型,而是使用了ContiguousArray。我們可以看一下ContiguousArray的官方解釋:

/// If your array's `Element` type is a class or `@objc` protocol and you do
/// not need to bridge the array to `NSArray` or pass the array to Objective-C
/// APIs, using `ContiguousArray` may be more efficient and have more
/// predictable performance than `Array`. If the array's `Element` type is a
/// struct or enumeration, `Array` and `ContiguousArray` should have similar
/// efficiency.

顯然,使用ContiguousArray這是因?yàn)?code>ContiguousArray在處理class或者@objc修飾的類型的時(shí)候更加的高效,而在處理Swift基礎(chǔ)類型的時(shí)候效率就和Array差不多了。

注:還有值得一提的是&=操作符,這是一個(gè)日常開發(fā)中很少使用到的一個(gè)操作符。與普通的加法操作符(+)的區(qū)別在于,當(dāng)加法操作完成之后的結(jié)果類型溢出之后,任然可以安全的使用不至于奔潰;

例如:

let val: Int8 = 64
val + 64 // output: error
val &+ 64 // output: -128

OK~ 我們認(rèn)識(shí)到了在AsyncSubject中使用Box來存儲(chǔ),那么具體的實(shí)現(xiàn)AsyncSubject的邏輯是怎么樣的呢?我們先來看一下AsyncSubject有著那些屬性:

    typealias Observers = AnyObserver<Element>.s
    typealias DisposeKey = Observers.KeyType

    /// Indicates whether the subject has any observers
    public var hasObservers: Bool {
        _lock.lock(); defer { _lock.unlock() }
        return _observers.count > 0
    }

    let _lock = RecursiveLock()

    // state
    private var _observers = Observers()
    private var _isStopped = false
    private var _stoppedEvent = nil as Event<Element>? {
        didSet {
            _isStopped = _stoppedEvent != nil
        }
    }
    private var _lastElement: Element?

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

我們可以看到,重點(diǎn)的實(shí)現(xiàn)相關(guān)邏輯的屬性都被標(biāo)注成了private。_observers是一個(gè)存儲(chǔ)元素類型為Event<Element>) -> ()Box,_isStopped是一個(gè)Bool類型的flag,一旦發(fā)送了complete或者error時(shí)間,那么這個(gè)flag就會(huì)置為true。而_stoppedEvent則是一個(gè)可選的Event<Element>類型,它永遠(yuǎn)是最新發(fā)送的一個(gè)事件,如果從來沒有發(fā)送next事件,那么這個(gè)屬性就永遠(yuǎn)為空。

由于Subject既有Observer的特性又有Observable的特性,所以我們一個(gè)一個(gè)看它如何實(shí)現(xiàn)這些特性。我們先來看看Observer

    /// Notifies all subscribed observers about next event.
    ///
    /// - parameter event: Event to send to the observers.
    public func on(_ event: Event<E>) {
        #if DEBUG
            _synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { _synchronizationTracker.unregister() }
        #endif
        let (observers, event) = _synchronized_on(event)
        switch event {
        case .next:
            dispatch(observers, event)
            dispatch(observers, .completed)
        case .completed:
            dispatch(observers, event)
        case .error:
            dispatch(observers, event)
        }
    }

在這里有兩個(gè)比較關(guān)鍵的函數(shù),一個(gè)是_synchronized_on,另一個(gè)是dispatch_synchronized_on是實(shí)現(xiàn)AsyncSubject的關(guān)鍵函數(shù),我們可以待會(huì)了解了其他細(xì)節(jié)之后再看。

dispatch是一個(gè)在Bag+Rx.swift中定義的一個(gè)內(nèi)聯(lián)函數(shù),它的主要作用是給bag內(nèi)的所有(Event<E>) -> ()閉包對(duì)象派發(fā)執(zhí)行一個(gè)指定的事件(Event)。源碼如下:

@inline(__always)
func dispatch<E>(_ bag: Bag<(Event<E>) -> ()>, _ event: Event<E>) {
    if bag._onlyFastPath {
        bag._value0?(event)
        return
    }

    let value0 = bag._value0
    let dictionary = bag._dictionary

    if let value0 = value0 {
        value0(event)
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}

注:在Swift中我們可以通過@ inline關(guān)鍵字來標(biāo)識(shí)一個(gè)函數(shù)是內(nèi)聯(lián)函數(shù)。簡(jiǎn)單的來說,在Swift中我們有三種"內(nèi)聯(lián)策略": sometimes, nerver, always。
sometimes: 當(dāng)我們申明一個(gè)函數(shù)的時(shí)候,默認(rèn)這個(gè)函數(shù)的內(nèi)聯(lián)策略就是sometimes的。這個(gè)時(shí)候,swift的編譯器會(huì)自動(dòng)的為它所認(rèn)為足夠短小的函數(shù)增添上內(nèi)聯(lián)特性,而對(duì)于相對(duì)而言比較龐大的函數(shù)不使用內(nèi)聯(lián)特性,以此達(dá)到代碼執(zhí)行優(yōu)化的目的。
always: 當(dāng)我們需要某個(gè)函數(shù)強(qiáng)制內(nèi)聯(lián)的時(shí)候,我們只需要在函數(shù)之前加上@inline(__always)關(guān)鍵字。當(dāng)編譯器檢測(cè)到該關(guān)鍵字的時(shí)候,編譯器就知道在這里永遠(yuǎn)都需要內(nèi)聯(lián)展開,就不會(huì)執(zhí)行自己的那一套默認(rèn)的內(nèi)鏈優(yōu)化策略。
nerver: 當(dāng)我們需要某一個(gè)函數(shù)永遠(yuǎn)都不要進(jìn)行內(nèi)聯(lián)的時(shí)候,我們只需要在函數(shù)之前加上@inline(never)。那么,當(dāng)編譯器檢測(cè)到該關(guān)鍵字的時(shí)候,編譯器就知道在這里永遠(yuǎn)都需要內(nèi)聯(lián)展開。

正如我們看到的,dsipatch方法是基于Bag<(Event<E>) -> ()>類型的容器來實(shí)現(xiàn)的,之所以之前有一堆復(fù)雜的判斷邏輯,就是因?yàn)閮?yōu)化代碼執(zhí)行效率。當(dāng)bag中只有一個(gè)元素的時(shí)候,_onlyFastPathtrue,那么我們只需要取出那一個(gè)執(zhí)行操作就可以了。然而當(dāng)我們超過一個(gè),小于三十個(gè)的時(shí)候,我們會(huì)將元素存儲(chǔ)在ContiguousArray中,通過下標(biāo)的方式來獲取元素進(jìn)行操作。而當(dāng)容器內(nèi)元素超過30這個(gè)閾值的時(shí)候,我們就要將元素插入到字典中,需要使用的時(shí)候再用鍵值對(duì)取出使用。

那么,_synchronized_on到底是如何配合dispatch來實(shí)現(xiàn)AsyncSubject的特性的呢?

    func _synchronized_on(_ event: Event<E>) -> (Observers, Event<E>) {
        _lock.lock(); defer { _lock.unlock() }
        if _isStopped {
            return (Observers(), .completed)
        }

        switch event {
        case .next(let element):
            _lastElement = element
            return (Observers(), .completed)
        case .error:
            _stoppedEvent = event

            let observers = _observers
            _observers.removeAll()

            return (observers, event)
        case .completed:

            let observers = _observers
            _observers.removeAll()

            if let lastElement = _lastElement {
                _stoppedEvent = .next(lastElement)
                return (observers, .next(lastElement))
            }
            else {
                _stoppedEvent = event
                return (observers, .completed)
            }
        }
    }

當(dāng)源序列發(fā)送next事件的時(shí)候,AsyncSubject僅僅使用內(nèi)部的_lastElement屬性來記錄下當(dāng)前的next事件,然后構(gòu)造一個(gè)空的Bag來執(zhí)行completed事件(相當(dāng)于沒做什么事情)。

當(dāng)源序列發(fā)送error事件的時(shí)候,使用_stoppedEvent來記錄最后的最后的事件,然后構(gòu)造一個(gè)observers常量,將自身所有的觀察者拷貝到observers常量中,將自身所有的觀察者移除,最后把observers和該error事件返回。

(四)Subject的意義

當(dāng)然,除了AsyncSubject之外,我們還有還有以下幾種Subject:

  • PublishSubject: 標(biāo)準(zhǔn)的熱信號(hào),訂閱者只會(huì)接收到訂閱操作之后的事件。
  • ReplaySubject:訂閱者會(huì)接受到訂閱之前的事件以及訂閱之后的事件,類似于冷信號(hào)。
  • BehaviorSubject:訂閱之后首先會(huì)接收到最近一次發(fā)送的事件(如果最近沒有發(fā)送,那么發(fā)送一個(gè)初始的事件)。
  • Variable: 基于BehaviorSubject的封裝,會(huì)將初始值或者最近的值發(fā)送給訂閱者。

然而寫到這里,我并不想一一詳細(xì)的分析剩下四種的實(shí)現(xiàn)細(xì)節(jié)。因?yàn)椋c剛剛分析完成的AsyncSubject相比,其余的Subject實(shí)現(xiàn)的方法都沒有太大的區(qū)別。所以筆者也不想在這里流水賬似的浪費(fèi)時(shí)間。

不如做一些更有意思的事情:為什么我們需要Subject?

現(xiàn)在我們先不妨設(shè)想一個(gè)這樣的場(chǎng)景:

我們需要追蹤用戶在iPhone上的每一次點(diǎn)擊,當(dāng)用戶點(diǎn)擊一次系統(tǒng)就會(huì)調(diào)用一次screenDidTapped(on point: CGPoint)方法。

那么在ReactiveX中,我們自然可以想到類似這樣的做法:

var observer: AnyObserver<CGPoint>!

let tapped = Observable<CGPoint>.create { (observer) -> Disposable in
    observer = observer
    return Disposables.create()
}

func screenDidTapped(on point: CGPoint) {
    observer.on(point)
}

然而這樣的實(shí)現(xiàn)確實(shí)存在一些問題:

  • 一對(duì)一的限制

由于Observables的特性限制,如果我們希望有多個(gè)觀察者來訂閱該點(diǎn)擊事件,那么Observables是無法做到的。當(dāng)你存在兩個(gè)及以上的訂閱的時(shí)候,只有最新的觀察者可以接收到序列的事件信息。

  • 訂閱前的行為

還是由于Observables的特性,create構(gòu)造器的閉包只會(huì)在第一次被訂閱的時(shí)候會(huì)調(diào)用。然而當(dāng)點(diǎn)擊屏幕的時(shí)候,我們并不能保證就一定有觀察者訂閱了這個(gè)序列。

也就是說,當(dāng)你遇到類似上述的情況的時(shí)候,你需要使用熱信號(hào)(hot observeable)。

Hot Observables VS Cold observables

雖然冷熱信號(hào)已經(jīng)是被講爛的話題了,但是既然寫到這里已經(jīng)是不得不說的地步了。

Bnaya Eshet在他的博客中對(duì)"冷熱信號(hào)"有過一個(gè)非常形象的比喻:

if a tree falls in a forest and no one is around to hear it, does it make a sound? if it do make a sound when nobody observed it, we should mark it as hot, otherwise it should be marked as cold.
倘若一顆沙漠中的枯樹黯然傾倒而無人問津,是不是可以說它從未對(duì)這個(gè)世界發(fā)出聲音。倘若無人關(guān)心而算作發(fā)出了聲音,那么它就是熱信號(hào),反之,則是冷信號(hào)。

我們?cè)賮砜纯蠢錈嵝盘?hào)的對(duì)比:

Hot Observables Cold observables
屬于序列 屬于序列
無論有或者沒有被訂閱,都會(huì)產(chǎn)生事件。 只有被訂閱的時(shí)候才會(huì)產(chǎn)生事件。
變量、屬性、點(diǎn)擊操作、鼠標(biāo)操作、UI的變化等 異步操作、HTTP連接、TCP連接等
通常包含N個(gè)Next事件 通常只有一個(gè)Next事件
數(shù)據(jù)源的變化能夠作用到所有的訂閱者。 數(shù)據(jù)源的變化只會(huì)作用到當(dāng)前的訂閱者。
它是有狀態(tài)的 它是無狀態(tài)的

在現(xiàn)實(shí)世界的編程中,我們總是面對(duì)著各種各樣復(fù)雜的情景。絕大多數(shù)的情況之下,我們的信號(hào)流可以是純函數(shù)的,不可變的,安全的。然而當(dāng)我們面對(duì)諸如鼠標(biāo)追蹤,變量的流式表達(dá)的時(shí)候,不可避免的我們需要使用到熱信號(hào)。

當(dāng)我們需要使用到熱信號(hào)的時(shí)候,我們?cè)俑鶕?jù)當(dāng)前的環(huán)境選擇最適合的Subject。比如我們希望對(duì)數(shù)據(jù)有“回看”功能,那么我們就可以選擇使用ReplaySubject。如果我們只關(guān)心最后一個(gè)數(shù)據(jù)變化,那么我們可以使用AsyncSubject,諸如此類等等。

(五)結(jié)語

對(duì)于Rx的使用者來說,我們更加希望使用的是Cold observables。從函數(shù)式的角度來說,Cold observables是不可變的,而Hot observables是可變的。不可變的數(shù)據(jù)總是更加的符合人的心智模型,而更加易于維護(hù)和理解,同時(shí)也更加的安全。希望這篇文章可以加深讀者對(duì)于Rx的理解。

參考

  1. https://developer.apple.com/documentation/swift/operator_declarations
  2. http://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/subjects.html
  3. To Use Subject Or Not To Use Subject?
  4. https://github.com/ReactiveX/RxSwift/blob/master/Documentation/HotAndColdObservables.md
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容