上一篇我們探索了RAC的核心流程就是:
- 創(chuàng)建信號
- 訂閱信號
- 訂閱者發(fā)送信號
- 銷毀
那么我們根據(jù)這些操作來看看RAC的核心類以及具體實(shí)現(xiàn),主要就是信號、訂閱、銷毀這3個(gè)點(diǎn)。
RACSignal
RACSiganl,信號類,是抽象類RACStream的子類。在RAC中,萬物皆信號,只有有了信號,才會有的一系列操作。一般表示將來有數(shù)據(jù)傳遞,只要有數(shù)據(jù)改變,信號內(nèi)部接收到數(shù)據(jù),就會馬上發(fā)出數(shù)據(jù)。但是它本身不具備發(fā)送信號的能力,而是交給內(nèi)部一個(gè)訂閱者去發(fā)出。
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
return [RACDynamicSignal createSignal:didSubscribe];
}
而最終的訂閱方法的實(shí)現(xiàn)是由RACDynamicSignal來完成的。該類有2個(gè)方法,一個(gè)就是創(chuàng)建信號,保存信號創(chuàng)建的回調(diào);另一個(gè)就是執(zhí)行訂閱方法的時(shí)候調(diào)用信號創(chuàng)建保存的block,返回銷毀者。
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
RACDynamicSignal *signal = [[self alloc] init];
signal->_didSubscribe = [didSubscribe copy];
return [signal setNameWithFormat:@"+createSignal:"];
}
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}
RACPassthroughSubscriber
核心訂閱類RACPassthroughSubscriber實(shí)現(xiàn)了一個(gè)協(xié)議RACSubscriber,實(shí)現(xiàn)了該協(xié)議就有訂閱信號,接收值處理值的能力。
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
NSCParameterAssert(subscriber != nil);
self = [super init];
_innerSubscriber = subscriber;
_signal = signal;
_disposable = disposable;
[self.innerSubscriber didSubscribeWithDisposable:self.disposable];
return self;
}
-
subscriber:核心訂閱者 -
signal:信號 -
disposable:銷毀者
那我們再來看看, RACSubscriber這個(gè)協(xié)議都定義了那些方法
- 保存
nextBlock、錯(cuò)誤信息、完成信息。
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
RACSubscriber *subscriber = [[self alloc] init];
subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];
return subscriber;
}
- 發(fā)送正常的信號,然后執(zhí)行數(shù)據(jù)的回調(diào)。
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}
- 發(fā)送錯(cuò)誤,會銷毀信號,然后回調(diào)錯(cuò)誤,以后再也不接收這個(gè)信號的變化了。
- (void)sendError:(NSError *)e {
@synchronized (self) {
void (^errorBlock)(NSError *) = [self.error copy];
[self.disposable dispose];
if (errorBlock == nil) return;
errorBlock(e);
}
}
- 發(fā)送完成信號,會銷毀信號,然后回調(diào)完成的操作,同樣,以后再也不接收這個(gè)信號的變化了。
- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];
[self.disposable dispose];
if (completedBlock == nil) return;
completedBlock();
}
}
可以看到sendError和sendCompleted是在方法里就會銷毀,那么sendNext是什么時(shí)候銷毀呢?就是調(diào)用者銷毀的時(shí)候,也就是我們所說的self或者viewController銷毀的時(shí)候。就會調(diào)用到RACSubscriber重寫的dealloc方法。
- (void)dealloc {
[self.disposable dispose];
}
RACScheduler
Schedulers are used to control when and where work is performed.
調(diào)度者,用于控制工作何時(shí)何地執(zhí)行。其實(shí),它就是對GCD的封裝。
當(dāng)我們在- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber方法中調(diào)用schedule方法的時(shí)候,會判斷是否有currentScheduler,如果有執(zhí)行schedule中的block,沒有的話就使用自己定義的backgroundScheduler,進(jìn)行處理block。
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
block();
return nil;
}
會先從threadDictionary里面獲取RACScheduler,如果獲取的值不為nil就將其值返回,用來調(diào)用信號的處理。如果為空的話,再來判斷當(dāng)前的任務(wù)是否在主隊(duì)列或者主線程,如果是的話就返回自己封裝的GCD主隊(duì)列用來調(diào)用。
+ (RACScheduler *)currentScheduler {
RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
if (scheduler != nil) return scheduler;
if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;
return nil;
}
+ (BOOL)isOnMainThread {
return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}
+ (RACScheduler *)mainThreadScheduler {
static dispatch_once_t onceToken;
static RACScheduler *mainThreadScheduler;
dispatch_once(&onceToken, ^{
mainThreadScheduler = [[RACTargetQueueScheduler alloc] initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.mainThreadScheduler" targetQueue:dispatch_get_main_queue()];
});
return mainThreadScheduler;
}
_backgroundScheduler其實(shí)也是一個(gè)默認(rèn)優(yōu)先級的全局串行隊(duì)列,它是由RACTargetQueueScheduler生成的實(shí)例。
RACTargetQueueScheduler, A scheduler that enqueues blocks on a private serial queue, targeting an arbitrary GCD queue.
而RACTargetQueueScheduler是在任意的GCD的線程中創(chuàng)建一個(gè)串行隊(duì)列,然后將這些待處理的block都放在這個(gè)串行對劣質(zhì),讓調(diào)度者處理的block能夠同步執(zhí)行,也就是讓信號的任務(wù)同步執(zhí)行。
_backgroundScheduler = [RACScheduler scheduler];
+ (RACScheduler *)scheduler {
return [self schedulerWithPriority:RACSchedulerPriorityDefault];
}
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority {
return [self schedulerWithPriority:priority name:@"org.reactivecocoa.ReactiveObjC.RACScheduler.backgroundScheduler"];
}
// 創(chuàng)建一個(gè)全局的并發(fā)隊(duì)列
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name {
return [[RACTargetQueueScheduler alloc] initWithName:name targetQueue:dispatch_get_global_queue(priority, 0)];
}
- (instancetype)initWithName:(NSString *)name targetQueue:(dispatch_queue_t)targetQueue {
NSCParameterAssert(targetQueue != NULL);
if (name == nil) {
name = [NSString stringWithFormat:@"org.reactivecocoa.ReactiveObjC.RACTargetQueueScheduler(%s)", dispatch_queue_get_label(targetQueue)];
}
// 此處使用串行隊(duì)列的原因就是想要信號一個(gè)一個(gè)的處理
dispatch_queue_t queue = dispatch_queue_create(name.UTF8String, DISPATCH_QUEUE_SERIAL);
if (queue == NULL) return nil;
// 防止信號并發(fā)處理
dispatch_set_target_queue(queue, targetQueue);
return [super initWithName:name queue:queue];
}
全局串行隊(duì)列調(diào)度處理的方式如下:
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_async(self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}
- (void)performAsCurrentScheduler:(void (^)(void))block {
NSCParameterAssert(block != NULL);
// 獲取當(dāng)前的調(diào)度者
RACScheduler *previousScheduler = RACScheduler.currentScheduler;
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = self;
// 先執(zhí)行調(diào)度的block
autoreleasepool {
block();
}
if (previousScheduler != nil) {
// 如果不是空的 放入threadDictionary
NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = previousScheduler;
} else {
// 如果是空的就移除threadDictionary中對應(yīng)的值
// 執(zhí)行下一次schedule操作的又會進(jìn)入一次新的判斷
[NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
}
}
驗(yàn)證一下:
- 主線程執(zhí)行調(diào)度

- 子線程執(zhí)行調(diào)度

可以看出當(dāng)在主線程中使用信號的時(shí)候,調(diào)度者是把主線程的中的信號任務(wù)加入到主線程串行隊(duì)列中執(zhí)行,而在子線程中使用信號的時(shí)候,調(diào)度者會自己創(chuàng)建一個(gè)子線程,并把任務(wù)都加入到自己創(chuàng)建的這個(gè)子線程同步隊(duì)列中執(zhí)行。整個(gè)流程大致如下:

RACDisposable & RACCompoundDisposable
RACDisposable,銷毀者,取消訂閱,銷毀訂閱信號操作中間生成的一些輔助銷毀對象。
RACCompoundDisposable繼承自RACDisposable,它和RACDisposable的區(qū)別是RACDisposable是單個(gè)銷毀者,RACCompoundDisposable是銷毀者集合,它管理著一個(gè)或者多個(gè)RACDisposable。
- (instancetype)initWithBlock:(void (^)(void))block {
NSCParameterAssert(block != nil);
self = [super init];
// 保存銷毀后續(xù)操作的block
_disposeBlock = (void *)CFBridgingRetain([block copy]);
OSMemoryBarrier();
return self;
}
+ (instancetype)disposableWithBlock:(void (^)(void))block {
return [[self alloc] initWithBlock:block];
}
- (void)dealloc {
if (_disposeBlock == NULL || _disposeBlock == (__bridge void *)self) return;
CFRelease(_disposeBlock);
_disposeBlock = NULL;
}
- (void)dispose {
void (^disposeBlock)(void) = NULL;
while (YES) {
void *blockPtr = _disposeBlock;
// 防止多線程操作引發(fā)的錯(cuò)誤
if (OSAtomicCompareAndSwapPtrBarrier(blockPtr, NULL, &_disposeBlock)) {
if (blockPtr != (__bridge void *)self) {
disposeBlock = CFBridgingRelease(blockPtr);
}
break;
}
}
// 執(zhí)行銷毀后續(xù)的block
if (disposeBlock != nil) disposeBlock();
}
當(dāng)訂閱者調(diào)用dealloc方法的時(shí)候執(zhí)行[self.disposable dispose],由于銷毀者都被RACCompoundDisposable管理,就會調(diào)用[RACCompoundDisposable dispose],在這個(gè)方法中,遍歷管理的銷毀者,找到對應(yīng)需要銷毀的,然后調(diào)用[RACDisposable dispose]進(jìn)行銷毀。
+ (instancetype)compoundDisposable {
return [[self alloc] initWithDisposables:nil];
}
+ (instancetype)compoundDisposableWithDisposables:(NSArray *)disposables {
return [[self alloc] initWithDisposables:disposables];
}
- (instancetype)init {
self = [super init];
const int result __attribute__((unused)) = pthread_mutex_init(&_mutex, NULL);
return self;
}
- (instancetype)initWithDisposables:(NSArray *)otherDisposables {
self = [self init];
// 將所有disposable保存到_inlineDisposables
#if RACCompoundDisposableInlineCount
[otherDisposables enumerateObjectsUsingBlock:^(RACDisposable *disposable, NSUInteger index, BOOL *stop) {
self->_inlineDisposables[index] = disposable;
if (index == RACCompoundDisposableInlineCount - 1) *stop = YES;
}];
#endif
// 相對應(yīng)的添加操作
if (otherDisposables.count > RACCompoundDisposableInlineCount) {
_disposables = RACCreateDisposablesArray();
CFRange range = CFRangeMake(RACCompoundDisposableInlineCount, (CFIndex)otherDisposables.count - RACCompoundDisposableInlineCount);
CFArrayAppendArray(_disposables, (__bridge CFArrayRef)otherDisposables, range);
}
return self;
}
- (instancetype)initWithBlock:(void (^)(void))block {
RACDisposable *disposable = [RACDisposable disposableWithBlock:block];
return [self initWithDisposables:@[ disposable ]];
}
// 向RACCompoundDisposable管理集合中添加 RACDisposable
// 將disposable添加到_inlineDisposables
// 如果_inlineDisposables滿了就添加到_disposables
- (void)addDisposable:(RACDisposable *)disposable {
NSCParameterAssert(disposable != self);
if (disposable == nil || disposable.disposed) return;
BOOL shouldDispose = NO;
pthread_mutex_lock(&_mutex);
{
if (_disposed) {
shouldDispose = YES;
} else {
#if RACCompoundDisposableInlineCount
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
if (_inlineDisposables[i] == nil) {
_inlineDisposables[i] = disposable;
goto foundSlot;
}
}
#endif
if (_disposables == NULL)
_disposables = RACCreateDisposablesArray();
CFArrayAppendValue(_disposables, (__bridge void *)disposable);
if (RACCOMPOUNDDISPOSABLE_ADDED_ENABLED()) {
RACCOMPOUNDDISPOSABLE_ADDED(self.description.UTF8String, disposable.description.UTF8String, CFArrayGetCount(_disposables) + RACCompoundDisposableInlineCount);
}
#if RACCompoundDisposableInlineCount
foundSlot:;
#endif
}
}
pthread_mutex_unlock(&_mutex);
if (shouldDispose) [disposable dispose];
}
// 從RACCompoundDisposable管理集合中移除 RACDisposable
// 刪除_inlineDisposables對應(yīng)的disposable
- (void)removeDisposable:(RACDisposable *)disposable {
if (disposable == nil) return;
pthread_mutex_lock(&_mutex);
{
if (!_disposed) {
// 移除_inlineDisposables中的RACDisposable
#if RACCompoundDisposableInlineCount
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
if (_inlineDisposables[i] == disposable)
_inlineDisposables[i] = nil;
}
#endif
// 移除_disposables中的RACDisposable
if (_disposables != NULL) {
CFIndex count = CFArrayGetCount(_disposables);
for (CFIndex i = count - 1; i >= 0; i--) {
const void *item = CFArrayGetValueAtIndex(_disposables, i);
if (item == (__bridge void *)disposable) {
CFArrayRemoveValueAtIndex(_disposables, i);
}
}
if (RACCOMPOUNDDISPOSABLE_REMOVED_ENABLED()) {
RACCOMPOUNDDISPOSABLE_REMOVED(self.description.UTF8String, disposable.description.UTF8String, CFArrayGetCount(_disposables) + RACCompoundDisposableInlineCount);
}
}
}
}
pthread_mutex_unlock(&_mutex);
}
// 對_disposables中的RACDisposable進(jìn)行銷毀
static void disposeEach(const void *value, void *context) {
RACDisposable *disposable = (__bridge id)value;
[disposable dispose];
}
// 遍歷_inlineDisposables中的RACDisposable,進(jìn)行銷毀
// 如果_disposables中存在RACDisposable,也需要進(jìn)行遍歷,銷毀
// 銷毀的操作就是將原來的值置為nil
- (void)dispose {
#if RACCompoundDisposableInlineCount
RACDisposable *inlineCopy[RACCompoundDisposableInlineCount];
#endif
CFArrayRef remainingDisposables = NULL;
pthread_mutex_lock(&_mutex);
{
_disposed = YES;
#if RACCompoundDisposableInlineCount
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
inlineCopy[i] = _inlineDisposables[i];
_inlineDisposables[i] = nil;
}
#endif
remainingDisposables = _disposables;
_disposables = NULL;
}
pthread_mutex_unlock(&_mutex);
// 對_inlineDisposables中的RACDisposable 進(jìn)行銷毀
#if RACCompoundDisposableInlineCount
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
[inlineCopy[i] dispose];
}
#endif
if (remainingDisposables == NULL) return;
CFIndex count = CFArrayGetCount(remainingDisposables);
CFArrayApplyFunction(remainingDisposables, CFRangeMake(0, count), &disposeEach, NULL);
CFRelease(remainingDisposables);
}
#define RACCompoundDisposableInlineCount 2
RACCompoundDisposable通過2種方式對RACDisposable實(shí)例進(jìn)行管理:
-
_inlineDisposables:RACDisposable *_inlineDisposables[RACCompoundDisposableInlineCount],這是一種快捷方式,但是只能添加2個(gè),一旦滿了,就需要?jiǎng)?chuàng)建_disposables這個(gè)數(shù)組來存儲。對_inlineDisposables的操作需要加鎖。 -
_disposables:CFMutableArrayRef _disposables,這個(gè)數(shù)組是作為一個(gè)備用數(shù)組,當(dāng)_inlineDisposables存滿了就需要使用這個(gè)數(shù)組來存儲RACDisposable。操作需要加鎖。當(dāng)_disposables也存滿了就需要將保存的disposable銷毀。
在上述例子,我們在銷毀信號的地方設(shè)置一個(gè)斷點(diǎn),查看一下堆棧信息:

這也驗(yàn)證了我們上述的流程。
RACObserve
RACObserve是對KVO的封裝,下面我們來看看其具體實(shí)現(xiàn)。
#define RACObserve(TARGET, KEYPATH) _RACObserve(TARGET, KEYPATH)
#define _RACObserve(TARGET, KEYPATH) \
({ \
__weak id target_ = (TARGET); \
[target_ rac_valuesForKeyPath:@keypath(TARGET, KEYPATH) observer:self]; \
})
- (RACSignal *)rac_valuesForKeyPath:(NSString *)keyPath observer:(__weak NSObject *)observer {
return [[[self
rac_valuesAndChangesForKeyPath:keyPath options:NSKeyValueObservingOptionInitial observer:observer]
map:^(RACTuple *value) {
return value[0];
}]
setNameWithFormat:@"RACObserve(%@, %@)", RACDescription(self), keyPath];
}
- (RACSignal *)rac_valuesAndChangesForKeyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions)options observer:(__weak NSObject *)weakObserver {
NSObject *strongObserver = weakObserver;
keyPath = [keyPath copy];
__weak NSObject *weakSelf = self;
// target的銷毀信號,不再訂閱信號
RACSignal *deallocSignal = [[RACSignal
zip:@[
self.rac_willDeallocSignal,
strongObserver.rac_willDeallocSignal ?: [RACSignal never]
]]
doCompleted:^{
......
}];
return [[[RACSignal
createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
......
// 觀察者銷毀 訂閱者也發(fā)送完成信號
if (self == nil) {
[subscriber sendCompleted];
return nil;
}
return [self rac_observeKeyPath:keyPath options:options observer:observer block:^(id value, NSDictionary *change, BOOL causedByDealloc, BOOL affectedOnlyLastComponent) {
// 如果KVO檢測到被觀察者發(fā)生變化,則通過訂閱者發(fā)送信號數(shù)據(jù)
[subscriber sendNext:RACTuplePack(value, change)];
}];
}]
// 銷毀信號之后不再訂閱信號
takeUntil:deallocSignal]];
}
- (RACDisposable *)rac_observeKeyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions)options observer:(__weak NSObject *)weakObserver block:(void (^)(id, NSDictionary *, BOOL, BOOL))block {
......
keyPath = [keyPath copy];
NSObject *strongObserver = weakObserver;
// 對keyPath的處理
NSArray *keyPathComponents = keyPath.rac_keyPathComponents;
BOOL keyPathHasOneComponent = (keyPathComponents.count == 1);
NSString *keyPathHead = keyPathComponents[0];
NSString *keyPathTail = keyPath.rac_keyPathByDeletingFirstKeyPathComponent;
// 初始化銷毀者管理集合
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
BOOL shouldAddDeallocObserver = NO;
// 獲取觀察者的屬性 這也就是為什么我們能在RACObserve()里面獲取到響應(yīng)屬性的原因
objc_property_t property = class_getProperty(object_getClass(self), keyPathHead.UTF8String);
if (property != NULL) {
rac_propertyAttributes *attributes = rac_copyPropertyAttributes(property);
if (attributes != NULL) {
shouldAddDeallocObserver = isObject && isWeak && !isBlock && !isProtocol;
}
}
// dealloc觀察者訂閱信號的銷毀操作
void (^addDeallocObserverToPropertyValue)(NSObject *) = ^(NSObject *value) {
.......
RACDisposable *deallocDisposable = [RACDisposable disposableWithBlock:^{
block(nil, change, YES, keyPathHasOneComponent);
}];
[valueDisposable addDisposable:deallocDisposable];
[firstComponentDisposable() addDisposable:[RACDisposable disposableWithBlock:^{
[valueDisposable removeDisposable:deallocDisposable];
}]];
};
// 將回調(diào)塊添加到值的dealloc操作中,把清理回調(diào)的邏輯添加到銷毀者
void (^addObserverToValue)(NSObject *) = ^(NSObject *value) {
RACDisposable *observerDisposable = [value rac_observeKeyPath:keyPathTail options:(options & ~NSKeyValueObservingOptionInitial) observer:weakObserver block:block];
[firstComponentDisposable() addDisposable:observerDisposable];
};
NSKeyValueObservingOptions trampolineOptions = (options | NSKeyValueObservingOptionPrior) & ~NSKeyValueObservingOptionInitial;
// 將觀察的屬性封裝起來 進(jìn)行觀察
// RACKVOTrampoline 是KVO的觀察者 一旦被dispose 觀察就會停止
RACKVOTrampoline *trampoline = [[RACKVOTrampoline alloc] initWithTarget:self observer:strongObserver keyPath:keyPathHead options:trampolineOptions block:^(id trampolineTarget, id trampolineObserver, NSDictionary *change) {
// 觀察者發(fā)生變化的回調(diào)
// 之后再通過sendNext的方式傳給訂閱者
......
block([value valueForKeyPath:keyPathTail], change, NO, keyPathHasOneComponent);
......
}];
// 當(dāng)銷毀trampoline的時(shí)候就停止KVO觀察
[disposable addDisposable:trampoline];
// 如果需要的話 返回初始化的值
block(initialValue, initialChange, NO, keyPathHasOneComponent);
......
// 銷毀對應(yīng)的觀察
return [RACDisposable disposableWithBlock:^{
[disposable dispose];
[observerDisposable removeDisposable:disposable];
[selfDisposable removeDisposable:disposable];
}];
}
// RACKVOTrampoline繼承自RACDisposable,是一個(gè)KVO觀察
// 將被觀察者封裝起來
- (instancetype)initWithTarget:(__weak NSObject *)target observer:(__weak NSObject *)observer keyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions)options block:(RACKVOBlock)block {
NSObject *strongTarget = target;
if (strongTarget == nil) return nil;
self = [super init];
// 保存觀察的相關(guān)信息:keyPath、回調(diào)block、被觀察者、觀察者
_keyPath = [keyPath copy];
_block = [block copy];
_weakTarget = target;
_unsafeTarget = strongTarget;
_observer = observer;
// 使用中間代理層來響應(yīng)KVO
// strongTarget 被觀察對象
// RACKVOProxy.sharedProxy 觀察者
// 在RACKVOProxy.sharedP內(nèi)部維護(hù)一張NSMapTable 把所有的觀察者都存在這個(gè)表里
[RACKVOProxy.sharedProxy addObserver:self forContext:(__bridge void *)self];
[strongTarget addObserver:RACKVOProxy.sharedProxy forKeyPath:self.keyPath options:options context:(__bridge void *)self];
[strongTarget.rac_deallocDisposable addDisposable:self];
[self.observer.rac_deallocDisposable addDisposable:self];
return self;
}
// 響應(yīng)KVO,被觀察者變化的回調(diào)
- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context {
RACKVOBlock block;
id observer;
id target;
@synchronized (self) {
block = self.block;
observer = self.observer;
target = self.weakTarget;
}
if (block == nil || target == nil) return;
// 觀察到被觀察者變化的回調(diào)block
block(target, observer, change);
}
- (void)dispose {
NSObject *target;
NSObject *observer;
@synchronized (self) {
_block = nil;
target = self.unsafeTarget;
observer = self.observer;
_unsafeTarget = nil;
_observer = nil;
}
[target.rac_deallocDisposable removeDisposable:self];
[observer.rac_deallocDisposable removeDisposable:self];
// 以下兩步就確定了RACKVO不需要手動(dòng)移除觀察者
// 移除中間層的觀察
[target removeObserver:RACKVOProxy.sharedProxy forKeyPath:self.keyPath context:(__bridge void *)self];
// 中間層移除觀察者
[RACKVOProxy.sharedProxy removeObserver:self forContext:(__bridge void *)self];
}
總結(jié)一下:
-
RACObserve是封裝的一個(gè)宏定義,這個(gè)宏定義的實(shí)現(xiàn)在NSObject的一個(gè)名為RACPropertySubscribing的分類, - 在
RACPropertySubscribing中主要的處理:- 當(dāng)收到釋放訂閱觀察的信號就不再訂閱、
- 當(dāng)前類被銷毀的時(shí)候,訂閱者發(fā)送完成的信號
- 進(jìn)入
NSObject的RACKVOWrapper分類中,主要處理如下:- 對
keyPath的處理,比如二級路由等 - 初始化
RACCompoundDisposable管理銷毀者 - 獲取觀察者的屬性
- 處理一些關(guān)于
dealloc的銷毀操作
- 對
- 進(jìn)入
RACKVOTrampoline這個(gè)類:- 包裝觀察需要的相關(guān)屬性
- 使用中間層
RACKVOProxy來觀察被觀察的對象,而不是直接使用觀察者,將觀察者保存在RACKVOProxy中 -
RACKVOTrampoline被釋放的時(shí)候,移除觀察者 - 回調(diào)給外界觀察者發(fā)生的變化
-
RACKVOProxy:中間層,充當(dāng)觀察者- 維護(hù)一張表
NSMapTable來保存原來的觀察者 - 響應(yīng)系統(tǒng)被觀察者發(fā)生變化的回調(diào)
- 維護(hù)一張表
被觀察者的變化則是通過訂閱者發(fā)送給外界:
[subscriber sendNext:RACTuplePack(value, change)]
移除觀察者的流程則是當(dāng)觀察者被銷毀的時(shí)候,就會銷毀相關(guān)的臨時(shí)變量,也就是觀察的銷毀者等,這樣就會調(diào)用到RACKVOTrampoline這個(gè)類的dispose方法,移除觀察者。

