RxSwift deferred

網(wǎng)上只有deferred的使用,鮮有其源碼流程,因其十分簡潔,故詳細(xì)貼上步驟,如有誤,請留言指出,謝謝

let obv_defer = Observable<Int>.deferred { () -> Observable<Int> in
            return Observable.just(1)
        }        
obv_defer.subscribe(onNext:{
            print($0)
            }).disposed(by: disposeBag)
貼出關(guān)鍵步驟:

deferred后返回return Deferred(observableFactory: observableFactory),保存了工廠閉包,subscribe了AnonymousObserver后,來到deferred的run方法

override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable)
             where Observer.Element == Source.Element {
        let sink = DeferredSink(observableFactory: self._observableFactory, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }

生成一個繼承于Sink的DeferredSink,然后sink走run

func run() -> Disposable {
        do {
            let result = try self._observableFactory()
            return result.subscribe(self)
        }
        catch let e {
            self.forwardOn(.error(e))
            self.dispose()
            return Disposables.create()
        }
    }

這里是核心,也就是工廠里生成新Observable的時機,執(zhí)行deferred保存的工廠閉包,拿到閉包返回的Observable,這里是Just(1),由工廠新生成的observable來subscribe自身,也就是deferredSink

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        observer.on(.next(self._element))
        observer.on(.completed)
        return Disposables.create()
    }

來到Just的subscribe方法,on出next后,來到deferredSink的on方法

func on(_ event: Event<Element>) {
        self.forwardOn(event)
        
        switch event {
        case .next:
            break
        case .error:
            self.dispose()
        case .completed:
            self.dispose()
        }
    }

forwardOn來到父類Sink

final func forwardOn(_ event: Event<Observer.Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        self._observer.on(event)
    }

self._observer就是之前的AnonymousObserver

override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }

AnonymousObserver只保存了一個閉包,也只有一個onCore方法,走到父類observerBase

func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }

onCore后,調(diào)用AnonymousObserver的閉包,然后事件閉包onNext調(diào)用,走完最開始的print。最后是方法一系列返回,走完Just發(fā)送的completed,再一系列返回,走完BinaryDisposable,走完dispose,結(jié)束。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容