Rx ObservableSequence 遞歸調(diào)度


        DispatchQueue.global().async {
            assert(!Thread.current.isMainThread)
            Observable.from(["??", "??", "??", "??"], scheduler: MainScheduler.instance)
                .subscribe { event in
                    assert(Thread.current.isMainThread)
                    print(event)
            }
        }


先看下簡(jiǎn)單的例子:
上面例子的訂閱事件會(huì)被調(diào)度到主線程中去,那么具體是怎么調(diào)度,以及細(xì)節(jié)是什么樣的呢?


  public static func from(_ array: [E], scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
        return ObservableSequence(elements: array, scheduler: scheduler)
    }


// ObservableSequenceSink run
    func run() -> Disposable {
        
        return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in

            var mutableIterator = iterator

            if let next = mutableIterator.0.next() {
                print("scheduleRecursive: \(next)")
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }


輕車熟路的找到入口位置

// scheduleRecursive method
    public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> ()) -> ()) -> Disposable {
        let recursiveScheduler = RecursiveImmediateScheduler(action: action, scheduler: self)
        
        recursiveScheduler.schedule(state)
        
        return Disposables.create(with: recursiveScheduler.dispose)
    }


這個(gè)scheduleRecursive的入?yún)⒎浅?fù)雜,要看懂它的結(jié)合上下文看,從方法名來(lái)看可以理解為遞歸調(diào)度,暫且先了解到這,繼續(xù)看。

    func schedule(_ state: State) {
        
        
        var scheduleState: ScheduleState = .initial

        let d = _scheduler.schedule(state) { (state) -> Disposable in
            // best effort
            if self._group.isDisposed {
                return Disposables.create()
            }
            
            let action = self._lock.calculateLocked { () -> Action? in
                switch scheduleState {
                case let .added(removeKey):
                    self._group.remove(for: removeKey)
                case .initial:
                    break
                case .done:
                    break
                }

                scheduleState = .done

                return self._action
            }
            
            if let action = action {
                action(state, self.schedule)
            }
            
            return Disposables.create()
        }
 
                
        
        _lock.performLocked {
            switch scheduleState {
            case .added:
                rxFatalError("Invalid state")
                break
            case .initial:
                if let removeKey = _group.insert(d) {
                    scheduleState = .added(removeKey)
                }
                else {
                    scheduleState = .done
                }
                break
            case .done:
                break
            }
        }
    }


暫且不管它的生命周期和線程安全 ,去掉這些細(xì)節(jié)代碼就變成如下模樣:


    func schedule(_ state: State) {
        
        let d = _scheduler.schedule(state) { (state) -> Disposable in
            // best effort
            if self._group.isDisposed {
                return Disposables.create()
            }
 
            action(state, self.schedule)

            return Disposables.create()
        }
 
        

    }


去掉這些邊角料代碼就清晰多了, schedule(state) { } 這個(gè)是一個(gè)線程調(diào)度,確保closure在指定的線程運(yùn)行,詳情參見(jiàn):CurrentThreadScheduler

去掉這個(gè)線程調(diào)度,再來(lái)看一下:


    func schedule(_ state: State) {
         
        action(state, self.schedule)

    }


再結(jié)合著看一下:

// 代碼片段1

    func run() -> Disposable {
        
        return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in

            var mutableIterator = iterator

            if let next = mutableIterator.0.next() {
                print("scheduleRecursive: \(next)")
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
    
    
// 代碼片段2

 recursiveScheduler.schedule(state)
 
// 代碼片段3
     func schedule(_ state: State) {
         
        action(state, self.schedule)

    }



----> 表示觸發(fā)的話,那么就是:
ObservableSequenceSink.run ----> recursiveScheduler.schedule ----> action, run函數(shù)的執(zhí)行最終會(huì)觸發(fā)action閉包的執(zhí)行。好了現(xiàn)在來(lái)分析一下:
action(state, self.schedule), 將對(duì)象自身schedule函數(shù)作為入?yún)魅腴]包,那么action閉包干了啥呢? 首先使用迭代器迭代下一個(gè)元素,如果不為空的話調(diào)用recurse(mutableIterator), 也就是 recursiveScheduler.schedule方法,recursiveScheduler.schedule又會(huì)調(diào)用action,如此往復(fù)直到遍歷所有元素。對(duì)迭代器不太理解的可以參考下面這個(gè)簡(jiǎn)單的例子:

         let numbers = [2, 3, 5, 7]
         var numbersIterator = numbers.makeIterator()
   
         while let num = numbersIterator.next() {
             print(num)
         }
         // Prints "2"
         // Prints "3"
         // Prints "5"
         // Prints "7"


