承接上一篇,輕松無痛實現(xiàn)異步操作串行。 如果沒看過上一篇,閱讀本篇可能會有點懵逼。
在上一篇文章中,我主要描述了如何實現(xiàn)異步串行運(yùn)算符,+>。并演示了如何基于他來做一些諸如參數(shù)的傳遞和錯誤的處理等操作。
這篇文章中,我們會基于之前的發(fā)現(xiàn),來實現(xiàn)異步并行運(yùn)算符 <>。 以及基于 +> 和 <> 來做一些有趣的應(yīng)用。
本文的主要內(nèi)容:
- 實現(xiàn)并行折疊運(yùn)算符:
<>; - 基于
+>和<>,實現(xiàn)一個簡潔優(yōu)雅的 Promise 接口;
第一部分 能夠折疊異步并行操作的運(yùn)算符
什么是折疊
首先,我們需要定義什么是異步并行? 就是我們同時執(zhí)行多個異步操作,當(dāng)所有操作都執(zhí)行完畢后,執(zhí)行異步(Complete)回調(diào)。比如我們已經(jīng)有了用戶的 ID,需要同時請求用戶的頭像和基本資料。在兩個請求都拿到數(shù)據(jù)時,刷新界面。
在上一篇文章中,我們在提出運(yùn)算符 +> 之前,提出了一個連接的概念。指的是把兩個異步操作連接起來,一個執(zhí)行完就執(zhí)行另一個。通過連接,把兩個異步操作合并為一個。
但現(xiàn)在異步并行,顯然不能用連接,因為多個請求是一起發(fā)生的,沒有先后順序。在本文中,用折疊來表示把多個異步請求以并行的方式合并為一個的過程。
基本分析
首先,回憶一下我們異步串行運(yùn)算符的簽名:
typealias AsyncFunc = (info : AnyObject,complete:(AnyObject?,NSError?)->Void) -> Void
+> : (AsyncFunc,AsyncFunc) -> AsyncFunc
我們通過實現(xiàn)把兩個異步操作折疊為一個,來實現(xiàn)串行折疊任意多個異步操作。
并行的思路也是一樣的,我們只要實現(xiàn)并行折疊兩個異步操作,我們就能折疊任意多個異步操作。
我們首先寫出函數(shù)的簽名:
func <>(left : AsyncFunc, right : AsyncFunc) -> AsyncFunc
為什么我們選擇的串行異步運(yùn)算符
+>是非對稱的,而并行異步運(yùn)算符<>卻是對稱的呢?這還是由串行異步和并行異步兩個運(yùn)算的性質(zhì)決定的,串行異步不滿足交換律,因為串行就代表了運(yùn)算本身有先后。而并行卻沒這個限制。a <> b == b <> a,但a +> b != b +> a
按照慣例,我們先根據(jù)函數(shù)的簽名(返回一個函數(shù)),擼個基本的架子:
func <>(left : AsyncFunc , right : AsyncFunc) -> AsyncFunc{
return { info, complete in
}
}
架子搭好以后,我們來思考一下如何實現(xiàn)函數(shù)體, 有以下幾個方面
這里的函數(shù)體,是指我們
return后面的函數(shù)的函數(shù)體,而不是<>的函數(shù)體,如果一味思考后者,很容易懵逼。函數(shù)式編程的一個關(guān)鍵技巧就是通過類型來拆分抽象層次,局部具體,總體抽象。
- 主體邏輯
既然我們的<>是用來把兩個異步操作并行折疊成一個,所以我們返回的函數(shù)體要實現(xiàn)的功能就是同時執(zhí)行left和right這兩個函數(shù),當(dāng)兩個函數(shù)都執(zhí)行完畢后(兩者都調(diào)用了自己的 complete 閉包),再調(diào)用最外層的 complete 閉包,也就是我們返回的函數(shù)簽名的第二個參數(shù)。
- 參數(shù)傳遞
最外層的參數(shù)info, 代表總的輸入?yún)?shù)。需要分別在調(diào)用left和right時傳給它們。那如何表達(dá)并行折疊后的異步調(diào)用的結(jié)果呢?我們知道left和right作為類型為AsyncFunc的異步函數(shù),在它們調(diào)用自己的complete閉包時都會帶上自己的結(jié)果。其中一種可選的方式就是把left和right的結(jié)果通過數(shù)組合并,當(dāng)做折疊后的異步的結(jié)果。
實現(xiàn)異步折疊運(yùn)算符
基于以上的分析,我們大概可以給出如下的實現(xiàn):
func <>(left : AsyncFunc , right : AsyncFunc) -> AsyncFunc{
return { info, complete in
var leftComplete = false
var rightComplete = false
var leftResult:AnyObject? = nil
var rightResult:AnyObject? = nil
let checkComplete = {
if leftComplete && rightComplete{
let finalResult:[AnyObject] = [leftResult!, rightResult!]
complete(finalResult, nil)
}
}
left(info: info){result,error in
guard error == nil else{
complete(nil, error)
return
}
leftComplete = true
leftResult = result;
checkComplete()
}
right(info: info){result,error in
guard error == nil else{
complete(nil, error)
return
}
rightComplete = true
rightResult = result;
checkComplete()
}
}
}
上面的代碼邏輯其實很簡單,我們通過一個 checkComplete 函數(shù)來檢查兩個任務(wù)是否都已經(jīng)完成,如果完成則合并兩個異步函數(shù)返回的結(jié)果,并調(diào)用最外層的 complete 閉包。 兩個異步函數(shù)則直接調(diào)用,在 complete 閉包中檢查是否出錯,沒有則保存相應(yīng)的結(jié)果,和置對應(yīng)的標(biāo)志位。
測試一下
let delay = dispatch_time(DISPATCH_TIME_NOW, Int64(NSEC_PER_SEC))
let test1:AsyncFunc = { _,complete in
print("test1")
dispatch_after(delay, dispatch_get_main_queue(), {
complete(0,nil);
})
}
let test2:AsyncFunc = { _,complete in
print("test2")
dispatch_after(delay, dispatch_get_main_queue(), {
complete(0,nil);
})
}
let test = test1 <> test2;
test(info: 0){ _,_ in print("all finished")};
上述代碼中,我們創(chuàng)建了兩個異步操作:test1 和 test2。 然后通過我們的并行折疊運(yùn)算符 <> 折疊為一個: test。之后直接運(yùn)行 test。
結(jié)果輸出:
test1
test2
all finished
我們運(yùn)行折疊后的函數(shù),test1 和 test2 都得到了調(diào)用,并且在都完成之后,調(diào)用了最外層的 complete 閉包:打印出了 all finished。看上去很完美。
精益求精
但是真的完美了嗎?
在上述測試代碼中,我們把 main_queue 換成 global_queue之后,我們會發(fā)現(xiàn)最外層的 complete 閉包被執(zhí)行了兩次,最終打印了兩次 all finished, 這明顯不是我們想要的結(jié)果。
上面的代碼其實會有一個經(jīng)典的多線程問題,如果 left 和 right的 complete 閉包是并發(fā)調(diào)用的話,就有可能在執(zhí)行完 leftComplete = true 的時候執(zhí)行被切走,執(zhí)行 right 的 complete 閉包,執(zhí)行完 right 之后繼續(xù) left 這邊的執(zhí)行。這個時序就會導(dǎo)致最終被執(zhí)行兩次。
解決也很簡單,我們只要加一個變量來當(dāng)做互斥鎖即可,最終的并行折疊運(yùn)算符修改如下:
func <>(left : AsyncFunc , right : AsyncFunc) -> AsyncFunc{
return { info, complete in
var leftComplete = false
var rightComplete = false
var finishedComplete = false
var leftResult:AnyObject? = nil
var rightResult:AnyObject? = nil
let checkComplete = {
if leftComplete && rightComplete{
objc_sync_enter(finishedComplete)
if !finishedComplete{
let finalResult:[AnyObject] = [leftResult!, rightResult!]
complete(finalResult, nil)
finishedComplete = true
}
objc_sync_exit(finishedComplete)
}
}
left(info: info){result,error in
guard error == nil else{
complete(nil, error)
return
}
leftComplete = true
leftResult = result;
checkComplete()
}
right(info: info){result,error in
guard error == nil else{
complete(nil, error)
return
}
rightComplete = true
rightResult = result;
checkComplete()
}
}
}
至此,我們擁有了一個優(yōu)雅的并行折疊運(yùn)算符:<>, 和 +> 一樣??梢詭椭覀兒喕a,抽象邏輯。 當(dāng)然,閑的蛋疼要對其玩一玩map/filter/reduce之類也是支持的,和上篇介紹的思路一樣。在此不再贅述。
第二部分,100行實現(xiàn)類 PromiseKit 的接口
鏡頭切換到一些實際應(yīng)用的場景,很多時候我們傾向于通過 closure 來組織邏輯,這樣可以把本身就耦合的邏輯寫在一個地方,也更容易維護(hù)。我們的并行折疊和串行連接運(yùn)算符都是基于函數(shù)的,能不能應(yīng)用在 closure based scenario 呢? let try it.
考慮接口易用性,我們 API 的設(shè)計可以直接參(shan)考(zhai) PromiseKit.
PromiseKit GitHub主頁的 Readme 給了這樣的一個例子:
firstly {
when(NSURLSession.GET(url).asImage(), CLLocationManager.promise())
}.then { image, location -> Void in
self.imageView.image = image
self.label.text = "\(location)"
}.always {
UIApplication.sharedApplication().networkActivityIndicatorVisible = false
}.error { error in
UIAlertView(/*…*/).show()
}
我們來分析一下他都做了什么:
僅從 API 字面分析,本文不涉及 PromiseKit 內(nèi)部真正的實現(xiàn)機(jī)制
- 通過
firstly注冊第一個任務(wù),并返回一個 Promise 對象。用于后面的鏈?zhǔn)酱a書寫。 -
when函數(shù)接受兩個同步的任務(wù),同時觸發(fā)兩個任務(wù)并阻塞當(dāng)前的執(zhí)行,直到兩個任務(wù)都完成。(異步并行的場景),這里雖然when會阻塞執(zhí)行,但when本身是運(yùn)行在主線程中的,也不會阻塞主線程。 -
then可以有任意多個,順序執(zhí)行。then塊中直接用同步的方式寫代碼。但最終這些任務(wù)都會被異步的執(zhí)行。(異步串行的場景) - 不管執(zhí)行過程中是否出錯,都會執(zhí)行
always塊 - 如果執(zhí)行過程中出錯,則執(zhí)行
error塊。
基于以上的分析,我們一步步來實現(xiàn)這幾個組件:
firstly
firstly用于接收第一個任務(wù),任務(wù)書寫是同步的方式,但必須異步運(yùn)行。
func firstly(body : Void->Void)->Promise{
let starter: AsyncFunc = { _,complete in
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) {
body();
complete(0,nil);
}
}
return Promise(starter: starter)
}
我們的 firstly 實現(xiàn)只做了兩件事, 把第一個任務(wù)包成異步的,并用這個任務(wù)創(chuàng)建了一個 Promise 對象并返回。
因為所有任務(wù)最終都是由
Promise對象來維護(hù)的,所以firstly只需要把第一個任務(wù)直接給他即可。
Promise 類基礎(chǔ)
根據(jù)之前的分析,我們先把顯而易見的架子擼出來:
class Promise {
var chain : AsyncFunc
var alwaysClosure : (Void->Void)?
var errorClosure : (NSError?->Void)?
init(starter : AsyncFunc){
chain = starter
}
func then(body : AnyObject throws->Void )->Promise{
//TO BE IMP
return self
}
func always(closure : Void->Void)->Promise{
alwaysClosure = closure
return self
}
func error(closure : NSError?->Void)->Promise{
errorClosure = closure
fire()
return self
}
func fire(){
chain(info: 0) { (info, error) in
if let always = self.alwaysClosure{
always()
}
if error == nil{
print("all task finished")
}else{
if let errorC = self.errorClosure{
errorC(error)
}
}
}
}
}
上述代碼實現(xiàn)了除 then 函數(shù)之外的所有部件。我們把初始任務(wù)存在成員 chain 上面,然后分別用成員保存 error closure 和 always closure, 然后在注冊完 error closure 之后調(diào)用 fire 來觸發(fā) chain 的執(zhí)行,在 chain 執(zhí)行完畢后分別執(zhí)行 always 和是否出錯來執(zhí)行 error.
then,always,error都返回self, 實現(xiàn)鏈?zhǔn)秸{(diào)用。
至此,我們已經(jīng)實現(xiàn)了能執(zhí)行一個任務(wù),并且實現(xiàn) always 和 error 機(jī)制的 Promise 對象。
無限的、鏈?zhǔn)?then 塊。
如之前所說,我們把 firstly 傳進(jìn)來的初始任務(wù)保存在 chain 這個成員中。那之后的 then 傳入的其實就是后續(xù)的任務(wù),比如有三個鏈?zhǔn)降?then,就代表我們需要串行的執(zhí)行四個任務(wù):初始任務(wù),三個 then塊的任務(wù)。
所以,我們的 then 函數(shù)可以這樣來實現(xiàn):
let async: AsyncFunc = { info, complete in
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) {
var error : NSError?
do{
try body(info)
}catch let err as NSError{
error = err
}
complete(0,error)
}
}
chain = chain +> async
return self
}
顯而易見, then 做的事情和 firstly并無太多區(qū)別,首先把傳進(jìn)來的同步任務(wù)打包成異步,第二步是把新的任務(wù)通過異步串行運(yùn)算符 +> 合并到成員 chain 中。這樣,chain 保存的就不僅僅是初始任務(wù),而是像一個累加器一樣,有多少 then, chain就是最終合并的任務(wù)。這樣,我們不管 then 多少次,每個 then 塊中的任務(wù)都會被合并到 chain 里。最終我們只需要執(zhí)行 chain, 即可觸發(fā)所有任務(wù)的鏈?zhǔn)綀?zhí)行(因為合并用的是 +>)。
注意在
then塊中執(zhí)行body的時候用了do-catch結(jié)構(gòu),目的就是在then塊接受的任務(wù)可以通過throw拋出錯誤,然后在這里捕獲,實現(xiàn)錯誤的感知(如果捕獲到錯誤,則最終會調(diào)用errorClosure)
實現(xiàn) when 函數(shù)
我們溫習(xí)一下上文對 when 函數(shù)的分析:
when函數(shù)接受兩個同步的任務(wù),同時觸發(fā)兩個任務(wù)并阻塞當(dāng)前的執(zhí)行,直到兩個任務(wù)都完成。(異步并行的場景),這里雖然when會阻塞執(zhí)行,但when本身是運(yùn)行在主線程中的,也不會阻塞主線程。
根據(jù) when 函數(shù)的定位,只要簡單實現(xiàn)成獨立的函數(shù)即可,不需要實現(xiàn)為 Promise類的成員。
func when(fstBody : (Void->Void), sndBody : (Void->Void)){
let async1 : AsyncFunc = { _ , complete in
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) {
fstBody();
complete(0,nil);
}
}
let async2 : AsyncFunc = { _ , complete in
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) {
sndBody();
complete(0,nil);
}
}
let async = async1 <> async2
var finished = false
async(info: 0) { (_, _) in
finished = true
}
while finished == false {
}
}
上述代碼中,when 首先把傳入的兩個同步任務(wù)打包成一部,并通過異步并行運(yùn)算符 <> 合并,然后直接執(zhí)行合并后的結(jié)果。合并后的結(jié)果回調(diào)時(也就是兩個任務(wù)都完成時),置 finished 為 true。 末尾用一個 while 在 finished 為 false 時阻塞函數(shù)的執(zhí)行。
至此,我們完成了一個最簡單的 Promise 的封裝, firstly、 Promise 主類和when 三個組件,加起來一共100行
老規(guī)矩,來測試一下
firstly { () in
when({ () in
print(“begin fst job")
sleep(1)
print("fst job in when finished")
}, sndBody: { () in
print(“begin snd job")
sleep(5)
print("snd job in when finished")
})
}.then { (info) in
print("second job")
}.then { (info) in
print("third job")
}.always { () in
print("always block")
}.error { (error) in
print("error occurred")
}
執(zhí)行流程:同時執(zhí)行 when 的兩個任務(wù),都完成之后按順序執(zhí)行 then, 最后執(zhí)行 always。因為過程中沒有error,所以 error 塊沒有被調(diào)用。
begin fst job
begin snd job
(間隔1秒)fst job in when finished
(間隔4秒)
snd job in when finished
second job
third job
always block
現(xiàn)在來簡單修改一下代碼,在 second job 里拋出一個 error:
firstly { () in
when({ () in
print(“begin fst job")
sleep(1)
print("fst job in when finished")
}, sndBody: { () in
print(“begin snd job")
sleep(5)
print("snd job in when finished")
})
}.then { (info) in
print("second job")
throw NSError(domain: "error", code: 0, userInfo: [:])
}.then { (info) in
print("third job")
}.always { () in
print("always block")
}.error { (error) in
print("error occurred")
}
最終輸出:
begin fst job
begin snd job
(間隔1秒)fst job in when finished
(間隔4秒)
snd job in when finished
second job
always block
error occurred
對比之前的結(jié)果,因為拋出了錯誤,所以 error 塊得以執(zhí)行,并且thrid job 沒有執(zhí)行,因為出錯中斷了 then 鏈的執(zhí)行。
總結(jié)
- 上一篇文章中,我們實現(xiàn)了異步串行運(yùn)算符:
+>; - 本篇文章中,我們首先實現(xiàn)了異步并行運(yùn)算符:
<>; - 然后,基于上面兩個運(yùn)算符,我們用100行實現(xiàn)了一個簡單的 Promise 實現(xiàn);