Combine
Combine是Apple出的Functional Reactive Programming (FRP)模式框架,類似很出名的RxSwift。
Combine 框架主要分成發(fā)布者 (Publisher)、操作者 (Operator)、和訂閱者 (Subscriber) 三大部分??梢酝ㄟ^這三種元素的組裝,來建立各式各樣的訂閱關(guān)系 (Subscription)。
蘋果公司創(chuàng)建了自己的FRP框架這點可以看出Combine的重要性,SwiftUI中也大量使用了Combine。
本篇的目標就是通過下面這段代碼背后的源碼來了解Combine:
先說明一下,本篇所有的代碼都是參考OpenCombine簡化后的,只保留了可以理解原理必須的代碼,類型加了前綴??
extension YYPublishers {
func demo() {
/// 1
var store = [YYAnyCancellable]()
/// 2
Just("just").sink(
receiveCompletion: { print($0) },
receiveValue: { print($0) }
).store(in: &store)
}
}
可能還是有點復雜,可以再拆一下,更方便理解:
extension YYPublishers {
func demo() {
/// 1
var store = [YYAnyCancellable]()
/// 2
let just = Just("just")
/// 3
let canceller = just.sink(
receiveCompletion: { print($0) },
receiveValue: { print($0) }
)
/// 4
canceller.store(in: &store)
}
}
先簡單分析一下:
創(chuàng)建YYAnyCancellable對象數(shù)組store
創(chuàng)建只包含一個值,也就是字符串"just"的發(fā)布者
調(diào)用just的sink方法來訂閱,在它發(fā)射字符串或結(jié)束時打印出來,同時會返回一個可以提前取消訂閱關(guān)系的canceller
將canceller放入store數(shù)組中來被管理
步驟1和4很簡單,可以先不用看,后面講到Cancellable相關(guān)時再講解,我們先來看步驟2,創(chuàng)建發(fā)布者Just。
發(fā)布者(Just)
Just是一個發(fā)布者,實現(xiàn)如下:
public struct Just<Output>: YYPublisher {
public typealias Failure = Never
public let output: Output
public init(_ output: Output) {
self.output = output
}
public func receive<Subscriber: YYSubscriber>(subscriber: Subscriber)
where Failure == Subscriber.Failure, Output == Subscriber.Input {
subscriber.receive(
subscription: Inner(value: output, downstream: subscriber)
)
}
}
看上去很簡單,就是一個實現(xiàn)YYPublisher協(xié)議的泛型結(jié)構(gòu)體,只有一個屬性,保存初始化時傳進來的那個值。
receive方法的細節(jié)先不用管,知道它提供讓訂閱者注冊的能力即可。
再來看Publisher協(xié)議:
public protocol YYPublisher {
associatedtype Output
associatedtype Failure: Error
func receive<Subscriber: YYSubscriber>(subscriber: Subscriber)
where Failure == Subscriber.Failure, Output == Subscriber.Input
}
也比較簡單,只有2個關(guān)聯(lián)類型和一個方法
- Output:產(chǎn)生值的類型
- Failure :失敗的錯誤類型,當發(fā)布者不產(chǎn)生錯誤時,可以使用
Never - receive方法:提供讓訂閱者注冊的能力
看到這里,我們對創(chuàng)建Just對象的代碼let just = Just("just"),應(yīng)該是能理解了:
簡單來說就是創(chuàng)建了一個保存字符串"just"的結(jié)構(gòu)體,同時提供了一個receive方法。
下面就繼續(xù)來看訂閱的代碼:
let canceller = just.sink(
receiveCompletion: { print($0) },
receiveValue: { print($0) }
)
sink方法
sink是協(xié)議YYPublisher的公開擴展方法,也就是說所有發(fā)布者都提供了基于閉包的訂閱方式:
extension YYPublisher {
public func sink(
receiveCompletion: @escaping (YYSubscribers.Completion<Failure>) -> Void,
receiveValue: @escaping (Output) -> Void
) -> YYAnyCancellable {
let subscriber = YYSubscribers.Sink(receiveCompletion: receiveCompletion,
receiveValue: receiveValue)
subscribe(subscriber)
return YYAnyCancellable(subscriber)
}
}
該方法內(nèi)部會創(chuàng)建一個YYSubscribers.Sink對象,可以看成匿名訂閱者,然后通過subscribe方法和訂閱者建立關(guān)系,再返回YYAnyCancellable對象。
這里出現(xiàn)了很多陌生的類型和方法:
YYAnyCancellable
YYSubscribers
YYSubscribers.Completion
YYSubscribers.Sink
subscribe
我們一個個深入,首先是YYAnyCancellable。
YYAnyCancellable
可以提前取消訂閱關(guān)系的類,實現(xiàn)如下:
public protocol YYCancellable {
func cancel()
}
extension YYAnyCancellable {
public func store<Cancellables: RangeReplaceableCollection>(
in collection: inout Cancellables
) where Cancellables.Element == YYAnyCancellable {
collection.append(self)
}
public func store(in set: inout Set<YYAnyCancellable>) {
set.insert(self)
}
}
public final class YYAnyCancellable: YYCancellable {
private var _cancel: (() -> Void)?
public init(_ cancel: @escaping () -> Void) {
_cancel = cancel
}
public init<Other: YYCancellable>(_ canceller: Other) {
_cancel = canceller.cancel
}
public func cancel() {
_cancel?()
_cancel = nil
}
deinit {
_cancel?()
}
}
首先是YYCancellable協(xié)議和擴展公開方法store(in:),都很簡單,就不說了。
YYAnyCancellable是實現(xiàn)YYCancellable的類,并且內(nèi)部會保存一個_cancel閉包,在外部調(diào)用cancel方法或該對象釋放時,都會被調(diào)用。
看完這里再回去看最開始的1和4,應(yīng)該是有些能理解了:
/// 1
var store = [YYAnyCancellable]()
/// 4
subscriber.store(in: &store)
另外說明一下,我們應(yīng)該經(jīng)常會看到XXX協(xié)議,然后對應(yīng)一個AnyXXX的類或結(jié)構(gòu)體,尤其XXX是泛型時,這就是協(xié)議與類型擦除,建議大家可以先了解下這點
YYSubscribers
只是個空枚舉,其實就是與YYSubscriber協(xié)議相關(guān)的類型的名稱空間,實現(xiàn)如下:
public enum YYSubscribers {}
當然還有類似的YYPublishers:
public enum YYPublishers {}
YYSubscribers.Completion
也只是個簡單的枚舉,表示正常完成或錯誤的信號,實現(xiàn)如下:
extension YYSubscribers {
public enum Completion<Failure: Error> {
case finished
case failure(Failure)
}
}
YYSubscribers.Sink
其實就是一個簡單的訂閱者,實現(xiàn)了YYSubscriber和YYCancellable協(xié)議,實現(xiàn)如下:
extension YYSubscribers {
public final class Sink<Input, Failure: Error>: YYSubscriber, YYCancellable {
private var status: YYSubscriptions.Status = .awaiting
public let receiveValue: (Input) -> Void
public let receiveCompletion: (YYSubscribers.Completion<Failure>) -> Void
public init(
receiveCompletion: @escaping (YYSubscribers.Completion<Failure>) -> Void,
receiveValue: @escaping ((Input) -> Void)
) {
self.receiveCompletion = receiveCompletion
self.receiveValue = receiveValue
}
public func receive(subscription: YYSubscription) {
switch status {
case .subscribed, .terminal:
subscription.cancel()
case .awaiting:
status = .subscribed(subscription)
subscription.request(.unlimited)
}
}
public func receive(_ input: Input) -> YYSubscribers.Demand {
receiveValue(input)
return .none
}
public func receive(completion: YYSubscribers.Completion<Failure>) {
receiveCompletion(completion)
status = .terminal
}
public func cancel() {
guard case let .subscribed(subscription) = status else {
return
}
subscription.cancel()
status = .terminal
}
}
}
這里我們應(yīng)該只能看懂它的初始化流程:保存接收值和完成事件的2個閉包以后用。
其他的代碼都涉及到了陌生的類型:
- YYSubscription
- YYSubscriptions
- YYSubscriptions.Status
沒關(guān)系,一個一個來,先看看怎么才算是訂閱者。
YYSubscriber
表示訂閱者的協(xié)議,實現(xiàn)如下:
public protocol YYSubscriber {
associatedtype Input
associatedtype Failure: Error
func receive(subscription: YYSubscription)
func receive(_ input: Input) -> YYSubscribers.Demand
func receive(completion: YYSubscribers.Completion<Failure>)
}
關(guān)聯(lián)類型 Input 和 Failure定義了接受的值和錯誤,
同時必須實現(xiàn)3個receive方法,表示訂閱者從開始訂閱到完成的生命周期:
- receive(subscription: ) - 告訴訂閱者,它在發(fā)布者上被成功訂閱,可以請求值了
- receive(_ input: ) - 告訴訂閱者,發(fā)布者產(chǎn)生值了
- receive(completion:) - 告知訂閱者發(fā)布者已完成發(fā)布,可能是正常發(fā)布或發(fā)生錯誤
后面2個應(yīng)該很好理解,不用過多的解釋。
只有第一個函數(shù),為啥訂閱成功時接收的參數(shù)是Subscription,不應(yīng)該是Publisher么?
帶著疑問我們繼續(xù)。
YYSubscription
表示訂閱者與發(fā)布者連接的協(xié)議,實現(xiàn)如下:
public protocol YYSubscription: YYCancellable {
func request(_ demand: YYSubscribers.Demand)
}
只有一個request方法,還是有點懵逼??!!
沒關(guān)系,至少這里我們心里應(yīng)該有一個概念,訂閱者和發(fā)布者之間有一個Subscription來處理2者的邏輯。
繼續(xù)看下參數(shù)YYSubscribers.Demand
YYSubscribers.Demand
表示請求的項目數(shù),實現(xiàn)如下:
extension YYSubscribers {
public struct Demand {
internal let rawValue: UInt
internal init(rawValue: UInt) {
self.rawValue = min(UInt(Int.max) + 1, rawValue)
}
public static var unlimited: Demand {
return Demand(rawValue: .max)
}
public static var none: Demand { return .max(0) }
public static func max(_ value: Int) -> Demand {
precondition(value >= 0, "demand cannot be negative")
return Demand(rawValue: UInt(value))
}
}
其實就是可以保存數(shù)量的結(jié)構(gòu)體,還提供了unlimited,none,max(數(shù)量)的靜態(tài)方式,用來限制訂閱者可以獲得的值。
YYSubscriptions
與YYSubscription協(xié)議相關(guān)的類型的名稱空間,也是個空枚舉:
public enum YYSubscriptions {}
YYSubscriptions.Status
表示訂閱狀態(tài)的枚舉:
extension YYSubscriptions {
internal enum Status {
case awaiting
case subscribed(YYSubscription)
case terminal
}
}
- awaiting - 等待訂閱
- subscribed - 和某個Subscription建立了聯(lián)系
- terminal - 訂閱結(jié)束
了解了Subscription相關(guān)的含義,現(xiàn)在我們可以回到訂閱者YYSubscribers.Sink生命周期相關(guān)的實現(xiàn)了。
YYSubscribers.Sink生命周期
去掉了初始化相關(guān)的代碼:
extension YYSubscribers {
public final class Sink<Input, Failure: Error>: YYSubscriber, YYCancellable {
private var status: YYSubscriptions.Status = .awaiting
public func receive(subscription: YYSubscription) {
switch status {
case .subscribed, .terminal:
subscription.cancel()
case .awaiting:
status = .subscribed(subscription)
subscription.request(.unlimited)
}
}
public func receive(_ input: Input) -> YYSubscribers.Demand {
receiveValue(input)
return .none
}
public func receive(completion: YYSubscribers.Completion<Failure>) {
receiveCompletion(completion)
status = .terminal
}
public func cancel() {
guard case let .subscribed(subscription) = status else {
return
}
subscription.cancel()
status = .terminal
}
}
}
- 訂閱者內(nèi)部維護了一個表示當前狀態(tài)的屬性status,開始是awaiting
- receive(subscription: ):訂閱成功時
- 檢查狀態(tài),如果ok,就將狀態(tài)置為subscribed并保存subscription
- 然后向subscription發(fā)起request請求值
- receive(_ input: ):收到值,調(diào)用保存的對應(yīng)閉包
- receive(completion:):收到完成事件,調(diào)用保存的對應(yīng)閉包,并將狀態(tài)置為terminal
- cancel():被取消時如果是已經(jīng)訂閱狀態(tài),調(diào)用subscription的cancel方法,并將狀態(tài)置為terminal
省略了一些小細節(jié),不過這下我們對訂閱者就應(yīng)該有比較清晰的了解了,下面繼續(xù)看訂閱關(guān)系如何產(chǎn)生的。
subscribe方法
回到上面sink的實現(xiàn),來看看subscribe方法的流程:
extension YYPublisher {
public func sink(
receiveCompletion: @escaping (YYSubscribers.Completion<Failure>) -> Void,
receiveValue: @escaping (Output) -> Void
) -> YYAnyCancellable {
let subscriber = YYSubscribers.Sink(receiveCompletion: receiveCompletion,
receiveValue: receiveValue)
subscribe(subscriber)
return YYAnyCancellable(subscriber)
}
}
subscribe是協(xié)議YYPublisher的公開方法,作用是告訴發(fā)布者有人對你感興趣,訂閱你了,快開始干活吧~
可以看到內(nèi)部會調(diào)用具體對應(yīng)發(fā)布者對象的receive方法,也就是YYPublisher協(xié)議規(guī)定必須的方法:
extension YYPublisher {
public func subscribe<Subscriber: YYSubscriber>(_ subscriber: Subscriber)
where Failure == Subscriber.Failure, Output == Subscriber.Input {
receive(subscriber: subscriber)
}
}
這里的Publisher是Just,看看receive的具體實現(xiàn):
public struct Just<Output>: YYPublisher {
public func receive<Subscriber: YYSubscriber>(subscriber: Subscriber)
where Failure == Subscriber.Failure, Output == Subscriber.Input {
subscriber.receive(
subscription: Inner(value: output, downstream: subscriber)
)
}
}
可以看到receive方法做了2件事:
- 創(chuàng)建作為subscription的Inner對象
- 調(diào)用訂閱者生命周期方法receive(subscription: ),并將1創(chuàng)建的subscription傳遞過去
現(xiàn)在我們對訂閱者,發(fā)布者如何產(chǎn)生訂閱關(guān)系應(yīng)該了解了,不過最后還有一個問題,訂閱者是怎么收到值和完成事件的呢?
這里到調(diào)用receive方法就結(jié)束了,那答案應(yīng)該在里面。
還記得這里的訂閱者subscriber是誰吧,就是前面的YYSubscribers.Sink對象,回到它的receive(subscription: )再看看:
public func receive(subscription: YYSubscription) {
switch status {
case .subscribed, .terminal:
subscription.cancel()
case .awaiting:
status = .subscribed(subscription)
subscription.request(.unlimited)
}
}
正常情況下,最后會向subscription發(fā)起request請求值。
而這里的subscription,就是Inner(value: output, downstream: subscriber)對象了:
subscriber.receive(
subscription: Inner(value: output, downstream: subscriber)
)
最后來看看這個Inner。
Just.Inner
處理訂閱者和發(fā)布者Just之間關(guān)系的Subscription,實現(xiàn)如下:
extension YYPublishers.Just {
final class Inner<Downstream: YYSubscriber>: YYSubscription,
where Downstream.Input == Output {
private var downstream: Downstream?
private let value: Output
fileprivate init(value: Output, downstream: Downstream) {
self.downstream = downstream
self.value = value
}
func cancel() {
downstream = nil
}
func request(_ demand: YYSubscribers.Demand) {
demand.assertNonZero()
guard let downstream = self.downstream else { return }
self.downstream = nil
_ = downstream.receive(value)
downstream.receive(completion: .finished)
}
}
}
- 2有個屬性
- downstream:關(guān)聯(lián)的訂閱者,會強引用
- value:發(fā)布者的值
- request方法:
- 校驗demand合法性
- 如果有訂閱者,釋放并調(diào)用訂閱者的receive(value)和receive(completion)方法
然后訂閱者在這2個生命周期方法里面會調(diào)用我們步驟3中的2個閉包參數(shù)receiveValue和receiveCompletion:
/// 3
let subscriber = just.sink(
receiveCompletion: { print($0) },
receiveValue: { print($0) }
)
這樣整個流程就總算是走完了??!??!
總結(jié)
整個流程有些復雜,最后再來總結(jié)一下發(fā)布者和訂閱者是如何聯(lián)系在一起的。
使用sink方式訂閱某個發(fā)布者時:
- 它會在內(nèi)部創(chuàng)建匿名訂閱者,再創(chuàng)建訂閱對象,并將訂閱對象的引用傳遞給訂閱者。
- 然后,訂閱者將向訂閱對象請求多個值,再接收這些值。
- 最后,訂閱對象發(fā)送完成事件,結(jié)束。
還是看圖吧:

訂閱對象的工作是在發(fā)布者和訂閱者之間進行調(diào)解,并確保訂閱者獲得的值不超過其請求的值,同時還負責保留和釋放訂閱者。
理解了這3者的關(guān)系以及實現(xiàn)原理,可以讓我們使用Combine框架更加得心應(yīng)手,更進一步自定義發(fā)布者,訂閱對象和訂閱者。
當然,上面只是Combine框架最基本流程的分析,還有常用的Subject,Operator等著我們~~~