再來(lái)復(fù)習(xí)一下遞歸,什么時(shí)候可以使用遞歸:

  1. 一個(gè)問(wèn)題總可以分解為若干個(gè)規(guī)模更小的相似的問(wèn)題
  2. 最小的子問(wèn)題可以直接求解

顯然序列的遍歷問(wèn)題滿足這個(gè)條件,對(duì)一個(gè)序列的遍歷總可以分解為:

  • 求解序列的首部元素
  • 遍歷除開(kāi)首部元素的子序列

回頭再分析下 scheduleRecursive 方法

    public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> ()) -> ()) -> Disposable

  • state: 初始值,action在執(zhí)行的時(shí)候必須要有個(gè)實(shí)參,這個(gè)值是由state決定
  • action:重點(diǎn)介紹這個(gè), 第一個(gè)入?yún)ⅲ?state 即當(dāng)前的值
    第二個(gè)入?yún)? 即 (State) -> (), 這個(gè)閉包用來(lái)做一些額外的處理, 比如在上面的例子當(dāng)中,就成功的將每一次迭代都放在指定的線程執(zhí)行。

了解這些我們可以寫個(gè)非遞歸的run方法,看起來(lái)如下:


        return _parent._scheduler.schedule(_parent._elements.makeIterator(), action: { iterator  -> Disposable in
            var mutableIterator = iterator
            
            while let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                 print("unScheduleRecursive: \(next)")
            }
            
            self.forwardOn(.completed)
            self.dispose()
            return Disposables.create()
        })


你看多好啊,線程調(diào)度也實(shí)現(xiàn)了,而且簡(jiǎn)潔明了為什么非要用遞歸調(diào)度呢? 我一次調(diào)度就完事了。我想了很久終于想到了, 想象一個(gè)應(yīng)用場(chǎng)景假設(shè)這個(gè)序列元素非常多,就比如有100萬(wàn)個(gè),當(dāng)?shù)鞯降?00個(gè)的時(shí)候突然客戶說(shuō)后面的我都不要了,那你怎么讓這個(gè)迭代器停止避免不必要的運(yùn)算呢?為了驗(yàn)證這個(gè)想法了寫了如下代碼:

         let range = 0..<100000
        let source = range.map {"\($0)" }
         fromArrayDisposables =  Observable.from(source)
            .elementAt(1000)
            .subscribe(onNext: {[weak self] element in
                print(element)       
            })


然后我在run函數(shù)加了一日志語(yǔ)句, 果然迭代器迭代到1000的時(shí)候,就停止了不再迭代,換上我自己的寫的run函數(shù), 迭代器一口氣迭代完了100000個(gè)元素,這里不討論elementAt具體是怎么實(shí)現(xiàn)的,只是想說(shuō)明這個(gè)遞歸調(diào)度不是為了炫耀技術(shù)是,而是出于可擴(kuò)展性需要而實(shí)現(xiàn)的。

線程安全和生命周期管理


  func schedule(_ state: State) {
        
        //FIX ME
   // ConcurrentMainScheduler
        var scheduleState: ScheduleState = .initial

        let d = _scheduler.schedule(state) { (state) -> Disposable in
            // best effort
            if self._group.isDisposed {
                return Disposables.create()
            }
            
            let action = self._lock.calculateLocked { () -> Action? in
                switch scheduleState {
                case let .added(removeKey):
                    self._group.remove(for: removeKey)
                case .initial:
                    break
                case .done:
                    break
                }

                scheduleState = .done

                return self._action
            }
            
            
            
            if let action = action {
                action(state, self.schedule)
            }
            
            return Disposables.create()
        }
 

        
        
        _lock.performLocked {
            switch scheduleState {
            case .added:
                rxFatalError("Invalid state")
                break
            case .initial:
                if let removeKey = _group.insert(d) {
                    scheduleState = .added(removeKey)
                }
                else {
                    scheduleState = .done
                }
                break
            case .done:
                break
            }
        }
    }


這里主要是要保證scheduleState的線程安全, schedule {}的代碼塊不一定立馬執(zhí)行,如果調(diào)度的線程正忙的時(shí)候,只會(huì)簡(jiǎn)單的將closure存儲(chǔ)起來(lái)。


//代碼塊1
            let action = self._lock.calculateLocked { () -> Action? in
                switch scheduleState {
                case let .added(removeKey):
                    self._group.remove(for: removeKey)
                case .initial:
                    break
                case .done:
                    break
                }

                scheduleState = .done

                return self._action
            }



