今天要解讀的源碼是隊列調(diào)度,同時探討下Sink的設(shè)計思想,我們可以指定訂閱和序列發(fā)送操作在哪個隊列上執(zhí)行,以observe(on:)和subscribe(on:)兩個操作符為例,探究一下內(nèi)部原理。
observe指定在哪個隊列接受序列,而subscribe是指定創(chuàng)建序列的閉包在哪個隊列執(zhí)行。下面給出一個例子:
示例
Observable<Int>.create { (anyObserver) -> Disposable in
print("Subscribe Thread:", Thread.current)
anyObserver.onNext(1)
anyObserver.onCompleted()
return Disposables.create()
}
.observe(on: MainScheduler.instance)
.subscribe(on: SerialDispatchQueueScheduler(qos: .background))
.subscribe(onNext: { ele in
print("Observe Thread:", Thread.current)
print(ele)
}, onDisposed: {
print("disposed2")
})
.disposed(by: bag)
打印結(jié)果:
Subscribe Thread: <NSThread: 0x600003c45380>{number = 5, name = (null)}
Observe Thread: <NSThread: 0x600003c047c0>{number = 1, name = main}
1
disposed2
可見因為指定subscribe在SerialDispatchQueueScheduler(串行隊列執(zhí)行),所以第一條打印的線程不是主線程,而observe指定在主隊列執(zhí)行,所以第二條打印的線程是主線程。
源碼解讀
我們看一下.observe(on: MainScheduler.instance)的內(nèi)部實現(xiàn),在ObservableType的extension中可以找到代碼:
public func observe(on scheduler: ImmediateSchedulerType)
-> Observable<Element> {
guard let serialScheduler = scheduler as? SerialDispatchQueueScheduler else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
return ObserveOnSerialDispatchQueue(source: self.asObservable(),
scheduler: serialScheduler)
}
如果是同步隊列調(diào)度者,則返回ObserveOnSerialDispatchQueue對象,將當前Observable和scheduler傳入,否則返回ObserveOn對象,這兩個類都是Observable,它們都繼承了Producer,通過這種方式實現(xiàn)鏈式調(diào)用,可以繼續(xù)調(diào)用其他操作符,每個操作符都內(nèi)部都有對應的ObservableType實現(xiàn)類,它們一般會重寫run方法,而且還有對應的Sink類,用來實現(xiàn)操作符的功能,比如ObserveOn類有一個對應的ObserveOnSink類,ObserveOnSink有自己的run方法,同樣ObserveOnSerialDispatchQueue類有對應的ObserveOnSerialDispatchQueueSink類,這些Sink類繼承自ObserverBase類,最終都實現(xiàn)了ObserverType,能發(fā)送序列。
在看看subscribe(on:)
public func subscribe(on scheduler: ImmediateSchedulerType)
-> Observable<Element> {
SubscribeOn(source: self, scheduler: scheduler)
}
同理返回一個SubscribeOn對象,將當前對象和scheduler傳入。SubscribeOn繼承自Producer,對應有一個SubscribeOnSink類,它繼承自Sink類;因為它需要使用Sink的forwardOn方法。
當我們調(diào)用.subscribe(onNext方法時,程序依然會走到Producer的subscribe方法,這個流程沒有變,然后調(diào)用當前對象run方法,因為當前對象已經(jīng)再是AnonymousObservable對象了,而是 SubscribeOn, 而且SubscribeOn重寫了run方法,所以會調(diào)用SubscribeOn的run方法,這是面向?qū)ο蟮亩鄳B(tài),然后我們進入SubscribeOn的run方法看看:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
這里依然會創(chuàng)建sink對象,但是不再是AnonymousObservableSink,而是SubscribeOnSink,它來完成這個操作符的功能;這個Sink保存了SubscribeOn對象,observer和cancel,observer依然還是剛開始創(chuàng)建的AnonymousObserver對象,然后調(diào)用了SubscribeOnSink的run方法:
func run() -> Disposable {
let disposeEverything = SerialDisposable()
let cancelSchedule = SingleAssignmentDisposable()
disposeEverything.disposable = cancelSchedule
let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return Disposables.create()
}
cancelSchedule.setDisposable(disposeSchedule)
return disposeEverything
}
SerialDisposable: 表示一個可釋放資源,其底層可釋放資源可被另一個可釋放資源替換,從而導致前一個底層可釋放資源的自動釋放。
SingleAssignmentDisposable: 表示只允許對其底層可釋放資源進行一次賦值的可釋放資源。如果已經(jīng)設(shè)置了底層可釋放資源,那么將來嘗試設(shè)置底層可釋放資源將拋出異常。
ScheduledDisposable:釋放資源時會在對應的隊列中調(diào)度執(zhí)行。
關(guān)鍵代碼:self.parent.scheduler.schedule, 這里parent是SubscribeOn對象,scheduler是調(diào)度者SerialDispatchQueueScheduler,然后調(diào)用它的schedule方法,去看看:
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
self.configuration.schedule(state, action: action)
}
調(diào)到self.configuration.schedule:
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
self.queue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
終于看到隊列調(diào)用了self.queue.async 將action異步派發(fā)到quene中執(zhí)行,而queue是在創(chuàng)建SerialDispatchQueueScheduler時創(chuàng)建的, SerialDispatchQueueScheduler明顯會創(chuàng)建串行隊列。這里有個小細節(jié),如果資源已經(jīng)被釋放了則不執(zhí)行。
cancel.setDisposable(action(state)) 設(shè)置釋放資源對象,如果已經(jīng)設(shè)置則拋出異常。
action 在哪里, 回到action定義的地方?:
let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return Disposables.create()
}
接著執(zhí)行self.parent.source.subscribe(self), 這個source是subscibe(:on,的調(diào)用者,即在鏈式調(diào)用序列中,先調(diào)用observe(:on)再調(diào)用subscibe(:on) 所以source是ObserveOnSerialDispatchQueue對象。
很多observable都會保存它的上一個observable,即source, 以此實現(xiàn)鏈式調(diào)用
ObserveOnSerialDispatchQueue對象現(xiàn)在要執(zhí)行subscribe方法,進入看看:
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
又到這里來了,然又進入run,這里會進入ObserveOnSerialDispatchQueue的run方法:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
這個地方代碼不太一樣了,不再是調(diào)用sink的run,因為ObserveOnSerialDispatchQueueSink只能處理observe而不能處理subscribe,只能轉(zhuǎn)發(fā)給source處理subscribe,所以調(diào)用self.source.subscribe, 這里的source是最初創(chuàng)建的未變形的AnonymousObservable對象,這相當于繞了一圈又回到了原來的流程,調(diào)用subscribe(sink),但是這里的sink是ObserveOnSerialDispatchQueueSink對象,它是Observer,看看這個會怎么影響后面的流程:
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
這里調(diào)到了前面的if分支:因為
isScheduleRequired用來標示在當前線程是否正在通過schedule執(zhí)行action,如果action內(nèi)部又在當前線程執(zhí)行了subscribe,則無需再調(diào)度到當前線程執(zhí)行,即CurrentThreadScheduler.instance.schedule不會也沒必要嵌套調(diào)用。
繼續(xù)調(diào)用AnonymousObservable的run方法,然后調(diào)用創(chuàng)建AnonymousObservableSink,調(diào)用run,最后執(zhí)行subscribeHandler(AnyObserver(self)),這里回到了熟悉的流程,不過因為Observer是ObserveOnSerialDispatchQueueSink對象,所以發(fā)送序列時會調(diào)用ObserveOnSerialDispatchQueueSink的onCore方法:
override func onCore(_ event: Event<Element>) {
_ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}
這里會做隊列調(diào)度,在對應的隊列(主隊列)中執(zhí)行action :
self.cachedScheduleLambda = { pair in
guard !cancel.isDisposed else { return Disposables.create() }
pair.sink.observer.on(pair.event)
if pair.event.isStopEvent {
pair.sink.dispose()
}
return Disposables.create()
}
接著調(diào)用observer.on(event),這里的sink就是當前這個ObserveOnSerialDispatchQueueSink對象,而observer是在創(chuàng)建ObserveOnSerialDispatchQueueSink對象時傳入的SubscribeOnSink,SubscribeOnSink的observer對象是AnonymousObserver,因為它們都遵循了ObserverType,所以都可以作為ObserverType被其他Sink持有。
各類
Sink遵循了ObserverType又持有了ObserverType,這樣Sink之間可以相互持有,調(diào)用協(xié)議方法時又可以調(diào)用observer的相同協(xié)議方法,這樣可以一直調(diào)用下去,跟裝飾器模式很像,當我需要在現(xiàn)有操作符基礎(chǔ)上再增加操作,無需修改原有操作符的邏輯代碼,通過擴展方式增加新的功能,不過這里的設(shè)計更復雜。
我們繼續(xù)看代碼,SubscribeOnSink實現(xiàn)了on方法,進去看看:
func on(_ event: Event<Element>) {
self.forwardOn(event)
if event.isStopEvent {
self.dispose()
}
}
然后進入Sink類的forwardOn:在forwardOn中又又調(diào)用了observer.on(event), 這里observer是AnonymousObserver,最終調(diào)用AnonymousObserver的onCore,完成最后一擊:
調(diào)用self.eventHandler(event)
如果將
observe(:on)和subscribe(:on)互換位置,subscribe(:on)先調(diào)用,observe(:on)后調(diào)用結(jié)果會怎么樣? 結(jié)果依然不變,但是內(nèi)部調(diào)用流程會不一樣,大家可以自己試試!
Scheduler
上面的例子中已經(jīng)見到了兩個Scheduler:
-
SerialDispatchQueueScheduler: 串行隊列調(diào)度者, -
MainScheduler: 主隊列調(diào)度者,它繼承自SerialDispatchQueueScheduler
還有一個并行隊列: -
ConcurrentDispatchQueueScheduler: 并行隊列調(diào)度者
每個調(diào)度者都會維護自己的隊列,MainScheduler維護一個主隊列DispatchQueue.main,SerialDispatchQueueScheduler維護一個串行隊列,ConcurrentDispatchQueueScheduler維護一個并行隊列; 當需要執(zhí)行action時,調(diào)度對應的隊列執(zhí)行action即可。
這三個調(diào)度者都遵循了SchedulerType協(xié)議,SchedulerType繼承ImmediateSchedulerType,ImmediateSchedulerType有一個協(xié)議方法:
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable用來執(zhí)行隊列調(diào)度。
Scheduler包含一個DispatchQueueConfiguration對象,DispatchQueueConfiguration對象持有隊列,當執(zhí)行調(diào)度方法時,會轉(zhuǎn)發(fā)到這個類的schedule進行實際的調(diào)度。