DispatchQueue.global().async {
assert(!Thread.current.isMainThread)
Observable.from(["??", "??", "??", "??"], scheduler: MainScheduler.instance)
.subscribe { event in
assert(Thread.current.isMainThread)
print(event)
}
}
先看下簡(jiǎn)單的例子:
上面例子的訂閱事件會(huì)被調(diào)度到主線程中去,那么具體是怎么調(diào)度,以及細(xì)節(jié)是什么樣的呢?
public static func from(_ array: [E], scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return ObservableSequence(elements: array, scheduler: scheduler)
}
// ObservableSequenceSink run
func run() -> Disposable {
return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in
var mutableIterator = iterator
if let next = mutableIterator.0.next() {
print("scheduleRecursive: \(next)")
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
輕車熟路的找到入口位置
// scheduleRecursive method
public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> ()) -> ()) -> Disposable {
let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
recursiveScheduler.schedule(state)
return Disposables.create(with: recursiveScheduler.dispose)
}
這個(gè)scheduleRecursive的入?yún)⒎浅?fù)雜,要看懂它的結(jié)合上下文看,從方法名來(lái)看可以理解為遞歸調(diào)度,暫且先了解到這,繼續(xù)看。
func schedule(_ state: State) {
var scheduleState: ScheduleState = .initial
let d = _scheduler.schedule(state) { (state) -> Disposable in
// best effort
if self._group.isDisposed {
return Disposables.create()
}
let action = self._lock.calculateLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self._group.remove(for: removeKey)
case .initial:
break
case .done:
break
}
scheduleState = .done
return self._action
}
if let action = action {
action(state, self.schedule)
}
return Disposables.create()
}
_lock.performLocked {
switch scheduleState {
case .added:
rxFatalError("Invalid state")
break
case .initial:
if let removeKey = _group.insert(d) {
scheduleState = .added(removeKey)
}
else {
scheduleState = .done
}
break
case .done:
break
}
}
}
暫且不管它的生命周期和線程安全 ,去掉這些細(xì)節(jié)代碼就變成如下模樣:
func schedule(_ state: State) {
let d = _scheduler.schedule(state) { (state) -> Disposable in
// best effort
if self._group.isDisposed {
return Disposables.create()
}
action(state, self.schedule)
return Disposables.create()
}
}
去掉這些邊角料代碼就清晰多了, schedule(state) { } 這個(gè)是一個(gè)線程調(diào)度,確保closure在指定的線程運(yùn)行,詳情參見(jiàn):CurrentThreadScheduler
去掉這個(gè)線程調(diào)度,再來(lái)看一下:
func schedule(_ state: State) {
action(state, self.schedule)
}
再結(jié)合著看一下:
// 代碼片段1
func run() -> Disposable {
return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in
var mutableIterator = iterator
if let next = mutableIterator.0.next() {
print("scheduleRecursive: \(next)")
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
// 代碼片段2
recursiveScheduler.schedule(state)
// 代碼片段3
func schedule(_ state: State) {
action(state, self.schedule)
}
用 ----> 表示觸發(fā)的話,那么就是:
ObservableSequenceSink.run ----> recursiveScheduler.schedule ----> action, run函數(shù)的執(zhí)行最終會(huì)觸發(fā)action閉包的執(zhí)行。好了現(xiàn)在來(lái)分析一下:
action(state, self.schedule), 將對(duì)象自身schedule函數(shù)作為入?yún)魅腴]包,那么action閉包干了啥呢? 首先使用迭代器迭代下一個(gè)元素,如果不為空的話調(diào)用recurse(mutableIterator), 也就是 recursiveScheduler.schedule方法,recursiveScheduler.schedule又會(huì)調(diào)用action,如此往復(fù)直到遍歷所有元素。對(duì)迭代器不太理解的可以參考下面這個(gè)簡(jiǎn)單的例子:
let numbers = [2, 3, 5, 7]
var numbersIterator = numbers.makeIterator()
while let num = numbersIterator.next() {
print(num)
}
// Prints "2"
// Prints "3"
// Prints "5"
// Prints "7"
再來(lái)復(fù)習(xí)一下遞歸,什么時(shí)候可以使用遞歸:
- 一個(gè)問(wèn)題總可以分解為若干個(gè)規(guī)模更小的相似的問(wèn)題
- 最小的子問(wèn)題可以直接求解
顯然序列的遍歷問(wèn)題滿足這個(gè)條件,對(duì)一個(gè)序列的遍歷總可以分解為:
- 求解序列的首部元素
- 遍歷除開(kāi)首部元素的子序列
回頭再分析下 scheduleRecursive 方法
public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> ()) -> ()) -> Disposable
- state: 初始值,
action在執(zhí)行的時(shí)候必須要有個(gè)實(shí)參,這個(gè)值是由state決定 - action:重點(diǎn)介紹這個(gè), 第一個(gè)入?yún)ⅲ?state 即當(dāng)前的值
第二個(gè)入?yún)? 即 (State) -> (), 這個(gè)閉包用來(lái)做一些額外的處理, 比如在上面的例子當(dāng)中,就成功的將每一次迭代都放在指定的線程執(zhí)行。
了解這些我們可以寫個(gè)非遞歸的run方法,看起來(lái)如下:
return _parent._scheduler.schedule(_parent._elements.makeIterator(), action: { iterator -> Disposable in
var mutableIterator = iterator
while let next = mutableIterator.next() {
self.forwardOn(.next(next))
print("unScheduleRecursive: \(next)")
}
self.forwardOn(.completed)
self.dispose()
return Disposables.create()
})
你看多好啊,線程調(diào)度也實(shí)現(xiàn)了,而且簡(jiǎn)潔明了為什么非要用遞歸調(diào)度呢? 我一次調(diào)度就完事了。我想了很久終于想到了, 想象一個(gè)應(yīng)用場(chǎng)景假設(shè)這個(gè)序列元素非常多,就比如有100萬(wàn)個(gè),當(dāng)?shù)鞯降?00個(gè)的時(shí)候突然客戶說(shuō)后面的我都不要了,那你怎么讓這個(gè)迭代器停止避免不必要的運(yùn)算呢?為了驗(yàn)證這個(gè)想法了寫了如下代碼:
let range = 0..<100000
let source = range.map {"\($0)" }
fromArrayDisposables = Observable.from(source)
.elementAt(1000)
.subscribe(onNext: {[weak self] element in
print(element)
})
然后我在run函數(shù)加了一日志語(yǔ)句, 果然迭代器迭代到1000的時(shí)候,就停止了不再迭代,換上我自己的寫的run函數(shù), 迭代器一口氣迭代完了100000個(gè)元素,這里不討論elementAt具體是怎么實(shí)現(xiàn)的,只是想說(shuō)明這個(gè)遞歸調(diào)度不是為了炫耀技術(shù)是,而是出于可擴(kuò)展性需要而實(shí)現(xiàn)的。
線程安全和生命周期管理
func schedule(_ state: State) {
//FIX ME
// ConcurrentMainScheduler
var scheduleState: ScheduleState = .initial
let d = _scheduler.schedule(state) { (state) -> Disposable in
// best effort
if self._group.isDisposed {
return Disposables.create()
}
let action = self._lock.calculateLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self._group.remove(for: removeKey)
case .initial:
break
case .done:
break
}
scheduleState = .done
return self._action
}
if let action = action {
action(state, self.schedule)
}
return Disposables.create()
}
_lock.performLocked {
switch scheduleState {
case .added:
rxFatalError("Invalid state")
break
case .initial:
if let removeKey = _group.insert(d) {
scheduleState = .added(removeKey)
}
else {
scheduleState = .done
}
break
case .done:
break
}
}
}
這里主要是要保證scheduleState的線程安全, schedule {}的代碼塊不一定立馬執(zhí)行,如果調(diào)度的線程正忙的時(shí)候,只會(huì)簡(jiǎn)單的將closure存儲(chǔ)起來(lái)。
//代碼塊1
let action = self._lock.calculateLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self._group.remove(for: removeKey)
case .initial:
break
case .done:
break
}
scheduleState = .done
return self._action
}
// 代碼塊2
_lock.performLocked {
switch scheduleState {
case .added:
rxFatalError("Invalid state")
break
case .initial:
if let removeKey = _group.insert(d) {
scheduleState = .added(removeKey)
}
else {
scheduleState = .done
}
break
case .done:
break
}
}
也就是說(shuō)這兩個(gè)代碼塊不一定按照順序執(zhí)行,而這兩塊代碼都涉及到scheduleState的狀態(tài),所以必須加鎖。 先假設(shè)按12執(zhí)行,那么皆大歡喜。
- 代碼塊1 會(huì)執(zhí)行 initial case
- 代碼塊2 會(huì)執(zhí)行 done case
這兩個(gè)case什么都沒(méi)干
那么按21執(zhí)行呢?
- 代碼塊2 會(huì)執(zhí)行 initial case
- 代碼塊1 會(huì)執(zhí)行 added case
那么先將Disposable 加入到 _group中進(jìn)行管理,然后在閉包執(zhí)行完畢之后,執(zhí)行dispose,這個(gè)主要是為了管理其生命周期.如果沒(méi)有這個(gè)dispose操作,有可能造成內(nèi)存泄漏
Simple Demo
/// 實(shí)現(xiàn)階加, 比如 輸入5的話 ,就是 1 + 2 + 3 + 4 + 5
typealias Action = (_ state: Int, _ recurse: (Int) -> Void) -> Void
struct RecursiveAdder {
// 最簡(jiǎn)單的遞歸,遞歸的缺陷在于容易造成爆棧,當(dāng)然這種遞歸也是尾遞歸,很多編譯器能夠?qū)ζ溥M(jìn)行優(yōu)化
static func recursiveAdd(_ number: Int, result: Int = 0) -> Int {
if number <= 0 {
return result
} else {
let updateResult = result + number
return recursiveAdd(number - 1, result: updateResult)
}
}
// 非遞歸
static func unRecursiveAdd(_ number: Int) -> Int {
var m = number
var result = 0
while m > 0 {
result = addNumber(m, number2: result)
m -= 1
}
return result
}
static func addNumber(_ number1: Int, number2: Int) -> Int {
return number1 + number2
}
// 重現(xiàn)Rx 遞歸調(diào)度原理
static func functinalRecursiveAdd(_ number: Int, result: Int = 0) -> Int {
var _result = result
// number 當(dāng)前狀態(tài)
// recursive 遞歸調(diào)度函數(shù)
// number - 1 模擬迭代器迭代
add(number) { (number, recursive) in
_result += number
if number <= 0 {
print("recursive end")
} else {
recursive(number - 1)
}
}
return _result
}
static func add(_ state: Int, action: @escaping Action) {
let item = RecursiveAdderItem(action: action)
item.add(state)
}
}
struct RecursiveAdderItem {
let action: Action
// 這里什么也沒(méi)有做,在rx中這一步是做了線程調(diào)度的
func add(_ state: Int) {
action(state, self.add)
}
}