RxSwift的使用三步曲
第一步:創(chuàng)建序列
let ob = Observable<Any>.create { observer in
return Disposables.create()
}
第二步:訂閱序列
ob.subscribe(onNext: { text in
print("訂閱信息: \(text)")
}, onError: { error in
print("error: \(error)")
}, onCompleted: {
print("訂閱結(jié)束")
}) {
print("已銷毀")
}
第三步:發(fā)送信號(hào)
let ob = Observable<Any>.create { observer in
obserber.onNext("你好明天")
return Disposables.create()
}
整體代碼
let ob = Observable<Any>.create { observer in
obserber.onNext("你好明天")
return Disposables.create()
}
ob.subscribe(onNext: { text in
print("訂閱信息: \(text)")
}, onError: { error in
print("error: \(error)")
}, onCompleted: {
print("訂閱結(jié)束")
}) {
print("已銷毀")
}
.disposed(by: disposeBag)
分析代碼
- 1:創(chuàng)建序列后,RxSwift返回了一個(gè)observer,在這個(gè)閉包內(nèi)返回了Disposables.create(),創(chuàng)建的序列。
- 2:訂閱序列的各個(gè)信息,發(fā)送成功,結(jié)束等等的信號(hào)
- 3:發(fā)送的信號(hào)由RxSwift返回的observer來發(fā)送,這樣形成一個(gè)完整的環(huán)。
那么這個(gè)observer怎么來的?在onNext中發(fā)送的“你好明天”是怎么到訂閱的text中的呢?以及,這些信號(hào)怎么發(fā)送給訂閱者的呢? - 4:猜想:以UIButton為例子,UI序列的相應(yīng),比如UIButton它的點(diǎn)擊事件,監(jiān)聽的就是點(diǎn)擊事件,這個(gè)事件由UIButton來發(fā)送,而UIButton的事件響應(yīng),是依賴于target,在對(duì)應(yīng)的target去實(shí)現(xiàn)響應(yīng)的event。
那么在RxSwift中,UIControl為什么能夠監(jiān)聽到UI層,猜測(cè)是UIControl.addTarget(rx內(nèi)部類),事件的響應(yīng)就由rx內(nèi)部類來響應(yīng),類似于中間類或者是proxy。
同理,self.button.rx.tap.subscribe(),這個(gè)點(diǎn)擊事件就是通過UIControl響應(yīng)層去添加響應(yīng),而且這個(gè)響應(yīng)不再是原始的UI事件的響應(yīng)了,也不在控制器中了,在rx的內(nèi)部類中,所以在事件中可以直接調(diào)用observer的onNext方法,來發(fā)送響應(yīng)的信號(hào)。
還有一個(gè)在onNext中發(fā)送的“你好明天”是怎么到訂閱的text中的呢?這個(gè)問題我們沒有找到答案,繼續(xù)去查看RxSwift的源碼。
RxSwift源碼分析
前提因素:首先,通過creat來創(chuàng)建了一個(gè)序列,所以應(yīng)該先去探索creat這個(gè)方法,而能訂閱到“你好明天”這段字符串,依賴于發(fā)送信號(hào)obserber.onNext("你好明天"),而代碼順序是從上往下執(zhí)行的,那在ob.subscribe中能打印onNext發(fā)送的“你好明天”,那說明在ob.subscribe中應(yīng)該暗藏一句代碼,執(zhí)行onNext。即subscribe的閉包執(zhí)行依賴發(fā)送信號(hào)onNext的執(zhí)行。帶著這些前提,去分析查看源碼。
一 首先來看一下creat序列的創(chuàng)建
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
AnonymousObservable(subscribe)
}
creat創(chuàng)建了一個(gè)匿名序列AnonymousObservable
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self.subscribeHandler = subscribeHandler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
這個(gè)匿名序列繼承自Producer,而且init方法保存了通過create傳遞的self.subscribeHandler。
Producer繼承自O(shè)bservable即序列的基類,Observable遵循了協(xié)議ObservableType,ObservableType協(xié)議提供了subscribe方法。該協(xié)議是貫穿全劇的,可擴(kuò)展協(xié)議可以根據(jù)不同的內(nèi)容需求擴(kuò)展。
匿名序列的run方法,就是重寫的父類的方法。

