『可變』的熱信號 RACSubject

在 ReactiveCocoa 中除了不可變的信號 RACSignal,也有用于橋接非 RAC 代碼到 ReactiveCocoa 世界的『可變』信號 RACSubject。

![“Mutable” RACSignal — RACSubject](http://img.draveness.me/2017-02-07-“Mutable” RACSignal — RACSubject.png)

RACSubject 到底是什么?根據(jù)其字面意思,可以將它理解為一個(gè)可以訂閱的主題,我們在訂閱主題之后,向主題發(fā)送新的消息時(shí),所有的訂閱者都會接收到最新的消息。

但是這么解釋確實(shí)有點(diǎn)晦澀,也不易于理解,ReactiveCocoa 團(tuán)隊(duì)對 RACSubject 的解釋是,RACSubject 其實(shí)就是一個(gè)可以手動(dòng)控制的信號(感覺這么解釋更難理解了)。

A subject, represented by the RACSubject class, is a signal that can be manually controlled.

RACSubject 簡介

RACSubjectRACSignal 的子類,與 RACSignal 以及 RACSequence 有著眾多的類簇不同,RACSubject 在整個(gè)工程中并沒有多少子類;不過,在大多數(shù)情況下,我們也只會使用 RACSubject 自己或者 RACReplaySubject。

![RACSubject - Subclasses](http://img.draveness.me/2017-02-07-RACSubject - Subclasses.png)

相比于 RACSignal 豐富的頭文件 ,RACSubject 對外的接口并沒有提供太多的方法:

@interface RACSubject : RACSignal <RACSubscriber>

+ (instancetype)subject;

@end

唯一提供的接口就是用于返回一個(gè)新實(shí)例的 +subject 方法;除此之外,在筆者看來它與 RACSignal 最大的不同就是:RACSubject 實(shí)現(xiàn)了 RACSubscriber 協(xié)議,也就是下面的這些方法:

@protocol RACSubscriber <NSObject>
@required

- (void)sendNext:(nullable id)value;
- (void)sendError:(nullable NSError *)error;
- (void)sendCompleted;
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;

@end

我們并不能在一個(gè) RACSignal 對象上執(zhí)行這些方法,只能在創(chuàng)建信號的 block 里面向遵循 RACSubscriber 協(xié)議的對象發(fā)送新的值或者錯(cuò)誤,這也是 RACSubject 和父類最大的不同:在 RACSubject 實(shí)例初始化之后,也可以通過這個(gè)實(shí)例向所有的訂閱者發(fā)送消息。

冷信號與熱信號

提到 RACSubject 就不得不提 ReactiveCocoa 中的另一對概念,冷信號和熱信號。

其實(shí)解釋這兩者之間區(qū)別的文章已經(jīng)很多了,我相信各位讀者能找到很多的資料,在這里就簡單介紹一下冷熱信號的概念,如果想要了解更多的內(nèi)容可以在 References 中找到更多的文章。

對于冷熱信號概念,我們借用 Rx 中的描述:

Hot-Signal-And-Cold-Signa

Cold signal is sequences that are passive and start producing notifications on request (when subscribed to), and hot signal is sequences that are active and produce notifications regardless of subscriptions. ---- Hot and Cold observables

冷信號是被動(dòng)的,只會在被訂閱時(shí)向訂閱者發(fā)送通知;熱信號是主動(dòng)的,它會在任意時(shí)間發(fā)出通知,與訂閱者的訂閱時(shí)間無關(guān)。

也就是說冷信號所有的訂閱者會在訂閱時(shí)收到完全相同的序列;而訂閱熱信號之后,只會收到在訂閱之后發(fā)出的序列。

熱信號的訂閱者能否收到消息取決于訂閱的時(shí)間。

熱信號在我們生活中有很多的例子,比如訂閱雜志時(shí)并不會把之前所有的期刊都送到我們手中,只會接收到訂閱之后的期刊;而對于冷信號的話,舉一個(gè)不恰當(dāng)?shù)睦?,每一年的高考考生在『訂閱』高考之后,收到往年所有的試卷,并在高考之后會取消訂閱?/p>

熱信號 RACSubject

在 ReactiveCocoa 中,我們使用 RACSignal 來表示冷信號,也就是每一個(gè)訂閱者在訂閱信號時(shí)都會收到完整的序列;RACSubject 用于表示熱信號,訂閱者接收到多少值取決于它訂閱的時(shí)間。

前面的文章中已經(jīng)對 RACSignal 冷信號有了很多的介紹,這里也就不會多說了;這一小節(jié)主要的內(nèi)容是想通過一個(gè)例子,簡單展示 RACSubject 的訂閱者收到的內(nèi)容與訂閱時(shí)間的關(guān)系:

RACSubject *subject = [RACSubject subject];

// Subscriber 1
[subject subscribeNext:^(id  _Nullable x) {
    NSLog(@"1st Sub: %@", x);
}];
[subject sendNext:@1];

// Subscriber 2
[subject subscribeNext:^(id  _Nullable x) {
    NSLog(@"2nd Sub: %@", x);
}];
[subject sendNext:@2];

// Subscriber 3
[subject subscribeNext:^(id  _Nullable x) {
    NSLog(@"3rd Sub: %@", x);
}];
[subject sendNext:@3];
[subject sendCompleted];

這里以圖的方式來展示整個(gè)訂閱與訂閱者接收消息的過程:

Track-RACSubject-Subscription-Process

從圖中我們可以清楚的看到,幾個(gè)訂閱者根據(jù)訂閱時(shí)間的不同收到了不同的數(shù)字序列,RACSubject時(shí)間相關(guān)的,它在發(fā)送消息時(shí)只會向已訂閱的訂閱者推送消息。

RACSubject 的實(shí)現(xiàn)

RACSubject 的實(shí)現(xiàn)并不復(fù)雜,它『可變』的特性都來源于持有的訂閱者數(shù)組 subscribers,在每次執(zhí)行 subscribeNext:error:completed: 一類便利方法時(shí),都會將傳入的 id<RACSubscriber> 對象加入數(shù)組:

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

    NSMutableArray *subscribers = self.subscribers;
    @synchronized (subscribers) {
        [subscribers addObject:subscriber];
    }
    
    [disposable addDisposable:[RACDisposable disposableWithBlock:^{
        @synchronized (subscribers) {
            NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
                return obj == subscriber;
            }];

            if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
        }
    }]];

    return disposable;
}

