RxSwift學習筆記

最近在學習RxSwift相關的內容,在這里記錄一些基本的知識點,以便今后查閱。

Observable

在RxSwift中,最關鍵的一個概念是可觀察序列(Observable Sequence),它相當于Swift中的序列(Sequence),可觀察序列中的每個元素都是一個事件,我們知道Swift的序列中可以包含任意多個元素,類似的,可觀察序列會不斷產生新的事件直到發(fā)生錯誤或正常結束為止。訂閱者(Observer)通過訂閱(subscribe)一個可觀察隊列來接收序列所產生的新事件,只有在有觀察者的情況下序列才可以發(fā)送事件。

例如,使用of操作創(chuàng)建一個可觀察序列:

let seq = Observable.of(1, 2, 3) 

of是一種用來創(chuàng)建Observable的簡便操作,在上面的代碼中創(chuàng)建了一個類型為Observable<Int>的Observable,里面包含了三個元素:1,2,3。

來看看Observable中都提供了哪些操作,可觀察序列是一個實現了ObservableType協議的類型,ObservableType協議的定義非常簡單:

protocol ObservableType : ObservableConvertibleType {
    associatedtype E
    func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}

其中E是一個關聯類型,表示序列中元素的類型,除此之外協議只定義了一個方法:subscribe,用于向可觀察序列添加一個觀察者(ObserverType類型):

// 接收閉包的subscribe函數是通過協議擴展提供的簡便方法
seq.subscribe { (event) in
    print(event)
}

subscribe相當于Swift序列中的遍歷操作(makeIterator),如上,向seq序列添加一個觀察者,在序列中有新的事件時調用該閉包,上面的代碼會輸出1,2,3。

Observer

觀察者是實現了ObserverType協議的對象,ObserverType協議同樣十分簡單:

public protocol ObserverType {
    associatedtype E
    func on(_ event: Event<E>)
}

E為觀察者所觀察序列中的元素類型,當序列中有新的事件產生時,會調用on方法來接收新的事件。其中事件的類型Event是一個枚舉,其中包含3個類型:

enum Event<Element> {
    case next(Element)
    case error(Swift.Error)
    case completed
}
  1. .next:表示序列中產生了下一個事件,關聯值Element保存了該事件的值。
  2. .error:序列產生了一個錯誤,關聯值Error保存了錯誤類型,在這之后序列會直接結束(不再產生新的next事件)。
  3. .completed:序列正常結束。

Dispose

除了產生錯誤和自然結束以外,還可以手動結束觀察,在使用subscribe訂閱一個可觀察序列時,會返回一個Disposable類型的對象。這里的Disposable是一個協議,只定義了一個方法:

protocol Disposable {
    func dispose()
}

dispose方法用來結束此次訂閱并釋放可觀察序列中的相關資源,通常來說你并不需要直接調用該方法,而是通過調用其擴展方法addDisposableToDisposable添加到一個DisposeBag對象中。DisposeBag對象會自動管理所有添加到其中的Disposable對象,在DisposeBag對象銷毀的時候會自動調用其中所有Disposable的dispose方法釋放資源。

也可以使用takeUntil來自動結束訂閱:

seq.takeUntil(otherSeq)
    .subscribe({ (event) in
        print(event)
    })

在otherSeq序列發(fā)出任意類型的事件之后,自動結束本次訂閱。

創(chuàng)建序列

通過Observable類型提供的方法create可以創(chuàng)建一個自定義的可觀察序列:

let seq = Observable<Int>.create { (observer) -> Disposable in
    observer.on(.next(1))
    observer.on(.completed)
    return Disposables.create {
        // do some cleanup
    }
}

create方法使用一個閉包來創(chuàng)建自定義的序列,閉包接收一個ObserverType的參數observer,并通過observer來發(fā)送相應的事件。如上面的代碼,創(chuàng)建了一個Observable<Int>類型的可觀察序列,訂閱該序列的觀察者會收到事件1和一個完成事件。最后create方法返回一個自己創(chuàng)建的Disposable對象,可以在這里進行一些相關的資源回收操作。

