iOS-RAC綜述

上一篇我們探索了RAC的核心流程就是:

  • 創(chuàng)建信號
  • 訂閱信號
  • 訂閱者發(fā)送信號
  • 銷毀

那么我們根據(jù)這些操作來看看RAC的核心類以及具體實(shí)現(xiàn),主要就是信號、訂閱、銷毀這3個(gè)點(diǎn)。

RACSignal

RACSiganl,信號類,是抽象類RACStream的子類。在RAC中,萬物皆信號,只有有了信號,才會有的一系列操作。一般表示將來有數(shù)據(jù)傳遞,只要有數(shù)據(jù)改變,信號內(nèi)部接收到數(shù)據(jù),就會馬上發(fā)出數(shù)據(jù)。但是它本身不具備發(fā)送信號的能力,而是交給內(nèi)部一個(gè)訂閱者去發(fā)出。

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
    return [RACDynamicSignal createSignal:didSubscribe];
}

而最終的訂閱方法的實(shí)現(xiàn)是由RACDynamicSignal來完成的。該類有2個(gè)方法,一個(gè)就是創(chuàng)建信號,保存信號創(chuàng)建的回調(diào);另一個(gè)就是執(zhí)行訂閱方法的時(shí)候調(diào)用信號創(chuàng)建保存的block,返回銷毀者。

+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
    RACDynamicSignal *signal = [[self alloc] init];
    signal->_didSubscribe = [didSubscribe copy];
    return [signal setNameWithFormat:@"+createSignal:"];
}


- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

    if (self.didSubscribe != NULL) {
        RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
            RACDisposable *innerDisposable = self.didSubscribe(subscriber);
            [disposable addDisposable:innerDisposable];
        }];

        [disposable addDisposable:schedulingDisposable];
    }
    
    return disposable;
}

RACPassthroughSubscriber

核心訂閱類RACPassthroughSubscriber實(shí)現(xiàn)了一個(gè)協(xié)議RACSubscriber,實(shí)現(xiàn)了該協(xié)議就有訂閱信號,接收值處理值的能力。

- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
    NSCParameterAssert(subscriber != nil);

    self = [super init];

    _innerSubscriber = subscriber;
    _signal = signal;
    _disposable = disposable;

    [self.innerSubscriber didSubscribeWithDisposable:self.disposable];
    return self;
}
  • subscriber:核心訂閱者
  • signal:信號
  • disposable:銷毀者

那我們再來看看, RACSubscriber這個(gè)協(xié)議都定義了那些方法

  1. 保存nextBlock、錯(cuò)誤信息、完成信息。
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
    RACSubscriber *subscriber = [[self alloc] init];

    subscriber->_next = [next copy];
    subscriber->_error = [error copy];
    subscriber->_completed = [completed copy];

    return subscriber;
}
  1. 發(fā)送正常的信號,然后執(zhí)行數(shù)據(jù)的回調(diào)。
- (void)sendNext:(id)value {
    @synchronized (self) {
        void (^nextBlock)(id) = [self.next copy];
        if (nextBlock == nil) return;

        nextBlock(value);
    }
}
  1. 發(fā)送錯(cuò)誤,會銷毀信號,然后回調(diào)錯(cuò)誤,以后再也不接收這個(gè)信號的變化了。
- (void)sendError:(NSError *)e {
    @synchronized (self) {
        void (^errorBlock)(NSError *) = [self.error copy];
        [self.disposable dispose];

        if (errorBlock == nil) return;
        errorBlock(e);
    }
}
  1. 發(fā)送完成信號,會銷毀信號,然后回調(diào)完成的操作,同樣,以后再也不接收這個(gè)信號的變化了。
- (void)sendCompleted {
    @synchronized (self) {
        void (^completedBlock)(void) = [self.completed copy];
        [self.disposable dispose];

        if (completedBlock == nil) return;
        completedBlock();
    }
}

可以看到sendErrorsendCompleted是在方法里就會銷毀,那么sendNext是什么時(shí)候銷毀呢?就是調(diào)用者銷毀的時(shí)候,也就是我們所說的self或者viewController銷毀的時(shí)候。就會調(diào)用到RACSubscriber重寫的dealloc方法。

- (void)dealloc {
    [self.disposable dispose];
}

RACScheduler

Schedulers are used to control when and where work is performed.

調(diào)度者,用于控制工作何時(shí)何地執(zhí)行。其實(shí),它就是對GCD的封裝。

當(dāng)我們在- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber方法中調(diào)用schedule方法的時(shí)候,會判斷是否有currentScheduler,如果有執(zhí)行schedule中的block,沒有的話就使用自己定義的backgroundScheduler,進(jìn)行處理block。

- (RACDisposable *)schedule:(void (^)(void))block {
    NSCParameterAssert(block != NULL);

    if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];

    block();
    return nil;
}