訂閱的過程分為三個(gè)部分:

  1. 初始化一個(gè) RACPassthroughSubscriber 實(shí)例;
  2. subscriber 加入 RACSubject 持有的數(shù)組中;
  3. 創(chuàng)建一個(gè) RACDisposable 對象,在當(dāng)前 subscriber 銷毀時(shí),將自身從數(shù)組中移除。
Send-Subscibe-to-RACSubject

-subscribe: 將所有遵循 RACSubscriber 協(xié)議的對象全部加入當(dāng)前 RACSubject 持有的數(shù)組 subscribers 中。

在上一節(jié)的例子中,我們能對 RACSubject 發(fā)送 -sendNext: 等消息也都取決于它實(shí)現(xiàn)了 ?RACSubscriber 協(xié)議:

- (void)sendNext:(id)value {
    [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
        [subscriber sendNext:value];
    }];
}

- (void)sendError:(NSError *)error {
    [self.disposable dispose];
    
    [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
        [subscriber sendError:error];
    }];
}

- (void)sendCompleted {
    [self.disposable dispose];
    
    [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
        [subscriber sendCompleted];
    }];
}

RACSubject 會在自身接受到這些方法時(shí),下發(fā)給持有的全部的 subscribers。

Send-Messages-to-RACSubject

代碼中的 -enumerateSubscribersUsingBlock: 只是一個(gè)使用 for 循環(huán)遍歷 subscribers 的安全方法:

- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
    NSArray *subscribers;
    @synchronized (self.subscribers) {
        subscribers = [self.subscribers copy];
    }

    for (id<RACSubscriber> subscriber in subscribers) {
        block(subscriber);
    }
}

RACSubject 就是圍繞一個(gè) NSMutableArray 數(shù)組實(shí)現(xiàn)的,實(shí)現(xiàn)還是非常簡單的,只是在需要訪問 subscribers 的方法中使用 @synchronized 避免線程競爭。

