這篇以及之后的文章主要會對 ReactiveObjc v2.5 的實(shí)現(xiàn)進(jìn)行分析,從最簡單的例子中了解 ReactiveCocoa 的工作原理以及概念,也是筆者個(gè)人對于 RAC 學(xué)習(xí)的總結(jié)與理解。本文主要會圍繞 RAC 中核心概念 RACSignal 展開,詳細(xì)了解其底層實(shí)現(xiàn)。
狀態(tài)驅(qū)動
2015 年的夏天的時(shí)候,做了幾個(gè)簡單的開源框架,想做點(diǎn)其它更有意思的框架卻沒什么思路,就開始看一些跟編程沒有太大關(guān)系的書籍。

其中一本叫做《失控》給了我很大的啟發(fā),其中有一則故事是這樣的:
布魯克斯開展了一個(gè)雄心勃勃的研究生課題項(xiàng)目,研發(fā)更接近昆蟲而非恐龍的機(jī)器人。
布魯克斯的設(shè)想在一個(gè)叫「成吉思」的機(jī)巧裝置上成形。成吉思有橄欖球大小,像只蟑螂似的。布魯克斯把他的精簡理念發(fā)揮到了極致。小成吉思有 6 條腿卻沒有一丁點(diǎn)兒可以稱為「腦」的東西。所有 12 個(gè)電機(jī)和 21 個(gè)傳感器分布在沒有中央處理器的可解耦網(wǎng)絡(luò)上。然而這 12 個(gè)充當(dāng)肌肉的電機(jī)和 21 個(gè)傳感器之間的交互作用居然產(chǎn)生了令人驚嘆的復(fù)雜性和類似生命體的行為。
成吉思的每條小細(xì)腿都在自顧自地工作,和其余的腿毫無關(guān)系。每條腿都通過自己的一組神經(jīng)元——一個(gè)微型處理器——來控制其動作。每條腿只需管好自己!對成吉思來說,走路是一個(gè)團(tuán)隊(duì)合作項(xiàng)目,至少有六個(gè)小頭腦在工作。它體內(nèi)其余更微小的腦力則負(fù)責(zé)腿與腿之間的通訊。昆蟲學(xué)家說這正是螞蟻和蟑螂的解決之道——這些爬行昆蟲的足肢上的神經(jīng)元負(fù)責(zé)為該足肢進(jìn)行思考。
------ 《失控》第三章·第二節(jié) 快速、廉價(jià)、失控
書中對于機(jī)器人的介紹比較冗長,在這里就簡單總結(jié)一下:機(jī)器人的每一條腿都單獨(dú)進(jìn)行工作,通過傳感器感應(yīng)的狀態(tài)做出響應(yīng):
- 如果腿抬起來了,那么它要落下去;
- 如果腿在向前動,要讓另外五條腿距離它遠(yuǎn)一點(diǎn);
這種去中心化的方式,簡化了整個(gè)系統(tǒng)的構(gòu)造,使得各個(gè)組件只需要關(guān)心狀態(tài),以及狀態(tài)對應(yīng)的動作;不再需要一個(gè)中樞系統(tǒng)來組織、管理其它的組件,并負(fù)責(zé)大多數(shù)的業(yè)務(wù)邏輯。這種自底向下的、狀態(tài)驅(qū)動的構(gòu)建方式能夠使用多個(gè)較小的組件,減少臃腫的中樞出現(xiàn)的可能性,從而降低系統(tǒng)的復(fù)雜度。
ReactiveCocoa 與信號
ReactiveCocoa 對于狀態(tài)的理解與《失控》一書中十分類似,將原有的各種設(shè)計(jì)模式,包括代理、Target/Action、block、通知中心以及觀察者模式各種『輸入』,都抽象成了數(shù)據(jù)流或者信號(也可以理解為狀態(tài)流)讓單一的組件能夠?qū)ψ约旱捻憫?yīng)動作進(jìn)行控制,簡化了視圖控制器的負(fù)擔(dān)。
在 ReactiveCocoa 中最重要的信號,也就是 RACSignal 對象是這一篇文章介紹的核心;文章中主要會介紹下面的代碼片段出現(xiàn)的內(nèi)容:
RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"dispose");
}];
}];
[signal subscribeNext:^(id _Nullable x) {
NSLog(@"%@", x);
}];
在上述代碼執(zhí)行時(shí),會在控制臺中打印出以下內(nèi)容:
1
2
dispose
代碼片段基本都是圍繞 RACSignal 類進(jìn)行的,文章會分四部分對上面的代碼片段的工作流程進(jìn)行簡單的介紹:
- 簡單了解
RACSignal - 信號的創(chuàng)建
- 信號的訂閱與發(fā)送
- 訂閱的回收過程
RACSignal 簡介
RACSignal 其實(shí)是抽象類 RACStream 的子類,在整個(gè) ReactiveObjc 工程中有另一個(gè)類 RACSequence 也繼承自抽象類 RACStream:

RACSignal 可以說是 ReactiveCocoa 中的核心類,也是最重要的概念,整個(gè)框架圍繞著 RACSignal 的概念進(jìn)行組織,對 RACSignal 最簡單的理解就是它表示一連串的狀態(tài):

在狀態(tài)改變時(shí),對應(yīng)的訂閱者 RACSubscriber 就會收到通知執(zhí)行相應(yīng)的指令,在 ReactiveCocoa 的世界中所有的消息都是通過信號的方式來傳遞的,原有的設(shè)計(jì)模式都會簡化為一種模型,這篇文章作為 ReactiveCocoa 系列的第一篇文章并不會對這些問題進(jìn)行詳細(xì)的展開和介紹,只會對 RACSignal 使用過程的原理進(jìn)行簡單的分析。
這一小節(jié)會對 RACStream 以及 RACSignal 中與 RACStream 相關(guān)的部分進(jìn)行簡單的介紹。
RACStream
RACStream 作為抽象類本身不提供方法的實(shí)現(xiàn),其實(shí)現(xiàn)內(nèi)部原生提供的而方法都是抽象方法,會在調(diào)用時(shí)直接拋出異常:
+ (__kindof RACStream *)empty {
NSString *reason = [NSString stringWithFormat:@"%@ must be overridden by subclasses", NSStringFromSelector(_cmd)];
@throw [NSException exceptionWithName:NSInternalInconsistencyException reason:reason userInfo:nil];
}
- (__kindof RACStream *)bind:(RACStreamBindBlock (^)(void))block;
+ (__kindof RACStream *)return:(id)value;
- (__kindof RACStream *)concat:(RACStream *)stream;
- (__kindof RACStream *)zipWith:(RACStream *)stream;

