阿里開源 iOS 協(xié)程開發(fā)框架 coobjc 學(xué)習(xí)
coobjc 概覽
coobjc 為 OC 和 Swift 提供了協(xié)程的功能。支持 await、generator 和 actor model,并且在 cokit 庫中為 Foundation 和 UIKit 的部分 API 提供了協(xié)程化支持,包括 NSFileManager , JSON , NSData , UIImage 等。coobjc 同時(shí)還提供了元組的支持。
什么是協(xié)程?
通俗的講,協(xié)程是一種比線程更輕量級(jí)的存在。如一個(gè)進(jìn)程可以擁有多個(gè)線程一樣,一個(gè)線程可以擁有多個(gè)協(xié)程。一個(gè)協(xié)程 A 在執(zhí)行過程中,如果碰到 yield 關(guān)鍵字,就會(huì)中斷執(zhí)行,直到主線程調(diào)用 send/next 方法發(fā)送了數(shù)據(jù),協(xié)程 A 才會(huì)接收到數(shù)據(jù)繼續(xù)執(zhí)行。
在協(xié)程中通過 yield 來暫停協(xié)程執(zhí)行和在線程的阻塞是有本質(zhì)區(qū)別的。線程的阻塞狀態(tài)是由操作系統(tǒng)的內(nèi)核進(jìn)行切換的,而協(xié)程不是被操作系統(tǒng)內(nèi)核管理,而是完全由程序控制,也就是在用戶態(tài)執(zhí)行,不需要像線程那樣在用戶態(tài)和內(nèi)核態(tài)之間來回切換。所以,協(xié)程的開銷會(huì)遠(yuǎn)遠(yuǎn)小于線程的開銷。和多線程相比,線程的數(shù)量越多,協(xié)程的性能優(yōu)勢也就越明顯。
協(xié)程相對(duì)使用線程的另一個(gè)優(yōu)點(diǎn)就是:協(xié)程不需要多線程的鎖機(jī)制,因?yàn)橹挥幸粋€(gè)線程,也不存在同時(shí)寫變量沖突,在協(xié)程中控制共享資源不需要加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率也會(huì)比多線程高很多。
! 使用協(xié)程的優(yōu)點(diǎn):
- 協(xié)程不是被操作系統(tǒng)內(nèi)核管理,是在用戶態(tài)執(zhí)行,所以節(jié)省了線程切換的性能開銷
- 不需要多線程的鎖機(jī)制
- !使用協(xié)程的缺點(diǎn):
- 我們需要自己承擔(dān)協(xié)程之間調(diào)度的責(zé)任。
- 由于協(xié)程本質(zhì)上是在單線程上跑的,也就失去了線程使用多 CPU 的能力,無法利用多核資源。只有將協(xié)程和進(jìn)程配合才可以使用多 CPU。
- 在協(xié)程中如果使用了阻塞操作,會(huì)阻塞掉整個(gè)程序。。
使用 coobjc 能解決什么問題?
coobjc 使用協(xié)程的方式優(yōu)化了 iOS 中的異步操作,解決了 iOS 基于 block 的異步編程回調(diào)中容易碰到的以下問題:
- 容易進(jìn)入"嵌套地獄"
- 錯(cuò)誤處理復(fù)雜和冗長
- 容易忘記調(diào)用 completion handler
- 條件執(zhí)行變得很困難
- 從互相獨(dú)立的調(diào)用中組合返回結(jié)果變得極其困難
- 在錯(cuò)誤的線程中繼續(xù)執(zhí)行
- 難以定位原因的多線程崩潰
- 鎖和信號(hào)量濫用帶來的卡頓、卡死
一個(gè)明顯的優(yōu)點(diǎn)就是,coobjc 使用協(xié)程把異步變同步,簡化代碼,方便使用和維護(hù)。
下面是官方文檔中給出的一個(gè)使用傳統(tǒng)異步回調(diào)方法通過網(wǎng)絡(luò)請(qǐng)求加載一張圖片和使用 coobjc 的方式加載圖片的代碼對(duì)比示例:
//Asynchronous loading of data from the network
[NSURLSession sharedSession].configuration.requestCachePolicy = NSURLRequestReloadIgnoringCacheData;
NSURLSessionDownloadTask *task = [[NSURLSession sharedSession] downloadTaskWithURL:url completionHandler:
^(NSURL *location, NSURLResponse *response, NSError *error) {
if (error) {
return;
}
//Parsing data in child threads and generating images
dispatch_async(dispatch_get_global_queue(0, 0), ^{
NSData *data = [[NSData alloc] initWithContentsOfURL:location];
UIImage *image = [[UIImage alloc] initWithData:data];
dispatch_async(dispatch_get_main_queue(), ^{
//Dispatch to the main thread to display the image
imageView.image = image;
});
});
}];
coobjc:
co_launch(^{
NSData *data = await(downloadDataFromUrl(url));
UIImage *image = await(imageFromData(data));
imageView.image = image;
});
使用 coobjc
Simple Launch
你可以在任何地方添加一個(gè)協(xié)程:
// Create a coroutine with block, and just resume it.
co_launch(^{
// do something. The coroutine just asynchronous run in the current `dispatch_queue`
});
co_launch_onqueue(q, ^{
// ...
});
co_launch_now(^{
NSLog(@"deal things in coroutine now");
});
創(chuàng)建一個(gè)協(xié)程的便攜方式有以上幾種,你可以控制協(xié)程創(chuàng)建在哪個(gè) queue 上,或者控制協(xié)程的 resume 是同步還是異步。
-> Code testForLaunch
- (void)testForLaunch {
co_launch(^{
NSLog(@"deal things in coroutine 1");
});
co_launch_now(^{
NSLog(@"deal things in coroutine now");
});
co_launch(^{
NSLog(@"deal things in coroutine 2");
});
NSLog(@"testForLaunch");
}
我們運(yùn)行跑一下 demo 中的 testForLaunch 方法,通過控制臺(tái)輸出或者打斷點(diǎn)的方式可以看到,代碼的執(zhí)行順序是 1. co_launch_now 的 block 2.NSLog(@"testForLaunch"); 3. 第一個(gè) co_launch 的 block 4.第二個(gè) co_launch 的 block。
為什么執(zhí)行順序是這樣呢?
我們?cè)?co_launch 中打斷點(diǎn)就可以看到函數(shù)的調(diào)用棧如下:

