文章系列:
Async Squences/Streams
在介紹Concurrency Framework中的Async Squences/Streams,我們先回顧一下swift的集合中的 Sequence和Iterators。
Swift集合中的Sqeuence
swift集合中的Sequence是一系列相同類型值的集合,并提供了對這些值的迭代能力。
for element in someSequence {
doSomething(with: element)
}
Sequence 協(xié)議的定義:
protocol Sequence {
associatedtype Iterator: IteratorProtocol
func makeIterator() -> Iterator
}
Sequence 協(xié)議需要實現(xiàn)makeIterator方法,并返回一個Iterator,Iterator遵循IteratorProtocol:
public protocol IteratorProtocol {
associatedtype Element
public mutating func next() -> Self.Element?
}
IteratorProtocol需要實現(xiàn)next方法,返回存儲的值對象。當沒有下一個元素返回nil。
我們以下載一系列的url的任務為例子,使用同步Squence方式:
struct RemoteDataSequence: Sequence {
var urls: [URL]
func makeIterator() -> RemoteDataIterator {
RemoteDataIterator(urls: urls)
}
}
為了返回數(shù)據(jù),我們需要實現(xiàn)RemoteDataIterator類型,設計上我們使用index跟蹤下一個待下載的urls數(shù)組索引
struct RemoteDataIterator: IteratorProtocol {
var urls: [URL]
fileprivate var index = 0
mutating func next() -> Data? {
guard index < urls.count else {
return nil
}
let url = urls[index]
index += 1
// If a download fails, we simply move on to
// the next URL in this case:
guard let data = try? Data(contentsOf: url) else {
return next()
}
return data
}
}
我們現(xiàn)在可以通過for循環(huán)來遍歷訪問下載的所有圖片數(shù)據(jù)
for data in RemoteDataSequence(urls: urls) {
...
}
雖然我們通過Sqeuence實現(xiàn)了一個簡潔的批量下載器,但是批量下載使用同步的方式顯然比較難于接受,這樣會完全阻塞線程。接下來我們通過使用asynchronous sequence來達到我們的要求。
Asynchronous iterations
Swift 5.5中Concurrency為了方便并行任務的開發(fā),提供了AsyncSequence,使用方式類似同步版本的Sequence。針對批量下載器我們可以這樣改造一下:
struct RemoteDataSequence: AsyncSequence {
typealias Element = Data
var urls: [URL]
func makeAsyncIterator() -> RemoteDataIterator {
RemoteDataIterator(urls: urls)
}
}
AsyncSequence重要實現(xiàn)其實是在RemoteDataIterator中,Concurrency為RemoteDataIterator的next返回方法添加了async。通過URLSession的async-wait API,我們可以異步下載我們的數(shù)據(jù):
struct RemoteDataIterator: AsyncIteratorProtocol {
var urls: [URL]
fileprivate var urlSession = URLSession.shared
fileprivate var index = 0
mutating func next() async throws -> Data? {
guard index < urls.count else {
return nil
}
let url = urls[index]
index += 1
let (data, _) = try await urlSession.data(from: url)
return data
}
}
通過AsyncSequence的改造,現(xiàn)在我們的批量下載器已經(jīng)可以全異步執(zhí)行,不過在我們訪問數(shù)據(jù)時還是需要調(diào)用await和try,數(shù)據(jù)會通過后臺線程下載并允許我們使用for循環(huán)來遍歷訪問
for try await data in RemoteDataSequence(urls: urls) {
...
}
在for循環(huán)中,如果一個步驟拋出了異常則循環(huán)會中止,這樣有利于簡化異常捕獲的處理。如果不想要異常導致循環(huán)中斷,也可以實現(xiàn)無異常的方法。
Asynchronous streams
通過實現(xiàn)AsyncIteratorProtocol有時候還是稍嫌麻煩(需要自定義 AsyncIteratorProtocol 的類型),Concurrency提供了AsyncStream和AsyncThrowingStream。在AsyncStream和AsyncThrowingStream構(gòu)造閉包函數(shù)中,需要使用Task來執(zhí)行異步任務,使用yield方法來返回數(shù)據(jù),同時調(diào)用finish來告知是否存在異常。上面的例子可以改造為:
func remoteDataStream(
forURLs urls: [URL],
urlSession: URLSession = .shared
) -> AsyncThrowingStream<Data, Error> {
AsyncThrowingStream { continuation in
Task {
do {
for url in urls {
let (data, _) = try await urlSession.data(from: url)
continuation.yield(data)
}
continuation.finish(throwing: nil)
} catch {
continuation.finish(throwing: error)
}
}
}
}
現(xiàn)在我們可以同樣使用for來遍歷我們的下載數(shù)據(jù):
for try await data in remoteDataStream(forURLs: urls) {
...
}
AsyncStream和AsyncThrowingStream可以認為是AsyncSequence協(xié)議的具體實現(xiàn),相當于Array是Sequence的具體實現(xiàn)。在開發(fā)中使用stream可以簡化我們的異步程序編寫。
在Apple的響應式框架Combine也提供了對AsyncSequence的兼容,可以輕松地將任何publisher都轉(zhuǎn)換為AsyncSequence的值對象。上面的下載器可以使用Combine來改寫:
func remoteDataPublisher(
forURLs urls: [URL],
urlSession: URLSession = .shared
) -> AnyPublisher<Data, URLError> {
urls.publisher
.setFailureType(to: URLError.self)
.flatMap(maxPublishers: .max(1)) {
urlSession.dataTaskPublisher(for: $0)
}
.map(\.data)
.eraseToAnyPublisher()
}
將AnyPublisher轉(zhuǎn)換為AsyncSequence,我們只需要訪問publisher的values屬性:
let publisher = remoteDataPublisher(forURLs: urls)
for try await data in publisher.values {
...
}