會先從threadDictionary里面獲取RACScheduler,如果獲取的值不為nil就將其值返回,用來調(diào)用信號的處理。如果為空的話,再來判斷當(dāng)前的任務(wù)是否在主隊(duì)列或者主線程,如果是的話就返回自己封裝的GCD主隊(duì)列用來調(diào)用。

+ (RACScheduler *)currentScheduler {
    RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
    if (scheduler != nil) return scheduler;
    if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;

    return nil;
}

+ (BOOL)isOnMainThread {
    return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}

+ (RACScheduler *)mainThreadScheduler {
    static dispatch_once_t onceToken;
    static RACScheduler *mainThreadScheduler;
    dispatch_once(&onceToken, ^{
        mainThreadScheduler = [[RACTargetQueueScheduler alloc] initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.mainThreadScheduler" targetQueue:dispatch_get_main_queue()];
    });
    
    return mainThreadScheduler;
}

_backgroundScheduler其實(shí)也是一個(gè)默認(rèn)優(yōu)先級的全局串行隊(duì)列,它是由RACTargetQueueScheduler生成的實(shí)例。

RACTargetQueueScheduler, A scheduler that enqueues blocks on a private serial queue, targeting an arbitrary GCD queue.

RACTargetQueueScheduler是在任意的GCD的線程中創(chuàng)建一個(gè)串行隊(duì)列,然后將這些待處理的block都放在這個(gè)串行對劣質(zhì),讓調(diào)度者處理的block能夠同步執(zhí)行,也就是讓信號的任務(wù)同步執(zhí)行。

_backgroundScheduler = [RACScheduler scheduler];

+ (RACScheduler *)scheduler {
    return [self schedulerWithPriority:RACSchedulerPriorityDefault];
}

+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority {
    return [self schedulerWithPriority:priority name:@"org.reactivecocoa.ReactiveObjC.RACScheduler.backgroundScheduler"];
}

// 創(chuàng)建一個(gè)全局的并發(fā)隊(duì)列
+ (RACScheduler *)schedulerWithPriority:(RACSchedulerPriority)priority name:(NSString *)name {
    return [[RACTargetQueueScheduler alloc] initWithName:name targetQueue:dispatch_get_global_queue(priority, 0)];
}

- (instancetype)initWithName:(NSString *)name targetQueue:(dispatch_queue_t)targetQueue {
    NSCParameterAssert(targetQueue != NULL);

    if (name == nil) {
        name = [NSString stringWithFormat:@"org.reactivecocoa.ReactiveObjC.RACTargetQueueScheduler(%s)", dispatch_queue_get_label(targetQueue)];
    }

    // 此處使用串行隊(duì)列的原因就是想要信號一個(gè)一個(gè)的處理
    dispatch_queue_t queue = dispatch_queue_create(name.UTF8String, DISPATCH_QUEUE_SERIAL);
    if (queue == NULL) return nil;

    // 防止信號并發(fā)處理
    dispatch_set_target_queue(queue, targetQueue);

    return [super initWithName:name queue:queue];
}

全局串行隊(duì)列調(diào)度處理的方式如下:

- (RACDisposable *)schedule:(void (^)(void))block {
    NSCParameterAssert(block != NULL);

    RACDisposable *disposable = [[RACDisposable alloc] init];

    dispatch_async(self.queue, ^{
        if (disposable.disposed) return;
        [self performAsCurrentScheduler:block];
    });

    return disposable;
}

- (void)performAsCurrentScheduler:(void (^)(void))block {
    NSCParameterAssert(block != NULL);

    // 獲取當(dāng)前的調(diào)度者 
    RACScheduler *previousScheduler = RACScheduler.currentScheduler;
    NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = self;

    // 先執(zhí)行調(diào)度的block
    autoreleasepool {
        block();
    }

    if (previousScheduler != nil) {
        // 如果不是空的 放入threadDictionary
        NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey] = previousScheduler;
    } else {
        // 如果是空的就移除threadDictionary中對應(yīng)的值
        // 執(zhí)行下一次schedule操作的又會進(jìn)入一次新的判斷
        [NSThread.currentThread.threadDictionary removeObjectForKey:RACSchedulerCurrentSchedulerKey];
    }
}

驗(yàn)證一下:

  1. 主線程執(zhí)行調(diào)度
主線程調(diào)度.png
  1. 子線程執(zhí)行調(diào)度
子線程調(diào)度.png

可以看出當(dāng)在主線程中使用信號的時(shí)候,調(diào)度者是把主線程的中的信號任務(wù)加入到主線程串行隊(duì)列中執(zhí)行,而在子線程中使用信號的時(shí)候,調(diào)度者會自己創(chuàng)建一個(gè)子線程,并把任務(wù)都加入到自己創(chuàng)建的這個(gè)子線程同步隊(duì)列中執(zhí)行。整個(gè)流程大致如下:

RACScheduler.png

RACDisposable & RACCompoundDisposable