二 subscribe訂閱信號(hào)的內(nèi)容
public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
在訂閱的時(shí)候,創(chuàng)建了一個(gè)匿名內(nèi)部類AnonymousObserver ,AnonymousObserver繼承自繼承ObserverBase,ObserverBase遵循了協(xié)議Disposable, ObserverType,而AnonymousObserver在初始化的時(shí)候保存了eventHandler。self.asObservable().subscribe(observer),.asObservable這個(gè)是可觀察序列基類Observerble的方法,是采用的接口隔離原則。事實(shí)上萬物皆可序列,而比如以UISwitch為例:
UISwitch().rx.value.asObservable()
.subscribe { bool in
}
.disposed(by: disposeBag)
為什么需要調(diào)用asObservable呢?
public var value: ControlProperty<Bool> {
return base.rx.controlPropertyWithDefaultEvents(
getter: { uiSwitch in
uiSwitch.isOn
}, setter: { uiSwitch, value in
uiSwitch.isOn = value
}
)
}
從源碼中看到,value是ControlProperty類型,ControlProperty遵循了協(xié)議ControlPropertyType,不是可觀察序列,通過調(diào)用asObservable強(qiáng)轉(zhuǎn)為可觀察序列,然后利用可觀察序列的各種方法,來達(dá)到相應(yīng)的需求,這也是萬物皆可序列的第二種解釋。
而asObservable返回的就是一個(gè)可觀察序列,返回的是self.values,類型就是 Observable<PropertyType>
let values: Observable<PropertyType>
public func asObservable() -> Observable<Element> {
self.values
}
繼續(xù)探究self.asObservable().subscribe(observer),subscribe的內(nèi)容。subscribe執(zhí)行了父類Produver實(shí)現(xiàn)的協(xié)議方法subscribe,然后執(zhí)行了self.run方法,即執(zhí)行了匿名序列AnonymousObservable的run方法
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
匿名序列AnonymousObservable的run方法,執(zhí)行了AnonymousObservableSink的run方法
func run(_ parent: Parent) -> Disposable {
parent.subscribeHandler(AnyObserver(self))
}
AnonymousObservable 調(diào)用把AnonymousObservableSink作為參數(shù),調(diào)用創(chuàng)建的閉包,執(zhí)行了保存的subscribeHandler即是在create的時(shí)候保存的閉包,并將AnyObserver(self)傳遞了進(jìn)去,這就是在create的閉包中傳來的observer。流程如下圖所示:

三 onNext發(fā)送信號(hào)
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: Element))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: Element) {
self.on(.next(element))
}
}
執(zhí)行了匿名類AnonymousObserver的父類ObserverBase的on方法
func on(_ event: Event<Element>) {
switch event {
case .next:
if load(self.isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self.isStopped, 1) == 0 {
self.onCore(event)
}
}
}
然后執(zhí)行了匿名類AnonymousObserver的onCore方法
override func onCore(_ event: Event<Element>) {
self.eventHandler(event)
}
然后在這個(gè)方法中執(zhí)行了匿名類在subscribe中保存的eventHandler,完整的流程圖如下:

總結(jié):Observable的核心流程
從create開始,得到一個(gè)匿名觀察序列AnonymousObservable,這個(gè)匿名類保存了subscribeHandler。
然后通過訂閱方法subscribe創(chuàng)建了匿名observer保存了事件eventHandler,并且通過AnonymousObservable的run方法將observer傳遞,最終執(zhí)行subscribeHandler將信號(hào)傳遞。
發(fā)送信號(hào)onNext最終通過核心方法onCore執(zhí)行了eventHandler將事件傳遞。