RxJava 的 Subject

streams everywhere.png

Subject 是一種特殊的存在

在前面一篇文章Cold Observable 和 Hot Observable中,曾經(jīng)介紹過 Subject 既是 Observable 又是 Observer(Subscriber)。官網(wǎng)稱 Subject 可以看成是一個(gè)橋梁或者代理。

Subject的分類

Subject包含四種類型分別是AsyncSubject、BehaviorSubject、ReplaySubject和PublishSubject。

1. AsyncSubject

Observer會(huì)接收AsyncSubject的onComplete()之前的最后一個(gè)數(shù)據(jù)。

AsyncSubject<String> subject = AsyncSubject.create();
        subject.onNext("asyncSubject1");
        subject.onNext("asyncSubject2");
        subject.onComplete();
        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("asyncSubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("asyncSubject onError");  //不輸出(異常才會(huì)輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("asyncSubject:complete");  //輸出 asyncSubject onComplete
            }
        });

        subject.onNext("asyncSubject3");
        subject.onNext("asyncSubject4");

執(zhí)行結(jié)果:

asyncSubject:asyncSubject2
asyncSubject:complete

改一下代碼,將subject.onComplete()放在最后。

        AsyncSubject<String> subject = AsyncSubject.create();
        subject.onNext("asyncSubject1");
        subject.onNext("asyncSubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("asyncSubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("asyncSubject onError");  //不輸出(異常才會(huì)輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("asyncSubject:complete");  //輸出 asyncSubject onComplete
            }
        });

        subject.onNext("asyncSubject3");
        subject.onNext("asyncSubject4");
        subject.onComplete();

執(zhí)行結(jié)果:

asyncSubject:asyncSubject4
asyncSubject:complete

注意,subject.onComplete()必須要調(diào)用才會(huì)開始發(fā)送數(shù)據(jù),否則Subscriber將不接收任何數(shù)據(jù)。

2. BehaviorSubject

Observer會(huì)接收到BehaviorSubject被訂閱之前的最后一個(gè)數(shù)據(jù),再接收訂閱之后發(fā)射過來的數(shù)據(jù)。如果BehaviorSubject被訂閱之前沒有發(fā)送任何數(shù)據(jù),則會(huì)發(fā)送一個(gè)默認(rèn)數(shù)據(jù)。

        BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("behaviorSubject:"+s); 
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("behaviorSubject onError");  //不輸出(異常才會(huì)輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("behaviorSubject:complete");  //輸出 behaviorSubject onComplete
            }
        });

        subject.onNext("behaviorSubject2");
        subject.onNext("behaviorSubject3");

執(zhí)行結(jié)果:

behaviorSubject:behaviorSubject1
behaviorSubject:behaviorSubject2
behaviorSubject:behaviorSubject3

在這里,behaviorSubject1是默認(rèn)值。因?yàn)閳?zhí)行了

BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");

稍微改一下代碼,在subscribe()之前,再發(fā)射一個(gè)事件。

        BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");
        subject.onNext("behaviorSubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("behaviorSubject:"+s);  //輸出asyncSubject:asyncSubject3
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("behaviorSubject onError");  //不輸出(異常才會(huì)輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("behaviorSubject:complete");  //輸出 behaviorSubject onComplete
            }
        });

        subject.onNext("behaviorSubject3");
        subject.onNext("behaviorSubject4");

執(zhí)行結(jié)果:

behaviorSubject:behaviorSubject2
behaviorSubject:behaviorSubject3
behaviorSubject:behaviorSubject4

這次丟棄了默認(rèn)值,而發(fā)射behaviorSubject2。
因?yàn)锽ehaviorSubject 每次只會(huì)發(fā)射調(diào)用subscribe()方法之前的最后一個(gè)事件和調(diào)用subscribe()方法之后的事件。

BehaviorSubject還可以緩存最近一次發(fā)出信息的數(shù)據(jù)。

3. ReplaySubject

ReplaySubject會(huì)發(fā)射所有來自原始Observable的數(shù)據(jù)給觀察者,無論它們是何時(shí)訂閱的。

        ReplaySubject<String> subject = ReplaySubject.create();
        subject.onNext("replaySubject1");
        subject.onNext("replaySubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("replaySubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("replaySubject onError");  //不輸出(異常才會(huì)輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("replaySubject:complete");  //輸出 replaySubject onComplete
            }
        });

        subject.onNext("replaySubject3");
        subject.onNext("replaySubject4");

執(zhí)行結(jié)果:

replaySubject:replaySubject1
replaySubject:replaySubject2
replaySubject:replaySubject3
replaySubject:replaySubject4