RACDisposable,銷毀者,取消訂閱,銷毀訂閱信號操作中間生成的一些輔助銷毀對象。
RACCompoundDisposable繼承自RACDisposable,它和RACDisposable的區(qū)別是RACDisposable是單個(gè)銷毀者,RACCompoundDisposable是銷毀者集合,它管理著一個(gè)或者多個(gè)RACDisposable

- (instancetype)initWithBlock:(void (^)(void))block {
    NSCParameterAssert(block != nil);

    self = [super init];
    
    // 保存銷毀后續(xù)操作的block
    _disposeBlock = (void *)CFBridgingRetain([block copy]); 
    OSMemoryBarrier();

    return self;
}

+ (instancetype)disposableWithBlock:(void (^)(void))block {
    return [[self alloc] initWithBlock:block];
}

- (void)dealloc {
    if (_disposeBlock == NULL || _disposeBlock == (__bridge void *)self) return;

    CFRelease(_disposeBlock);
    _disposeBlock = NULL;
}

- (void)dispose {
    void (^disposeBlock)(void) = NULL;

    while (YES) {
        void *blockPtr = _disposeBlock;
        // 防止多線程操作引發(fā)的錯(cuò)誤 
        if (OSAtomicCompareAndSwapPtrBarrier(blockPtr, NULL, &_disposeBlock)) {
            if (blockPtr != (__bridge void *)self) {
                disposeBlock = CFBridgingRelease(blockPtr);
            }
            break;
        }
    }
    
    // 執(zhí)行銷毀后續(xù)的block
    if (disposeBlock != nil) disposeBlock();
}

當(dāng)訂閱者調(diào)用dealloc方法的時(shí)候執(zhí)行[self.disposable dispose],由于銷毀者都被RACCompoundDisposable管理,就會調(diào)用[RACCompoundDisposable dispose],在這個(gè)方法中,遍歷管理的銷毀者,找到對應(yīng)需要銷毀的,然后調(diào)用[RACDisposable dispose]進(jìn)行銷毀。

+ (instancetype)compoundDisposable {
    return [[self alloc] initWithDisposables:nil];
}

+ (instancetype)compoundDisposableWithDisposables:(NSArray *)disposables {
    return [[self alloc] initWithDisposables:disposables];
}

- (instancetype)init {
    self = [super init];
    const int result __attribute__((unused)) = pthread_mutex_init(&_mutex, NULL);
    return self;
}

- (instancetype)initWithDisposables:(NSArray *)otherDisposables {
    self = [self init];

    // 將所有disposable保存到_inlineDisposables
#if RACCompoundDisposableInlineCount
    [otherDisposables enumerateObjectsUsingBlock:^(RACDisposable *disposable, NSUInteger index, BOOL *stop) {
        self->_inlineDisposables[index] = disposable;
        if (index == RACCompoundDisposableInlineCount - 1) *stop = YES;
    }];
#endif
    // 相對應(yīng)的添加操作
    if (otherDisposables.count > RACCompoundDisposableInlineCount) {
        _disposables = RACCreateDisposablesArray();

        CFRange range = CFRangeMake(RACCompoundDisposableInlineCount, (CFIndex)otherDisposables.count - RACCompoundDisposableInlineCount);
            CFArrayAppendArray(_disposables, (__bridge CFArrayRef)otherDisposables, range);
    }
    return self;
}

- (instancetype)initWithBlock:(void (^)(void))block {
    RACDisposable *disposable = [RACDisposable disposableWithBlock:block];
    return [self initWithDisposables:@[ disposable ]];
}

// 向RACCompoundDisposable管理集合中添加 RACDisposable
// 將disposable添加到_inlineDisposables
// 如果_inlineDisposables滿了就添加到_disposables
- (void)addDisposable:(RACDisposable *)disposable {
    NSCParameterAssert(disposable != self);
    if (disposable == nil || disposable.disposed) return;

    BOOL shouldDispose = NO;

    pthread_mutex_lock(&_mutex);
    {
        if (_disposed) {
            shouldDispose = YES;
        } else {
        #if RACCompoundDisposableInlineCount
            for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
                if (_inlineDisposables[i] == nil) {
                    _inlineDisposables[i] = disposable;
                    goto foundSlot;
                }
            }
        #endif

            if (_disposables == NULL) 
                _disposables = RACCreateDisposablesArray();
            CFArrayAppendValue(_disposables, (__bridge void *)disposable);

            if (RACCOMPOUNDDISPOSABLE_ADDED_ENABLED()) {
                RACCOMPOUNDDISPOSABLE_ADDED(self.description.UTF8String, disposable.description.UTF8String, CFArrayGetCount(_disposables) + RACCompoundDisposableInlineCount);
            }

    #if RACCompoundDisposableInlineCount
        foundSlot:;
    #endif
        }
        
    }
    pthread_mutex_unlock(&_mutex);

    if (shouldDispose) [disposable dispose];
}

