RxSwift的核心流程可以簡化為三個步驟:
- 創(chuàng)建序列
- 訂閱序列
- 發(fā)送信號
// 創(chuàng)建序列
Observable<Int>.create { (anyObserver) -> Disposable in
// 發(fā)送信號
anyObserver.onNext(2)
return Disposables.create()
}
// 訂閱序列
.subscribe(onNext: { (element) in
print("訂閱到: \(element)")
})
.disposed(by: DisposeBag())
在執(zhí)行這行代碼得到的結(jié)果是: 訂閱到: 2 , 那么在RxSwift內(nèi)部是在什么時候開始發(fā)送信號(其實就是create(_ subscribe:)中的 subscribe閉包 什么時候執(zhí)行),又是什么時候訂閱到結(jié)果(就是subscribe(onNext:)中的 onNext閉包 什么時候執(zhí)行)。
創(chuàng)建序列
因為 Observable 繼承于 ObservableType, 所以點擊 create 方法可以看到 ObservableType 的擴展方法
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
}
在 create 方法中,其實是初始化了 AnonymousObservable 類的對象,而在初始化的時候保存了 SubscribeHandler 閉包。
訂閱序列
-
subscribe(onNext: onError: onCompleted: onDisposed:)方法也是ObservableType的擴展方法,實現(xiàn)如下(忽略不關(guān)心的代碼):
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
-
在這個方法中我們關(guān)注兩個方法:
- 初始化
AnonymousObserver(匿名觀察者)對象observer,保存了EventHandler閉包 -
self.asObservable()調(diào)用了subscribe(observer)
- 初始化
在前面創(chuàng)建序列分析
create方法時知道這里的self.asObservable()其實就是AnonymousObservable()對象,那么self.asObservable().subscribe(observer)=>AnonymousObservable().subscribe(observer)由于
AnonymousObservable繼承于Producer, 在Producer類中找到subscribe(observer)的實現(xiàn)如下:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// 省略部分代碼
...
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
// 省略部分代碼
...
return disposer
}
}
}
- 在
subscribe(_ observer:)方法中其實調(diào)用的就是self.run(observer, ...),run(observer, ...)的具體實現(xiàn)是在AnonymousObservable中:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
- 我們可以看到在這里初始化了
AnonymousObservableSink類的對象,保存在外面創(chuàng)建的屬于AnonymousObserver類的observer對象。 -
sink.run中的實現(xiàn)為:
typealias Parent = AnonymousObservable<Element>
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
- 這里的
parent就是AnonymousObservable()對象 - 到此前面
AnonymousObservable(subscribe)保存的那份閉包會開始執(zhí)行,也就是開始發(fā)送信號。
-
捋一下整個流程為:
其中let ob = AnonymousObservable(subscribe)
let sink = AnonymousObservableSink(observer: observer, ...),
let observer = AnonymousObserver(event)self.asObservable().subscribe(observer)->ob.subscribe(observer)->ob.run(observer, ...)->sink.run(ob)->ob._subscribeHandler(AnyObserver(sink))
發(fā)送信號
- 在前面執(zhí)行
AnonymousObservable(subscribe)._subscribeHandler(AnyObserver(sink))中傳遞AnyObserver()對象出去。
public struct AnyObserver<Element> : ObserverType {
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
...
public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
self.observer = observer.on
}
public func on(_ event: Event<Element>) {
return self.observer(event)
}
...
}
- 從初始化方法中可以看到
AnyObserver(sink)保存的sink.on的閉包。 - 當執(zhí)行
anyObserver.onNext(2)時,因為AnyObserver是ObserverType的類型,所以會走到ObserverType的擴展方法
extension ObserverType {
public func onNext(_ element: Element) {
self.on(.next(element))
}
...
}
- 繼續(xù)走到
AnyObserver的on(_ event:)方法,傳遞.next事件, 從上面的on(_ event:)方法中看到它會執(zhí)行之前保存的閉包,因為我們之前保存的是sink.on閉包,所以最終會走到sink.on方法里:
func on(_ event: Event<Element>) {
...
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
- 來到這里會找到父類的
forwardOn(_ event:)方法:
final func forwardOn(_ event: Event<Observer.Element>) {
...
self._observer.on(event)
}
- 還記得之前在創(chuàng)建
AnonymousObservableSink對象時,保存了AnonymousObserver對象嗎?self._observer.on(event)執(zhí)行AnonymousObserver的父類ObserverBase的on(event)方法:
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
-
onCore(_ event:)方法:
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
執(zhí)行了在創(chuàng)建
AnonymousObserver對象時保存的閉包。到這里我們也繼續(xù)捋一下整個流程:
let observer = AnonymousObserver(_ event:)
anyObserver.onNext(2) -> anyObserver.on(.next(value)) -> sink.on(.next(value)) -> sink.forwardOn(.next(value)) -> sink._observer.on(.next(value)) -> observer.on(.next(value)) -> observer.onCore(.next(value)) -> observer._eventHandler(.next(value)) -> onNext?(value)
最后附上一張相關(guān)類繼承圖以幫助分析