稍微改一下代碼,將create()改成createWithSize(1)只緩存訂閱前最后發(fā)送的1條數(shù)據(jù)

        ReplaySubject<String> subject = ReplaySubject.createWithSize(1);
        subject.onNext("replaySubject1");
        subject.onNext("replaySubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("replaySubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("replaySubject onError");  //不輸出(異常才會(huì)輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("replaySubject:complete");  //輸出 replaySubject onComplete
            }
        });

        subject.onNext("replaySubject3");
        subject.onNext("replaySubject4");

執(zhí)行結(jié)果:

replaySubject:replaySubject2
replaySubject:replaySubject3
replaySubject:replaySubject4

這個(gè)執(zhí)行結(jié)果跟BehaviorSubject是一樣的。但是從并發(fā)的角度來看,ReplaySubject 在處理并發(fā) subscribe() 和 onNext() 時(shí)會(huì)更加復(fù)雜。

ReplaySubject除了可以限制緩存數(shù)據(jù)的數(shù)量和還能限制緩存的時(shí)間。使用createWithTime()即可。

4. PublishSubject

Observer只接收PublishSubject被訂閱之后發(fā)送的數(shù)據(jù)。

        PublishSubject<String> subject = PublishSubject.create();
        subject.onNext("publicSubject1");
        subject.onNext("publicSubject2");
        subject.onComplete();

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("publicSubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("publicSubject onError");  //不輸出(異常才會(huì)輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("publicSubject:complete");  //輸出 publicSubject onComplete
            }
        });

        subject.onNext("publicSubject3");
        subject.onNext("publicSubject4");

執(zhí)行結(jié)果:

publicSubject:complete

因?yàn)閟ubject在訂閱之前,已經(jīng)執(zhí)行了onComplete()方法,所以無法發(fā)射數(shù)據(jù)。稍微改一下代碼,將onComplete()方法放在最后。

        PublishSubject<String> subject = PublishSubject.create();
        subject.onNext("publicSubject1");
        subject.onNext("publicSubject2");

        subject.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println("publicSubject:"+s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                System.out.println("publicSubject onError");  //不輸出(異常才會(huì)輸出)
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("publicSubject:complete");  //輸出 publicSubject onComplete
            }
        });

        subject.onNext("publicSubject3");
        subject.onNext("publicSubject4");
        subject.onComplete();

執(zhí)行結(jié)果:

publicSubject:publicSubject3
publicSubject:publicSubject4
publicSubject:complete

最后,一句話總結(jié)一下四個(gè)Subject的特性。

Subject 發(fā)射行為
AsyncSubject 不論訂閱發(fā)生在什么時(shí)候,只會(huì)發(fā)射最后一個(gè)數(shù)據(jù)
BehaviorSubject 發(fā)送訂閱之前一個(gè)數(shù)據(jù)和訂閱之后的全部數(shù)據(jù)
ReplaySubject 不論訂閱發(fā)生在什么時(shí)候,都發(fā)射全部數(shù)據(jù)
PublishSubject 發(fā)送訂閱之后全部數(shù)據(jù)

可能錯(cuò)過的事件

Subject 作為一個(gè)Observable時(shí),可以不停地調(diào)用onNext()來發(fā)送事件,直到遇到onComplete()才會(huì)結(jié)束。

PublishSubject<String> subject = PublishSubject.create();
        subject.subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        System.out.println(s);
                    }
                }, new Consumer<Throwable>() {

                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {

                    }
                },new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("completed");
                    }
                });
        subject.onNext("Foo");
        subject.onNext("Bar");
        subject.onComplete();

執(zhí)行的結(jié)果:

Foo
Bar
completed

如果,使用 subsribeOn 操作符將 subject 切換到IO線程,再使用 Thread.sleep(2000) 讓主線程休眠2秒。

 PublishSubject<String> subject = PublishSubject.create();
        subject.subscribeOn(Schedulers.io())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(@NonNull String s) throws Exception {
                        System.out.println(s);
                    }
                }, new Consumer<Throwable>() {

                    @Override
                    public void accept(@NonNull Throwable throwable) throws Exception {

                    }
                },new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("completed");
                    }
                });
        subject.onNext("Foo");
        subject.onNext("Bar");
        subject.onComplete();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

這時(shí),其執(zhí)行的結(jié)果變?yōu)椋?/p>

completed

為何會(huì)缺少打印Foo和Bar?

因?yàn)?,subject 發(fā)射元素的線程被指派到了 IO 線程,此時(shí) IO 線程正在初始化還沒起來,subject 發(fā)射前這兩個(gè)元素Foo、Bar還在主線程中,主線程的這兩個(gè)元素往 IO 線程轉(zhuǎn)發(fā)的過程中由于 IO 線程還沒有起來,所以就被丟棄了。此時(shí),無論Thread睡了多少秒,F(xiàn)oo、Bar都不會(huì)被打印出來。