// 從RACCompoundDisposable管理集合中移除 RACDisposable 
// 刪除_inlineDisposables對應(yīng)的disposable
- (void)removeDisposable:(RACDisposable *)disposable {
    if (disposable == nil) return;

    pthread_mutex_lock(&_mutex);
    {
        if (!_disposed) {
        // 移除_inlineDisposables中的RACDisposable
        #if RACCompoundDisposableInlineCount
            for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
                if (_inlineDisposables[i] == disposable)
                    _inlineDisposables[i] = nil;
            }
        #endif
            
            // 移除_disposables中的RACDisposable
            if (_disposables != NULL) {
                CFIndex count = CFArrayGetCount(_disposables);
                for (CFIndex i = count - 1; i >= 0; i--) {
                    const void *item = CFArrayGetValueAtIndex(_disposables, i);
                    if (item == (__bridge void *)disposable) {
                        CFArrayRemoveValueAtIndex(_disposables, i);
                    }
                }

                if (RACCOMPOUNDDISPOSABLE_REMOVED_ENABLED()) {
                    RACCOMPOUNDDISPOSABLE_REMOVED(self.description.UTF8String, disposable.description.UTF8String, CFArrayGetCount(_disposables) + RACCompoundDisposableInlineCount);
                }
            }
        }
    }
    pthread_mutex_unlock(&_mutex);
}

// 對_disposables中的RACDisposable進(jìn)行銷毀
static void disposeEach(const void *value, void *context) {
    RACDisposable *disposable = (__bridge id)value;
    [disposable dispose];
}

// 遍歷_inlineDisposables中的RACDisposable,進(jìn)行銷毀
// 如果_disposables中存在RACDisposable,也需要進(jìn)行遍歷,銷毀
// 銷毀的操作就是將原來的值置為nil
- (void)dispose {
    #if RACCompoundDisposableInlineCount
        RACDisposable *inlineCopy[RACCompoundDisposableInlineCount];
    #endif

    CFArrayRef remainingDisposables = NULL;

    pthread_mutex_lock(&_mutex);
    {
        _disposed = YES;

        #if RACCompoundDisposableInlineCount
            for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
                inlineCopy[i] = _inlineDisposables[i];
                _inlineDisposables[i] = nil;
            }
        #endif

        remainingDisposables = _disposables;
        _disposables = NULL;
    }
    pthread_mutex_unlock(&_mutex);
    
    
    // 對_inlineDisposables中的RACDisposable 進(jìn)行銷毀
    #if RACCompoundDisposableInlineCount
    for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
        [inlineCopy[i] dispose];
    }
    #endif

    if (remainingDisposables == NULL) return;

    CFIndex count = CFArrayGetCount(remainingDisposables);
    CFArrayApplyFunction(remainingDisposables, CFRangeMake(0, count), &disposeEach, NULL);
    CFRelease(remainingDisposables);
}

#define RACCompoundDisposableInlineCount 2

RACCompoundDisposable通過2種方式對RACDisposable實(shí)例進(jìn)行管理:

  • _inlineDisposablesRACDisposable *_inlineDisposables[RACCompoundDisposableInlineCount],這是一種快捷方式,但是只能添加2個(gè),一旦滿了,就需要?jiǎng)?chuàng)建_disposables這個(gè)數(shù)組來存儲。對_inlineDisposables的操作需要加鎖。
  • _disposablesCFMutableArrayRef _disposables,這個(gè)數(shù)組是作為一個(gè)備用數(shù)組,當(dāng)_inlineDisposables存滿了就需要使用這個(gè)數(shù)組來存儲RACDisposable。操作需要加鎖。當(dāng)_disposables也存滿了就需要將保存的disposable銷毀。

在上述例子,我們在銷毀信號的地方設(shè)置一個(gè)斷點(diǎn),查看一下堆棧信息:

銷毀者流程.png

這也驗(yàn)證了我們上述的流程。

RACObserve

RACObserve是對KVO的封裝,下面我們來看看其具體實(shí)現(xiàn)。

#define RACObserve(TARGET, KEYPATH) _RACObserve(TARGET, KEYPATH)

#define _RACObserve(TARGET, KEYPATH) \
({ \
    __weak id target_ = (TARGET); \
    [target_ rac_valuesForKeyPath:@keypath(TARGET, KEYPATH) observer:self]; \
})

- (RACSignal *)rac_valuesForKeyPath:(NSString *)keyPath observer:(__weak NSObject *)observer {
    return [[[self
        rac_valuesAndChangesForKeyPath:keyPath options:NSKeyValueObservingOptionInitial observer:observer]
        map:^(RACTuple *value) {
            return value[0];
    }]
    setNameWithFormat:@"RACObserve(%@, %@)", RACDescription(self), keyPath];
}