// 代碼塊2
    _lock.performLocked {
            switch scheduleState {
            case .added:
                rxFatalError("Invalid state")
                break
            case .initial:
                if let removeKey = _group.insert(d) {
                    scheduleState = .added(removeKey)
                }
                else {
                    scheduleState = .done
                }
                break
            case .done:
                break
            }
        }

也就是說(shuō)這兩個(gè)代碼塊不一定按照順序執(zhí)行,而這兩塊代碼都涉及到scheduleState的狀態(tài),所以必須加鎖。 先假設(shè)按12執(zhí)行,那么皆大歡喜。

  • 代碼塊1 會(huì)執(zhí)行 initial case
  • 代碼塊2 會(huì)執(zhí)行 done case

這兩個(gè)case什么都沒(méi)干

那么按21執(zhí)行呢?

  • 代碼塊2 會(huì)執(zhí)行 initial case
  • 代碼塊1 會(huì)執(zhí)行 added case

那么先將Disposable 加入到 _group中進(jìn)行管理,然后在閉包執(zhí)行完畢之后,執(zhí)行dispose,這個(gè)主要是為了管理其生命周期.如果沒(méi)有這個(gè)dispose操作,有可能造成內(nèi)存泄漏

Simple Demo


/// 實(shí)現(xiàn)階加, 比如 輸入5的話 ,就是 1 + 2 + 3 + 4 + 5

typealias Action =  (_ state: Int, _ recurse: (Int) -> Void) -> Void


struct RecursiveAdder {
    
    // 最簡(jiǎn)單的遞歸,遞歸的缺陷在于容易造成爆棧,當(dāng)然這種遞歸也是尾遞歸,很多編譯器能夠?qū)ζ溥M(jìn)行優(yōu)化
    static func recursiveAdd(_ number: Int, result: Int = 0) -> Int {
        
        if number <= 0 {
            return result
        } else {
            let updateResult = result + number
            return recursiveAdd(number - 1, result: updateResult)
        }
    }
    
    // 非遞歸
    static func unRecursiveAdd(_ number: Int) -> Int {
        var m = number
        var result = 0
        while m > 0 {
            result = addNumber(m, number2: result)
            m -= 1
        }
        return result
    }
    
    static func addNumber(_ number1: Int, number2: Int) -> Int {
        return number1 + number2
    }
    
    // 重現(xiàn)Rx 遞歸調(diào)度原理
    static func functinalRecursiveAdd(_ number: Int, result: Int = 0) -> Int {
        var _result = result
        // number 當(dāng)前狀態(tài)
        // recursive 遞歸調(diào)度函數(shù)
        // number - 1 模擬迭代器迭代
        add(number) { (number, recursive) in
            _result += number
            if number <= 0 {
                
                print("recursive end")
            } else {
                recursive(number - 1)
            }
        }
        
        return _result
        
    }
    
    static func add(_ state: Int, action: @escaping Action)  {
        
        let item = RecursiveAdderItem(action: action)
        item.add(state)
        
    }
}

struct RecursiveAdderItem {
    let action: Action
    
    // 這里什么也沒(méi)有做,在rx中這一步是做了線程調(diào)度的
    func add(_ state: Int)  {
        action(state, self.add)
    }
}



最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語(yǔ)法,類相關(guān)的語(yǔ)法,內(nèi)部類的語(yǔ)法,繼承相關(guān)的語(yǔ)法,異常的語(yǔ)法,線程的語(yǔ)...
    子非魚(yú)_t_閱讀 34,696評(píng)論 18 399
  • Java繼承關(guān)系初始化順序 父類的靜態(tài)變量-->父類的靜態(tài)代碼塊-->子類的靜態(tài)變量-->子類的靜態(tài)代碼快-->父...
    第六象限閱讀 2,251評(píng)論 0 9
  • 孩子撒謊一定是父母給孩子制造了一個(gè)不撒謊活不下去的理由?!娣?01、奇怪的謊言 最近相繼有幾位媽媽找我,說(shuō)起孩...
    臨界冰閱讀 7,776評(píng)論 35 156
  • 貓,是懷中虎。 蟋蟀,是掌中牛。 雞,是庭前鷹鷲。 盆栽,是屋里山水。 珠玉,是指尖上的天空、海洋、大地。
    中浮閱讀 287評(píng)論 0 0
  • 我個(gè)人很喜歡,但是不推薦。 這部作品真沒(méi)什么必要用太長(zhǎng)篇幅去分析具體劇情,對(duì)它的評(píng)價(jià)不用搞那么復(fù)雜。 我個(gè)人喜歡是...
    風(fēng)格里哦閱讀 1,368評(píng)論 0 3

友情鏈接更多精彩內(nèi)容