07RXSwift的publish狀態(tài)共享

問(wèn)題:如果一個(gè)網(wǎng)絡(luò)請(qǐng)求的數(shù)據(jù)被多個(gè)地方多次使用,我們會(huì)進(jìn)行多次訂閱,結(jié)果會(huì)造成多次的網(wǎng)絡(luò)請(qǐng)求,代碼如下:

 let netOB = Observable<Any>.create { (observer) -> Disposable in
            sleep(2)// 模擬網(wǎng)絡(luò)延遲
            print("我開(kāi)始請(qǐng)求網(wǎng)絡(luò)了")
            observer.onNext("請(qǐng)求到的網(wǎng)絡(luò)數(shù)據(jù)")
            observer.onNext("請(qǐng)求到的本地")
            observer.onCompleted()
            return Disposables.create {
                print("銷毀回調(diào)了")
            }
            }
            .publish()
        
        netOB.subscribe(onNext: { (anything) in
            print("訂閱1:",anything)
        })
            
            .disposed(by: disposeBag)
        
        // 我們有時(shí)候不止一次網(wǎng)絡(luò)訂閱,因?yàn)橛袝r(shí)候我們的數(shù)據(jù)可能用在不同的額地方
        // 所以在訂閱一次 會(huì)出現(xiàn)什么問(wèn)題?
        netOB.subscribe(onNext: { (anything) in
            print("訂閱2:",anything)
        })
            .disposed(by: disposeBag)
         _ = netOB.connect()

如果沒(méi)有添加 .publish() 和 _ = netOB.connect(),則網(wǎng)絡(luò)請(qǐng)求data序列的創(chuàng)建會(huì)執(zhí)行兩次,而這是我們不需要的。

publish是如何做到網(wǎng)絡(luò)共享的呢?且看源碼分析:

首先,因?yàn)榭隙〞?huì)用到RXSwift的核心邏輯,這里放一張RXSwift的核心邏輯的思維導(dǎo)圖以供參考:


RXSwift核心邏輯簡(jiǎn)化圖.png

ok,上源碼:

----------------------------publish進(jìn)來(lái)---------------------------------
//封裝了一個(gè)multicast
 public func publish() -> ConnectableObservable<Element> {
        return self.multicast { PublishSubject() }
    }

public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
        -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
        return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
    }
----------------------------核心方法,老長(zhǎng)了---------------------------------
final private class ConnectableObservableAdapter<Subject: SubjectType>
    : ConnectableObservable<Subject.Element> {
    typealias ConnectionType = Connection<Subject>

    fileprivate let _source: Observable<Subject.Observer.Element>
    fileprivate let _makeSubject: () -> Subject

    fileprivate let _lock = RecursiveLock()
    fileprivate var _subject: Subject?

    // state
    fileprivate var _connection: ConnectionType?

    init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
        self._source = source
        self._makeSubject = makeSubject
        self._subject = nil
        self._connection = nil
    }

    override func connect() -> Disposable {
        return self._lock.calculateLocked {
            if let connection = self._connection {
                return connection
            }

            let singleAssignmentDisposable = SingleAssignmentDisposable()
            let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
            self._connection = connection
            let subscription = self._source.subscribe(connection)
            singleAssignmentDisposable.setDisposable(subscription)
            return connection
        }
    }

    fileprivate var lazySubject: Subject {
        if let subject = self._subject {
            return subject
        }

        let subject = self._makeSubject()
        self._subject = subject
        return subject
    }
//自己重寫一個(gè)subscribe
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
        return self.lazySubject.subscribe(observer) 
    }
}

ConnectableObservableAdapter序列繼承的是ConnectableObservable,并沒(méi)有繼承produce,沒(méi)有繼承produce??是的,問(wèn)題很嚴(yán)重,因?yàn)闆](méi)有produce,意味著就沒(méi)有subscribe函數(shù)的具體實(shí)現(xiàn),繼承ConnectableObservable我們只有一個(gè)subscribe的抽象函數(shù),怎么辦?答案是自己重寫一個(gè)subscribe.

先總結(jié)一下序列的創(chuàng)建:

  1. 首先是正常的序列創(chuàng)建create,創(chuàng)建的序列保存了閉包1??;
  2. 然后通過(guò).publish(),得到一個(gè)ConnectableObservableAdapter序列,也就是我們的netOB;
  3. netOB序列,沒(méi)有繼承produce,自己重寫了subscribe
  4. self.lazySubject.subscribe(observer) 中的序列self.lazySubject是懶加載的,意味著每次執(zhí)行重寫的subscribe,調(diào)用的是同一個(gè)序列。

