RxSwift(五)操作符的使用

在RxSwift中,有許多實用的操作符,也可以稱之為高階函數(shù),可以幫助我們創(chuàng)建特定功能的序列,或者組合變換原有的序列,生成一個新的序列。這里記錄一下,可以作為手冊以供快速查看。

組合操作符

  • merge
    將多個序列合并成一個新序列,當這多個序列中某一個序列發(fā)出一個元素時,新序列就將這個元素發(fā)出,當某一個序列發(fā)出error時,新序列也發(fā)出error,并終止序列。
        let subject1 = PublishSubject<String>()
        let subject2 = PublishSubject<String>()
        Observable.of(subject1, subject2)
            .merge()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        subject1.onNext("R")
        subject1.onNext("x")
        subject2.onNext("S")
        subject2.onNext("w")
        subject1.onNext("i")
        subject2.onNext("f")
        subject1.onNext("t")

打?。?/p>

R
x
S
w
i
f
t
  • zip
    將多個(最多不超過8個) 序列的元素組合壓縮,而且是等到每個序列元素事件一一對應地湊齊之后再合并,然后將合并的結(jié)果元素發(fā)出來。組合時,嚴格按照序列的索引數(shù)進行組合。即返回序列第一個元素,是由每一個源序列的第一個元素組合出來,它的第二個元素 ,是由每一個源序列的第二個元素組合出來的,以此類推。它的元素數(shù)量等于源序列中元素數(shù)量最少的那個序列。
        let stringSubject = PublishSubject<String>()
        let intSubject = PublishSubject<Int>()

        Observable.zip(stringSubject, intSubject) { stringElement, intElement in
                "\(stringElement) \(intElement)"
            }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        stringSubject.onNext("R")
        stringSubject.onNext("x")   // 到這里存儲了 R X, 但是不會響應
        intSubject.onNext(1)    // 與R 組合成一個新序列,元素:R 1
        intSubject.onNext(2)    // 與X 組合成一個新序列,元素:X 2
        stringSubject.onNext("Swift")   // 保存Swift
        intSubject.onNext(3)   // 與Swift 組合成一個新序列,元素:Swift 3
        // 劃重點: 只有兩個序列同時有值的時候才會響應,否則存值

打?。?/p>

R 1
x 2
Swift 3
  • combineLatest
    將多個序列中最新的元素組合起來,然后將這個組合的結(jié)果發(fā)出來。這些源序列中任何一個發(fā)出一個元素,他都會發(fā)出一個元素(前提是,這些序列曾經(jīng)發(fā)出過元素)。應用:同時滿足條件下觸發(fā)業(yè)務,如登錄賬號、密碼合法性校驗。
        let stringSub = PublishSubject<String>()
        let intSub = PublishSubject<Int>()
        Observable.combineLatest(stringSub, intSub) { strElement, intElement in
                "\(strElement) \(intElement)"
            }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        stringSub.onNext("R")   // 保存R
        stringSub.onNext("x")   // 保存x覆蓋了R,和zip不一樣
        intSub.onNext(1)        // 這時兩個序列都有元素,響應 x 1
        intSub.onNext(2)        // 保存1覆蓋了2,都有值, 響應 x 2
        stringSub.onNext("Swift") // 保存Swift覆蓋了x, 都有值,響應 Swift 2

打?。?/p>

x 1
x 2
Swift 2
  • switchLatest
    將序列發(fā)出的元素轉(zhuǎn)換成一個新序列,并從新序列發(fā)出元素。
        let switchLatestSub1 = BehaviorSubject(value: "R")
        let switchLatestSub2 = BehaviorSubject(value: "1")
        // 選擇了 switchLatestSub1 就不會監(jiān)聽 switchLatestSub2
        let switchLatestSub  = BehaviorSubject(value: switchLatestSub1)  
        
        switchLatestSub.asObservable()
            .switchLatest()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        switchLatestSub1.onNext("x")
        switchLatestSub1.onNext("_")
        switchLatestSub2.onNext("2")
        switchLatestSub2.onNext("3")  // 2-3都會不會監(jiān)聽,保存的元素由2覆蓋1,3覆蓋2
        switchLatestSub.onNext(switchLatestSub2)   // 切換到 switchLatestSub2
        switchLatestSub1.onNext("*")
        switchLatestSub1.onNext("Swift") // 原理同上,最后保存的元素為Swift
        switchLatestSub2.onNext("4")
        switchLatestSub.onNext(switchLatestSub1) // 切換到 switchLatestSub1

