一直以來,響應(yīng)式編程都是業(yè)界討論的熱門話題之一。為了推廣響應(yīng)式編程,ReactiveX 社區(qū)幾乎為每一種編程語言設(shè)計(jì)實(shí)現(xiàn)了一種對應(yīng)的響應(yīng)式編程框架。RxSwift 就是針對 Swift 所開發(fā)的響應(yīng)式框架。
關(guān)于 RxSwift,網(wǎng)上有不少相關(guān)的學(xué)習(xí)資料,但絕大多數(shù)都是 RxSwift 的使用說明,鮮有文章介紹 RxSwift 背后的設(shè)計(jì)原理。通過閱讀源碼,查閱資料,正向設(shè)計(jì),我逐步理解了 RxSwift 的設(shè)計(jì)思想。因此,趁熱打鐵,記錄并總結(jié)一下我的理解。
本文所實(shí)現(xiàn)的 RxSwift 代碼已在 Github 開源——傳送門。參照源碼閱讀本文效果更佳。
基礎(chǔ)
RxSwift 主要蘊(yùn)含了以下幾種設(shè)計(jì)思想:
- 發(fā)布-訂閱模式
- 流編程
- 函數(shù)式編程
下面,我們依次來進(jìn)行介紹。
發(fā)布-訂閱模式
發(fā)布-訂閱模式 是 RxSwift 所呈現(xiàn)的一種最直觀思想。發(fā)布-訂閱模式可以分為兩個(gè)角色:發(fā)布者、訂閱者。
訂閱者的主要職責(zé)是:
- 訂閱:監(jiān)聽并處理某個(gè)事件。其本質(zhì)就是向發(fā)布者注冊一個(gè)處理某個(gè)事件的閉包。
- 取消(訂閱)
發(fā)布者的主要職責(zé)是:
- 發(fā)布:分發(fā)某個(gè)事件
發(fā)布-訂閱模式的基本原理是:
- 訂閱者調(diào)用發(fā)布者提供的訂閱方法進(jìn)行訂閱,從而在發(fā)布者內(nèi)部注冊訂閱者。
- 發(fā)布者內(nèi)部會維護(hù)一個(gè)訂閱者列表。
- 當(dāng)發(fā)布者發(fā)布事件時(shí),會遍歷訂閱者列表,執(zhí)行其中的處理方法(將事件作為參數(shù)傳遞給閉包,并執(zhí)行)。
舉例
在日常開發(fā)中,發(fā)布-訂閱模式是一種廣泛被應(yīng)用的設(shè)計(jì)模式。比如:iOS 中的 NotificationCenter、Flutter 中的事件總線都是基于這種模式實(shí)現(xiàn)的。如下所示為 Flutter 中事件總線的一種實(shí)現(xiàn)方式,其中的代碼邏輯基本遵循了上述所描述的發(fā)布-訂閱模式的基本原理。
// 訂閱者訂閱內(nèi)容簽名
typedef void EventCallback(arg);
class EventBus {
// 私有構(gòu)造函數(shù)
EventBus._internal();
// 保存單例
static EventBus _singleton = new EventBus._internal();
// 工廠構(gòu)造函數(shù)
factory EventBus()=> _singleton;
// 保存事件訂閱者隊(duì)列,key:事件名(id),value: 對應(yīng)事件的訂閱者隊(duì)列
var _emap = new Map<Object, List<EventCallback>>();
// 添加訂閱者,即訂閱
void on(eventName, EventCallback f) {
if (eventName == null || f == null) return;
_emap[eventName] ??= new List<EventCallback>();
_emap[eventName].add(f);
}
// 移除訂閱者,即取消
void off(eventName, [EventCallback f]) {
var list = _emap[eventName];
if (eventName == null || list == null) return;
if (f == null) {
_emap[eventName] = null;
} else {
list.remove(f);
}
}
// 觸發(fā)事件,事件觸發(fā)后該事件所有訂閱者會被調(diào)用,即發(fā)布
void emit(eventName, [arg]) {
var list = _emap[eventName];
if (list == null) return;
int len = list.length - 1;
// 反向遍歷,防止訂閱者在回調(diào)中移除自身帶來的下標(biāo)錯(cuò)位
for (var i = len; i > -1; --i) {
// 執(zhí)行注冊的閉包
list[i](arg);
}
}
}
// 定義一個(gè) top-level(全局)變量,頁面引入該文件后可以直接使用 bus
var bus = new EventBus();
思考:為什么 iOS 中一個(gè)監(jiān)聽了某個(gè)通知的類必須要在
dealloc時(shí)要執(zhí)行removeObserver:方法?
流編程
流(stream) 是 RxSwift 另一個(gè)重要的設(shè)計(jì)思想。Observable<T> 是 Rx 框架的基礎(chǔ),也被稱為 可觀察序列。它的作用是可以異步地產(chǎn)生一系列數(shù)據(jù),即一個(gè) Observable<T> 對象會隨著時(shí)間的推移不定期地發(fā)出 event(element: T)。數(shù)據(jù)就像水流一樣持續(xù)不斷地在流動(dòng),顧名思義,這也被稱為 流編程。
關(guān)于流編程,《計(jì)算機(jī)程序的構(gòu)造與解釋》一書中認(rèn)為 流編程是一種調(diào)用驅(qū)動(dòng)的編程思想。流編程的基本思想是:一般情況下,只是部分地構(gòu)造出流的結(jié)構(gòu),并將這樣的部分結(jié)構(gòu)傳給使用流的程序。如果使用者需要訪問這個(gè)流中未構(gòu)造出的那個(gè)部分,那么這個(gè)流就會自動(dòng)地繼續(xù)構(gòu)造下去,但是只做出滿足當(dāng)時(shí)需要的那一部分。
如下所示是 RxSwift 中常見使用形式,observable 是數(shù)據(jù)源,不斷地發(fā)出數(shù)據(jù),如果水流一樣,最終流向 subscribe 中的閉包。期間會流經(jīng) map,filter 等操作符,經(jīng)過轉(zhuǎn)換或過濾。這就是流編程的思想。
let observable: Observable<Int> = Observable.create { (observer) -> Disposable in
observer.onNext(0)
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onCompleted()
return Disposables.create()
}
observable.map { $0 + 2}.filter { $0 > 3 }.subscribe { (event) in
...
}
流編程底層實(shí)現(xiàn)的本質(zhì)則是 閉包的延遲執(zhí)行和強(qiáng)制執(zhí)行。具體來說是基于一種稱為 delay 的特殊形式,對于 delay <exp> 的求值不會對 <exp> 求值,而是返回一個(gè)稱為 延時(shí)對象 的對象。它可以看做是對未來的某個(gè)時(shí)間求值 <exp> 的允諾。在各類編程語言中,返回特定類型的閉包常被用于描述一個(gè)延遲對象。與 delay 配對的是 force 的過程。它以一個(gè)延時(shí)對象為參數(shù),執(zhí)行相應(yīng)的求值工作,即迫使 delay 完成其所允諾的求值。在各類編程語言中,執(zhí)行返回特定類型的閉包常被用于描述 force 的過程。
函數(shù)式編程
RxSwift 提供了大量無副作用的操作符,無副作用也是函數(shù)式編程的一種重要特性。RxSwift 能夠?qū)崿F(xiàn)操作符的鏈?zhǔn)秸{(diào)用,一個(gè)重要的前提是:提供操作符的類型和操作符的返回類型必須保持一致。這里就涉及到了函數(shù)式編程中的一些高階概念:函子(Functor)、適用函子(Applicative)、單子(Monad)。詳細(xì)內(nèi)容可參見《函數(shù)式編程——Functor、Applicative、Monad》一文。
不過,RxSwift 操作符中運(yùn)用最多的是 函子(Functor),什么是函子?簡而言之,函子能夠?qū)⑵胀ê瘮?shù)應(yīng)用到一個(gè)包裝類型。如下圖及代碼所示,一個(gè)包裝類型包含了一個(gè)原始值,當(dāng)函子作用在其上后,使用普通函數(shù)對原始值進(jìn)行轉(zhuǎn)換,最終將結(jié)果值放入包裝類型中返回。
extension Result {
// 滿足 Functor 的條件:map 方法能夠?qū)?普通函數(shù) 應(yīng)用到包裝類
func map<U>(_ f: (T) -> U) -> Result<U> {
switch self {
case .success(let x): return .success(f(x))
case .failure: return .failure
}
}
}
以 RxSwift 中最常用的 map 操作符為例,如下所示。map 方法擴(kuò)展自 ObservableType,其能夠?qū)⑵胀ê瘮?shù) (Element) -> Result 應(yīng)用到 ObservableType 包裝類型。這其實(shí)就是一種典型的 函子 應(yīng)用。
extension ObservableType {
public func map<Result>(_ transform: @escaping (Element) throws -> Result)
-> Observable<Result> {
return Map(source: self.asObservable(), transform: transform)
}
}
核心實(shí)現(xiàn)
下面我們將以正向設(shè)計(jì)的方式,結(jié)合 RxSwift 中的設(shè)計(jì)思想,手動(dòng)實(shí)現(xiàn)一個(gè) RxSwift 的核心部分。
基本功能(RxDemo-01)
首先,我們來定義事件,RxSwift 中有三種類型的事件,如下所示:
// MARK: - Event
enum Event<Element> {
case next(Element)
case error(Error)
case completed
}
其次,我們來定義訂閱者,在 RxDemo-01 中,我們先忽略 取消訂閱 的功能,如下所示。訂閱者必須遵循訂閱者協(xié)議,需要實(shí)現(xiàn) 監(jiān)聽事件 的方法 on(event: Event<Element>)。訂閱者內(nèi)部維護(hù)一個(gè)處理事件的閉包 _handler。當(dāng)監(jiān)聽事件方法觸發(fā)時(shí),會立即執(zhí)行處理事件閉包。
// MARK: - Observer
protocol ObserverType {
associatedtype Element
// 監(jiān)聽事件
func on(event: Event<Element>)
}
class Observer<Element>: ObserverType {
// 處理事件的閉包
private let _handler: (Event<Element>) -> Void
init(handler: @escaping (Event<Element>) -> Void) {
_handler = handler
}
// 實(shí)現(xiàn) 監(jiān)聽事件 的協(xié)議,內(nèi)部處理事件
func on(event: Event<Element>) {
// 處理事件
_handler(event)
}
}
最后,我們來定義發(fā)布者,如下所示。發(fā)布者必須遵循發(fā)布者協(xié)議,需要實(shí)現(xiàn) 訂閱操作 的方法 subscribe<O: ObserverType>(observer: O) where O.Element == Element。發(fā)布者內(nèi)部維護(hù)一個(gè)發(fā)布事件的閉包 _eventGenerator。當(dāng)訂閱發(fā)生時(shí)(訂閱操作方法被執(zhí)行時(shí)),會立即執(zhí)行發(fā)布事件的閉包。
// MARK: - Observable
protocol ObservableType {
associatedtype Element
// 訂閱操作
func subscribe<O: ObserverType>(observer: O) where O.Element == Element
}
class Observable<Element>: ObservableType {
// 定義 發(fā)布事件 的閉包
private let _eventGenerator: (Observer<Element>) -> Void
init(eventGenerator: @escaping (Observer<Element>) -> Void) {
_eventGenerator = eventGenerator
}
// 實(shí)現(xiàn) 訂閱操作 的協(xié)議,內(nèi)部發(fā)布事件
func subscribe<O: ObserverType>(observer: O) where O.Element == Element {
// 生成事件
_eventGenerator(observer as! Observer<Element>)
}
}
接下來,我們來試用一下 RxDemo-01 所實(shí)現(xiàn)的 RxSwift。很顯然,這種模式與原始的 RxSwift 的使用方式基本吻合。
// MARK: - Test
let observable = Observable<Int> { (observer) in
print("send 0")
observer.on(event: .next(0))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
print("send completed")
observer.on(event: .completed)
}
let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}
observable.subscribe(observer: observer)
// send 0
// recive 0
// send 1
// recive 1
// send 2
// recive 2
// send 3
// recive 3
// send completed
// recive completed
下圖所示為 RxDemo-01 實(shí)現(xiàn)的 RxSwift 的內(nèi)部調(diào)用關(guān)系。通過閉包實(shí)現(xiàn)將 observer 傳遞給 observable,在發(fā)布事件時(shí)能夠?qū)⑹录鬟f給 observer,從而形成一條看似自左向右流動(dòng)的數(shù)據(jù)流。
取消訂閱(RxDemo-02)
RxDemo-01 有一個(gè)明顯的缺陷——無法取消訂閱。我們來參考 RxSwift 的實(shí)現(xiàn),它 并不是直接讓訂閱者支持取消訂閱,而是通過一個(gè)第三方類型 Disposable 對訂閱進(jìn)行管理。Disposable 的核心作用是 提供一個(gè)狀態(tài)位標(biāo)識訂閱是否已經(jīng)取消。
關(guān)于由第三方類來管理訂閱,而不是讓訂閱者自己管理的原因,我猜測有兩個(gè):一是出于職責(zé)單一的原則;二是為了支持函數(shù)式編程,抽取一個(gè)第三方類型作為返回值,從而在鏈?zhǔn)秸{(diào)用時(shí)保持類型一致。
Disposable 協(xié)議要求所有的 Disposable 類型都實(shí)現(xiàn) dispose 方法,表示取消訂閱。
protocol Disposable {
// 取消訂閱
func dispose()
}
這里我們定義兩個(gè)遵循 Dispoable 協(xié)議的類型:AnonymousDisposable 和 CompositeDisposable。
AnonymousDisposable 作為一個(gè)匿名 Disposable,在本例中作為最底層的 Disposable 并沒有什么作用,只是為了實(shí)現(xiàn)模式統(tǒng)一而已。
class AnonymousDisposable: Disposable {
// AnonymousDisposable 封裝了 取消訂閱 的閉包
private let _disposeHandler: () -> Void
init(_ disposeClosure: @escaping () -> Void) {
_disposeHandler = disposeClosure
}
func dispose() {
_disposeHandler()
}
}
CompositeDisposable 作為一個(gè)可管理多個(gè) Disposable 的容器,它內(nèi)部維持一個(gè)標(biāo)志位表示訂閱是否被取消。CompositeDisposable 所實(shí)現(xiàn)的 dispose 方法真正改變了標(biāo)志位,并對其所維護(hù)的所有 Disposable 執(zhí)行各自的 dispose 方法,從而完成它們定義的在取消訂閱時(shí)需要執(zhí)行的附帶操作。
class CompositeDisposable: Disposable {
// 可用于管理一組 Disposable 的 CompositeDisposable
// 判斷是否已銷毀(取消訂閱)的標(biāo)志位
private(set) var isDisposed: Bool = false
// 管理一組 Disposable
private var disposables: [Disposable] = []
init() {}
func add(disposable: Disposable) {
if isDisposed {
disposable.dispose()
return
}
disposables.append(disposable)
}
func dispose() {
guard !isDisposed else { return }
// 銷毀所有 disposable,并設(shè)置標(biāo)志位
disposables.forEach {
$0.dispose()
}
isDisposed = true
}
}
很顯然,這里執(zhí)行 dispose 操作只是修改了狀態(tài),并沒有釋放訂閱資源。只有當(dāng) CompositeDisposable 對象被釋放后才算真正釋放資源。在原版 RxSwift 中,DisposeBag 差不多就是 CompositeDisposable。這樣也是為什么我們要把訂閱交給 DisposeBag 來進(jìn)行管理,DisposeBag 作為某個(gè)對象的屬性,會隨著對象的釋放,從而自動(dòng)釋放真正的訂閱資源。
為了支持 Disposable,我們需要在 RxDemo-01 的基礎(chǔ)上稍作修改即可,主要是修改發(fā)布者 Observable 。
// MARK: - Observable
protocol ObservableType {
associatedtype Element
// 訂閱操作
func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element
}
class Observable<Element>: ObservableType {
// 定義 發(fā)布事件 的閉包
private let _eventGenerator: (Observer<Element>) -> Disposable
init(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) {
_eventGenerator = eventGenerator
}
// 實(shí)現(xiàn) 訂閱操作 的協(xié)議,內(nèi)部生成事件
func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
let composite = CompositeDisposable()
// 通過一個(gè)中間 Observer 對原始 Observer 進(jìn)行封裝,用于過濾事件的傳遞。
let disposable = _eventGenerator(Observer { (event) in
guard !composite.isDisposed else { return }
// 事件傳遞給原始 observer
observer.on(event: event)
// 通過 composite 管理 error、completed 時(shí),自動(dòng)取消訂閱
switch event {
case .error(_), .completed:
composite.dispose()
default:
break
}
})
// 將 _eventGenerator 返回的 AnonymousDisposable 加入至 CompositeDisposable 中進(jìn)行管理
composite.add(disposable: disposable)
return composite
}
}
接下來,我們來試用一下 RxDemo-02 所實(shí)現(xiàn)的 RxSwift。很顯然,這種模式與原始的 RxSwift 的使用方式進(jìn)一步吻合。這里,我們實(shí)現(xiàn)了取消訂閱的功能。
// MARK: - Test
let observable = Observable<Int> { (observer) -> Disposable in
print("send 0")
observer.on(event: .next(0)) // observer.on(event: .next(0).map({ $0 * 2 }))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("send completed")
observer.on(event: .completed)
}
return AnonymousDisposable {
print("dispose")
}
}
let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}
let disposable = observable.subscribe(observer: observer)
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
disposable.dispose()
}
// send 0
// recive 0
// send 1
// recive 1
// send 2
// recive 2
// send 3
// recive 3
// dispose
// send completed
RxDemo-02 的核心思想是在 RxDemo-01 的基礎(chǔ)上對事件進(jìn)行攔截和過濾,如下圖所示。
具體的實(shí)現(xiàn)方式如下所示,通過 CompositeDisposable 管理 AnonymousDisposable(原始 subscribe 中的閉包執(zhí)行后的返回類型)。同時(shí),在執(zhí)行發(fā)布事件時(shí),使用一個(gè)中間 Observer 接收原始事件,中間 Observer 引用外部 CompositeDisposable 的狀態(tài)決定是否將事件發(fā)送給原始 Observer。
使用 CompositeDisposable 的本質(zhì)就是添加了一個(gè)中間層來解決管理訂閱的問題。
結(jié)構(gòu)優(yōu)化(RxDemo-03)
在 RxDemo-02 中,AnonymousObserver 引用了外部的 CompositeDisposable 中的訂閱狀態(tài),從而決定事件的傳遞方向。這種代碼邏輯由內(nèi)而外實(shí)現(xiàn),并不是很直觀。
為了更加清晰地描述這個(gè)事件流動(dòng)方向,RxDemo-03 通過增加一個(gè)中間層,將原始 observer、中間 observer、事件轉(zhuǎn)發(fā)邏輯聚合在同一層級下,讓代碼具有更好的可讀性。這里我們實(shí)現(xiàn)一個(gè)遵循 Disposable 協(xié)議的 Sink 類型。Sink 是 水槽 的意思,象征著這里我們通過它來控制事件的流動(dòng)方向,暗示了這個(gè)類的作用。
class Sink<O: ObserverType>: Disposable {
private var _disposed: Bool = false
private let _forward: O
private let _eventGenerator: (Observer<O.Element>) -> Disposable
private let _composite = CompositeDisposable()
init(forward: O, eventGenerator: @escaping (Observer<O.Element>) -> Disposable) {
_forward = forward
_eventGenerator = eventGenerator
}
func run() {
// 通過一個(gè)中間 Observer 接收原始事件
// 根據(jù) CompositionDisposable 的狀態(tài)決定是否傳遞給原始 Observer
let observer = Observer<O.Element>(forward)
// 執(zhí)行事件生成器
// 將返回值 Disposable 加入到 CompositeDisposable 中進(jìn)行管理
_composite.add(disposable: _eventGenerator(observer))
}
private func forward(event: Event<O.Element>) {
guard !_disposed else { return }
// 事件傳遞給原始 observer
_forward.on(event: event)
// 通過 composite 管理 error、completed 時(shí),自動(dòng)取消訂閱
switch event {
case .completed, .error(_):
dispose()
default:
break
}
}
func dispose() {
_disposed = true
_composite.dispose()
}
}
在定義了 Sink 之后,我們就可以簡化 Observable 中 subscribe 方法的具體實(shí)現(xiàn)。
class Observable<Element>: ObservableType {
// 定義 發(fā)布事件 的閉包
private let _eventGenerator: (Observer<Element>) -> Disposable
init(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) {
_eventGenerator = eventGenerator
}
// 實(shí)現(xiàn) 訂閱操作 的協(xié)議,內(nèi)部生成事件
func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
let sink = Sink(forward: observer, eventGenerator: _eventGenerator)
sink.run()
return sink
}
}
SinkDisposable 引用了原始的事件發(fā)生器,并定義一個(gè)中間 Observer 轉(zhuǎn)入至原始事件發(fā)生器,從而讓中間 observer 接收原始事件。除此之外,SinkDisposable 還引用了原始 observer,當(dāng)中間 observer 處理原始事件時(shí),會判斷訂閱是否已經(jīng)取消,從而決定是否將原始事件轉(zhuǎn)發(fā)給原始 observer。此時(shí),RxDemo-03 實(shí)現(xiàn)的 RxSwift 的內(nèi)部調(diào)用關(guān)系如下所示。
操作符(RxDemo-04)
下面,我們來實(shí)現(xiàn)操作符,RxSwift 中包含了大量的操作符,它們基本上都是對函數(shù)式編程中 函子、單子 等進(jìn)行了應(yīng)用。我們以 map 為例進(jìn)行介紹。
如下所示,map 方法能夠?qū)⒁粋€(gè)普通函數(shù)應(yīng)用到包裝類型 ObservableType 上。map 方法最終返回一個(gè) Observable<Result> 類型(同樣遵循 ObservableType 協(xié)議),因此能夠完美支持鏈?zhǔn)讲僮鳌?/p>
extension ObservableType {
func map<Result>(_ transform: @escaping (Element) throws -> Result) -> Observable<Result> {
return Observable<Result> { (observer) in // observer 為原始 observer
// 此閉包可看成是一個(gè) eventGenerator
// 向原始 observable 中傳入一個(gè)中間 map observer,即由中間 map observer 替換原始 observer 監(jiān)聽原始事件
// 中間 map observer 對原始事件進(jìn)行轉(zhuǎn)換后,轉(zhuǎn)發(fā)給原始 observer
return self.subscribe(observer: Observer { (event) in
switch event {
case .next(let element):
do {
try observer.on(event: .next(transform(element)))
} catch {
observer.on(event: .error(error))
}
case .error(let error):
observer.on(event: .error(error))
case .completed:
observer.on(event: .completed)
}
})
}
}
}
接下來,我們來試用一下 RxDemo-04 所實(shí)現(xiàn)的 RxSwift。這里,我們實(shí)現(xiàn)了 map 操作符的功能。
// MARK: - Test
let observable = Observable<Int> { (observer) -> Disposable in
print("send 0")
observer.on(event: .next(0)) // observer.on(event: .next(0).map({ $0 * 2 }))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("send completed")
observer.on(event: .completed)
}
return AnonymousDisposable {
print("dispose")
}
}
let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}
let disposable = observable.map { $0 * 2 }.subscribe(observer: observer)
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
disposable.dispose()
}
// send 0
// recive 0
// send 1
// recive 2
// send 2
// recive 4
// send 3
// recive 6
// dispose
// send completed
RxDemo-04 中由于增加了操作符功能,其內(nèi)部的調(diào)用關(guān)系也發(fā)生了變化。特別是,發(fā)布事件和過濾事件的邏輯,很顯然,每增加一個(gè)操作符,就會增加一個(gè) SinkDisposable 中間層,調(diào)用棧也會更深。
如果我們仔細(xì)分析 RxDemo-04,其實(shí)我們可以發(fā)現(xiàn)內(nèi)部隱藏了如下所示的調(diào)用關(guān)系鏈:observable -> MapObserver -> MapObservable -> observer。實(shí)際執(zhí)行時(shí),調(diào)用關(guān)系如下所示:
- observable 調(diào)用 MapObserver.on 方法,將原始事件傳遞給 MapObserver;
- MapObserver 使用 map 方法將原始事件轉(zhuǎn)換成 map 事件(即 map 后的數(shù)據(jù)),作為 MapObservable 發(fā)出的事件;
- MapObservable 調(diào)用 observer.on 方法,將 map 事件傳給 observer。
分類細(xì)化(RxDemo-05)
在 RxDemo-04 中,為了增加 map 操作符,對 ObservableType 進(jìn)行了擴(kuò)展,其本質(zhì)就是在原始的 Observable 和 Observer 之間插入了一個(gè) MapObserver 和一個(gè) MapObservable。本節(jié),我們繼續(xù)進(jìn)行優(yōu)化,對中間類也進(jìn)行細(xì)分和定義。
在 RxDemo-04 中,Sink 的主要作用是 根據(jù)訂閱是否取消決定是否攔截事件的傳遞。這里我們可能會想到:中間訂閱者(如:MapObserver)本身是不是就應(yīng)該具備 Sink 的這種功能呢?事實(shí)上,RxSwift 就是讓 Sink 作為所有 Observer 的基類。
對于操作符,我們可以實(shí)現(xiàn)上述的模式;但是,對于不帶操作符的情況,該如何處理呢?為了實(shí)現(xiàn)模式的統(tǒng)一,我們可以認(rèn)為不帶操作符的情況等同于帶了 只返回原值的匿名操作符,即等同于 map { $0 }。針對此情況,我們也需要定義兩個(gè)類 AnonymousObserver 和 AnonymousObservable。
下面,我們來進(jìn)行優(yōu)化改進(jìn)。
首先,我們將原來的 Observable<Element> 改成抽象基類。
// 抽象類
class Observable<Element>: ObservableType {
// // 定義 發(fā)布事件 的閉包,有子類來定義
// private let _eventGenerator: (Observer<Element>) -> Disposable
//
// init(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) {
// _eventGenerator = eventGenerator
// }
// 實(shí)現(xiàn) 訂閱操作 的協(xié)議,內(nèi)部生成事件
func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
rxAbstractMethod()
}
}
func rxAbstractMethod(file: StaticString = #file, line: UInt = #line) -> Swift.Never {
fatalError("Abstract Method", file: file, line: line)
}
其次,我們再一定一個(gè)新的類 Producer<Element> 代替 Observable<Element>。Producer<Element> 繼承自 Observable<Element>,作為發(fā)布者的基類,該類內(nèi)部沒有時(shí)間生成器 eventGenerator 的閉包,由子類選擇性進(jìn)行定義。
class Producer<Element>: Observable<Element> {
// 實(shí)現(xiàn) 訂閱操作 的協(xié)議,內(nèi)部生成事件
override func subscribe<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
return run(observer: observer)
}
func run<O: ObserverType>(observer: O) -> Disposable where O.Element == Element {
rxAbstractMethod()
}
}
然后,我們來對 Sink 進(jìn)行修改。在 RxDemo-04 中,Sink 即提供了事件生成的功能和事件轉(zhuǎn)發(fā)的功能。這里,我們讓 Sink 的職責(zé)更加單一,僅僅是提供事件轉(zhuǎn)發(fā)的功能。修改結(jié)果如下:
class Sink<O: ObserverType>: Disposable {
private var _disposed: Bool = false
private let _forward: O
private let _composite = CompositeDisposable()
init(forward: O) {
_forward = forward
}
func forward(event: Event<O.Element>) {
guard !_disposed else { return }
// 事件傳遞給原始 observer
_forward.on(event: event)
// 通過 composite 管理 error、completed 時(shí),自動(dòng)取消訂閱
switch event {
case .completed, .error(_):
dispose()
default:
break
}
}
func dispose() {
_disposed = true
print("dispose execute")
_composite.dispose()
}
}
為了便于泛型類型的轉(zhuǎn)換,我們給 ObservableType 協(xié)議增加一個(gè)方法,并由 Observable<Element> 予以實(shí)現(xiàn)。
protocol ObservableType {
// ...
func asObservable() -> Observable<Element>
}
class Observable<Element>: ObservableType {
// ...
func asObservable() -> Observable<Element> {
return self
}
}
接下來,我們來分別實(shí)現(xiàn) Sink 的子類 AnonymousObserver 和 MapObserver 以及 Producer 的子類 AnonymousObservable 和 MapObservable。
class AnonymousObserver<O: ObserverType>: Sink<O>, ObserverType {
typealias Element = O.Element
override init(forward: O) {
super.init(forward: forward) // forward 為原始訂閱者
}
func on(event: Event<Element>) {
// 對原始事件進(jìn)行轉(zhuǎn)發(fā)
switch event {
case .next(let element):
self.forward(event: .next(element))
case .error(let error):
self.forward(event: .error(error))
self.dispose()
case .completed:
self.forward(event: .completed)
self.dispose()
}
}
func run(parent: AnonymousObservable<Element>) -> Disposable {
// 執(zhí)行事件生成器
parent._eventGenerator(Observer(self))
}
}
class AnonymousObservable<Element>: Producer<Element> {
// 持有事件生成器閉包
let _eventGenerator: (Observer<Element>) -> Disposable
init(eventGenerator: @escaping (Observer<Element>) -> Disposable) {
self._eventGenerator = eventGenerator
}
override func run<O>(observer: O) -> Disposable where Element == O.Element, O : ObserverType {
// 訂閱發(fā)生時(shí),生成一個(gè)中間訂閱者 AnonymousObserver 來訂閱原始事件,并將事件轉(zhuǎn)發(fā)給原始訂閱者
let sink = AnonymousObserver(forward: observer)
sink.run(parent: self)
return sink
}
}
class MapObserver<Source, Result, O: ObserverType>: Sink<O>, ObserverType {
typealias Element = Source
typealias Result = O.Element
typealias Transform = (Source) throws -> Result
private let _transform: Transform
init(forward: O, transform: @escaping Transform) { // forward 為原始訂閱者
self._transform = transform
super.init(forward: forward)
}
func on(event: Event<Element>) {
// 對原始事件進(jìn)行 map 轉(zhuǎn)換,對結(jié)果進(jìn)行轉(zhuǎn)發(fā)
switch event {
case .next(let element):
do {
let mappedElement = try _transform(element)
self.forward(event: .next(mappedElement as! O.Element))
} catch {
self.forward(event: .error(error))
}
case .error(let error):
self.forward(event: .error(error))
self.dispose()
case .completed:
self.forward(event: .completed)
self.dispose()
}
}
}
class MapObservable<Source, Result>: Producer<Result> {
typealias Transform = (Source) throws -> Result
private let _transform: Transform
private let _source: Observable<Source>
init(source: Observable<Source>, transform: @escaping Transform) {
self._source = source
self._transform = transform
}
override func run<O: ObserverType>(observer: O) -> Disposable where Result == O.Element {
// 訂閱發(fā)生時(shí),生成一個(gè)中間訂閱者 MapObserver 來訂閱上游事件
let sink = MapObserver(forward: observer, transform: self._transform)
self._source.subscribe(observer: sink)
return sink
}
}
現(xiàn)在我們再來看發(fā)布者、操作符,其實(shí)兩者的本質(zhì)都是一樣,都是創(chuàng)建了一個(gè)發(fā)布者。對此,我們采用擴(kuò)展的方式來提供相應(yīng)的方法。如下所示:
extension ObservableType {
static func create(_ eventGenerator: @escaping (Observer<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(eventGenerator: eventGenerator)
}
}
extension ObservableType {
func map<Result>(_ transform: @escaping (Element) throws -> Result) -> Observable<Result> {
return MapObservable(source: self.asObservable(), transform: transform)
}
}
最后,我們再來驗(yàn)證一下實(shí)現(xiàn)結(jié)果,如下所示。
// MARK: - Test
let observable = Observable<Int>.create { (observer) -> Disposable in // observer 為 MapObserver
print("send 0")
observer.on(event: .next(0))
print("send 1")
observer.on(event: .next(1))
print("send 2")
observer.on(event: .next(2))
print("send 3")
observer.on(event: .next(3))
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("send completed")
observer.on(event: .completed)
}
return AnonymousDisposable {
print("dispose")
}
}
let observer = Observer<Int> { (event) in
switch event {
case .next(let value):
print("recive \(value)")
case .error(let error):
print("recive \(error)")
case .completed:
print("recive completed")
}
}
let disposable = observable.map { $0 * 2 }.map { $0 + 1 }.subscribe(observer: observer)
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
disposable.dispose()
}
// send 0
// recive 1
// send 1
// recive 3
// send 2
// recive 5
// send 3
// recive 7
// dispose execute
// send completed
從運(yùn)行結(jié)果而言,RxDemo-05 基本實(shí)現(xiàn)了分類細(xì)化,并且達(dá)到了取消訂閱的功能。此時(shí),上述例子中實(shí)際的訂閱關(guān)系如下所示。相比 RxDemo-04,多了 AnonymousObserver 和 AnonymousObservable,但是整體的內(nèi)部訂閱關(guān)系鏈更加清晰了。
此時(shí),我們再來對照一下 RxSwift 和 RxDemo-05 中類的定義,如下表所示。各個(gè)類的功能基本相同,整體結(jié)構(gòu)也是大同小異,只是在類名上略有差異。
| RxDemo-05 | Producer | Sink | AnonymousObserver | AnonymousObservable | MapObserver | MapObservable |
|---|---|---|---|---|---|---|
| RxSwift | Producer | Sink | AnonymousObservableSink | AnonymousObservable | Map | MapSink |
訂閱管理(RxDemo-06)
如果,我們仔細(xì)對比,可以發(fā)現(xiàn) RxSwift 中 Producer 的 subscribe 方法內(nèi)部與 RxDemo-05 還是不太一樣,前者內(nèi)部還引用了一個(gè) SinkDisposer 類。其作用是什么呢?事實(shí)上,其主要作用是管理 disposeHandler。細(xì)心的同學(xué)可能會發(fā)現(xiàn) RxDemo-05 中有個(gè) BUG:取消訂閱時(shí)沒有執(zhí)行 print("dispose") 閉包。
對此,我們也可以用類似的方式來解決,通過增加一個(gè) Diposer 類來進(jìn)行管理。具體代碼見:RxDemo-06。
當(dāng)訂閱發(fā)生時(shí)(即執(zhí)行 subscribe 方法時(shí)),內(nèi)部會產(chǎn)生一個(gè)遞歸的控制流,如下圖所示。
通過遞歸返回的方式構(gòu)建整個(gè)訂閱管理關(guān)系鏈,如下圖所示。diposer0 是 subscribe 方法最終返回的 Disposable 對象。當(dāng)我們對 disposer0 執(zhí)行 dispose 方法時(shí),內(nèi)部會遞歸地執(zhí)行 dispose 方法,最終取消訂閱鏈中所有的訂閱。
注意:這里面會有循環(huán)引用,如 MapObserver 內(nèi)部又引用了 Disposer0,RxDemo-06 以及 RxSwift 中的處理是給 Disposer 類內(nèi)部添加一個(gè)狀態(tài),表示是否已經(jīng)取消了訂閱,從而避免循環(huán)引用導(dǎo)致的循環(huán)調(diào)用。這里的循環(huán)引用并不一定是壞事,它在下述的場景下起到了非常關(guān)鍵的作用。
當(dāng)事件中出現(xiàn)一個(gè) complete 或 error 事件時(shí),由于事件會依次傳遞至 Observer,最后一次傳遞時(shí),即 MapObserver 進(jìn)行傳遞時(shí),會判斷是否是 complete 或 error 事件,從而決定是否執(zhí)行 dispose 方法。當(dāng) MapObserver 執(zhí)行 dipose 方法時(shí),會通過上述的循環(huán)引用,調(diào)用 Disposer0 執(zhí)行 dispose 方法,從而實(shí)現(xiàn)整體取消訂閱。
總結(jié)
本文通過逐步實(shí)現(xiàn) RxSwift 核心部分中的功能,一窺其背后的設(shè)計(jì)思路。從中我們也看到了其對函數(shù)式編程的應(yīng)用,以及其所呈現(xiàn)出來的流編程模式的底層實(shí)現(xiàn)原理。
后續(xù),我們將進(jìn)一步探索原版 RxSwift 中其他的一些內(nèi)容。
參考
- RxSwift repo
- ReactiveX
- 函數(shù)式編程——Functor、Applicative、Monad
- 《計(jì)算機(jī)程序的構(gòu)造與解釋》
- Modern RxSwift Architectures
- Learn Rx by implementing Observable