co_launch 的作用就是創(chuàng)建一個(gè)協(xié)程并 resume,然后在 CoCoroutine 的 resume 方法中,我們可以看到它的內(nèi)部實(shí)現(xiàn):
- (COCoroutine *)resume {
dispatch_async(self.queue, ^{
if (self.isResume) {
return;
}
self.isResume = YES;
coroutine_resume(self.co);
});
return self;
}
- (void)resumeNow {
[self performBlockOnQueue:^{
if (self.isResume) {
return;
}
self.isResume = YES;
coroutine_resume(self.co);
}];
}
沒錯(cuò),resume 的內(nèi)部代碼是通過異步的方法去調(diào)用的,而 resumeNow
也就是 co_launch_now 調(diào)用的 resumeNow 方法是同步執(zhí)行代碼塊的。在 resume 之后,就會(huì)調(diào)用 Cocoroutine 的 execute 方法
去執(zhí)行 co_launch block 中的代碼。這就解釋了 testForLaunch 中代碼的執(zhí)行順序問題。
- 修改于4.16:
resumeNow 中調(diào)用 performBlockOnQueue 函數(shù),內(nèi)部判斷 coroutine 持有的queue 是不是當(dāng)前執(zhí)行的 queue, 如果是則立即執(zhí)行 block,如果不是則還是異步到這個(gè)協(xié)程持有的 queue 去執(zhí)行。
dispatch_queue_t queue = self.queue;
if (queue == co_get_current_queue()) {
block();
} else {
dispatch_async(queue, block);
}
Await
/**
Wait a `COPromise` or `COChan` object, until Promise is fulfilled/rejected, or Channel has value send.
@param awaitable `COPromise` or `COChan` object
@return If await Promise, return fulfilled value; elseif Channel, return sent value.
*/
id _Nullable co_await(id awaitable);
用 COPromise 或者 COChan 實(shí)現(xiàn)一個(gè)函數(shù),然后在協(xié)程 coroutine 中使用 await 方法去等待函數(shù)執(zhí)行的結(jié)果。直到函數(shù)返回的 promise 執(zhí)行 resolve 或者 reject,或者函數(shù)返回的 channel 中有值被 send 進(jìn)來,await 才會(huì)繼續(xù)往下執(zhí)行。
COPromise && COChan
什么是 COChan?
coobjc 的 COChan,也就是 Channel,是 CSP(Communicating Sequential Processes)的一種并發(fā)模型。它的實(shí)現(xiàn)是參考 libtask 的。
CSP并發(fā)模型
CSP模型是上個(gè)世紀(jì)七十年代提出的,用于描述兩個(gè)獨(dú)立的并發(fā)實(shí)體通過共享的通訊 channel(管道)進(jìn)行通信的并發(fā)模型。 CSP中channel是一類對(duì)象,它不關(guān)注發(fā)送消息的實(shí)體,而關(guān)注與發(fā)送消息時(shí)使用的channel。
channel 是被單獨(dú)創(chuàng)建并且可以在進(jìn)程之間傳遞,它的通信模式類似于 boss-worker 模式的,一個(gè)實(shí)體通過將消息發(fā)送到channel 中,然后又監(jiān)聽這個(gè) channel 的實(shí)體處理,兩個(gè)實(shí)體之間是匿名的,這個(gè)就實(shí)現(xiàn)實(shí)體中間的解耦,其中 channel 是同步的一個(gè)消息被發(fā)送到 channel 中,最終是一定要被另外的實(shí)體消費(fèi)掉的,在實(shí)現(xiàn)原理上其實(shí)是一個(gè)阻塞的消息隊(duì)列。
畫重點(diǎn):channel 在實(shí)現(xiàn)原理上其實(shí)是一個(gè)阻塞的消息隊(duì)列
那么在 coobjc 中也是使用 COCoroutine 做為并發(fā)實(shí)體,coroutine 非常輕量級(jí)可以創(chuàng)建幾十萬個(gè)實(shí)體。實(shí)體間通過 COChan 繼續(xù)匿名消息傳遞使之解耦。
[圖片上傳失敗...(image-f88924-1551780623659)]
[圖片上傳失敗...(image-21cd7c-1551780623659)]
COChan 類的結(jié)構(gòu)也比較簡單,整個(gè) .h 文件中,只有 COChan 的初始化方法、sendValue、receiveValue 和 cancel 方法。-> COChan.h

