RX第二章

為什么是Observables?

在面向?qū)ο蟮募軜?gòu)中,開(kāi)發(fā)者致力于創(chuàng)建一組解耦的實(shí)體。這樣的話,實(shí)體就可以在不用妨礙整個(gè)系統(tǒng)的情況下可以被測(cè)試、復(fù)用和維護(hù)。設(shè)計(jì)這種系統(tǒng)就帶來(lái)一個(gè)棘手的負(fù)面影響:維護(hù)相關(guān)對(duì)象之間的統(tǒng)一。

在Smalltalk MVC架構(gòu)中,創(chuàng)建模式的第一個(gè)例子就是用來(lái)解決這個(gè)問(wèn)題的。用戶界面框架提供一種途徑使UI元素與包含數(shù)據(jù)的實(shí)體對(duì)象相分離,并且同時(shí),它提供一種靈活的方法來(lái)保持它們之間的同步。

在這本暢銷(xiāo)的四人組編寫(xiě)的《設(shè)計(jì)模式——可復(fù)用面向?qū)ο筌浖幕A(chǔ)》一書(shū)中,觀察者模式是最有名的設(shè)計(jì)模式之一。它是一種行為模式并提供一種以一對(duì)多的依賴來(lái)綁定對(duì)象的方法:即當(dāng)一個(gè)對(duì)象發(fā)生變化時(shí),依賴它的所有對(duì)象都會(huì)被通知并且會(huì)自動(dòng)更新。

在本章中,我們將會(huì)對(duì)觀察者模式有一個(gè)概述,它是如何實(shí)現(xiàn)的以及如何用RxJava來(lái)擴(kuò)展,Observable是什么,以及Observables如何與Iterables相關(guān)聯(lián)。

你什么時(shí)候使用觀察者模式?

觀察者模式很適合下面這些場(chǎng)景中的任何一個(gè):

  • 當(dāng)你的架構(gòu)有兩個(gè)實(shí)體類(lèi),一個(gè)依賴另一個(gè),你想讓它們互不影響或者是獨(dú)立復(fù)用它們時(shí)。
  • 當(dāng)一個(gè)變化的對(duì)象通知那些與它自身變化相關(guān)聯(lián)的未知數(shù)量的對(duì)象時(shí)。
  • 當(dāng)一個(gè)變化的對(duì)象通知那些無(wú)需推斷具體是誰(shuí)的對(duì)象時(shí)。

RxJava觀察者模式工具包

在RxJava的世界里,我們有四種角色:

  • Observable
  • Observer
  • Subscriber
  • Subjects

Observables和Subjects是兩個(gè)“生產(chǎn)”實(shí)體,Observers和Subscribers是兩個(gè)“消費(fèi)”實(shí)體。

Observable

當(dāng)我們異步執(zhí)行一些復(fù)雜的事情,Java提供了傳統(tǒng)的類(lèi),例如Thread、Future、FutureTask、CompletableFuture來(lái)處理這些問(wèn)題。當(dāng)復(fù)雜度提升,這些方案就會(huì)變得麻煩和難以維護(hù)。最糟糕的是,它們都不支持鏈?zhǔn)秸{(diào)用。

RxJava Observables被設(shè)計(jì)用來(lái)解決這些問(wèn)題。它們靈活,且易于使用,也可以鏈?zhǔn)秸{(diào)用,并且可以作用于單個(gè)結(jié)果程序上,更有甚者,也可以作用于序列上。無(wú)論何時(shí)你想發(fā)射單個(gè)標(biāo)量值,或者一連串值,甚至是無(wú)窮個(gè)數(shù)值流,你都可以使用Observable。

Observable的生命周期包含了三種可能的易于與Iterable生命周期事件相比較的事件,下表展示了如何將Observable async/push 與 Iterable sync/pull相關(guān)聯(lián)起來(lái)。

Event Iterable(pull) Observable(push)
檢索數(shù)據(jù) T next() onNext(T)
發(fā)現(xiàn)錯(cuò)誤 throws Exception onError(Throwable)
完成 !hasNext() onCompleted()

使用Iterable時(shí),消費(fèi)者從生產(chǎn)者那里以同步的方式得到值,在這些值得到之前線程處于阻塞狀態(tài)。相反,使用Observable時(shí),生產(chǎn)者以異步的方式把值推給觀察者,無(wú)論何時(shí),這些值都是可用的。這種方法之所以更靈活是因?yàn)榧幢阒凳峭交虍惒椒绞降竭_(dá),消費(fèi)者在這兩種場(chǎng)景都可以根據(jù)自己的需要來(lái)處理。

