
參考資料:《RAC基礎(chǔ):信號(hào)和訂閱者模式》、《可變的熱信號(hào) RACSubject》
本文知識(shí)點(diǎn):探究 RACSignal 及 RACSubject 的底層實(shí)現(xiàn)。RACSubject 及其子類(lèi)(RACBehaviorSubject 、RACReplaySubject)的使用場(chǎng)景。冷熱信號(hào)。
1. RACSignal

所有的信號(hào)(RACSignal)都可以進(jìn)行操作處理,因?yàn)樗胁僮鞣椒ǘ级x在RACStream.h中,因此只要繼承RACStream就有了操作處理方法。
1.1 底層實(shí)現(xiàn)概述
- 創(chuàng)建信號(hào),首先把didSubscribe保存到信號(hào)中,還不會(huì)觸發(fā)。
- 當(dāng)信號(hào)被訂閱,也就是調(diào)用signal的subscribeNext:nextBlock
- subscribeNext內(nèi)部會(huì)創(chuàng)建訂閱者subscriber,并且把nextBlock保存到subscriber中。
- subscribeNext內(nèi)部會(huì)調(diào)用signal的didSubscribe
- signal的didSubscribe中調(diào)用[subscriber sendNext:@1];
- sendNext底層其實(shí)就是執(zhí)行subscriber的nextBlock
1.2 逐步分析
1> RACSignal的創(chuàng)建
RACSignal通過(guò)子類(lèi)[RACDynamicSignal createSignal:]方法獲得Signal,并將didSubscribe這個(gè)block保存在Signal中,先不觸發(fā)。
<RACSignal.m>
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
return [RACDynamicSignal createSignal:didSubscribe];
}
<RACDynamicSignal.m>
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
RACDynamicSignal *signal = [[self alloc] init];
//將didSubscribe這個(gè)block保存在signal中
signal->_didSubscribe = [didSubscribe copy];
return [signal setNameWithFormat:@"+createSignal:"];
}
2> subscriber的創(chuàng)建
當(dāng)信號(hào)被訂閱,signal通過(guò)調(diào)用subscribeNext方法生成subscriber,并將next、error、completed block保存在subscriber中
- 注意:subscriber方法返回值為RACDisposable * ,如果在發(fā)出信號(hào)之前就取消訂閱([disposable dispose])就無(wú)法發(fā)出信號(hào)。
<RACSignal.m>
//當(dāng)信號(hào)被訂閱(sendNext:..),內(nèi)部調(diào)用subscribeNext:方法
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
//signal通過(guò)調(diào)用subscribeNext方法生成subscriber,并將next、error、completed block保存在subscriber中
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}
<RACSubscriber.m>
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
RACSubscriber *subscriber = [[self alloc] init];
subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];
return subscriber;
}
3> 進(jìn)行訂閱subscribe
創(chuàng)建subscriber之后調(diào)用signal的subscribe方法,并將創(chuàng)建的subscriber作為參數(shù)。
這一步會(huì)生成RACCompoundDisposable和RACPassthroughSubscriber對(duì)象。
- RACCompoundDisposable:RACDisposable的子類(lèi),可以加入多個(gè)RACDisposable對(duì)象。當(dāng)RACCompoundDisposable對(duì)象被dispose的時(shí)候,會(huì)dispose容器內(nèi)的所有RACDisposable對(duì)象。
- RACPassthroughSubscriber:分別保存對(duì)RACSignal,RACSubscriber,RACCompoundDisposable的引用。通過(guò)RACPassthroughSubscriber對(duì)象來(lái)轉(zhuǎn)發(fā)給真正的Subscriber。
<RACDynamicSignal.m>
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
//RACDisposable的子類(lèi),可以加入多個(gè)RACDisposable對(duì)象。
//當(dāng)RACCompoundDisposable對(duì)象被dispose的時(shí)候,會(huì)dispose容器內(nèi)的所有RACDisposable對(duì)象。
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
//分別保存對(duì)RACSignal,RACSubscriber,RACCompoundDisposable的引用。
//通過(guò)RACPassthroughSubscriber對(duì)象來(lái)轉(zhuǎn)發(fā)給真正的Subscriber。
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}
4> 執(zhí)行didSubscribe block
RACSignal通過(guò)RACScheduler.subscriptionScheduler來(lái)執(zhí)行閉包,didSubscribe真正被調(diào)用的的位置就是上一步的RACDisposable *innerDisposable = self.didSubscribe(subscriber);
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
block();
return nil;
}
5> 調(diào)用sendNext、sendError、sendCompleted
進(jìn)入didSubscribe閉包后,調(diào)用sendNext:、sendError:、sendCompleted。由于第三步中將subscriber替換為RACPassthroughSubscriber對(duì)象,真正的subscriber被存儲(chǔ)在RACPassthroughSubscriber對(duì)象中,即innerSubscriber,所以這一步的各種send方法其實(shí)是一個(gè)轉(zhuǎn)發(fā)過(guò)程。
<RACPassthroughSubscriber.m>
- (void)sendNext:(id)value {
if (self.disposable.disposed) return;
if (RACSIGNAL_NEXT_ENABLED()) {
RACSIGNAL_NEXT(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString([value description]));
}
[self.innerSubscriber sendNext:value];
}
- (void)sendError:(NSError *)error {
if (self.disposable.disposed) return;
if (RACSIGNAL_ERROR_ENABLED()) {
RACSIGNAL_ERROR(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString(error.description));
}
[self.innerSubscriber sendError:error];
}
- (void)sendCompleted {
if (self.disposable.disposed) return;
if (RACSIGNAL_COMPLETED_ENABLED()) {
RACSIGNAL_COMPLETED(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description));
}
[self.innerSubscriber sendCompleted];
}
6> 執(zhí)行next、error、completed閉包
通過(guò)調(diào)用innerSubscriber的sendNext:、sendError、和sendCompleted方法執(zhí)行真正的subscriber中的next、error 、completed閉包。
<RACSubscriber.m>
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}
- (void)sendError:(NSError *)e {
@synchronized (self) {
void (^errorBlock)(NSError *) = [self.error copy];
[self.disposable dispose];
if (errorBlock == nil) return;
errorBlock(e);
}
}
- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];
[self.disposable dispose];
if (completedBlock == nil) return;
completedBlock();
}
}
簡(jiǎn)單來(lái)說(shuō),過(guò)程就是:
(1) 通過(guò)createSignal生成信號(hào)
(2) 通過(guò)subscribeNext確定信號(hào)內(nèi)容到來(lái)時(shí)的處理方式
(3) didSubscribe block塊中異步處理完畢之后,進(jìn)行sendNext、sendError和sendCompleted自動(dòng)處理next、error、complete。
舉個(gè)例子,我們發(fā)送網(wǎng)絡(luò)請(qǐng)求,希望在出錯(cuò)的時(shí)候,能給用戶提示,那么首先,創(chuàng)建信號(hào)的時(shí)候,在網(wǎng)絡(luò)請(qǐng)求失敗的回調(diào)中,我們要[subscriber sendError:netError],也就是發(fā)送錯(cuò)誤事件。然后再訂閱錯(cuò)誤事件,也就是subscriberError:...這樣就完成了錯(cuò)誤信號(hào)的訂閱。
complete事件比較特殊,它有終止訂閱關(guān)系的意味,我們先大致了解一下RACDispoable對(duì)象吧,我們知道,訂閱關(guān)系需要有終止的時(shí)候,比如,在tableViewCell的復(fù)用的時(shí)候,cell會(huì)訂閱model類(lèi)產(chǎn)生一個(gè)信號(hào),但是當(dāng)cell被復(fù)用的時(shí)候,如果不把之前的訂閱關(guān)系取消掉,就會(huì)出現(xiàn)同時(shí)訂閱了2個(gè)model的情況。我們可以發(fā)現(xiàn),subscribeNext、subscribeError、subscribeComplete事件返回的都是RACDisopable對(duì)象,當(dāng)我們希望終止訂閱的時(shí)候,調(diào)用[RACDisposable dispose]就可以了。complete也是這個(gè)原理。
2. RACSubject
底層實(shí)現(xiàn)和 RACSignal 不一樣,是信號(hào)提供者,自己可以充當(dāng)信號(hào),又能發(fā)送信號(hào)。
可以將 RACSubject 理解為一個(gè)可以訂閱的主題,我們?cè)谟嗛喼黝}之后,向主題發(fā)送新的消息時(shí),所有的訂閱者都會(huì)接收到最新的消息。
2.1 底層實(shí)現(xiàn)概述
2.2 RACSubject
RACSubject不關(guān)心歷史值,錯(cuò)過(guò)就錯(cuò)過(guò)了。
使用場(chǎng)景:通常用來(lái)代替代理,有了它,就不必要定義代理了。
- 調(diào)用subscribeNext訂閱信號(hào),只是把訂閱者和 nextBlock保存起來(lái)。
- 調(diào)用sendNext發(fā)送信號(hào),遍歷剛剛保存的所有訂閱者,一個(gè)一個(gè)調(diào)用訂閱者的nextBlock。
- 訂閱信號(hào) - (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock
- 發(fā)送信號(hào) sendNext:(id)value
RACSubject *subject = [RACSubject subject];
//還沒(méi)有訂閱,發(fā)送信號(hào)不作處理
[subject sendNext:@"ooo"];
//調(diào)用subscribeNext訂閱信號(hào),只是把訂閱者保存起來(lái)
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"訂閱者1%@",x);
}];
//發(fā)送信號(hào),遍歷剛剛保存的所有訂閱者,一個(gè)一個(gè)調(diào)用訂閱者的nextBlock。
[subject sendNext:@"aaa"];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"訂閱者2%@",x);
}];
[subject sendNext:@"bbb"];
Output:
2018-04-03 13:39:43.680110+0800 Demo[5748:334998] 訂閱者1aaa
2018-04-03 13:39:43.680316+0800 Demo[5748:334998] 訂閱者1bbb
2018-04-03 13:39:43.680489+0800 Demo[5748:334998] 訂閱者2bbb
2.2.1 使用場(chǎng)景:代替代理
// ViewController.m
- (void)viewDidLoad {
[super viewDidLoad];
self.view.bgColor(@"gray");
MView *view = [MView new];
view.bgColor(@"white");
view.addTo(self.view).makeCons(^{
make.center.equal.superview.constants(0);
make.width.equal.constants(300);
make.height.equal.constants(500);
});
[view.delegateSignal subscribeNext:^(id _Nullable x) {
NSLog(@"received: %@",x);
}];
}
// MView.h
@interface MView : UIView
@property (nonatomic, strong) RACSubject *delegateSignal;
@end
// MView.m
@implementation MView
- (instancetype)initWithFrame:(CGRect)frame {
if (self = [super initWithFrame:frame]) {
@weakify(self);
Button.str(@"click").color(@"red").addTo(self).makeCons(^{
make.center.equal.superview.constants(0);
}).onClick(^{
@strongify(self);
[self.delegateSignal sendNext:@"clicked"];
});
}
return self;
}
- (RACSubject *)delegateSignal {
if (!_delegateSignal) {
_delegateSignal = [RACSubject subject];
}
return _delegateSignal;
}
@end
2.3 RACBehaviorSubject
在訂閱時(shí)會(huì)向最新的訂閱者發(fā)送最近的(上一條)消息。
它在內(nèi)部會(huì)保存一個(gè) currentValue 對(duì)象,也就是最后一次發(fā)送的消息:
@interface RACBehaviorSubject ()
@property (nonatomic, strong) id currentValue;
@end
在每次執(zhí)行 -sendNext: 時(shí),都會(huì)對(duì) RACBehaviorSubject 中保存的 currentValue 進(jìn)行更新,并使用父類(lèi)的 -sendNext: 方法,向所有的訂閱者發(fā)送最新的消息。
RACBehaviorSubject *subject = [RACBehaviorSubject subject];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"1st Sub: %@", x);
}];
[subject sendNext:@1];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"2nd Sub: %@", x);
}];
[subject sendNext:@2];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"3rd Sub: %@", x);
}];
[subject sendNext:@3];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"4th Sub: %@", x);
}];
[subject sendNext:@4];
[subject sendCompleted];
Output:
2019-06-12 16:04:57.586368+0800 RacDemo[5865:1933277] 1st Sub: (null) //向最新的訂閱者發(fā)送上一條消息,可以通過(guò) +behaviorSubjectWithDefaultValue:方法設(shè)置默認(rèn)值
2019-06-12 16:04:57.586502+0800 RacDemo[5865:1933277] 1st Sub: 1
2019-06-12 16:04:57.586543+0800 RacDemo[5865:1933277] 2nd Sub: 1 //向最新的訂閱者發(fā)送上一條消息
2019-06-12 16:04:57.586566+0800 RacDemo[5865:1933277] 1st Sub: 2
2019-06-12 16:04:57.586581+0800 RacDemo[5865:1933277] 2nd Sub: 2
2019-06-12 16:04:57.586622+0800 RacDemo[5865:1933277] 3rd Sub: 2 //向最新的訂閱者發(fā)送上一條消息
2019-06-12 16:04:57.586669+0800 RacDemo[5865:1933277] 1st Sub: 3
2019-06-12 16:04:57.586684+0800 RacDemo[5865:1933277] 2nd Sub: 3
2019-06-12 16:04:57.586697+0800 RacDemo[5865:1933277] 3rd Sub: 3
2019-06-12 16:04:57.586754+0800 RacDemo[5865:1933277] 4th Sub: 3 //向最新的訂閱者發(fā)送上一條消息
2019-06-12 16:04:57.586771+0800 RacDemo[5865:1933277] 1st Sub: 4
2019-06-12 16:04:57.586784+0800 RacDemo[5865:1933277] 2nd Sub: 4
2019-06-12 16:04:57.586797+0800 RacDemo[5865:1933277] 3rd Sub: 4
2019-06-12 16:04:57.586810+0800 RacDemo[5865:1933277] 4th Sub: 4
在每次訂閱者訂閱 RACBehaviorSubject 之后,都會(huì)向該訂閱者發(fā)送最新的消息,這也就是 RACBehaviorSubject 最重要的行為。
RACBehaviorSubject 有一個(gè)用于創(chuàng)建包含默認(rèn)值的類(lèi)方法 +behaviorSubjectWithDefaultValue:,如果將上面的第一行代碼改成:
RACBehaviorSubject *subject = [RACBehaviorSubject behaviorSubjectWithDefaultValue:@0];
那么在第一個(gè)訂閱者剛訂閱 RACBehaviorSubject 時(shí)就會(huì)收到 @0 對(duì)象。
2.4 RACReplaySubject
在訂閱之后可以重新發(fā)送之前的所有消息序列。
可以先訂閱信號(hào),也可以先發(fā)送信號(hào)。
RACReplaySubject 相當(dāng)于一個(gè)自帶 buffer 的 RACBehaviorSubject,它可以在每次有新的訂閱者訂閱之后發(fā)送之前的全部消息。
@interface RACReplaySubject ()
@property (nonatomic, assign, readonly) NSUInteger capacity;
@property (nonatomic, strong, readonly) NSMutableArray *valuesReceived;
@end
實(shí)現(xiàn)的方式是通過(guò)持有一個(gè) valuesReceived 的數(shù)組和能夠存儲(chǔ)的對(duì)象的上限 capacity,默認(rèn)值為:
const NSUInteger RACReplaySubjectUnlimitedCapacity = NSUIntegerMax;
當(dāng)然你可以用 +replaySubjectWithCapacity: 初始化一個(gè)其它大小的 RACReplaySubject 對(duì)象:
+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity {
return [(RACReplaySubject *)[self alloc] initWithCapacity:capacity];
}
- (instancetype)initWithCapacity:(NSUInteger)capacity {
self = [super init];
_capacity = capacity;
_valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
return self;
}
使用時(shí):
- 調(diào)用sendNext發(fā)送信號(hào),把值保存起來(lái),然后遍歷剛剛保存的所有訂閱者,一個(gè)一個(gè)調(diào)用訂閱者的nextBlock。
- 調(diào)用subscribeNext訂閱信號(hào),遍歷保存的所有值,一個(gè)一個(gè)調(diào)用訂閱者的nextBlock
RACReplaySubject *subject = [RACReplaySubject subject];
[subject subscribeNext:^(id x) { //遍歷保存的所有值,一個(gè)一個(gè)調(diào)用訂閱者的nextBlock
NSLog(@"%@",x);
}];
[subject sendNext:@1]; //把值保存起來(lái),然后遍歷剛剛保存的所有訂閱者,一個(gè)一個(gè)調(diào)用訂閱者的nextBlock。
[subject subscribeNext:^(id x) { //遍歷保存的所有值,一個(gè)一個(gè)調(diào)用訂閱者的nextBlock
NSLog(@"%@",x);
}];
[subject sendNext:@2]; //把值保存起來(lái),然后遍歷剛剛保存的所有訂閱者,一個(gè)一個(gè)調(diào)用訂閱者的nextBlock。
[subject subscribeNext:^(id x) { //遍歷保存的所有值,一個(gè)一個(gè)調(diào)用訂閱者的nextBlock
NSLog(@"%@",x);
}];
Output:
2018-04-03 14:07:39.578714+0800 Demo[6362:362707] 1
2018-04-03 14:07:39.579439+0800 Demo[6362:362707] 1
2018-04-03 14:07:39.579744+0800 Demo[6362:362707] 2
2018-04-03 14:07:39.579947+0800 Demo[6362:362707] 2
2018-04-03 14:07:39.580170+0800 Demo[6362:362707] 1
2018-04-03 14:07:39.580351+0800 Demo[6362:362707] 2
如果想當(dāng)一個(gè)信號(hào)被訂閱,就重復(fù)播放之前所有值,需要先發(fā)送信號(hào),在訂閱信號(hào)。也就是先保存值,再訂閱值。
2.5 RACReplaySubject與RACSubject區(qū)別
- RACReplaySubject可以先發(fā)送信號(hào),在訂閱信號(hào),RACSubject就不可以。
- 使用場(chǎng)景一:如果一個(gè)信號(hào)每被訂閱一次,就需要把之前的值重復(fù)發(fā)送一遍,使用RACReplaySubject。
- 使用場(chǎng)景二:可以設(shè)置capacity數(shù)量來(lái)限制緩存的value的數(shù)量,即只緩充最新的幾個(gè)值。
3. 冷熱信號(hào)
冷信號(hào) RACSignal、熱信號(hào)RACSubject
- Hot Observable可以有多個(gè)訂閱者,是一對(duì)多,集合可以與訂閱者共享信息;
Cold Observable只能一對(duì)一,當(dāng)有不同的訂閱者,消息是重新完整發(fā)送。 - Hot Observable是主動(dòng)的,它會(huì)在任意時(shí)間發(fā)出通知,與訂閱者的訂閱時(shí)間無(wú)關(guān);
Cold Observable是被動(dòng)的,只會(huì)在被訂閱時(shí)向訂閱者發(fā)送通知。
也就是說(shuō)冷信號(hào)所有的訂閱者會(huì)在訂閱時(shí)收到完全相同的序列;而訂閱熱信號(hào)之后,只會(huì)收到在訂閱之后發(fā)出的序列。
熱信號(hào)的訂閱者能否收到消息取決于訂閱的時(shí)間。
熱信號(hào)在我們生活中有很多的例子,比如訂閱雜志時(shí)并不會(huì)把之前所有的期刊都送到我們手中,只會(huì)接收到訂閱之后的期刊;而對(duì)于冷信號(hào)的話,舉一個(gè)不恰當(dāng)?shù)睦?,每一年的高考考生在『訂閱』高考之后,收到往年所有的試卷,并在高考之后?huì)取消訂閱。