在 ReactiveCocoa 中除了不可變的信號 RACSignal,也有用于橋接非 RAC 代碼到 ReactiveCocoa 世界的『可變』信號 RACSubject。

RACSubject 到底是什么?根據(jù)其字面意思,可以將它理解為一個(gè)可以訂閱的主題,我們在訂閱主題之后,向主題發(fā)送新的消息時(shí),所有的訂閱者都會接收到最新的消息。
但是這么解釋確實(shí)有點(diǎn)晦澀,也不易于理解,ReactiveCocoa 團(tuán)隊(duì)對 RACSubject 的解釋是,RACSubject 其實(shí)就是一個(gè)可以手動(dòng)控制的信號(感覺這么解釋更難理解了)。
A subject, represented by the RACSubject class, is a signal that can be manually controlled.
RACSubject 簡介
RACSubject 是 RACSignal 的子類,與 RACSignal 以及 RACSequence 有著眾多的類簇不同,RACSubject 在整個(gè)工程中并沒有多少子類;不過,在大多數(shù)情況下,我們也只會使用 RACSubject 自己或者 RACReplaySubject。

相比于 RACSignal 豐富的頭文件 ,RACSubject 對外的接口并沒有提供太多的方法:
@interface RACSubject : RACSignal <RACSubscriber>
+ (instancetype)subject;
@end
唯一提供的接口就是用于返回一個(gè)新實(shí)例的 +subject 方法;除此之外,在筆者看來它與 RACSignal 最大的不同就是:RACSubject 實(shí)現(xiàn)了 RACSubscriber 協(xié)議,也就是下面的這些方法:
@protocol RACSubscriber <NSObject>
@required
- (void)sendNext:(nullable id)value;
- (void)sendError:(nullable NSError *)error;
- (void)sendCompleted;
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;
@end
我們并不能在一個(gè) RACSignal 對象上執(zhí)行這些方法,只能在創(chuàng)建信號的 block 里面向遵循 RACSubscriber 協(xié)議的對象發(fā)送新的值或者錯(cuò)誤,這也是 RACSubject 和父類最大的不同:在 RACSubject 實(shí)例初始化之后,也可以通過這個(gè)實(shí)例向所有的訂閱者發(fā)送消息。
冷信號與熱信號
提到 RACSubject 就不得不提 ReactiveCocoa 中的另一對概念,冷信號和熱信號。
其實(shí)解釋這兩者之間區(qū)別的文章已經(jīng)很多了,我相信各位讀者能找到很多的資料,在這里就簡單介紹一下冷熱信號的概念,如果想要了解更多的內(nèi)容可以在 References 中找到更多的文章。
對于冷熱信號概念,我們借用 Rx 中的描述:

Cold signal is sequences that are passive and start producing notifications on request (when subscribed to), and hot signal is sequences that are active and produce notifications regardless of subscriptions. ---- Hot and Cold observables
冷信號是被動(dòng)的,只會在被訂閱時(shí)向訂閱者發(fā)送通知;熱信號是主動(dòng)的,它會在任意時(shí)間發(fā)出通知,與訂閱者的訂閱時(shí)間無關(guān)。
也就是說冷信號所有的訂閱者會在訂閱時(shí)收到完全相同的序列;而訂閱熱信號之后,只會收到在訂閱之后發(fā)出的序列。
熱信號的訂閱者能否收到消息取決于訂閱的時(shí)間。
熱信號在我們生活中有很多的例子,比如訂閱雜志時(shí)并不會把之前所有的期刊都送到我們手中,只會接收到訂閱之后的期刊;而對于冷信號的話,舉一個(gè)不恰當(dāng)?shù)睦?,每一年的高考考生在『訂閱』高考之后,收到往年所有的試卷,并在高考之后會取消訂閱?/p>
熱信號 RACSubject
在 ReactiveCocoa 中,我們使用 RACSignal 來表示冷信號,也就是每一個(gè)訂閱者在訂閱信號時(shí)都會收到完整的序列;RACSubject 用于表示熱信號,訂閱者接收到多少值取決于它訂閱的時(shí)間。
前面的文章中已經(jīng)對 RACSignal 冷信號有了很多的介紹,這里也就不會多說了;這一小節(jié)主要的內(nèi)容是想通過一個(gè)例子,簡單展示 RACSubject 的訂閱者收到的內(nèi)容與訂閱時(shí)間的關(guān)系:
RACSubject *subject = [RACSubject subject];
// Subscriber 1
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"1st Sub: %@", x);
}];
[subject sendNext:@1];
// Subscriber 2
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"2nd Sub: %@", x);
}];
[subject sendNext:@2];
// Subscriber 3
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"3rd Sub: %@", x);
}];
[subject sendNext:@3];
[subject sendCompleted];
這里以圖的方式來展示整個(gè)訂閱與訂閱者接收消息的過程:

從圖中我們可以清楚的看到,幾個(gè)訂閱者根據(jù)訂閱時(shí)間的不同收到了不同的數(shù)字序列,RACSubject 是時(shí)間相關(guān)的,它在發(fā)送消息時(shí)只會向已訂閱的訂閱者推送消息。
RACSubject 的實(shí)現(xiàn)
RACSubject 的實(shí)現(xiàn)并不復(fù)雜,它『可變』的特性都來源于持有的訂閱者數(shù)組 subscribers,在每次執(zhí)行 subscribeNext:error:completed: 一類便利方法時(shí),都會將傳入的 id<RACSubscriber> 對象加入數(shù)組:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}
[disposable addDisposable:[RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];
if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}]];
return disposable;
}
訂閱的過程分為三個(gè)部分:
- 初始化一個(gè)
RACPassthroughSubscriber實(shí)例; - 將
subscriber加入RACSubject持有的數(shù)組中; - 創(chuàng)建一個(gè)
RACDisposable對象,在當(dāng)前subscriber銷毀時(shí),將自身從數(shù)組中移除。

-subscribe: 將所有遵循 RACSubscriber 協(xié)議的對象全部加入當(dāng)前 RACSubject 持有的數(shù)組 subscribers 中。
在上一節(jié)的例子中,我們能對 RACSubject 發(fā)送 -sendNext: 等消息也都取決于它實(shí)現(xiàn)了 ?RACSubscriber 協(xié)議:
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:value];
}];
}
- (void)sendError:(NSError *)error {
[self.disposable dispose];
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendError:error];
}];
}
- (void)sendCompleted {
[self.disposable dispose];
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendCompleted];
}];
}
RACSubject 會在自身接受到這些方法時(shí),下發(fā)給持有的全部的 subscribers。

代碼中的 -enumerateSubscribersUsingBlock: 只是一個(gè)使用 for 循環(huán)遍歷 subscribers 的安全方法:
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}
for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}
RACSubject 就是圍繞一個(gè) NSMutableArray 數(shù)組實(shí)現(xiàn)的,實(shí)現(xiàn)還是非常簡單的,只是在需要訪問 subscribers 的方法中使用 @synchronized 避免線程競爭。
@interface RACSubject ()
@property (nonatomic, strong, readonly) NSMutableArray *subscribers;
@end
RACSubject 提供的初始化類方法 +subject 也只是初始化了幾個(gè)成員變量:
+ (instancetype)subject {
return [[self alloc] init];
}
- (instancetype)init {
self = [super init];
if (self == nil) return nil;
_disposable = [RACCompoundDisposable compoundDisposable];
_subscribers = [[NSMutableArray alloc] initWithCapacity:1];
return self;
}
至此,對于 RACSubject 的分析就結(jié)束了,接下來會分析更多的子類。
RACBehaviorSubject 與 RACReplaySubject
這一節(jié)會介紹 RACSubject 的兩個(gè)子類 RACBehaviorSubject 和 RACReplaySubject,前者在訂閱時(shí)會向訂閱者發(fā)送最新的消息,后者在訂閱之后可以重新發(fā)送之前的所有消息序列。
RACBehaviorSubject
先來介紹兩者中實(shí)現(xiàn)較簡單的 RACBehaviorSubject,它在內(nèi)部會保存一個(gè) currentValue 對象,也就是最后一次發(fā)送的消息:
@interface RACBehaviorSubject ()
@property (nonatomic, strong) id currentValue;
@end
在每次執(zhí)行 -sendNext: 時(shí),都會對 RACBehaviorSubject 中保存的 currentValue 進(jìn)行更新,并使用父類的 -sendNext: 方法,向所有的訂閱者發(fā)送最新的消息:
- (void)sendNext:(id)value {
@synchronized (self) {
self.currentValue = value;
[super sendNext:value];
}
}
RACBehaviorSubject 最重要的特性就是在訂閱時(shí),向最新的訂閱者發(fā)送之前的消息,這是通過覆寫 -subscribe: 方法實(shí)現(xiàn)的。
在調(diào)用子類的 -subscribe: 方法之后,會在 subscriber 對象上執(zhí)行 -sendNext: 方法:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
[subscriber sendNext:self.currentValue];
}
}];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
[schedulingDisposable dispose];
}];
}
接下來,通過一個(gè)簡單的例子來演示 RACBehaviorSubject 到底是如何工作的:
RACBehaviorSubject *subject = [RACBehaviorSubject subject];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"1st Sub: %@", x);
}];
[subject sendNext:@1];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"2nd Sub: %@", x);
}];
[subject sendNext:@2];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"3rd Sub: %@", x);
}];
[subject sendNext:@3];
[subject sendCompleted];
上面的代碼其實(shí)與 RACSubject 一節(jié)中的代碼差不多,只將 RACSubject 轉(zhuǎn)換成了 RACBehaviorSubject 對象。

