一開始沒注意有兩個方法,下面從源碼看兩個方法
// No-argument variant; the implementation quoted below forwards to
// `[self flatten:1]` and renames the resulting signal.
- (RACSignal *)concat;
// Variant taking one other signal; its implementation is not shown in this
// section.
- (RACSignal *)concat:(RACSignal *)signal;
先說concat,源碼:
- (RACSignal *)concat {
    // Concatenation is flattening with at most one inner signal subscribed
    // at a time; only the debugging name differs from `-flatten:`.
    RACSignal *oneAtATime = [self flatten:1];
    return [oneAtATime setNameWithFormat:@"[%@] -concat", self.name];
}
方法還是走到了 flatten: ,先看用法吧:
// Three subjects used as the inner signals of a signal-of-signals.
RACSubject *test1 = [RACSubject subject];
RACSubject *test2 = [RACSubject subject];
RACSubject *test3 = [RACSubject subject];

// The outer signal sends the three subjects, in order, to each subscriber.
RACSignal *sig = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
    for (RACSubject *inner in @[test1, test2, test3]) {
        [subscriber sendNext:inner];
    }
    return nil;
}];

// Flatten with a limit of 2 and log every value that comes through.
RACSignal *flattened = [sig flatten:2];
[flattened subscribeNext:^(id value) {
    NSLog(@"%@",value);
}];

// Push one value through each subject after the subscription is in place.
[test1 sendNext:@"test1 1"];
[test2 sendNext:@"test2 1"];
[test3 sendNext:@"test3 1"];
打印的日志:
2016-01-28 16:17:34.207 demo[10439:133672] test1 1
2016-01-28 16:17:34.208 demo[10439:133672] test2 1
如果改成[sig flatten:3] 打印日志:
2016-01-28 16:27:24.848 demo[17056:144114] test1 1
2016-01-28 16:27:24.848 demo[17056:144114] test2 1
2016-01-28 16:27:24.849 demo[17056:144114] test3 1
可以大概猜出 flatten: 帶的參數為最大的並發數量,下面開始看源碼
源碼如下
// Returns a signal that subscribes to the signals sent by the receiver and
// forwards their `next` values to its own subscriber.
//
// maxConcurrent - Upper bound on the number of inner signals subscribed to at
//                 the same time. Signals arriving while the bound is reached
//                 are parked in a FIFO queue and started as earlier ones
//                 complete. A value of 0 disables the bound (the queueing
//                 check below is guarded by `maxConcurrent > 0`).
//
// An error from the receiver or from any inner signal is forwarded to the
// subscriber. `completed` is sent once the receiver has completed and no
// inner subscription is still active. The returned signal is named
// "[<receiver name>] -flatten: <maxConcurrent>".
- (RACSignal *)flatten:(NSUInteger)maxConcurrent {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
// Collects the outer subscription and every inner subscription; it is the
// disposable returned from this block.
RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];
// Contains disposables for the currently active subscriptions.
//
// This should only be used while synchronized on `subscriber`.
NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];
// Whether the signal-of-signals has completed yet.
//
// This should only be used while synchronized on `subscriber`.
__block BOOL selfCompleted = NO;
// Subscribes to the given signal.
__block void (^subscribeToSignal)(RACSignal *);
// Weak reference to the above, to avoid a leak.
__weak __block void (^recur)(RACSignal *);
// Sends completed to the subscriber if all signals are finished.
//
// This should only be used while synchronized on `subscriber`.
void (^completeIfAllowed)(void) = ^{
if (selfCompleted && activeDisposables.count == 0) {
[subscriber sendCompleted];
// A strong reference is held to `subscribeToSignal` until completion,
// preventing it from deallocating early.
subscribeToSignal = nil;
}
};
// The signals waiting to be started.
//
// This array should only be used while synchronized on `subscriber`.
NSMutableArray *queuedSignals = [NSMutableArray array];
recur = subscribeToSignal = ^(RACSignal *signal) {
// Handles one inner RACSignal that arrived as a `next` value: its
// subscription is tracked through a dedicated serial disposable.
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
@synchronized (subscriber) {
[compoundDisposable addDisposable:serialDisposable];
[activeDisposables addObject:serialDisposable];
}
// Subscribe to the inner signal, forwarding its values and errors to
// `subscriber`.
serialDisposable.disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
// This inner signal has finished: stop tracking it, then look at the
// signals that were parked in `queuedSignals` because the concurrency
// limit had been reached. The block is re-entered through the weak
// `recur` reference, promoted to a strong local for this call.
__strong void (^subscribeToSignal)(RACSignal *) = recur;
RACSignal *nextSignal;
@synchronized (subscriber) {
[compoundDisposable removeDisposable:serialDisposable];
[activeDisposables removeObjectIdenticalTo:serialDisposable];
// Nothing is waiting, so the only remaining question is whether the
// whole flattened signal can complete now.
if (queuedSignals.count == 0) {
completeIfAllowed();
return;
}
// Dequeue the oldest waiting signal (first in, first out).
nextSignal = queuedSignals[0];
[queuedSignals removeObjectAtIndex:0];
}
// The new subscription is started after leaving the synchronized section.
subscribeToSignal(nextSignal);
}];
};
// Subscribe to the receiver itself; every non-nil `next` value is expected
// to be a RACSignal (asserted below), and nil values are ignored.
[compoundDisposable addDisposable:[self subscribeNext:^(RACSignal *signal) {
if (signal == nil) return;
NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);
@synchronized (subscriber) {
// `activeDisposables` tracks the running inner subscriptions; their
// number may not exceed `maxConcurrent`. When the limit is reached, the
// signal is appended to `queuedSignals` instead of being started.
if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
[queuedSignals addObject:signal];
// If we need to wait, skip subscribing to this
// signal.
return;
}
}
subscribeToSignal(signal);
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
// The receiver is done sending signals; complete right away if no inner
// subscription is still active.
@synchronized (subscriber) {
selfCompleted = YES;
completeIfAllowed();
}
}]];
return compoundDisposable;
}] setNameWithFormat:@"[%@] -flatten: %lu", self.name, (unsigned long)maxConcurrent];
}