然后就是訂閱了,訂閱---->創(chuàng)建一個(gè)觀察者observer,保存閉包---->然后subscribe,是調(diào)用的重寫的self.lazySubject.subscribe方法,最后走的是PublishSubjectsubscribe

 public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }

到這里,其實(shí)只做了兩件事情:

  1. let key = self._observers.insert(observer.on)---->觀察者回調(diào)的收集,這樣就可以等接收到信號(hào)發(fā)送后統(tǒng)一響應(yīng),而且self._lock.lock()self._lock.unlock()的處理還能保證響應(yīng)的執(zhí)行順序不會(huì)打亂;
  2. return SubscriptionDisposable(owner: self, key: key)---->垃圾袋的處理,這個(gè)先不管。
    說(shuō)明:因?yàn)闆](méi)有對(duì)應(yīng)sink.on方法的執(zhí)行,或者說(shuō)是沒(méi)有具體閉包1??的調(diào)用,也可以理解為是沒(méi)有消息發(fā)送onNext,就是create保存的閉包沒(méi)法調(diào)用。解決的方法就是調(diào)用_ = netOB.connect().大家可以嘗試一下,如果只寫.publish()不寫_ = netOB.connect(),是得不到如何的回調(diào)的,沒(méi)有任何的打印出現(xiàn)。

下面講解connect()方法:netOB.connect()就是序列 ConnectableObservableAdapter內(nèi)的connect方法,上面已經(jīng)貼出來(lái)了,這里再放一次代碼:

override func connect() -> Disposable {
        return self._lock.calculateLocked {
            if let connection = self._connection {
                return connection
            }

            let singleAssignmentDisposable = SingleAssignmentDisposable()
            let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
            self._connection = connection
            let subscription = self._source.subscribe(connection)
            singleAssignmentDisposable.setDisposable(subscription)
            return connection
        }
    }

let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)這個(gè)方法,內(nèi)部是這樣的:

final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
   ...
init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
        self._parent = parent
        self._subscription = subscription
        self._lock = lock
        self._subjectObserver = subjectObserver
    }
    func on(_ event: Event<Subject.Observer.Element>) {
        if isFlagSet(self._disposed, 1) {
            return
        }
        if event.isStopEvent {
            self.dispose()
        }
        self._subjectObserver.on(event)
    }
...
}

傳過(guò)來(lái)的觀察者是 self.lazySubject.asObserver()創(chuàng)建的,self.lazySubject只有一個(gè),意味著觀察者只有一個(gè),然后實(shí)現(xiàn)了on方法,on方法如下:

 public func on(_ event: Event<Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        dispatch(self._synchronized_on(event), event)
    }

    func _synchronized_on(_ event: Event<Element>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        switch event {
        case .next:
            if self._isDisposed || self._stopped {
                return Observers()
            }
            
            return self._observers
        case .completed, .error:
            if self._stoppedEvent == nil {
                self._stoppedEvent = event
                self._stopped = true
                let observers = self._observers
                self._observers.removeAll()
                return observers
            }

            return Observers()
        }
    }

后面的邏輯就是和RXSwift一模模一樣樣的了,就不再贅敘了。

最后的思考:

  1. 為什么publish能實(shí)現(xiàn)只網(wǎng)絡(luò)請(qǐng)求一次?因?yàn)橛^察者只有一個(gè),所以對(duì)閉包1??的調(diào)用只有一次;(類似于有多個(gè)block實(shí)現(xiàn),但是我在調(diào)用了一次self.block();
    詳細(xì)點(diǎn)解釋就是我們通過(guò)self._subjectObserver.on(event)發(fā)起了響應(yīng)--->接收的是有netOB.subscribe創(chuàng)建保存的兩個(gè)閉包,so。
  2. 如果不用懶加載lazySubject行不行?答案是可以的,只是用了更好,為啥子,自己探究下??。
  3. publishconnetc被多次調(diào)用會(huì)有什么影響?(懶加載起到了什么作用,connect創(chuàng)建流程分析就可以知道答案了)
  4. 為什么publish新創(chuàng)建的序列是publishSubject類型?見(jiàn)后面博客Subject的分析。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容