打印:

R
x
_
3
4
Swift
  • concat
    將多個序列按順序串聯(lián)起來,只有當上一個序列發(fā)出了completed事件,才會開始發(fā)送下一個序列事件。
        let concatSub1 = BehaviorSubject(value: "1")
        let concatSub2 = BehaviorSubject(value: "A")
        let concatSub = BehaviorSubject(value: concatSub1)
        
        concatSub.asObservable()
            .concat()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        concatSub1.onNext("2")
        concatSub.onNext(concatSub2)    // 切換到 concatSub2
        concatSub2.onNext("B")
        concatSub2.onNext("C")    // B-C都不會監(jiān)聽,但是默認保存由 B覆蓋A C覆蓋B
        concatSub1.onCompleted()    // 結(jié)束concatSub1
        concatSub2.onNext("D")

打印:

1
2
C
D
  • startWith
    在發(fā)出序列的事件元素之前,會先發(fā)出這些預先插入的事件元素。
        Observable.of("1", "2", "3", "4")
            .startWith("A")
            .startWith("B")
            .startWith("C", "a", "b")
            .subscribe(onNext: { print($0) })
            .disposed(by: DisposeBag())

打?。?/p>

C
a
b
B
A
1
2
3
4
  • withLatestFrom
    將兩個序列最新的元素組合起來,當?shù)谝粋€序列發(fā)出一個元素,就將組合后的元素發(fā)送出來。
        let withLatestFromStrSub = PublishSubject<String>()
        let withLatestFromIntSub = PublishSubject<Int>()
        // 當withLatestFromStrSub發(fā)出一個元素時,
        // 就立即取出withLatestFromIntSub最新的元素,
        // 將withLatestFromStrSub 中最新的元素strElement和withLatestFromIntSub最新的元素intElement組合,
        //然后把組合結(jié)果 strElement+intElement發(fā)送出去
        withLatestFromStrSub.withLatestFrom(withLatestFromIntSub) { strElement, intElement in
            "\(strElement) \(intElement)"
            }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        // withLatestFromStrSub發(fā)出一個元素時,
        // 立即取出withLatestFromIntSub最新的元素,
        // 然后把withLatestFromIntSub中最新的元素發(fā)送出去
//        withLatestFromStrSub.withLatestFrom(withLatestFromIntSub)
//            .subscribe(onNext: { print($0) })
//            .disposed(by: disposeBag)
        withLatestFromStrSub.onNext("R")
        withLatestFromStrSub.onNext("x") // 存了x覆蓋R
        withLatestFromIntSub.onNext(1)
        withLatestFromIntSub.onNext(2)      // 存了2覆蓋1
        withLatestFromStrSub.onNext("Swift") // 存了Swift覆蓋x,withLatestFromIntSub有最新元素2,發(fā)出組合序列事件
        withLatestFromStrSub.onNext("Hello") // 存了Hello覆蓋Swift,withLatestFromIntSub有最新元素2,發(fā)出組合序列事件

打印:

Swift 2
Hello 2
2
2

轉(zhuǎn)換操作符

  • map
    通過傳入一個閉包把原序列轉(zhuǎn)變?yōu)橐粋€新序列,map函數(shù)會將原序列的所有元素進行轉(zhuǎn)換。
        let ob = Observable.of(1,2,3,4)
        ob.map { (number) -> Int in
            return number+2
            }
            .subscribe{
                print("\($0)")
            }
            .disposed(by: disposeBag)

打?。?/p>

next(3)
next(4)
next(5)
next(6)
completed
  • flatMap
    升維:flatMap 操作符會對源序列的每一個元素應用一個轉(zhuǎn)換方法,將他們轉(zhuǎn)換成 Observables,降維:然后將這些 Observables 的元素合并成一個序列之后再發(fā)送出來。
        let strSub = BehaviorSubject(value: "A")
        let intSub = BehaviorSubject(value: "1")
        let flatSub = BehaviorSubject(value: strSub)
        
        flatSub.asObservable()
            .flatMap { $0 }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        strSub.onNext("B")
        flatSub.onNext(intSub)  // 加入intSub序列,發(fā)出intSub和strSub序列元素轉(zhuǎn)化后的序列
        intSub.onNext("2")
        strSub.onNext("C")
        intSub.onNext("3")

