Swift Concurrency框架之Async Squences/Streams

文章系列:

Async Squences/Streams

在介紹Concurrency Framework中的Async Squences/Streams,我們先回顧一下swift的集合中的 Sequence和Iterators。

Swift集合中的Sqeuence

swift集合中的Sequence是一系列相同類型值的集合,并提供了對這些值的迭代能力。

for element in someSequence {
    doSomething(with: element)
}

Sequence 協(xié)議的定義:

protocol Sequence {
    associatedtype Iterator: IteratorProtocol
    func makeIterator() -> Iterator
}

Sequence 協(xié)議需要實現(xiàn)makeIterator方法,并返回一個Iterator,Iterator遵循IteratorProtocol:

public protocol IteratorProtocol {
    associatedtype Element
    public mutating func next() -> Self.Element?
}

IteratorProtocol需要實現(xiàn)next方法,返回存儲的值對象。當沒有下一個元素返回nil。
我們以下載一系列的url的任務為例子,使用同步Squence方式:

struct RemoteDataSequence: Sequence {
    var urls: [URL]

    func makeIterator() -> RemoteDataIterator {
        RemoteDataIterator(urls: urls)
    }
}

為了返回數(shù)據(jù),我們需要實現(xiàn)RemoteDataIterator類型,設計上我們使用index跟蹤下一個待下載的urls數(shù)組索引

struct RemoteDataIterator: IteratorProtocol {
    var urls: [URL]
    fileprivate var index = 0

    mutating func next() -> Data? {
        guard index < urls.count else {
            return nil
        }

        let url = urls[index]
        index += 1

        // If a download fails, we simply move on to
        // the next URL in this case:
        guard let data = try? Data(contentsOf: url) else {
            return next()
        }

        return data
    }
}

我們現(xiàn)在可以通過for循環(huán)來遍歷訪問下載的所有圖片數(shù)據(jù)

for data in RemoteDataSequence(urls: urls) {
    ...
}

雖然我們通過Sqeuence實現(xiàn)了一個簡潔的批量下載器,但是批量下載使用同步的方式顯然比較難于接受,這樣會完全阻塞線程。接下來我們通過使用asynchronous sequence來達到我們的要求。

Asynchronous iterations

Swift 5.5中Concurrency為了方便并行任務的開發(fā),提供了AsyncSequence,使用方式類似同步版本的Sequence。針對批量下載器我們可以這樣改造一下:

struct RemoteDataSequence: AsyncSequence {
    typealias Element = Data

    var urls: [URL]

    func makeAsyncIterator() -> RemoteDataIterator {
        RemoteDataIterator(urls: urls)
    }
}

AsyncSequence重要實現(xiàn)其實是在RemoteDataIterator中,Concurrency為RemoteDataIterator的next返回方法添加了async。通過URLSession的async-wait API,我們可以異步下載我們的數(shù)據(jù):

struct RemoteDataIterator: AsyncIteratorProtocol {
    var urls: [URL]
    fileprivate var urlSession = URLSession.shared
    fileprivate var index = 0

    mutating func next() async throws -> Data? {
        guard index < urls.count else {
            return nil
        }

        let url = urls[index]
        index += 1

        let (data, _) = try await urlSession.data(from: url)
        return data
    }
}

通過AsyncSequence的改造,現(xiàn)在我們的批量下載器已經(jīng)可以全異步執(zhí)行,不過在我們訪問數(shù)據(jù)時還是需要調(diào)用awaittry,數(shù)據(jù)會通過后臺線程下載并允許我們使用for循環(huán)來遍歷訪問

for try await data in RemoteDataSequence(urls: urls) {
    ...
}

在for循環(huán)中,如果一個步驟拋出了異常則循環(huán)會中止,這樣有利于簡化異常捕獲的處理。如果不想要異常導致循環(huán)中斷,也可以實現(xiàn)無異常的方法。

Asynchronous streams

通過實現(xiàn)AsyncIteratorProtocol有時候還是稍嫌麻煩(需要自定義 AsyncIteratorProtocol 的類型),Concurrency提供了AsyncStreamAsyncThrowingStream。在AsyncStreamAsyncThrowingStream構(gòu)造閉包函數(shù)中,需要使用Task來執(zhí)行異步任務,使用yield方法來返回數(shù)據(jù),同時調(diào)用finish來告知是否存在異常。上面的例子可以改造為:

func remoteDataStream(
    forURLs urls: [URL],
    urlSession: URLSession = .shared
) -> AsyncThrowingStream<Data, Error> {
    AsyncThrowingStream { continuation in
        Task {
            do {
                for url in urls {
                    let (data, _) = try await urlSession.data(from: url)
                    continuation.yield(data)
                }

                continuation.finish(throwing: nil)
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
}

現(xiàn)在我們可以同樣使用for來遍歷我們的下載數(shù)據(jù):

for try await data in remoteDataStream(forURLs: urls) {
    ...
}

AsyncStreamAsyncThrowingStream可以認為是AsyncSequence協(xié)議的具體實現(xiàn),相當于ArraySequence的具體實現(xiàn)。在開發(fā)中使用stream可以簡化我們的異步程序編寫。
在Apple的響應式框架Combine也提供了對AsyncSequence的兼容,可以輕松地將任何publisher都轉(zhuǎn)換為AsyncSequence的值對象。上面的下載器可以使用Combine來改寫:

func remoteDataPublisher(
    forURLs urls: [URL],
    urlSession: URLSession = .shared
) -> AnyPublisher<Data, URLError> {
    urls.publisher
        .setFailureType(to: URLError.self)
        .flatMap(maxPublishers: .max(1)) {
            urlSession.dataTaskPublisher(for: $0)
        }
        .map(\.data)
        .eraseToAnyPublisher()
}

AnyPublisher轉(zhuǎn)換為AsyncSequence,我們只需要訪問publisher的values屬性:

let publisher = remoteDataPublisher(forURLs: urls)

for try await data in publisher.values {
    ...
}

文章參考: async sequences streams and combine

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容