為了更好地復(fù)用Iterable接口,RxJava Observable類(lèi)擴(kuò)展了GOF觀察者模式的語(yǔ)義。引入了兩個(gè)新的接口:

  • onCompleted() 即通知觀察者Observable沒(méi)有更多的數(shù)據(jù)。
  • onError() 即觀察者有錯(cuò)誤出現(xiàn)了。

熱Observables和冷Observables

從發(fā)射物的角度來(lái)看,有兩種不同的Observables:熱的和冷的。一個(gè)"熱"的Observable典型的只要一創(chuàng)建完就開(kāi)始發(fā)射數(shù)據(jù),因此所有后續(xù)訂閱它的觀察者可能從序列中間的某個(gè)位置開(kāi)始接受數(shù)據(jù)(有一些數(shù)據(jù)錯(cuò)過(guò)了)。一個(gè)"冷"的Observable會(huì)一直等待,直到有觀察者訂閱它才開(kāi)始發(fā)射數(shù)據(jù),因此這個(gè)觀察者可以確保會(huì)收到整個(gè)數(shù)據(jù)序列。

創(chuàng)建一個(gè)Observable

在接下來(lái)的小節(jié)中將討論Observables提供的兩種創(chuàng)建Observable的方法。

Observable.create()

create()方法使開(kāi)發(fā)者有能力從頭開(kāi)始創(chuàng)建一個(gè)Observable。它需要一個(gè)OnSubscribe對(duì)象,這個(gè)對(duì)象繼承Action1,當(dāng)觀察者訂閱我們的Observable時(shí),它作為一個(gè)參數(shù)傳入并執(zhí)行call()函數(shù)。

Observable.create(new Observable.OnSubscribe<Object>(){
        @Override
        public void call(Subscriber<? super Object> subscriber) {
            
        }
});

Observable通過(guò)使用subscriber變量并根據(jù)條件調(diào)用它的方法來(lái)和觀察者通信。讓我們看一個(gè)“現(xiàn)實(shí)世界”的例子:

Observable<Integer> observableString = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> observer) {
            for (int i = 0; i < 5; i++) {
                observer.onNext(i);
            }
            observer.onCompleted();
        }
});

Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable completed");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Oh,no! Something wrong happened!");
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Item is " + item);
    }
});

例子故意寫(xiě)的簡(jiǎn)單,是因?yàn)榧幢闶悄愕谝淮我?jiàn)到RxJava的操作,我想讓你明白接下來(lái)要發(fā)生什么。

我們創(chuàng)建一個(gè)新的Observable<Integer>,它執(zhí)行了5個(gè)元素的for循環(huán),一個(gè)接一個(gè)的發(fā)射他們,最后完成。

另一方面,我們訂閱了Observable,返回一個(gè)Subscription
。一旦我們訂閱了,我們就開(kāi)始接受整數(shù),并一個(gè)接一個(gè)的打印出它們。我們并不知道要接受多少整數(shù)。事實(shí)上,我們也無(wú)需知道是因?yàn)槲覀優(yōu)槊糠N場(chǎng)景都提供對(duì)應(yīng)的處理操作:

  • 如果我們接收到了整數(shù),那么就打印它。
  • 如果序列結(jié)束,我們就打印一個(gè)關(guān)閉的序列信息。
  • 如果錯(cuò)誤發(fā)生了,我們就打印一個(gè)錯(cuò)誤信息。

Observable.from()

在上一個(gè)例子中,我們創(chuàng)建了一個(gè)整數(shù)序列并一個(gè)一個(gè)的發(fā)射它們。假如我們已經(jīng)有一個(gè)列表呢?我們是不是可以不用for循環(huán)而也可以一個(gè)接一個(gè)的發(fā)射它們呢?

在下面的例子代碼中,我們從一個(gè)已有的列表中創(chuàng)建一個(gè)Observable序列:

List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(100);
items.add(200);

Observable<Integer> observableString = Observable.from(items);
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable completed");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Oh,no! Something wrong happened!");
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Item is " + item);
    }
});

輸出的結(jié)果和上面的例子絕對(duì)是一樣的。

from()創(chuàng)建符可以從一個(gè)列表/數(shù)組來(lái)創(chuàng)建Observable,并一個(gè)接一個(gè)的從列表/數(shù)組中發(fā)射出來(lái)每一個(gè)對(duì)象,或者也可以從Java Future類(lèi)來(lái)創(chuàng)建Observable,并發(fā)射Future對(duì)象的.get()方法返回的結(jié)果值。傳入Future作為參數(shù)時(shí),我們可以指定一個(gè)超時(shí)的值。Observable將等待來(lái)自Future的結(jié)果;如果在超時(shí)之前仍然沒(méi)有結(jié)果返回,Observable將會(huì)觸發(fā)onError()方法通知觀察者有錯(cuò)誤發(fā)生了。