@interface RACSubject ()

@property (nonatomic, strong, readonly) NSMutableArray *subscribers;

@end

RACSubject 提供的初始化類方法 +subject 也只是初始化了幾個(gè)成員變量:

+ (instancetype)subject {
    return [[self alloc] init];
}

- (instancetype)init {
    self = [super init];
    if (self == nil) return nil;

    _disposable = [RACCompoundDisposable compoundDisposable];
    _subscribers = [[NSMutableArray alloc] initWithCapacity:1];
    
    return self;
}

至此,對于 RACSubject 的分析就結(jié)束了,接下來會分析更多的子類。

RACBehaviorSubject 與 RACReplaySubject

這一節(jié)會介紹 RACSubject 的兩個(gè)子類 RACBehaviorSubjectRACReplaySubject,前者在訂閱時(shí)會向訂閱者發(fā)送最新的消息,后者在訂閱之后可以重新發(fā)送之前的所有消息序列。

RACBehaviorSubject

先來介紹兩者中實(shí)現(xiàn)較簡單的 RACBehaviorSubject,它在內(nèi)部會保存一個(gè) currentValue 對象,也就是最后一次發(fā)送的消息:

@interface RACBehaviorSubject ()

@property (nonatomic, strong) id currentValue;

@end

在每次執(zhí)行 -sendNext: 時(shí),都會對 RACBehaviorSubject 中保存的 currentValue 進(jìn)行更新,并使用父類的 -sendNext: 方法,向所有的訂閱者發(fā)送最新的消息:

- (void)sendNext:(id)value {
    @synchronized (self) {
        self.currentValue = value;
        [super sendNext:value];
    }
}

RACBehaviorSubject 最重要的特性就是在訂閱時(shí),向最新的訂閱者發(fā)送之前的消息,這是通過覆寫 -subscribe: 方法實(shí)現(xiàn)的。

在調(diào)用子類的 -subscribe: 方法之后,會在 subscriber 對象上執(zhí)行 -sendNext: 方法:

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    RACDisposable *subscriptionDisposable = [super subscribe:subscriber];

    RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
        @synchronized (self) {
            [subscriber sendNext:self.currentValue];
        }
    }];
    
    return [RACDisposable disposableWithBlock:^{
        [subscriptionDisposable dispose];
        [schedulingDisposable dispose];
    }];
}

接下來,通過一個(gè)簡單的例子來演示 RACBehaviorSubject 到底是如何工作的:

RACBehaviorSubject *subject = [RACBehaviorSubject subject];

[subject subscribeNext:^(id  _Nullable x) {
    NSLog(@"1st Sub: %@", x);
}];
[subject sendNext:@1];

[subject subscribeNext:^(id  _Nullable x) {
    NSLog(@"2nd Sub: %@", x);
}];
[subject sendNext:@2];

[subject subscribeNext:^(id  _Nullable x) {
    NSLog(@"3rd Sub: %@", x);
}];
[subject sendNext:@3];
[subject sendCompleted];

上面的代碼其實(shí)與 RACSubject 一節(jié)中的代碼差不多,只將 RACSubject 轉(zhuǎn)換成了 RACBehaviorSubject 對象。

Track-RACBehaviorSubject-Subscription-Process

在每次訂閱者訂閱 RACBehaviorSubject 之后,都會向該訂閱者發(fā)送最新的消息,這也就是 RACBehaviorSubject 最重要的行為。

RACBehaviorSubject 有一個(gè)用于創(chuàng)建包含默認(rèn)值的類方法 +behaviorSubjectWithDefaultValue:,如果將上面的第一行代碼改成:

RACBehaviorSubject *subject = [RACBehaviorSubject behaviorSubjectWithDefaultValue:@0];

那么在第一個(gè)訂閱者剛訂閱 RACBehaviorSubject 時(shí)就會收到 @0 對象。

Track-RACBehaviorSubject-Subscription-Process-With-Default-Value

RACReplaySubject