上面的這些抽象方法都需要子類覆寫,不過 RACStream 在 Operations 分類中使用上面的抽象方法提供了豐富的內(nèi)容,比如說 -flattenMap: 方法:
- (__kindof RACStream *)flattenMap:(__kindof RACStream * (^)(id value))block {
Class class = self.class;
return [[self bind:^{
return ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
return stream;
};
}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}
其他方法比如 -skip:、-take:、-ignore: 等等實(shí)例方法都構(gòu)建在這些抽象方法之上,只要子類覆寫了所有抽象方法就能自動獲得所有的 Operation 分類中的方法。

RACSignal 與 Monad
如果你對 Monad 有所了解,那么你應(yīng)該知道
bind和return其實(shí)是 Monad 中的概念,但 Monad 并不是本篇文章所覆蓋的內(nèi)容,并不會具體解釋它到底是什么。
ReactiveCocoa 框架中借鑒了很多其他平臺甚至語言中的概念,包括微軟中的 Reactive Extension 以及 Haskell 中的 Monad,RACStream 提供的抽象方法中的 +return: 和 -bind: 就與 Haskell 中 Monad 完全一樣。
很多人都說 Monad 只是一個(gè)自函子范疇上的一個(gè)幺半群而已;在筆者看來這種說法雖然是正確的,不過也很扯淡,這句話解釋了還是跟沒解釋一樣,如果有人再跟你用這句話解釋 Monad,我覺得你最好的回應(yīng)就是買一本范疇論糊他一臉。如果真的想了解 Haskell 中的 Monad 到底是什么?可以從代碼的角度入手,多寫一些代碼就明白了,這個(gè)概念理解起來其實(shí)根本沒什么困難的,當(dāng)然也可以看一下 A Fistful of Monads,寫寫其中的代碼,會對 Monad 有自己的認(rèn)知,當(dāng)然,請不要再寫一篇解釋 Monad 的教程了(手動微笑)。
首先來看一下 +return 方法的 實(shí)現(xiàn):
+ (RACSignal *)return:(id)value {
return [RACReturnSignal return:value];
}
該方法接受一個(gè) NSObject 對象,并返回一個(gè) RACSignal 的實(shí)例,它會將一個(gè) UIKit 世界的對象 NSObject 轉(zhuǎn)換成 ReactiveCocoa 中的 RACSignal:

而 RACReturnSignal 也僅僅是把 NSObject 對象包裝一下,并沒有做什么復(fù)雜的事情:
+ (RACSignal *)return:(id)value {
RACReturnSignal *signal = [[self alloc] init];
signal->_value = value;
return signal;
}
但是 -bind: 方法的 實(shí)現(xiàn) 相比之下就十分復(fù)雜了:
- (RACSignal *)bind:(RACSignalBindBlock (^)(void))block {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSignalBindBlock bindingBlock = block();
return [self subscribeNext:^(id x) {
BOOL stop = NO;
id signal = bindingBlock(x, &stop);
if (signal != nil) {
[signal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[subscriber sendCompleted];
}];
}
if (signal == nil || stop) {
[subscriber sendCompleted];
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -bind:", self.name];
}
筆者在這里對
-bind:方法進(jìn)行了大量的省略,省去了其中對各種RACDisposable的處理過程。
-bind: 方法會在原信號每次發(fā)出消息時(shí),都執(zhí)行 RACSignalBindBlock 對原有的信號中的消息進(jìn)行變換生成一個(gè)新的信號:

在原有的
RACSignal對象上調(diào)用-bind:方法傳入RACSignalBindBlock,圖示中的右側(cè)就是具體的執(zhí)行過程,原信號在變換之后變成了新的藍(lán)色的RACSignal對象。
RACSignalBindBlock 可以簡單理解為一個(gè)接受 NSObject 對象返回 RACSignal 對象的函數(shù):
typedef RACSignal * _Nullable (^RACSignalBindBlock)(id _Nullable value, BOOL *stop);
其函數(shù)簽名可以理解為 id -> RACSignal,然而這種函數(shù)是無法直接對 RACSignal 對象進(jìn)行變換的;不過通過 -bind: 方法就可以使用這種函數(shù)操作 RACSignal,其實(shí)現(xiàn)如下:
- 將
RACSignal對象『解包』出NSObject對象; - 將
NSObject傳入RACSignalBindBlock返回RACSignal。
如果在不考慮 RACSignal 會發(fā)出錯(cuò)誤或者完成信號時(shí),-bind: 可以簡化為更簡單的形式:
- (RACSignal *)bind:(RACSignalBindBlock (^)(void))block {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSignalBindBlock bindingBlock = block();
return [self subscribeNext:^(id x) {
BOOL stop = NO;
[bindingBlock(x, &stop) subscribeNext:^(id x) {
[subscriber sendNext:x];
}];
}];
}] setNameWithFormat:@"[%@] -bind:", self.name];
}
調(diào)用 -subscribeNext: 方法訂閱當(dāng)前信號,將信號中的狀態(tài)解包,然后將原信號中的狀態(tài)傳入 bindingBlock 中并訂閱返回的新的信號,將生成的新狀態(tài) x 傳回原信號的訂閱者。
這里通過兩個(gè)簡單的例子來了解 -bind: 方法的作用:
RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendCompleted];
return nil;
}];
RACSignal *bindSignal = [signal bind:^RACSignalBindBlock _Nonnull{
return ^(NSNumber *value, BOOL *stop) {
value = @(value.integerValue * value.integerValue);
return [RACSignal return:value];
};
}];
[signal subscribeNext:^(id _Nullable x) {
NSLog(@"signal: %@", x);
}];
[bindSignal subscribeNext:^(id _Nullable x) {
NSLog(@"bindSignal: %@", x);
}];
上面的代碼中直接使用了 +return: 方法將 value 打包成了 RACSignal * 對象:

在 BindSignal 中的每一個(gè)數(shù)字其實(shí)都是由一個(gè)
RACSignal包裹的,這里沒有畫出,在下一個(gè)例子中,讀者可以清晰地看到其中的區(qū)別。
上圖簡要展示了變化前后的信號中包含的狀態(tài),在運(yùn)行上述代碼時(shí),會在終端中打印出:
signal: 1
signal: 2
signal: 3
signal: 4
bindSignal: 1
bindSignal: 4
bindSignal: 9
bindSignal: 16
這是一個(gè)最簡單的例子,直接使用 -return: 打包 NSObject 返回一個(gè) RACSignal,接下來用一個(gè)更復(fù)雜的例子來幫助我們更好的了解 -bind: 方法:
RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendCompleted];
return nil;
}];
RACSignal *bindSignal = [signal bind:^RACSignalBindBlock _Nonnull{
return ^(NSNumber *value, BOOL *stop) {
NSNumber *returnValue = @(value.integerValue * value.integerValue);
return [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
for (NSInteger i = 0; i < value.integerValue; i++) [subscriber sendNext:returnValue];
[subscriber sendCompleted];
return nil;
}];
};
}];
[bindSignal subscribeNext:^(id _Nullable x) {
NSLog(@"%@", x);
}];
下圖相比上面例子中的圖片更能精確的表現(xiàn)出 -bind: 方法都做了什么:

信號中原有的狀態(tài)經(jīng)過 -bind: 方法中傳入 RACSignalBindBlock 的處理實(shí)際上返回了多個(gè) RACSignal。
在源代碼的注釋中清楚地寫出了方法的實(shí)現(xiàn)過程:
- 訂閱原信號中的值;
- 將原信號發(fā)出的值傳入
RACSignalBindBlock進(jìn)行轉(zhuǎn)換; - 如果
RACSignalBindBlock返回一個(gè)信號,就會訂閱該信號并將信號中的所有值傳給訂閱者subscriber; - 如果
RACSignalBindBlock請求終止信號就會向原信號發(fā)出-sendCompleted消息; - 當(dāng)所有信號都完成時(shí),會向訂閱者發(fā)送
-sendCompleted; - 無論何時(shí),如果信號發(fā)出錯(cuò)誤,都會向訂閱者發(fā)送
-sendError:消息。
如果想要了解 -bind: 方法在執(zhí)行的過程中是如何處理訂閱的清理和銷毀的,可以閱讀文章最后的 -bind: 中對訂閱的銷毀 部分。
信號的創(chuàng)建
信號的創(chuàng)建過程十分簡單,-createSignal: 是推薦的創(chuàng)建信號的方法,方法其實(shí)只做了一次轉(zhuǎn)發(fā):
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
return [RACDynamicSignal createSignal:didSubscribe];
}
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
RACDynamicSignal *signal = [[self alloc] init];
signal->_didSubscribe = [didSubscribe copy];
return [signal setNameWithFormat:@"+createSignal:"];
}
該方法其實(shí)只是創(chuàng)建了一個(gè) RACDynamicSignal 實(shí)例并保存了傳入的 didSubscribe 代碼塊,在每次有訂閱者訂閱當(dāng)前信號時(shí),都會執(zhí)行一遍,向訂閱者發(fā)送消息。
RACSignal 類簇
雖然 -createSignal: 的方法簽名上返回的是 RACSignal 對象的實(shí)例,但是實(shí)際上這里返回的是 RACDynamicSignal,也就是 RACSignal 的子類;同樣,在 ReactiveCocoa 中也有很多其他的 RACSignal 子類。
使用類簇的方式設(shè)計(jì)的 RACSignal 在創(chuàng)建實(shí)例時(shí)可能會返回 RACDynamicSignal、RACEmptySignal、RACErrorSignal 和 RACReturnSignal 對象:

其實(shí)這幾種子類并沒有對原有的 RACSignal 做出太大的改變,它們的創(chuàng)建過程也不是特別的復(fù)雜,只需要調(diào)用 RACSignal 不同的類方法:

RACSignal 只是起到了一個(gè)代理的作用,最后的實(shí)現(xiàn)過程還是會指向?qū)?yīng)的子類:
+ (RACSignal *)error:(NSError *)error {
return [RACErrorSignal error:error];
}
+ (RACSignal *)empty {
return [RACEmptySignal empty];
}
+ (RACSignal *)return:(id)value {
return [RACReturnSignal return:value];
}
以 RACReturnSignal 的創(chuàng)建過程為例:
+ (RACSignal *)return:(id)value {
RACReturnSignal *signal = [[self alloc] init];
signal->_value = value;
return signal;
}
這個(gè)信號的創(chuàng)建過程和 RACDynamicSignal 的初始化過程一樣,都非常簡單;只是將傳入的 value 簡單保存一下,在有其他訂閱者 -subscribe: 時(shí),向訂閱者發(fā)送 value:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendNext:self.value];
[subscriber sendCompleted];
}];
}
RACEmptySignal 和 RACErrorSignal 的創(chuàng)建過程也異常的簡單,只是對傳入的數(shù)據(jù)進(jìn)行簡單的存儲,然后在訂閱時(shí)發(fā)送出來:
// RACEmptySignal
+ (RACSignal *)empty {
return [[[self alloc] init] setNameWithFormat:@"+empty"];
}
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendCompleted];
}];
}
// RACErrorSignal
+ (RACSignal *)error:(NSError *)error {
RACErrorSignal *signal = [[self alloc] init];
signal->_error = error;
return signal;
}
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendError:self.error];
}];
}
這兩個(gè)創(chuàng)建過程的唯一區(qū)別就是一個(gè)發(fā)送的是『空值』,另一個(gè)是 NSError 對象。
信號的訂閱與信息的發(fā)送
ReactiveCocoa 中信號的訂閱與信息的發(fā)送過程主要是由 RACSubscriber 類來處理的,而這也是信號的處理過程中最重要的一部分,這一小節(jié)會先分析整個(gè)工作流程,之后會深入代碼的實(shí)現(xiàn)。

