
就問此時此刻還有誰?45度仰望天空,該死!我這無處安放的魅力!
- RxSwift(1)—— 初探
- RxSwift(2)—— 核心邏輯源碼分析
- RxSwift(3)—— Observable序列的創(chuàng)建方式
- RxSwift(4)—— 高階函數(shù)(上)
- RxSwift(5)—— 高階函數(shù)(下)
- RxSwift(6)—— scheduler源碼解析(上)
- RxSwift(7)—— scheduler源碼解析(下)
- RxSwift(8)—— KVO底層探索(上)
- RxSwift(9)—— KVO底層探索(下)
- RxSwift(10)—— 場景序列總結(jié)
- RxSwift(11)—— 銷毀者-dispose源碼解析
- RxSwift(12)—— Subject即攻也守
- RxSwift(13)—— 爬過的坑
- RxSwift(14)—— MVVM雙向綁定
RxSwift目錄直通車--- 和諧學習,不急不躁!
在掌握前面序列以還有觀察者的前提下,我們今天來看一個非常特殊的類型-
Subject.為什么說它特殊呢?原因很簡單:Subject既可以做序列,也可以做觀察者!正是因為這一特性,所以在實際開發(fā)中被大量運用。下面我們一起來解讀一下這個特殊的Subject
即攻也守的原理
首先我們來看看:SubjectType的原理!
public protocol SubjectType : ObservableType {
// 關(guān)聯(lián)了觀察者類型,具備這個類型的能力
associatedtype SubjectObserverType : ObserverType
func asObserver() -> SubjectObserverType
}
-
SubjectType首先就是繼承了ObservableType,具有序列特性 - 關(guān)聯(lián)了觀察者類型,具備這個類型的能力
- 下面我們通過一個具體類型來感受一下
subject
// 1:初始化序列
let publishSub = PublishSubject<Int>()
// 2:發(fā)送響應(yīng)序列
publishSub.onNext(1)
// 3:訂閱序列
publishSub.subscribe { print("訂閱到了:",$0)}
.disposed(by: disposbag)
// 再次發(fā)送響應(yīng)
publishSub.onNext(2)
publishSub.onNext(3)
- 很明顯能夠訂閱信號(序列最基本的能力)
- 能夠發(fā)送響應(yīng),又是觀察者的能力
- 查看底層源碼分析
訂閱響應(yīng)流程
public override func subscribe -> Disposable {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
func _synchronized_subscribe -> Disposable {
// 省略不必要的代碼
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
-
self._observers.insert(observer.on):通過一個集合添加進去所有的訂閱事件,很明顯在合適的地方一次性全部執(zhí)行 - 其中也返回這次訂閱的銷毀者,方便執(zhí)行善后工作:
synchronizedUnsubscribe->self._observers.removeKey(disposeKey)
mutating func removeKey(_ key: BagKey) -> T? {
if _key0 == key {
_key0 = nil
let value = _value0!
_value0 = nil
return value
}
if let existingObject = _dictionary?.removeValue(forKey: key) {
return existingObject
}
for i in 0 ..< _pairs.count where _pairs[i].key == key {
let value = _pairs[i].value
_pairs.remove(at: i)
return value
}
return nil
}
- 便利通過
key獲取響應(yīng)bag中的value,執(zhí)行集合移除 - 因為沒有相應(yīng)持有關(guān)系,達到自動釋放銷毀
發(fā)送信號流程
public func on(_ event: Event<Element>) {
dispatch(self._synchronized_on(event), event)
}
- 這個地方估計大家看起來麻煩惡心一點,但是你用心看不難體會
- 這里主要調(diào)用了
dispatch函數(shù),傳了兩個參數(shù):self._synchronized_on(event)和event - 查看
dispatch函數(shù)源碼
func dispatch<E>(_ bag: Bag) {
bag._value0?(event)
if bag._onlyFastPath {
return
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = bag._dictionary {
for element in dictionary.values {
element(event)
}
}
}
-
bag._value0?(event)首先執(zhí)行事件的回調(diào) - 判斷
bag._onlyFastPath的情況,默認會開啟快速通道! - 如果是開啟慢速通道,需要從剛剛添加進
bag包裹里面的匹配對挨個進行pairs[i].value(event),外界事件回調(diào),然后拿回外界封裝的閉包的閉包調(diào)用:element(event)
func _synchronized_on(_ event: Event<E>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
- 這里如果
self._isDisposed || self._stopped成立就會返回一個空的集合,也就沒有序列的響應(yīng) - 在
.completed, .error都會改變狀態(tài)self._stopped = true,也就是說序列完成或者錯誤之后都無法再次響應(yīng)了 - 在
.completed, .error還會移除添加在集合里面的內(nèi)容
其實如果你對前面序列的流程掌握了,這個
subject的流程也不再話下,只是subject把訂閱流程和響應(yīng)流程都內(nèi)部實現(xiàn),所以也就沒有必要引入sink

各種Subject
PublishSubject
可以不需要初始來進行初始化(也就是可以為空),并且它只會向訂閱者發(fā)送在訂閱之后才接收到的元素。
// PublishSubject
// 1:初始化序列
let publishSub = PublishSubject<Int>() //初始化一個PublishSubject 裝著Int類型的序列
// 2:發(fā)送響應(yīng)序列
publishSub.onNext(1)
// 3:訂閱序列
publishSub.subscribe { print("訂閱到了:",$0)}
.disposed(by: disposbag)
// 再次發(fā)送響應(yīng)
publishSub.onNext(2)
publishSub.onNext(3)
-
信號:1是無法被訂閱的,只接受訂閱之后的響應(yīng)

BehaviorSubject
通過一個默認初始值來創(chuàng)建,當訂閱者訂閱BehaviorSubject時,會收到訂閱后Subject上一個發(fā)出的Event,如果還沒有收到任何數(shù)據(jù),會發(fā)出一個默認值。之后就和PublishSubject一樣,正常接收新的事件。
和publish 稍微不同就是behavior這個家伙有個存儲功能:存儲上一次的信號
// BehaviorSubject
// 1:創(chuàng)建序列
let behaviorSub = BehaviorSubject.init(value: 100)
// 2:發(fā)送信號
behaviorSub.onNext(2)
behaviorSub.onNext(3)
// 3:訂閱序列
behaviorSub.subscribe{ print("訂閱到了:",$0)}
.disposed(by: disposbag)
// 再次發(fā)送
behaviorSub.onNext(4)
behaviorSub.onNext(5)
// 再次訂閱
behaviorSub.subscribe{ print("訂閱到了:",$0)}
.disposed(by: disposbag)
- 當沒有信號的時候,會默認發(fā)送
信號:100 - 只能儲存一個信號:
信號2會被信號3覆蓋 - 訂閱信號之前能夠儲存信號
// 初始化
public init(value: Element) {
self._element = value
}
// 事件響應(yīng)
func _synchronized_on(_ event: Event<E>) -> Observers {
switch event {
case .next(let element):
self._element = element
case .error, .completed:
self._stoppedEvent = event
}
return self._observers
}
- 初始化的時候帶有一個屬性保存一個信號
- 事件響應(yīng):新事件會覆蓋原來的事件
- 其他流程和
publish一樣

ReplaySubject
ReplaySubject 發(fā)送源Observable 的所有事件無論observer什么時候開始訂閱。
// ReplaySubject
// 1:創(chuàng)建序列
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
// let replaySub = ReplaySubject<Int>.createUnbounded()
// 2:發(fā)送信號
replaySub.onNext(1)
replaySub.onNext(2)
replaySub.onNext(3)
replaySub.onNext(4)
// 3:訂閱序列
replaySub.subscribe{ print("訂閱到了:",$0)}
.disposed(by: disposbag)
// 再次發(fā)送
replaySub.onNext(7)
replaySub.onNext(8)
replaySub.onNext(9)
- 一個
bufferSize空間,想存儲多少次響應(yīng)就是多少次 - 其他流程照舊
- 源碼里面就是相對于
BehaviorSubject的儲存屬性變成了集合

AsyncSubject
AsyncSubject只發(fā)送由源Observable發(fā)送的最后一個事件,并且只在源Observable完成之后。(如果源Observable沒有發(fā)送任何值,AsyncSubject也不會發(fā)送任何值。)
// AsyncSubject
// 1:創(chuàng)建序列
let asynSub = AsyncSubject<Int>.init()
// 2:發(fā)送信號
asynSub.onNext(1)
asynSub.onNext(2)
// 3:訂閱序列
asynSub.subscribe{ print("訂閱到了:",$0)}
.disposed(by: disposbag)
// 再次發(fā)送
asynSub.onNext(3)
asynSub.onNext(4)
// asynSub.onError(NSError.init(domain: "lgcooci", code: 10086, userInfo: nil))
asynSub.onCompleted()
- 我們普通序列發(fā)送回來,都不會響應(yīng)!直到完成序列響應(yīng)
func _synchronized_on(_ event: Event<E>) -> (Observers, Event<E>) {
switch event {
case .next(let element):
self._lastElement = element
return (Observers(), .completed)
case .error:
self._stoppedEvent = event
let observers = self._observers
self._observers.removeAll()
return (observers, event)
case .completed:
let observers = self._observers
self._observers.removeAll()
if let lastElement = self._lastElement {
self._stoppedEvent = .next(lastElement)
return (observers, .next(lastElement))
}
else {
self._stoppedEvent = event
return (observers, .completed)
}
}
}
- 可以很清晰的看出,普通Next事件都是,元素的替換,根本沒有響應(yīng)出來
*complete事件發(fā)送到時候,就會把最新保存的self._lastElement當成事件值傳出去,響應(yīng).next(lastElement) - 如果沒有保存事件就發(fā)送完成事件:
.completed -
error事件會移空整個響應(yīng)集合:self._observers.removeAll()

Variable
Variable廢棄了,這里貼出代碼以供大家遇到老版本! 由于這個Variable的靈活性所以在開發(fā)里面應(yīng)用非常之多!
// Variable : 5.0已經(jīng)廢棄(BehaviorSubject 替換) - 這里板書 大家可以了解一下
// 1:創(chuàng)建序列
let variableSub = Variable.init(1)
// 2:發(fā)送信號
variableSub.value = 100
variableSub.value = 10
// 3:訂閱信號 })
variableSub.asObservable().subscribe{ print("訂閱到了:",$0)}
.disposed(by: disposbag)
// 再次發(fā)送
variableSub.value = 1000

BehaviorRelay
- 替換原來的
Variable - 可以儲存一個信號
- 隨時訂閱響應(yīng)
- 響應(yīng)發(fā)送的時候要注意:
behaviorR.accept(20)
let behaviorRelay = BehaviorRelay(value: 100)
behaviorRelay.subscribe(onNext: { (num) in
print(num)
.disposed(by: disposbag)
print("打印:\(behaviorRelay.value)")
behaviorRelay.accept(1000)
Subject在實際開發(fā)中,應(yīng)用非常的廣泛!平時很多時候都會在惆悵選擇什么序列更合適,那么聰明的你一定要掌握底層的原理,并不說你背下特色就能真正開發(fā)的,因為如果后面一旦發(fā)生了BUG,你根本無法解決。作為iOS中高級發(fā)開人員一定要知其然,而知其所以然!碌碌無為的應(yīng)用層開發(fā)畢竟走不長遠!就問此時此刻還有誰?45度仰望天空,該死!我這無處安放的魅力!