- (RACSignal *)rac_valuesAndChangesForKeyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions)options observer:(__weak NSObject *)weakObserver {
    NSObject *strongObserver = weakObserver;
    keyPath = [keyPath copy];
    
    __weak NSObject *weakSelf = self;
    
    // target的銷毀信號,不再訂閱信號
    RACSignal *deallocSignal = [[RACSignal
        zip:@[
            self.rac_willDeallocSignal,
            strongObserver.rac_willDeallocSignal ?: [RACSignal never]
        ]]
        doCompleted:^{
            ......
        }];

    return [[[RACSignal
        createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
            
            ......
            // 觀察者銷毀 訂閱者也發(fā)送完成信號
            if (self == nil) {
                [subscriber sendCompleted];
                return nil; 
            }

            return [self rac_observeKeyPath:keyPath options:options observer:observer block:^(id value, NSDictionary *change, BOOL causedByDealloc, BOOL affectedOnlyLastComponent) {
                // 如果KVO檢測到被觀察者發(fā)生變化,則通過訂閱者發(fā)送信號數(shù)據(jù)
                [subscriber sendNext:RACTuplePack(value, change)];
            }];
        }]
        // 銷毀信號之后不再訂閱信號
        takeUntil:deallocSignal]];
}

- (RACDisposable *)rac_observeKeyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions)options observer:(__weak NSObject *)weakObserver block:(void (^)(id, NSDictionary *, BOOL, BOOL))block {
    ......

    keyPath = [keyPath copy];

    NSObject *strongObserver = weakObserver;
    
    // 對keyPath的處理
    NSArray *keyPathComponents = keyPath.rac_keyPathComponents;
    BOOL keyPathHasOneComponent = (keyPathComponents.count == 1);
    NSString *keyPathHead = keyPathComponents[0];
    NSString *keyPathTail = keyPath.rac_keyPathByDeletingFirstKeyPathComponent;

    // 初始化銷毀者管理集合 
    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

    BOOL shouldAddDeallocObserver = NO;

    // 獲取觀察者的屬性 這也就是為什么我們能在RACObserve()里面獲取到響應(yīng)屬性的原因
    objc_property_t property = class_getProperty(object_getClass(self), keyPathHead.UTF8String);
    if (property != NULL) {
        rac_propertyAttributes *attributes = rac_copyPropertyAttributes(property);
        if (attributes != NULL) {
            shouldAddDeallocObserver = isObject && isWeak && !isBlock && !isProtocol;
        }
    }

    // dealloc觀察者訂閱信號的銷毀操作
    void (^addDeallocObserverToPropertyValue)(NSObject *) = ^(NSObject *value) {
        .......
        RACDisposable *deallocDisposable = [RACDisposable disposableWithBlock:^{
            block(nil, change, YES, keyPathHasOneComponent);
        }];

        [valueDisposable addDisposable:deallocDisposable];
        [firstComponentDisposable() addDisposable:[RACDisposable disposableWithBlock:^{
            [valueDisposable removeDisposable:deallocDisposable];
        }]];
    };

    // 將回調(diào)塊添加到值的dealloc操作中,把清理回調(diào)的邏輯添加到銷毀者
    void (^addObserverToValue)(NSObject *) = ^(NSObject *value) {
        RACDisposable *observerDisposable = [value rac_observeKeyPath:keyPathTail options:(options & ~NSKeyValueObservingOptionInitial) observer:weakObserver block:block];
            [firstComponentDisposable() addDisposable:observerDisposable];
    };

    NSKeyValueObservingOptions trampolineOptions = (options | NSKeyValueObservingOptionPrior) & ~NSKeyValueObservingOptionInitial;
    
    // 將觀察的屬性封裝起來 進(jìn)行觀察
    // RACKVOTrampoline 是KVO的觀察者 一旦被dispose 觀察就會停止
    RACKVOTrampoline *trampoline = [[RACKVOTrampoline alloc] initWithTarget:self observer:strongObserver keyPath:keyPathHead options:trampolineOptions block:^(id trampolineTarget, id trampolineObserver, NSDictionary *change) {
            // 觀察者發(fā)生變化的回調(diào)
            // 之后再通過sendNext的方式傳給訂閱者
            ......
            block([value valueForKeyPath:keyPathTail], change, NO, keyPathHasOneComponent);
            ......
    }];
    
    // 當(dāng)銷毀trampoline的時(shí)候就停止KVO觀察
    [disposable addDisposable:trampoline];
    
    // 如果需要的話 返回初始化的值
    block(initialValue, initialChange, NO, keyPathHasOneComponent);

    ......
    
    // 銷毀對應(yīng)的觀察
    return [RACDisposable disposableWithBlock:^{
        [disposable dispose];
        [observerDisposable removeDisposable:disposable];
        [selfDisposable removeDisposable:disposable];
    }];
}