在信號創(chuàng)建之后調(diào)用 -subscribeNext: 方法返回一個(gè) RACDisposable,然而這不是這一流程關(guān)心的重點(diǎn),在訂閱過程中生成了一個(gè) RACSubscriber 對象,向這個(gè)對象發(fā)送消息 -sendNext: 時(shí),就會向所有的訂閱者發(fā)送消息。
信號的訂閱
信號的訂閱與 -subscribe: 開頭的一系列方法有關(guān):

訂閱者可以選擇自己想要感興趣的信息類型 next/error/completed 進(jìn)行關(guān)注,并在對應(yīng)的信息發(fā)生時(shí)調(diào)用 block 進(jìn)行處理回調(diào)。
所有的方法其實(shí)只是對 nextBlock、completedBlock 以及 errorBlock 的組合,這里以其中最長的 -subscribeNext:error:completed: 方法的實(shí)現(xiàn)為例(也只需要介紹這一個(gè)方法):
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
return [self subscribe:o];
}
方法中傳入的所有 block 參數(shù)都應(yīng)該是非空的。
拿到了傳入的 block 之后,使用 +subscriberWithNext:error:completed: 初始化一個(gè) RACSubscriber 對象的實(shí)例:
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
RACSubscriber *subscriber = [[self alloc] init];
subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];
return subscriber;
}
在拿到這個(gè)對象之后,調(diào)用 RACSignal 的 -subscribe: 方法傳入訂閱者對象:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCAssert(NO, @"This method must be overridden by subclasses");
return nil;
}
RACSignal 類中其實(shí)并沒有實(shí)現(xiàn)這個(gè)實(shí)例方法,需要在上文提到的四個(gè)子類對這個(gè)方法進(jìn)行覆寫,這里僅分析 RACDynamicSignal 中的方法:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
return disposable;
}
這里暫時(shí)不需要關(guān)注與
RACDisposable有關(guān)的任何內(nèi)容,我們會在下一節(jié)中詳細(xì)介紹。
RACPassthroughSubscriber 就像它的名字一樣,只是對上面創(chuàng)建的訂閱者對象進(jìn)行簡單的包裝,將所有的消息轉(zhuǎn)發(fā)給內(nèi)部的 innerSubscriber,也就是傳入的 RACSubscriber 對象:
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
self = [super init];
_innerSubscriber = subscriber;
_signal = signal;
_disposable = disposable;
[self.innerSubscriber didSubscribeWithDisposable:self.disposable];
return self;
}
如果直接簡化 -subscribe: 方法的實(shí)現(xiàn),你可以看到一個(gè)看起來極為敷衍的代碼:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
return self.didSubscribe(subscriber);
}
方法只是執(zhí)行了在創(chuàng)建信號時(shí)傳入的 RACSignalBindBlock:
[RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"dispose");
}];
}];
總而言之,信號的訂閱過程就是初始化 RACSubscriber 對象,然后執(zhí)行 didSubscribe 代碼塊的過程。

信息的發(fā)送
在 RACSignalBindBlock 中,訂閱者可以根據(jù)自己的興趣選擇自己想要訂閱哪種消息;我們也可以按需發(fā)送三種消息:

而現(xiàn)在只需要簡單看一下這三個(gè)方法的實(shí)現(xiàn),就能夠明白信息的發(fā)送過程了(真是沒啥好說的,不過為了湊字?jǐn)?shù)完整性):
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}
-sendNext: 只是將方法傳入的值傳入 nextBlock 再調(diào)用一次,并沒有什么值得去分析的地方,而剩下的兩個(gè)方法實(shí)現(xiàn)也差不多,會調(diào)用對應(yīng)的 block,在這里就省略了。
訂閱的回收過程
在創(chuàng)建信號時(shí),我們向 -createSignal: 方法中傳入了 didSubscribe 信號,這個(gè) block 在執(zhí)行結(jié)束時(shí)會返回一個(gè) RACDisposable 對象,用于在訂閱結(jié)束時(shí)進(jìn)行必要的清理,同樣也可以用于取消因?yàn)橛嗛唲?chuàng)建的正在執(zhí)行的任務(wù)。
而處理這些事情的核心類就是 RACDisposable 以及它的子類:

這篇文章中主要關(guān)注的是左側(cè)的三個(gè)子類,當(dāng)然
RACDisposable的子類不止這三個(gè),還有用于處理 KVO 的RACKVOTrampoline,不過在這里我們不會討論這個(gè)類的實(shí)現(xiàn)。
RACDisposable
在繼續(xù)分析討論訂閱的回收過程之前,筆者想先對 RACDisposable 進(jìn)行簡要的剖析和介紹:

類 RACDisposable 是以 _disposeBlock 為核心進(jìn)行組織的,幾乎所有的方法以及屬性其實(shí)都是對 _disposeBlock 進(jìn)行的操作。
關(guān)于 _disposeBlock 中的 self
這一小節(jié)的內(nèi)容是可選的,跳過不影響整篇文章閱讀的連貫性。
_disposeBlock 是一個(gè)私有的指針變量,當(dāng) void (^)(void) 類型的 block 被傳入之后都會轉(zhuǎn)換成 CoreFoundation 中的類型并以 void * 的形式存入 _disposeBlock 中:
+ (instancetype)disposableWithBlock:(void (^)(void))block {
return [[self alloc] initWithBlock:block];
}
- (instancetype)initWithBlock:(void (^)(void))block {
self = [super init];
_disposeBlock = (void *)CFBridgingRetain([block copy]);
OSMemoryBarrier();
return self;
}
奇怪的是,_disposeBlock 中不止會存儲代碼塊 block,還有可能存儲橋接之后的 self:
- (instancetype)init {
self = [super init];
_disposeBlock = (__bridge void *)self;
OSMemoryBarrier();
return self;
}
這里,剛開始看到可能會覺得比較奇怪,有兩個(gè)疑問需要解決:
- 為什么要提供一個(gè)
-init方法來初始化RACDisposable對象? - 為什么要向
_disposeBlock中傳入當(dāng)前對象?
對于 RACDisposable 來說,雖然一個(gè)不包含 _disposeBlock 的對象沒什么太多的意義,但是對于 RACSerialDisposable 等子類來說,卻不完全是這樣,因?yàn)?RACSerialDisposable 在 -dispose 時(shí),并不需要執(zhí)行 disposeBlock,這樣就浪費(fèi)了內(nèi)存和 CPU 時(shí)間;但是同時(shí)我們需要一個(gè)合理的方法準(zhǔn)確地判斷當(dāng)前對象的 isDisposed:
- (BOOL)isDisposed {
return _disposeBlock == NULL;
}
所以,使用向 _disposeBlock 中傳入 NULL 的方式來判斷 isDisposed;在 -init 調(diào)用時(shí)傳入 self 而不是 NULL 防止?fàn)顟B(tài)被誤判,這樣就在不引入其他實(shí)例變量、增加對象的設(shè)計(jì)復(fù)雜度的同時(shí),解決了這兩個(gè)問題。
如果仍然不理解上述的兩個(gè)問題,在這里舉一個(gè)錯(cuò)誤的例子,如果 _disposeBlock 在使用時(shí)只傳入 NULL 或者 block,那么在 RACCompoundDisposable 初始化時(shí),是應(yīng)該向 _disposeBlock 中傳入什么呢?
- 傳入
NULL會導(dǎo)致在初始化之后isDisposed == YES,然而當(dāng)前對象根本沒有被回收; - 傳入
block會導(dǎo)致無用的 block 的執(zhí)行,浪費(fèi)內(nèi)存以及 CPU 時(shí)間;
這也就是為什么要引入 self 來作為 _disposeBlock 內(nèi)容的原因。
-dispose: 方法的實(shí)現(xiàn)
這個(gè)只有不到 20 行的 -dispose: 方法已經(jīng)是整個(gè) RACDisposable 類中最復(fù)雜的方法了:
- (void)dispose {
void (^disposeBlock)(void) = NULL;
while (YES) {
void *blockPtr = _disposeBlock;
if (OSAtomicCompareAndSwapPtrBarrier(blockPtr, NULL, &_disposeBlock)) {
if (blockPtr != (__bridge void *)self) {
disposeBlock = CFBridgingRelease(blockPtr);
}
break;
}
}
if (disposeBlock != nil) disposeBlock();
}
但是其實(shí)它的實(shí)現(xiàn)也沒有復(fù)雜到哪里去,從 _disposeBlock 實(shí)例變量中調(diào)用 CFBridgingRelease 取出一個(gè) disposeBlock,然后執(zhí)行這個(gè) block,整個(gè)方法就結(jié)束了。
RACSerialDisposable
RACSerialDisposable 是一個(gè)用于持有 RACDisposable 的容器,它一次只能持有一個(gè) RACDisposable 的實(shí)例,并可以原子地?fù)Q出容器中保存的對象:
- (RACDisposable *)swapInDisposable:(RACDisposable *)newDisposable {
RACDisposable *existingDisposable;
BOOL alreadyDisposed;
pthread_mutex_lock(&_mutex);
alreadyDisposed = _disposed;
if (!alreadyDisposed) {
existingDisposable = _disposable;
_disposable = newDisposable;
}
pthread_mutex_unlock(&_mutex);
if (alreadyDisposed) {
[newDisposable dispose];
return nil;
}
return existingDisposable;
}
線程安全的 RACSerialDisposable 使用 pthred_mutex_t 互斥鎖來保證在訪問關(guān)鍵變量時(shí)不會出現(xiàn)線程競爭問題。
-dispose 方法的處理也十分簡單:
- (void)dispose {
RACDisposable *existingDisposable;
pthread_mutex_lock(&_mutex);
if (!_disposed) {
existingDisposable = _disposable;
_disposed = YES;
_disposable = nil;
}
pthread_mutex_unlock(&_mutex);
[existingDisposable dispose];
}
使用鎖保證線程安全,并在內(nèi)部的 _disposable 換出之后在執(zhí)行 -dispose 方法對訂閱進(jìn)行處理。
RACCompoundDisposable
與 RACSerialDisposable 只負(fù)責(zé)一個(gè) RACDisposable 對象的釋放不同;RACCompoundDisposable 同時(shí)負(fù)責(zé)多個(gè) RACDisposable 對象的釋放。
相比于只管理一個(gè) RACDisposable 對象的 RACSerialDisposable,RACCompoundDisposable 由于管理多個(gè)對象,其實(shí)現(xiàn)更加復(fù)雜,而且為了性能和內(nèi)存占用之間的權(quán)衡,其實(shí)現(xiàn)方式是通過持有兩個(gè)實(shí)例變量:
@interface RACCompoundDisposable () {
...
RACDisposable *_inlineDisposables[RACCompoundDisposableInlineCount];
CFMutableArrayRef _disposables;
...
}
在對象持有的 RACDisposable 不超過 RACCompoundDisposableInlineCount 時(shí),都會存儲在 _inlineDisposables 數(shù)組中,而更多的實(shí)例都會存儲在 _disposables 中:

RACCompoundDisposable 在使用 -initWithDisposables:初始化時(shí),會初始化兩個(gè) RACDisposable 的位置用于加速銷毀訂閱的過程,同時(shí)為了不浪費(fèi)內(nèi)存空間,在默認(rèn)情況下只占用兩個(gè)位置:
- (instancetype)initWithDisposables:(NSArray *)otherDisposables {
self = [self init];
[otherDisposables enumerateObjectsUsingBlock:^(RACDisposable *disposable, NSUInteger index, BOOL *stop) {
self->_inlineDisposables[index] = disposable;
if (index == RACCompoundDisposableInlineCount - 1) *stop = YES;
}];
if (otherDisposables.count > RACCompoundDisposableInlineCount) {
_disposables = RACCreateDisposablesArray();
CFRange range = CFRangeMake(RACCompoundDisposableInlineCount, (CFIndex)otherDisposables.count - RACCompoundDisposableInlineCount);
CFArrayAppendArray(_disposables, (__bridge CFArrayRef)otherDisposables, range);
}
return self;
}
如果傳入的 otherDisposables 多于 RACCompoundDisposableInlineCount,就會創(chuàng)建一個(gè)新的 CFMutableArrayRef 引用,并將剩余的 RACDisposable 全部傳入這個(gè)數(shù)組中。
在 RACCompoundDisposable 中另一個(gè)值得注意的方法就是 -addDisposable:
- (void)addDisposable:(RACDisposable *)disposable {
if (disposable == nil || disposable.disposed) return;
BOOL shouldDispose = NO;
pthread_mutex_lock(&_mutex);
{
if (_disposed) {
shouldDispose = YES;
} else {
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
if (_inlineDisposables[i] == nil) {
_inlineDisposables[i] = disposable;
goto foundSlot;
}
}
if (_disposables == NULL) _disposables = RACCreateDisposablesArray();
CFArrayAppendValue(_disposables, (__bridge void *)disposable);
foundSlot:;
}
}
pthread_mutex_unlock(&_mutex);
if (shouldDispose) [disposable dispose];
}
在向 RACCompoundDisposable 中添加新的 RACDisposable 對象時(shí),會先嘗試在 _inlineDisposables 數(shù)組中尋找空閑的位置,如果沒有找到,就會加入到 _disposables 中;但是,在添加 RACDisposable 的過程中也難免遇到當(dāng)前 RACCompoundDisposable 已經(jīng) dispose 的情況,而這時(shí)就會直接 -dispose 剛剛加入的對象。
訂閱的銷毀過程
在了解了 ReactiveCocoa 中與訂閱銷毀相關(guān)的類,我們就可以繼續(xù)對 -bind: 方法的分析了,之前在分析該方法時(shí)省略了 -bind: 在執(zhí)行過程中是如何處理訂閱的清理和銷毀的,所以會省略對于正常值和錯(cuò)誤的處理過程,首先來看一下簡化后的代碼:
- (RACSignal *)bind:(RACSignalBindBlock (^)(void))block {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSignalBindBlock bindingBlock = block();
__block volatile int32_t signalCount = 1;
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
void (^completeSignal)(RACDisposable *) = ...
void (^addSignal)(RACSignal *) = ...
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];
RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
BOOL stop = NO;
id signal = bindingBlock(x, &stop);
if (signal != nil) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose];
completeSignal(selfDisposable);
}
} completed:^{
completeSignal(selfDisposable);
}];
selfDisposable.disposable = bindingDisposable;
return compoundDisposable;
}] setNameWithFormat:@"[%@] -bind:", self.name];
}
在簡化的代碼中,訂閱的清理是由一個(gè) RACCompoundDisposable 的實(shí)例負(fù)責(zé)的,向這個(gè)實(shí)例中添加 RACSerialDisposable 以及 RACDisposable 對象,并在 RACCompoundDisposable 銷毀時(shí)銷毀。
completeSignal 和 addSignal 兩個(gè) block 主要負(fù)責(zé)處理新創(chuàng)建信號的清理工作:
void (^completeSignal)(RACDisposable *) = ^(RACDisposable *finishedDisposable) {
if (OSAtomicDecrement32Barrier(&signalCount) == 0) {
[subscriber sendCompleted];
[compoundDisposable dispose];
} else {
[compoundDisposable removeDisposable:finishedDisposable];
}
};
void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
OSAtomicIncrement32Barrier(&signalCount);
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];
RACDisposable *disposable = [signal completed:^{
completeSignal(selfDisposable);
}];
selfDisposable.disposable = disposable;
};
先通過一個(gè)例子來看一下 -bind: 方法調(diào)用之后,訂閱是如何被清理的:
RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"Original Signal Dispose.");
}];
}];
RACSignal *bindSignal = [signal bind:^RACSignalBindBlock _Nonnull{
return ^(NSNumber *value, BOOL *stop) {
NSNumber *returnValue = @(value.integerValue);
return [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
for (NSInteger i = 0; i < value.integerValue; i++) [subscriber sendNext:returnValue];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"Binding Signal Dispose.");
}];
}];
};
}];
[bindSignal subscribeNext:^(id _Nullable x) {
NSLog(@"%@", x);
}];
在每個(gè)訂閱創(chuàng)建以及所有的值發(fā)送之后,訂閱就會被就地銷毀,調(diào)用 disposeBlock,并從 RACCompoundDisposable 實(shí)例中移除:
1
Binding Signal Dispose.
2
2
Binding Signal Dispose.
Original Signal Dispose.
原訂閱的銷毀時(shí)間以及綁定信號的控制是由 SignalCount 控制的,其表示 RACCompoundDisposable 中的 RACSerialDisposable 實(shí)例的個(gè)數(shù),在每次有新的訂閱被創(chuàng)建時(shí)都會向 RACCompoundDisposable 加入一個(gè)新的 RACSerialDisposable,并在訂閱發(fā)送結(jié)束時(shí)從數(shù)組中移除,整個(gè)過程用圖示來表示比較清晰:

紫色的
RACSerialDisposable為原訂閱創(chuàng)建的對象,灰色的為新信號訂閱的對象。
總結(jié)
這是整個(gè) ReactiveCocoa 源代碼分析系列文章的第一篇,想寫一個(gè)跟這個(gè)系列有關(guān)的代碼已經(jīng)很久了,文章中對于 RACSignal 進(jìn)行了一些簡單的介紹,項(xiàng)目中絕大多數(shù)的方法都是很簡潔的,行數(shù)并不多,代碼的組織方式也很易于理解。雖然沒有太多讓人意外的東西,不過整個(gè)工程還是很值得閱讀的。
References
方法實(shí)現(xiàn)對照表
| 方法 | 實(shí)現(xiàn) |
|---|---|
+return: |
RACSignal.m#L89-L91 |
-bind: |
RACSignal.m#L93-176 |
Github Repo:iOS-Source-Code-Analyze
Follow: Draveness · GitHub
Source: http://draveness.me/racsignal