在 coobjc 中,COChan 的 send 和 receive 分別有兩種方法。一種是有緩沖區(qū),另一種是沒有緩沖區(qū)的方法。
-> Code testForChannelWithNoCache
運(yùn)行一下 testForChannelWithNoCache 代碼并觀察控制臺(tái)輸出的結(jié)果,思考如果把 receive 和 send 改成 receive_nonblock 和 send_nonblock 方法結(jié)果會(huì)怎么樣? 如果只改其中的一個(gè)呢?
PS:
- receive 和 send 必須要在協(xié)程中進(jìn)行調(diào)用
- receive 和 send 會(huì)使當(dāng)前協(xié)程掛起。如果使用send,并且沒有人 receive 這個(gè)消息或者 buffer 已經(jīng)滿了的情況下,會(huì)導(dǎo)致當(dāng)前協(xié)程被掛起。直到有人 receive 處理了這個(gè)消息。如果使用 receive,但是 Channel 中沒有任何消息,那么當(dāng)前協(xié)程會(huì)被掛起,直到 Channel 中被 send 了消息。
- send_nonblock 和 receive_nonblock 沒有必須在協(xié)程中使用的限制。調(diào)用這兩個(gè)方法不會(huì)造成阻塞。
- send_nonblock 在調(diào)用時(shí),如果有人正在 receiving,那么就把消息發(fā)送給他。如果沒有人在 receive 消息,并且 channel 的 buffer 沒有滿的情況下,就將消息保存到 buffer 中。如果沒人 receive 并且 buffer 也滿了的情況下,就丟棄掉這條消息。
- receive_nonblock 在調(diào)用時(shí),如果 channel 的 buffer 中有值,那么就取這個(gè)值。如果 buffer 中沒有值,但是這時(shí)候有人正在調(diào)用 sending,那么就接收 sending 的值。如果 buffer 中沒有值,也沒有人在 sending 消息,那么就 return nil。
什么是 COPromise?
COPromise 和前端中的 Promise 用法大致相同。
Promise 是異步編程的一種解決方案:從語法上講,promise是一個(gè)對(duì)象,從它可以獲取異步操作的消息;從本意上講,它是承諾,承諾它過一段時(shí)間會(huì)給你一個(gè)結(jié)果。promise有三種狀態(tài):pending(等待態(tài)),fulfiled(成功態(tài)),rejected(失敗態(tài));狀態(tài)一旦改變,就不會(huì)再變。創(chuàng)造promise實(shí)例后,它會(huì)立即執(zhí)行。
用起來就類似這樣:
- (COPromise<id> *)co_fetchSomethingAsynchronous {
return [COPromise promise:^(COPromiseResolve _Nonnull resolve, COPromiseReject _Nonnull reject) {
dispatch_async(_someQueue, ^{
id ret = nil;
NSError *error = nil;
// fetch result operations
...
if (error) {
reject(error);
} else {
resolve(ret);
}
});
}];
}
回到 Await
-> Code testForAwaitPromise
/// COPromise
- (COPromise<id> *)co_fetchSomethingAsynchronous {
return [COPromise promise:^(COPromiseFullfill _Nonnull fullfill, COPromiseReject _Nonnull reject) {
NSError *error = nil;
int number = arc4random() % 2;
if (number) {
NSLog(@"result is %d,spend some time to deal it..",number);
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1.0 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
fullfill(@(number));
});
} else {
NSLog(@"result is %d,throw out an error.",number);
error = [NSError errorWithDomain:@"error" code:10000 userInfo:nil];
reject(error);
}
} onQueue:dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)];
}
- (void)testForAwaitPromise {
co_launch(^{
id ret = await([self co_fetchSomethingAsynchronous]);
NSError *error = co_getError();
if (error) {
NSLog(@"get an error in testForAwait, error: %@",error);
} else {
NSLog(@"get the result in testForAwait,value:%d",[ret intValue]);
}
});
}
-> Code testForAwaitChan
/// COChan
- (COChan<id> *)co_fetchSomething {
COChan *chan = [COChan chan];
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^{
NSError *error = nil;
int number = arc4random() % 2;
if (number) {
NSLog(@"result is %d",number);
//??? 如果使用send?
[chan send_nonblock:@(number)];
} else {
NSLog(@"result is %d,throw out an error.",number);
error = [NSError errorWithDomain:@"error" code:10000 userInfo:nil];
[chan send_nonblock:error];
}
});
return chan;
}
- (COChan<id> *)co_fetchSomething1 {
COChan *chan = [COChan chan];
co_launch(^{
NSError *error = nil;
int number = arc4random() % 2;
if (number) {
NSLog(@"result is %d",number);
//??? 如果使用send?
[chan send:@(number)];
} else {
NSLog(@"result is %d,throw out an error.",number);
error = [NSError errorWithDomain:@"error" code:10000 userInfo:nil];
[chan send:error];
}
});
return chan;
}
- (void)testForAwaitChan {
co_launch(^{
id ret = await([self co_fetchSomething]);
if ([ret isKindOfClass:[NSError class]]) {
NSLog(@"get an error in testForAwaitChan, error: %@",ret);
} else {
NSLog(@"get the result in testForAwaitChan,value:%d",[ret intValue]);
}
});
}
運(yùn)行 testForAwaitPromise 和 testForAwaitChan 了解 promise 和 chan 作為返回類型時(shí),await 的使用方法。
思考??:
在 co_fetchSomething 中,如果把 send_nonblock 方法改成 send 方法會(huì)怎么樣?
如果想要用 send 方法去實(shí)現(xiàn),需要怎么做?
-> Code co_fetchSomething1在 co_fetchSomething1 中,如果把 co_launch 注釋掉會(huì)怎么樣?
解答:
會(huì)發(fā)生崩潰??梢钥吹娇刂婆_(tái)輸出內(nèi)容:reason: 'send blocking must call in a coroutine.' 為什么呢? 因?yàn)樵?co_fetchSomething 中,代碼是異步到 global queue 上去執(zhí)行的,也就是在異步線程的環(huán)境,已經(jīng)不是在之前的那條協(xié)程的環(huán)境下了。而 send 或者 receive 方法只能在 coroutine 中使用,所以就會(huì)發(fā)生以上報(bào)錯(cuò)。
如果想要用 send 可以如 co_fetchSomething1 方法所示,另外開辟一個(gè)協(xié)程,并且將代碼放到該協(xié)程的執(zhí)行塊中執(zhí)行。
如果注釋掉 co_launch,為什么不會(huì)執(zhí)行 testForAwaitChan 的 log 輸出呢?因?yàn)槿绻麤]有開啟新的 coroutine,也沒有異步到其他線程上去做,那么其實(shí)當(dāng)前的環(huán)境就是外部的 coroutine 環(huán)境。然后在調(diào)用到 send 的時(shí)候,是先執(zhí)行 [self co_fetchSomething] 再執(zhí)行 await。上面我們提到過,send 方法在沒有接收數(shù)據(jù)并且 channel 的 buffer 已經(jīng)滿的情況下,會(huì)阻塞當(dāng)前協(xié)程。所以,導(dǎo)致外部 [self co_fetchSomething] 的 log 沒有輸出。
所以以上阻塞的情況該如何解決呢? 就從 send 阻塞的條件入手,如果有人 receive 或者 buffer 不滿的話,就可以破除阻塞。我們可以注意到在 co_fetchSomething1 中,chan 的初始化方法是 COChan *chan = [COChan chan]; 的。那么我們看到 chan 方法到底做了什么東西:
+ (instancetype)chan {
COChan *chan = [[self alloc] initWithBuffCount:0];
return chan;
}
+ (instancetype)chanWithBuffCount:(int32_t)buffCount {
COChan *chan = [[self alloc] initWithBuffCount:buffCount];
return chan;
}
+ (instancetype _Nonnull )expandableChan {
COChan *chan = [[self alloc] initWithBuffCount:-1];
return chan;
}
COChan 的初始化方法中,我們看到 chan 方法其實(shí)提供的 buffer count 為 0,也就是說緩沖區(qū)大小為0,所以我們?cè)谡{(diào)用 send 方法并且沒有人 receive 的時(shí)候會(huì)直接導(dǎo)致協(xié)程掛起。那么我們可以注意到,chan 還有另外兩個(gè)初始化的方法,一個(gè)是指定 buffer 大小,另一個(gè)是根據(jù)需要會(huì)自動(dòng)擴(kuò)充 buffer 區(qū)的方法。我們用后者任何一個(gè)方法初始化有 buffer 區(qū)的 channel 都可以解除 send 阻塞的問題啦。
- Await 的內(nèi)部實(shí)現(xiàn)
id co_await(id awaitable) {
coroutine_t *t = coroutine_self();
if (t == nil) {
@throw [NSException exceptionWithName:COInvalidException reason:@"Cannot call co_await out of a coroutine" userInfo:nil];
}
if (t->is_cancelled) {
return nil;
}
if ([awaitable isKindOfClass:[COChan class]]) {
COCoroutine *co = co_get_obj(t);
co.lastError = nil;
id val = [(COChan *)awaitable receive];
return val;
} else if ([awaitable isKindOfClass:[COPromise class]]) {
COChan *chan = [COChan chanWithBuffCount:1];
COCoroutine *co = co_get_obj(t);
COPromise *promise = awaitable;
[[promise
then:^id _Nullable(id _Nullable value) {
[chan send_nonblock:value];
return value;
}]
catch:^(NSError * _Nonnull error) {
co.lastError = error;
[chan send_nonblock:nil];
}];
[chan onCancel:^(COChan * _Nonnull chan) {
[promise cancel];
}];
id val = [chan receive];
return val;
} else {
@throw [NSException exceptionWithName:COInvalidException
reason:[NSString stringWithFormat:@"Cannot await object: %@.", awaitable]
userInfo:nil];
}
}
await 內(nèi)部對(duì)參數(shù)進(jìn)行了類型判斷,如果是 Channel 就調(diào)用 channel 的 receive 方法,阻塞當(dāng)前的協(xié)程并且等待 receive 返回值,這也就是 await 會(huì)使當(dāng)前 coroutine 掛起的原因。那么如果參數(shù)是 Promise 類型,那么內(nèi)部會(huì)生成一個(gè) Channel,將這個(gè) Channel 與 Promise 綁定在一起,然后調(diào)用 channel 的 receive 方法,阻塞當(dāng)前的協(xié)程并且等待返回值。當(dāng) Promise 返回處理結(jié)果時(shí),channel 會(huì)通過 send_nonblock 的方法將值 send 過來,然后由于這時(shí)候 channel 在 receive 等待中,所以 receive 會(huì)馬上接收到這個(gè)值然后返回結(jié)果。那么,如果 Promise 返回的是 error, send_nonblock 會(huì)塞一個(gè) nil 進(jìn)來。所以外部可以通過值是否為 nil 來判斷是否發(fā)生了錯(cuò)誤。
Generator
生成器:生成器在迭代中以某種方式生成下一個(gè)值并且返回和next()調(diào)用一樣的東西。掛起返回出中間值并多次繼續(xù)的協(xié)同程序被稱作生成器。
生成器可以在很多場景中進(jìn)行使用,比如消息隊(duì)列、批量下載文件、批量加載緩存等:
[圖片上傳失敗...(image-db8482-1551780623659)]
-> Code testForRandomGenerator
執(zhí)行 demo 中的 testForRandomGenerator 方法,觀察輸出。
- (void)testForRandomGenerator {
COCoroutine *generator = co_sequence(^{
NSArray *array = @[@"??",@"??",@"???",@"??"];
while(co_isActive()){
int index = arc4random() % array.count;
NSString *result = [array objectAtIndex:index];
NSLog(@"this is a %@",result);
yield_val(result);
}
});
co_launch(^{
for(int i = 0; i < 10; i++){
NSString *whatIsThis = [generator next];
NSLog(@"look, what I get in the box! %@",whatIsThis);
}
[generator cancel];
});
}
生成器中的代碼只有在外部需要的時(shí)候才會(huì)執(zhí)行,也就是外部向生成器發(fā)送 next 消息的時(shí)候才會(huì)觸發(fā)生成器并開始產(chǎn)生數(shù)據(jù)。在生成數(shù)據(jù)之后,它就會(huì)掛起,等待下次收到 next 消息才會(huì)繼續(xù)執(zhí)行生成數(shù)據(jù)。
和傳統(tǒng)的 NSArray、NSSet、NSDictionary 等數(shù)據(jù)容器相比,生成器不需要提前將所有數(shù)據(jù)準(zhǔn)備好并存儲(chǔ)到容器中。并且,生成器的實(shí)現(xiàn)是線程安全的,因?yàn)樗鼈兌际窃趩尉€程上運(yùn)行,數(shù)據(jù)在生成器中生成,然后在另一條協(xié)程上使用,期間不需要加任何鎖。而使用傳統(tǒng)容器需要注意線程安全問題并且容易引發(fā) crash。
使用生成器去實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型的時(shí)候,我們可以把傳統(tǒng)的生產(chǎn)者生產(chǎn)出東西,然后去通知消費(fèi)者消費(fèi)的方式轉(zhuǎn)變?yōu)?消費(fèi)者需要消費(fèi)的時(shí)候去告訴生產(chǎn)者馬上生產(chǎn)出東西來給我。與傳統(tǒng)的模式相比,使用生成器實(shí)現(xiàn)的方式,避免了去使用一些多線程共享的變量計(jì)算,也避免了鎖的使用。
???: 如果注釋掉 [generator cancel] 會(huì)有什么問題?
如果注釋掉 cancel 會(huì)導(dǎo)致 generator 繼續(xù)生成一個(gè)數(shù)據(jù)。因?yàn)樵谡{(diào)用最后一個(gè) next 的時(shí)候,生成器會(huì)繼續(xù)往下執(zhí)行一個(gè)循環(huán)。
這里還有一個(gè)問題:為什么 generator 和 外部調(diào)用 next 的循環(huán)都是兩次兩次執(zhí)行輸出呢?
Actor
actor模式是一種最古老的也是最簡單的并行和分布式計(jì)算解決方案
Actor模型=數(shù)據(jù)+行為+消息。
Actor模型內(nèi)部的狀態(tài)由自己的行為維護(hù),外部線程不能直接調(diào)用對(duì)象的行為,必須通過消息才能激發(fā)行為,這樣就保證Actor內(nèi)部數(shù)據(jù)只有被自己修改。
實(shí)現(xiàn)一個(gè)計(jì)數(shù)器的代碼可能是這樣:
@interface Counter: NSObject
@property (nonatomic, assign) int count;
- (void)incCount;
- (int)getCount;
@end
@implementation Counter
- (void)incCount {
@synchronized(self) {
_count ++;
}
}
- (int)getCount {
@synchronized(self) {
return _count;
}
}
@end
使用 coobjc 實(shí)現(xiàn) Actor 的計(jì)數(shù)器代碼:
- (void)testForActor {
COActor *countActor = co_actor(^(COActorChan * _Nonnull channel) {
//定義actor的狀態(tài)變量
int count = 0;
for (COActorMessage *message in channel) {
//處理消息
if ([[message stringType] isEqualToString:@"inc"]) {
count++;
NSLog(@"the count is %d now.", count);
}
else if ([[message stringType] isEqualToString:@"get"]) {
message.complete(@(count));
NSLog(@"get the count %d", count);
}
}
});
// 給 actor 發(fā)送消息
[countActor sendMessage:@"inc"];
[countActor sendMessage:@"inc"];
}
-> Code COActor.h
COActor: COActor 是繼承于 COCoroutine 的一個(gè)子類,一個(gè) Actor 就是一條協(xié)程。COActor 的方法列表很簡單,除了初始化方法,就只有一個(gè) sendMessage 方法用于在 Actors 之間發(fā)送消息。所以整個(gè) Actor 的機(jī)制就是用 sendMessage 來維持消息傳遞的。
COActorChan: COChan 的子類,實(shí)現(xiàn)了快速枚舉協(xié)議
COActorMessage:發(fā)送給 COActor 的消息對(duì)象類型
COActorCompletable:COPromise 的子類,啥??兒沒寫,就是一個(gè) COPromise。
message 的 complete 方法內(nèi)部其實(shí)就是返回 promise 的 fulfill 值。
- (void (^)(id))complete {
COActorCompletable *completable = _completableObj;
return ^(id val){
if (completable) {
[completable fulfill:val];
}
};
}
-> Code testForActor
- (void)testForActor {
COActor *countActor = co_actor(^(COActorChan * _Nonnull channel) {
//定義actor的狀態(tài)變量
int count = 0;
for (COActorMessage *message in channel) {
//處理消息
if ([[message stringType] isEqualToString:@"inc"]) {
count++;
NSLog(@"the count is %d now.", count);
}
else if ([[message stringType] isEqualToString:@"get"]) {
message.complete(@(count));
NSLog(@"get the count %d", count);
}
}
});
// 給 actor 發(fā)送消息
// dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(3.0 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[countActor sendMessage:@"inc"];
[countActor sendMessage:@"inc"];
// });
id value = [countActor sendMessage:@"get"].value;
NSLog(@"the Actor count now is %d",[value intValue]);
// co_launch(^{
// id ret = await([countActor sendMessage:@"get"]);
// NSLog(@"the Actor count now is %d",[ret intValue]);
// });
}
運(yùn)行代碼 testForActor 觀察控制臺(tái)輸出。
???
- 為什么發(fā)送 get 獲取到值為0?
通過之前的 co_launch 執(zhí)行順序可以了解到,co_launch 是異步喚醒的,所以會(huì)先執(zhí)行下面 get 的代碼,這時(shí)候 execute block 還未執(zhí)行,所以得到的值就是 0。
我們可以用上面提到過的 await 方法去獲取這個(gè) promise 的返回值。
- sendMessage的內(nèi)部實(shí)現(xiàn):
- (COActorCompletable *)sendMessage:(id)message {
COActorCompletable *completable = [COActorCompletable promise];
dispatch_async(self.queue, ^{
COActorMessage *actorMessage = [[COActorMessage alloc] initWithType:message completable:completable];
[self.messageChan send_nonblock:actorMessage];
});
return completable;
}
sendMessage 內(nèi)部其實(shí)是初始化了一個(gè) Promise,然后再異步根據(jù) promise 和 消息內(nèi)容生成一個(gè) COActorMessage,然后對(duì)這個(gè) Actor 中的 channel 通過 send_nonblock 發(fā)送這個(gè) message。
???
- 那么我們?cè)诔跏蓟?actor 的時(shí)候去設(shè)置了這個(gè) excute block,在 sendMessage 的時(shí)候往 channel 中 send 了消息。那,有關(guān) receive 的代碼呢?
在 ActorChannel 里面有一個(gè) next 方法,其中調(diào)用了 receive 方法去處理這些消息。而 next 的調(diào)用就在 excute block 中 for 循環(huán)遍歷時(shí)就調(diào)用了,如果此時(shí)沒有消息 send 進(jìn)來,就會(huì)中斷等待消息。
actor并發(fā)模型的應(yīng)用場景?
適合有狀態(tài)或者稱可變狀態(tài)的業(yè)務(wù)場景,具體案例如訂單,訂單有狀態(tài),比如未付款未發(fā)貨,已經(jīng)付款未發(fā)貨,已付款已發(fā)貨,導(dǎo)致訂單狀態(tài)的變化是事件行為,比如付款行為導(dǎo)致頂大狀態(tài)切換到"已經(jīng)付款未發(fā)貨"。
行為導(dǎo)致狀態(tài)變化,行為執(zhí)行是依靠線程,比如用戶發(fā)出一個(gè)付款的請(qǐng)求,服務(wù)器后端派出一個(gè)線程來執(zhí)行付款請(qǐng)求,攜帶付款的金額和銀行卡等等信息,當(dāng)付款請(qǐng)求被成功完成后,線程還要做的事情就是改變訂單狀態(tài),這時(shí)線程訪問訂單的一個(gè)方法比如changeState。
如果后臺(tái)有管理員同時(shí)修改這個(gè)訂單狀態(tài),那么實(shí)際有兩個(gè)線程共同訪問同一個(gè)數(shù)據(jù),這時(shí)就必須鎖,比如我們?cè)赾hangeState方法前加上sychronized這樣同步語法。
使用同步語法壞處是每次只能一個(gè)線程進(jìn)行處理,如同上廁所,只有一個(gè)蹲坑,人多就必須排隊(duì),這種情況性能很低。
如何避免鎖?
避免changeState方法被外部兩個(gè)線程同時(shí)占用訪問,那么我們自己設(shè)計(jì)專門的線程守護(hù)訂單狀態(tài),而不是普通方法代碼,普通方法代碼比較弱勢,容易被外部線程hold住,而我們?cè)O(shè)計(jì)的這個(gè)對(duì)象沒有普通方法,只有線程,這樣就變成Order的守護(hù)線程和外部訪問請(qǐng)求線程的通訊問題了。
Actor采取的這種類似消息機(jī)制的方式,實(shí)際在守護(hù)線程和外部線程之間有一個(gè)隊(duì)列,俗稱信箱,外部線程只要把請(qǐng)求放入,守護(hù)線程就讀取進(jìn)行處理。
More
- COTuple
- cokit
- coobjc 更底層的實(shí)現(xiàn)
- 更多的使用場景
...
Demo 下載地址
coobjc learn Demo
PS: 文中有關(guān)的問題大家可以通過運(yùn)行 Demo 或者自己看源碼跑一下尋找答案。有疑問的地方可以在評(píng)論區(qū)提問~
參考資料 && 擴(kuò)展閱讀
剛剛,阿里開源 iOS 協(xié)程開發(fā)框架 coobjc!