打?。?/p>

A
B
1
2
C
3
  • flatMapLatest
    將序列元素轉(zhuǎn)換成 Observables,然后取這些 Observables 中最新的一個發(fā)送。若轉(zhuǎn)換出一個新的 Observables,就只發(fā)出它的元素,舊的 Observables 的元素將被忽略掉。
        let strSub = BehaviorSubject(value: "A")
        let intSub = BehaviorSubject(value: "1")
        let flatSub = BehaviorSubject(value: strSub)
        
        flatSub.asObservable()
            .flatMapLatest { $0 }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        strSub.onNext("B")
        // 加入intSub序列,發(fā)出intSub元素轉(zhuǎn)化后的序列,忽略strSub的轉(zhuǎn)化序列
        flatSub.onNext(intSub)  
        intSub.onNext("2")
        strSub.onNext("C")
        intSub.onNext("3")

打?。?/p>

A
B
1
2
3

flatMapLatest實際上是mapswitchLatest操作符的組合。
相對的有flatMapFirst,flatMapFirst只會接收最初加入序列的元素事件。

  • concatMap
    將源序列的每一個元素轉(zhuǎn)換,轉(zhuǎn)化成一個 Observables。然后將這些Observables 按順序發(fā)出元素。與flatMap 的區(qū)別是:當前一個序列的元素發(fā)送完畢后,下一個 Observable 才可以開始發(fā)出元素。
        let concatMapSubject1 = BehaviorSubject(value: "A")
        let concatMapSubject2 = BehaviorSubject(value: "1")
        let concatMapSubject = BehaviorSubject(value: concatMapSubject1)
        
        concatMapSubject.asObservable()
            .concatMap() { $0 }
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        concatMapSubject1.onNext("B")
        concatMapSubject1.onNext("C")
        concatMapSubject.onNext(concatMapSubject2)
        concatMapSubject2.onNext("2")
        concatMapSubject2.onNext("3")   // 保存3覆蓋2
        // concatMapSubject1結(jié)束,開始訂閱concatMapSubject2的元素
        concatMapSubject1.onCompleted() 
        concatMapSubject2.onNext("4")

打?。?/p>

next(A)
next(B)
next(C)
next(3)
next(4)
  • scan
    對第一個元素和傳入的初始參數(shù)使用傳入的閉包運算,將結(jié)果作為第一個元素發(fā)出。然后,將結(jié)果作為參數(shù)填入到第二個元素的閉包運算中,創(chuàng)建第二個元素。以此類推,直到遍歷完全部的元素。也就是scan 先給一個初始化的參數(shù)數(shù),然后不斷的拿前一個結(jié)果和最新的元素進行運算。
        Observable.of(10, 100, 1000)
            .scan(2) { aggregateValue, newValue in
                aggregateValue + newValue // 10 + 2, 100 + 12, 1000 + 112
            }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打?。?/p>

12
112
1112
  • reduce
    將對第一個元素使用傳入的閉包運算。然后,將結(jié)果作為參數(shù)填入到第二個元素的閉包運算中。以此類推,直到遍歷完全部的元素后發(fā)出最終結(jié)果。
        Observable.of(10, 100, 1000)
            .reduce(2) { aggregateValue, newValue in
                aggregateValue + newValue // 1000 + 100 + 10 + 2
            }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

1112
  • toArray
    將一個序列轉(zhuǎn)成一個數(shù)組,并作為一個單一的事件發(fā)送,然后結(jié)束。
        Observable.of(1, 2, 3)
            .toArray()
            .subscribe({ print($0) })
            .disposed(by: disposeBag)

打印:

success([1, 2, 3])

過濾操作符

  • filter
    用來過濾掉不符合條件的事件,僅僅發(fā)出序列中通過判定的元素。
        Observable.of(1,2,3,4,5,6,7,8,9,0)
            .filter { $0 % 2 == 0 }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打?。?/p>

2
4
6
8
0
  • single
    只發(fā)出可觀察序列發(fā)出的第一個元素(或滿足條件的第一個元素)。如果可觀察序列發(fā)出多個元素,將拋出一個錯誤。
        Observable.of("Rx", "Swift")
            .single()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打?。?/p>

