透視RxSwift核心邏輯
篇幅稍微有點長,了解程度不同,可以跳過某些部分。
- 如果對源碼比較熟悉的,建議直接看圖就行了,時序圖更加清晰。第一次摸索有必要閱讀文字內(nèi)容。
- 貼出來的代碼省略了不必要的部分,用省略號代替。
示例
RxSwift的基礎(chǔ)用法就是很簡單的幾步:
- 創(chuàng)建可觀察序列
- 監(jiān)聽序列(訂閱信號)
- 銷毀序列
//創(chuàng)建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
//發(fā)送信號
observer.onNext("今日份麻醬涼皮")
observer.onCompleted()
return Disposables.create()
}
//訂閱信號
let _ = ob.subscribe(onNext: { (text) in
print("訂閱到:\(text)")
}, onError: { (error) in
print(error)
}, onCompleted: {
print("完成")
}) {
print("銷毀")
}
控制臺輸出:
訂閱到:今日份麻醬涼皮
完成
銷毀
探究
在看源碼之前,應(yīng)該對接觸到的類和協(xié)議有些認(rèn)識,方便之后的理解。下面的關(guān)系圖在需要的時候回頭熟悉一下就行:

到底是什么在支撐如此便捷的調(diào)用?
第一句Observable<Any>.create創(chuàng)建了一個可觀察序列Observable對象,第二句就是這個Observable序列對象訂閱了消息。
從輸出可以看出,都是訂閱到的消息。那么訂閱時傳入subscribe的閉包是什么時候調(diào)用的呢?
單從現(xiàn)在的幾句代碼,也能猜出是第一句代碼的閉包中的observer.onNext的調(diào)用引起的。但是,我們也沒有看到這個create函數(shù)中的閉包是在哪里執(zhí)行的?
為了能夠清晰的描述,暫且稱第一句
create中的閉包為create閉包,第二句subscribe中的幾個閉包為subscribe閉包。
外面看不出來,那我們只能進(jìn)去RxSwift里面探索下create和subscribe到底做了什么?
create函數(shù)的實現(xiàn):
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}
原來這函數(shù)內(nèi)部實際上是創(chuàng)建了一個AnonymousObservable匿名可觀察序列對象。而之前的閉包也是給AnonymousObservable對象初始化用了。
AnonymousObservable類:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
這里在初始化的時候把create閉包賦值給了_subscribeHandler屬性。
到此為止,
Observable<Any>.create函數(shù)實際上創(chuàng)建了一個AnonymousObservable匿名可觀察序列對象,并保存了create閉包。