除了create方法之外,RxSwift中提供了很多中簡便的方法用于創(chuàng)建序列,常用的有:

  • just:創(chuàng)建一個只包含一個值的可觀察序列:

    let justSeq = Observable.just(1)
    justSeq.subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(1)
    completed
    
  • ofofjust有點類似,不同的是of可以將一系列元素創(chuàng)建成事件隊列,該Observable依次發(fā)送相應事件和結束事件:

    let ofSeq = Observable.of(1, 2, 3)
    ofSeq.subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(1)
    next(2)
    next(3)
    completed
    
  • empty:這種類型的Observable只發(fā)送結束(Completed)事件

    let emptySequence = Observable<String>.empty()
    
  • error:該隊列只發(fā)送一個error事件,傳遞一個自定義的錯誤類型。

    let errorSeq = Observable<TestError>.error(TestError.Error1)
    

Share

通常在我們在訂閱一個可觀察序列的時候,每一次的訂閱行為都是獨立的,也就是說:

let seq = Observable.of(1, 2)
// 1
seq.subscribe { (event) in
    print("sub 1: \(event)")
}
// 2
seq.subscribe { (event) in
    print("sub 2: \(event)")
}
---- example output ----
sub 1: next(1)
sub 1: next(2)
sub 1: completed 
sub 2: next(1)
sub 2: next(2)
sub 2: completed 

我們連續(xù)訂閱同一序列兩次,每次都會接收到相同的事件,第二次訂閱時并沒有因為第一次訂閱的行為導致元素"耗盡"。有些時候我們希望讓所有的觀察者都共享同一份事件,這個時候可以使用share

  • shareshareObservableType協議的一個擴展方法,它返回一個可觀察序列,該序列的所有觀察者都會共享同一份訂閱,上面的代碼加上share之后:

    let seq = Observable.of(1, 2).share()
    // 1
    seq.subscribe { (event) in
        print("sub 1: \(event)")
    }
    // 2
    seq.subscribe { (event) in
        print("sub 2: \(event)")
    }
    ---- example output ----
    sub 1: next(1)
    sub 1: next(2)
    sub 1: completed 
    sub 2: completed 
    

    可以看到,在第一次訂閱時序列已經將所有的事件發(fā)送,后面再進行第二次訂閱的時候只收到了一個完成事件。

  • shareReplayshareReplay的用法與share類似,它的方法簽名如下:

    func shareReplay(_ bufferSize: Int) -> Observable<Element>
    

    不同的地方在于,shareReplay接收一個整型參數bufferSize,指定緩沖區(qū)大小,訂閱該序列的觀察者會立即收到最近bufferSize條事件。

序列的變換和組合

在Swift的序列Sequence中,可以使用map、flatMap和reduce等常見的函數式方法對其中的元素進行變換,RxSwift中的可觀察序列同樣也支持這些方法。