Rx
Unhandled error happened: Sequence contains more than one element.
 subscription called from:
        Observable.of("Rx", "Swift")
            .single { $0 == "Swift" }
            .subscribe { print($0) }
            .disposed(by: disposeBag)

打?。?/p>

next(Swift)
completed
  • take
    只發(fā)出頭n個元素,忽略掉后面的元素,直接結(jié)束序列。
        print("*****take*****")
        Observable.of("A", "B","C", "D")
            .take(2)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

A
B
  • takeLast
    僅發(fā)送序列的后n個元素,忽略前面的元素。
        Observable.of("R", "x","S", "wift")
            .takeLast(3)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打?。?/p>

x
S
wift
  • takeWhile
    依次判斷序列的每一個值是否滿足給定的條件, 當?shù)谝粋€不滿足條件的值出現(xiàn)時,序列便自動結(jié)束。
        Observable.of(1, 2, 3, 4, 2, 6)
            .takeWhile { $0 < 3 }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打?。?/p>

1
2
  • takeUntil
    監(jiān)聽源Observable,同時監(jiān)聽一個條件序列。若條件序列發(fā)出一個元素或者產(chǎn)生一個終止事件,那么源Observable 將自動完成,停止發(fā)送事件。
        let sourceSequence = PublishSubject<String>()
        let referenceSequence = PublishSubject<String>()
        
        sourceSequence
            .takeUntil(referenceSequence)
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        sourceSequence.onNext("A")
        sourceSequence.onNext("B")
        sourceSequence.onNext("C")

        referenceSequence.onNext("0") // 條件序列訂閱發(fā)出,源序列就結(jié)束
        
        sourceSequence.onNext("1")
        sourceSequence.onNext("2")
        sourceSequence.onNext("3")

打?。?/p>

next(A)
next(B)
next(C)
completed
  • skip
    用于跳過序列發(fā)出的前n 個元素事件。
        Observable.of(1, 2, 3, 4, 5, 6)
            .skip(2)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

3
4
5
6
  • skipWhile
    忽略源序列中頭幾個滿足條件的事件。
        Observable.of(1, 7, 3, 4, 5, 6)
            .skipWhile { $0 < 4 }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打?。?/p>

7
3
4
5
6
  • skipUntil
    與takeUntil類似,監(jiān)聽源Observable,同時監(jiān)聽一個條件序列。跳過Observable 中頭幾個元素,直到條件序列發(fā)出一個元素。
        let sourceSeq = PublishSubject<String>()
        let referenceSeq = PublishSubject<String>()
        
        sourceSeq.skipUntil(referenceSeq)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        // 沒有條件序列事件,源序列無法訂閱
        sourceSeq.onNext("A")
        sourceSeq.onNext("B")
        sourceSeq.onNext("C")
        
        referenceSeq.onNext("0") // 訂閱條件序列,源序列就開始訂閱
        
        sourceSeq.onNext("1")
        sourceSeq.onNext("2")
        sourceSeq.onNext("3")

打?。?/p>

1
2
3
  • elementAt
    只發(fā)出出來中的第 n 個元素,即是只處理指定位置的元素事件。
        Observable.of("R", "X", "S", "wift")
            .elementAt(3)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

wift
  • distinctUntilChanged
    用于過濾掉連續(xù)重復的事件。如果后一個元素和前一個元素是相同的,那么這個元素將不會被發(fā)出來。不連續(xù)的相同元素不受影響。
        Observable.of("1", "2", "2", "2", "3", "3", "4")
            .distinctUntilChanged()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打印:

1
2
3
4
  • amb
    amb操作符在多個源 Observables中, 取第一個發(fā)出元素或產(chǎn)生事件的 Observable,這個事件可以是一個 next,error 或者 completed事件,然后就只發(fā)出這個Observable的元素事件,忽略掉其他的Observables。
        let subject1 = PublishSubject<Int>()
        let subject2 = PublishSubject<Int>()
        let subject3 = PublishSubject<Int>()
        
        subject1
            .amb(subject2)
            .amb(subject3)
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        subject2.onNext(2)  // 先發(fā)出事件,之后就只發(fā)出subject2的事件
        subject1.onNext(1)
        subject2.onNext(2)
        subject1.onNext(1)
        subject3.onNext(3)
        subject2.onNext(2)
        subject1.onNext(1)
        subject3.onNext(3)
        subject3.onNext(3)

