打造RxJava生命周期管理框架RxLife

在前邊RxJava實(shí)戰(zhàn)技巧大全一文中,我們介紹了RxJava使用過程中常見的應(yīng)用場(chǎng)景和可能遇到的問題,其中我們談到利用RxLifeCycle來管理RxJava的生命周期,避免內(nèi)存泄漏問題,今天自己動(dòng)手打生命周期管理框RxLife來加深對(duì)RxJava的認(rèn)識(shí)。


詳解Subject

什么是Subject

在RxJava當(dāng)中,有四個(gè)對(duì)象值得我們關(guān)注:Observable,Subject,Observer,Subscriber,它們之間的關(guān)系如下:


這里寫圖片描述

對(duì)于Observable,Observer,Subscriber我們比較熟悉,故不做說明,重點(diǎn)來看Subject。

通過上面的圖我們可以看出Subject繼承自O(shè)bservable,也就意味著Subject可以作為被觀察者,另外,它又實(shí)現(xiàn)了Observer接口,這意味著它也可以作為觀察者。不難看出,Subject既能作為Observer訂閱Observable,又能作為Observable被其他Observer訂閱。總之,Subject承擔(dān)了這么一種角色:對(duì)上作為觀察者,對(duì)下作為被觀察者。

和Observable必須有訂閱者才能發(fā)射數(shù)據(jù)不一樣,無論Subject是否有訂閱者,它都可以發(fā)射數(shù)據(jù)。這有點(diǎn)類似廣播電臺(tái),不會(huì)因?yàn)槲覀冴P(guān)閉收音機(jī)就停止廣播,在收聽的人自然收聽的到,沒收聽的人也無關(guān)緊要。

常見的Subject

從上面的uml中我們看出,RxJava為我們提供了四種常用的Subject,
即syncSubject,BehabviorSubject,PublishSubject,ReplaySubject,下面我們對(duì)這四者進(jìn)行說明。

AsyncSubject

AsyncSubject會(huì)緩存最后一個(gè)數(shù)據(jù)并在調(diào)用onCompleted()時(shí)將該數(shù)據(jù)發(fā)送給訂閱者,原理如下:

這里寫圖片描述

在該過程中,一旦發(fā)生任何異常都不會(huì)發(fā)送數(shù)據(jù)到訂閱者,而是發(fā)送給訂閱者一個(gè)異常通知,即訂閱者只能接受到一個(gè)異常的通知,如下:


這里寫圖片描述

舉例來說明AsyncSubject的用法:

asyncSubject.onNext("1");
asyncSubject.onNect("2");
asyncSubject.onCompleted();//必須調(diào)用才會(huì)開始發(fā)送數(shù)據(jù)

以上代碼執(zhí)行后,訂閱者接受到的數(shù)據(jù)是2.

BehaviorSubject

當(dāng)BehaviorSubject被訂閱后,它首先會(huì)發(fā)送原始Observable最近發(fā)射的數(shù)據(jù),如果最近沒有,會(huì)發(fā)射一個(gè)默認(rèn)值,接下繼續(xù)發(fā)射原始Observable的數(shù)據(jù),如下圖:


這里寫圖片描述

如果原始的Observable因?yàn)榘l(fā)生了錯(cuò)誤而終止,那么BehaviorSubject在發(fā)送一個(gè)錯(cuò)誤通知后不再發(fā)射數(shù)據(jù),如下:


這里寫圖片描述

我們舉例來說明BehabviorSubject的用法:

behaviorSubject.onNext("1");
behaviorSubject.onNect("2");
behaviorSubject.onNext("3");
behaviorSubject.subscribe(new Action<String>(){
    @Override
    public void call(String s){
        System.out.println(“result:”+s);
    }
});
behaviorSubject.onNext("4");

輸出結(jié)果是3,4.

PublishSubject

默認(rèn)情況下,RxJava中的Observable一旦被訂閱就開始發(fā)送事件,這和我們傳統(tǒng)的觀察者模式有所區(qū)別。而PublishSuject的行為則類似傳統(tǒng)的觀察這模式,觀察者可以先訂閱被觀察者,然后在某個(gè)時(shí)刻手動(dòng)調(diào)用方法來發(fā)射數(shù)據(jù)(訂閱之后的數(shù)據(jù))到所有的觀察者。如下圖:


這里寫圖片描述

如果原始的Observable因?yàn)榘l(fā)生了錯(cuò)誤而終止,那么PublishSubject在發(fā)送一個(gè)錯(cuò)誤通知后不再發(fā)射數(shù)據(jù),如下:


這里寫圖片描述

舉例來說明PublishSubject的用法:

publishSubject.onNext("1");
publishSubject.onNect("2");
publishSubject.onNext("3");//訂閱之前不會(huì)被發(fā)送

publishSubject.subscribe(new Action<String>(){
    @Override
    public void call(String s){
        System.out.println(“result:”+s);
    }
});
publishSubject.onNect("4");
publishSubject.onNect("5");

1,2,3是在訂閱之前的數(shù)據(jù),不會(huì)被發(fā)射,最終輸出結(jié)果是4,5。

ReplaySubject

ReplaySubject會(huì)緩存所有已經(jīng)發(fā)射的數(shù)據(jù),當(dāng)一個(gè)新的訂閱關(guān)系產(chǎn)生時(shí),ReplaySuject會(huì)將所有數(shù)據(jù)都發(fā)送給他。另外,ReplaySubject支持設(shè)置緩存數(shù)據(jù)和緩存時(shí)間。如下圖:


這里寫圖片描述

舉例來說明ReplaySubject的用法:

replaySubject.onNext("1");
replaySubject.onNect("2");
replaySubject.onNext("3");

replaySubject.subscribe(new Action<String>(){
    @Override
    public void call(String s){
        System.out.println(“result:”+s);
    }
});
replaySubject.onNect("4");

默認(rèn)情況下ReplaySubject會(huì)緩存所有的數(shù)據(jù),因此最終數(shù)據(jù)的結(jié)果如下:

result:1
result:2
result:3
result:4

小結(jié)

回顧上面所談的,不難看出不同的Subject最大的區(qū)別在于發(fā)送數(shù)據(jù)的行為不同,簡(jiǎn)單概括如下:

Subject 發(fā)射行為
AsyncSubject 不論訂閱發(fā)生在什么時(shí)候,只會(huì)發(fā)射最后一個(gè)數(shù)據(jù)
BehaviorSubject 發(fā)送訂閱之前一個(gè)數(shù)據(jù)和訂閱之后的全部數(shù)據(jù)
ReplaySubject 不論訂閱發(fā)生在什么時(shí)候,都發(fā)射全部數(shù)據(jù)
PublishSubject 發(fā)送訂閱之后全部數(shù)據(jù)

關(guān)于Subject更詳細(xì)的使用方法請(qǐng)直接查閱api doc.


實(shí)現(xiàn)生命周期管理框架(RxLife)

在了解Subject之后就可以開始考慮如何實(shí)現(xiàn)一個(gè)生命周期管理框架。每當(dāng)Activity或者Fragment的生命周期發(fā)生變化時(shí)我們都希望產(chǎn)生一個(gè)對(duì)應(yīng)的事件來通知當(dāng)前所有的訂閱者,這樣我們就可以根據(jù)對(duì)應(yīng)的事件去確定是否取消訂閱關(guān)系了。

從上面的描述中,我們有兩個(gè)問題要解決:

  1. 如何監(jiān)聽Activity或Fragmeng生命周期變化并將其發(fā)送出去。
  2. 原有的觀察者如何接受生命周期,并在某生命周期下中斷原有的事件流。

通過以上兩個(gè)問題,我們知道我們需要一個(gè)既能夠發(fā)射生命周期,又能接受生命周期的觀察者,因此不難想到這里需要Subject。生命周期是連續(xù)產(chǎn)生的,無論是否有訂閱者,我們只關(guān)注最最近的生命周期,因此我們選擇使用BehaviorSubject。

現(xiàn)在我們來考慮如何監(jiān)聽Activity或Fragment的生命周期,并利用BehaviorSubject發(fā)射生命周期。這里我們以Activity為例進(jìn)行說明。

生命周期事件監(jiān)聽

