03. RxSwift源碼解讀:Sink 和 Queue Scheduler

今天要解讀的源碼是隊列調(diào)度,同時探討下Sink的設(shè)計思想,我們可以指定訂閱和序列發(fā)送操作在哪個隊列上執(zhí)行,以observe(on:)subscribe(on:)兩個操作符為例,探究一下內(nèi)部原理。
observe指定在哪個隊列接受序列,而subscribe是指定創(chuàng)建序列的閉包在哪個隊列執(zhí)行。下面給出一個例子:

示例

        Observable<Int>.create { (anyObserver) -> Disposable in
            print("Subscribe Thread:", Thread.current)
            anyObserver.onNext(1)
            anyObserver.onCompleted()
            return Disposables.create()
        }
        .observe(on: MainScheduler.instance)
        .subscribe(on: SerialDispatchQueueScheduler(qos: .background))
        .subscribe(onNext: { ele in
            print("Observe Thread:", Thread.current)
            print(ele)
        }, onDisposed: {
            print("disposed2")
        })
        .disposed(by: bag)

打印結(jié)果:

Subscribe Thread: <NSThread: 0x600003c45380>{number = 5, name = (null)}
Observe Thread: <NSThread: 0x600003c047c0>{number = 1, name = main}
1
disposed2

可見因為指定subscribeSerialDispatchQueueScheduler(串行隊列執(zhí)行),所以第一條打印的線程不是主線程,而observe指定在主隊列執(zhí)行,所以第二條打印的線程是主線程。

源碼解讀

我們看一下.observe(on: MainScheduler.instance)的內(nèi)部實現(xiàn),在ObservableType的extension中可以找到代碼:

    public func observe(on scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
        guard let serialScheduler = scheduler as? SerialDispatchQueueScheduler else {
            return ObserveOn(source: self.asObservable(), scheduler: scheduler)
        }

        return ObserveOnSerialDispatchQueue(source: self.asObservable(),
                                            scheduler: serialScheduler)
    }

如果是同步隊列調(diào)度者,則返回ObserveOnSerialDispatchQueue對象,將當前Observablescheduler傳入,否則返回ObserveOn對象,這兩個類都是Observable,它們都繼承了Producer,通過這種方式實現(xiàn)鏈式調(diào)用,可以繼續(xù)調(diào)用其他操作符,每個操作符都內(nèi)部都有對應的ObservableType實現(xiàn)類,它們一般會重寫run方法,而且還有對應的Sink類,用來實現(xiàn)操作符的功能,比如ObserveOn類有一個對應的ObserveOnSink類,ObserveOnSink有自己的run方法,同樣ObserveOnSerialDispatchQueue類有對應的ObserveOnSerialDispatchQueueSink類,這些Sink類繼承自ObserverBase類,最終都實現(xiàn)了ObserverType,能發(fā)送序列。
在看看subscribe(on:)

    public func subscribe(on scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
        SubscribeOn(source: self, scheduler: scheduler)
    }

同理返回一個SubscribeOn對象,將當前對象和scheduler傳入。SubscribeOn繼承自Producer,對應有一個SubscribeOnSink類,它繼承自Sink類;因為它需要使用SinkforwardOn方法。

當我們調(diào)用.subscribe(onNext方法時,程序依然會走到Producersubscribe方法,這個流程沒有變,然后調(diào)用當前對象run方法,因為當前對象已經(jīng)再是AnonymousObservable對象了,而是 SubscribeOn, 而且SubscribeOn重寫了run方法,所以會調(diào)用SubscribeOnrun方法,這是面向?qū)ο蟮亩鄳B(tài),然后我們進入SubscribeOnrun方法看看:

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
        let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }

這里依然會創(chuàng)建sink對象,但是不再是AnonymousObservableSink,而是SubscribeOnSink,它來完成這個操作符的功能;這個Sink保存了SubscribeOn對象,observercancelobserver依然還是剛開始創(chuàng)建的AnonymousObserver對象,然后調(diào)用了SubscribeOnSinkrun方法:

     func run() -> Disposable {
        let disposeEverything = SerialDisposable()
        let cancelSchedule = SingleAssignmentDisposable()
        
        disposeEverything.disposable = cancelSchedule
        
        let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
            let subscription = self.parent.source.subscribe(self)
            disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
            return Disposables.create()
        }

        cancelSchedule.setDisposable(disposeSchedule)
    
        return disposeEverything
    }

SerialDisposable: 表示一個可釋放資源,其底層可釋放資源可被另一個可釋放資源替換,從而導致前一個底層可釋放資源的自動釋放。
SingleAssignmentDisposable: 表示只允許對其底層可釋放資源進行一次賦值的可釋放資源。如果已經(jīng)設(shè)置了底層可釋放資源,那么將來嘗試設(shè)置底層可釋放資源將拋出異常。
ScheduledDisposable:釋放資源時會在對應的隊列中調(diào)度執(zhí)行。
關(guān)鍵代碼:self.parent.scheduler.schedule, 這里parent是SubscribeOn對象,scheduler是調(diào)度者SerialDispatchQueueScheduler,然后調(diào)用它的schedule方法,去看看:

    public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        self.scheduleInternal(state, action: action)
    }

    func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        self.configuration.schedule(state, action: action)
    }

調(diào)到self.configuration.schedule:

    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()

        self.queue.async {
            if cancel.isDisposed {
                return
            }


            cancel.setDisposable(action(state))
        }

        return cancel
    }