RACSubject
RACSubject可以看做是一種可以手動(dòng)控制發(fā)送next, completed, and error的信號,對橋接RAC和非RAC很有幫助。它RACSignal的子類,還實(shí)現(xiàn)了RACSubscriber協(xié)議。
它有以下特點(diǎn):
- 能夠手動(dòng)發(fā)送事件
- 能夠訂閱信號

那么,它是如何做到又能發(fā)送信號,又能訂閱信號的呢?我們來看看它的內(nèi)部實(shí)現(xiàn)。
@property (nonatomic, strong, readonly) NSMutableArray *subscribers;
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;
+ (instancetype)subject {
return [[self alloc] init];
}
- (instancetype)init {
self = [super init];
if (self == nil) return nil;
_disposable = [RACCompoundDisposable compoundDisposable];
_subscribers = [[NSMutableArray alloc] initWithCapacity:1];
return self;
}
它的內(nèi)部維護(hù)了一個(gè)銷毀者集合和一個(gè)訂閱者數(shù)組。這個(gè)訂閱者的數(shù)組就是實(shí)現(xiàn)訂閱的關(guān)鍵。
外部調(diào)用訂閱方法的時(shí)候,就會把生成的訂閱者存放在_subscribers這個(gè)數(shù)組里面。
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// 生成訂閱者
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
// 添加訂閱者 保存
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}
// 銷毀者銷毀訂閱者
[disposable addDisposable:[RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];
if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}]];
return disposable;
}
當(dāng)外部調(diào)用sendNext發(fā)送信號的方法時(shí),它會遍歷之前存放的_subscribers數(shù)組,一一回調(diào)執(zhí)行每個(gè)訂閱者的sendNext方法。
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:value];
}];
}
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}
// 遍歷訂閱者列表 回調(diào)
for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}
由于RACSubject響應(yīng)信號的時(shí)候時(shí)候是遍歷訂閱者,一一響應(yīng),那么我們可以得出多個(gè)訂閱者訂閱同一個(gè)信號,響應(yīng)的時(shí)候不需要多次發(fā)送信號消息。

