前面的內(nèi)容中已經(jīng)學(xué)習(xí)了如何創(chuàng)建, 過濾, 變換 observable 的事件序列, 尤其是需要留意 flatMap 的強(qiáng)大處理能力.
本章則是看里面的一些組合操作符, 它們用于將異步序列中的數(shù)據(jù)進(jìn)行組合.
1 概述
本章沒有新的工程學(xué)習(xí), 打開一個(gè)空項(xiàng)目來實(shí)現(xiàn)即可.
RxSwift 的主旋律就是處理和管理異步序列. 不過經(jīng)常都會(huì)在這些序列的洪流中迷失方向. 故需要掌握組合操作符來化解這樣的困境.
2 拼接
在現(xiàn)實(shí)中的一個(gè)典型需求是保證一個(gè) observer 開始觀察的時(shí)候都能收到一個(gè)初始值. 而經(jīng)常都需要 "當(dāng)前狀態(tài)" 這樣的初始值, 而初始值之前又想要增加一些前驅(qū)狀態(tài), 比如下面的序列:
原始序列:
2---3---4---comp
希望得到的序列:
1---2---3---4---comp
這樣的操作可以使用如下的代碼實(shí)現(xiàn), 即利用 startWith 操作符:
exampleOf("startWith") {
let numbers = Observable.of(2, 3, 4)
let obv = numbers.startWith(1)
obv.subscribe(onNext: { (elem) in
print(elem)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("complete")
}, onDisposed: {
print("disposed")
}).addDisposableTo(disBag)
}
startWith 操作符的作用是在 Observable 事件序列最前面添加指定的前驅(qū)事件, 當(dāng)然添加的事件數(shù)據(jù)都必須是和之前序列中事件元素類型一致.
但是不要被這個(gè)操作符所處的位置迷惑了! 雖然它是在處理鏈的下一個(gè)節(jié)點(diǎn), 但是將元素添加到的是之前序列的最前面!
startWith 操作符的用途也十分廣泛. 比如它可以保證任何序列都可以人為添加初始化時(shí)候的元素.
實(shí)際上 startWith 操作符是 concat 操作符家族中的一員.
Observable 的類方法 concat(_:) 可以將兩個(gè) observable 序列組合為一個(gè):
exampleOf("concat") {
let nums1 = Observable.of(1, 2, 3)
let nums2 = Observable.of(4, 5, 6)
let concatedSeq = Observable.concat([nums1, nums2])
concatedSeq.subscribe(onNext: { (num) in
print(num)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
}).addDisposableTo(disBag)
}
這個(gè)類方法的參數(shù)是一個(gè)有序序列(比如數(shù)組, 如上所示). 如果傳入的參數(shù)中的任何一個(gè)點(diǎn)上遇到 error, 則會(huì)直接將 error 及其之前的輸出出來.
另外還有一個(gè)對(duì)象方法 concat 用于兩個(gè)序列之間的組合:
exampleOf("concat(obj method)") {
let nums1 = Observable.of(1, 2, 3)
let nums2 = Observable.of(4, 5, 6)
let obv = nums1.concat(nums2)
obv.subscribe(onNext: { (num) in
print(num)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("completed")
}, onDisposed: {
print("disposed")
}).addDisposableTo(disBag)
}
這個(gè)方法的輸出實(shí)際上也和之前的類方法輸出一致, 只是這個(gè)是鏈接兩個(gè) observable. 且 concat 對(duì)象方法會(huì)等待作為主調(diào)對(duì)象的序列 complete, 然后去觀察作為參數(shù)的 observable. 最后的輸出就是兩個(gè)序列的組合.
而上面的 startWith 操作符可以替換為 concat 來實(shí)現(xiàn)同樣的功能:
exampleOf("replace startWith with concat") {
let nums = Observable.of(2, 3, 4)
Observable
.just(1)
.concat(nums)
.subscribe(onNext: { (num) in
print(num)
}).addDisposableTo(disBag)
}
實(shí)際效果和之前的 startWith 一致. 需要注意的是, 這個(gè)方法的隱含條件是需要源序列產(chǎn)生了 complete 后才會(huì)拼接.
當(dāng)然, 需要注意的是拼接的序列中元素類型必須一致, 如果不一致則會(huì)編譯錯(cuò)誤. 不過學(xué)過了變換操作符, 可以按需進(jìn)行變換后再拼接.
3 合并
RxSwift 中提供了多種合并序列的方式. 最簡(jiǎn)單的就是 merge. 比如下面的兩個(gè)序列, 如果要按它們的元素的發(fā)射時(shí)間的不同來合并:
序列1: 1------2------3--comp
序列2: --4--5----6------comp
源序列: 序列1---序列2---comp
則合并后的序列為:
1---4---5---2---6---3---comp
這樣的結(jié)果可以通過 merge 操作符來實(shí)現(xiàn), 不過過程需要注意:
-
首先構(gòu)造一個(gè)源序列, 該序列為兩個(gè)序列的組合:
let obv1 = PublishSubject<String>() let obv2 = PublishSubject<String>() let source = Observable.of(obv1.asObservable(), obv2.asObservable()) -
在源序列上使用
merge操作符, 得到目標(biāo)序列:let target = source.merge() -
然后開始觀察目標(biāo)序列即可.
target.subscribe(onNext: { (str) in print(str) }).addDisposableTo(disBag)
下面就按照類似的方式來在兩個(gè)序列上添加next事件, 看輸出的情況是否和之前的猜測(cè)相同:
obv1.onNext("1")
obv2.onNext("4")
obv2.onNext("5")
obv1.onNext("2")
obv2.onNext("6")
obv1.onNext("3")
打印結(jié)果是 "1 4 5 2 6 3" 證明是合并成功了的.
merge 操作符相當(dāng)于是去觀察源序列中的所有子序列, 每當(dāng)觀察到 next , 就會(huì)把它放到新的目標(biāo)序列上. 而子序列的順序是不重要的, 因?yàn)槭强醋有蛄兄械脑氐陌l(fā)射時(shí)間來決定最終的元素排列順序.
下面是 RxSwift 中定義的 merge 后的序列的結(jié)束條件:
- merge 等待源序列中的所有子序列都 complete 后才會(huì) complete, 或者是 源序列 complete 時(shí)才會(huì) complete.
- 子序列在源序列中的出現(xiàn)順序沒有要求, 因?yàn)槭强疵總€(gè)子序列的元素發(fā)射時(shí)間順序.
- 但假如任何一個(gè)子序列發(fā)射 error, 則目標(biāo)序列會(huì)發(fā)射 error 并立即結(jié)束.
由于 merge 方法不會(huì)限制子序列的個(gè)數(shù), 如果想要進(jìn)行限制, 則使用 merge(maxConcurrent:) 操作符, 這個(gè)方法會(huì)在源序列中只選擇前面若干個(gè)序列來觀察. 超過數(shù)量的子序列會(huì)被放到一個(gè)隊(duì)列中, 當(dāng)之前的子序列有 complete 的之后, 再開始觀察隊(duì)列中放置的子序列.
在實(shí)際編程中經(jīng)常用到這個(gè)做了數(shù)量限制的 merge 操作符, 尤其是在資源受限的情況下, 比如同時(shí)進(jìn)行若干個(gè)網(wǎng)絡(luò)請(qǐng)求, 但實(shí)際同時(shí)發(fā)送的請(qǐng)求是有限制的情況下.
4 組合元素
在 RxSwift 中還有一個(gè)非常重要的操作符類型: combineLatest. 它們的作用也是將若干個(gè)序列進(jìn)行合并.
每次子序列發(fā)射一個(gè)值, 都會(huì)調(diào)用一次自定義 closure, 這個(gè) closure 里就可以對(duì)每個(gè)子序列的最新元素進(jìn)行處理, 從而每次獲取到的都是所有子序列的最新的事件數(shù)據(jù)的組合.
而這類操作符的應(yīng)用十分廣泛, 比如想同時(shí)觀察若干個(gè) textField 中的用戶輸入并進(jìn)行處理.
下面就來看如何使用:
-
先創(chuàng)建若干個(gè)子序列, 這里以兩個(gè)為例:
let obv1 = PublishSubject<String>() let obv2 = PublishSubject<String>() -
創(chuàng)建一個(gè)源序列, 用于包含這兩個(gè)子序列:
let target = Observable.combineLatest(obv1, obv2, resultSelector: { elem1, elem2 in return "\(elem1) \(elem2)" }) -
然后觀察該目標(biāo)序列:
target.subscribe(onNext: { res in print(res) }).addDisposableTo(disBag) -
下面就可以來驗(yàn)證輸出是否是最新的結(jié)果的組合:
// 經(jīng)過下面兩條后, 打印 hello world obv1.onNext("hello") obv2.onNext("world") // 經(jīng)過下面這條后, 打印 say world obv1.onNext("say")
可以發(fā)現(xiàn)輸出和預(yù)期一致.
而在實(shí)際使用的時(shí)候, 就可以在自定義塊中對(duì)數(shù)據(jù)進(jìn)行各種想要的組合, 然后再在觀察中去對(duì)組合數(shù)據(jù)進(jìn)行各種不同的操作.
上面例子中需要注意的內(nèi)容:
-
在組合數(shù)據(jù)的時(shí)候可以組合成任何類型的數(shù)據(jù), 子序列的數(shù)據(jù)類型不是必須一致, 且返回的類型也可以是自定義的, 比如下面的代碼:
let target = Observable.combineLatest(obv1, obv2, resultSelector: { elem1, elem2 in return "\(elem1) \(elem2)".characters.count })則組合后的數(shù)據(jù)是每個(gè)單詞的字符個(gè)數(shù).
目標(biāo)序列發(fā)射數(shù)據(jù)的前提條件是每個(gè)子序列都發(fā)射過一次數(shù)據(jù), 即至少每個(gè)子序列都擁有當(dāng)前 "最新數(shù)據(jù)". 這里需要額外注意, 因?yàn)槿绻硞€(gè)子序列沒有任何數(shù)據(jù)的話, 則目標(biāo)序列是不會(huì)發(fā)射任何數(shù)據(jù)的, 此時(shí)也不會(huì)調(diào)用自定義的 closure. 而每次滿足條件后, 任何一個(gè)子序列有新的值, 則都會(huì)調(diào)用自定義 closure, 并將組合數(shù)據(jù)發(fā)射出去.
正是因?yàn)樵谧远x closure 中, 可以返回任意類型的組合數(shù)據(jù), 故這個(gè)方法有很多作用. 實(shí)際實(shí)現(xiàn)中經(jīng)常在自定義 closure 中返回一個(gè) tuple. 然后將它傳到下一級(jí)去進(jìn)行過濾操作:
let target = Observable
.combineLatest(obv1, obv2, resultSelector: { ($0, $1) })
.filter({ !$0.0.isEmpty && !$0.1.isEmpty})
代碼中將兩個(gè)序列的最新的值在自定義 closure 中組合為一個(gè) tuple 并傳給下一級(jí)的 filter, 在 filter 中過濾掉有任意一個(gè)是空的情況.
上面的代碼還可以用尾隨閉包的方式書寫, 這樣就更加好看了:
// 下面將兩個(gè)不同類型的子序列元素組合成 tuple, 然后傳到下一級(jí)過濾
let target = Observable
.combineLatest(obv1, obv2) { ( $1) }
.filter { !$0.0.isEmpty && $0> 0 }
下面就來看看實(shí)際使用的時(shí)候的一個(gè)例子, 這個(gè)例子組合了用戶選擇的日期呈現(xiàn)方式和當(dāng)前的日期數(shù)據(jù):
exampleOf("組合用戶選擇和數(shù)據(jù)") {
let choice Observable<DateFormatter.Style>.(.short, .long)
let date = Observable.of(Date())
Observable
.combineLatest(choice, date) (format, when) -> String in
let formatter = DateFormatt()
formatter.dateStyle = format
return formatter.stri(from: when)
}.subscribe(onNext: (dateString) in
print(dateString)
})
.addDisposableTo(disBag)
}
通過上面的例子可以看出, 每次用戶選擇更新, 或者是時(shí)間更新的時(shí)候, 就會(huì)對(duì)最新數(shù)據(jù)進(jìn)行組合, 而后就可以在下一級(jí)進(jìn)行操作, 比如對(duì)組合后獲取到的對(duì)應(yīng)日期字符串進(jìn)行顯示或進(jìn)行處理.(這里只是演示, 實(shí)際的選擇需要通過用戶操作獲取, 而日期需要更新.)
最后需要注意的是: combineLatest 只有在所有的子序列都 complete 后, 目標(biāo)序列才會(huì) complete. 而如果有的已經(jīng)結(jié)束, 而又有的沒有結(jié)束, 則還是會(huì)將結(jié)束的子序列的最后一個(gè)值傳入進(jìn)行組合.
組合操作符中還有一類 zip, 它里面也有若干變體.
先來看一個(gè)例子.
example(of: "zip") {
enum Weather {
case cloudy
case sunny }
let left: Observable<Weather> = Observable.of(.sunny, .cloudy, .cloudy,
.sunny)
let right = Observable.of("Lisbon", "Copenhagen", "London", "Madrid",
"Vienna")
let observable = Observable.zip(left, right) { weather, city in
return "It's \(weather) in \(city)"
}
observable.subscribe(onNext: { value in
print(value)
})
}
輸出為:
--- Example of: zip ---
It's sunny in Lisbon
It's cloudy in Copenhagen
It's cloudy in London
It's sunny in Madrid
可以看到上面的 zip(_:_:resultSelector:) 作用是:
- 觀察每一個(gè)子序列
- 等待每一個(gè)發(fā)射新值
- 遇到新值后調(diào)用自定義的 closure
只是上面的 5 個(gè)城市對(duì)應(yīng) 4 個(gè)天氣, 輸出只有四個(gè). 因?yàn)橹灰粋€(gè)子序列發(fā)送了 complete, 則目標(biāo)序列就結(jié)束了.
5 觸發(fā)器
在實(shí)際 app 中, 通常都會(huì)有若干個(gè)輸入. 在這樣的場(chǎng)景中, 通常都會(huì)同時(shí)從多個(gè)輸入源(即 observable)上觀察到輸入, 而這些輸入可以分為兩類, 一類用于觸發(fā)代碼內(nèi)的操作, 一類用于提供數(shù)據(jù). 在 RxSwift 中提供了處理這些輸入的操作符.
首先來看 withLatestFrom(_:) 操作符, 這個(gè)操作符在處理 UI 操作的時(shí)候十分有用.
下面來看看例子.
如下代碼演示如何使用 withLatestFrom(_:) 操作符:
example(of: "withLatestFrom") {
// 1
let button = PublishSubject<Void>()
let textField = PublishSubject<String>()
// 2
let observable = button.withLatestFrom(textField)
let disposable = observable.subscribe(onNext: { value in
print(value)
})
// 3
textField.onNext("Par")
textField.onNext("Pari")
textField.onNext("Paris")
button.onNext()
button.onNext()
}
上面的代碼就在演示一個(gè)按鈕和一個(gè)輸入框, 末尾按鈕點(diǎn)擊兩次是有意為之.
輸入如下:
Paris
Paris
上面的代碼的工作流程是:
- 創(chuàng)建兩個(gè) Subject 來模擬按鈕和輸入框.
- 當(dāng)按鈕發(fā)射一個(gè)值, 忽略按鈕的值, 而去發(fā)射輸入框的最新的值.
- 通過模擬相鄰的兩次按鈕點(diǎn)擊來模擬兩次輸入框輸入.
可以看出, 使用這個(gè)操作符的目的是在發(fā)生某個(gè)事件的時(shí), 引起發(fā)射另外的某個(gè)特定值.
和 withLatestFrom(_:) 操作符類似的還有一個(gè) sample 操作符.
要理解它的作用, 首先來看代碼:
exampleOf("sample") {
let btn = PublishSubject<Bool>()
let textFieldText PublishSubject<String>()
// 這個(gè)操作符的作用是: 忽略按鈕發(fā)射的值,次按鈕發(fā)射值以后, 就去獲取輸入框的最新的值
let target = textFieldText.samp(btn)
target.subscribe(onNext: { (resultin
print(result)
}, onError: nil, onCompleted: nilonDisposed: nil)
.addDisposableTo(disBag)
textFieldText.onNext("pa")
textFieldText.onNext("par")
textFieldText.onNext("pari")
btn.onNext(true)
btn.onNext(true)
}
sample 的使用方式需要注意看. 上面代碼的輸出只有一個(gè) "pari". 因?yàn)楸M管模擬點(diǎn)擊了兩次按鈕, 但由于點(diǎn)擊兩次中間 textField 的 text 沒有改變, 故只會(huì)發(fā)射一次, 另外一次點(diǎn)擊引起的 sample 會(huì)被忽略.
這些等待觸發(fā)操作符在 UI 編程的時(shí)候十分有用.
不過有時(shí)觸發(fā)器的觸發(fā)條件是一系列的 observable, 或者你想等有了兩個(gè) observable 后只取其中一個(gè)? 這個(gè)是什么意思? 需要繼續(xù)往下看才知道!
6 切換
在 RxSwift 中有兩個(gè)廣義的 "開關(guān)" 操作符: amb 和 switchLatest. 它們都允許通過在源序列和組合序列之間切換來生成新的序列. 這樣的話, 就允許你在運(yùn)行時(shí)決定哪個(gè)序列上的事件能夠被觀察到.
下面的代碼演示 amb 的作用:
example(of: "amb") {
let left = PublishSubject<String>()
let right = PublishSubject<String>()
// 1
let observable = left.amb(right)
let disposable = observable.subscribe(onNext: { value in
print(value)
})
// 2
left.onNext("Lisbon")
right.onNext("Copenhagen")
left.onNext("London")
left.onNext("Madrid")
right.onNext("Vienna")
disposable.dispose()
}
上面的代碼的輸出只有 left 的事件數(shù)據(jù), 為什么呢? 下面再說, 先來看上面代碼的功能:
- 創(chuàng)建一個(gè) observable 來解決左和右的不匹配情況
- 兩個(gè)序列都開始發(fā)射數(shù)據(jù)
amb 操作符觀察 left 和 right 序列, 它會(huì)等待其中任何一個(gè)開始發(fā)射數(shù)據(jù), 誰(shuí)先發(fā)射數(shù)據(jù), 就放棄觀察另外一個(gè). 所以結(jié)果序列中始終都是首先觀察到發(fā)射數(shù)據(jù)的那個(gè)序列所發(fā)射的數(shù)據(jù).
同時(shí), 結(jié)果和方法調(diào)用時(shí)誰(shuí)作為參數(shù)是無關(guān)的!
amb 操作符的價(jià)值經(jīng)常會(huì)被忽略, 尤其是在一些特殊的使用場(chǎng)景中. 比如同時(shí)嘗試連接多個(gè)服務(wù)器, 并且選擇其中最先響應(yīng)的一個(gè).
另外一個(gè)使用更加普遍的操作符是 switchLatest 操作符:
要想明白它的功能, 首先也來看代碼:
exampleOf("switch Latest") {
let pub1 = PublishSubject<String>()
let pub2 = PublishSubject<String>()
let pub3 = PublishSubject<String>()
let source PublishSubject<Observable<String>>()
let target = source.switchLatest()
target.subscribe(onNext: { (elem) in
print(elem)
}).addDisposableTo(disBag
source.onNext(pub1)
pub1.onNext("1-1")
pub2.onNext("1-2")
pub3.onNext("1-3"
source.onNext(pub2)
pub1.onNext("2-1")
pub2.onNext("2-2")
pub3.onNext("2-3"
source.onNext(pub3)
pub1.onNext("3-1")
pub2.onNext("3-2")
pub3.onNext("3-3")
}
而輸出是由 source 中當(dāng)前輸出來決定的.
7 組合同一個(gè)序列中的元素
首先來看 reduce(_:accumulator:) 操作符, 它和 swift 中的 reduce 操作符類似.
還是先來看一個(gè)例子:
exampleOf("reduce") {
let source = Observable.of(1, 3, 5, 7, 9)
// let obv = source.reduce(0, accumulator: +)
let obv = source.reduce(0, accumulator: { (base, otherNum) -> Int in
return base + otherNum
})
obv.subscribe(onNext: { (num) in
print(num)
}, onError: { (err) in
print(err)
}, onCompleted: {
print("complete")
}, onDisposed: {
print("disposed")
}).addDisposableTo(disBag)
}
上面代碼中:
- 首先創(chuàng)建一個(gè) observable.
- 在創(chuàng)建的 observable 上使用 reduce 操作符, 其中第一個(gè)參數(shù)是累積器的初始值(注意是累積, 不一定是累加), 第二個(gè) closure 用于指定之后的值應(yīng)該如何計(jì)算.
- 當(dāng)源 observable 每發(fā)射一個(gè)值, 就會(huì)被送入累積器, 當(dāng)源發(fā)射 complete 時(shí), reduce 操作符也會(huì)將結(jié)果發(fā)射出來.然后再發(fā)送一個(gè) complete.
- 最終的結(jié)果是將序列中所有 next 值組合為一個(gè)值.
(在上面代碼中可以使用特殊語(yǔ)法允許直接使用一個(gè) "+" 號(hào), 這個(gè)是什么語(yǔ)法? 需要去找找... 用這個(gè)語(yǔ)法太方便了!)
需要注意, reduce 只有在源 observable 發(fā)射 complete 的時(shí)候才會(huì)發(fā)射最終結(jié)果. 如果將它用在永遠(yuǎn)不會(huì)發(fā)射 complete 的序列上, 則什么也不會(huì)發(fā)射.
和 reduce(_:accumulator:) 操作符類似的有一個(gè) scan(_:accumulator:) 操作符, 這個(gè)操作符的作用是對(duì)序列中的每一個(gè) next 事件數(shù)據(jù)進(jìn)行自定義處理.
來看如下代碼:
exampleOf("scan") {
let source = Observable.of(1, 3, 5, 7, 9)
// 也可以使用語(yǔ)法: let obv = source.scan(0, accumulator: +)
// 只是下面這樣完整的語(yǔ)法更方便理解.
let obv = source.scan(0, accumulator: { (base, nextValue) -> Int in
return base + nextValue
})
obv.subscribe(onNext: { (num) in
print(num)
}, onError: { (err) in
print(err)
}, onCompleted: {
print("complete")
}, onDisposed: {
print("disposed")
}).addDisposableTo(disBag)
}
上面代碼中只是把 reduce 換成了 scan:
- 首先創(chuàng)建一個(gè) observable.
- 在創(chuàng)建的 observable 上使用 scan 操作符, 其中第一個(gè)參數(shù)是累積器的初始值(注意是累積, 不一定是累加), 第二個(gè) closure 用于指定之后的值應(yīng)該如何計(jì)算.
- 當(dāng)源 observable 每發(fā)射一個(gè)值, 就會(huì)被送入累積器, 這里和 reduce 的區(qū)別是, reduce 只有當(dāng)源發(fā)射 complete 的時(shí)候才會(huì)發(fā)射最終結(jié)果, 而 scan 是源每發(fā)射一個(gè) next, 它就會(huì)把該 next 數(shù)據(jù)送去累積, 并在新的 observable 序列上發(fā)射出去. 當(dāng)源 observable 發(fā)射 complete 時(shí), 它同樣發(fā)射一個(gè) complete.
- 最終的結(jié)果是一個(gè)新的 observable, 序列中的每個(gè)值就是每次累積后的值.
相比而言, scan 的應(yīng)用范圍十分廣泛, 可以用它來實(shí)時(shí)計(jì)算總和, 統(tǒng)計(jì)結(jié)果, 計(jì)算狀態(tài)等. scan 中特別適合進(jìn)行狀態(tài)轉(zhuǎn)變操作, 并且不用再去定義一個(gè)外部變量記錄狀態(tài). 這樣的用法在第 20 章有很多.
九章完. 十章略先. 講的是實(shí)操.