終于看到隊列調(diào)用了self.queue.async 將action異步派發(fā)到quene中執(zhí)行,而queue是在創(chuàng)建SerialDispatchQueueScheduler時創(chuàng)建的, SerialDispatchQueueScheduler明顯會創(chuàng)建串行隊列。這里有個小細節(jié),如果資源已經(jīng)被釋放了則不執(zhí)行。
cancel.setDisposable(action(state)) 設(shè)置釋放資源對象,如果已經(jīng)設(shè)置則拋出異常。
action 在哪里, 回到action定義的地方?:

        let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
            let subscription = self.parent.source.subscribe(self)
            disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
            return Disposables.create()
        }

接著執(zhí)行self.parent.source.subscribe(self), 這個source是subscibe(:on,的調(diào)用者,即在鏈式調(diào)用序列中,先調(diào)用observe(:on)再調(diào)用subscibe(:on) 所以sourceObserveOnSerialDispatchQueue對象。

很多observable都會保存它的上一個observable,即source, 以此實現(xiàn)鏈式調(diào)用
ObserveOnSerialDispatchQueue對象現(xiàn)在要執(zhí)行subscribe方法,進入看看:

    return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }

又到這里來了,然又進入run,這里會進入ObserveOnSerialDispatchQueue的run方法:

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }

這個地方代碼不太一樣了,不再是調(diào)用sinkrun,因為ObserveOnSerialDispatchQueueSink只能處理observe而不能處理subscribe,只能轉(zhuǎn)發(fā)給source處理subscribe,所以調(diào)用self.source.subscribe, 這里的source是最初創(chuàng)建的未變形的AnonymousObservable對象,這相當于繞了一圈又回到了原來的流程,調(diào)用subscribe(sink),但是這里的sink是ObserveOnSerialDispatchQueueSink對象,它是Observer,看看這個會怎么影響后面的流程:

if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }

這里調(diào)到了前面的if分支:因為isScheduleRequired用來標示在當前線程是否正在通過schedule執(zhí)行action,如果action內(nèi)部又在當前線程執(zhí)行了subscribe,則無需再調(diào)度到當前線程執(zhí)行,即CurrentThreadScheduler.instance.schedule不會也沒必要嵌套調(diào)用。

繼續(xù)調(diào)用AnonymousObservablerun方法,然后調(diào)用創(chuàng)建AnonymousObservableSink,調(diào)用run,最后執(zhí)行subscribeHandler(AnyObserver(self)),這里回到了熟悉的流程,不過因為ObserverObserveOnSerialDispatchQueueSink對象,所以發(fā)送序列時會調(diào)用ObserveOnSerialDispatchQueueSinkonCore方法:

    override func onCore(_ event: Event<Element>) {
        _ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
    }

這里會做隊列調(diào)度,在對應的隊列(主隊列)中執(zhí)行action :

        self.cachedScheduleLambda = { pair in
            guard !cancel.isDisposed else { return Disposables.create() }

            pair.sink.observer.on(pair.event)

            if pair.event.isStopEvent {
                pair.sink.dispose()
            }

            return Disposables.create()
        }

接著調(diào)用observer.on(event),這里的sink就是當前這個ObserveOnSerialDispatchQueueSink對象,而observer是在創(chuàng)建ObserveOnSerialDispatchQueueSink對象時傳入的SubscribeOnSink,SubscribeOnSinkobserver對象是AnonymousObserver,因為它們都遵循了ObserverType,所以都可以作為ObserverType被其他Sink持有。

各類Sink遵循了ObserverType又持有了ObserverType,這樣Sink之間可以相互持有,調(diào)用協(xié)議方法時又可以調(diào)用observer的相同協(xié)議方法,這樣可以一直調(diào)用下去,跟裝飾器模式很像,當我需要在現(xiàn)有操作符基礎(chǔ)上再增加操作,無需修改原有操作符的邏輯代碼,通過擴展方式增加新的功能,不過這里的設(shè)計更復雜。

我們繼續(xù)看代碼,SubscribeOnSink實現(xiàn)了on方法,進去看看:

    func on(_ event: Event<Element>) {
        self.forwardOn(event)
        
        if event.isStopEvent {
            self.dispose()
        }
    }

然后進入Sink類的forwardOn:在forwardOn又又調(diào)用了observer.on(event), 這里observerAnonymousObserver,最終調(diào)用AnonymousObserveronCore,完成最后一擊:
調(diào)用self.eventHandler(event)

如果將observe(:on)subscribe(:on) 互換位置,subscribe(:on)先調(diào)用,observe(:on)后調(diào)用結(jié)果會怎么樣? 結(jié)果依然不變,但是內(nèi)部調(diào)用流程會不一樣,大家可以自己試試!

Scheduler

上面的例子中已經(jīng)見到了兩個Scheduler:

  • SerialDispatchQueueScheduler : 串行隊列調(diào)度者,
  • MainScheduler: 主隊列調(diào)度者,它繼承自SerialDispatchQueueScheduler
    還有一個并行隊列:
  • ConcurrentDispatchQueueScheduler: 并行隊列調(diào)度者
    每個調(diào)度者都會維護自己的隊列,MainScheduler維護一個主隊列DispatchQueue.main, SerialDispatchQueueScheduler維護一個串行隊列,ConcurrentDispatchQueueScheduler維護一個并行隊列; 當需要執(zhí)行action時,調(diào)度對應的隊列執(zhí)行action即可。
    這三個調(diào)度者都遵循了SchedulerType協(xié)議,SchedulerType繼承ImmediateSchedulerType, ImmediateSchedulerType有一個協(xié)議方法:
    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable用來執(zhí)行隊列調(diào)度。

Scheduler包含一個DispatchQueueConfiguration對象,DispatchQueueConfiguration對象持有隊列,當執(zhí)行調(diào)度方法時,會轉(zhuǎn)發(fā)到這個類的schedule進行實際的調(diào)度。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容