變換

  • map:這是map方法的簽名:

    func map<Result>(_ transform: @escaping (E) throws -> Result) -> Observable<Result>
    

    在一個自定義的閉包中對序列的每一個元素進行變換,返回一個包含轉換后結果的可觀察序列,與Swift中Sequence的map類似。

    let mappedSeq: Observable<String> = seq.map { (element) -> String in
      return "value: \(element)"
    }
    
  • flatMap:先來看看flatMap的簽名:

    func flatMap<O: ObservableConvertibleType>(_ selector: @escaping (E) throws -> O)
            -> Observable<O.E> 
    

    關于flatMap的作用同樣可以類比Sequence,Sequence中的flatMap閉包遍歷每一個元素進行處理后返回一個新的序列,最后會將這些序列"展平",得到一個包含所有序列元素的新序列:

    let array = [1, 2]
    let res = array.flatMap { (n) -> [String] in
        return ["\(n)a", "\(n)b"]
    }
    // res: ["1a", "1b", "2a", "2b"]
    

    RxSwift中的flatMap用法與之類似,flatMap中的閉包會遍歷可觀察序列中的所有元素,并返回一個新的可觀察序列,最后flatMap會返回一個包含所有元素的可觀察序列:

    let seq = Observable.of(1, 2)
        .flatMap { (n) -> Observable<String> in
            return Observable.of("\(n)a", "\(n)b") // (1)
        }
        .subscribe { (event) in
            print(event)
        }
    // 得到的seq類型為Observable<String>
    ---- example output ----
    next(1a)
    next(1b)
    next(2a)
    next(2b)
    completed
    

    在閉包中創(chuàng)建了若干個可觀察序列(1),這些序列中發(fā)送的next事件都會被傳遞到seq序列中,其中任何一個序列發(fā)生錯誤(發(fā)送了error事件)時,seq序列都會直接結束,不再繼續(xù)接收事件;但是只有所有序列都完成(發(fā)送了completed事件)后,seq序列才會正常結束。

  • flatMapLatest:作用與flatMap類似,但是對于閉包中生成的可觀察序列,它并不會保留所有的序列的訂閱,在遍歷結束后,只保留最后創(chuàng)建的序列的訂閱,之前創(chuàng)建的Observables都會取消訂閱(相應序列的dispose方法也會被調用):

    // 與上一個例子相同的代碼,僅將flatMap改成flatMapLatest
    let seq = Observable.of(1, 2)
        .flatMapLatest { (n) -> Observable<String> in
            return Observable.of("\(n)a", "\(n)b") // (1)
        }
        .subscribe { (event) in
            print(event)
        }
    ---- example output ----
    next(1a)
    next(2a)
    next(2b)
    completed
    

    因為訂閱關系的改變,現在只有當最后創(chuàng)建的那個Observable正常結束時,seq才會收到completed事件。

    在這種情況下,flatMapLatest會得到與flatMap相同的輸出:

    let seq = Observable.of(1, 2)
        .flatMapLatest { (n) -> Observable<String> in
            return Observable<String>.create({ (observer) -> Disposable in
                observer.onNext("\(n)a")
                observer.onNext("\(n)b")
    
                return Disposables.create { }
            })
        }
        .subscribe { (event) in
            print(event)
        }
    

    這是因為在上面的這個例子中所創(chuàng)建的Observable是同步創(chuàng)建元素的,無法被打斷。

    類似的方法還有flatMapFirst,使用方法可以類比flatMapLatest

  • reduce和scanreduce的作用與Sequence中定義的一樣,它接收一個初始值和一個閉包,在Observable中的每個值上調用該閉包,并將每一步的結果作為下一次調用的輸入:

    Observable.of(1, 2, 3).reduce(0) { (first, num) -> Float in
            return Float(first + num)
        }
        .subscribe { (event) in
            print(event)
        }
    // 輸出:next(6.0), completed
    

    在上面的代碼中,提供了一個初始值0,在閉包中計算和,并將結果序列的元素類型改成Float,序列的觀察者最后接收到所有元素的和。

    scan的作用類似于reduce,它跟reduce之間唯一的區(qū)別在于,scan會發(fā)送每一次調用閉包后的結果:

    Observable.of(1, 2, 3).scan(0) { (first, num) -> Float in
            return Float(first + num)
        }
        .subscribe { (event) in
            print(event)
        }
    // 輸出:next(1.0), next(3.0), next(6.0), completed
    

組合

  • startWith:在序列的開頭加入一個指定的元素

    Observable.of(2, 3).startWith(1).subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(1)
    next(2)
    next(3)
    completed
    

    訂閱該序列之后,會立即收到startWith指定的事件,即使此時序列并沒有開始發(fā)送事件。

  • merge:當你有多個類型相同的Observable,可以使用merge方法將它們合并起來,同時訂閱所有Observable中的事件:

    let seq1 = Observable.just(1)
    let seq2 = Observable.just(2)
    let seq = Observable.of(seq1, seq2).merge()
    seq.subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(1)
    next(2)
    completed
    

    只有當Observable中的元素也是Observable類型的時候才可以使用merge方法,當其中一個序列發(fā)生錯誤的時候,seq都會被終止,同樣的只有所有序列都完成之后,seq才會收到完成事件。

  • zipzip方法也可以將多個Observable合并在一起,與merge不同的是,zip提供了一個閉包用來對多個Observable中的元素進行組合變化,最后獲得一個新的序列:

    let seq1 = Observable.just(1)
    let seq2 = Observable.just(2)
    let seq: Observable<String> = Observable.zip(seq1, seq2) { (num1, num2) -> String in
        return "\(num1 + num2)"
    }
    seq.subscribe { (event) in
        print(event)
    }
    ---- example output ----
    next(3)
    completed
    

    zip方法按照參數個數的不同有多個版本,最多支持合并8個可觀察序列,需要注意的一點是,閉包所接收的參數是各個序列中對應位置的元素。也就是說,如果seq1發(fā)送了一個事件,而seq2發(fā)送了多個事件,閉包也只會被執(zhí)行一次,seq中只有一個元素。

    組合的Observable中任意一個發(fā)生錯誤,最后的seq都會直接出錯終止,當所有的Observable都發(fā)出completed事件后,seq才會正常結束。

  • combineLatestcombineLatest同樣用于將多個序列組合成一個,使用方法與zip一樣,但是它的調用機制跟zip不同,每當其中一個序列有新元素時,combineLatest都會從其他所有序列中取出最后一個元素,傳入閉包中生成新的元素添加到結果序列中。

