(一)前言

前兩篇文章分析了RxSwift的整個(gè)基礎(chǔ)的訂閱流程以及變換操作(Operators)的概念實(shí)現(xiàn),有興趣的讀者可以點(diǎn)擊以下鏈接。
【領(lǐng)略RxSwift源碼】- 訂閱的工作流(Subscribing)
【領(lǐng)略RxSwift源碼】- 變換操作(Operators)
本篇文章將闡述Subject的概念以及在RxSwift當(dāng)中的具體實(shí)現(xiàn),在分析源碼的過程中,我們或許會(huì)發(fā)現(xiàn)一個(gè)不一樣的世界,或許我們會(huì)看到平時(shí)看不到的風(fēng)景。
(二)SubjectType
在ReactiveX的世界中,一共定義了4種不同的Subject,分別是AsyncSubject、BehaviorSubject、PublishSubject和ReplaySubject。無一例外,這四種Subject都實(shí)現(xiàn)了SubjectType協(xié)議,當(dāng)然這也是非常樸素的面向協(xié)議了??。
我們來看看SubjectType協(xié)議:
/// Represents an object that is both an observable sequence as well as an observer.
public protocol SubjectType : ObservableType {
/// The type of the observer that represents this subject.
///
/// Usually this type is type of subject itself, but it doesn't have to be.
associatedtype SubjectObserverType : ObserverType
/// Returns observer interface for subject.
///
/// - returns: Observer interface for subject.
func asObserver() -> SubjectObserverType
}
我們可以看到,在SubjectType中定義了一個(gè)ObserverType類型的associatedtype以及一個(gè)func asObserver() -> SubjectObserverType的方法。于此同時(shí),它也是繼承自Observable。也就是說,一個(gè)SubjectType既是一個(gè)觀察者Observer,又是一個(gè)可觀察序列(Observable)。
(三)Subject的實(shí)現(xiàn)細(xì)節(jié)
AsyncSubject
問:AsyncSubject 是一個(gè)具有什么樣特性的Subject?
答:簡(jiǎn)單的來說,當(dāng)AsyncSubject被訂閱的時(shí)候,如果AsyncSubject已經(jīng)發(fā)送過了
.complete事件,那么訂閱者只能得到最后一個(gè)序列的值(如果沒有發(fā)送過序列那么不觸發(fā)任何訂閱)。如果沒有發(fā)送.complete事件,那么訂閱者一直都不會(huì)訂閱到值,直到AsyncSubject發(fā)送了.complete事件。
我們可以先來看一看AsyncSubject的繼承和協(xié)議:
public final class AsyncSubject<Element>
: Observable<Element>
, SubjectType
, ObserverType
, SynchronizedUnsubscribeType {
...
SubjectType剛剛我們已經(jīng)看過它的定義了,而ObserverType也在之前的文章中有過認(rèn)識(shí),那么只剩下SynchronizedUnsubscribeType是還沒有見到過的一個(gè)協(xié)議,看它的名字貌似是“同步取消訂閱者”的一個(gè)協(xié)議,我們來看一下具體的定義:
protocol SynchronizedUnsubscribeType : class {
associatedtype DisposeKey
func synchronizedUnsubscribe(_ disposeKey: DisposeKey)
}
emmm...看樣子是定義一個(gè)DisposeKey,然后可以通過這個(gè)DisposeKey來同步取消訂閱。而這個(gè)DisposeKey其實(shí)就是一個(gè)BagKey的結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體只有一個(gè)UInt64類型的存儲(chǔ)屬性rawValue,如下:
struct BagKey {
fileprivate let rawValue: UInt64
}
那么既然是移除所有的訂閱者,那么這些訂閱者被存儲(chǔ)在哪里呢?
在RxSwift中定義了一個(gè)數(shù)據(jù)結(jié)構(gòu)叫做Bag,它是一個(gè)用來存儲(chǔ)少量元素的高效容器,它的插入刪除時(shí)間復(fù)雜度為O(n)。
struct Bag<T> : CustomDebugStringConvertible { ... }
值得一提的是,在Bag的內(nèi)部,真正存儲(chǔ)元素的容器并不是我們常用的Array類型,而是使用了ContiguousArray。我們可以看一下ContiguousArray的官方解釋:
/// If your array's `Element` type is a class or `@objc` protocol and you do
/// not need to bridge the array to `NSArray` or pass the array to Objective-C
/// APIs, using `ContiguousArray` may be more efficient and have more
/// predictable performance than `Array`. If the array's `Element` type is a
/// struct or enumeration, `Array` and `ContiguousArray` should have similar
/// efficiency.
顯然,使用ContiguousArray這是因?yàn)?code>ContiguousArray在處理class或者@objc修飾的類型的時(shí)候更加的高效,而在處理Swift基礎(chǔ)類型的時(shí)候效率就和Array差不多了。
注:還有值得一提的是
&=操作符,這是一個(gè)日常開發(fā)中很少使用到的一個(gè)操作符。與普通的加法操作符(+)的區(qū)別在于,當(dāng)加法操作完成之后的結(jié)果類型溢出之后,任然可以安全的使用不至于奔潰;例如:
let val: Int8 = 64 val + 64 // output: error val &+ 64 // output: -128
OK~ 我們認(rèn)識(shí)到了在AsyncSubject中使用Box來存儲(chǔ),那么具體的實(shí)現(xiàn)AsyncSubject的邏輯是怎么樣的呢?我們先來看一下AsyncSubject有著那些屬性:
typealias Observers = AnyObserver<Element>.s
typealias DisposeKey = Observers.KeyType
/// Indicates whether the subject has any observers
public var hasObservers: Bool {
_lock.lock(); defer { _lock.unlock() }
return _observers.count > 0
}
let _lock = RecursiveLock()
// state
private var _observers = Observers()
private var _isStopped = false
private var _stoppedEvent = nil as Event<Element>? {
didSet {
_isStopped = _stoppedEvent != nil
}
}
private var _lastElement: Element?
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
我們可以看到,重點(diǎn)的實(shí)現(xiàn)相關(guān)邏輯的屬性都被標(biāo)注成了private。_observers是一個(gè)存儲(chǔ)元素類型為Event<Element>) -> ()的Box,_isStopped是一個(gè)Bool類型的flag,一旦發(fā)送了complete或者error時(shí)間,那么這個(gè)flag就會(huì)置為true。而_stoppedEvent則是一個(gè)可選的Event<Element>類型,它永遠(yuǎn)是最新發(fā)送的一個(gè)事件,如果從來沒有發(fā)送next事件,那么這個(gè)屬性就永遠(yuǎn)為空。
由于Subject既有Observer的特性又有Observable的特性,所以我們一個(gè)一個(gè)看它如何實(shí)現(xiàn)這些特性。我們先來看看Observer:
/// Notifies all subscribed observers about next event.
///
/// - parameter event: Event to send to the observers.
public func on(_ event: Event<E>) {
#if DEBUG
_synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { _synchronizationTracker.unregister() }
#endif
let (observers, event) = _synchronized_on(event)
switch event {
case .next:
dispatch(observers, event)
dispatch(observers, .completed)
case .completed:
dispatch(observers, event)
case .error:
dispatch(observers, event)
}
}
在這里有兩個(gè)比較關(guān)鍵的函數(shù),一個(gè)是_synchronized_on,另一個(gè)是dispatch。_synchronized_on是實(shí)現(xiàn)AsyncSubject的關(guān)鍵函數(shù),我們可以待會(huì)了解了其他細(xì)節(jié)之后再看。
dispatch是一個(gè)在Bag+Rx.swift中定義的一個(gè)內(nèi)聯(lián)函數(shù),它的主要作用是給bag內(nèi)的所有(Event<E>) -> ()閉包對(duì)象派發(fā)執(zhí)行一個(gè)指定的事件(Event)。源碼如下:
@inline(__always)
func dispatch<E>(_ bag: Bag<(Event<E>) -> ()>, _ event: Event<E>) {
if bag._onlyFastPath {
bag._value0?(event)
return
}
let value0 = bag._value0
let dictionary = bag._dictionary
if let value0 = value0 {
value0(event)
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = dictionary {
for element in dictionary.values {
element(event)
}
}
}
注:在Swift中我們可以通過@ inline關(guān)鍵字來標(biāo)識(shí)一個(gè)函數(shù)是內(nèi)聯(lián)函數(shù)。簡(jiǎn)單的來說,在Swift中我們有三種"內(nèi)聯(lián)策略": sometimes, nerver, always。
sometimes: 當(dāng)我們申明一個(gè)函數(shù)的時(shí)候,默認(rèn)這個(gè)函數(shù)的內(nèi)聯(lián)策略就是sometimes的。這個(gè)時(shí)候,swift的編譯器會(huì)自動(dòng)的為它所認(rèn)為足夠短小的函數(shù)增添上內(nèi)聯(lián)特性,而對(duì)于相對(duì)而言比較龐大的函數(shù)不使用內(nèi)聯(lián)特性,以此達(dá)到代碼執(zhí)行優(yōu)化的目的。
always: 當(dāng)我們需要某個(gè)函數(shù)強(qiáng)制內(nèi)聯(lián)的時(shí)候,我們只需要在函數(shù)之前加上@inline(__always)關(guān)鍵字。當(dāng)編譯器檢測(cè)到該關(guān)鍵字的時(shí)候,編譯器就知道在這里永遠(yuǎn)都需要內(nèi)聯(lián)展開,就不會(huì)執(zhí)行自己的那一套默認(rèn)的內(nèi)鏈優(yōu)化策略。
nerver: 當(dāng)我們需要某一個(gè)函數(shù)永遠(yuǎn)都不要進(jìn)行內(nèi)聯(lián)的時(shí)候,我們只需要在函數(shù)之前加上@inline(never)。那么,當(dāng)編譯器檢測(cè)到該關(guān)鍵字的時(shí)候,編譯器就知道在這里永遠(yuǎn)都不需要內(nèi)聯(lián)展開。
正如我們看到的,dsipatch方法是基于Bag<(Event<E>) -> ()>類型的容器來實(shí)現(xiàn)的,之所以之前有一堆復(fù)雜的判斷邏輯,就是因?yàn)閮?yōu)化代碼執(zhí)行效率。當(dāng)bag中只有一個(gè)元素的時(shí)候,_onlyFastPath為true,那么我們只需要取出那一個(gè)執(zhí)行操作就可以了。然而當(dāng)我們超過一個(gè),小于三十個(gè)的時(shí)候,我們會(huì)將元素存儲(chǔ)在ContiguousArray中,通過下標(biāo)的方式來獲取元素進(jìn)行操作。而當(dāng)容器內(nèi)元素超過30這個(gè)閾值的時(shí)候,我們就要將元素插入到字典中,需要使用的時(shí)候再用鍵值對(duì)取出使用。
那么,_synchronized_on到底是如何配合dispatch來實(shí)現(xiàn)AsyncSubject的特性的呢?
func _synchronized_on(_ event: Event<E>) -> (Observers, Event<E>) {
_lock.lock(); defer { _lock.unlock() }
if _isStopped {
return (Observers(), .completed)
}
switch event {
case .next(let element):
_lastElement = element
return (Observers(), .completed)
case .error:
_stoppedEvent = event
let observers = _observers
_observers.removeAll()
return (observers, event)
case .completed:
let observers = _observers
_observers.removeAll()
if let lastElement = _lastElement {
_stoppedEvent = .next(lastElement)
return (observers, .next(lastElement))
}
else {
_stoppedEvent = event
return (observers, .completed)
}
}
}
當(dāng)源序列發(fā)送next事件的時(shí)候,AsyncSubject僅僅使用內(nèi)部的_lastElement屬性來記錄下當(dāng)前的next事件,然后構(gòu)造一個(gè)空的Bag來執(zhí)行completed事件(相當(dāng)于沒做什么事情)。
當(dāng)源序列發(fā)送error事件的時(shí)候,使用_stoppedEvent來記錄最后的最后的事件,然后構(gòu)造一個(gè)observers常量,將自身所有的觀察者拷貝到observers常量中,將自身所有的觀察者移除,最后把observers和該error事件返回。
(四)Subject的意義
當(dāng)然,除了AsyncSubject之外,我們還有還有以下幾種Subject:
- PublishSubject: 標(biāo)準(zhǔn)的熱信號(hào),訂閱者只會(huì)接收到訂閱操作之后的事件。
- ReplaySubject:訂閱者會(huì)接受到訂閱之前的事件以及訂閱之后的事件,類似于冷信號(hào)。
- BehaviorSubject:訂閱之后首先會(huì)接收到最近一次發(fā)送的事件(如果最近沒有發(fā)送,那么發(fā)送一個(gè)初始的事件)。
- Variable: 基于
BehaviorSubject的封裝,會(huì)將初始值或者最近的值發(fā)送給訂閱者。
然而寫到這里,我并不想一一詳細(xì)的分析剩下四種的實(shí)現(xiàn)細(xì)節(jié)。因?yàn)椋c剛剛分析完成的AsyncSubject相比,其余的Subject實(shí)現(xiàn)的方法都沒有太大的區(qū)別。所以筆者也不想在這里流水賬似的浪費(fèi)時(shí)間。
不如做一些更有意思的事情:為什么我們需要Subject?
現(xiàn)在我們先不妨設(shè)想一個(gè)這樣的場(chǎng)景:
我們需要追蹤用戶在iPhone上的每一次點(diǎn)擊,當(dāng)用戶點(diǎn)擊一次系統(tǒng)就會(huì)調(diào)用一次screenDidTapped(on point: CGPoint)方法。
那么在ReactiveX中,我們自然可以想到類似這樣的做法:
var observer: AnyObserver<CGPoint>!
let tapped = Observable<CGPoint>.create { (observer) -> Disposable in
observer = observer
return Disposables.create()
}
func screenDidTapped(on point: CGPoint) {
observer.on(point)
}
然而這樣的實(shí)現(xiàn)確實(shí)存在一些問題:
- 一對(duì)一的限制
由于Observables的特性限制,如果我們希望有多個(gè)觀察者來訂閱該點(diǎn)擊事件,那么Observables是無法做到的。當(dāng)你存在兩個(gè)及以上的訂閱的時(shí)候,只有最新的觀察者可以接收到序列的事件信息。
- 訂閱前的行為
還是由于Observables的特性,create構(gòu)造器的閉包只會(huì)在第一次被訂閱的時(shí)候會(huì)調(diào)用。然而當(dāng)點(diǎn)擊屏幕的時(shí)候,我們并不能保證就一定有觀察者訂閱了這個(gè)序列。
也就是說,當(dāng)你遇到類似上述的情況的時(shí)候,你需要使用熱信號(hào)(hot observeable)。
Hot Observables VS Cold observables
雖然冷熱信號(hào)已經(jīng)是被講爛的話題了,但是既然寫到這里已經(jīng)是不得不說的地步了。
Bnaya Eshet在他的博客中對(duì)"冷熱信號(hào)"有過一個(gè)非常形象的比喻:
if a tree falls in a forest and no one is around to hear it, does it make a sound? if it do make a sound when nobody observed it, we should mark it as hot, otherwise it should be marked as cold.
倘若一顆沙漠中的枯樹黯然傾倒而無人問津,是不是可以說它從未對(duì)這個(gè)世界發(fā)出聲音。倘若無人關(guān)心而算作發(fā)出了聲音,那么它就是熱信號(hào),反之,則是冷信號(hào)。
我們?cè)賮砜纯蠢錈嵝盘?hào)的對(duì)比:
| Hot Observables | Cold observables |
|---|---|
| 屬于序列 | 屬于序列 |
| 無論有或者沒有被訂閱,都會(huì)產(chǎn)生事件。 | 只有被訂閱的時(shí)候才會(huì)產(chǎn)生事件。 |
| 變量、屬性、點(diǎn)擊操作、鼠標(biāo)操作、UI的變化等 | 異步操作、HTTP連接、TCP連接等 |
| 通常包含N個(gè)Next事件 | 通常只有一個(gè)Next事件 |
| 數(shù)據(jù)源的變化能夠作用到所有的訂閱者。 | 數(shù)據(jù)源的變化只會(huì)作用到當(dāng)前的訂閱者。 |
| 它是有狀態(tài)的 | 它是無狀態(tài)的 |
在現(xiàn)實(shí)世界的編程中,我們總是面對(duì)著各種各樣復(fù)雜的情景。絕大多數(shù)的情況之下,我們的信號(hào)流可以是純函數(shù)的,不可變的,安全的。然而當(dāng)我們面對(duì)諸如鼠標(biāo)追蹤,變量的流式表達(dá)的時(shí)候,不可避免的我們需要使用到熱信號(hào)。
當(dāng)我們需要使用到熱信號(hào)的時(shí)候,我們?cè)俑鶕?jù)當(dāng)前的環(huán)境選擇最適合的Subject。比如我們希望對(duì)數(shù)據(jù)有“回看”功能,那么我們就可以選擇使用ReplaySubject。如果我們只關(guān)心最后一個(gè)數(shù)據(jù)變化,那么我們可以使用AsyncSubject,諸如此類等等。
(五)結(jié)語
對(duì)于Rx的使用者來說,我們更加希望使用的是Cold observables。從函數(shù)式的角度來說,Cold observables是不可變的,而Hot observables是可變的。不可變的數(shù)據(jù)總是更加的符合人的心智模型,而更加易于維護(hù)和理解,同時(shí)也更加的安全。希望這篇文章可以加深讀者對(duì)于Rx的理解。