前言
緊接著上篇的源碼實現分析,繼續(xù)分析RACSignal的變換操作的底層實現。
目錄
1.高階信號操作
2.同步操作
3.副作用操作
4.多線程操作
5.其他操作
一. 高階信號操作
高階操作大部分的操作是針對高階信號的,也就是說信號里面發(fā)送的值還是一個信號或者是一個高階信號??梢灶惐葦到M,這里就是多維數組,數組里面還是套的數組。
- flattenMap: (在父類RACStream中定義的)
flattenMap:在整個RAC中具有很重要的地位,很多信號變換都是可以用flattenMap:來實現的。
map:,flatten,filter,sequenceMany:這4個操作都是用flattenMap:來實現的。然而其他變換操作實現里面用到map:,flatten,filter又有很多。
回顧一下map:的實現:
- (instancetype)map:(id (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^(id value) {
return [class return:block(value)];
}] setNameWithFormat:@"[%@] -map:", self.name];
}
map:的操作其實就是直接原信號進行的 flattenMap:的操作,變換出來的新的信號的值是block(value)。
flatten的實現接下去會具體分析,這里先略過。
filter的實現:
- (instancetype)filter:(BOOL (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^ id (id value) {
block(value) ? return [class return:value] : return class.empty;
}] setNameWithFormat:@"[%@] -filter:", self.name];
}
filter的實現和map:有點類似,也是對原信號進行 flattenMap:的操作,只不過block(value)不是作為返回值,而是作為判斷條件,滿足這個閉包的條件,變換出來的新的信號返回值就是value,不滿足的就返回empty信號
接下去要分析的高階操作里面,switchToLatest,try:,tryMap:的實現中也將會使用到flattenMap:。
flattenMap:的源碼實現:
- (instancetype)flattenMap:(RACStream * (^)(id value))block {
Class class = self.class;
return [[self bind:^{
return ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
return stream;
};
}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}
flattenMap:的實現是調用了bind函數,對原信號進行變換,并返回block(value)的新信號。關于bind操作的具體流程這篇文章里面已經分析過了,這里不再贅述。
從flattenMap:的源碼可以看到,它是可以支持類似Promise的串行異步操作的,并且flattenMap:是滿足Monad中bind部分定義的。flattenMap:沒法去實現takeUntil:和take:的操作。
然而,bind操作可以實現take:的操作,bind是完全滿足Monad中bind部分定義的。
- flatten (在父類RACStream中定義的)
flatten的源碼實現:
- (instancetype)flatten {
__weak RACStream *stream __attribute__((unused)) = self;
return [[self flattenMap:^(id value) {
return value;
}] setNameWithFormat:@"[%@] -flatten", self.name];
}
flatten操作必須是對高階信號進行操作,如果信號里面不是信號,即不是高階信號,那么就會崩潰。崩潰信息如下:
*** Terminating app due to uncaught exception 'NSInternalInconsistencyException', reason: 'Value returned from -flattenMap: is not a stream
所以flatten是對高階信號進行的降階操作。高階信號每發(fā)送一次信號,經過flatten變換,由于flattenMap:操作之后,返回的新的信號的每個值就是原信號中每個信號的值。
如果對信號A,信號B,信號C進行merge:操作,可以達到和flatten一樣的效果。
[RACSignal merge:@[signalA,signalB,signalC]];
merge:操作在上篇文章分析過,再來復習一下:
+ (RACSignal *)merge:(id<NSFastEnumeration>)signals {
NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
for (RACSignal *signal in signals) {
[copiedSignals addObject:signal];
}
return [[[RACSignal
createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
for (RACSignal *signal in copiedSignals) {
[subscriber sendNext:signal];
}
[subscriber sendCompleted];
return nil;
}]
flatten]
setNameWithFormat:@"+merge: %@", copiedSignals];
}
現在在回來看這段代碼,copiedSignals雖然是一個NSMutableArray,但是它近似合成了一個上圖中的高階信號。然后這些信號們每發(fā)送出來一個信號就發(fā)給訂閱者。整個操作如flatten的字面意思一樣,壓平。
另外,在ReactiveCocoa v2.5中,flatten默認就是flattenMap:這一種操作。
public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
switch strategy {
case .merge:
return self.merge()
case .concat:
return self.concat()
case .latest:
return self.switchToLatest()
}
}
而在ReactiveCocoa v3.x,v4.x,v5.x中,flatten的操作是可以選擇3種操作選擇的。merge,concat,switchToLatest。
- flatten:
flatten:操作也必須是對高階信號進行操作,如果信號里面不是信號,即不是高階信號,那么就會崩潰。
flatten:的實現比較復雜,一步步的來分析:
- (RACSignal *)flatten:(NSUInteger)maxConcurrent {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];
NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];
NSMutableArray *queuedSignals = [NSMutableArray array];
__block BOOL selfCompleted = NO;
__block void (^subscribeToSignal)(RACSignal *);
__weak __block void (^recur)(RACSignal *);
recur = subscribeToSignal = ^(RACSignal *signal) { // 暫時省略};
void (^completeIfAllowed)(void) = ^{ // 暫時省略};
[compoundDisposable addDisposable:[self subscribeNext:^(RACSignal *signal) {
if (signal == nil) return;
NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);
@synchronized (subscriber) {
if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
[queuedSignals addObject:signal];
return;
}
}
subscribeToSignal(signal);
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (subscriber) {
selfCompleted = YES;
completeIfAllowed();
}
}]];
return compoundDisposable;
}] setNameWithFormat:@"[%@] -flatten: %lu", self.name, (unsigned long)maxConcurrent];
}
先來解釋一些變量,數組的作用
activeDisposables里面裝的是當前正在訂閱的訂閱者們的disposables信號。
queuedSignals里面裝的是被暫時緩存起來的信號,它們等待被訂閱。
selfCompleted表示高階信號是否Completed。
subscribeToSignal閉包的作用是訂閱所給的信號。這個閉包的入參參數就是一個信號,在閉包內部訂閱這個信號,并進行一些操作。
recur是對subscribeToSignal閉包的一個弱引用,防止strong-weak循環(huán)引用,在下面會分析subscribeToSignal閉包,就會明白為什么recur要用weak修飾了。
completeIfAllowed的作用是在所有信號都發(fā)送完畢的時候,通知訂閱者,給訂閱者發(fā)送completed。
入參maxConcurrent的意思是最大可容納同時被訂閱的信號個數。
再來詳細分析一下具體訂閱的過程。
flatten:的內部,訂閱高階信號發(fā)出來的信號,這部分的代碼比較簡單:
[self subscribeNext:^(RACSignal *signal) {
if (signal == nil) return;
NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);
@synchronized (subscriber) {
// 1
if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
[queuedSignals addObject:signal];
return;
}
}
// 2
subscribeToSignal(signal);
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (subscriber) {
selfCompleted = YES;
// 3
completeIfAllowed();
}
}]];
如果當前最大可容納信號的個數 > 0 ,且,activeDisposables數組里面已經裝滿到最大可容納信號的個數,不能再裝新的信號了。那么就把當前的信號緩存到queuedSignals數組中。
直到activeDisposables數組里面有空的位子可以加入新的信號,那么就調用subscribeToSignal( )閉包,開始訂閱這個新的信號。
最后完成的時候標記變量selfCompleted為YES,并且調用completeIfAllowed( )閉包。
void (^completeIfAllowed)(void) = ^{
if (selfCompleted && activeDisposables.count == 0) {
[subscriber sendCompleted];
subscribeToSignal = nil;
}
};
當selfCompleted = YES 并且activeDisposables數組里面的信號都發(fā)送完畢,沒有可以發(fā)送的信號了,即activeDisposables.count = 0,那么就給訂閱者sendCompleted。這里值得一提的是,還需要把subscribeToSignal手動置為nil。因為在subscribeToSignal閉包中強引用了completeIfAllowed閉包,防止completeIfAllowed閉包被提早的銷毀掉了。所以在completeIfAllowed閉包執(zhí)行完畢的時候,需要再把subscribeToSignal閉包置為nil。
那么接下來需要看的重點就是subscribeToSignal( )閉包。
recur = subscribeToSignal = ^(RACSignal *signal) {
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
// 1
@synchronized (subscriber) {
[compoundDisposable addDisposable:serialDisposable];
[activeDisposables addObject:serialDisposable];
}
serialDisposable.disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
// 2
__strong void (^subscribeToSignal)(RACSignal *) = recur;
RACSignal *nextSignal;
// 3
@synchronized (subscriber) {
[compoundDisposable removeDisposable:serialDisposable];
[activeDisposables removeObjectIdenticalTo:serialDisposable];
// 4
if (queuedSignals.count == 0) {
completeIfAllowed();
return;
}
// 5
nextSignal = queuedSignals[0];
[queuedSignals removeObjectAtIndex:0];
}
// 6
subscribeToSignal(nextSignal);
}];
};
activeDisposables先添加當前高階信號發(fā)出來的信號的Disposable( 也就是入參信號的Disposable)
這里會對recur進行__strong,因為下面第6步會用到subscribeToSignal( )閉包,同樣也是為了防止出現循環(huán)引用。
訂閱入參信號,給訂閱者發(fā)送信號。當發(fā)送完畢后,activeDisposables中移除它對應的Disposable。
如果當前緩存的queuedSignals數組里面沒有緩存的信號,那么就調用completeIfAllowed( )閉包。
如果當前緩存的queuedSignals數組里面有緩存的信號,那么就取出第0個信號,并在queuedSignals數組移除它。
把第4步取出的信號繼續(xù)訂閱,繼續(xù)調用subscribeToSignal( )閉包。
總結一下:高階信號每發(fā)送一個信號值,判斷activeDisposables數組裝的個數是否已經超過了maxConcurrent。如果裝不下了就緩存進queuedSignals數組中。如果還可以裝的下就開始調用subscribeToSignal( )閉包,訂閱當前信號。
每發(fā)送完一個信號就判斷緩存數組queuedSignals的個數,如果緩存數組里面已經沒有信號了,那么就結束原來高階信號的發(fā)送。如果緩存數組里面還有信號就繼續(xù)訂閱。如此循環(huán),直到原高階信號所有的信號都發(fā)送完畢。
整個flatten:的執(zhí)行流程都分析清楚了,最后,關于入參maxConcurrent進行更進一步的解讀。
回看上面flatten:的實現中有這樣一句話:
if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent)
那么maxConcurrent的值域就是最終決定flatten:表現行為。
如果maxConcurrent < 0,會發(fā)生什么?程序會崩潰。因為在源碼中有這樣一行的初始化的代碼:
NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];
activeDisposables在初始化的時候會初始化一個大小為maxConcurrent的NSMutableArray。如果maxConcurrent < 0,那么這里初始化就會崩潰。
如果maxConcurrent = 0,會發(fā)生什么?那么flatten:就退化成flatten了。
如果maxConcurrent = 1,會發(fā)生什么?那么flatten:就退化成concat了。
如果maxConcurrent > 1,會發(fā)生什么?由于至今還沒有遇到能用到maxConcurrent > 1的需求情況,所以這里暫時不展示圖解了。maxConcurrent > 1之后,flatten的行為還依照高階信號的個數和maxConcurrent的關系。如果高階信號的個數<=maxConcurrent的值,那么flatten:又退化成flatten了。如果高階信號的個數>maxConcurrent的值,那么多的信號就會進入queuedSignals緩存數組。
- concat
這里的concat實現是在RACSignal里面定義的。
- (RACSignal *)concat {
return [[self flatten:1] setNameWithFormat:@"[%@] -concat", self.name];
}
一看源碼就知道了,concat其實就是flatten:1。
當然在RACSignal中定義了concat:方法,這個方法在之前的文章已經分析過了,這里回顧對比一下:
- (RACSignal *)concat:(RACSignal *)signal {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
RACDisposable *concattedDisposable = [signal subscribe:subscriber];
serialDisposable.disposable = concattedDisposable;
}];
serialDisposable.disposable = sourceDisposable;
return serialDisposable;
}] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}
經過對比可以發(fā)現,雖然最終變換出來的結果類似,但是針對的信號的對象是不同的,concat是針對高階信號進行降階操作。concat:是把兩個信號連接起來的操作。如果把高階信號按照時間軸,從左往右,依次把每個信號都concat:連接起來,那么結果就是concat。
- switchToLatest
- (RACSignal *)switchToLatest {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACMulticastConnection *connection = [self publish];
RACDisposable *subscriptionDisposable = [[connection.signal
flattenMap:^(RACSignal *x) {
NSCAssert(x == nil || [x isKindOfClass:RACSignal.class], @"-switchToLatest requires that the source signal (%@) send signals. Instead we got: %@", self, x);
return [x takeUntil:[connection.signal concat:[RACSignal never]]];
}]
subscribe:subscriber];
RACDisposable *connectionDisposable = [connection connect];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
[connectionDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -switchToLatest", self.name];
}
switchToLatest這個操作只能用在高階信號上,如果原信號里面有不是信號的值,那么就會崩潰,崩潰信息如下:
***** Terminating app due to uncaught exception 'NSInternalInconsistencyException', reason: '-switchToLatest requires that the source signal (<RACDynamicSignal: 0x608000038ec0> name: ) send signals.
在switchToLatest操作中,先把原信號轉換成熱信號,connection.signal就是RACSubject類型的。對RACSubject進行flattenMap:變換。在flattenMap:變換中,connection.signal會先concat:一個never信號。這里concat:一個never信號的原因是為了內部的信號過早的結束而導致訂閱者收到complete信號。
flattenMap:變換中x也是一個信號,對x進行takeUntil:變換,效果就是下一個信號到來之前,x會一直發(fā)送信號,一旦下一個信號到來,x就會被取消訂閱,開始訂閱新的信號。
一個高階信號經過switchToLatest降階操作之后,能得到上圖中的信號。
- switch: cases: default:
switch: cases: default:源碼實現如下:
+ (RACSignal *)switch:(RACSignal *)signal cases:(NSDictionary *)cases default:(RACSignal *)defaultSignal {
NSCParameterAssert(signal != nil);
NSCParameterAssert(cases != nil);
for (id key in cases) {
id value __attribute__((unused)) = cases[key];
NSCAssert([value isKindOfClass:RACSignal.class], @"Expected all cases to be RACSignals, %@ isn't", value);
}
NSDictionary *copy = [cases copy];
return [[[signal
map:^(id key) {
if (key == nil) key = RACTupleNil.tupleNil;
RACSignal *signal = copy[key] ?: defaultSignal;
if (signal == nil) {
NSString *description = [NSString stringWithFormat:NSLocalizedString(@"No matching signal found for value %@", @""), key];
return [RACSignal error:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorNoMatchingCase userInfo:@{ NSLocalizedDescriptionKey: description }]];
}
return signal;
}]
switchToLatest]
setNameWithFormat:@"+switch: %@ cases: %@ default: %@", signal, cases, defaultSignal];
}
實現中有3個斷言,全部都是針對入參的要求。入參signal信號和cases字典都不能是nil。其次,cases字典里面所有key對應的value必須是RACSignal類型的。注意,defaultSignal是可以為nil的。
接下來的實現比較簡單,對入參傳進來的signal信號進行map變換,這里的變換是升階的變換。
signal每次發(fā)送出來的一個值,就把這個值當做key值去cases字典里面去查找對應的value。當然value對應的是一個信號。如果value對應的信號不為空,就把signal發(fā)送出來的這個值map成字典里面對應的信號。如果value對應為空,那么就把原signal發(fā)出來的值map成defaultSignal信號。
如果經過轉換之后,得到的信號為nil,就會返回一個error信號。如果得到的信號不為nil,那么原信號完全轉換完成就會變成一個高階信號,這個高階信號里面裝的都是信號。最后再對這個高階信號執(zhí)行switchToLatest轉換。
- if: then: else:
if: then: else:源碼實現如下:
+ (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal {
NSCParameterAssert(boolSignal != nil);
NSCParameterAssert(trueSignal != nil);
NSCParameterAssert(falseSignal != nil);
return [[[boolSignal
map:^(NSNumber *value) {
NSCAssert([value isKindOfClass:NSNumber.class], @"Expected %@ to send BOOLs, not %@", boolSignal, value);
return (value.boolValue ? trueSignal : falseSignal);
}]
switchToLatest]
setNameWithFormat:@"+if: %@ then: %@ else: %@", boolSignal, trueSignal, falseSignal];
}
入參boolSignal,trueSignal,falseSignal三個信號都不能為nil。
boolSignal里面都必須裝的是NSNumber類型的值。
針對boolSignal進行map升階操作,boolSignal信號里面的值如果是YES,那么就轉換成trueSignal信號,如果為NO,就轉換成falseSignal。升階轉換完成之后,boolSignal就是一個高階信號,然后再進行switchToLatest操作。
- catch:
catch:的實現如下:
- (RACSignal *)catch:(RACSignal * (^)(NSError *error))catchBlock {
NSCParameterAssert(catchBlock != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *catchDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
RACSignal *signal = catchBlock(error);
NSCAssert(signal != nil, @"Expected non-nil signal from catch block on %@", self);
catchDisposable.disposable = [signal subscribe:subscriber];
} completed:^{
[subscriber sendCompleted];
}];
return [RACDisposable disposableWithBlock:^{
[catchDisposable dispose];
[subscriptionDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -catch:", self.name];
}
當對原信號進行訂閱的時候,如果出現了錯誤,會去執(zhí)行catchBlock( )閉包,入參為剛剛產生的error。catchBlock( )閉包產生的是一個新的RACSignal,并再次用訂閱者訂閱該信號。
這里之所以說是高階操作,是因為這里原信號發(fā)生錯誤之后,錯誤會升階成一個信號。
- catchTo:
catchTo:的實現如下:
- (RACSignal *)catchTo:(RACSignal *)signal {
return [[self catch:^(NSError *error) {
return signal;
}] setNameWithFormat:@"[%@] -catchTo: %@", self.name, signal];
}
catchTo:的實現就是調用catch:方法,只不過原來catch:方法里面的catchBlock( )閉包,永遠都只返回catchTo:的入參,signal信號。
- try:
- (RACSignal *)try:(BOOL (^)(id value, NSError **errorPtr))tryBlock {
NSCParameterAssert(tryBlock != NULL);
return [[self flattenMap:^(id value) {
NSError *error = nil;
BOOL passed = tryBlock(value, &error);
return (passed ? [RACSignal return:value] : [RACSignal error:error]);
}] setNameWithFormat:@"[%@] -try:", self.name];
}
try:也是一個高階操作。對原信號進行flattenMap變換,對信號發(fā)出來的每個值都調用一遍tryBlock( )閉包,如果這個閉包的返回值是YES,那么就返回[RACSignal return:value],如果閉包的返回值是NO,那么就返回error。原信號中如果都是值,那么經過try:操作之后,每個值都會變成RACSignal,于是原信號也就變成了高階信號了。
- tryMap:
- (RACSignal *)tryMap:(id (^)(id value, NSError **errorPtr))mapBlock {
NSCParameterAssert(mapBlock != NULL);
return [[self flattenMap:^(id value) {
NSError *error = nil;
id mappedValue = mapBlock(value, &error);
return (mappedValue == nil ? [RACSignal error:error] : [RACSignal return:mappedValue]);
}] setNameWithFormat:@"[%@] -tryMap:", self.name];
}
tryMap:的實現和try:的實現基本一致,唯一不同的就是入參閉包的返回值不同。在tryMap:中調用mapBlock( )閉包,返回是一個對象,如果這個對象不為nil,就返回[RACSignal return:mappedValue]。如果返回的對象是nil,那么就變換成error信號。
- timeout: onScheduler:
- (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *timeoutDisposable = [scheduler afterDelay:interval schedule:^{
[disposable dispose];
[subscriber sendError:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorTimedOut userInfo:nil]];
}];
[disposable addDisposable:timeoutDisposable];
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[disposable dispose];
[subscriber sendError:error];
} completed:^{
[disposable dispose];
[subscriber sendCompleted];
}];
[disposable addDisposable:subscriptionDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -timeout: %f onScheduler: %@", self.name, (double)interval, scheduler];
}
timeout: onScheduler:的實現很簡單,它比正常的信號訂閱多了一個timeoutDisposable操作。它在信號訂閱的內部開啟了一個scheduler,經過interval的時間之后,就會停止訂閱原信號,并對訂閱者sendError。
這個操作的表意和方法名完全一致,經過interval的時間之后,就算timeout,那么就停止訂閱原信號,并sendError。
總結一下ReactiveCocoa v2.5中高階信號的升階 / 降階操作:
升階操作:
map( 把值map成一個信號)
[RACSignal return:signal]
降階操作:
flatten(等效于flatten:0,+merge:)
concat(等效于flatten:1)
flatten:1
switchToLatest
flattenMap:
這5種操作能將高階信號變?yōu)榈碗A信號,但是最終降階之后的效果就只有3種:switchToLatest,flatten,concat。具體的圖示見上面的分析。
二. 同步操作
在ReactiveCocoa中還包含一些同步的操作,這些操作一般我們很少使用,除非真的很確定這樣做了之后不會有什么問題,否則胡亂使用會導致線程死鎖等一些嚴重的問題。
- firstOrDefault: success: error:
- (id)firstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error {
NSCondition *condition = [[NSCondition alloc] init];
condition.name = [NSString stringWithFormat:@"[%@] -firstOrDefault: %@ success:error:", self.name, defaultValue];
__block id value = defaultValue;
__block BOOL done = NO;
// Ensures that we don't pass values across thread boundaries by reference.
__block NSError *localError;
__block BOOL localSuccess;
[[self take:1] subscribeNext:^(id x) {
// 加鎖
[condition lock];
value = x;
localSuccess = YES;
done = YES;
[condition broadcast];
// 解鎖
[condition unlock];
} error:^(NSError *e) {
// 加鎖
[condition lock];
if (!done) {
localSuccess = NO;
localError = e;
done = YES;
[condition broadcast];
}
// 解鎖
[condition unlock];
} completed:^{
// 加鎖
[condition lock];
localSuccess = YES;
done = YES;
[condition broadcast];
// 解鎖
[condition unlock];
}];
// 加鎖
[condition lock];
while (!done) {
[condition wait];
}
if (success != NULL) *success = localSuccess;
if (error != NULL) *error = localError;
// 解鎖
[condition unlock];
return value;
}
從源碼上看,firstOrDefault: success: error:這種同步的方法很容易導致線程死鎖。它在subscribeNext,error,completed的閉包里面都調用condition鎖先lock再unlock。如果一個信號發(fā)送值過來,都沒有執(zhí)行subscribeNext,error,completed這3個操作里面的任意一個,那么就會執(zhí)行[condition wait],等待。
由于對原信號進行了take:1操作,所以只會對第一個值進行操作。執(zhí)行完subscribeNext,error,completed這3個操作里面的任意一個,又會加一次鎖,對外部傳進來的入參success和error進行賦值,已便外部可以拿到里面的狀態(tài)。最終返回信號是原信號中第一個next里面的值,如果原信號第一個值沒有,比如直接error或者completed,那么返回的是defaultValue。
done為YES表示已經成功執(zhí)行了subscribeNext,error,completed這3個操作里面的任意一個。反之為NO。
localSuccess為YES表示成功發(fā)送值或者成功發(fā)送完了原信號的所有值,期間沒有發(fā)生錯誤。
condition的broadcast操作是喚醒其他線程的操作,相當于操作系統(tǒng)里面互斥信號量的signal操作。
入參defaultValue是給內部變量value的一個初始值。當原信號發(fā)送出一個值之后,value的值時刻都會與原信號的值保持一致。
success和error是外部變量的地址,從外面可以監(jiān)聽到里面的狀態(tài)。在函數內部賦值,在函數外面拿到它們的值。
- firstOrDefault:
- (id)firstOrDefault:(id)defaultValue {
return [self firstOrDefault:defaultValue success:NULL error:NULL];
}
firstOrDefault:的實現就是調用了firstOrDefault: success: error:方法。只不過不需要傳success和error,不關心內部的狀態(tài)。最終返回信號是原信號中第一個next里面的值,如果原信號第一個值沒有,比如直接error或者completed,那么返回的是defaultValue。
- first
- (id)first {
return [self firstOrDefault:nil];
}
first方法就更加省略,連defaultValue也不傳。最終返回信號是原信號中第一個next里面的值,如果原信號第一個值沒有,比如直接error或者completed,那么返回的是nil。
- waitUntilCompleted:
- (BOOL)waitUntilCompleted:(NSError **)error {
BOOL success = NO;
[[[self
ignoreValues]
setNameWithFormat:@"[%@] -waitUntilCompleted:", self.name]
firstOrDefault:nil success:&success error:error];
return success;
}
waitUntilCompleted:里面還是調用firstOrDefault: success: error:方法。返回值是success。只要原信號正常的發(fā)送完信號,success應該為YES,但是如果發(fā)送過程中出現了error,success就為NO。success作為返回值,外部就可以監(jiān)聽到是否發(fā)送成功。
雖然這個方法可以監(jiān)聽到發(fā)送結束的狀態(tài),但是也盡量不要使用,因為它的實現調用了firstOrDefault: success: error:方法,這個方法里面有大量的鎖的操作,一不留神就會導致死鎖。
- toArray
- (NSArray *)toArray {
return [[[self collect] first] copy];
}
經過collect之后,原信號所有的值都會被加到一個數組里面,取出信號的第一個值就是一個數組。所以執(zhí)行完first之后第一個值就是原信號所有值的數組。
三. 副作用操作
ReactiveCocoa v2.5中還為我們提供了一些可以進行副作用操作的函數。
- doNext:
- (RACSignal *)doNext:(void (^)(id x))block {
NSCParameterAssert(block != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [self subscribeNext:^(id x) {
block(x);
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -doNext:", self.name];
}
doNext:能讓我們在原信號sendNext之前,能執(zhí)行一個block閉包,在這個閉包中我們可以執(zhí)行我們想要執(zhí)行的副作用操作。
- doError:
- (RACSignal *)doError:(void (^)(NSError *error))block {
NSCParameterAssert(block != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
block(error);
[subscriber sendError:error];
} completed:^{
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -doError:", self.name];
}
doError:能讓我們在原信號sendError之前,能執(zhí)行一個block閉包,在這個閉包中我們可以執(zhí)行我們想要執(zhí)行的副作用操作。
- doCompleted:
- (RACSignal *)doCompleted:(void (^)(void))block {
NSCParameterAssert(block != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
block();
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -doCompleted:", self.name];
}
doCompleted:能讓我們在原信號sendCompleted之前,能執(zhí)行一個block閉包,在這個閉包中我們可以執(zhí)行我們想要執(zhí)行的副作用操作。
- initially:
- (RACSignal *)initially:(void (^)(void))block {
NSCParameterAssert(block != NULL);
return [[RACSignal defer:^{
block();
return self;
}] setNameWithFormat:@"[%@] -initially:", self.name];
}
initially:能讓我們在原信號發(fā)送之前,先調用了defer:操作,在return self之前先執(zhí)行了一個閉包,在這個閉包中我們可以執(zhí)行我們想要執(zhí)行的副作用操作。
- finally:
- (RACSignal *)finally:(void (^)(void))block {
NSCParameterAssert(block != NULL);
return [[[self
doError:^(NSError *error) {
block();
}]
doCompleted:^{
block();
}]
setNameWithFormat:@"[%@] -finally:", self.name];
}
finally:操作調用了doError:和doCompleted:操作,依次在sendError之前,sendCompleted之前,插入一個block( )閉包。這樣當信號因為錯誤而要終止取消訂閱,或者,發(fā)送結束之前,都能執(zhí)行一段我們想要執(zhí)行的副作用操作。
四. 多線程操作
在RACSignal里面有3個關于多線程的操作。
- deliverOn:
- (RACSignal *)deliverOn:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [self subscribeNext:^(id x) {
[scheduler schedule:^{
[subscriber sendNext:x];
}];
} error:^(NSError *error) {
[scheduler schedule:^{
[subscriber sendError:error];
}];
} completed:^{
[scheduler schedule:^{
[subscriber sendCompleted];
}];
}];
}] setNameWithFormat:@"[%@] -deliverOn: %@", self.name, scheduler];
}
deliverOn:的入參是一個scheduler,當原信號subscribeNext,sendError,sendCompleted的時候,都去調用scheduler的schedule方法。
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
block();
return nil;
}
在schedule的方法里面會判斷當前currentScheduler是否為nil,如果是nil就調用backgroundScheduler去執(zhí)行block( )閉包,如果不為nil,當前currentScheduler直接執(zhí)行block( )閉包。
+ (instancetype)currentScheduler {
RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
if (scheduler != nil) return scheduler;
if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;
return nil;
}
判斷currentScheduler是否存在,看兩點,一是當前線程的字典里面,是否存在RACSchedulerCurrentSchedulerKey( @"RACSchedulerCurrentSchedulerKey" ),如果存在對應的value,返回scheduler,二是看當前的類是不是在主線程,如果在主線程,返回mainThreadScheduler。如果兩個條件都不存在,那么當前currentScheduler就不存在,返回nil。
deliverOn:操作的特點是原信號發(fā)送sendNext,sendError,sendCompleted所在線程是確定的。
- subscribeOn:
- (RACSignal *)subscribeOn:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [scheduler schedule:^{
RACDisposable *subscriptionDisposable = [self subscribe:subscriber];
[disposable addDisposable:subscriptionDisposable];
}];
[disposable addDisposable:schedulingDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -subscribeOn: %@", self.name, scheduler];
}
subscribeOn:操作就是在傳入的scheduler的閉包內部訂閱原信號的。它與deliverOn:操作就不同:
subscribeOn:操作能夠保證didSubscribe block( )閉包在入參scheduler中執(zhí)行,但是不能保證原信號subscribeNext,sendError,sendCompleted在哪個scheduler中執(zhí)行。
deliverOn:與subscribeOn:正好反過來,能保證原信號subscribeNext,sendError,sendCompleted在哪個scheduler中執(zhí)行,但是不能保證didSubscribe block( )閉包在哪個scheduler中執(zhí)行。
- deliverOnMainThread
- (RACSignal *)deliverOnMainThread {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block volatile int32_t queueLength = 0;
void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) { // 暫時省略};
return [self subscribeNext:^(id x) {
performOnMainThread(^{
[subscriber sendNext:x];
});
} error:^(NSError *error) {
performOnMainThread(^{
[subscriber sendError:error];
});
} completed:^{
performOnMainThread(^{
[subscriber sendCompleted];
});
}];
}] setNameWithFormat:@"[%@] -deliverOnMainThread", self.name];
}
對比deliverOn:的源碼實現,發(fā)現兩者比較相似,只不過這里deliverOnMainThread把sendNext,sendError,sendCompleted都包在了performOnMainThread閉包中執(zhí)行。
__block volatile int32_t queueLength = 0;
void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) {
int32_t queued = OSAtomicIncrement32(&queueLength);
if (NSThread.isMainThread && queued == 1) {
block();
OSAtomicDecrement32(&queueLength);
} else {
dispatch_async(dispatch_get_main_queue(), ^{
block();
OSAtomicDecrement32(&queueLength);
});
}
};
performOnMainThread閉包內部保證了入參block( )閉包一定是在主線程中執(zhí)行。
OSAtomicIncrement32 和 OSAtomicDecrement32是原子操作,分別代表+1和-1。下面的if-else判斷里面,不管是滿足哪一條,最終都還是在主線程中執(zhí)行block( )閉包。
deliverOnMainThread能保證原信號subscribeNext,sendError,sendCompleted都在主線程MainThread中執(zhí)行。
五. 其他操作
- setKeyPath: onObject: nilValue:
setKeyPath: onObject: nilValue: 的源碼實現如下:
- (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object nilValue:(id)nilValue {
NSCParameterAssert(keyPath != nil);
NSCParameterAssert(object != nil);
keyPath = [keyPath copy];
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
__block void * volatile objectPtr = (__bridge void *)object;
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
// 1
__strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;
[object setValue:x ?: nilValue forKeyPath:keyPath];
} error:^(NSError *error) {
__strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;
NSCAssert(NO, @"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);
NSLog(@"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);
[disposable dispose];
} completed:^{
[disposable dispose];
}];
[disposable addDisposable:subscriptionDisposable];
#if DEBUG
static void *bindingsKey = &bindingsKey;
NSMutableDictionary *bindings;
@synchronized (object) {
// 2
bindings = objc_getAssociatedObject(object, bindingsKey);
if (bindings == nil) {
bindings = [NSMutableDictionary dictionary];
objc_setAssociatedObject(object, bindingsKey, bindings, OBJC_ASSOCIATION_RETAIN_NONATOMIC);
}
}
@synchronized (bindings) {
NSCAssert(bindings[keyPath] == nil, @"Signal %@ is already bound to key path \"%@\" on object %@, adding signal %@ is undefined behavior", [bindings[keyPath] nonretainedObjectValue], keyPath, object, self);
bindings[keyPath] = [NSValue valueWithNonretainedObject:self];
}
#endif
RACDisposable *clearPointerDisposable = [RACDisposable disposableWithBlock:^{
#if DEBUG
@synchronized (bindings) {
// 3
[bindings removeObjectForKey:keyPath];
}
#endif
while (YES) {
void *ptr = objectPtr;
// 4
if (OSAtomicCompareAndSwapPtrBarrier(ptr, NULL, &objectPtr)) {
break;
}
}
}];
[disposable addDisposable:clearPointerDisposable];
[object.rac_deallocDisposable addDisposable:disposable];
RACCompoundDisposable *objectDisposable = object.rac_deallocDisposable;
return [RACDisposable disposableWithBlock:^{
[objectDisposable removeDisposable:disposable];
[disposable dispose];
}];
}
代碼雖然有點長,但是逐行讀下來不是很難,需要注意的有4點地方,已經在上述代碼里面標明了。接下來一一分析。
- objc_precise_lifetime的問題。
作者在這里寫了一段注釋:
Possibly spec, possibly compiler bug, but this __bridge cast does not result in a retain here, effectively an invisible __unsafe_unretained qualifier. Using objc_precise_lifetime gives the __strong reference desired. The explicit use of __strong is strictly defensive.
作者懷疑是編譯器的一個bug,即使是顯示的調用了__strong,依舊沒法保證被強引用了,所以還需要用objc_precise_lifetime來保證強引用。
關于這個問題,筆者查詢了一下LLVM的文檔,在6.3 precise lifetime semantics這一節(jié)中提到了這個問題。
通常上,凡是聲明了__strong的變量,都會有很確切的生命周期。ARC會維持這些__strong的變量在其生命周期中被retained。
但是自動存儲的局部變量是沒有確切的生命周期的。這些變量僅僅只是簡單的持有一個強引用,強引用著retain對象的指針類型的值。這些值完全受控于本地控制者的如何優(yōu)化。所以要想改變這些局部變量的生命周期,是不可能的事情。因為有太多的優(yōu)化,理論上都會導致局部變量的生命周期減少,但是這些優(yōu)化非常有用。
但是LLVM為我們提供了一個關鍵字objc_precise_lifetime,使用這個可以是局部變量的生命周期變成確切的。這個關鍵字有時候還是非常有用的。甚至更加極端情況,該局部變量都沒有被使用,但是它依舊可以保持一個確定的生命周期。
回到源碼上來,接著代碼會對入參object進行setValue: forKeyPath:
[object setValue:x ?: nilValue forKeyPath:keyPath];
如何x為nil就返回nilValue傳進來的值。
- AssociatedObject關聯對象
如果bindings字典不存在,那么就調用objc_setAssociatedObject對object進行關聯對象。參數是OBJC_ASSOCIATION_RETAIN_NONATOMIC。如果bindings字典存在,就用objc_getAssociatedObject取出字典。
在字典里面重新更新綁定key-value值,key就是入參keyPath,value是原信號。
- 取消訂閱原信號的時候
[bindings removeObjectForKey:keyPath];
當信號取消訂閱的時候,移除所有的關聯值。
- OSAtomicCompareAndSwapPtrBarrier
這個函數屬于OSAtomic原子操作,原型如下:
OSAtomicCompareAndSwapPtrBarrier(type __oldValue, type __newValue, volatile type *__theValue)
Compares a variable against the specified old value. If the two values are equal, this function assigns the specified new value to the variable; otherwise, it does nothing. The comparison and assignment are done as one atomic operation and the function returns a Boolean value indicating whether the swap actually occurred.
這個函數用于比較__oldValue是否與__theValue指針指向的內存位置的值匹配,如果匹配,則將__newValue的值存儲到__theValue指向的內存位置。整個函數的返回值就是交換是否成功的BOOL值。
while (YES) {
void *ptr = objectPtr;
if (OSAtomicCompareAndSwapPtrBarrier(ptr, NULL, &objectPtr)) {
break;
}
}
在這個while的死循環(huán)里面只有當OSAtomicCompareAndSwapPtrBarrier返回值為YES,才能退出整個死循環(huán)。返回值為YES就代表&objectPtr被置為了NULL,這樣就確保了在線程安全的情況下,不存在野指針的問題了。
- setKeyPath: onObject:
- (RACDisposable *)setKeyPath:(NSString *)keyPath onObject:(NSObject *)object {
return [self setKeyPath:keyPath onObject:object nilValue:nil];
}
setKeyPath: onObject:就是調用setKeyPath: onObject: nilValue:方法,只不過nilValue傳遞的是nil。
最后
關于RACSignal的所有操作底層分析實現都已經分析完成。最后請大家多多指教。