打?。?/p>

2
2
2

Debug 操作符

  • debug
    打印所有的訂閱,事件以及銷毀信息。
    打開RxSwift的Debug Mode方式:在 podfile 的末端添加如下代碼,重新 pod install 一下。
post_install do |installer|
  installer.pods_project.targets.each do |target|
    if target.name == 'RxSwift'
      target.build_configurations.each do |config|
        if config.name == 'Debug'
          config.build_settings['OTHER_SWIFT_FLAGS'] ||= ['-D', 'TRACE_RESOURCES']
        end
      end
    end
  end
end
        var count = 1
        let error = NSError.init(domain: "com.xxx", code: 10000, userInfo: nil)
        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("A")
            observer.onNext("B")
            observer.onNext("C")
            
            if count < 3 {
                observer.onError(error)
                print("onError")
                count += 1
            }
            
            observer.onNext("D")
            observer.onNext("E")
            observer.onNext("F")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceThatErrors
            .debug()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打?。?/p>

2019-08-27 17:44:45.792: ViewController.swift:572 (testDebug()) -> subscribed
2019-08-27 17:44:45.840: ViewController.swift:572 (testDebug()) -> Event next(A)
A
2019-08-27 17:44:45.840: ViewController.swift:572 (testDebug()) -> Event next(B)
B
2019-08-27 17:44:45.840: ViewController.swift:572 (testDebug()) -> Event next(C)
C
2019-08-27 17:44:45.842: ViewController.swift:572 (testDebug()) -> Event error(Error Domain=com.xxx Code=10000 "(null)")
Unhandled error happened: Error Domain=com.xxx Code=10000 "(null)"
 subscription called from:

2019-08-27 17:44:45.845: ViewController.swift:572 (testDebug()) -> isDisposed
onError
  • RxSwift.Resources.total
    提供所有Rx資源分配的計數(shù),在開發(fā)期間檢測泄漏非常有用。
        print(RxSwift.Resources.total)
        let subject = BehaviorSubject(value: "RxSwift")

        let subscription1 = subject.subscribe(onNext: { print($0) })
        print(RxSwift.Resources.total)

        let subscription2 = subject.subscribe(onNext: { print($0) })
        print(RxSwift.Resources.total)

        subscription1.dispose()
        print(RxSwift.Resources.total)

        subscription2.dispose()
        print(RxSwift.Resources.total)

打?。?/p>

4
RxSwift
11
RxSwift
14
12
10

連接操作符

  • publish
    將序列轉(zhuǎn)換為可被連接的序列可被連接的序列 在被訂閱后不會發(fā)出元素,直到 connect操作符被應用為止。這樣你就可以控制序列在什么時候開始發(fā)出元素。
        let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish()
        
        interval.subscribe(onNext: { print("訂閱: 1, 事件: \($0)") })
            .disposed(by: disposeBag)
        
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            // 在connect之前的訂閱不會發(fā)出元素,直到connect發(fā)出后
            print("do connect")
            _ = interval.connect()
        }
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            interval.subscribe(onNext: { print("訂閱: 2, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
            interval.subscribe(onNext: { print("訂閱: 3, 事件: \($0)") })
                .disposed(by: self.disposeBag)
        }
        DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
            self.disposeBag = DisposeBag()
        }

打印:

訂閱: 1, 事件: 0
訂閱: 2, 事件: 0
訂閱: 1, 事件: 1
訂閱: 2, 事件: 1
訂閱: 1, 事件: 2
訂閱: 2, 事件: 2
訂閱: 3, 事件: 2
訂閱: 1, 事件: 3
訂閱: 2, 事件: 3
訂閱: 3, 事件: 3
  • replay
    將序列轉(zhuǎn)換為可被連接的序列,并且這個可被連接的序列將緩存最新的 n 個元素。當有新的觀察者對它進行訂閱時,它就把這些被緩存的元素發(fā)送給觀察者。
        
        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("A")
            observer.onNext("B")
            observer.onNext("C")
            observer.onNext("D")
            observer.onNext("E")
            observer.onNext("F")
            return Disposables.create()
        }
        .replay(5)  // 緩存5個元素
        
        sequenceThatErrors
            .subscribe(onNext: {
                print("before connect subscribe \($0)")
            })
            .disposed(by: disposeBag)

        // 在connect之前的訂閱不會發(fā)出元素,直到connect發(fā)出后
        print("do connect")
        _ = sequenceThatErrors.connect()
        
        sequenceThatErrors
            .subscribe(onNext: {
                print("after connect first subscribe \($0)")
            })
            .disposed(by: disposeBag)
        sequenceThatErrors
            .subscribe(onNext: {
                print("after connect second subscribe \($0)")
                })
            .disposed(by: disposeBag)

