Combine最簡流程源碼解析

Combine

Combine是Apple出的Functional Reactive Programming (FRP)模式框架,類似很出名的RxSwift。

Combine 框架主要分成發(fā)布者 (Publisher)、操作者 (Operator)、和訂閱者 (Subscriber) 三大部分??梢酝ㄟ^這三種元素的組裝,來建立各式各樣的訂閱關(guān)系 (Subscription)。

蘋果公司創(chuàng)建了自己的FRP框架這點可以看出Combine的重要性,SwiftUI中也大量使用了Combine。

本篇的目標就是通過下面這段代碼背后的源碼來了解Combine:

先說明一下,本篇所有的代碼都是參考OpenCombine簡化后的,只保留了可以理解原理必須的代碼,類型加了前綴??

extension YYPublishers {
    func demo() {
        /// 1
        var store = [YYAnyCancellable]()
        
        /// 2
        Just("just").sink(
            receiveCompletion: { print($0) },
            receiveValue: { print($0) }
        ).store(in: &store)
    }
}

可能還是有點復雜,可以再拆一下,更方便理解:

extension YYPublishers {
    func demo() {
        /// 1
        var store = [YYAnyCancellable]()
        
        /// 2
        let just = Just("just")
        
        /// 3
        let canceller = just.sink(
            receiveCompletion: { print($0) },
            receiveValue: { print($0) }
        )
        
        /// 4
        canceller.store(in: &store)
    }
}

先簡單分析一下:

  1. 創(chuàng)建YYAnyCancellable對象數(shù)組store

  2. 創(chuàng)建只包含一個值,也就是字符串"just"的發(fā)布者

  3. 調(diào)用just的sink方法來訂閱,在它發(fā)射字符串或結(jié)束時打印出來,同時會返回一個可以提前取消訂閱關(guān)系的canceller

  4. 將canceller放入store數(shù)組中來被管理

步驟1和4很簡單,可以先不用看,后面講到Cancellable相關(guān)時再講解,我們先來看步驟2,創(chuàng)建發(fā)布者Just。

發(fā)布者(Just)

Just是一個發(fā)布者,實現(xiàn)如下:

public struct Just<Output>: YYPublisher {
    public typealias Failure = Never

    public let output: Output

    public init(_ output: Output) {
        self.output = output
    }
        
        public func receive<Subscriber: YYSubscriber>(subscriber: Subscriber)
        where Failure == Subscriber.Failure, Output == Subscriber.Input {
        subscriber.receive(
            subscription: Inner(value: output, downstream: subscriber)
        )
    }
}

看上去很簡單,就是一個實現(xiàn)YYPublisher協(xié)議的泛型結(jié)構(gòu)體,只有一個屬性,保存初始化時傳進來的那個值。

receive方法的細節(jié)先不用管,知道它提供讓訂閱者注冊的能力即可。

再來看Publisher協(xié)議:

public protocol YYPublisher {
    associatedtype Output
    associatedtype Failure: Error

    func receive<Subscriber: YYSubscriber>(subscriber: Subscriber)
        where Failure == Subscriber.Failure, Output == Subscriber.Input
}

也比較簡單,只有2個關(guān)聯(lián)類型和一個方法

  • Output:產(chǎn)生值的類型
  • Failure :失敗的錯誤類型,當發(fā)布者不產(chǎn)生錯誤時,可以使用 Never
  • receive方法:提供讓訂閱者注冊的能力

看到這里,我們對創(chuàng)建Just對象的代碼let just = Just("just"),應(yīng)該是能理解了:

簡單來說就是創(chuàng)建了一個保存字符串"just"的結(jié)構(gòu)體,同時提供了一個receive方法。

下面就繼續(xù)來看訂閱的代碼:

let canceller = just.sink(
    receiveCompletion: { print($0) },
    receiveValue: { print($0) }
)

sink方法

sink是協(xié)議YYPublisher的公開擴展方法,也就是說所有發(fā)布者都提供了基于閉包的訂閱方式:

extension YYPublisher {
    public func sink(
        receiveCompletion: @escaping (YYSubscribers.Completion<Failure>) -> Void,
        receiveValue: @escaping (Output) -> Void
    ) -> YYAnyCancellable {
        let subscriber = YYSubscribers.Sink(receiveCompletion: receiveCompletion,
                                            receiveValue: receiveValue)
        subscribe(subscriber)
        return YYAnyCancellable(subscriber)
    }
}

該方法內(nèi)部會創(chuàng)建一個YYSubscribers.Sink對象,可以看成匿名訂閱者,然后通過subscribe方法和訂閱者建立關(guān)系,再返回YYAnyCancellable對象。