其實(shí),解決辦法也很簡單,將subject改成使用Observable.create()來替代,它允許為每個(gè)訂閱者精確控制事件的發(fā)送,這樣就不會(huì)缺少打印Foo和Bar。

使用PublishSubject來實(shí)現(xiàn)簡化的RxBus

下面的代碼是一個(gè)簡化版本的Event Bus,在這里使用了PublishSubject。因?yàn)槭录偩€是基于發(fā)布/訂閱模式實(shí)現(xiàn)的,如果某一事件在多個(gè)Activity/Fragment中被訂閱的話,在App的任意地方一旦發(fā)布該事件,則多個(gè)訂閱的地方都能夠同時(shí)收到這一事件(在這里,訂閱事件的Activity/Fragment不能被destory,一旦被destory就不能收到事件),這很符合Hot Observable的特性。所以,我們使用PublishSubject,考慮到多線程的情況,還需要使用 Subject 的 toSerialized() 方法。

import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

public class RxBus {

    private final Subject<Object> mBus;

    private RxBus() {
        mBus = PublishSubject.create().toSerialized();
    }

    public static RxBus get() {
        return Holder.BUS;
    }

    public void post(Object obj) {
        mBus.onNext(obj);
    }

    public <T> Observable<T> toObservable(Class<T> tClass) {
        return mBus.ofType(tClass);
    }

    public Observable<Object> toObservable() {
        return mBus;
    }

    public boolean hasObservers() {
        return mBus.hasObservers();
    }

    private static class Holder {
        private static final RxBus BUS = new RxBus();
    }
}

在這里Subject的toSerialized(),使用SerializedSubject包裝了原先的Subject。

    /**
     * Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
     * onComplete methods, making them thread-safe.
     * <p>The method is thread-safe.
     * @return the wrapped and serialized subject
     */
    @NonNull
    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<T>(this);
    }

這個(gè)版本的Event Bus比較簡單,并沒有考慮到背壓的情況,因?yàn)樵?RxJava2.x 中 Subject 已經(jīng)不再支持背壓了。如果要增加背壓的處理,可以使用Processor,我們需要將 PublishSubject 改成 PublishProcessor,對(duì)應(yīng)的 Observable 也需要改成 Flowable。

使用BehaviorSubject來實(shí)現(xiàn)預(yù)加載

預(yù)加載可以很好的提高程序的用戶體驗(yàn)。
每當(dāng)用戶處于弱網(wǎng)絡(luò)時(shí),打開一個(gè)App可能出現(xiàn)一片空白或者一直在loading,那用戶一定會(huì)很煩躁。此時(shí),如果能夠預(yù)先加載一些數(shù)據(jù),例如上一次打開App時(shí)保存的數(shù)據(jù),這樣不至于會(huì)損傷App的用戶體驗(yàn)。

下面是借助 BehaviorSubject 的特性來實(shí)現(xiàn)一個(gè)簡單的預(yù)加載類RxPreLoader。

import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.BehaviorSubject;

/**
 * Created by Tony Shen on 2017/6/2.
 */

public class RxPreLoader<T> {

    //能夠緩存訂閱之前的最新數(shù)據(jù)
    private  BehaviorSubject<T> mData;
    private Disposable disposable;

    public RxPreLoader(T defaultValue) {

        mData = BehaviorSubject.createDefault(defaultValue);
    }

    /**
     * 發(fā)送事件
     * @param object
     */
    public void publish(T object) {
        mData.onNext(object);
    }

    /**
     * 訂閱事件
     * @param onNext
     * @return
     */
    public  Disposable subscribe(Consumer onNext) {
        disposable = mData.subscribe(onNext);
        return disposable;
    }

    /**
     * 反訂閱
     *
     */
    public void dispose() {
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
            disposable = null;
        }
    }

    /**
     * 獲取緩存數(shù)據(jù)的Subject
     *
     * @return
     */
    public BehaviorSubject<T> getCacheDataSubject() {
        return mData;
    }

    /**
     * 直接獲取最近的一個(gè)數(shù)據(jù)
     *
     * @return
     */
    public T getLastCacheData() {
        return mData.getValue();
    }
}

可以考慮在基類的Activity/Fragment中也實(shí)現(xiàn)一個(gè)類似的RxPreLoader。

總結(jié)

RxJava 的 Subject 是一種特殊的存在,它的靈活性在使用時(shí)也會(huì)伴隨著風(fēng)險(xiǎn),沒有用好它的話會(huì)錯(cuò)過事件,并且使用時(shí)還要小心 Subject 不是線程安全的。當(dāng)然很多開源框架都在使用Subject,例如大名鼎鼎的RxLifecycle使用了BehaviorSubject。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容