RxSwift - Suject

前言

RxSwift中Subject是一種非常特殊的序列協(xié)議,全局搜索class Subject / struct Subject都無結(jié)果,搜索protocol Subject獲取到SubjectType協(xié)議,其中有asObserver方法,并且遵循ObservableType ->遵循->ObservableConvertibleType, 也就有了asObservable方法。

SujectType

所以遵循SubjectType的序列同時有了序列和觀察者相應的技能。在實際開發(fā)中應用很頻繁,這樣就能自己訂閱自己,自己響應自己的訂閱了。
下面我們就來使用及分析幾種常用的Subject。

1、PublishSubject

先來看一段使用的示例代碼:

      // 1:初始化序列
        let publishSub = PublishSubject<Int>() //初始化一個PublishSubject 裝著Int類型的序列
        // 2:發(fā)送響應序列
        publishSub.onNext(1)
        // 3:訂閱序列
        publishSub.subscribe { print("訂閱到了:", $0) }
            .disposed(by: disposbag)
        // 再次發(fā)送響應
        publishSub.onNext(2)
        publishSub.onNext(3)
//輸出:訂閱到了: next(2)
//訂閱到了: next(3)

查看輸出,只打印了2和3,那為什么1沒有打印呢?
上一篇文章RxSwift - 高階函數(shù)中關于publish的使用分析,其中publish就是對PublishSubject的封裝,證明了publish就是將多個觀察者交由唯一的PublishSubject來管理,PublishSubject將所有觀察者.on方法保存,并順序地響應外界的訂閱事件。
那么此處關于1為響應的問題,還是得分析訂閱.subscribePublishSubject.on關鍵代碼:

subscribe

PublishSubject.on

此處.onNext(1)執(zhí)行時,暫未有過.subscribe訂閱,所以暫不會生成任何觀察者,即PublishSubject中的observers中啥也沒有,也就不會有任何的observer.on響應任何的事件。那么.onNext(1)也就無從響應。
publishSub.subscribe之后,才有了觀察者,observers中也就會將觀察者.on保存以響應后續(xù)事件。

2、BehaviorSubject
        let behaviorSub = BehaviorSubject.init(value: 100)
        // 2:發(fā)送信號
        //behaviorSub.onNext(2)
        //behaviorSub.onNext(3)
        // 3:訂閱序列
        behaviorSub.subscribe{ print("訂閱到了1:", $0) }
            .disposed(by: disposbag)
        // 再次發(fā)送
        //behaviorSub.onNext(4)
        //behaviorSub.onNext(5)
        // 再次訂閱
        behaviorSub.subscribe{ print("訂閱到了:", $0) }
            .disposed(by: disposbag)
//輸出:訂閱到了1: next(100)
//訂閱到了: next(100)

此時代碼中的2、3、4、5都是已注釋狀態(tài),那么訂閱之后會響應初始值100。
打開2、3的注釋結(jié)果打?。?/p>

訂閱到了1: next(3)
訂閱到了: next(3)

說明此時響應了最新的序列元素3。
接著打開4、5的注釋結(jié)果打印:

訂閱到了1: next(3)
訂閱到了1: next(4)
訂閱到了1: next(5)
訂閱到了: next(5)

此時可以理解為,在第一次訂閱之前的只響應最新的,但訂閱之后的都會響應,第二次訂閱響應了之前最新的序列5。
問題一:為什么會有初始值,且會保存最新的值?
問題二:相對于PublishSubject沒有訂閱就無法響應的問題,BehaviorSubject怎么能響應訂閱之前的事件?
查看BehaviorSubject初始化方法:

BehaviorSubject初始化

發(fā)現(xiàn)利用一個屬性變量element將初始化的值保存起來。
那這個element是怎么保存最新值的?每當.onNext()時候就會來到BehaviorSubject.on方法中,接著在synchronized_on中將最新值給element:
element賦最新值

接著我們來看訂閱之后都干了什么:

.subscribe

與PublishSubject不同的是,訂閱發(fā)起時候,在保存完observer.on之后,默認會執(zhí)行一次observer.on即會響應之前保存最新的序列元素值。每一次.onNext的作用就是更新element的值。

3、ReplaySubject
let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
        // let replaySub = ReplaySubject<Int>.createUnbounded()

        // 2:發(fā)送信號
        replaySub.onNext(1)
        replaySub.onNext(2)
        replaySub.onNext(3)
        replaySub.onNext(4)

        // 3:訂閱序列
        replaySub.subscribe{ print("訂閱到了:", $0) }
            .disposed(by: disposbag)
        // 再次發(fā)送
        replaySub.onNext(7)
        replaySub.onNext(8)
        replaySub.onNext(9)
//輸出:訂閱到了: next(3)
//訂閱到了: next(4)
//訂閱到了: next(7)
//訂閱到了: next(8)
//訂閱到了: next(9)

能打印3、4以及對應bufferSize: 2可知,會保存下最新的有效的兩次序列元素。
我們直接來到.create方法:

.create

其中還對緩存?zhèn)€數(shù)是否為1做了區(qū)分。ReplayOne與ReplayMany有共同的祖先類ReplayBufferBase,所以我們來到ReplayBufferBase:
ReplayBufferBase

所以查看訂閱以及響應可直接看ReplayBufferBase中的實現(xiàn)方法。重點看ReplayBufferBase中的onsubscribe兩個方法實現(xiàn):
synchronized_on