// RACKVOTrampoline繼承自RACDisposable,是一個(gè)KVO觀察
// 將被觀察者封裝起來
- (instancetype)initWithTarget:(__weak NSObject *)target observer:(__weak NSObject *)observer keyPath:(NSString *)keyPath options:(NSKeyValueObservingOptions)options block:(RACKVOBlock)block {

    NSObject *strongTarget = target;
    if (strongTarget == nil) return nil;

    self = [super init];

    // 保存觀察的相關(guān)信息:keyPath、回調(diào)block、被觀察者、觀察者
    _keyPath = [keyPath copy];
    _block = [block copy];
    _weakTarget = target;
    _unsafeTarget = strongTarget;
    _observer = observer;

    // 使用中間代理層來響應(yīng)KVO
    // strongTarget 被觀察對象
    // RACKVOProxy.sharedProxy 觀察者
    // 在RACKVOProxy.sharedP內(nèi)部維護(hù)一張NSMapTable 把所有的觀察者都存在這個(gè)表里
    [RACKVOProxy.sharedProxy addObserver:self forContext:(__bridge void *)self];
    [strongTarget addObserver:RACKVOProxy.sharedProxy forKeyPath:self.keyPath options:options context:(__bridge void *)self];

    [strongTarget.rac_deallocDisposable addDisposable:self];
    [self.observer.rac_deallocDisposable addDisposable:self];

    return self;
}

// 響應(yīng)KVO,被觀察者變化的回調(diào)
- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context {
    RACKVOBlock block;
    id observer;
    id target;

    @synchronized (self) {
        block = self.block;
        observer = self.observer;
        target = self.weakTarget;
    }

    if (block == nil || target == nil) return;

    // 觀察到被觀察者變化的回調(diào)block
    block(target, observer, change);
}

- (void)dispose {
    NSObject *target;
    NSObject *observer;

    @synchronized (self) {
        _block = nil;
        target = self.unsafeTarget;
        observer = self.observer;

        _unsafeTarget = nil;
        _observer = nil;
    }

    [target.rac_deallocDisposable removeDisposable:self];
    [observer.rac_deallocDisposable removeDisposable:self];
    
    // 以下兩步就確定了RACKVO不需要手動(dòng)移除觀察者
    // 移除中間層的觀察
    [target removeObserver:RACKVOProxy.sharedProxy forKeyPath:self.keyPath context:(__bridge void *)self];
    // 中間層移除觀察者
    [RACKVOProxy.sharedProxy removeObserver:self forContext:(__bridge void *)self];
}

總結(jié)一下:

  • RACObserve是封裝的一個(gè)宏定義,這個(gè)宏定義的實(shí)現(xiàn)在NSObject的一個(gè)名為RACPropertySubscribing的分類,
  • RACPropertySubscribing中主要的處理:
    • 當(dāng)收到釋放訂閱觀察的信號就不再訂閱、
    • 當(dāng)前類被銷毀的時(shí)候,訂閱者發(fā)送完成的信號
  • 進(jìn)入NSObjectRACKVOWrapper分類中,主要處理如下:
    • keyPath的處理,比如二級路由等
    • 初始化RACCompoundDisposable管理銷毀者
    • 獲取觀察者的屬性
    • 處理一些關(guān)于dealloc的銷毀操作
  • 進(jìn)入RACKVOTrampoline這個(gè)類:
    • 包裝觀察需要的相關(guān)屬性
    • 使用中間層RACKVOProxy來觀察被觀察的對象,而不是直接使用觀察者,將觀察者保存在RACKVOProxy
    • RACKVOTrampoline被釋放的時(shí)候,移除觀察者
    • 回調(diào)給外界觀察者發(fā)生的變化
  • RACKVOProxy:中間層,充當(dāng)觀察者
    • 維護(hù)一張表NSMapTable來保存原來的觀察者
    • 響應(yīng)系統(tǒng)被觀察者發(fā)生變化的回調(diào)

被觀察者的變化則是通過訂閱者發(fā)送給外界:

[subscriber sendNext:RACTuplePack(value, change)]

移除觀察者的流程則是當(dāng)觀察者被銷毀的時(shí)候,就會銷毀相關(guān)的臨時(shí)變量,也就是觀察的銷毀者等,這樣就會調(diào)用到RACKVOTrampoline這個(gè)類的dispose方法,移除觀察者。

RACObserve.png

RACSubject

RACSubject可以看做是一種可以手動(dòng)控制發(fā)送next, completed, and error的信號,對橋接RAC和非RAC很有幫助。它RACSignal的子類,還實(shí)現(xiàn)了RACSubscriber協(xié)議。
它有以下特點(diǎn):

  • 能夠手動(dòng)發(fā)送事件
  • 能夠訂閱信號
RACSubject.png

那么,它是如何做到又能發(fā)送信號,又能訂閱信號的呢?我們來看看它的內(nèi)部實(shí)現(xiàn)。

@property (nonatomic, strong, readonly) NSMutableArray *subscribers;
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;

+ (instancetype)subject {
    return [[self alloc] init];
}

- (instancetype)init {
    self = [super init];
    if (self == nil) return nil;

    _disposable = [RACCompoundDisposable compoundDisposable];
    _subscribers = [[NSMutableArray alloc] initWithCapacity:1];
    
    return self;
}