RACReplaySubject 相當(dāng)于一個(gè)自帶 bufferRACBehaviorSubject,它可以在每次有新的訂閱者訂閱之后發(fā)送之前的全部消息。

@interface RACReplaySubject ()

@property (nonatomic, assign, readonly) NSUInteger capacity;
@property (nonatomic, strong, readonly) NSMutableArray *valuesReceived;

@end

實(shí)現(xiàn)的方式是通過持有一個(gè) valuesReceived 的數(shù)組和能夠存儲的對象的上限 capacity,默認(rèn)值為:

const NSUInteger RACReplaySubjectUnlimitedCapacity = NSUIntegerMax;

當(dāng)然你可以用 +replaySubjectWithCapacity: 初始化一個(gè)其它大小的 RACReplaySubject 對象:

+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity {
    return [(RACReplaySubject *)[self alloc] initWithCapacity:capacity];
}

- (instancetype)initWithCapacity:(NSUInteger)capacity {
    self = [super init];
    
    _capacity = capacity;
    _valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
    
    return self;
}

在每次調(diào)用 -sendNext: 方法發(fā)送消息時(shí),都會將其加入 valuesReceived 數(shù)組中,并踢出之前的元素:

- (void)sendNext:(id)value {
    @synchronized (self) {
        [self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
        [super sendNext:value];
        
        if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
            [self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
        }
    }
}

需要注意的有兩點(diǎn),一是對 valuesReceived 的數(shù)組的操作必須使用 @synchronized 加鎖;第二,如果 value 為空的話,也需要將其轉(zhuǎn)換成 RACTupleNil.tupleNil 對象進(jìn)行保存。

Send-Messages-to-RACReplaySubject

-sendError:-sendCompleted 方法都會標(biāo)記對應(yīng) flag,即 hasCompletedhasError,這里就不介紹了;同樣的,RACReplaySubject 也覆寫了 -subscribe: 方法,在每次有訂閱者訂閱時(shí)重新發(fā)送所有的序列:

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

    RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
        @synchronized (self) {
            for (id value in self.valuesReceived) {
                if (compoundDisposable.disposed) return;

                [subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
            }

            if (compoundDisposable.disposed) return;

            if (self.hasCompleted) {
                [subscriber sendCompleted];
            } else if (self.hasError) {
                [subscriber sendError:self.error];
            } else {
                RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
                [compoundDisposable addDisposable:subscriptionDisposable];
            }
        }
    }];

    [compoundDisposable addDisposable:schedulingDisposable];

    return compoundDisposable;
}

我們?nèi)匀皇褂蒙弦还?jié)中的例子來展示 RACReplaySubject 是如何工作的,只修改第一行代碼:

RACReplaySubject *subject = [RACReplaySubject subject];

[subject subscribeNext:^(id  _Nullable x) {
    NSLog(@"1st Subscriber: %@", x);
}];
[subject sendNext:@1];

[subject subscribeNext:^(id  _Nullable x) {
    NSLog(@"2nd Subscriber: %@", x);
}];
[subject sendNext:@2];

[subject subscribeNext:^(id  _Nullable x) {
    NSLog(@"3rd Subscriber: %@", x);
}];
[subject sendNext:@3];
[subject sendCompleted];

運(yùn)行這段代碼之后,會得到如下圖的結(jié)果:

Track-RACReplaySubject-Subscription-Process

所有訂閱 RACReplaySubject 的對象(默認(rèn)行為)都能獲得完整的序列,而這個(gè)特性在與 RACMulticastConnection 一起使用也有著巨大威力,我們會在之后的文章中介紹。

總結(jié)

RACSubjectRACSignal 對象之上進(jìn)行了簡單的修改,將原有的冷信號改造成了熱信號,將不可變變成了可變。

雖然 RACSubject 的實(shí)現(xiàn)并不復(fù)雜,只是存儲了一個(gè)遵循 RACSubscriber 協(xié)議的對象列表以及所有的消息,但是在解決實(shí)際問題時(shí)卻能夠很好地解決很多與網(wǎng)絡(luò)操作相關(guān)的問題。

References

Github Repo:iOS-Source-Code-Analyze

Follow: Draveness · GitHub

Source: http://draveness.me/racsubject

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容