這里出現(xiàn)了很多陌生的類型和方法:

  • YYAnyCancellable

  • YYSubscribers

  • YYSubscribers.Completion

  • YYSubscribers.Sink

  • subscribe

我們一個個深入,首先是YYAnyCancellable。

YYAnyCancellable

可以提前取消訂閱關(guān)系的類,實現(xiàn)如下:

public protocol YYCancellable {
    func cancel()
}

extension YYAnyCancellable {
    public func store<Cancellables: RangeReplaceableCollection>(
        in collection: inout Cancellables
    ) where Cancellables.Element == YYAnyCancellable {
        collection.append(self)
    }

    public func store(in set: inout Set<YYAnyCancellable>) {
        set.insert(self)
    }
}

public final class YYAnyCancellable: YYCancellable {
    private var _cancel: (() -> Void)?

    public init(_ cancel: @escaping () -> Void) {
        _cancel = cancel
    }
    
    public init<Other: YYCancellable>(_ canceller: Other) {
        _cancel = canceller.cancel
    }

    public func cancel() {
        _cancel?()
        _cancel = nil
    }

    deinit {
        _cancel?()
    }
}

首先是YYCancellable協(xié)議和擴展公開方法store(in:),都很簡單,就不說了。

YYAnyCancellable是實現(xiàn)YYCancellable的類,并且內(nèi)部會保存一個_cancel閉包,在外部調(diào)用cancel方法或該對象釋放時,都會被調(diào)用。

看完這里再回去看最開始的1和4,應(yīng)該是有些能理解了:

/// 1
var store = [YYAnyCancellable]()

/// 4
subscriber.store(in: &store)

另外說明一下,我們應(yīng)該經(jīng)常會看到XXX協(xié)議,然后對應(yīng)一個AnyXXX的類或結(jié)構(gòu)體,尤其XXX是泛型時,這就是協(xié)議與類型擦除,建議大家可以先了解下這點

YYSubscribers

只是個空枚舉,其實就是與YYSubscriber協(xié)議相關(guān)的類型的名稱空間,實現(xiàn)如下:

public enum YYSubscribers {}

當然還有類似的YYPublishers:

public enum YYPublishers {}

YYSubscribers.Completion

也只是個簡單的枚舉,表示正常完成或錯誤的信號,實現(xiàn)如下:

extension YYSubscribers {
    public enum Completion<Failure: Error> {
        case finished
        case failure(Failure)
    }
}

YYSubscribers.Sink

其實就是一個簡單的訂閱者,實現(xiàn)了YYSubscriber和YYCancellable協(xié)議,實現(xiàn)如下:

extension YYSubscribers {
    public final class Sink<Input, Failure: Error>: YYSubscriber, YYCancellable {
        
        private var status: YYSubscriptions.Status = .awaiting
        
        public let receiveValue: (Input) -> Void
        public let receiveCompletion: (YYSubscribers.Completion<Failure>) -> Void

        public init(
            receiveCompletion: @escaping (YYSubscribers.Completion<Failure>) -> Void,
            receiveValue: @escaping ((Input) -> Void)
        ) {
            self.receiveCompletion = receiveCompletion
            self.receiveValue = receiveValue
        }

        public func receive(subscription: YYSubscription) {
            switch status {
            case .subscribed, .terminal:
                subscription.cancel()
            case .awaiting:
                status = .subscribed(subscription)
                subscription.request(.unlimited)
            }
        }

        public func receive(_ input: Input) -> YYSubscribers.Demand {
            receiveValue(input)
            return .none
        }

        public func receive(completion: YYSubscribers.Completion<Failure>) {
            receiveCompletion(completion)
            status = .terminal
        }

        public func cancel() {
            guard case let .subscribed(subscription) = status else {
                return
            }

            subscription.cancel()
            status = .terminal
        }
    }
}

這里我們應(yīng)該只能看懂它的初始化流程:保存接收值和完成事件的2個閉包以后用。

其他的代碼都涉及到了陌生的類型:

  • YYSubscription
  • YYSubscriptions
  • YYSubscriptions.Status

沒關(guān)系,一個一個來,先看看怎么才算是訂閱者。

YYSubscriber

表示訂閱者的協(xié)議,實現(xiàn)如下:

public protocol YYSubscriber {
    associatedtype Input
    associatedtype Failure: Error

    func receive(subscription: YYSubscription)
    func receive(_ input: Input) -> YYSubscribers.Demand
    func receive(completion: YYSubscribers.Completion<Failure>)
}

關(guān)聯(lián)類型 InputFailure定義了接受的值和錯誤,

