RxSwift(12)—— Subject即攻也守

就問此時此刻還有誰?45度仰望天空,該死!我這無處安放的魅力!


RxSwift目錄直通車--- 和諧學習,不急不躁!


在掌握前面序列以還有觀察者的前提下,我們今天來看一個非常特殊的類型-Subject.為什么說它特殊呢?原因很簡單:Subject既可以做序列,也可以做觀察者!正是因為這一特性,所以在實際開發(fā)中被大量運用。下面我們一起來解讀一下這個特殊的Subject

即攻也守的原理

首先我們來看看:SubjectType的原理!

public protocol SubjectType : ObservableType {
      // 關(guān)聯(lián)了觀察者類型,具備這個類型的能力
    associatedtype SubjectObserverType : ObserverType
    func asObserver() -> SubjectObserverType
}
  • SubjectType首先就是繼承了ObservableType,具有序列特性
  • 關(guān)聯(lián)了觀察者類型,具備這個類型的能力
  • 下面我們通過一個具體類型來感受一下subject
// 1:初始化序列
let publishSub = PublishSubject<Int>() 
// 2:發(fā)送響應(yīng)序列
publishSub.onNext(1)
// 3:訂閱序列
publishSub.subscribe { print("訂閱到了:",$0)}
    .disposed(by: disposbag)
// 再次發(fā)送響應(yīng)
publishSub.onNext(2)
publishSub.onNext(3)
  • 很明顯能夠訂閱信號(序列最基本的能力)
  • 能夠發(fā)送響應(yīng),又是觀察者的能力
  • 查看底層源碼分析

訂閱響應(yīng)流程

public override func subscribe -> Disposable {
    self._lock.lock()
    let subscription = self._synchronized_subscribe(observer)
    self._lock.unlock()
    return subscription
}

func _synchronized_subscribe -> Disposable  {
    // 省略不必要的代碼
    let key = self._observers.insert(observer.on)
    return SubscriptionDisposable(owner: self, key: key)
}
  • self._observers.insert(observer.on): 通過一個集合添加進去所有的訂閱事件,很明顯在合適的地方一次性全部執(zhí)行
  • 其中也返回這次訂閱的銷毀者,方便執(zhí)行善后工作: synchronizedUnsubscribe->self._observers.removeKey(disposeKey)
mutating func removeKey(_ key: BagKey) -> T? {
    if _key0 == key {
        _key0 = nil
        let value = _value0!
        _value0 = nil
        return value
    }

    if let existingObject = _dictionary?.removeValue(forKey: key) {
        return existingObject
    }

    for i in 0 ..< _pairs.count where _pairs[i].key == key {
        let value = _pairs[i].value
        _pairs.remove(at: i)
        return value
    }
    return nil
}
  • 便利通過key獲取響應(yīng)bag中的value,執(zhí)行集合移除
  • 因為沒有相應(yīng)持有關(guān)系,達到自動釋放銷毀

發(fā)送信號流程

public func on(_ event: Event<Element>) {
    dispatch(self._synchronized_on(event), event)
}
  • 這個地方估計大家看起來麻煩惡心一點,但是你用心看不難體會
  • 這里主要調(diào)用了dispatch函數(shù),傳了兩個參數(shù):self._synchronized_on(event)event
  • 查看dispatch函數(shù)源碼