它的內(nèi)部維護(hù)了一個(gè)銷毀者集合和一個(gè)訂閱者數(shù)組。這個(gè)訂閱者的數(shù)組就是實(shí)現(xiàn)訂閱的關(guān)鍵。

外部調(diào)用訂閱方法的時(shí)候,就會把生成的訂閱者存放在_subscribers這個(gè)數(shù)組里面。

- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
    NSCParameterAssert(nextBlock != NULL);
    
    RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
    return [self subscribe:o];
}

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    // 生成訂閱者
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
    
    // 添加訂閱者 保存
    NSMutableArray *subscribers = self.subscribers;
    @synchronized (subscribers) {
        [subscribers addObject:subscriber];
    }
    
    // 銷毀者銷毀訂閱者
    [disposable addDisposable:[RACDisposable disposableWithBlock:^{
        @synchronized (subscribers) {
            NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
                return obj == subscriber;
            }];

            if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
        }
    }]];

    return disposable;
}

當(dāng)外部調(diào)用sendNext發(fā)送信號的方法時(shí),它會遍歷之前存放的_subscribers數(shù)組,一一回調(diào)執(zhí)行每個(gè)訂閱者的sendNext方法。

- (void)sendNext:(id)value {
    [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
        [subscriber sendNext:value];
    }];
}

- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
    NSArray *subscribers;
    @synchronized (self.subscribers) {
        subscribers = [self.subscribers copy];
    }
    
    // 遍歷訂閱者列表 回調(diào)
    for (id<RACSubscriber> subscriber in subscribers) {
        block(subscriber);
    }
}

由于RACSubject響應(yīng)信號的時(shí)候時(shí)候是遍歷訂閱者,一一響應(yīng),那么我們可以得出多個(gè)訂閱者訂閱同一個(gè)信號,響應(yīng)的時(shí)候不需要多次發(fā)送信號消息。

multisubjectTest.png

RACMulticastConnection

RACMulticastConnection,多播信號,用于將一個(gè)信號發(fā)送給多個(gè)訂閱者。通俗的說,就是多個(gè)訂閱者都訂閱了一個(gè)信號,當(dāng)這個(gè)信號發(fā)出消息的時(shí)候,每個(gè)訂閱者都會收到,而不需要單獨(dú)的給每個(gè)訂閱者都發(fā)送一次消息。調(diào)用了-[RACMulticastConnection connect]方法才會開始訂閱多播信號。RACMulticastConnection不應(yīng)該手動(dòng)創(chuàng)建,而是通過-[RACSignal publish]或者-[RACSignal multicast:]方法創(chuàng)建。

- (RACMulticastConnection *)publish {
    // 在內(nèi)部會生成一個(gè)手動(dòng)信號
    RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
    RACMulticastConnection *connection = [self multicast:subject];
    return connection;
}

- (RACMulticastConnection *)multicast:(RACSubject *)subject {
    [subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
    RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
    return connection;
}

- (instancetype)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
    NSCParameterAssert(source != nil);
    NSCParameterAssert(subject != nil);

    self = [super init];
    
    _sourceSignal = source; // 原信號
    _serialDisposable = [[RACSerialDisposable alloc] init];
    _signal = subject; // 新建的手動(dòng)信號
    
    return self;
}

我們再來看看RACMulticastConnectionRACSignal的區(qū)別:

RACMulticastConnection發(fā)送:

RACMulticastConnection.png

RACSignal發(fā)送:

RACSignal多次發(fā)送.png

同一個(gè)信號給多個(gè)訂閱者發(fā)送信息的時(shí)候,RACMulticastConnection只需要發(fā)送一次,所有訂閱者都會響應(yīng);而RACSignal則需要給每個(gè)訂閱者都發(fā)送一次。例子中的connection.signal其實(shí)已經(jīng)是內(nèi)部創(chuàng)建的RACSubject,而RACSubject訂閱信號的時(shí)候是把訂閱者都放在一個(gè)數(shù)組里,響應(yīng)信號的時(shí)候則是遍歷這個(gè)數(shù)組挨個(gè)響應(yīng),就不需要我們一一發(fā)信號了。

RACCommand

RACCommand是響應(yīng)某些動(dòng)作(通常與UI相關(guān))而觸發(fā)的信號。在用于與UIKit組件進(jìn)行交互時(shí),RACCommand能夠幫助我們更快的處理并且響應(yīng)任務(wù),減少編碼以及工程的復(fù)雜度。

我們先來看一個(gè)RACCommand的例子:

RACCommand.png

executionSignals中發(fā)送了新的信號時(shí),switchToLatest方法返回的信號都會訂閱這個(gè)最新的信號,這里也就保證了每次都會打印出最新的信號中的值。executing用來監(jiān)聽是否正在執(zhí)行中,當(dāng)執(zhí)行開始后會置為1、執(zhí)行結(jié)束之后置為0。

那么RACCommand是如何實(shí)現(xiàn)訂閱信號的呢?在其內(nèi)部維護(hù)了一個(gè)RACSubject *addedExecutionSignalsSubject的對象,通過這個(gè)對象,它可以將信號的變化發(fā)給訂閱者。