在每次訂閱者訂閱 RACBehaviorSubject 之后,都會向該訂閱者發(fā)送最新的消息,這也就是 RACBehaviorSubject 最重要的行為。
RACBehaviorSubject 有一個(gè)用于創(chuàng)建包含默認(rèn)值的類方法 +behaviorSubjectWithDefaultValue:,如果將上面的第一行代碼改成:
RACBehaviorSubject *subject = [RACBehaviorSubject behaviorSubjectWithDefaultValue:@0];
那么在第一個(gè)訂閱者剛訂閱 RACBehaviorSubject 時(shí)就會收到 @0 對象。

RACReplaySubject
RACReplaySubject 相當(dāng)于一個(gè)自帶 buffer 的 RACBehaviorSubject,它可以在每次有新的訂閱者訂閱之后發(fā)送之前的全部消息。
@interface RACReplaySubject ()
@property (nonatomic, assign, readonly) NSUInteger capacity;
@property (nonatomic, strong, readonly) NSMutableArray *valuesReceived;
@end
實(shí)現(xiàn)的方式是通過持有一個(gè) valuesReceived 的數(shù)組和能夠存儲的對象的上限 capacity,默認(rèn)值為:
const NSUInteger RACReplaySubjectUnlimitedCapacity = NSUIntegerMax;
當(dāng)然你可以用 +replaySubjectWithCapacity: 初始化一個(gè)其它大小的 RACReplaySubject 對象:
+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity {
return [(RACReplaySubject *)[self alloc] initWithCapacity:capacity];
}
- (instancetype)initWithCapacity:(NSUInteger)capacity {
self = [super init];
_capacity = capacity;
_valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
return self;
}
在每次調(diào)用 -sendNext: 方法發(fā)送消息時(shí),都會將其加入 valuesReceived 數(shù)組中,并踢出之前的元素:
- (void)sendNext:(id)value {
@synchronized (self) {
[self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
[super sendNext:value];
if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
[self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
}
}
}
需要注意的有兩點(diǎn),一是對 valuesReceived 的數(shù)組的操作必須使用 @synchronized 加鎖;第二,如果 value 為空的話,也需要將其轉(zhuǎn)換成 RACTupleNil.tupleNil 對象進(jìn)行保存。

-sendError: 和 -sendCompleted 方法都會標(biāo)記對應(yīng) flag,即 hasCompleted 和 hasError,這里就不介紹了;同樣的,RACReplaySubject 也覆寫了 -subscribe: 方法,在每次有訂閱者訂閱時(shí)重新發(fā)送所有的序列:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
for (id value in self.valuesReceived) {
if (compoundDisposable.disposed) return;
[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
}
if (compoundDisposable.disposed) return;
if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}];
[compoundDisposable addDisposable:schedulingDisposable];
return compoundDisposable;
}
我們?nèi)匀皇褂蒙弦还?jié)中的例子來展示 RACReplaySubject 是如何工作的,只修改第一行代碼:
RACReplaySubject *subject = [RACReplaySubject subject];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"1st Subscriber: %@", x);
}];
[subject sendNext:@1];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"2nd Subscriber: %@", x);
}];
[subject sendNext:@2];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"3rd Subscriber: %@", x);
}];
[subject sendNext:@3];
[subject sendCompleted];
運(yùn)行這段代碼之后,會得到如下圖的結(jié)果:

所有訂閱 RACReplaySubject 的對象(默認(rèn)行為)都能獲得完整的序列,而這個(gè)特性在與 RACMulticastConnection 一起使用也有著巨大威力,我們會在之后的文章中介紹。
總結(jié)
RACSubject 在 RACSignal 對象之上進(jìn)行了簡單的修改,將原有的冷信號改造成了熱信號,將不可變變成了可變。
雖然 RACSubject 的實(shí)現(xiàn)并不復(fù)雜,只是存儲了一個(gè)遵循 RACSubscriber 協(xié)議的對象列表以及所有的消息,但是在解決實(shí)際問題時(shí)卻能夠很好地解決很多與網(wǎng)絡(luò)操作相關(guān)的問題。
References
Github Repo:iOS-Source-Code-Analyze
Follow: Draveness · GitHub
Source: http://draveness.me/racsubject