在上一篇文章中,主要描述了RxSwift的核心邏輯,也就是一個序列從創(chuàng)建到訂閱然后從發(fā)送消息到接收消息的整個流程是怎樣串聯(lián)起來的。還不太理解的同學(xué)可以移步到上一篇文章了解一下。
這篇文章主要來分析一下RxSwift的幾個核心類和協(xié)議的實現(xiàn)和設(shè)計。
Observable類解析
Observable是可觀察序列,是所有可觀察序列的基類,我們不會直接使用Observable這個類,一般都是使用子類。Observable也可以理解成抽象類,實際上不是抽象類,因為可觀察序列最重要的一個訂閱序列的方法subscribe必須在其子類中重寫。
我們先來看看Observable的源碼:
public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element
init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
rxAbstractMethod()
}
public func asObservable() -> Observable<E> {
return self
}
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
return _map(source: self, transform: transform)
}
}
-
Observable實現(xiàn)了一個協(xié)議ObservableType,而且ObservableType協(xié)議繼承自ObservableConvertibleType協(xié)議,所以在Observable中實現(xiàn)了兩個協(xié)議方法:subscribe和asObservable。 -
subscribe方法沒有具體實現(xiàn)的邏輯,需要子類去實現(xiàn)。 -
asObservable方法返回的是self,看似用處不大,其實不是這樣的。asObservable是非常有用的,如果一類是Observable的子類,我們可以直接返回self,如果不是Observable的子類,我們可以通過重寫這個協(xié)議方法來返回一個Observable對象,這樣保證了協(xié)議的一致性。在使用的時候我們可以直接寫類似self.asObservable().subscribe(observer)這樣的代碼,有利于保持代碼的簡潔性,是良好的封裝性的體現(xiàn)。所以我覺得這個設(shè)計非常的好,在我們?nèi)粘i_發(fā)中也可以借鑒。 -
_ = Resources.incrementTotal()和_ = Resources.decrementTotal()這兩行代碼其實是RxSwift內(nèi)部實現(xiàn)的一個引用計數(shù)。這部分內(nèi)容我會在后面的文章中再詳解。 -
composeMap<R>優(yōu)化map的一個函數(shù),不太理解用處。 -
Observable子類非常多,這里不一一去看,主要區(qū)別在于對subscribe方法的實現(xiàn)不一樣。
AnonymousObservableSink類解析
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
-
AnonymousObservableSink是Sink的子類,AnonymousObservableSink本身遵守ObseverType協(xié)議,與此同時實現(xiàn)了run方法,雖然沒有實現(xiàn)subscribe方法,但是已經(jīng)足夠了,這樣AnonymousObservableSink從某種程度來說也是Observable。 -
AnonymousObservableSink是Observer和Observable的銜接的橋梁,也可以理解成管道。它存儲了_observer和銷毀者_cancel。通過sink就可以完成從Observable到Obsever的轉(zhuǎn)變。 - 在
run方法中的這行代碼parent._subscribeHandler(AnyObserver(self)),其中parent是一個AnonymousObservable對象。_subscribeHandler這個block調(diào)用,代碼會執(zhí)行到創(chuàng)建序列時的block。然后會調(diào)用發(fā)送信號的代碼obserber.onNext("發(fā)送信號"),然后代碼會經(jīng)過幾個中間步驟會來到AnonymousObservableSink類的on方法。
有問題或者建議和意見,歡迎大家評論或者私信。
喜歡的朋友可以點下關(guān)注和喜歡,后續(xù)會持續(xù)更新文章。