內(nèi)存管理總是開發(fā)過程中難以繞開的問題, 在使用 RxSwift 的過程中, 避免不了寫各種 .disposed(by: disposeBag) 來管理內(nèi)存的釋放時機. 那么究竟管理的是哪些對象的釋放, 不寫又會造成什么問題呢?
在探究這個問題之前, 我們先按照 RxSwift 的接口定義實現(xiàn)一套事件源&觀察者. 對比一下在內(nèi)存管理上跟 RxSwift 有何區(qū)別.
struct Observer {
func on() {
print("Helo")
}
}
struct Observable {
let subscription: (Observer) -> ()
init(_ subscription: @escaping (Observer) -> ()) {
self.subscription = subscription
}
func subscribe(_ observer: Observer) {
subscription(observer)
}
}
func heloFunc() {
let observer = Observer()
let observable = Observable { (observer) in
observer.on()
}
observable.subscribe(observer)
}
示例代碼刪除了事件類型, observable 對象無參調(diào)用 observer 的 on 方法. observer 也不區(qū)分事件類型, 響應事件打印 "Helo". 我們執(zhí)行 heloFunc, 控制臺輸出 Helo.
結構體(跟 RxSwift 保持一致, 所以先不使用 Class)沒有 deinit 方法, 為了便于觀察生命周期, 引入一個 Delete 類, 該類在釋放的時候輸出 log 信息. 同時為 Observable 跟 Observer 添加 Delete 屬性.
class Delete {
let deinitCallBack: () -> ()
init(_ onDeinit: @escaping () -> ()) {
deinitCallBack = onDeinit
}
deinit {
deinitCallBack()
}
}
struct Observer {
let delete = Delete {
print("Observer deinit")
} ...
}
struct Observable {
let delete = Delete {
print("Observable deinit")
} ...
}
這個時候再次執(zhí)行 heloFunc , 可以得到如下輸出: Helo Observable deinit Observer deinit
可以看到由于出了作用域, Observable, Observer 對象都被釋放.
接下來我們再使用同樣的方式對 RxSwift 進行測試, 由于 RxSwift 調(diào)用層級較深, 無法確定某些類是否只初始化一次, 因此我們在 init 方法里同樣添加日志信息.
let observer = AnyObserver<Void> { (_) in
print("on")
}
let observable = Observable<Void>.create { observer in
observer.on(Event.next(()))
return Disposables.create()
}
observable.subscribe(observer)
// 省略添加 Log 的代碼
運行代碼, 得到輸出: AnyObserver init AnonymousObservable init AnyObserver init on AnyObserver deinit AnonymousObservable deinit
可以看到 AnyObserver 生成兩次卻只釋放一次, 對比我們自己實現(xiàn)的事件源&觀察者, 在出了作用域之外, 依然存在對象生命周期沒有結束, 導致內(nèi)存泄漏.
引用循環(huán)
示例代碼中只顯式生成一個 AnonymousObservable 對象以及一個 AnyObserver 對象, 所以問題應該在于 subscribe 操作生成了另一個 AnyObserver 對象, 并對兩個 AnyObserver 對象其中的一個形成了循環(huán)引用.
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
CurrentThreadScheduler 是用來保證線程安全的類, 我們先忽略它.
可以看到生成了一個 SinkDisposer 對象, 并且向該對象注入 sinkAndSubscription 的兩個屬性. 我們暫且不管 sinkAndSubscription 的類型, 將該持有關系記錄下來.
SinkDisposer -> sinkAndSubscription.sink
SinkDisposer -> sinkAndSubscription.subscription
接下來到了 run 函數(shù)的實現(xiàn), 該函數(shù)的入?yún)橛^察者 observer 以及剛剛創(chuàng)建的 SinkDisposer.
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
可以看到 AnonymousObservableSink 持有了傳入的 observer 以及 SinkDisposer, 并且作為 sinkAndSubscription.sink 返回到上層. 用有向圖記錄該持有關系:

SinkDisposer 與 AnonymousObservableSink 構成循環(huán)引用, 從而導致 Observer 無法釋放!
解決方案
回想起 Closure 循環(huán)引用的解決方案, 無非就兩種: 弱引用以及手動打破持有關系. SinkDisposer 本身是作為 subscribe 的返回值返回到上層的, 所以最簡單的方式當然是持有該 SinkDisposer, 在合適的方法調(diào)用 dispose.
該對象的 dispose 方法對 AnonymousObservableSink 置 nil, 從而手動打破了持有關系.
let diposer = observable.subscribe(observer)
disposer.disposed()
DisposableBag
然而這就需要用戶去關心 observer 什么時候不再需要監(jiān)聽事件. 其實我們的需求只是不要造成內(nèi)存泄漏, 完全可以接受內(nèi)存的釋放不那么及時. 因此 RxSwift 還提供了一個類 DisposeBag.
DisposeBag 就是一個 Disposable 對象的集合, 可以將訂閱產(chǎn)生的 Disposable 對象加入到 DisposeBag 中, 由其統(tǒng)一在特定時機調(diào)用 dispose. 通過觀察代碼可以發(fā)現(xiàn), 假如用戶不手動清理 DisposeBag, 那么它將在 deinit 的時候 對所有 Disposable 對象調(diào)用 dispose.
deinit {
dispose()
}
private func dispose() {
let oldDisposables = _dispose()
for disposable in oldDisposables {
disposable.dispose()
}
}
onError/onCompleted
此外, 事件源內(nèi)部也可以通過事件來管理 observer 的生命周期, 任意 .error/.Complete 事件都將觸發(fā) dispose.
在這個例子中是通過 AnonymousObservableSink 來實現(xiàn)這個功能的, Sink 可以理解為事件的管道, 所有發(fā)送的事件都是通過 Sink 轉(zhuǎn)發(fā)給真正的 observer. AnonymousObservableSink 只要攔截到 .error/.Complete 事件則會觸發(fā)自身的 dispose, 從而觸發(fā) SinkDisposer 打破循環(huán)引用.
func on(_ event: Event<E>) {
...
case .error, .completed:
if AtomicCompareAndSwap(0, 1, &_isStopped) {
forwardOn(event)
dispose()
}
}
}
TakeUntil
TakeUntil 則是將事件的生命周期跟另一個事件源相綁定, 只要另一個事件源發(fā)出.next/.error事件, 則觸發(fā) dispose.
RxSwift 使用 TakeUntilSink 這種類型的管道實現(xiàn)這個功能, TakeUntilSink 在為正常事件流提供事件轉(zhuǎn)發(fā)功能的同時, 創(chuàng)建了另一個觀察者 TakeUntilSinkOther, 該觀察者對 TakeUntil 的參數(shù)進行觀察, 一旦接收到.next/.error事件, 則對 TakeUntilSink 進行 dispose, 從而打破循環(huán)引用.
需要注意的是另一個事件源發(fā)出的.complete 事件并不會觸發(fā) dispose, 反而會造成 TakeUntilSinkOther 被釋放, 失去生命周期綁定的效果. 因此, 除非對事件源的事件產(chǎn)生順序非常有把握, 或者是為了實現(xiàn)一些特定的需求, 一般不推薦單純使用 TakeUntil 做內(nèi)存管理.