在RxSwift中,有許多實用的操作符,也可以稱之為高階函數(shù),可以幫助我們創(chuàng)建特定功能的序列,或者組合變換原有的序列,生成一個新的序列。這里記錄一下,可以作為手冊以供快速查看。
組合操作符
- merge
將多個序列合并成一個新序列,當這多個序列中某一個序列發(fā)出一個元素時,新序列就將這個元素發(fā)出,當某一個序列發(fā)出error時,新序列也發(fā)出error,并終止序列。
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
Observable.of(subject1, subject2)
.merge()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("R")
subject1.onNext("x")
subject2.onNext("S")
subject2.onNext("w")
subject1.onNext("i")
subject2.onNext("f")
subject1.onNext("t")
打?。?/p>
R
x
S
w
i
f
t
- zip
將多個(最多不超過8個) 序列的元素組合壓縮,而且是等到每個序列元素事件一一對應地湊齊之后再合并,然后將合并的結(jié)果元素發(fā)出來。組合時,嚴格按照序列的索引數(shù)進行組合。即返回序列第一個元素,是由每一個源序列的第一個元素組合出來,它的第二個元素 ,是由每一個源序列的第二個元素組合出來的,以此類推。它的元素數(shù)量等于源序列中元素數(shù)量最少的那個序列。
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
Observable.zip(stringSubject, intSubject) { stringElement, intElement in
"\(stringElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSubject.onNext("R")
stringSubject.onNext("x") // 到這里存儲了 R X, 但是不會響應
intSubject.onNext(1) // 與R 組合成一個新序列,元素:R 1
intSubject.onNext(2) // 與X 組合成一個新序列,元素:X 2
stringSubject.onNext("Swift") // 保存Swift
intSubject.onNext(3) // 與Swift 組合成一個新序列,元素:Swift 3
// 劃重點: 只有兩個序列同時有值的時候才會響應,否則存值
打?。?/p>
R 1
x 2
Swift 3
- combineLatest
將多個序列中最新的元素組合起來,然后將這個組合的結(jié)果發(fā)出來。這些源序列中任何一個發(fā)出一個元素,他都會發(fā)出一個元素(前提是,這些序列曾經(jīng)發(fā)出過元素)。應用:同時滿足條件下觸發(fā)業(yè)務,如登錄賬號、密碼合法性校驗。
let stringSub = PublishSubject<String>()
let intSub = PublishSubject<Int>()
Observable.combineLatest(stringSub, intSub) { strElement, intElement in
"\(strElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSub.onNext("R") // 保存R
stringSub.onNext("x") // 保存x覆蓋了R,和zip不一樣
intSub.onNext(1) // 這時兩個序列都有元素,響應 x 1
intSub.onNext(2) // 保存1覆蓋了2,都有值, 響應 x 2
stringSub.onNext("Swift") // 保存Swift覆蓋了x, 都有值,響應 Swift 2
打?。?/p>
x 1
x 2
Swift 2
- switchLatest
將序列發(fā)出的元素轉(zhuǎn)換成一個新序列,并從新序列發(fā)出元素。
let switchLatestSub1 = BehaviorSubject(value: "R")
let switchLatestSub2 = BehaviorSubject(value: "1")
// 選擇了 switchLatestSub1 就不會監(jiān)聽 switchLatestSub2
let switchLatestSub = BehaviorSubject(value: switchLatestSub1)
switchLatestSub.asObservable()
.switchLatest()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
switchLatestSub1.onNext("x")
switchLatestSub1.onNext("_")
switchLatestSub2.onNext("2")
switchLatestSub2.onNext("3") // 2-3都會不會監(jiān)聽,保存的元素由2覆蓋1,3覆蓋2
switchLatestSub.onNext(switchLatestSub2) // 切換到 switchLatestSub2
switchLatestSub1.onNext("*")
switchLatestSub1.onNext("Swift") // 原理同上,最后保存的元素為Swift
switchLatestSub2.onNext("4")
switchLatestSub.onNext(switchLatestSub1) // 切換到 switchLatestSub1
打印:
R
x
_
3
4
Swift
- concat
將多個序列按順序串聯(lián)起來,只有當上一個序列發(fā)出了completed事件,才會開始發(fā)送下一個序列事件。
let concatSub1 = BehaviorSubject(value: "1")
let concatSub2 = BehaviorSubject(value: "A")
let concatSub = BehaviorSubject(value: concatSub1)
concatSub.asObservable()
.concat()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
concatSub1.onNext("2")
concatSub.onNext(concatSub2) // 切換到 concatSub2
concatSub2.onNext("B")
concatSub2.onNext("C") // B-C都不會監(jiān)聽,但是默認保存由 B覆蓋A C覆蓋B
concatSub1.onCompleted() // 結(jié)束concatSub1
concatSub2.onNext("D")
打印:
1
2
C
D
- startWith
在發(fā)出序列的事件元素之前,會先發(fā)出這些預先插入的事件元素。
Observable.of("1", "2", "3", "4")
.startWith("A")
.startWith("B")
.startWith("C", "a", "b")
.subscribe(onNext: { print($0) })
.disposed(by: DisposeBag())
打?。?/p>
C
a
b
B
A
1
2
3
4
- withLatestFrom
將兩個序列最新的元素組合起來,當?shù)谝粋€序列發(fā)出一個元素,就將組合后的元素發(fā)送出來。
let withLatestFromStrSub = PublishSubject<String>()
let withLatestFromIntSub = PublishSubject<Int>()
// 當withLatestFromStrSub發(fā)出一個元素時,
// 就立即取出withLatestFromIntSub最新的元素,
// 將withLatestFromStrSub 中最新的元素strElement和withLatestFromIntSub最新的元素intElement組合,
//然后把組合結(jié)果 strElement+intElement發(fā)送出去
withLatestFromStrSub.withLatestFrom(withLatestFromIntSub) { strElement, intElement in
"\(strElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// withLatestFromStrSub發(fā)出一個元素時,
// 立即取出withLatestFromIntSub最新的元素,
// 然后把withLatestFromIntSub中最新的元素發(fā)送出去
// withLatestFromStrSub.withLatestFrom(withLatestFromIntSub)
// .subscribe(onNext: { print($0) })
// .disposed(by: disposeBag)
withLatestFromStrSub.onNext("R")
withLatestFromStrSub.onNext("x") // 存了x覆蓋R
withLatestFromIntSub.onNext(1)
withLatestFromIntSub.onNext(2) // 存了2覆蓋1
withLatestFromStrSub.onNext("Swift") // 存了Swift覆蓋x,withLatestFromIntSub有最新元素2,發(fā)出組合序列事件
withLatestFromStrSub.onNext("Hello") // 存了Hello覆蓋Swift,withLatestFromIntSub有最新元素2,發(fā)出組合序列事件
打印:
Swift 2
Hello 2
2
2
轉(zhuǎn)換操作符
- map
通過傳入一個閉包把原序列轉(zhuǎn)變?yōu)橐粋€新序列,map函數(shù)會將原序列的所有元素進行轉(zhuǎn)換。
let ob = Observable.of(1,2,3,4)
ob.map { (number) -> Int in
return number+2
}
.subscribe{
print("\($0)")
}
.disposed(by: disposeBag)
打?。?/p>
next(3)
next(4)
next(5)
next(6)
completed
- flatMap
升維:flatMap 操作符會對源序列的每一個元素應用一個轉(zhuǎn)換方法,將他們轉(zhuǎn)換成 Observables,降維:然后將這些 Observables 的元素合并成一個序列之后再發(fā)送出來。
let strSub = BehaviorSubject(value: "A")
let intSub = BehaviorSubject(value: "1")
let flatSub = BehaviorSubject(value: strSub)
flatSub.asObservable()
.flatMap { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
strSub.onNext("B")
flatSub.onNext(intSub) // 加入intSub序列,發(fā)出intSub和strSub序列元素轉(zhuǎn)化后的序列
intSub.onNext("2")
strSub.onNext("C")
intSub.onNext("3")
打?。?/p>
A
B
1
2
C
3
- flatMapLatest
將序列元素轉(zhuǎn)換成 Observables,然后取這些 Observables 中最新的一個發(fā)送。若轉(zhuǎn)換出一個新的 Observables,就只發(fā)出它的元素,舊的 Observables 的元素將被忽略掉。
let strSub = BehaviorSubject(value: "A")
let intSub = BehaviorSubject(value: "1")
let flatSub = BehaviorSubject(value: strSub)
flatSub.asObservable()
.flatMapLatest { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
strSub.onNext("B")
// 加入intSub序列,發(fā)出intSub元素轉(zhuǎn)化后的序列,忽略strSub的轉(zhuǎn)化序列
flatSub.onNext(intSub)
intSub.onNext("2")
strSub.onNext("C")
intSub.onNext("3")
打?。?/p>
A
B
1
2
3
flatMapLatest實際上是map和switchLatest操作符的組合。
相對的有flatMapFirst,flatMapFirst只會接收最初加入序列的元素事件。
- concatMap
將源序列的每一個元素轉(zhuǎn)換,轉(zhuǎn)化成一個 Observables。然后將這些Observables 按順序發(fā)出元素。與flatMap 的區(qū)別是:當前一個序列的元素發(fā)送完畢后,下一個 Observable 才可以開始發(fā)出元素。
let concatMapSubject1 = BehaviorSubject(value: "A")
let concatMapSubject2 = BehaviorSubject(value: "1")
let concatMapSubject = BehaviorSubject(value: concatMapSubject1)
concatMapSubject.asObservable()
.concatMap() { $0 }
.subscribe { print($0) }
.disposed(by: disposeBag)
concatMapSubject1.onNext("B")
concatMapSubject1.onNext("C")
concatMapSubject.onNext(concatMapSubject2)
concatMapSubject2.onNext("2")
concatMapSubject2.onNext("3") // 保存3覆蓋2
// concatMapSubject1結(jié)束,開始訂閱concatMapSubject2的元素
concatMapSubject1.onCompleted()
concatMapSubject2.onNext("4")
打?。?/p>
next(A)
next(B)
next(C)
next(3)
next(4)
- scan
對第一個元素和傳入的初始參數(shù)使用傳入的閉包運算,將結(jié)果作為第一個元素發(fā)出。然后,將結(jié)果作為參數(shù)填入到第二個元素的閉包運算中,創(chuàng)建第二個元素。以此類推,直到遍歷完全部的元素。也就是scan 先給一個初始化的參數(shù)數(shù),然后不斷的拿前一個結(jié)果和最新的元素進行運算。
Observable.of(10, 100, 1000)
.scan(2) { aggregateValue, newValue in
aggregateValue + newValue // 10 + 2, 100 + 12, 1000 + 112
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打?。?/p>
12
112
1112
- reduce
將對第一個元素使用傳入的閉包運算。然后,將結(jié)果作為參數(shù)填入到第二個元素的閉包運算中。以此類推,直到遍歷完全部的元素后發(fā)出最終結(jié)果。
Observable.of(10, 100, 1000)
.reduce(2) { aggregateValue, newValue in
aggregateValue + newValue // 1000 + 100 + 10 + 2
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
1112
- toArray
將一個序列轉(zhuǎn)成一個數(shù)組,并作為一個單一的事件發(fā)送,然后結(jié)束。
Observable.of(1, 2, 3)
.toArray()
.subscribe({ print($0) })
.disposed(by: disposeBag)
打印:
success([1, 2, 3])
過濾操作符
- filter
用來過濾掉不符合條件的事件,僅僅發(fā)出序列中通過判定的元素。
Observable.of(1,2,3,4,5,6,7,8,9,0)
.filter { $0 % 2 == 0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打?。?/p>
2
4
6
8
0
- single
只發(fā)出可觀察序列發(fā)出的第一個元素(或滿足條件的第一個元素)。如果可觀察序列發(fā)出多個元素,將拋出一個錯誤。
Observable.of("Rx", "Swift")
.single()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打?。?/p>
Rx
Unhandled error happened: Sequence contains more than one element.
subscription called from:
Observable.of("Rx", "Swift")
.single { $0 == "Swift" }
.subscribe { print($0) }
.disposed(by: disposeBag)
打?。?/p>
next(Swift)
completed
- take
只發(fā)出頭n個元素,忽略掉后面的元素,直接結(jié)束序列。
print("*****take*****")
Observable.of("A", "B","C", "D")
.take(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
A
B
- takeLast
僅發(fā)送序列的后n個元素,忽略前面的元素。
Observable.of("R", "x","S", "wift")
.takeLast(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打?。?/p>
x
S
wift
- takeWhile
依次判斷序列的每一個值是否滿足給定的條件, 當?shù)谝粋€不滿足條件的值出現(xiàn)時,序列便自動結(jié)束。
Observable.of(1, 2, 3, 4, 2, 6)
.takeWhile { $0 < 3 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打?。?/p>
1
2
- takeUntil
監(jiān)聽源Observable,同時監(jiān)聽一個條件序列。若條件序列發(fā)出一個元素或者產(chǎn)生一個終止事件,那么源Observable 將自動完成,停止發(fā)送事件。
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.takeUntil(referenceSequence)
.subscribe { print($0) }
.disposed(by: disposeBag)
sourceSequence.onNext("A")
sourceSequence.onNext("B")
sourceSequence.onNext("C")
referenceSequence.onNext("0") // 條件序列訂閱發(fā)出,源序列就結(jié)束
sourceSequence.onNext("1")
sourceSequence.onNext("2")
sourceSequence.onNext("3")
打?。?/p>
next(A)
next(B)
next(C)
completed
- skip
用于跳過序列發(fā)出的前n 個元素事件。
Observable.of(1, 2, 3, 4, 5, 6)
.skip(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
3
4
5
6
- skipWhile
忽略源序列中頭幾個滿足條件的事件。
Observable.of(1, 7, 3, 4, 5, 6)
.skipWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打?。?/p>
7
3
4
5
6
- skipUntil
與takeUntil類似,監(jiān)聽源Observable,同時監(jiān)聽一個條件序列。跳過Observable 中頭幾個元素,直到條件序列發(fā)出一個元素。
let sourceSeq = PublishSubject<String>()
let referenceSeq = PublishSubject<String>()
sourceSeq.skipUntil(referenceSeq)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 沒有條件序列事件,源序列無法訂閱
sourceSeq.onNext("A")
sourceSeq.onNext("B")
sourceSeq.onNext("C")
referenceSeq.onNext("0") // 訂閱條件序列,源序列就開始訂閱
sourceSeq.onNext("1")
sourceSeq.onNext("2")
sourceSeq.onNext("3")
打?。?/p>
1
2
3
- elementAt
只發(fā)出出來中的第 n 個元素,即是只處理指定位置的元素事件。
Observable.of("R", "X", "S", "wift")
.elementAt(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
wift
- distinctUntilChanged
用于過濾掉連續(xù)重復的事件。如果后一個元素和前一個元素是相同的,那么這個元素將不會被發(fā)出來。不連續(xù)的相同元素不受影響。
Observable.of("1", "2", "2", "2", "3", "3", "4")
.distinctUntilChanged()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打印:
1
2
3
4
- amb
amb操作符在多個源 Observables中, 取第一個發(fā)出元素或產(chǎn)生事件的 Observable,這個事件可以是一個 next,error 或者 completed事件,然后就只發(fā)出這個Observable的元素事件,忽略掉其他的Observables。
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
let subject3 = PublishSubject<Int>()
subject1
.amb(subject2)
.amb(subject3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject2.onNext(2) // 先發(fā)出事件,之后就只發(fā)出subject2的事件
subject1.onNext(1)
subject2.onNext(2)
subject1.onNext(1)
subject3.onNext(3)
subject2.onNext(2)
subject1.onNext(1)
subject3.onNext(3)
subject3.onNext(3)
打?。?/p>
2
2
2
Debug 操作符
- debug
打印所有的訂閱,事件以及銷毀信息。
打開RxSwift的Debug Mode方式:在 podfile 的末端添加如下代碼,重新 pod install 一下。
post_install do |installer|
installer.pods_project.targets.each do |target|
if target.name == 'RxSwift'
target.build_configurations.each do |config|
if config.name == 'Debug'
config.build_settings['OTHER_SWIFT_FLAGS'] ||= ['-D', 'TRACE_RESOURCES']
end
end
end
end
end
var count = 1
let error = NSError.init(domain: "com.xxx", code: 10000, userInfo: nil)
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("A")
observer.onNext("B")
observer.onNext("C")
if count < 3 {
observer.onError(error)
print("onError")
count += 1
}
observer.onNext("D")
observer.onNext("E")
observer.onNext("F")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.debug()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打?。?/p>
2019-08-27 17:44:45.792: ViewController.swift:572 (testDebug()) -> subscribed
2019-08-27 17:44:45.840: ViewController.swift:572 (testDebug()) -> Event next(A)
A
2019-08-27 17:44:45.840: ViewController.swift:572 (testDebug()) -> Event next(B)
B
2019-08-27 17:44:45.840: ViewController.swift:572 (testDebug()) -> Event next(C)
C
2019-08-27 17:44:45.842: ViewController.swift:572 (testDebug()) -> Event error(Error Domain=com.xxx Code=10000 "(null)")
Unhandled error happened: Error Domain=com.xxx Code=10000 "(null)"
subscription called from:
2019-08-27 17:44:45.845: ViewController.swift:572 (testDebug()) -> isDisposed
onError
- RxSwift.Resources.total
提供所有Rx資源分配的計數(shù),在開發(fā)期間檢測泄漏非常有用。
print(RxSwift.Resources.total)
let subject = BehaviorSubject(value: "RxSwift")
let subscription1 = subject.subscribe(onNext: { print($0) })
print(RxSwift.Resources.total)
let subscription2 = subject.subscribe(onNext: { print($0) })
print(RxSwift.Resources.total)
subscription1.dispose()
print(RxSwift.Resources.total)
subscription2.dispose()
print(RxSwift.Resources.total)
打?。?/p>
4
RxSwift
11
RxSwift
14
12
10
連接操作符
- publish
將序列轉(zhuǎn)換為可被連接的序列。可被連接的序列 在被訂閱后不會發(fā)出元素,直到connect操作符被應用為止。這樣你就可以控制序列在什么時候開始發(fā)出元素。
let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish()
interval.subscribe(onNext: { print("訂閱: 1, 事件: \($0)") })
.disposed(by: disposeBag)
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
// 在connect之前的訂閱不會發(fā)出元素,直到connect發(fā)出后
print("do connect")
_ = interval.connect()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
interval.subscribe(onNext: { print("訂閱: 2, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
interval.subscribe(onNext: { print("訂閱: 3, 事件: \($0)") })
.disposed(by: self.disposeBag)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
self.disposeBag = DisposeBag()
}
打印:
訂閱: 1, 事件: 0
訂閱: 2, 事件: 0
訂閱: 1, 事件: 1
訂閱: 2, 事件: 1
訂閱: 1, 事件: 2
訂閱: 2, 事件: 2
訂閱: 3, 事件: 2
訂閱: 1, 事件: 3
訂閱: 2, 事件: 3
訂閱: 3, 事件: 3
- replay
將序列轉(zhuǎn)換為可被連接的序列,并且這個可被連接的序列將緩存最新的 n 個元素。當有新的觀察者對它進行訂閱時,它就把這些被緩存的元素發(fā)送給觀察者。
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("A")
observer.onNext("B")
observer.onNext("C")
observer.onNext("D")
observer.onNext("E")
observer.onNext("F")
return Disposables.create()
}
.replay(5) // 緩存5個元素
sequenceThatErrors
.subscribe(onNext: {
print("before connect subscribe \($0)")
})
.disposed(by: disposeBag)
// 在connect之前的訂閱不會發(fā)出元素,直到connect發(fā)出后
print("do connect")
_ = sequenceThatErrors.connect()
sequenceThatErrors
.subscribe(onNext: {
print("after connect first subscribe \($0)")
})
.disposed(by: disposeBag)
sequenceThatErrors
.subscribe(onNext: {
print("after connect second subscribe \($0)")
})
.disposed(by: disposeBag)
打印:
do connect
before connect subscribe A
before connect subscribe B
before connect subscribe C
before connect subscribe D
before connect subscribe E
before connect subscribe F
after connect first subscribe B
after connect first subscribe C
after connect first subscribe D
after connect first subscribe E
after connect first subscribe F
after connect second subscribe B
after connect second subscribe C
after connect second subscribe D
after connect second subscribe E
after connect second subscribe F
- multicast
將序列轉(zhuǎn)換為可被連接的序列,同時傳入一個Subject,每當序列發(fā)送事件時都會觸發(fā)這個Subject的發(fā)送。
let subject = PublishSubject<Any>()
subject.subscribe{print("subject:\($0)")}
.disposed(by: disposeBag)
let netOB = Observable<Any>.create { (observer) -> Disposable in
sleep(2)// 模擬網(wǎng)絡延遲
print("開始請求網(wǎng)絡")
observer.onNext("請求到的網(wǎng)絡數(shù)據(jù)")
observer.onNext("請求到的本地")
observer.onCompleted()
return Disposables.create {
print("銷毀")
}
}.multicast(subject)
netOB.subscribe(onNext: { (anything) in
print("訂閱1:",anything)
})
.disposed(by: disposeBag)
netOB.subscribe(onNext: { (anything) in
print("訂閱2:",anything)
})
.disposed(by: disposeBag)
_ = netOB.connect()
打印:
subject:next(請求到的網(wǎng)絡數(shù)據(jù))
訂閱1: 請求到的網(wǎng)絡數(shù)據(jù)
訂閱2: 請求到的網(wǎng)絡數(shù)據(jù)
subject:next(請求到的本地)
訂閱1: 請求到的本地
訂閱2: 請求到的本地
subject:completed
銷毀
錯誤恢復操作符
- timeout
規(guī)定時間內(nèi)沒有產(chǎn)生元素就產(chǎn)生一個超時的error事件
let times = [
[ "value": 1, "time": 1 ],
[ "value": 2, "time": 2 ],
[ "value": 3, "time": 4.1 ]
]
let timeOutOb = Observable.from(times)
.flatMap({ (anyTime) in
return Observable.of(anyTime["value"]!)
.delaySubscription(Double(anyTime["time"]!), scheduler: MainScheduler.instance)
})
.timeout(2, scheduler: MainScheduler.instance)
timeOutOb.subscribe(onNext: { (value) in
print("onNext:", value)
}, onError: { (error) in
print("error:", error)
})
.disposed(by: disposeBag)
打?。?/p>
onNext: 1.0
onNext: 2.0
error: Sequence timeout.
- catchErrorJustReturn
如果產(chǎn)生錯誤,返回一個可觀察序列,該序列發(fā)出單個元素,然后終止。
let error = NSError.init(domain: "com.xxx", code: 10000, userInfo: nil)
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchErrorJustReturn("AA")
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("BB")
sequenceThatFails.onNext("CC") // 正常序列發(fā)送
sequenceThatFails.onError(error) // 發(fā)送失敗,返回設定的錯誤的預案
打?。?/p>
next(BB)
next(CC)
next(AA)
completed
- catchError
攔截一個error事件,將它替換成其他序列,然后傳遞給觀察者,訂閱新的序列。
let recoverySequence = PublishSubject<String>()
sequenceThatFails
.catchError {
print("Error:", $0)
return recoverySequence // 截獲到了錯誤序列,,返回一個新序列
}
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("AA")
sequenceThatFails.onNext("BB") // 正常序列發(fā)送成功的
sequenceThatFails.onError(error) // 發(fā)送失敗的序列
recoverySequence.onNext("CC") // 替換的序列發(fā)送信號
sequenceThatFails.onNext("DD") // 原序列已結(jié)束,不能放松信號
recoverySequence.onNext("EE")
next(AA)
next(BB)
Error: Error Domain=com.xxx Code=10000 "(null)"
next(CC)
next(EE)
- retry
如果源序列產(chǎn)生一個錯誤事件,重新對它進行訂閱,讓它有機會不產(chǎn)生error事件。即便源序列產(chǎn)生了一個error事件,retry總是對觀察者發(fā)出 next 事件,所以這樣可能會產(chǎn)生重復的元素。
var count = 1 // 外界變量控制流程
let sequenceRetryErrors = Observable<String>.create { observer in
observer.onNext("AA")
observer.onNext("BB")
observer.onNext("CC")
if count == 1 { // 流程進來之后就會過度-這里的條件可以作為出口,失敗的次數(shù)
observer.onError(error) // 接收到了錯誤序列,重試序列發(fā)生
print("onError")
count += 1
}
observer.onNext("DD")
observer.onNext("EE")
observer.onCompleted()
return Disposables.create()
}
sequenceRetryErrors
.retry()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
打?。?/p>
AA
BB
CC
onError
AA
BB
CC
DD
EE
設置重新訂閱次數(shù)(retry(_:))
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("AA")
observer.onNext("BB")
observer.onNext("CC")
observer.onError(error)
observer.onNext("DD")
observer.onNext("EE")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3) // 重新訂閱3次
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
}
打?。?/p>
AA
BB
CC
AA
BB
CC
AA
BB
CC
Unhandled error happened: Error Domain=com.xxx Code=10000 "(null)"
subscription called from:
總結(jié):
篇幅較長,想快速了解請直接看這里。
-
組合操作符
- merge
將多個序列合并成一個新序列,當這多個序列中某一個序列發(fā)出一個元素時,新序列就將這個元素發(fā)出,當某一個序列發(fā)出error時,新序列也發(fā)出error,并終止序列。 - zip
將多個(最多不超過8個) 序列的元素組合壓縮,而且是等到每個序列元素事件一一對應地湊齊之后再合并,然后將合并的結(jié)果元素發(fā)出來。 - combineLatest
將多個序列中最新的元素組合起來,然后將這個組合的結(jié)果發(fā)出來。 - switchLatest
將序列發(fā)出的元素轉(zhuǎn)換成一個新序列,并從新序列發(fā)出元素。 - concat
將多個序列按順序串聯(lián)起來,只有當上一個序列發(fā)出了completed事件,才會開始發(fā)送下一個序列事件。 - startWith
在發(fā)出序列的事件元素之前,會先發(fā)出這些預先插入的事件元素。 - withLatestFrom
將兩個序列最新的元素組合起來,當?shù)谝粋€序列發(fā)出一個元素,就將組合后的元素發(fā)送出來。
- merge
-
轉(zhuǎn)換操作符
- map
通過傳入的閉包把原序列轉(zhuǎn)變?yōu)橐粋€新序列,map函數(shù)會將原序列的所有元素進行轉(zhuǎn)換。 - flatMap
升維:對源序列的每一個元素應用轉(zhuǎn)換方法,將他們轉(zhuǎn)換成 Observables,降維:然后將這些 Observables 的元素合并成一個序列之后再發(fā)送出來。 - flatMapLatest
將序列元素轉(zhuǎn)換成 Observables,然后取這些 Observables 中最新的一個發(fā)送。若轉(zhuǎn)換出一個新的 Observables,就只發(fā)出它的元素,舊的 Observables 的元素將被忽略掉。 - concatMap
將源序列的每一個元素轉(zhuǎn)換,轉(zhuǎn)化成一個 Observables。然后將這些Observables 按順序發(fā)出元素。與flatMap 的區(qū)別是:當前一個序列的元素發(fā)送完畢后,下一個 Observable 才可以開始發(fā)出元素。 - scan
遍歷全部元素,對第一個元素和傳入的初始參數(shù)使用傳入的閉包運算,將結(jié)果作為第一個元素發(fā)出。然后不斷的拿前一個結(jié)果和最新的元素進行運算,并發(fā)出結(jié)果。 - reduce
將對第一個元素使用傳入的閉包運算。然后,將結(jié)果作為參數(shù)填入到第二個元素的閉包運算中。以此類推,直到遍歷完全部的元素后發(fā)出最終結(jié)果。 - toArray
將一個序列轉(zhuǎn)成一個數(shù)組,并作為一個單一的事件發(fā)送,然后結(jié)束。
- map
-
過濾操作符
- filter
用來過濾掉不符合條件的事件,僅僅發(fā)出序列中通過判定的元素。 - single
只發(fā)出可觀察序列發(fā)出的第一個元素(或滿足條件的第一個元素)。如果可觀察序列發(fā)出多個元素,將拋出一個錯誤。 - take
只發(fā)出頭n個元素,忽略掉后面的元素,直接結(jié)束序列。 - takeLast
僅發(fā)送序列的后n個元素,忽略前面的元素。 - takeWhile
依次判斷序列的每一個值是否滿足給定的條件, 當第一個不滿足條件的值出現(xiàn)時,序列便自動結(jié)束。 - takeUntil
監(jiān)聽源Observable,同時監(jiān)聽一個條件序列。若條件序列發(fā)出一個元素或者產(chǎn)生一個終止事件,那么源Observable 將自動完成,停止發(fā)送事件。 - skip
用于跳過序列發(fā)出的前n 個元素事件。 - skipWhile
忽略源序列中頭幾個滿足條件的事件。 - skipUntil
監(jiān)聽源Observable,同時監(jiān)聽一個條件序列。跳過Observable 中頭幾個元素,直到條件序列發(fā)出一個元素。 - elementAt
只發(fā)出出來中的第 n 個元素,即是只處理指定位置的元素事件。 - distinctUntilChanged
用于過濾掉連續(xù)重復的事件。如果后一個元素和前一個元素是相同的,那么這個元素將不會被發(fā)出來。不連續(xù)的相同元素不受影響。 - amb
在多個源 Observables中, 取第一個產(chǎn)生事件的 Observable,忽略掉其他的Observables。
- filter
-
Debug 操作符
- debug
打印所有的訂閱,事件以及銷毀信息。 - RxSwift.Resources.total
提供所有Rx資源分配的計數(shù),在開發(fā)期間檢測泄漏非常有用
- debug
-
連接操作符。
- publish
將序列轉(zhuǎn)換為可被連接的序列。可被連接的序列 在被訂閱后不會發(fā)出元素,直到 connect 操作符被應用為止。 - replay
將序列轉(zhuǎn)換為可被連接的序列,并且這個可被連接的序列將緩存最新的 n 個元素。當有新的觀察者對它進行訂閱時,它就把這些被緩存的元素發(fā)送給觀察者。 - multicast
將序列轉(zhuǎn)換為可被連接的序列,同時傳入一個 Subject,每當序列發(fā)送事件時都會觸發(fā)這個 Subject 的發(fā)送。
- publish
-
錯誤恢復操作符
- timeout
規(guī)定時間內(nèi)沒有產(chǎn)生元素就產(chǎn)生一個超時的 error 事件。 - catchErrorJustReturn
如果產(chǎn)生錯誤,返回一個可觀察序列,該序列發(fā)出單個元素,然后終止。 - catchError
攔截 error 事件,將它替換成其他序列,然后傳遞給觀察者,訂閱新的序列。 - retry
如果源序列產(chǎn)生一個錯誤事件,重新對它進行訂閱,讓它有機會不產(chǎn)生error事件??稍O置重新訂閱次數(shù),默認遇到一次錯誤事件重新訂閱一次。
- timeout