Observable.just()

如果我們已經(jīng)有了一個(gè)傳統(tǒng)的Java函數(shù),我們想把它轉(zhuǎn)變?yōu)橐粋€(gè)Observable又改怎么辦呢?我們可以用create()方法,正如我們先前看到的,或者我們也可以像下面那樣使用以此來(lái)省去許多模板代碼:

Observable<String> observableString = Observable.just(helloWorld());

Subscription subscriptionPrint = observableString.subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable completed");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Oh,no! Something wrong happened!");
    }

    @Override
    public void onNext(String message) {
        System.out.println(message);
    }
});

helloWorld()方法比較簡(jiǎn)單,像這樣:

private String helloWorld(){
    return "Hello World";
}

不管怎樣,它可以是我們想要的任何函數(shù)。在剛才的例子中,我們一旦創(chuàng)建了Observable,just()執(zhí)行函數(shù),當(dāng)我們訂閱Observable時(shí),它就會(huì)發(fā)射出返回的值。

just()方法可以傳入一到九個(gè)參數(shù),它們會(huì)按照傳入的參數(shù)的順序來(lái)發(fā)射它們。just()方法也可以接受列表或數(shù)組,就像from()方法,但是它不會(huì)迭代列表發(fā)射每個(gè)值,它將會(huì)發(fā)射整個(gè)列表。通常,當(dāng)我們想發(fā)射一組已經(jīng)定義好的值時(shí)會(huì)用到它。但是如果我們的函數(shù)不是時(shí)變性的,我們可以用just來(lái)創(chuàng)建一個(gè)更有組織性和可測(cè)性的代碼庫(kù)。

最后注意just()創(chuàng)建符,它發(fā)射出值后,Observable正常結(jié)束,在上面那個(gè)例子中,我們會(huì)在控制臺(tái)打印出兩條信息:“Hello World”和“Observable completed”。

Observable.empty(),Observable.never(),和Observable.throw()

當(dāng)我們需要一個(gè)Observable毫無(wú)理由的不再發(fā)射數(shù)據(jù)正常結(jié)束時(shí),我們可以使用empty()。我們可以使用never()創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)并且也永遠(yuǎn)不會(huì)結(jié)束的Observable。我們也可以使用throw()創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)并且以錯(cuò)誤結(jié)束的Observable。

Subject = Observable + Observer

subject是一個(gè)神奇的對(duì)象,它可以是一個(gè)Observable同時(shí)也可以是一個(gè)Observer:它作為連接這兩個(gè)世界的一座橋梁。一個(gè)Subject可以訂閱一個(gè)Observable,就像一個(gè)觀察者,并且它可以發(fā)射新的數(shù)據(jù),或者傳遞它接受到的數(shù)據(jù),就像一個(gè)Observable。很明顯,作為一個(gè)Observable,觀察者們或者其它Subject都可以訂閱它。

一旦Subject訂閱了Observable,它將會(huì)觸發(fā)Observable開(kāi)始發(fā)射。如果原始的Observable是“冷”的,這將會(huì)對(duì)訂閱一個(gè)“熱”的Observable變量產(chǎn)生影響。

RxJava提供四種不同的Subject:

  • PublishSubject
  • BehaviorSubject
  • ReplaySubject.
  • AsyncSubject

PublishSubject

Publish是Subject的一個(gè)基礎(chǔ)子類(lèi)。讓我們看看用PublishSubject實(shí)現(xiàn)傳統(tǒng)的Observable Hello World:

PublishSubject<String> stringPublishSubject = PublishSubject.create();
Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("Observable completed");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Oh,no!Something wrong happened!");                
    }

    @Override
    public void onNext(String message) {
        System.out.println(message);
    }
});
stringPublishSubject.onNext("Hello World");

在剛才的例子中,我們創(chuàng)建了一個(gè)PublishSubject,用create()方法發(fā)射一個(gè)String值,然后我們訂閱了PublishSubject。此時(shí),沒(méi)有數(shù)據(jù)要發(fā)送,因此我們的觀察者只能等待,沒(méi)有阻塞線程,也沒(méi)有消耗資源。就在這隨時(shí)準(zhǔn)備從subject接收值,如果subject沒(méi)有發(fā)射值那么我們的觀察者就會(huì)一直在等待。再次聲明的是,無(wú)需擔(dān)心:觀察者知道在每個(gè)場(chǎng)景中該做什么,我們不用擔(dān)心什么時(shí)候是因?yàn)樗琼憫?yīng)式的:系統(tǒng)會(huì)響應(yīng)。我們并不關(guān)心它什么時(shí)候響應(yīng)。我們只關(guān)心它響應(yīng)時(shí)該做什么。