同時必須實現(xiàn)3個receive方法,表示訂閱者從開始訂閱到完成的生命周期:

  • receive(subscription: ) - 告訴訂閱者,它在發(fā)布者上被成功訂閱,可以請求值了
  • receive(_ input: ) - 告訴訂閱者,發(fā)布者產(chǎn)生值了
  • receive(completion:) - 告知訂閱者發(fā)布者已完成發(fā)布,可能是正常發(fā)布或發(fā)生錯誤

后面2個應(yīng)該很好理解,不用過多的解釋。

只有第一個函數(shù),為啥訂閱成功時接收的參數(shù)是Subscription,不應(yīng)該是Publisher么?

帶著疑問我們繼續(xù)。

YYSubscription

表示訂閱者與發(fā)布者連接的協(xié)議,實現(xiàn)如下:

public protocol YYSubscription: YYCancellable {
    func request(_ demand: YYSubscribers.Demand)
}

只有一個request方法,還是有點懵逼??!!

沒關(guān)系,至少這里我們心里應(yīng)該有一個概念,訂閱者和發(fā)布者之間有一個Subscription來處理2者的邏輯。

繼續(xù)看下參數(shù)YYSubscribers.Demand

YYSubscribers.Demand

表示請求的項目數(shù),實現(xiàn)如下:

extension YYSubscribers {
    public struct Demand {
        internal let rawValue: UInt

        internal init(rawValue: UInt) {
            self.rawValue = min(UInt(Int.max) + 1, rawValue)
        }

        public static var unlimited: Demand {
            return Demand(rawValue: .max)
        }

        public static var none: Demand { return .max(0) }

        public static func max(_ value: Int) -> Demand {
            precondition(value >= 0, "demand cannot be negative")
            return Demand(rawValue: UInt(value))
        }
}

其實就是可以保存數(shù)量的結(jié)構(gòu)體,還提供了unlimited,none,max(數(shù)量)的靜態(tài)方式,用來限制訂閱者可以獲得的值。

YYSubscriptions

與YYSubscription協(xié)議相關(guān)的類型的名稱空間,也是個空枚舉:

public enum YYSubscriptions {}

YYSubscriptions.Status

表示訂閱狀態(tài)的枚舉:

extension YYSubscriptions {
    internal enum Status {
        case awaiting
        case subscribed(YYSubscription)
        case terminal
    }
}
  • awaiting - 等待訂閱
  • subscribed - 和某個Subscription建立了聯(lián)系
  • terminal - 訂閱結(jié)束

了解了Subscription相關(guān)的含義,現(xiàn)在我們可以回到訂閱者YYSubscribers.Sink生命周期相關(guān)的實現(xiàn)了。

YYSubscribers.Sink生命周期

去掉了初始化相關(guān)的代碼:

extension YYSubscribers {
    public final class Sink<Input, Failure: Error>: YYSubscriber, YYCancellable {
        
        private var status: YYSubscriptions.Status = .awaiting

        public func receive(subscription: YYSubscription) {
            switch status {
            case .subscribed, .terminal:
                subscription.cancel()
            case .awaiting:
                status = .subscribed(subscription)
                subscription.request(.unlimited)
            }
        }

        public func receive(_ input: Input) -> YYSubscribers.Demand {
            receiveValue(input)
            return .none
        }

        public func receive(completion: YYSubscribers.Completion<Failure>) {
            receiveCompletion(completion)
            status = .terminal
        }

        public func cancel() {
            guard case let .subscribed(subscription) = status else {
                return
            }

            subscription.cancel()
            status = .terminal
        }
    }
}
  1. 訂閱者內(nèi)部維護了一個表示當前狀態(tài)的屬性status,開始是awaiting
  2. receive(subscription: ):訂閱成功時
    1. 檢查狀態(tài),如果ok,就將狀態(tài)置為subscribed并保存subscription
    2. 然后向subscription發(fā)起request請求值
  3. receive(_ input: ):收到值,調(diào)用保存的對應(yīng)閉包
  4. receive(completion:):收到完成事件,調(diào)用保存的對應(yīng)閉包,并將狀態(tài)置為terminal
  5. cancel():被取消時如果是已經(jīng)訂閱狀態(tài),調(diào)用subscription的cancel方法,并將狀態(tài)置為terminal

省略了一些小細節(jié),不過這下我們對訂閱者就應(yīng)該有比較清晰的了解了,下面繼續(xù)看訂閱關(guān)系如何產(chǎn)生的。

subscribe方法

回到上面sink的實現(xiàn),來看看subscribe方法的流程:

extension YYPublisher {
    public func sink(
        receiveCompletion: @escaping (YYSubscribers.Completion<Failure>) -> Void,
        receiveValue: @escaping (Output) -> Void
    ) -> YYAnyCancellable {
        let subscriber = YYSubscribers.Sink(receiveCompletion: receiveCompletion,
                                            receiveValue: receiveValue)
        subscribe(subscriber)
        return YYAnyCancellable(subscriber)
    }
}

subscribe是協(xié)議YYPublisher的公開方法,作用是告訴發(fā)布者有人對你感興趣,訂閱你了,快開始干活吧~

可以看到內(nèi)部會調(diào)用具體對應(yīng)發(fā)布者對象的receive方法,也就是YYPublisher協(xié)議規(guī)定必須的方法:

extension YYPublisher {
    public func subscribe<Subscriber: YYSubscriber>(_ subscriber: Subscriber)
        where Failure == Subscriber.Failure, Output == Subscriber.Input {
        receive(subscriber: subscriber)
    }
}

這里的Publisher是Just,看看receive的具體實現(xiàn):

public struct Just<Output>: YYPublisher {
        public func receive<Subscriber: YYSubscriber>(subscriber: Subscriber)
        where Failure == Subscriber.Failure, Output == Subscriber.Input {
        subscriber.receive(
            subscription: Inner(value: output, downstream: subscriber)
        )
    }
}

可以看到receive方法做了2件事:

  1. 創(chuàng)建作為subscription的Inner對象
  2. 調(diào)用訂閱者生命周期方法receive(subscription: ),并將1創(chuàng)建的subscription傳遞過去

現(xiàn)在我們對訂閱者,發(fā)布者如何產(chǎn)生訂閱關(guān)系應(yīng)該了解了,不過最后還有一個問題,訂閱者是怎么收到值和完成事件的呢?

這里到調(diào)用receive方法就結(jié)束了,那答案應(yīng)該在里面。

還記得這里的訂閱者subscriber是誰吧,就是前面的YYSubscribers.Sink對象,回到它的receive(subscription: )再看看:

public func receive(subscription: YYSubscription) {
      switch status {
      case .subscribed, .terminal:
          subscription.cancel()
      case .awaiting:
          status = .subscribed(subscription)
          subscription.request(.unlimited)
      }
  }

正常情況下,最后會向subscription發(fā)起request請求值。

而這里的subscription,就是Inner(value: output, downstream: subscriber)對象了:

subscriber.receive(
    subscription: Inner(value: output, downstream: subscriber)
)

最后來看看這個Inner。

Just.Inner

處理訂閱者和發(fā)布者Just之間關(guān)系的Subscription,實現(xiàn)如下:

extension YYPublishers.Just {
    final class Inner<Downstream: YYSubscriber>: YYSubscription,
        where Downstream.Input == Output {
        
        private var downstream: Downstream?
        private let value: Output
        
        fileprivate init(value: Output, downstream: Downstream) {
            self.downstream = downstream
            self.value = value
        }
        
        func cancel() {
            downstream = nil
        }
        
        func request(_ demand: YYSubscribers.Demand) {
            demand.assertNonZero()
            guard let downstream = self.downstream else { return }
            self.downstream = nil
            _ = downstream.receive(value)
            downstream.receive(completion: .finished)
        }
    }
}
  • 2有個屬性
    • downstream:關(guān)聯(lián)的訂閱者,會強引用
    • value:發(fā)布者的值
  • request方法:
    1. 校驗demand合法性
    2. 如果有訂閱者,釋放并調(diào)用訂閱者的receive(value)和receive(completion)方法

然后訂閱者在這2個生命周期方法里面會調(diào)用我們步驟3中的2個閉包參數(shù)receiveValue和receiveCompletion:

/// 3
let subscriber = just.sink(
    receiveCompletion: { print($0) },
    receiveValue: { print($0) }
)

這樣整個流程就總算是走完了??!??!

總結(jié)

整個流程有些復雜,最后再來總結(jié)一下發(fā)布者和訂閱者是如何聯(lián)系在一起的。

使用sink方式訂閱某個發(fā)布者時:

  • 它會在內(nèi)部創(chuàng)建匿名訂閱者,再創(chuàng)建訂閱對象,并將訂閱對象的引用傳遞給訂閱者。
  • 然后,訂閱者將向訂閱對象請求多個值,再接收這些值。
  • 最后,訂閱對象發(fā)送完成事件,結(jié)束。

還是看圖吧:

image

訂閱對象的工作是在發(fā)布者和訂閱者之間進行調(diào)解,并確保訂閱者獲得的值不超過其請求的值,同時還負責保留和釋放訂閱者。

理解了這3者的關(guān)系以及實現(xiàn)原理,可以讓我們使用Combine框架更加得心應(yīng)手,更進一步自定義發(fā)布者,訂閱對象和訂閱者。

當然,上面只是Combine框架最基本流程的分析,還有常用的Subject,Operator等著我們~~~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容