func dispatch<E>(_ bag: Bag) {
    bag._value0?(event)

    if bag._onlyFastPath {
        return
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = bag._dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}
  • bag._value0?(event)首先執(zhí)行事件的回調(diào)
  • 判斷bag._onlyFastPath的情況,默認會開啟快速通道!
  • 如果是開啟慢速通道,需要從剛剛添加進bag包裹里面的匹配對挨個進行pairs[i].value(event),外界事件回調(diào),然后拿回外界封裝的閉包的閉包調(diào)用:element(event)
func _synchronized_on(_ event: Event<E>) -> Observers {
    self._lock.lock(); defer { self._lock.unlock() }
    switch event {
    case .next:
        if self._isDisposed || self._stopped {
            return Observers()
        }
        
        return self._observers
    case .completed, .error:
        if self._stoppedEvent == nil {
            self._stoppedEvent = event
            self._stopped = true
            let observers = self._observers
            self._observers.removeAll()
            return observers
        }

        return Observers()
    }
}
  • 這里如果self._isDisposed || self._stopped成立就會返回一個空的集合,也就沒有序列的響應(yīng)
  • .completed, .error都會改變狀態(tài)self._stopped = true,也就是說序列完成或者錯誤之后都無法再次響應(yīng)了
  • .completed, .error還會移除添加在集合里面的內(nèi)容

其實如果你對前面序列的流程掌握了,這個subject的流程也不再話下,只是subject 把訂閱流程和響應(yīng)流程都內(nèi)部實現(xiàn),所以也就沒有必要引入sink

各種Subject

PublishSubject

可以不需要初始來進行初始化(也就是可以為空),并且它只會向訂閱者發(fā)送在訂閱之后才接收到的元素。

// PublishSubject
// 1:初始化序列
let publishSub = PublishSubject<Int>() //初始化一個PublishSubject 裝著Int類型的序列
// 2:發(fā)送響應(yīng)序列
publishSub.onNext(1)
// 3:訂閱序列
publishSub.subscribe { print("訂閱到了:",$0)}
    .disposed(by: disposbag)
// 再次發(fā)送響應(yīng)
publishSub.onNext(2)
publishSub.onNext(3)
  • 信號:1是無法被訂閱的,只接受訂閱之后的響應(yīng)

BehaviorSubject

通過一個默認初始值來創(chuàng)建,當訂閱者訂閱BehaviorSubject時,會收到訂閱后Subject上一個發(fā)出的Event,如果還沒有收到任何數(shù)據(jù),會發(fā)出一個默認值。之后就和PublishSubject一樣,正常接收新的事件。

publish 稍微不同就是behavior這個家伙有個存儲功能:存儲上一次的信號

// BehaviorSubject
// 1:創(chuàng)建序列
let behaviorSub = BehaviorSubject.init(value: 100)
// 2:發(fā)送信號
behaviorSub.onNext(2)
behaviorSub.onNext(3)
// 3:訂閱序列
behaviorSub.subscribe{ print("訂閱到了:",$0)}
    .disposed(by: disposbag)
// 再次發(fā)送
behaviorSub.onNext(4)
behaviorSub.onNext(5)
// 再次訂閱
behaviorSub.subscribe{ print("訂閱到了:",$0)}
    .disposed(by: disposbag)
  • 當沒有信號的時候,會默認發(fā)送 信號:100
  • 只能儲存一個信號:信號2 會被 信號3 覆蓋
  • 訂閱信號之前能夠儲存信號
// 初始化
public init(value: Element) {
      self._element = value
}

// 事件響應(yīng)
func _synchronized_on(_ event: Event<E>) -> Observers {

    switch event {
    case .next(let element):
        self._element = element
    case .error, .completed:
        self._stoppedEvent = event
    }
    return self._observers
}
  • 初始化的時候帶有一個屬性保存一個信號
  • 事件響應(yīng):新事件會覆蓋原來的事件
  • 其他流程和publish一樣

ReplaySubject

ReplaySubject 發(fā)送源Observable 的所有事件無論observer什么時候開始訂閱。

// ReplaySubject
// 1:創(chuàng)建序列
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
// let replaySub = ReplaySubject<Int>.createUnbounded()

// 2:發(fā)送信號
replaySub.onNext(1)
replaySub.onNext(2)
replaySub.onNext(3)
replaySub.onNext(4)

// 3:訂閱序列
replaySub.subscribe{ print("訂閱到了:",$0)}
    .disposed(by: disposbag)
// 再次發(fā)送
replaySub.onNext(7)
replaySub.onNext(8)
replaySub.onNext(9)
  • 一個bufferSize空間,想存儲多少次響應(yīng)就是多少次
  • 其他流程照舊
  • 源碼里面就是相對于BehaviorSubject的儲存屬性變成了集合

AsyncSubject

AsyncSubject只發(fā)送由源Observable發(fā)送的最后一個事件,并且只在源Observable完成之后。(如果源Observable沒有發(fā)送任何值,AsyncSubject也不會發(fā)送任何值。)

// AsyncSubject
// 1:創(chuàng)建序列
let asynSub = AsyncSubject<Int>.init()
// 2:發(fā)送信號
asynSub.onNext(1)
asynSub.onNext(2)
// 3:訂閱序列
asynSub.subscribe{ print("訂閱到了:",$0)}
    .disposed(by: disposbag)
// 再次發(fā)送
asynSub.onNext(3)
asynSub.onNext(4)
//        asynSub.onError(NSError.init(domain: "lgcooci", code: 10086, userInfo: nil))
asynSub.onCompleted()
  • 我們普通序列發(fā)送回來,都不會響應(yīng)!直到完成序列響應(yīng)
func _synchronized_on(_ event: Event<E>) -> (Observers, Event<E>) {
    switch event {
    case .next(let element):
        self._lastElement = element
        return (Observers(), .completed)
    case .error:
        self._stoppedEvent = event

        let observers = self._observers
        self._observers.removeAll()

        return (observers, event)
    case .completed:

        let observers = self._observers
        self._observers.removeAll()

        if let lastElement = self._lastElement {
            self._stoppedEvent = .next(lastElement)
            return (observers, .next(lastElement))
        }
        else {
            self._stoppedEvent = event
            return (observers, .completed)
        }
    }
}
  • 可以很清晰的看出,普通Next事件都是,元素的替換,根本沒有響應(yīng)出來
    *complete事件發(fā)送到時候,就會把最新保存的self._lastElement當成事件值傳出去,響應(yīng).next(lastElement)
  • 如果沒有保存事件就發(fā)送完成事件:.completed
  • error事件會移空整個響應(yīng)集合:self._observers.removeAll()

Variable

Variable廢棄了,這里貼出代碼以供大家遇到老版本! 由于這個Variable的靈活性所以在開發(fā)里面應(yīng)用非常之多!

// Variable : 5.0已經(jīng)廢棄(BehaviorSubject 替換) - 這里板書 大家可以了解一下
// 1:創(chuàng)建序列
let variableSub = Variable.init(1)
// 2:發(fā)送信號
variableSub.value = 100
variableSub.value = 10
// 3:訂閱信號        })

variableSub.asObservable().subscribe{ print("訂閱到了:",$0)}
    .disposed(by: disposbag)
// 再次發(fā)送
variableSub.value = 1000

BehaviorRelay

  • 替換原來的Variable
  • 可以儲存一個信號
  • 隨時訂閱響應(yīng)
  • 響應(yīng)發(fā)送的時候要注意:behaviorR.accept(20)
let behaviorRelay = BehaviorRelay(value: 100)
behaviorRelay.subscribe(onNext: { (num) in
    print(num)
.disposed(by: disposbag)
print("打印:\(behaviorRelay.value)")

behaviorRelay.accept(1000)

Subject在實際開發(fā)中,應(yīng)用非常的廣泛!平時很多時候都會在惆悵選擇什么序列更合適,那么聰明的你一定要掌握底層的原理,并不說你背下特色就能真正開發(fā)的,因為如果后面一旦發(fā)生了BUG,你根本無法解決。作為iOS中高級發(fā)開人員一定要知其然,而知其所以然!碌碌無為的應(yīng)用層開發(fā)畢竟走不長遠!

就問此時此刻還有誰?45度仰望天空,該死!我這無處安放的魅力!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容