最后一行代碼展示了手動(dòng)發(fā)射字符串“Hello World”,它觸發(fā)了觀察者的onNext()方法,讓我們?cè)诳刂婆_(tái)打印出“Hello World”信息。

讓我們看一個(gè)更復(fù)雜的例子。話說(shuō)我們有一個(gè)private聲明的Observable,外部不能訪問(wèn)。Observable在它生命周期內(nèi)發(fā)射值,我們不用關(guān)心這些值,我們只關(guān)心他們的結(jié)束。

首先,我們創(chuàng)建一個(gè)新的PublishSubject來(lái)響應(yīng)它的onNext()方法,并且外部也可以訪問(wèn)它。

final PublishSubject<Boolean> subject = PublishSubject.create();
        
subject.subscribe(new Observer<Boolean>() {
    @Override
    public void onCompleted() {
        
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Boolean aBoolean) {
        System.out.println("Observable Completed");
    }
});

然后,我們創(chuàng)建“私有”的Observable,只有subject才可以訪問(wèn)的到。

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }
}).doOnCompleted(new Action0() {
    @Override
    public void call() {
        subject.onNext(true);
    }
}).subscribe();

Observable.create()方法包含了我們熟悉的for循環(huán),發(fā)射數(shù)字。doOnCompleted()方法指定當(dāng)Observable結(jié)束時(shí)要做什么事情:在subject上發(fā)射true。最后,我們訂閱了Observable。很明顯,空的subscribe()調(diào)用僅僅是為了開(kāi)啟Observable,而不用管已發(fā)出的任何值,也不用管完成事件或者錯(cuò)誤事件。為了這個(gè)例子我們需要它像這樣。

在這個(gè)例子中,我們創(chuàng)建了一個(gè)可以連接Observables并且同時(shí)可被觀測(cè)的實(shí)體。當(dāng)我們想為公共資源創(chuàng)建獨(dú)立、抽象或更易觀測(cè)的點(diǎn)時(shí),這是極其有用的。

BehaviorSubject

簡(jiǎn)單的說(shuō),BehaviorSubject會(huì)首先向他的訂閱者發(fā)送截至訂閱前最新的一個(gè)數(shù)據(jù)對(duì)象(或初始值),然后正常發(fā)送訂閱后的數(shù)據(jù)流。

BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);

在這個(gè)短例子中,我們創(chuàng)建了一個(gè)能發(fā)射整形(Integer)的BehaviorSubject。由于每當(dāng)Observes訂閱它時(shí)就會(huì)發(fā)射最新的數(shù)據(jù),所以它需要一個(gè)初始值。

ReplaySubject

ReplaySubject會(huì)緩存它所訂閱的所有數(shù)據(jù),向任意一個(gè)訂閱它的觀察者重發(fā):

ReplaySubject<Integer> replaySubject = ReplaySubject.create();

AsyncSubject

當(dāng)Observable完成時(shí)AsyncSubject只會(huì)發(fā)布最后一個(gè)數(shù)據(jù)給已經(jīng)訂閱的每一個(gè)觀察者。

AsyncSubject<Integer> asyncSubject = AsyncSubject.create();

總結(jié)

本章中,我們了解到了什么是觀察者模式,為什么Observables在今天的編程場(chǎng)景中如此重要,以及如何創(chuàng)建Observables和subjects。

下一章中,我們將創(chuàng)建第一個(gè)基于RxJava的Android應(yīng)用程序,學(xué)習(xí)如何檢索數(shù)據(jù)來(lái)填充listview,以及探索如何創(chuàng)建一個(gè)基于RxJava的響應(yīng)式UI。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 響應(yīng)式編程簡(jiǎn)介 響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式。數(shù)據(jù)流就像一條河:它可以被觀測(cè),被過(guò)濾,被操作,或者...
    說(shuō)碼解字閱讀 3,557評(píng)論 0 5
  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應(yīng)式編程作為結(jié)合使用的,對(duì)什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,984評(píng)論 0 10
  • 作者: maplejaw本篇只解析標(biāo)準(zhǔn)包中的操作符。對(duì)于擴(kuò)展包,由于使用率較低,如有需求,請(qǐng)讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 46,210評(píng)論 8 93
  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過(guò)調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)ObservableEm...
    rkua閱讀 1,957評(píng)論 0 1
  • RxJava技術(shù)分享 京金所—時(shí)光 2016.9.22 這里我拿出來(lái)給 Android 開(kāi)發(fā)者的 RxJava 詳...
    JC_Mobile閱讀 5,676評(píng)論 3 55

友情鏈接更多精彩內(nèi)容