addValueToBuffer字面意思就是將值存入緩存,重點?。?!
分別查看在子類ReplayOne和ReplayManyBase中的實現(xiàn):

override func addValueToBuffer(_ value: Element) {
        self.value = value
    }
override func addValueToBuffer(_ value: Element) {
        self.queue.enqueue(value)
    }

ReplayOne只是將當前值保存在屬性self.value中,當訂閱發(fā)起時就會調(diào)用.replayBuffer方法,便會執(zhí)行當先觀察者.on觸發(fā)事件,并將當前保存的value發(fā)射出去:

ReplayOne

ReplayManyBase則是將值保存在值保存在一個集合屬性中,通過添加及更新集合中的元素來達到只緩存相應個數(shù)的元素。當訂閱發(fā)起時,就會從集合中將觀察者一個一個拿出來觸發(fā).on方法。

ReplayManyBase.replayBuffer

緩存機制分析
當訂閱發(fā)起之前,ReplayManyBase將值保存到集合中,集合中的queue容量大小是緩存大小加1,訂閱前一開始能存下1、2、3,此時會觸發(fā)trim(),即會調(diào)用刪除queue中元素queue為nil、2、3,當要存4時候,按照替換index為0->1->2->0這樣的循環(huán)順序?qū)⒑罄m(xù)數(shù)值更新進queue中,所以此時的queue由是4、2、3,此時也會觸發(fā)trim(),刪除舊的值2,queue此時為4、nil、3,此時訂閱發(fā)起,執(zhí)行self.replayBuffer(anyObserver),所以會遍歷queue中的item用于響應事件,斷點調(diào)試為逆序遍歷即先發(fā)射3再發(fā)射4。
當后續(xù)7來到時直接走的.on方法,當7來時,此時queue為4、7、3,會直接走dispatch(self.synchronized_on(event), event)即:

dispatch

接著會queue為4、7、nil,然后發(fā)射當前的7出去。
當8來時,queue為4、7、8,然后調(diào)用dequeue()后queue為nil、7、8,也只會響應oberver.on(event),那么也只會發(fā)射8出去。
當9來時,queue為9、7、8,然后調(diào)用dequeue(),最后queue為9、nil、8。
所以queue變化為:( 1, 2, 3) -> ( nil, 2, 3) -> ( 4, 2, 3) -> ( 4, nil, 3) -> ( 4, 7, 3) -> ( 4, 7, nil) -> ( 4, 7, 8) -> ( nil, 7, 8) -> ( 9, 7, 8) -> ( 9, nil, 8)
發(fā)射的順序還是按響應的先后順序發(fā)射。

4、AsyncSubject
// 1:創(chuàng)建序列
        let asynSub = AsyncSubject<Int>.init()
        // 2:發(fā)送信號
        asynSub.onNext(1)
        asynSub.onNext(2)
        // 3:訂閱序列
        asynSub.subscribe{ print("訂閱到了:", $0) }
            .disposed(by: disposbag)
        // 再次發(fā)送
        asynSub.onNext(3)
        asynSub.onNext(4)
        asynSub.onError(NSError.init(domain: "LcrError", code: 10085, userInfo: nil))
        asynSub.onCompleted()
//輸出:訂閱到了: error(Error Domain=LcrError Code=10085 "(null)")

發(fā)現(xiàn)當有錯誤響應時,前面的響應都無效了。
將錯誤注釋,再次打印結(jié)果為:

訂閱到了: next(4)
訂閱到了: completed

然后再給onCompleted添加注釋,發(fā)現(xiàn)沒有打印任何東西。
里面的工作原理是怎樣的呢?
我們把目光聚集在.on方法上:

.on

synchronized_on

.onNext來時,synchronized_on中會將當前值設為最新值,意思就是只保留最新的值,然后判斷.completed有沒有來。沒有的話就返回(Observers(),.completed)到.on中,即此時沒有有效觀察者,即使dispatch進去了也啥也不做。
.onError來時,會將前面保存的觀察者全部刪除,即使回到.on中也只會發(fā)射錯誤信息。
.completed來時,會將保存的最新的值返回放入.next返回到.on去的.next:中去執(zhí)行,最終也會調(diào)用.completed。

所以一定需要有.completed來且不能發(fā)射錯誤,才能正常響應訂閱信息。

5、BehaviorRelay

Variable : 5.0已經(jīng)廢棄(BehaviorRelay 替換)

// 1:創(chuàng)建序列
        let variableSub = BehaviorRelay.init(value: 1)
        // 2:發(fā)送信號
        variableSub.accept(100)
        variableSub.accept(10)
        // 3:訂閱信號
        variableSub.asObservable().subscribe{ print("訂閱到了:", $0) }
            .disposed(by: disposbag)
        print("打印:\(variableSub.value)")
        // 再次發(fā)送
        variableSub.accept(1000)
//輸出:訂閱到了: next(10)
//打印:10
//訂閱到了: next(1000)

BehaviorRelay不僅可以緩存最新的元素,還可以通過屬性訪問到最新緩存的元素。


BehaviorRelay

源碼可知,BehaviorRelay就是內(nèi)部封裝了一個BehaviorSubject類型的序列subject,利用BehaviorSubject能緩存以及能訪問屬性value的特性來實現(xiàn)屬性訪問。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容