問(wèn)題:如果一個(gè)網(wǎng)絡(luò)請(qǐng)求的數(shù)據(jù)被多個(gè)地方多次使用,我們會(huì)進(jìn)行多次訂閱,結(jié)果會(huì)造成多次的網(wǎng)絡(luò)請(qǐng)求,代碼如下:
let netOB = Observable<Any>.create { (observer) -> Disposable in
sleep(2)// 模擬網(wǎng)絡(luò)延遲
print("我開(kāi)始請(qǐng)求網(wǎng)絡(luò)了")
observer.onNext("請(qǐng)求到的網(wǎng)絡(luò)數(shù)據(jù)")
observer.onNext("請(qǐng)求到的本地")
observer.onCompleted()
return Disposables.create {
print("銷毀回調(diào)了")
}
}
.publish()
netOB.subscribe(onNext: { (anything) in
print("訂閱1:",anything)
})
.disposed(by: disposeBag)
// 我們有時(shí)候不止一次網(wǎng)絡(luò)訂閱,因?yàn)橛袝r(shí)候我們的數(shù)據(jù)可能用在不同的額地方
// 所以在訂閱一次 會(huì)出現(xiàn)什么問(wèn)題?
netOB.subscribe(onNext: { (anything) in
print("訂閱2:",anything)
})
.disposed(by: disposeBag)
_ = netOB.connect()
如果沒(méi)有添加 .publish() 和 _ = netOB.connect(),則網(wǎng)絡(luò)請(qǐng)求data序列的創(chuàng)建會(huì)執(zhí)行兩次,而這是我們不需要的。
publish是如何做到網(wǎng)絡(luò)共享的呢?且看源碼分析:
首先,因?yàn)榭隙〞?huì)用到RXSwift的核心邏輯,這里放一張RXSwift的核心邏輯的思維導(dǎo)圖以供參考:

ok,上源碼:
----------------------------publish進(jìn)來(lái)---------------------------------
//封裝了一個(gè)multicast
public func publish() -> ConnectableObservable<Element> {
return self.multicast { PublishSubject() }
}
public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
-> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
}
----------------------------核心方法,老長(zhǎng)了---------------------------------
final private class ConnectableObservableAdapter<Subject: SubjectType>
: ConnectableObservable<Subject.Element> {
typealias ConnectionType = Connection<Subject>
fileprivate let _source: Observable<Subject.Observer.Element>
fileprivate let _makeSubject: () -> Subject
fileprivate let _lock = RecursiveLock()
fileprivate var _subject: Subject?
// state
fileprivate var _connection: ConnectionType?
init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
self._source = source
self._makeSubject = makeSubject
self._subject = nil
self._connection = nil
}
override func connect() -> Disposable {
return self._lock.calculateLocked {
if let connection = self._connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
fileprivate var lazySubject: Subject {
if let subject = self._subject {
return subject
}
let subject = self._makeSubject()
self._subject = subject
return subject
}
//自己重寫一個(gè)subscribe
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
return self.lazySubject.subscribe(observer)
}
}
ConnectableObservableAdapter序列繼承的是ConnectableObservable,并沒(méi)有繼承produce,沒(méi)有繼承produce??是的,問(wèn)題很嚴(yán)重,因?yàn)闆](méi)有produce,意味著就沒(méi)有subscribe函數(shù)的具體實(shí)現(xiàn),繼承ConnectableObservable我們只有一個(gè)subscribe的抽象函數(shù),怎么辦?答案是自己重寫一個(gè)subscribe.
先總結(jié)一下序列的創(chuàng)建:
- 首先是正常的序列創(chuàng)建
create,創(chuàng)建的序列保存了閉包1??;- 然后通過(guò)
.publish(),得到一個(gè)ConnectableObservableAdapter序列,也就是我們的netOB;netOB序列,沒(méi)有繼承produce,自己重寫了subscribe;self.lazySubject.subscribe(observer)中的序列self.lazySubject是懶加載的,意味著每次執(zhí)行重寫的subscribe,調(diào)用的是同一個(gè)序列。
然后就是訂閱了,訂閱---->創(chuàng)建一個(gè)觀察者observer,保存閉包---->然后subscribe,是調(diào)用的重寫的self.lazySubject.subscribe方法,最后走的是PublishSubject的subscribe
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
到這里,其實(shí)只做了兩件事情:
let key = self._observers.insert(observer.on)---->觀察者回調(diào)的收集,這樣就可以等接收到信號(hào)發(fā)送后統(tǒng)一響應(yīng),而且self._lock.lock()與self._lock.unlock()的處理還能保證響應(yīng)的執(zhí)行順序不會(huì)打亂;return SubscriptionDisposable(owner: self, key: key)---->垃圾袋的處理,這個(gè)先不管。
說(shuō)明:因?yàn)闆](méi)有對(duì)應(yīng)sink.on方法的執(zhí)行,或者說(shuō)是沒(méi)有具體閉包1??的調(diào)用,也可以理解為是沒(méi)有消息發(fā)送onNext,就是create保存的閉包沒(méi)法調(diào)用。解決的方法就是調(diào)用_ = netOB.connect().大家可以嘗試一下,如果只寫.publish()不寫_ = netOB.connect(),是得不到如何的回調(diào)的,沒(méi)有任何的打印出現(xiàn)。
下面講解connect()方法:netOB.connect()就是序列 ConnectableObservableAdapter內(nèi)的connect方法,上面已經(jīng)貼出來(lái)了,這里再放一次代碼:
override func connect() -> Disposable {
return self._lock.calculateLocked {
if let connection = self._connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)這個(gè)方法,內(nèi)部是這樣的:
final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
...
init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
self._parent = parent
self._subscription = subscription
self._lock = lock
self._subjectObserver = subjectObserver
}
func on(_ event: Event<Subject.Observer.Element>) {
if isFlagSet(self._disposed, 1) {
return
}
if event.isStopEvent {
self.dispose()
}
self._subjectObserver.on(event)
}
...
}
傳過(guò)來(lái)的觀察者是 self.lazySubject.asObserver()創(chuàng)建的,self.lazySubject只有一個(gè),意味著觀察者只有一個(gè),然后實(shí)現(xiàn)了on方法,on方法如下:
public func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
dispatch(self._synchronized_on(event), event)
}
func _synchronized_on(_ event: Event<Element>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
后面的邏輯就是和RXSwift一模模一樣樣的了,就不再贅敘了。
最后的思考:
- 為什么
publish能實(shí)現(xiàn)只網(wǎng)絡(luò)請(qǐng)求一次?因?yàn)橛^察者只有一個(gè),所以對(duì)閉包1??的調(diào)用只有一次;(類似于有多個(gè)block實(shí)現(xiàn),但是我在調(diào)用了一次self.block();
詳細(xì)點(diǎn)解釋就是我們通過(guò)self._subjectObserver.on(event)發(fā)起了響應(yīng)--->接收的是有netOB.subscribe創(chuàng)建保存的兩個(gè)閉包,so。- 如果不用
懶加載lazySubject行不行?答案是可以的,只是用了更好,為啥子,自己探究下??。publish和connetc被多次調(diào)用會(huì)有什么影響?(懶加載起到了什么作用,connect創(chuàng)建流程分析就可以知道答案了)- 為什么
publish新創(chuàng)建的序列是publishSubject類型?見(jiàn)后面博客Subject的分析。