Subject

Subject對象相當于一種中間的代理和橋梁的作用,它既是觀察者又是可觀察序列,在向一個Subject對象添加觀察者之后,可以通過該Subject向其發(fā)送事件。Subject對象并不會主動發(fā)送completed事件,并且在發(fā)送了error或completed事件之后,Subject中的序列會直接終結,無法再發(fā)送新的消息。Subject同樣也分為幾種類型:

  • PublishSubjectPublishSubject的訂閱者只會收到在其訂閱(subscribe)之后發(fā)送的事件

    let subject = PublishSubject<Int>()
    subject.onNext(1)
    subject.subscribe { (event) in
        print(event)
    }
    subject.onNext(2)
    
    ---- example output ----
    next(2)
    

    可以看到,觀察者只收到了事件2,在訂閱之前發(fā)送的事件1并沒有接收到。

  • ReplaySubjectReplaySubject在初始化時指定一個大小為n的緩沖區(qū),里面會保存最近發(fā)送的n條事件,在訂閱之后,觀察者會立即收到緩沖區(qū)中的事件:

    let subject = ReplaySubject<Int>.create(bufferSize: 2)
    subject.onNext(1)
    subject.subscribe { (event) in
        print(event)
    }
    subject.onNext(2)
    
    ---- example output ----
    next(1)
    next(2)
    
  • BehaviorSubjectBehaviorSubject在初始化時需要提供一個默認值,在訂閱時觀察者會立刻收到序列上一次發(fā)送的事件,如果沒有發(fā)送過事件則會收到默認值:

    let subject = BehaviorSubject(value: 1)
    subject.subscribe { (event) in
        print(event)
    }
    
    ---- example output ----
    next(1)
    
  • VariableVariable是對BehaviorSubject的一個封裝,行為上與BehaviorSubject類似。Variable沒有on之類的方法來發(fā)送事件,取而代之的是一個value屬性,向value賦值可以向觀察者發(fā)送next事件,并且訪問value可以獲取最后一次發(fā)送的數據:

    let variable = Variable(1)
    variable.asObservable().subscribe { (event) in
        print(event)
    }
    variable.value = 2
    
    ---- example output ----
    next(1)
    next(2)
    completed
    

    與其他Subject類型不同的是,Variable在釋放的時候會發(fā)送completed事件,并且Variable對象永遠不會發(fā)送error事件。

Scheduler

Scheduler是RxSwift中進行多線程編程的一種方式,一個Observable在執(zhí)行的時候會指定一個Scheduler,這個Scheduler決定了在哪個線程對序列進行操作以及事件回調。默認情況下,在訂閱Observable之后,觀察者會在與調用subscribe方法時相同的線程收到通知,并且也會在該線程進行銷毀(dispose)。

與GCD類似,Scheduler分為串行(serial)和并行(concurrent)兩種類型,RxSwift中定義了幾種Schedular:

  • CurrentThreadScheduler:這是默認的Scheduler,代表了當前的線程,serial類型。
  • MainScheduler:表示主線程,serial類型
  • SerialDispatchQueueScheduler:提供了一些快捷的方法來創(chuàng)建串行Scheduler,內部封裝了DispatchQueue
  • ConcurrentDispatchQueueScheduler:提供了快捷的方法來創(chuàng)建并行Scheduler,同樣封裝了DispatchQueue

subscribeOn和observeOn

subscribeOnobserveOn是其中兩個最重要的方法,它們可以改變Observable所在的Scheduler:

// main thread
let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)
let seq = Observable.of(1, 2)
seq.subscribeOn(scheduler)
    .map {
        return $0 * 2 // 子線程
    }
    .subscribe { (event) in
        print(event) // 子線程
    }

在上面的代碼中創(chuàng)建了一個并發(fā)的Scheduler,并在序列seq上調用subscribeOn指定了該Scheduler,可以看到,我們在主線程中訂閱該序列,但是map方法以及事件的回調都是在創(chuàng)建的子線程中執(zhí)行。