打印:

do connect
before connect subscribe A
before connect subscribe B
before connect subscribe C
before connect subscribe D
before connect subscribe E
before connect subscribe F
after connect first subscribe B
after connect first subscribe C
after connect first subscribe D
after connect first subscribe E
after connect first subscribe F
after connect second subscribe B
after connect second subscribe C
after connect second subscribe D
after connect second subscribe E
after connect second subscribe F
  • multicast
    將序列轉(zhuǎn)換為可被連接的序列,同時傳入一個 Subject,每當序列發(fā)送事件時都會觸發(fā)這個 Subject的發(fā)送。
        let subject = PublishSubject<Any>()
        subject.subscribe{print("subject:\($0)")}
            .disposed(by: disposeBag)
        
        let netOB = Observable<Any>.create { (observer) -> Disposable in
                sleep(2)// 模擬網(wǎng)絡延遲
                print("開始請求網(wǎng)絡")
                observer.onNext("請求到的網(wǎng)絡數(shù)據(jù)")
                observer.onNext("請求到的本地")
                observer.onCompleted()
                return Disposables.create {
                    print("銷毀")
                }
            }.multicast(subject)
        
        netOB.subscribe(onNext: { (anything) in
                print("訂閱1:",anything)
            })
            .disposed(by: disposeBag)
        
        netOB.subscribe(onNext: { (anything) in
                print("訂閱2:",anything)
            })
            .disposed(by: disposeBag)
        
        _ = netOB.connect()

打印:

subject:next(請求到的網(wǎng)絡數(shù)據(jù))
訂閱1: 請求到的網(wǎng)絡數(shù)據(jù)
訂閱2: 請求到的網(wǎng)絡數(shù)據(jù)
subject:next(請求到的本地)
訂閱1: 請求到的本地
訂閱2: 請求到的本地
subject:completed
銷毀

錯誤恢復操作符

  • timeout
    規(guī)定時間內(nèi)沒有產(chǎn)生元素就產(chǎn)生一個超時的 error事件
        let times = [
            [ "value": 1, "time": 1 ],
            [ "value": 2, "time": 2 ],
            [ "value": 3, "time": 4.1 ]
        ]
        let timeOutOb = Observable.from(times)
            .flatMap({ (anyTime) in
                return Observable.of(anyTime["value"]!)
                    .delaySubscription(Double(anyTime["time"]!), scheduler: MainScheduler.instance)
            })
            .timeout(2, scheduler: MainScheduler.instance)
        timeOutOb.subscribe(onNext: { (value) in
            print("onNext:", value)
        }, onError: { (error) in
            print("error:", error)
        })
        .disposed(by: disposeBag)

打?。?/p>

onNext: 1.0
onNext: 2.0
error: Sequence timeout.
  • catchErrorJustReturn
    如果產(chǎn)生錯誤,返回一個可觀察序列,該序列發(fā)出單個元素,然后終止。
        let error = NSError.init(domain: "com.xxx", code: 10000, userInfo: nil)
        let sequenceThatFails = PublishSubject<String>()
        
        sequenceThatFails
            .catchErrorJustReturn("AA")
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        sequenceThatFails.onNext("BB")
        sequenceThatFails.onNext("CC")  // 正常序列發(fā)送
        sequenceThatFails.onError(error)   // 發(fā)送失敗,返回設定的錯誤的預案

打?。?/p>

