RACSubject
// Create the signal
RACSubject *subject = [RACSubject subject];
// Subscribe to the signal
[subject subscribeNext:^(id _Nullable x) {
    NSLog(@"subscribeNext %@", x);
}];
// Send a value
[subject sendNext:@"1"];
Analysis
step1: Start with the RACSubject interface
@interface RACSubject<ValueType> : RACSignal<ValueType> <RACSubscriber>
/// Returns a new subject.
+ (instancetype)subject;
// Redeclaration of the RACSubscriber method. Made in order to specify a generic type.
- (void)sendNext:(nullable ValueType)value;
@end
RACSubject inherits from RACSignal and adopts the RACSubscriber protocol. It declares a factory method, subject, and a method for sending values, sendNext:.
step2: Look at the subject method
+ (instancetype)subject {
return [[self alloc] init];
}
- (instancetype)init {
self = [super init];
if (self == nil) return nil;
_disposable = [RACCompoundDisposable compoundDisposable];
_subscribers = [[NSMutableArray alloc] initWithCapacity:1];
return self;
}
subject simply calls [[self alloc] init]. The class overrides init and initializes two properties there.
- The property declarations
// Contains all current subscribers to the receiver.
//
// This should only be used while synchronized on `self`.
@property (nonatomic, strong, readonly) NSMutableArray *subscribers;
// Contains all of the receiver's subscriptions to other signals.
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;
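The subscribers array holds every current subscriber. The compound disposable collects the subject's own subscriptions to other signals so that they can be disposed of together.
step3: Look at subscribeNext:, which is inherited from the parent class RACSignal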
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}
- Creating the subscriber
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
RACSubscriber *subscriber = [[self alloc] init];
subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];
return subscriber;
}
This creates a subscriber and copies the three blocks into its instance variables. In the example, that ties the block passed to subscribeNext: to the new subscriber.
- The subscribe: that runs here is RACSubject's own override
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
// Create the disposable that represents this subscription
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// Wrap the subscriber
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
// The key step:
// store the wrapped subscriber in the array created in init
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}
[disposable addDisposable:[RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
// Since newer subscribers are generally shorter-lived, search
// starting from the end of the list.
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];
if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}]];
return disposable;
}
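The wrapped subscriber is stored in the subscribers array created in init. The returned disposable removes it from that array again when it is disposed.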
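The add and remove paths of subscribe: can be observed from the outside. The following is a minimal sketch, assuming the ReactiveObjC framework is linked; DemoSubjectDisposal is a hypothetical helper name used only for illustration.

```objc
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper: returns the values the subscriber actually received.
static NSArray<NSString *> *DemoSubjectDisposal(void) {
    NSMutableArray<NSString *> *received = [NSMutableArray array];
    RACSubject<NSString *> *subject = [RACSubject subject];

    // subscribe: stores the subscriber in the subject's array and
    // returns the disposable that represents this subscription.
    RACDisposable *subscription = [subject subscribeNext:^(NSString *value) {
        [received addObject:value];
    }];

    [subject sendNext:@"a"];   // delivered: the subscriber is in the array
    [subscription dispose];    // runs the block that removes the subscriber
    [subject sendNext:@"b"];   // not delivered: no subscriber is left

    return [received copy];    // contains only @"a"
}
```

Keeping the returned RACDisposable is what allows a caller to stop receiving values from a subject that is still alive.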
step4: Look at the sendNext: method
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:value];
}];
}
This iterates over all subscribers and forwards the value to each one.
- The enumeration method
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}
for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}
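self.subscribers is the array created in init. The method copies it while holding the lock, then iterates over the copy and invokes the passed-in block with each subscriber.
- The sendNext: called inside the enumeration block is RACSubscriber's implementation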
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}
This takes the next block from the subscriber, which is the block passed to subscribeNext: in step3, and calls it with the value.
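Because sendNext: only walks the subscribers that are in the array at that moment, a subscriber never sees values that were sent before it subscribed. A minimal sketch of this, assuming the ReactiveObjC framework is linked (DemoSubjectFanOut is a hypothetical helper name):

```objc
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper: records which subscriber saw which value.
static NSArray<NSString *> *DemoSubjectFanOut(void) {
    NSMutableArray<NSString *> *log = [NSMutableArray array];
    RACSubject<NSString *> *subject = [RACSubject subject];

    [subject subscribeNext:^(NSString *value) {
        [log addObject:[@"first:" stringByAppendingString:value]];
    }];
    [subject sendNext:@"1"];   // only the first subscriber exists yet

    [subject subscribeNext:^(NSString *value) {
        [log addObject:[@"second:" stringByAppendingString:value]];
    }];
    [subject sendNext:@"2"];   // both subscribers, in subscription order

    return [log copy];         // first:1, first:2, second:2
}
```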
RACSignal
Since RACSubject inherits from RACSignal, RACSignal is worth examining next.
// 1. Create the signal
RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
    // 3. Send a value
    [subscriber sendNext:@"liu"];
    // Send completed, which also ends the subscription
    [subscriber sendCompleted];
    // 4. Clean up: a signal that needs to be cancellable must return a RACDisposable
    return [RACDisposable disposableWithBlock:^{
        NSLog(@"subscription disposed");
    }];
}];
// 2. Subscribe to the signal
[signal subscribeNext:^(id x) {
    // Called whenever the signal sends a value
    NSLog(@"======%@", x);
}];
Output
The console shows ======liu, followed by the message logged from the disposable block.
Source code analysis
step1: Look at the createSignal: method
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
return [RACDynamicSignal createSignal:didSubscribe];
}
// A quick look at RACDynamicSignal
// A private `RACSignal` subclasses that implements its subscription behavior
// using a block.
@interface RACDynamicSignal : RACSignal
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe;
@end
- The createSignal: method in the RACDynamicSignal class
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
RACDynamicSignal *signal = [[self alloc] init];
signal->_didSubscribe = [didSubscribe copy];
return [signal setNameWithFormat:@"+createSignal:"];
}
This method does two things:
- 1. It creates a signal object.
- 2. It stores the block passed in from outside in the signal's _didSubscribe instance variable.
step2: Look at subscribeNext:, which subscribes to the signal
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}
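This method does two things:
1. It creates a subscriber.
2. It performs the subscription by calling subscribe:.
subscriberWithNext:error:completed: was covered above, so it is not repeated here. The subscribe: that runs is the one implemented in RACDynamicSignal.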
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}
The key line is self.didSubscribe(subscriber);. createSignal: stored the passed-in block in _didSubscribe, so this call runs that block and hands it the subscriber.
The question is: self.didSubscribe(subscriber); itself sits inside a block, so when does that outer block run?
- Look at schedule:, a method of the RACSubscriptionScheduler class
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
block();
return nil;
}
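The passed-in block is called immediately when there is a current scheduler (for example, on the main thread). Otherwise it is handed to the background scheduler and runs asynchronously.
step3: Sending a value with sendNext: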
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}
This takes the next block from the subscriber, which is the block passed to subscribeNext: in step2, and calls it with the value.
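One consequence of this flow is that the didSubscribe block runs once for every subscription, unlike a RACSubject, which shares one stream among all subscribers. The sketch below illustrates this under two assumptions: the ReactiveObjC framework is linked, and the function is called on the main thread so that the subscription scheduler runs the block synchronously. DemoColdSignal is a hypothetical helper name.

```objc
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper: counts how often the didSubscribe block runs.
// Assumes it is called on the main thread; on a thread without a current
// scheduler the block runs asynchronously and the count may lag behind.
static NSUInteger DemoColdSignal(void) {
    __block NSUInteger runs = 0;

    RACSignal<NSNumber *> *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        runs += 1;                        // executed once per subscription
        [subscriber sendNext:@(runs)];
        [subscriber sendCompleted];
        return nil;                       // nothing to clean up
    }];

    [signal subscribeNext:^(NSNumber *x) {
        NSLog(@"first subscription got %@", x);
    }];
    [signal subscribeNext:^(NSNumber *x) {
        NSLog(@"second subscription got %@", x);
    }];

    return runs;                          // 2 when called on the main thread
}
```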
RACReplaySubject
RACReplaySubject *subject = [RACReplaySubject subject];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"subscribeNext - %@",x);
}];
[subject sendNext:@"liu"];
Output
subscribeNext - liu
Source code analysis
step1: Look at its .h file first
It saves the values that are sent to it.
/// A replay subject saves the values it is sent (up to its defined capacity)
/// and resends those to new subscribers. It will also replay an error or
/// completion.
@interface RACReplaySubject<ValueType> : RACSubject<ValueType>
/// Creates a new replay subject with the given capacity. A capacity of
/// RACReplaySubjectUnlimitedCapacity means values are never trimmed.
+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity;
@end
RACReplaySubject is a subclass of RACSubject. Its header exposes only one additional method, replaySubjectWithCapacity:, which is a factory method.
step2: Look at the implementation of replaySubjectWithCapacity:
+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity {
return [(RACReplaySubject *)[self alloc] initWithCapacity:capacity];
}
- (instancetype)init {
return [self initWithCapacity:RACReplaySubjectUnlimitedCapacity];
}
- (instancetype)initWithCapacity:(NSUInteger)capacity {
self = [super init];
_capacity = capacity;
_valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
return self;
}
Whether the subject is created with its own factory method replaySubjectWithCapacity: or with the inherited subject method, initWithCapacity: ends up being called. It creates the valuesReceived array that stores the values passed to sendNext:.
step3: Subscribing with subscribeNext:
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}
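The first part, creating the subscriber, is the same as in the parent class: a subscriber is created and the block is stored in its instance variable.
The second part calls RACReplaySubject's own subscribe: override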
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
for (id value in self.valuesReceived) {
if (compoundDisposable.disposed) return;
[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
}
if (compoundDisposable.disposed) return;
if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}];
[compoundDisposable addDisposable:schedulingDisposable];
return compoundDisposable;
}
subscribe: first iterates over the values stored in the valuesReceived array and sends each one to the new subscriber, so subscribing itself replays the earlier values.
step4: Sending a value with sendNext:, which RACReplaySubject overrides
- (void)sendNext:(id)value {
@synchronized (self) {
[self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
[self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
}
[super sendNext:value];
}
}
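Each value is stored in the valuesReceived array so that it can be replayed to later subscribers. Delivery to the current subscribers still goes through the parent class implementation.
Question: how does RACReplaySubject differ from RACSubject?
With RACSubject the order must be create, then subscribe, then send. A value sent before a subscriber exists is never delivered to that subscriber. With RACReplaySubject, once the subject has been created, subscribing and sending can happen in either order.
So a RACReplaySubject can also be used like this: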
RACReplaySubject *subject = [RACReplaySubject subject];
[subject sendNext:@"liu"];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"subscribeNext - %@",x);
}];
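The capacity passed to replaySubjectWithCapacity: limits how many earlier values a late subscriber receives, as the trimming code in sendNext: suggests. A minimal sketch, assuming the ReactiveObjC framework is linked and the function runs on the main thread so that the replay happens synchronously (DemoReplayCapacity is a hypothetical helper name):

```objc
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper: returns what a late subscriber receives.
// Assumes it is called on the main thread.
static NSArray<NSString *> *DemoReplayCapacity(void) {
    NSMutableArray<NSString *> *received = [NSMutableArray array];

    // Keep only the most recent value for replay.
    RACReplaySubject<NSString *> *subject = [RACReplaySubject replaySubjectWithCapacity:1];

    [subject sendNext:@"a"];   // stored, then trimmed when "b" arrives
    [subject sendNext:@"b"];   // the single value kept for replay

    [subject subscribeNext:^(NSString *value) {
        [received addObject:value];
    }];                        // replays "b" to the new subscriber

    [subject sendNext:@"c"];   // delivered live through the parent class

    return [received copy];    // b, c
}
```

With the default created by [RACReplaySubject subject], the capacity is RACReplaySubjectUnlimitedCapacity and the late subscriber would also receive the first value.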
RACCommand
- (void)testRACCommand {
    /**
     *  Notes on using RACCommand
     *  1. The signal block must return a RACSignal; it must not return nil.
     *  2. executionSignals is a signal of signals, so its values are the inner signals, not their values.
     *    2.1 Use switchToLatest to reach the values of the inner signal, or
     *    2.2 subscribe to the signal returned by execute:.
     *  3. executing reports whether the command is running.
     *    3.1 Its first value is the initial NO, so use skip:1 to ignore it.
     *    3.2 Always call sendCompleted; otherwise the execution never finishes.
     *  4. Calling execute: runs the command's signal block.
     */
    RACCommand *command = [[RACCommand alloc] initWithSignalBlock:^RACSignal * _Nonnull(id _Nullable input) {
        return [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
            [subscriber sendNext:@"first value"];
            [subscriber sendCompleted];
            return nil;
        }];
    }];
    [command.executionSignals.switchToLatest subscribeNext:^(id _Nullable x) {
        // x is a value sent by the inner signal
    }];
    [[command.executing skip:1] subscribeNext:^(NSNumber * _Nullable x) {
        //NSLog(@"x = %@", x);
        if (x.boolValue) {
            NSLog(@"x = %@ executing", x);
        } else {
            NSLog(@"x = %@ finished", x);
        }
    }];
    [command execute:@1];
}
Source code analysis
step1: Look at the RACCommand .h file
1. executionSignals: a signal of the signals returned by successful calls to execute:. It delivers its values on the main thread.
2. executing: a signal that reports whether the command is currently executing.
3. enabled: a signal of whether the command can be executed. It is YES by default. It sends NO when the enabledSignal passed to the initializer sends NO, or when allowsConcurrentExecution is NO and the command has already started executing.
4. errors: every error that occurs while the command executes is sent on this signal.
5. allowsConcurrentExecution: whether the command may execute concurrently. The default is NO.
6. initWithSignalBlock:(RACSignal * (^)(id input))signalBlock: initializes the RACCommand. The parameter is a block that returns a signal, and each signal it returns is sent on executionSignals.
7. - (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock: the first parameter controls whether the command is enabled and the second is the block to execute. enabled defaults to YES, so the first parameter may be nil; signalBlock must not be nil.
8. execute:(id)input: executes the command. input is the value passed to the signal block.
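To make the role of errors more concrete, here is a minimal sketch of a command whose signal block always fails. It assumes the ReactiveObjC framework is linked; MakeFailingCommand and the error domain DemoErrorDomain are hypothetical names chosen for this example.

```objc
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper: builds a command whose signal block always fails.
static RACCommand *MakeFailingCommand(void) {
    RACCommand *command = [[RACCommand alloc] initWithSignalBlock:^RACSignal *(id input) {
        // "DemoErrorDomain" is a made-up domain for this example.
        NSError *error = [NSError errorWithDomain:@"DemoErrorDomain" code:1 userInfo:nil];
        return [RACSignal error:error];
    }];

    // Errors from every execution arrive here as next values,
    // delivered on the main thread.
    [command.errors subscribeNext:^(NSError *error) {
        NSLog(@"command failed: %@", error);
    }];

    return command;
}
```

A caller would keep a strong reference to the returned command, for example in a property, and then call [command execute:@1]. Since errors is delivered on the main thread scheduler, the log line appears only once the main run loop has had a chance to run.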
step2: 查看初始化方法
- (instancetype)initWithSignalBlock:(RACSignal<id> * (^)(id input))signalBlock {
return [self initWithEnabled:nil signalBlock:signalBlock];
}
- (instancetype)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal<id> * (^)(id input))signalBlock {
NSCParameterAssert(signalBlock != nil);
self = [super init];
_addedExecutionSignalsSubject = [RACSubject new];
_allowsConcurrentExecutionSubject = [RACSubject new];
_signalBlock = [signalBlock copy];
_executionSignals = [[[self.addedExecutionSignalsSubject
map:^(RACSignal *signal) {
return [signal catchTo:[RACSignal empty]];
}]
deliverOn:RACScheduler.mainThreadScheduler]
setNameWithFormat:@"%@ -executionSignals", self];
// `errors` needs to be multicasted so that it picks up all
// `activeExecutionSignals` that are added.
//
// In other words, if someone subscribes to `errors` _after_ an execution
// has started, it should still receive any error from that execution.
RACMulticastConnection *errorsConnection = [[[self.addedExecutionSignalsSubject
flattenMap:^(RACSignal *signal) {
return [[signal
ignoreValues]
catch:^(NSError *error) {
return [RACSignal return:error];
}];
}]
deliverOn:RACScheduler.mainThreadScheduler]
publish];
_errors = [errorsConnection.signal setNameWithFormat:@"%@ -errors", self];
[errorsConnection connect];
RACSignal *immediateExecuting = [[[[self.addedExecutionSignalsSubject
flattenMap:^(RACSignal *signal) {
return [[[signal
catchTo:[RACSignal empty]]
then:^{
return [RACSignal return:@-1];
}]
startWith:@1];
}]
scanWithStart:@0 reduce:^(NSNumber *running, NSNumber *next) {
return @(running.integerValue + next.integerValue);
}]
map:^(NSNumber *count) {
return @(count.integerValue > 0);
}]
startWith:@NO];
_executing = [[[[[immediateExecuting
deliverOn:RACScheduler.mainThreadScheduler]
// This is useful before the first value arrives on the main thread.
startWith:@NO]
distinctUntilChanged]
replayLast]
setNameWithFormat:@"%@ -executing", self];
RACSignal *moreExecutionsAllowed = [RACSignal
if:[self.allowsConcurrentExecutionSubject startWith:@NO]
then:[RACSignal return:@YES]
else:[immediateExecuting not]];
if (enabledSignal == nil) {
enabledSignal = [RACSignal return:@YES];
} else {
enabledSignal = [enabledSignal startWith:@YES];
}
_immediateEnabled = [[[[RACSignal
combineLatest:@[ enabledSignal, moreExecutionsAllowed ]]
and]
takeUntil:self.rac_willDeallocSignal]
replayLast];
_enabled = [[[[[self.immediateEnabled
take:1]
concat:[[self.immediateEnabled skip:1] deliverOn:RACScheduler.mainThreadScheduler]]
distinctUntilChanged]
replayLast]
setNameWithFormat:@"%@ -enabled", self];
return self;
}
Among other things, the initializer copies the passed-in block into _signalBlock for later use.
step3: Look at [command.executing subscribeNext:], which subscribes to the signal that reports the RACCommand's state
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}
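Jumping to the definition of subscribeNext: leads to RACSignal, but inspecting self at run time shows that it is a RACReplaySubject, because executing is built with replayLast.
- Step into RACReplaySubject's subscribe: method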
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
for (id value in self.valuesReceived) {
if (compoundDisposable.disposed) return;
[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
}
if (compoundDisposable.disposed) return;
if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}];
[compoundDisposable addDisposable:schedulingDisposable];
return compoundDisposable;
}
step4: Executing the command with execute:
- (RACSignal *)execute:(id)input {
// `immediateEnabled` is guaranteed to send a value upon subscription, so
// -first is acceptable here.
BOOL enabled = [[self.immediateEnabled first] boolValue];
if (!enabled) {
NSError *error = [NSError errorWithDomain:RACCommandErrorDomain code:RACCommandErrorNotEnabled userInfo:@{
NSLocalizedDescriptionKey: NSLocalizedString(@"The command is disabled and cannot be executed", nil),
RACUnderlyingCommandErrorKey: self
}];
return [RACSignal error:error];
}
RACSignal *signal = self.signalBlock(input);
NSCAssert(signal != nil, @"nil signal returned from signal block for value: %@", input);
// We subscribe to the signal on the main thread so that it occurs _after_
// -addActiveExecutionSignal: completes below.
//
// This means that `executing` and `enabled` will send updated values before
// the signal actually starts performing work.
RACMulticastConnection *connection = [[signal
subscribeOn:RACScheduler.mainThreadScheduler]
multicast:[RACReplaySubject subject]];
[self.addedExecutionSignalsSubject sendNext:connection.signal];
[connection connect];
return [connection.signal setNameWithFormat:@"%@ -execute: %@", self, RACDescription(input)];
}
- self.signalBlock(input): self.signalBlock is the block passed in from outside and stored by the initializer in step2, so this line runs that block.
RACTuple
In ReactiveCocoa, some signals send values that are composed from the values sent by several other signals. Those values are wrapped in a single RACTuple instance and delivered together.
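A short sketch of how such a tuple is packed, unpacked, and received from a combined signal follows. It assumes the ReactiveObjC framework is linked; DemoTuple and the sample values are hypothetical and exist only for illustration.

```objc
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper: packs two values into a tuple and unpacks them again.
static NSString *DemoTuple(void) {
    // RACTuplePack wraps several values in one RACTuple.
    RACTuple *tuple = RACTuplePack(@"liu", @18);

    // RACTupleUnpack declares the variables and assigns them by position.
    RACTupleUnpack(NSString *name, NSNumber *age) = tuple;

    // combineLatest: sends a RACTuple holding the latest value of each
    // input signal, once every input has sent at least one value.
    RACSubject *names = [RACSubject subject];
    RACSubject *ages = [RACSubject subject];
    [[RACSignal combineLatest:@[ names, ages ]] subscribeNext:^(RACTuple *latest) {
        RACTupleUnpack(NSString *latestName, NSNumber *latestAge) = latest;
        NSLog(@"%@ is %@", latestName, latestAge);
    }];
    [names sendNext:@"liu"];
    [ages sendNext:@18];

    return [NSString stringWithFormat:@"%@-%@", name, age];   // liu-18
}
```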