- (RACSignal *)execute:(id)input {
    ......
    RACSignal *signal = self.signalBlock(input);

    // 多播信號發(fā)送信號給訂閱者
    RACMulticastConnection *connection = [[signal
        subscribeOn:RACScheduler.mainThreadScheduler]
        multicast:[RACReplaySubject subject]];
    
    [self.addedExecutionSignalsSubject sendNext:connection.signal];

    [connection connect];
    return [connection.signal setNameWithFormat:@"%@ -execute: %@", self, RACDescription(input)];
}

RACCommand在初始化的時(shí)候維護(hù)了很多關(guān)于信號的狀態(tài),而這些狀態(tài)可以幫助我們判斷信號的情況。

- (instancetype)initWithSignalBlock:(RACSignal<id> * (^)(id input))signalBlock {
    return [self initWithEnabled:nil signalBlock:signalBlock];
}

- (instancetype)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal<id> * (^)(id input))signalBlock {
    NSCParameterAssert(signalBlock != nil);

    self = [super init];
    
    _addedExecutionSignalsSubject = [RACSubject new];
    _allowsConcurrentExecutionSubject = [RACSubject new];
    _signalBlock = [signalBlock copy];  
    
    // 包含信號的信號
    // 在每次執(zhí)行 -execute: 方法時(shí),最終都會向 executionSignals 中傳入一個(gè)最新的信號
    _executionSignals = ......;
    
    // errorsConnection 發(fā)生錯(cuò)誤的多播信號
    RACMulticastConnection *errorsConnection = ......;

    // immediateExecuting 是一個(gè)用于表示當(dāng)前是否有任務(wù)執(zhí)行的信號
    RACSignal *immediateExecuting = ......;

    // _executing是否正在執(zhí)行
    _executing = ......;
    
    // moreExecutionsAllowed 是否允許更多操作執(zhí)行的信號
    RACSignal *moreExecutionsAllowed = ......;
    
    // 在每次原信號發(fā)送消息時(shí)都會重新計(jì)算
    _immediateEnabled = ......;
    
    // 是否可以再次執(zhí)行
    _enabled = ......;

    return self;
}

總結(jié)

  • RACPassthroughSubscriber類,就是用來訂閱信號、發(fā)送信號任務(wù)的。
  • RACScheduler類,封裝GCD,讓信號任務(wù)同步執(zhí)行。
  • RACDisposable類,銷毀者,取消訂閱,銷毀訂閱信號操作中間生成的一些輔助銷毀對象。
  • RACCompoundDisposable繼承自RACDisposable,是銷毀者集合,它通過_inlineDisposables_disposables管理著一個(gè)或者多個(gè)RACDisposable。
  • RACObserve:封裝系統(tǒng)的KVO,使用RACKVOTrampoline將觀察的相關(guān)信息封裝起來,使用中間層RACKVOProxy作為觀察者來進(jìn)行觀察,將原來的觀察者維護(hù)在NSMapTable表中,當(dāng)觀察到對象發(fā)生變化時(shí),RACKVOTrampoline通過回調(diào)通知RACKVOWrapper類,然后以發(fā)送信號的方式,通知外界。移除觀察也在RACKVOTrampoline中進(jìn)行處理。
  • RACSubject:訂閱信號的時(shí)候,將生成的RACPassthroughSubscriber訂閱者存在_subscribers數(shù)組里;發(fā)送信號的時(shí)候,遍歷數(shù)組,一一發(fā)放。
  • RACMulticastConnection,多播信號,用于將信號發(fā)送給多個(gè)訂閱者。與RACSignal不同的是,它只需要發(fā)送一次,所有訂閱者都會收到。
  • RACCommand用于表示事件的執(zhí)行,多用于響應(yīng)UI上的某些動(dòng)作。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • RAC在iOS的實(shí)際開發(fā)中確實(shí)是一件有力的武器,此文將從以下幾方面講解 RACSignal RACSubject ...
    4b5cb36a2ee2閱讀 1,033評論 0 0
  • 前言 很多blog都說ReactiveCocoa好用,然后各種秀自己如何靈活運(yùn)用ReactiveCocoa,但是感...
    RainyGY閱讀 1,598評論 0 1
  • 1.ReactiveCocoa簡介 ReactiveCocoa(簡稱為RAC),是由Github開源的一個(gè)應(yīng)用于i...
    愛睡覺的魚閱讀 1,223評論 0 1
  • 1.ReactiveCocoa簡介 ReactiveCocoa(簡稱為RAC),是由Github開源的一個(gè)應(yīng)用于i...
    亂了夏末丶藍(lán)了海閱讀 914評論 0 0
  • 在RAC中最核心的類RACSiganl,搞定這個(gè)類就能用ReactiveCocoa開發(fā)了。 RACSiganl:信...
    Harely閱讀 228評論 0 0

友情鏈接更多精彩內(nèi)容