響應式編程&鏈式編程
公司的幾個共享項目, 有較多后臺定時的網(wǎng)絡請求,定位和藍牙操作的組合.
原方案是通過閉包嵌套起來, 如此一來有些比較頭疼的問題:
- 閉包回調(diào)地獄, 每個組合操作的業(yè)務上有變動需要做大修.
- 無法獲取或取消上一次的請求/操作.
- 異步響應不及時可能造成之前的請求后至, 讓數(shù)據(jù)出錯. 或在頁面退出之后仍然在進行未完成的請求.
網(wǎng)上較推薦的解決方案, 就是使用響應式編程框架: ReactiveCocoa/RxSwift
剛好公司的項目里面本來已經(jīng)使用了Swagger來自動生成網(wǎng)絡請求業(yè)務的代碼,
自帶Moya框架并配搭了RxSwift.
理所當然用了RxSwift.
串行異步處理
舉個場景: 設備列表界面
需求大致如下:
- 列表下拉刷新.
- 每次下拉刷新, 需要先更新坐標, 然后再獲取附近的設備列表.
- 考慮定位超時或者定位出錯的情況(例如沒有開啟權限).
分析之后得出, 下拉刷新觸發(fā)之后, 需要實現(xiàn)以下函數(shù):
// 檢查定位權限
func checkAuthStatus(completion: (Error?) -> Void)
// 更新坐標
func updateLocation(success: (CLLocation) -> Void, failure: (Error) -> Void)
// 請求設備
func fetchDevices(near location: CLLocation, success: ([Device]) -> Void, failure: (Error) -> Void)
// 更新數(shù)據(jù)以及界面
func update(with devices: [Device])
// 處理錯誤
func handlError(_ error: Error)
閉包實現(xiàn)的調(diào)用代碼:
self.checkAuthStatus(completion: { [weak self] error in
if let _error = error {
self?.handleError(_error)
}else {
self?.updateLocation(
success: { location in
self?.fetchDevices(
near: location,
success: {self?.update(with: $0)}
failure: {self?.handlError($0)}
)
},
failure: {
self?.handlError($0)
}
)
}
})
是的, 它能跑, 但是它也很難看.
如果把這段代碼放在列表下拉刷新的回調(diào)里面, 視覺上會更加讓人崩潰.
這個時候搬出RxSwift. 我們先直接看一下, 接入后代碼的樣子:
self.tableView.rx_pullToRefresh
.flatMap({[unowned self] in self.rx_checkAuthStatus()})
.flatMap({[unowned self] in self.rx_updateLocation()})
.flatMap({[unowned self] in self.rx_fetchDevices(near: $0)})
.subscribe(onNext: {[weak self] in self?.update(with: $0)},
onError: {[weak self] in self?.handleError($0)})
.disposed(by: self.disposeBag)
(以上出現(xiàn)的rx_xxx函數(shù)會在后面討論如何實現(xiàn). 同時, 這段代碼仍未完善, 后面會繼續(xù)討論.)
這段代碼關鍵點主要有:
鏈式調(diào)用
把原來通過閉包回調(diào)的函數(shù)都改造成了返回Observable的函數(shù), 然后鏈式調(diào)用Observable的Operator(flatMap函數(shù)).函數(shù)式
把處理回調(diào)的函數(shù)(匿名函數(shù))作為參數(shù)傳遞給 Operator 函數(shù)和subscribe函數(shù).Error聚合處理
用閉包回調(diào)的方式需要在每個函數(shù)調(diào)用的時候傳入處理Error的函數(shù).
而使用 RxSwift 的話,Error的處理入口可以聚合到1個地方. 就是在subscribe函數(shù)的onError參數(shù).
以上便是串行異步操作使用RxSwift處理的效果.
并行異步處理
照樣舉例: 下載多張圖片并本地化.
大致需求:
- 并發(fā)下載若干張圖片(不考慮并發(fā)數(shù)量)
- 全部圖片下載完成后把數(shù)據(jù)本地化
- 如果任意一張圖片下載失敗則不把數(shù)據(jù)本地化
分解之后有以下幾個函數(shù):
// 下載圖片
func downloadImage(withURL: URL) -> (imageData: Data?, error: Error?)
// 處理下載好的圖片, 更新臨時數(shù)據(jù)
func handleDownloadImage(imageData: Data, url: URL)
// 處理錯誤
func handleError(error: Error)
// 把臨時數(shù)據(jù)寫入本地數(shù)據(jù)庫
func updateDatabase()
原來的實現(xiàn), 比較簡單地遍歷了全部URL并開啟異步下載, 通過DispatchGroup做任務依賴:
let group = DispatchGroup()
urls.forEach { value in
DispatchQueue.global().async(group: group, execute: { [weak self] in
let res = self?.downloadImage(withURL: value)
if let data = res?.imageData {
self?.handleDownloadImage(imageData: data, url: value)
}else if let error = res?.error {
group.leave() // 遇到錯誤, 停止group
DispatchQueue.main.async { // 處理錯誤只需要執(zhí)行一次
self?.handleError(error: error)
}
}
})
}
group.notify(queue: DispatchQueue.main) { [weak self] in
self?.updateDatabase()
}
而接入RxSwift的做法, 利用Observable的zip函數(shù)把多個下載任務聚合:
let observables =
urls.map { [unowned self] url in
self.rx_downloadImage(withURL: url).map { data in self.handleDownloadImage(imageData: data, url: url) }
}
Observable.zip(observables)
.subscribe(onNext: {[unowned self] _ in self.updateDatabase()}
onError: {[unowned self] in self.handleError(error: $0)})
.disposed(by: self.disposeBag)
對原函數(shù)進行改造
上面我們說了要引入 RxSwift做異步處理, 接著我們討論如何把原來的函數(shù)改造并接入 RxSwift 體系.
Observable
Observable是 RxSwift 中數(shù)據(jù)傳遞的載體.
以前, 我們從一個函數(shù)獲得其執(zhí)行結果, 一般是:
- 同步函數(shù)情況下, 通過這個函數(shù)的返回值.
- 異步函數(shù)情況下, 通過向函數(shù)傳入回調(diào)閉包.
- 其他方式(如傳入回調(diào)代理, 或者監(jiān)聽廣播通知等), 這些在這里先不討論.
RxSwift 可以讓我們在不傳入回調(diào)閉包的情況下, 用類似調(diào)用同步函數(shù)的方式, 從返回值拿到異步函數(shù)的運行結果. 而這個結果, 就是通過Observable包裝而來的.
因此我們要做的, 就是把運行結果用 Observable 包裝起來返回.
RxSwift提供了一個比較簡單的方式創(chuàng)建 Observable, 就是其靜態(tài)函數(shù)create:
// Creates an observable sequence from a specified subscribe method implementation.
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> Disposable) -> RxSwift.Observable<Self.E>
這個函數(shù)只有一個閉包參數(shù), 閉包會給我們提供一個Observer的實例, 我們的操作需要在這個閉包里頭完成, 而操作結果則需要傳給這個 Observer .
比方上面的下載圖片函數(shù):
func rx_downloadImage(withURL url: URL) -> Observable<Data> {
return Observable.create({ [weak self] observer -> Disposable in
DispatchQueue.global().async({ // 子線程中調(diào)用
let res = self?.downloadImage(withURL: url)
if let data = res?.data {
observer.onNext(data) // 把下載的數(shù)據(jù)傳遞給Observer
observer.onCompleted() // 告訴Observer, 已經(jīng)完成了
}else if let error = res?.error {
observer.onError(error) // 把錯誤傳遞給Observer
}
})
return Disposables.create()
})
}
這樣就封裝了一個在內(nèi)部子線程調(diào)用舊函數(shù)downloadImage的函數(shù). 對于子線程調(diào)用這個需求, 簡單地使用了global隊列來異步執(zhí)行下載.
也許有人會有疑問, 那么這個下載操作實際上是在什么時候執(zhí)行呢? Observable 被返回的時候嗎? 還是Observable被創(chuàng)建的時候?
這里先直接給出答案:
就是這個Observable(或者通過它轉換得來的新Observable)被subscribe的時候.
具體為什么, 這里暫時不探討.
隊列的切換
剛剛的我們實現(xiàn)了在函數(shù)內(nèi)異步下載并回調(diào)的需求, 但是這個函數(shù)又有一個新問題:
下載操作必然是在DispatchQueue.global中執(zhí)行, 調(diào)用者無法控制.
那我們有辦法自己去控制下載任務在哪個隊列中進行嗎?
有的, 通過以下兩個函數(shù)observeOn和subscribeOn:
// Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
public func observeOn(_ scheduler: ImmediateSchedulerType) -> PrimitiveSequence<Trait, Element>
// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified
public func subscribeOn(_ scheduler: ImmediateSchedulerType)
其中, observeOn是指定了Observable對應的任務在什么隊列里執(zhí)行.
而subscribeOn則是指定了我們調(diào)用subscribe函數(shù)的時候, 回調(diào)任務在什么隊列執(zhí)行. 如果沒有調(diào)用subscribeOn, 那么回調(diào)里任務執(zhí)行的隊列就和ObserveOn指定的保持一致.
這兩個函數(shù)接收的參數(shù)ImmediateSchedulerType是RxSwift定義的協(xié)議, 說白了就是隊列的信息, 以及執(zhí)行任務的方式.
實際上我們有幾個已經(jīng)實現(xiàn)好結構體可以直接使用, 其中比較常用的是:
-
SerialDispatchQueueScheduler(串行隊列) -
ConcurrentDispatchQueueScheduler(并行隊列) -
ConcurrentMainScheduler(主隊列, 是為了subscribeOn函數(shù)設計的)
Abstracts work that needs to be performed on
MainThread. In caseschedulemethods are called from main thread, it will perform action immediately without scheduling.
This scheduler is optimized forsubscribeOnoperator. If you want to observe observable sequence elements on main thread usingobserveOnoperator,
MainScheduleris more suitable for that purpose.
-
MainScheduler(主隊列, 是為了observeOn函數(shù)設計的)
Abstracts work that needs to be performed on
DispatchQueue.main. In caseschedulemethods are called fromDispatchQueue.main, it will perform action immediately without scheduling.
This scheduler is usually used to perform UI work.
Main scheduler is a specialization ofSerialDispatchQueueScheduler.
This scheduler is optimized forobserveOnoperator. To ensure observable sequence is subscribed on main thread usingsubscribeOn
operator please useConcurrentMainSchedulerbecause it is more optimized for that purpose.
我們可以在SerialDispatchQueueScheduler和ConcurrentDispatchQueueScheduler的init函數(shù)中傳入對應的隊列. 或者直接指定qos, 類似DispatchQueue的global函數(shù).
通過這2個函數(shù), 我們就可以去掉上面封裝的下載任務中的GCD調(diào)用, 在外部調(diào)用的時候自由切換隊列:
func rx_downloadImage(withURL url: URL) -> Observable<Data> {
return Observable.create({ [unowned self] observer -> Disposable in
let res = self.downloadImage(withURL: url)
if let data = res.data {
observer.onNext(data)
observer.onCompleted()
}else if let error = res.error {
observer.onError(error)
}
return Disposables.create()
})
}
let observables = ...
let observeScheduler = ConcurrentDispatchQueueScheduler(qos: DispatchQoS.default)
Observable.zip(observables)
.observeOn(observeScheduler)
.subscribeOn(MainScheduler.asyncInstance)
.subscribe(onNext: {[unowned self] res in self.updateDatabase()}
onError: {[unowned self] in self.handleError(error: $0)})
.disposed(by: self.disposeBag)
Observable對于Error報錯處理的"缺陷"
在串行異步請求處理里面提到了這樣的處理方式并不完善, 因為Observable有一個特點:
拋出Error之后就會自行銷毀訂閱
而這段代碼里面, 下拉刷新的Observable是與另外兩個異步操作聚合在一起的, 就是說如果網(wǎng)絡請求或者定位的操作拋出Error, 那么用戶下一次下拉刷新也是不會被處理的.
所以, 為了避免這個問題, 解決方式有兩種:
不要把下拉刷新的
Observable與另外兩個異步操作的Observable聚合;攔截另外兩個
Observable可能拋出的Error
第1個方案的解決代碼:
self.tableView.rx_pullToRefresh
.subscribe(onNext: { [unowned self] in
_ = self.rx_updateLocation()
.flatMap({self.rx_fetchDevices(near: $0)})
.subscribe(onNext: {self.update(with: $0)},
onError: {self.handleError($0)})
})
.disposed(by: self.disposeBag)
這樣就可以保證rx_pullToRefresh會一直被監(jiān)聽, 但是這個處理方式會讓代碼可讀性又下降了.
所幸RxSwift還給我們提供了另外的選擇, 利用Observable的catchError函數(shù)或者catchErrorJustReturn函數(shù), 我們就可以把以上2個異步操作的錯誤攔截.
Continues an observable sequence that is terminated by an error with the observable sequence produced by the handler.
public func catchError(_ handler: @escaping (Error) throws -> RxSwift.Observable<Self.E>) -> RxSwift.Observable<Self.E>
Continues an observable sequence that is terminated by an error with a single element.
public func catchErrorJustReturn(_ element: Self.E) -> RxSwift.Observable<Self.E>
其中catchErrorJustReturn函數(shù)可以讓我們產(chǎn)生一個默認值來繼續(xù)事件鏈,
而catchError則接收一個閉包參數(shù), 可以通過我們產(chǎn)生的Error來制定一個值給后續(xù)事件鏈或者直接在閉包里面處理Error.
顯然這個場景我們是需要處理Error的, 所以選用catchError. 而這樣的話, subscribe函數(shù)的onError就永遠都不會執(zhí)行了, 等于是把Error的處理提前到了catchError:
// 處理Error并且返回默認值
func rx_handleError(_ error: Error) -> Observable<[Device]> {
// 處理Error, 生成默認參數(shù)
...
return Observable.of(__defaultValue__)
}
// 請求列表的數(shù)據(jù)(聚合2個異步操作的Observable)
func rx_fetchTableViewData() -> Observable<[Device]> {
return self.rx_updateLocation().flatMap({self.rx_fetchDevices(near: $0)})
}
self.tableView.rx_pullToRefresh
.flatMap({ [unowned self] in
self.rx_fetchTableViewData().catchError({self.rx_handleError($0)})
})
.subscribe(onNext: {[weak self] in self?.update(with: $0)})
.disposed(by: self.disposeBag)
以上這樣操作算是把Error處理了, 但是還是存在問題, 我們還需要再改進一下, 不要讓handleResponseValue和handleError的代碼分散.
配合enum以及泛型更舒服地處理Error
Swift里面的枚舉類型是可以帶參數(shù)的, 我們可以把我們要的結果抽象出來, 定義一個枚舉:
enum Result {
case value([Device])
case error(Error)
}
然后改動一下我們的fetch以及handle方式:
// 改進后的fetch函數(shù)把[Device]和Error轉換成Result
func rx_fetchTableViewData() -> Observable<Result> {
return self.rx_updateLocation()
.flatMap({self.rx_fetchDevices(near: $0)})
.map({Result.value($0)})
.catchError({Observable.of(Result.error($0))})
}
// 統(tǒng)一處理Result
func handleResult(_ result: Result) {
switch result {
case .value(let value):
self.update(with: value)
case .error(let error):
self.handleError(error)
}
}
最后我們這個下拉刷新->更新定位坐標->獲取附近設備->更新table的代碼就可以變成:
self.tableView.rx_pullToRefresh
.flatMap({[unowned self] in self.rx_fetchTableViewData()})
.subscribe(onNext: {[weak self] in self?.handleResult($0)})
.disposed(by: self.disposeBag)
但是, 實際上有這種場景的不止是設備列表, 還有比如附近的設備中心(Station), 這個Result顯然可以承擔更多的任務.
利用Swift的泛型改進, 讓Result適用于通用的場景:
enum Result<T> {
case value(T)
case error(Error)
}
func rx_fetchDeviceTableViewData() -> Observable<Result<[Device]>> {
return ...
}
func handleDeviceResult(_ result: Result<[Device]>) {
...
}
func rx_fetchStationTableViewData() -> Observable<Result<[Station]>> {
return ...
}
func handleStationResult(_ result: Result<[Station]>) {
...
}
快速改造原來的GCD異步函數(shù)
如果在項目中途接入Rx, 原來項目中已經(jīng)存在大量通過GCD回調(diào)的函數(shù)了.
這個時候把全部函數(shù)都改造是很高成本的, 而且部分函數(shù)可能在項目中被調(diào)用了很多次, 涉及的模塊可能比較多, 但是不一定每個調(diào)用了這個函數(shù)的模塊都有必要接入Rx.
在這種情況下通用可以使用Observable的create函數(shù)去封裝原來的函數(shù).
比如有一個加載本地數(shù)據(jù)的函數(shù):
func loadDataFromLocal(filePath: URL, success: (Data)->Void, failure: (Error)->Void) {
...
...
...
}
在不改動原函數(shù)的情況下, 增加一個新的函數(shù):
func rx_loadDataFromLocal(filePath: URL) -> Observable<Result<Data>> {
return Observable.create({ observer in
loadDataFromLocal(
filePath: filePath,
success: {observer.onNext(.value($0))},
failure: {observer.onNext(.error($0))}
)
return Disposables.create()
})
}
這樣的函數(shù)改造和同步函數(shù)的改造一樣, 有一個類似的缺陷, 就是不可以再改變loadDataFromLocal函數(shù)在哪個隊列執(zhí)行.
相對地, 這個改造方式可以比較簡便地復用現(xiàn)有的函數(shù).
所以這是一個折中的改造方案.
取消任務
回顧上面的代碼, 有一段
self.tableView.rx_pullToRefresh
.flatMap({ [unowned self] in
self.rx_fetchTableViewData().catchError({self.rx_handleError($0)})
})
.subscribe(onNext: {[weak self] in self?.update(with: $0)})
.disposed(by: self.disposeBag)
在訂閱Observable之后, 我們又調(diào)用了其結果的disposed(by:)函數(shù), 這里傳入的參數(shù)就是用于管理這個訂閱的生命周期.
這個參數(shù)是個DisposeBag類型, 當它被釋放的時候, 對應它的訂閱任務也會取消. 因此我們可以通過管理DisposeBag來取消異步任務.
比方說讓ViewController持有一個DisposeBag, 在ViewController調(diào)用deinit的時候, 綁定在這個DisposeBag上的Observable就都不會繼續(xù)處理了.
但是上面是一種間接取消, 我們也有直接取消的方式:
調(diào)用dispose函數(shù), 如下:
let disposable = self.rx_fetchTableViewData().subscribe() // 保存訂閱的任務
...
...
...
disposable.dispose() // 在適當?shù)臅r候調(diào)用dispose取消訂閱
小結
以上是項目中接入 RxSwift 對異步/并行操作的優(yōu)化改進經(jīng)歷. 概括來說, 就是:
降維閉包嵌套, 替換成鏈式的
Observable變量或返回Observable的函數(shù).使用RxSwift提供的Operator函數(shù)進行數(shù)據(jù)流處理.
Error聚合處理.通過
observeOn和subscribeOn函數(shù)控制隊列切換.通過
DisposeBag類型, 或者dispose函數(shù)控制任務的銷毀.
持續(xù)更新...