。。。貌似這條不是主線??!沒有找到任何一個問題的答案。
再來翻翻subscribe函數(shù):
extension ObservableType {
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
......
let observer = AnonymousObserver<E> { event in
......
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
這也是對定義在ObservableType協(xié)議中的函數(shù)的實現(xiàn),返回了一個Disposable。這個Disposable就是用來管理訂閱的生命周期的,示例代碼中并沒有體現(xiàn)出來,實際是在訂閱信號的內(nèi)部處理的。前面都沒什么,后面創(chuàng)建了AnonymousObserver,并且在AnonymousObserver初始化時傳入了閉包,并賦值給_eventHandler屬性。
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
之前,
AnonymousObservable匿名序列對象,保存了create閉包。
此時,創(chuàng)建了AnonymousObserver匿名觀察者對象,保存了對subscribe閉包的回調(diào)執(zhí)行的EventHandler閉包。
又一條支線。。。一路走來,都是在創(chuàng)建對象,保存閉包。兩個主線疑問還是無跡可尋。難道一開始就走上了歧路?非也!繼續(xù)看下去就明白了什么叫「柳暗花明又一村」。
AnonymousObservable的subscribe函數(shù)中,在創(chuàng)建了AnonymousObserver對象后,還返回了一個新建的Disposable對象。重點就在這里的第一個參數(shù):self.asObservable().subscribe(observer)中。asObservable還是返回了self,后面貼上的ObserverType中可以看到。剩下的就是AnonymousObservable的父類Producer中的subscribe了:
class Producer<Element> : Observable<Element> {
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
......
} else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
}
由于是在當(dāng)前線程中執(zhí)行的,只看else那部分。disposer相關(guān)的不用關(guān)心。關(guān)鍵是observer參數(shù),這個參數(shù)中有對subscribe閉包的處理的EventHandler閉包。observer傳入了self.run(observer, cancel)。所以,還要回頭再看看AnonymousObservable類中的run函數(shù):
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
這里又創(chuàng)建了一個AnonymousObservableSink對象,observer和cancel繼續(xù)往初始化函數(shù)中丟:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
......
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
前面sink.run(self)把self傳了進(jìn)來,又對AnonymousObservable起了別名Parent。parent._subscribeHandler不就是AnonymousObservable在調(diào)用它最開始保存的那個create閉包么?AnyObserver(self)則把AnonymousObservableSink作為AnyObserver初始化的參數(shù)。
public struct AnyObserver<Element> : ObserverType {
public typealias E = Element
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
public func on(_ event: Event<Element>) {
return self.observer(event)
}
public func asObserver() -> AnyObserver<E> {
return self
}
}
其中還把AnonymousObservableSink的on函數(shù)賦值給了AnyObserver的屬性observer,observer就是EventHandler。這個EventHandler在create閉包中會用到。
這不就是第二個主線疑問(create函數(shù)中的閉包何時調(diào)用)的答案么!
整理一下:
- 從
subscribe(onNext, onError, onCompleted, onDisposed) -> Disposable函數(shù)中創(chuàng)建Disposable開始 -
AnonymousObservable調(diào)用subscribe(observer) -
AnonymousObservable調(diào)用run(observer, cancel) - 創(chuàng)建
AnonymousObservableSink(observer: observer, cancel: cancel),并且sink.run(self) parent._subscribeHandler(AnyObserver(self))
這是一條從
subscribe閉包-->create閉包的線。

還沒完,還有個create閉包中怎么觸發(fā)subscribe閉包的?
又臭又長的寫了這么多,這里就只看observer.onNext("今日份麻醬涼皮")吧。點進(jìn)去看:
observer.onNext("今日份麻醬涼皮"):
extension ObserverType {
public func onNext(_ element: E) {
self.on(.next(element))
}
public func onCompleted() {
self.on(.completed)
}
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
onNext中調(diào)用了on,在前面AnyObserver結(jié)構(gòu)體定義中可以看出,on函數(shù)中返回了self.observer(event),之前是把AnonymousObservableSink的on賦值給了這個self.observer,所以,此時會走到AnonymousObservableSink的on函數(shù)中。這里面又調(diào)用了self.forwardOn(event),看下AnonymousObservableSink的父類Sink中定義的forwardOn:
class Sink<O : ObserverType> : Disposable {
fileprivate let _observer: O
fileprivate let _cancel: Cancelable
fileprivate let _disposed = AtomicInt(0)
......
init(observer: O, cancel: Cancelable) {
......
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
......
self._observer.on(event)
}
}
forwardOn中走了一句self._observer.on(event)。這里的_observer屬性不就是AnonymousObservableSink初始化時傳入的AnonymousObserver對象么!
繼續(xù)跟AnonymousObserver的on:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
......
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
這里沒有on,看父類ObserverBase:
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private let _isStopped = AtomicInt(0)
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
}
這里的on函數(shù)中,有個.next分支,調(diào)用了self.onCore(event)。子類AnonymousObserver實現(xiàn)的onCore中又調(diào)用了self._eventHandler(event)。
這個_eventHandler是什么?不就是AnonymousObserver初始化時保存的對subscribe閉包處理的閉包么!所以create閉包中的observer.onNext("今日份麻醬涼皮")就能觸發(fā)subscribe閉包了。
這是一條從
AnonymousObservable調(diào)用_subscribeHandler(也就是create閉包)時的參數(shù)AnyObserver-->subscribe閉包的線。

現(xiàn)在我們看清楚了響應(yīng)式的數(shù)據(jù)流:
- 在訂閱信號時創(chuàng)建了observer并執(zhí)行創(chuàng)建序列時的閉包
- 在創(chuàng)建序列的閉包中有回調(diào)observer,監(jiān)聽序列的變動而觸發(fā)訂閱信號的閉包
圖解
看清楚了么?好像清楚的不太明顯。畢竟好幾個類、協(xié)議,又那么多函數(shù)調(diào)來調(diào)去的。加把勁再擼一擼。既然這么五花八門的調(diào)用流程搞清楚了,那就來弄清楚它主要都做了什么?
一圖概千言,既然畫了圖,就少敲點鍵盤吧!

仔細(xì)看了流程圖就會發(fā)現(xiàn),出現(xiàn)的幾個類中,干活的主要是Anonymous開頭的那三個類。我們在外面調(diào)用的操作,其實是在使用RxSwift內(nèi)部封裝的一些類。
- 創(chuàng)建可觀察序列
AnonymousObservable,并保存create閉包 - 訂閱信號
- 首先創(chuàng)建了一個
AnonymousObserver,并且把對subscribe閉包的操作封裝成了EventHandler - 苦力活還是
AnonymousObservable來干。- 在創(chuàng)建返回值
Disposable中,由subscribe(observer),把AnonymousObserver傳給了AnonymousObservableSink-
AnonymousObservableSink才是信息處理的核心,因為他知道的太多了 -
AnonymousObservableSink有AnonymousObserver觀察者,AnonymousObserver持有著EventHandler。 -
AnonymousObservableSink在調(diào)用run函數(shù)時也傳入了AnonymousObservable序列,AnonymousObservable就是create閉包的持有者。 -
AnonymousObservableSink初始化的時候,除了觀察者外,還有個管理序列生命周期的Disposable。
-
-
AnonymousObservableSink作為一個內(nèi)部類,在被create閉包當(dāng)做參數(shù)回調(diào)給外界時需要轉(zhuǎn)換為AnyObserver,在這里AnyObserver則是以閉包屬性的形式保留了AnonymousObservableSink的on函數(shù) - 后面在信號發(fā)生改變時就可以讓
AnyObserver通過這個屬性值聯(lián)系到AnonymousObservableSink
- 在創(chuàng)建返回值
- 首先創(chuàng)建了一個
訂閱信號:
Observable-->AnonymousObservable-->AnonymousObserver-->AnonymousObservableSink-->AnyObserver-->create閉包
- 發(fā)出信號
- 這個過程基本就是和訂閱信號時相反的
- 從
create閉包中調(diào)用AnyObserver的onNext開始 - 通過
AnyObserver.observer訪問閉包中的AnonymousObservableSink -
AnonymousObservableSink擁有AnonymousObserver -
AnonymousObserver掌控EventHandler - 句號
發(fā)出信號:
create閉包-->AnyObserver-->AnonymousObservableSink-->AnonymousObserver-->subscribe閉包