定義生命周期事件

我們根據(jù)Activity的生命周期,定義相應(yīng)的事件。

public enum  ActivityEvent {
    CREATE,
    RESUME,
    START,
    PAUSE,
    STOP,
    DESTORY
}

監(jiān)聽生命周期

為了能在Activitiy生命周期變化時(shí)發(fā)送相應(yīng)的事件,我們定義了RxAppcompatActivity,該類繼承了AppCompatActivity并重寫器生命周期方法:在不同方法中發(fā)射事件到BehaviorSubject中。這就好像我們的BehaviorSubject對(duì)象在不斷的觀察Activity生命周期的變化。當(dāng)然,由于Subject的特性,BehaviorSubject也具備了將這些事件發(fā)射出去的能力。

public class RxAppCompatActivity extends AppCompatActivity {
    protected final BehaviorSubject<ActivityEvent> lifeSubject = BehaviorSubject.create();

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        lifeSubject.onNext(ActivityEvent.CREATE);
    }

    @Override
    protected void onResume() {
        super.onResume();
        lifeSubject.onNext(ActivityEvent.RESUME);
    }

    @Override
    protected void onStart() {
        super.onStart();
        lifeSubject.onNext(ActivityEvent.START);
    }

    @Override
    protected void onPause() {
        super.onPause();
        lifeSubject.onNext(ActivityEvent.PAUSE);
    }

    @Override
    protected void onStop() {
        super.onStop();
        lifeSubject.onNext(ActivityEvent.STOP);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        lifeSubject.onNext(ActivityEvent.DESTORY);
    }
}


Observable自動(dòng)停止發(fā)射數(shù)據(jù)

到現(xiàn)在我們已經(jīng)利用Subject來監(jiān)視生命周期的變化,那又如何讓原有的Observable(比如網(wǎng)絡(luò)請(qǐng)求的Observable)來監(jiān)視Subject發(fā)射的數(shù)據(jù)呢,并根據(jù)Subject的狀態(tài)自動(dòng)停止原始數(shù)據(jù)的發(fā)射?換言之就是一個(gè)Observable如何在發(fā)射數(shù)據(jù)的同時(shí)監(jiān)視另一個(gè)Observable?

TakeUtil操作符

令人高興的是,RxJava中提供的TakeUntil操作符來實(shí)現(xiàn)上述需求。TakeUntil訂閱原始的Observable并發(fā)射數(shù)據(jù),此外它還監(jiān)視你提供的第二個(gè)Observable。當(dāng)?shù)诙€(gè)Observable發(fā)射了一項(xiàng)數(shù)據(jù)或者發(fā)射一項(xiàng)終止的通知時(shí)(onError通知或者onCompleted通知),TakeUntil返回的Observable會(huì)停止發(fā)射原始的Observable,如下圖所示:


這里寫圖片描述

我們用一個(gè)簡(jiǎn)單的例子來展示TakeUntil操作符的使用:

 Observable.interval(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
            @Override
            public void call(Long num) {
                Log.d("MainActivity", "num:" + num);
            }
        });

上面的代碼每隔2s進(jìn)行輸出,現(xiàn)在我們希望5s后自動(dòng)停止輸出,就可以這樣做:

   Observable.interval(2, TimeUnit.SECONDS).takeUntil(Observable.timer(5,TimeUnit.SECONDS)).subscribe(new Action1<Long>() {
            @Override
            public void call(Long num) {
                Log.d("MainActivity", "num:" + num);
            }
        });

為了讓以上代碼更通用,我們利用compose操作符進(jìn)行改寫(對(duì)compose不熟悉的童鞋自行查閱資料):

private void startIntervalTask1() {
        Observable.interval(2, TimeUnit.SECONDS).compose(bindUntilDelay(5)).subscribe(new Action1<Long>() {
            @Override
            public void call(Long num) {
                Log.d("MainActivity", "num:" + num);
            }
        });

    }

    @NonNull
    private Observable.Transformer<Long, Long> bindUntilDelay(final int delaySecond) {
        return new Observable.Transformer<Long, Long>() {
            @Override
            public Observable<Long> call(Observable<Long> longObservable) {
                return longObservable.takeUntil(timer(delaySecond,TimeUnit.SECONDS));
            }
        };
    }