RACMulticastConnection
RACMulticastConnection,多播信號,用于將一個(gè)信號發(fā)送給多個(gè)訂閱者。通俗的說,就是多個(gè)訂閱者都訂閱了一個(gè)信號,當(dāng)這個(gè)信號發(fā)出消息的時(shí)候,每個(gè)訂閱者都會收到,而不需要單獨(dú)的給每個(gè)訂閱者都發(fā)送一次消息。調(diào)用了-[RACMulticastConnection connect]方法才會開始訂閱多播信號。RACMulticastConnection不應(yīng)該手動(dòng)創(chuàng)建,而是通過-[RACSignal publish]或者-[RACSignal multicast:]方法創(chuàng)建。
- (RACMulticastConnection *)publish {
// 在內(nèi)部會生成一個(gè)手動(dòng)信號
RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
RACMulticastConnection *connection = [self multicast:subject];
return connection;
}
- (RACMulticastConnection *)multicast:(RACSubject *)subject {
[subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
return connection;
}
- (instancetype)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
NSCParameterAssert(source != nil);
NSCParameterAssert(subject != nil);
self = [super init];
_sourceSignal = source; // 原信號
_serialDisposable = [[RACSerialDisposable alloc] init];
_signal = subject; // 新建的手動(dòng)信號
return self;
}
我們再來看看RACMulticastConnection和RACSignal的區(qū)別:
RACMulticastConnection發(fā)送:

RACSignal發(fā)送:

同一個(gè)信號給多個(gè)訂閱者發(fā)送信息的時(shí)候,RACMulticastConnection只需要發(fā)送一次,所有訂閱者都會響應(yīng);而RACSignal則需要給每個(gè)訂閱者都發(fā)送一次。例子中的connection.signal其實(shí)已經(jīng)是內(nèi)部創(chuàng)建的RACSubject,而RACSubject訂閱信號的時(shí)候是把訂閱者都放在一個(gè)數(shù)組里,響應(yīng)信號的時(shí)候則是遍歷這個(gè)數(shù)組挨個(gè)響應(yīng),就不需要我們一一發(fā)信號了。
RACCommand
RACCommand是響應(yīng)某些動(dòng)作(通常與UI相關(guān))而觸發(fā)的信號。在用于與UIKit組件進(jìn)行交互時(shí),RACCommand能夠幫助我們更快的處理并且響應(yīng)任務(wù),減少編碼以及工程的復(fù)雜度。
我們先來看一個(gè)RACCommand的例子:

executionSignals中發(fā)送了新的信號時(shí),switchToLatest方法返回的信號都會訂閱這個(gè)最新的信號,這里也就保證了每次都會打印出最新的信號中的值。executing用來監(jiān)聽是否正在執(zhí)行中,當(dāng)執(zhí)行開始后會置為1、執(zhí)行結(jié)束之后置為0。
那么RACCommand是如何實(shí)現(xiàn)訂閱信號的呢?在其內(nèi)部維護(hù)了一個(gè)RACSubject *addedExecutionSignalsSubject的對象,通過這個(gè)對象,它可以將信號的變化發(fā)給訂閱者。
- (RACSignal *)execute:(id)input {
......
RACSignal *signal = self.signalBlock(input);
// 多播信號發(fā)送信號給訂閱者
RACMulticastConnection *connection = [[signal
subscribeOn:RACScheduler.mainThreadScheduler]
multicast:[RACReplaySubject subject]];
[self.addedExecutionSignalsSubject sendNext:connection.signal];
[connection connect];
return [connection.signal setNameWithFormat:@"%@ -execute: %@", self, RACDescription(input)];
}
RACCommand在初始化的時(shí)候維護(hù)了很多關(guān)于信號的狀態(tài),而這些狀態(tài)可以幫助我們判斷信號的情況。
- (instancetype)initWithSignalBlock:(RACSignal<id> * (^)(id input))signalBlock {
return [self initWithEnabled:nil signalBlock:signalBlock];
}
- (instancetype)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal<id> * (^)(id input))signalBlock {
NSCParameterAssert(signalBlock != nil);
self = [super init];
_addedExecutionSignalsSubject = [RACSubject new];
_allowsConcurrentExecutionSubject = [RACSubject new];
_signalBlock = [signalBlock copy];
// 包含信號的信號
// 在每次執(zhí)行 -execute: 方法時(shí),最終都會向 executionSignals 中傳入一個(gè)最新的信號
_executionSignals = ......;
// errorsConnection 發(fā)生錯(cuò)誤的多播信號
RACMulticastConnection *errorsConnection = ......;
// immediateExecuting 是一個(gè)用于表示當(dāng)前是否有任務(wù)執(zhí)行的信號
RACSignal *immediateExecuting = ......;
// _executing是否正在執(zhí)行
_executing = ......;
// moreExecutionsAllowed 是否允許更多操作執(zhí)行的信號
RACSignal *moreExecutionsAllowed = ......;
// 在每次原信號發(fā)送消息時(shí)都會重新計(jì)算
_immediateEnabled = ......;
// 是否可以再次執(zhí)行
_enabled = ......;
return self;
}
總結(jié)
-
RACPassthroughSubscriber類,就是用來訂閱信號、發(fā)送信號任務(wù)的。 -
RACScheduler類,封裝GCD,讓信號任務(wù)同步執(zhí)行。 -
RACDisposable類,銷毀者,取消訂閱,銷毀訂閱信號操作中間生成的一些輔助銷毀對象。 -
RACCompoundDisposable繼承自RACDisposable,是銷毀者集合,它通過_inlineDisposables和_disposables管理著一個(gè)或者多個(gè)RACDisposable。 -
RACObserve:封裝系統(tǒng)的KVO,使用RACKVOTrampoline將觀察的相關(guān)信息封裝起來,使用中間層RACKVOProxy作為觀察者來進(jìn)行觀察,將原來的觀察者維護(hù)在NSMapTable表中,當(dāng)觀察到對象發(fā)生變化時(shí),RACKVOTrampoline通過回調(diào)通知RACKVOWrapper類,然后以發(fā)送信號的方式,通知外界。移除觀察也在RACKVOTrampoline中進(jìn)行處理。 -
RACSubject:訂閱信號的時(shí)候,將生成的RACPassthroughSubscriber訂閱者存在_subscribers數(shù)組里;發(fā)送信號的時(shí)候,遍歷數(shù)組,一一發(fā)放。 -
RACMulticastConnection,多播信號,用于將信號發(fā)送給多個(gè)訂閱者。與RACSignal不同的是,它只需要發(fā)送一次,所有訂閱者都會收到。 -
RACCommand用于表示事件的執(zhí)行,多用于響應(yīng)UI上的某些動(dòng)作。