subscribeOnobserveOn都可以指定序列的Scheduler,它們之間的區(qū)別在于:

  • subscribeOn設定了整個序列開始的時候所在的Scheduler,序列在創(chuàng)建以及之后的操作都會在這個Scheduler上進行,subscribeOn在整個鏈式調用中只能調用一次,之后再次調用subscribeOn沒有任何效果。
  • observeOn指定一個Scheduler,在這之后的操作都會被派發(fā)到這個Scheduler上執(zhí)行,observeOn可以在鏈式操作的中間改變Scheduler
createObservable().
    .doSomething()
    .subscribeOn(scheduler1) // (1)
    .doSomethingElse()
    .observeOn(scheduler2) // (2)
    .doAnother()
    ...

如上代碼,在(1)處執(zhí)行了subscribeOn之后,之前的操作createObservable()和doSomething()都會在scheduler1中執(zhí)行,隨后的doSomethingElse()同樣也在scheduler1中執(zhí)行,隨后用observeOn指定了另外一個scheduler2,之后的doAnother()會在scheduler2上執(zhí)行。

為原有代碼添加Rx擴展

RxSwift中提供了一種擴展機制,可以很方便的為原有的代碼添加上Rx擴展。首先來看一個結構體Reactive

public struct Reactive<Base> {
    /// base是擴展的對象實例
    public let base: Base
    
    public init(_ base: Base) {
        self.base = base
    }
}

Reactive是一個泛型結構體,只定義了一個屬性base,并且在初始化結構體的時候傳入該屬性的值。

此外還定義了一個協議ReactiveCompatible

public protocol ReactiveCompatible {
    associatedtype CompatibleType

    static var rx: Reactive<CompatibleType>.Type { get set }
    var rx: Reactive<CompatibleType> { get set }
}

該協議中分別為類對象和實例對象定義一個名字相同的屬性:rx,類型為上面定義的Reactive,隨后通過協議擴展為其提供了get的默認的實現:

extension ReactiveCompatible {
    public static var rx: Reactive<Self>.Type {
        get {
            return Reactive<Self>.self
        }
        set {
            // this enables using Reactive to "mutate" base type
        }
    }

    public var rx: Reactive<Self> {
        get {
            return Reactive(self)
        }
        set {
            // this enables using Reactive to "mutate" base object
        }
    }
}

關聯類型CompatibleType被自動推導為實現該協議的類本身,使用self初始化一個Reactive對象。

最后通過協議擴展為所有的NSObject類型實現了ReactiveCompatible協議:

extension NSObject: ReactiveCompatible { }

這樣一來,代碼中所有繼承自NSObject的類型實例中都會有一個類型為Reactive的屬性rx,當我們要為自己的類型添加Rx擴展時,只需要通過擴展向Reactive中添加方法就可以了,例如向UIButton類型添加擴展:

extension Reactive where Base: UIButton { // 為Reactive<UIButton>添加擴展
    public var tap: ControlEvent<Void> {
        return controlEvent(.touchUpInside) // 通過base可以訪問該實例本身
    }
}

由于Reactive是一個泛型類型,我們可以通過where語句指定泛型的類型,這樣一來,我們就可以在UIButton實例的rx中訪問tap屬性了:

let button = UIButton(...)
button.rx.tap

類似RxCocoa這樣的RxSwift擴展庫都是通過這種方式進行Rx擴展的。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 發(fā)現 關注 消息 RxSwift入坑解讀-你所需要知道的各種概念 沸沸騰關注 2016.11.27 19:11*字...
    楓葉1234閱讀 2,934評論 0 2
  • 我從去年開始使用 RxJava ,到現在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy閱讀 5,740評論 7 62
  • 本文章內部分圖片資源來自RayWenderlich.com 本文結合自己的理解來總結介紹一下RxSwift最基本的...
    FKSky閱讀 3,029評論 4 14
  • 吃吃的快樂,吃吃的愛 01 經典的主食以外,新加坡的面點和小吃,也是毫不遜色。融合東南亞各國的特色,在這里交匯,組...
    洪一念閱讀 581評論 3 5
  • 已經開學一個周了,今天下午是我們開學的第一節(jié)體育課。今天體育課的內容是:學習蹲打籃球。 上課了,...
    zhoukewen閱讀 690評論 0 0

友情鏈接更多精彩內容