回到正題,現(xiàn)在我們已經(jīng)有了可以發(fā)射生命周期事件的BehaviorSubject,再結(jié)合TakeUntil不就可以實(shí)現(xiàn)在指定生命周期發(fā)生時(shí)自動(dòng)停止原有的Observable了嗎?

結(jié)合BehaviorSubject與TakeUntil

有了上面的知識(shí)做鋪墊,實(shí)現(xiàn)生命周期管理框架也就顯得輕而易舉了。
為了方便使用,我們?cè)赗xAppcompatActivity中提供了bindUntilEvent(ActivityEvent nindEvent)方法:

public class RxAppCompatActivity extends AppCompatActivity {
    protected final BehaviorSubject<ActivityEvent> lifeSubject = BehaviorSubject.create();

    public <T> Observable.Transformer<T, T> bindUntilEvent(final ActivityEvent bindEvent) {
        //被監(jiān)視的Observable
        final Observable<ActivityEvent> observable = lifeSubject.takeFirst(new Func1<ActivityEvent, Boolean>() {
            @Override
            public Boolean call(ActivityEvent event) {
                return event.equals(bindEvent);
            }
        });

        return new Observable.Transformer<T, T>() {
            @Override
            public Observable<T> call(Observable<T> sourceOb) {
                return sourceOb.takeUntil(observable);
            }
        };
    }


    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        lifeSubject.onNext(ActivityEvent.CREATE);
    }

    @Override
    protected void onResume() {
        super.onResume();
        lifeSubject.onNext(ActivityEvent.RESUME);
    }

    @Override
    protected void onStart() {
        super.onStart();
        lifeSubject.onNext(ActivityEvent.START);
    }

    @Override
    protected void onPause() {
        super.onPause();
        lifeSubject.onNext(ActivityEvent.PAUSE);
    }

    @Override
    protected void onStop() {
        super.onStop();
        lifeSubject.onNext(ActivityEvent.STOP);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        lifeSubject.onNext(ActivityEvent.DESTORY);
    }
}

接下來,我們用同樣的方式來處理Fragment或者其他組件即可。

具體使用

新建的Activity需要繼承我們的RxAppcompatActivity,新建的Fragment則繼承我們的RxFragment,就是這么簡(jiǎn)單。

我們同樣還是以師父說為例,由于我們的方法基本和RxLifeCycle保持一致,因此只要簡(jiǎn)單的改動(dòng)就可以讓RxLife工作起來,現(xiàn)在就可以用RxLife來代替RxLifeCycle。

仍然做個(gè)簡(jiǎn)單的示例:

ApiFactory.getWXApi().getWXHot(AppConstant.KEY_WX, getPageSize(), mCurrentPage + 1).compose(this.bindUntilEvent(FragmentEvent.STOP))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(mSubscriber);

總結(jié)

通過自行實(shí)現(xiàn)一個(gè)RxJava生命周期管理框架(RxLife)加深le我們對(duì)RxJava中Subject的理解。另外,Subject的應(yīng)用非常廣泛,在下篇文章中,我們將會(huì)進(jìn)一步深入,利用Subject來打造自己的事件通信總線RxBus。

githubhttps://github.com/closedevice/FastApp

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應(yīng)式編程作為結(jié)合使用的,對(duì)什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,984評(píng)論 0 10
  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)ObservableEm...
    rkua閱讀 1,957評(píng)論 0 1
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,569評(píng)論 19 139
  • 前言 歡迎繼續(xù)收看《我所理解的RxJava--上手其實(shí)很簡(jiǎn)單(二)》,上周出了第一篇,各位程序猿大大的反應(yīng)還不錯(cuò),...
    Weavey閱讀 29,811評(píng)論 35 133
  • 前幾天,有一位初一的學(xué)生,津津地說 當(dāng)上帝給你關(guān)上一扇窗戶的同時(shí) 也會(huì)為你打開另一扇門 而于我,面對(duì)他這樣的口誤,...
    不落的塵埃閱讀 320評(píng)論 0 0

友情鏈接更多精彩內(nèi)容