next(BB)
next(CC)
next(AA)
completed
  • catchError
    攔截一個 error事件,將它替換成其他序列,然后傳遞給觀察者,訂閱新的序列。
        let recoverySequence = PublishSubject<String>()
        
        sequenceThatFails
            .catchError {
                print("Error:", $0)
                return recoverySequence  // 截獲到了錯誤序列,,返回一個新序列
            }
            .subscribe { print($0) }
            .disposed(by: disposeBag)
        
        sequenceThatFails.onNext("AA")
        sequenceThatFails.onNext("BB") // 正常序列發(fā)送成功的
        sequenceThatFails.onError(error) // 發(fā)送失敗的序列
        
        recoverySequence.onNext("CC")   // 替換的序列發(fā)送信號
        sequenceThatFails.onNext("DD")  // 原序列已結(jié)束,不能放松信號
        recoverySequence.onNext("EE")
next(AA)
next(BB)
Error: Error Domain=com.xxx Code=10000 "(null)"
next(CC)
next(EE)
  • retry
    如果源序列產(chǎn)生一個錯誤事件,重新對它進行訂閱,讓它有機會不產(chǎn)生 error 事件。即便源序列產(chǎn)生了一個 error事件,retry總是對觀察者發(fā)出 next 事件,所以這樣可能會產(chǎn)生重復的元素。
        var count = 1 // 外界變量控制流程
        let sequenceRetryErrors = Observable<String>.create { observer in
            observer.onNext("AA")
            observer.onNext("BB")
            observer.onNext("CC")
            
            if count == 1 { // 流程進來之后就會過度-這里的條件可以作為出口,失敗的次數(shù)
                observer.onError(error)  // 接收到了錯誤序列,重試序列發(fā)生
                print("onError")
                count += 1
            }
            
            observer.onNext("DD")
            observer.onNext("EE")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceRetryErrors
            .retry()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)

打?。?/p>

AA
BB
CC
onError
AA
BB
CC
DD
EE

設置重新訂閱次數(shù)(retry(_:))

        let sequenceThatErrors = Observable<String>.create { observer in
            observer.onNext("AA")
            observer.onNext("BB")
            observer.onNext("CC")
            
            observer.onError(error)
            observer.onNext("DD")
            observer.onNext("EE")
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        sequenceThatErrors
            .retry(3)  // 重新訂閱3次
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
    }

打?。?/p>

AA
BB
CC
AA
BB
CC
AA
BB
CC
Unhandled error happened: Error Domain=com.xxx Code=10000 "(null)"
 subscription called from:

總結(jié):
篇幅較長,想快速了解請直接看這里。

  1. 組合操作符

    • merge
      將多個序列合并成一個新序列,當這多個序列中某一個序列發(fā)出一個元素時,新序列就將這個元素發(fā)出,當某一個序列發(fā)出error時,新序列也發(fā)出error,并終止序列。
    • zip
      將多個(最多不超過8個) 序列的元素組合壓縮,而且是等到每個序列元素事件一一對應地湊齊之后再合并,然后將合并的結(jié)果元素發(fā)出來。
    • combineLatest
      將多個序列中最新的元素組合起來,然后將這個組合的結(jié)果發(fā)出來。
    • switchLatest
      將序列發(fā)出的元素轉(zhuǎn)換成一個新序列,并從新序列發(fā)出元素。
    • concat
      將多個序列按順序串聯(lián)起來,只有當上一個序列發(fā)出了completed事件,才會開始發(fā)送下一個序列事件。
    • startWith
      在發(fā)出序列的事件元素之前,會先發(fā)出這些預先插入的事件元素。
    • withLatestFrom
      將兩個序列最新的元素組合起來,當?shù)谝粋€序列發(fā)出一個元素,就將組合后的元素發(fā)送出來。
  2. 轉(zhuǎn)換操作符

    • map
      通過傳入的閉包把原序列轉(zhuǎn)變?yōu)橐粋€新序列,map函數(shù)會將原序列的所有元素進行轉(zhuǎn)換。
    • flatMap
      升維:對源序列的每一個元素應用轉(zhuǎn)換方法,將他們轉(zhuǎn)換成 Observables,降維:然后將這些 Observables 的元素合并成一個序列之后再發(fā)送出來。
    • flatMapLatest
      將序列元素轉(zhuǎn)換成 Observables,然后取這些 Observables 中最新的一個發(fā)送。若轉(zhuǎn)換出一個新的 Observables,就只發(fā)出它的元素,舊的 Observables 的元素將被忽略掉。
    • concatMap
      將源序列的每一個元素轉(zhuǎn)換,轉(zhuǎn)化成一個 Observables。然后將這些Observables 按順序發(fā)出元素。與flatMap 的區(qū)別是:當前一個序列的元素發(fā)送完畢后,下一個 Observable 才可以開始發(fā)出元素。
    • scan
      遍歷全部元素,對第一個元素和傳入的初始參數(shù)使用傳入的閉包運算,將結(jié)果作為第一個元素發(fā)出。然后不斷的拿前一個結(jié)果和最新的元素進行運算,并發(fā)出結(jié)果。
    • reduce
      將對第一個元素使用傳入的閉包運算。然后,將結(jié)果作為參數(shù)填入到第二個元素的閉包運算中。以此類推,直到遍歷完全部的元素后發(fā)出最終結(jié)果。
    • toArray
      將一個序列轉(zhuǎn)成一個數(shù)組,并作為一個單一的事件發(fā)送,然后結(jié)束。
  3. 過濾操作符

    • filter
      用來過濾掉不符合條件的事件,僅僅發(fā)出序列中通過判定的元素。
    • single
      只發(fā)出可觀察序列發(fā)出的第一個元素(或滿足條件的第一個元素)。如果可觀察序列發(fā)出多個元素,將拋出一個錯誤。
    • take
      只發(fā)出頭n個元素,忽略掉后面的元素,直接結(jié)束序列。
    • takeLast
      僅發(fā)送序列的后n個元素,忽略前面的元素。
    • takeWhile
      依次判斷序列的每一個值是否滿足給定的條件, 當第一個不滿足條件的值出現(xiàn)時,序列便自動結(jié)束。
    • takeUntil
      監(jiān)聽源Observable,同時監(jiān)聽一個條件序列。若條件序列發(fā)出一個元素或者產(chǎn)生一個終止事件,那么源Observable 將自動完成,停止發(fā)送事件。
    • skip
      用于跳過序列發(fā)出的前n 個元素事件。
    • skipWhile
      忽略源序列中頭幾個滿足條件的事件。
    • skipUntil
      監(jiān)聽源Observable,同時監(jiān)聽一個條件序列。跳過Observable 中頭幾個元素,直到條件序列發(fā)出一個元素。
    • elementAt
      只發(fā)出出來中的第 n 個元素,即是只處理指定位置的元素事件。
    • distinctUntilChanged
      用于過濾掉連續(xù)重復的事件。如果后一個元素和前一個元素是相同的,那么這個元素將不會被發(fā)出來。不連續(xù)的相同元素不受影響。
    • amb
      在多個源 Observables中, 取第一個產(chǎn)生事件的 Observable,忽略掉其他的Observables。
  4. Debug 操作符

    • debug
      打印所有的訂閱,事件以及銷毀信息。
    • RxSwift.Resources.total
      提供所有Rx資源分配的計數(shù),在開發(fā)期間檢測泄漏非常有用
  5. 連接操作符。

    • publish
      將序列轉(zhuǎn)換為可被連接的序列。可被連接的序列 在被訂閱后不會發(fā)出元素,直到 connect 操作符被應用為止。
    • replay
      將序列轉(zhuǎn)換為可被連接的序列,并且這個可被連接的序列將緩存最新的 n 個元素。當有新的觀察者對它進行訂閱時,它就把這些被緩存的元素發(fā)送給觀察者。
    • multicast
      將序列轉(zhuǎn)換為可被連接的序列,同時傳入一個 Subject,每當序列發(fā)送事件時都會觸發(fā)這個 Subject 的發(fā)送。
  6. 錯誤恢復操作符

    • timeout
      規(guī)定時間內(nèi)沒有產(chǎn)生元素就產(chǎn)生一個超時的 error 事件。
    • catchErrorJustReturn
      如果產(chǎn)生錯誤,返回一個可觀察序列,該序列發(fā)出單個元素,然后終止。
    • catchError
      攔截 error 事件,將它替換成其他序列,然后傳遞給觀察者,訂閱新的序列。
    • retry
      如果源序列產(chǎn)生一個錯誤事件,重新對它進行訂閱,讓它有機會不產(chǎn)生 error 事件??稍O置重新訂閱次數(shù),默認遇到一次錯誤事件重新訂閱一次。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容