優(yōu)美的異步 --- RxAndroid

優(yōu)美的異步 --- RxAndroid

這里和大家一起分享一下一個(gè)著名的Android異步庫(kù)RxAndroid。它應(yīng)該是2016年最流行的開(kāi)源庫(kù)之一。RxAndroid起源于RxJava,是一個(gè)專門針對(duì)Android版本的Rxjava庫(kù)。RxAndroid-Github 目前最新的版本是v2.0.x我們今天的分享也基于2.0版本的API。

響應(yīng)式編程

什么是響應(yīng)式編程?和平常經(jīng)常聽(tīng)說(shuō)的面向?qū)ο缶幊毯秃瘮?shù)式編程一樣,響應(yīng)式編程(Reactive Programming)就是一個(gè)編程范式,但是與其他編程范式不同的是它是基于數(shù)據(jù)流和變化傳播的。我們經(jīng)常在程序中這樣寫

A = B + C

A被賦值為B和C的值。這時(shí),如果我們改變B的值,A的值并不會(huì)隨之改變。而如果我們運(yùn)用一種機(jī)制,當(dāng)B或者C的值發(fā)現(xiàn)變化的時(shí)候,A的值也隨之改變,這樣就實(shí)現(xiàn)了響應(yīng)式
而響應(yīng)式編程的提出,其目的就是簡(jiǎn)化類似的操作,因此它在用戶界面編程領(lǐng)域以及基于實(shí)時(shí)系統(tǒng)的動(dòng)畫(huà)方面都有廣泛的應(yīng)用。另一方面,在處理嵌套回調(diào)的異步事件,復(fù)雜的列表過(guò)濾和變換的時(shí)候也都有良好的表現(xiàn)。
RxAndroid其實(shí)是一個(gè)響應(yīng)式編程思想的實(shí)現(xiàn)庫(kù)。也因?yàn)檫@樣的思想,是它在一些方面表現(xiàn)的異常優(yōu)秀。下面我將先用一個(gè)簡(jiǎn)單的例子,讓大家直觀的感受一下的樣子。

網(wǎng)絡(luò)加載圖片顯示
Observable.just(getDrawableFromNet())
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Drawable>() {
            @Override
            public void accept(Drawable drawable) throws Exception {
                ((ImageView)findViewById(R.id.imageView)).setImageDrawable(drawable);
            }
        });

環(huán)境搭建

RxAndroid環(huán)境只需求要引入如下項(xiàng)目即可,我們不但需要RxAndroid項(xiàng)目還需要RxJava項(xiàng)目。

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.5'

基礎(chǔ)知識(shí)

RxAndroid的核心就是“異步”兩個(gè)字,其最關(guān)鍵的東西就是三個(gè):

Observable(被觀察者)
Observer(觀察者)
Subscriber (訂閱)
Observable可以理解為事件的發(fā)送者,就好像快遞的寄出者,而這些事件就好比快遞
Observer可以理解為事件的接收者,就好像快遞的接收者

Subscriber 綁定兩者

Observable可以發(fā)出一系列的 事件,這里的事件可以是任何東西,例如網(wǎng)絡(luò)請(qǐng)求、復(fù)雜計(jì)算處理、數(shù)據(jù)庫(kù)操作、文件操作等等,事件執(zhí)行結(jié)束后交給 Observer回調(diào)處理。

那他們之間是如何進(jìn)行聯(lián)系的呢?答案就是通過(guò)subscribe()方法。
下面我們通過(guò)一個(gè)HelloDemo來(lái)看看Observable與Observer進(jìn)行關(guān)聯(lián)的典型方式,

  private void test_1() {
        Observable<String> oble = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("hello");
                e.onComplete();
                e.onNext("hello2");

            }
        });

        Observer<String> oser = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.w("kaelpu","onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.w("kaelpu","onNext = "+s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.w("kaelpu","onError" + e);
            }

            @Override
            public void onComplete() {
                Log.w("kaelpu","onComplete");
            }
        };

        Log.w("kaelpu","subscribe");
        oble.subscribe(oser);

    }

10-21 01:28:01.600 11386-11386/? W/kaelpu: subscribe
10-21 01:28:01.600 11386-11386/? W/kaelpu: onSubscribe
10-21 01:28:01.600 11386-11386/? W/kaelpu: onNext = hello
10-21 01:28:01.600 11386-11386/? W/kaelpu: onComplete

其實(shí)這段代碼干了三件事:

  1. 創(chuàng)建被觀察者對(duì)象oble
  2. 創(chuàng)建觀察者oser
  3. 連接觀察者和被觀察者

被觀察者通過(guò)onNext函數(shù)給觀察者通知結(jié)果
被貫徹者onComplete函數(shù)通知觀察者執(zhí)行結(jié)束
連接觀察者和被觀察者我們使用subscribe函數(shù)

  • 通過(guò)打印的log我們可以看到觀察者函數(shù)調(diào)用情況,調(diào)用subscribe函數(shù)去綁定觀察者和被觀察者時(shí)候,觀察者的onSubscribe函數(shù)會(huì)被回調(diào)表示建立關(guān)聯(lián)。
  • 接著每當(dāng)被觀察者調(diào)用onNext給觀察者發(fā)送數(shù)據(jù)時(shí)候,觀察者的onNext 會(huì)收到回調(diào),并且得到所發(fā)送的數(shù)據(jù)。
  • 當(dāng)被觀察者調(diào)用onComplete函數(shù)時(shí)候,代表著完成,觀察者的onComplete回調(diào)會(huì)被觸發(fā),并且斷開(kāi)了兩者的關(guān)聯(lián),這時(shí)被觀察者再發(fā)送數(shù)據(jù),觀察者也不會(huì)收到。

當(dāng)然我們注意到觀察者還有一個(gè)onError函數(shù)沒(méi)有被觸發(fā)過(guò),那么該怎么觸發(fā)呢,又代表著什么意思呢?我們來(lái)改變一下代碼:

    private String error() throws Exception {
        throw new Exception();
    }

添加一個(gè)函數(shù),名字隨便但是返回值是String,這里我們叫做error函數(shù)。函數(shù)很簡(jiǎn)單就是拋出一個(gè)異常。然后我們繼續(xù)修改被觀察者的代碼如下:

  Observable<String> oble = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("hello");
                e.onNext(error());
                e.onNext("hello1");
                e.onComplete();
                e.onNext("hello2");

            }
        });

其實(shí)我們就添加了兩行,添加了一個(gè)e.onNext(error()) 并且在之后還添加了一個(gè)e.onNext("hello1") 運(yùn)行一下我們看看

W/kaelpu: subscribe
W/kaelpu: onSubscribe
W/kaelpu: onNexthello
W/kaelpu: onErrorjava.lang.Exception

折斷l(xiāng)og說(shuō)明三個(gè)問(wèn)題:

  1. 被觀察者onNext中是可以運(yùn)行函數(shù)的
  2. 如果運(yùn)行的函數(shù)報(bào)錯(cuò),則會(huì)調(diào)用我們觀察者的onError函數(shù)
  3. 當(dāng)調(diào)用onError函數(shù)時(shí)候,也會(huì)斷開(kāi)關(guān)聯(lián),被觀察者收不到后面的數(shù)據(jù),但是觀察者依然會(huì)繼續(xù)發(fā)送。

最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個(gè)onComplete, 也不能發(fā)多個(gè)onError, 也不能先發(fā)一個(gè)onComplete, 然后再發(fā)一個(gè)onError, 反之亦然。

關(guān)于onComplete和onError唯一并且互斥這一點(diǎn), 是需要自行在代碼中進(jìn)行控制, 如果你的代碼邏輯中違背了這個(gè)規(guī)則, 并不一定會(huì)導(dǎo)致程序崩潰. 比如發(fā)送多個(gè)onComplete是可以正常運(yùn)行的, 依然是收到第一個(gè)onComplete就不再接收了, 但若是發(fā)送多個(gè)onError, 則收到第二個(gè)onError事件會(huì)導(dǎo)致程序會(huì)崩潰.當(dāng)我們寫多個(gè)onComplete時(shí),不會(huì)報(bào)錯(cuò)。

除了被觀察者能斷開(kāi)關(guān)聯(lián),觀察者也能主動(dòng)斷開(kāi)連接,調(diào)用onSubscribe函數(shù)中傳入的對(duì)象Disposable的dispose()函數(shù)即可完成斷開(kāi)連接,同樣關(guān)聯(lián)斷開(kāi)后,被觀察者依然會(huì)繼續(xù)發(fā)送數(shù)據(jù)

**講到這里第一感覺(jué)是不是?**

Paste_Image.png

就輸出個(gè)數(shù)字就這么麻煩,完全沒(méi)看出哪里方便了!別著急我剛開(kāi)始看RxAndroid文章也是這樣的感覺(jué),而且很多網(wǎng)上的文章都沒(méi)有解釋這個(gè)問(wèn)題。所以看一會(huì)你就更暈了。別著急我一起深呼吸,來(lái)看看如何簡(jiǎn)化操作

你可能覺(jué)得,我就打印幾個(gè)數(shù),還要把Observable寫的那么麻煩,能不能簡(jiǎn)便一點(diǎn)呢?答案是肯定的,RxAndroid內(nèi)置了很多簡(jiǎn)化創(chuàng)建Observable對(duì)象的函數(shù),比如Observable.just就是用來(lái)創(chuàng)建只發(fā)出一個(gè)事件就結(jié)束的Observable對(duì)象,上面創(chuàng)建Observable對(duì)象的代碼可以簡(jiǎn)化為一行

        Observable<String> observable = Observable.just("hello");

同樣對(duì)于Observer,這個(gè)例子中,我們其實(shí)并不關(guān)心OnComplete和OnError,我們只需要在onNext的時(shí)候做一些處理,這時(shí)候就可以使用Consumer類。

        Observable<String> observable = Observable.just("hello");
        Consumer<String> consumer = new Consumer<String>() {
           @Override
           public void accept(String s) throws Exception {
               System.out.println(s);
           }
        };
        observable.subscribe(consumer);

其實(shí)在RxAndroid中,我們可以為 Observer中的三種狀態(tài)根據(jù)自身需要分別創(chuàng)建一個(gè)回調(diào)動(dòng)作,通過(guò)Action 來(lái)替代onComplete():,通過(guò)Consumer來(lái)替代 onError(Throwable t)和onNext(T t)

Observable<String> observable = Observable.just("hello");
    Action onCompleteAction = new Action() {
        @Override
        public void run() throws Exception {
            Log.i("kaelpu", "complete");
        }
    };
    Consumer<String> onNextConsumer = new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.i("kaelpu", s);
        }
    };
    Consumer<Throwable> onErrorConsumer = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.i("kaelpu", "error");
        }
    };
    observable.subscribe(onNextConsumer, onErrorConsumer, onCompleteAction);

}

subscribe()有多個(gè)重載的方法:

 public final Disposable subscribe() {}
 public final Disposable subscribe(Consumer<? super T> onNext) {}
 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
 public final void subscribe(Observer<? super T> observer) {}

不帶任何參數(shù)的subscribe() 表示Observer不關(guān)心任何事件,Observable發(fā)送什么數(shù)據(jù)都隨你
帶有一個(gè)Consumer參數(shù)的方法表示Observer只關(guān)心onNext事件, 其他的事件我假裝沒(méi)看見(jiàn), 因此我們?nèi)绻恍枰猳nNext事件可以這么寫

只要我們?cè)俦竟?jié)中能明白觀察者和被觀察者之間是如何工作關(guān)聯(lián)的就可以


線程調(diào)度

關(guān)鍵的章節(jié)來(lái)了,看完上面的基礎(chǔ)知識(shí),很多人都會(huì)感覺(jué)就一個(gè)發(fā)送,一個(gè)接收,不就是個(gè)觀察者模式嘛,感覺(jué)一點(diǎn)卵用都沒(méi)有,還寫這么多回調(diào)方法!完全沒(méi)有看出什么優(yōu)點(diǎn)。那么這一節(jié)就讓你看到RxAndroid真正厲害的地方。

正常情況下, Observer和Observable是工作在同一個(gè)線程中的, 也就是說(shuō)Observable在哪個(gè)線程發(fā)事件, Observer就在哪個(gè)線程接收事件.
RxAndroid中, 當(dāng)我們?cè)谥骶€程中去創(chuàng)建一個(gè)Observable來(lái)發(fā)送事件, 則這個(gè)Observable默認(rèn)就在主線程發(fā)送事件.
當(dāng)我們?cè)谥骶€程去創(chuàng)建一個(gè)Observer來(lái)接收事件, 則這個(gè)Observer默認(rèn)就在主線程中接收事件,但其實(shí)在現(xiàn)實(shí)工作中我們更多的是需要進(jìn)行線程切換的,最常見(jiàn)的例子就是在子線程中請(qǐng)求網(wǎng)絡(luò)數(shù)據(jù),在主線程中進(jìn)行展示

要達(dá)到這個(gè)目的, 我們需要先改變Observable發(fā)送事件的線程, 讓它去子線程中發(fā)送事件, 然后再改變Observer的線程, 讓它去主線程接收事件. 通過(guò)RxAndroid內(nèi)置的線程調(diào)度器可以很輕松的做到這一點(diǎn). 接下來(lái)看一段代碼:

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d("kaelpu", "Observable thread is : " + Thread.currentThread().getName());
            Log.d("kaelpu", "emitter 1");
            emitter.onNext(1);
        }
    });

    Consumer<Integer> consumer = new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d("kaelpu", "Observer thread is :" + Thread.currentThread().getName());
            Log.d("kaelpu", "onNext: " + integer);
        }
    };

    observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(consumer);
}

Observable thread is : RxNewThreadScheduler-1
emitter 1
Observer thread is :main
onNext: 1

可以看到, observable發(fā)送事件的線程的確改變了, 是在一個(gè)叫 RxNewThreadScheduler-1的線程中發(fā)送的事件, 而consumer 仍然在主線程中接收事件, 這說(shuō)明我們的目的達(dá)成了, 接下來(lái)看看是如何做到的.

這段代碼只不過(guò)是增加了兩行代碼:

.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())

簡(jiǎn)單的來(lái)說(shuō), subscribeOn() 指定的是Observable發(fā)送事件的線程, observeOn() 指定的是Observer接收事件的線程.
多次指定Observable的線程只有第一次指定的有效, 也就是說(shuō)多次調(diào)用subscribeOn() 只有第一次的有效, 其余的會(huì)被忽略.
多次指定Observer的線程是可以的, 也就是說(shuō)每調(diào)用一次observeOn() , Observer的線程就會(huì)切換一次.例如:

observable.subscribeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.io())
        .subscribe(consumer);

Observable thread is : RxNewThreadScheduler-1
emitter 1
Observer thread is :RxCachedThreadScheduler-2
onNext: 1

可以看到, Observable雖然指定了兩次線程, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler線程中, 而Observer則跑到了RxCachedThreadScheduler 中, 這個(gè)CacheThread其實(shí)就是IO線程池中的一個(gè).
在 RxAndroid 中,提供了一個(gè)名為 Scheduler 的線程調(diào)度器,RxAndroid 內(nèi)部提供了4個(gè)調(diào)度器,分別是:

  • Schedulers.io(): I/O 操作(讀寫文件、數(shù)據(jù)庫(kù)、網(wǎng)絡(luò)請(qǐng)求等),與newThread()差不多,區(qū)別在于io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無(wú)數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要進(jìn)行大量的計(jì)算,以免產(chǎn)生不必要的線程;
  • Schedulers.newThread(): 開(kāi)啟新線程操作;
  • Schedulers.immediate(): 默認(rèn)指定的線程,也就是當(dāng)前線程;
  • Schedulers.computation():計(jì)算所使用的調(diào)度器。這個(gè)計(jì)算指的是 CPU 密集型計(jì)算,即不會(huì)被 I/O等操作限制性能的操作,例如圖形的計(jì)算。這個(gè) Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。值得注意的是,不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU;
  • AndroidSchedulers.mainThread(): Rxndroid 擴(kuò)展的 Android 主線程;

這些內(nèi)置的Scheduler已經(jīng)足夠滿足我們開(kāi)發(fā)的需求, 因此我們應(yīng)該使用內(nèi)置的這些選項(xiàng), 在RxAndroid內(nèi)部使用的是線程池來(lái)維護(hù)這些線程, 所有效率也比較高。

對(duì)于線程還需要注意

  • create() , just() , from() 等 --- 事件產(chǎn)生
  • map() , flapMap() , scan() , filter() 等 -- 事件加工
  • subscribe() -- 事件消費(fèi)

事件產(chǎn)生:默認(rèn)運(yùn)行在當(dāng)前線程,可以由 subscribeOn() 自定義線程
事件加工:默認(rèn)跟事件產(chǎn)生的線程保持一致, 可由 observeOn() 自定義線程
事件消費(fèi):默認(rèn)運(yùn)行在當(dāng)前線程,可以有observeOn() 自定義

好了說(shuō)了這么多了,我們來(lái)寫個(gè)簡(jiǎn)單的異步的例子,看看實(shí)際效果。我們這個(gè)例子就以加載網(wǎng)絡(luò)圖片并顯示為例:
首先我們寫一個(gè)耗時(shí)函數(shù),用來(lái)模擬圖片請(qǐng)求

    // 模擬網(wǎng)絡(luò)請(qǐng)求圖片
    private Drawable getDrawableFromUrl(String url){
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return getResources().getDrawable(R.drawable.baidu);
    }

代碼很簡(jiǎn)答,就是線程sleep 6秒,然后返回一張圖片,如果運(yùn)行在主線程那就會(huì)NAR,然后我么來(lái)用RxAndroid寫一下這個(gè)異步拉去圖片并顯是的操作!

Observable.just(getDrawableFromNet("http://www.baidu.com/icon.png"))
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Drawable>() {
            @Override
            public void accept(Drawable drawable) throws Exception {
                ((ImageView)findViewById(R.id.imageView)).setImageDrawable(drawable);
            }
        });

就這幾行代碼就搞定了!自己結(jié)合上面講的理解一下~這里就不做解釋了!因?yàn)槲覀冞€有更重要的一個(gè)環(huán)節(jié),這個(gè)環(huán)節(jié)堪稱RxAndroid的精髓!

操作符的使用

在了解基本知識(shí)和線程調(diào)度后,我們來(lái)學(xué)習(xí)一下RxAndroid各種神奇的操作符

Map
Map是RxAndroid中最簡(jiǎn)單的一個(gè)變換操作符了, 它的作用就是對(duì)Observable發(fā)送的每一個(gè)事件應(yīng)用一個(gè)函數(shù), 使得每一個(gè)事件都按照指定的函數(shù)去變化。

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "This is result " + integer;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("kaelpu", s);
        }
    });

This is result 1
This is result 2
This is result 3

通過(guò)Map, 可以將Observable發(fā)來(lái)的事件轉(zhuǎn)換為任意的類型, 可以是一個(gè)Object, 也可以是一個(gè)集合,功能非常強(qiáng)大

例子:還是以圖片加載的例子,我們傳進(jìn)來(lái)一個(gè)圖片的路徑,然后通過(guò)Map把drawble轉(zhuǎn)換成bitmap再發(fā)送給觀察者

Observable.just(getDrawableFromNet())
        .map(new Function<Drawable, Bitmap>() {
            @Override
            public Bitmap apply(@NonNull Drawable drawable) throws Exception {
                BitmapDrawable bt = (BitmapDrawable)drawable;
                return bt.getBitmap();
            }
        })
        .subscribeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<Bitmap>() {
            @Override
            public void accept(Bitmap bitmap) throws Exception {

            }
        });

Observable –> map變換 –> Observable
url -> drawable -> bitmap

不用到處調(diào)代碼,直接一個(gè)鏈?zhǔn)讲僮?.. 是不是感覺(jué)很爽!

ZIP
Zip通過(guò)一個(gè)函數(shù)將多個(gè)Observable發(fā)送的事件結(jié)合到一起,然后發(fā)送這些組合到一起的事件. 它按照嚴(yán)格的順序應(yīng)用這個(gè)函數(shù)。它只發(fā)射與發(fā)射數(shù)據(jù)項(xiàng)最少的那個(gè)Observable一樣多的數(shù)據(jù)。

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "emitter 1");
            emitter.onNext(1);
            Log.d(TAG, "emitter 2");
            emitter.onNext(2);
            Log.d(TAG, "emitter 3");
            emitter.onNext(3);
            Log.d(TAG, "emitter 4");
            emitter.onNext(4);
            Log.d(TAG, "emit complete1");
            emitter.onComplete();
        }
    });

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d(TAG, "emitter A");
            emitter.onNext("A");
            Log.d(TAG, "emitter B");
            emitter.onNext("B");
            Log.d(TAG, "emitter C");
            emitter.onNext("C");
            Log.d(TAG, "emitter complete2");
            emitter.onComplete();
        }
    });

    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
        @Override
        public String apply(Integer integer, String s) throws Exception {
            return integer + s;
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe");
        }

        @Override
        public void onNext(String value) {
            Log.d(TAG, "onNext: " + value);
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "onError");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    });

我們分別創(chuàng)建了observable, 一個(gè)發(fā)送1,2,3,4,Complete, 另一個(gè)發(fā)送A,B,C,Complete, 接著用Zip把發(fā)出的事件組合, 來(lái)看看運(yùn)行結(jié)果吧:

onSubscribe
emitter 1
emitter 2
emitter 3
emitter 4
emit complete1
emitter A
onNext: 1A
emitter B
onNext: 2B
emitter C
onNext: 3C
emitter complete2
onComplete

觀察發(fā)現(xiàn)observable1發(fā)送事件后,observable2才發(fā)送
這是因?yàn)槲覀儍蓚€(gè)observable都是運(yùn)行在同一個(gè)線程里, 同一個(gè)線程里執(zhí)行代碼肯定有先后順序呀.

from
在Rxndroid的from操作符到2.0已經(jīng)被拆分成了3個(gè),fromArray, fromIterable, fromFuture接收一個(gè)集合作為輸入,然后每次輸出一個(gè)元素給subscriber。

Observable.fromArray(new Integer[]{1, 2, 3, 4, 5}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.i(TAG, "number:" + integer);
    }
});

number:1
number:2
number:3
number:4
number:5

注意:如果from()里面執(zhí)行了耗時(shí)操作,即使使用了subscribeOn(Schedulers.io()),仍然是在主線程執(zhí)行,可能會(huì)造成界面卡頓甚至崩潰,所以耗時(shí)操作還是使用Observable.create(…);

filter
條件過(guò)濾,去除不符合某些條件的事件。舉個(gè)栗子:

Observable.fromArray(new Integer[]{1, 2, 3, 4, 5})
       .filter(new Predicate<Integer>() {
           @Override
           public boolean test(Integer integer) throws Exception {
               // 偶數(shù)返回true,則表示剔除奇數(shù),留下偶數(shù)
               return integer % 2 == 0;

           }
       }).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.i(TAG, "number:" + integer);
    }
});

number:2
number:4

take
最多保留的事件數(shù)。

 Observable.just("1", "2", "6", "3", "4", "5").take(2).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {
                Log.d(TAG,value);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

1
2

可以發(fā)現(xiàn)我們發(fā)送了6個(gè)String,最后只打印了前兩個(gè),這就是take過(guò)濾掉的結(jié)果

doOnNext
如果你想在處理下一個(gè)事件之前做某些事,就可以調(diào)用該方法

Observable.fromArray(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}).filter(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
        // 偶數(shù)返回true,則表示剔除奇數(shù)
        return integer % 2 == 0;
    }
})// 最多保留三個(gè),也就是最后剩三個(gè)偶數(shù)
        .take(3).doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        // 在輸出偶數(shù)之前輸出它的hashCode
        Log.i(TAG, "hahcode = " + integer.hashCode() + "");
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer value) {
        Log.i(TAG, "number = " + value);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

hahcode = 2
number = 2
hahcode = 4
number = 4
hahcode = 6
number = 6


針對(duì)Android的一些擴(kuò)展

RxAndroid是RxJava的一個(gè)針對(duì)Android平臺(tái)的擴(kuò)展。它包含了一些能夠簡(jiǎn)化Android開(kāi)發(fā)的工具。
首先,AndroidSchedulers提供了針對(duì)Android的線程系統(tǒng)的調(diào)度器。需要在UI線程中運(yùn)行某些代碼?很簡(jiǎn)單,只需要使用AndroidSchedulers.mainThread():

retrofitService.getImage(url)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

接著要介紹的就是AndroidObservable,它提供了跟多的功能來(lái)配合Android的生命周期。bindActivity()和bindFragment()方法默認(rèn)使用AndroidSchedulers.mainThread()來(lái)執(zhí)行觀察者代碼,這兩個(gè)方法會(huì)在Activity或者Fragment結(jié)束的時(shí)候通知被觀察者停止發(fā)出新的消息。

AndroidObservable.bindActivity(this, retrofitService.getImage(url))
    .subscribeOn(Schedulers.io())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap);

我自己也很喜歡AndroidObservable.fromBroadcast()方法,它允許你創(chuàng)建一個(gè)類似BroadcastReceiver的Observable對(duì)象。下面的例子展示了如何在網(wǎng)絡(luò)變化的時(shí)候被通知到:

IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
AndroidObservable.fromBroadcast(context, filter)
    .subscribe(intent -> handleConnectivityChange(intent));

最后要介紹的是ViewObservable,使用它可以給View添加了一些綁定。如果你想在每次點(diǎn)擊view的時(shí)候都收到一個(gè)事件,可以使用ViewObservable.clicks(),或者你想監(jiān)聽(tīng)TextView的內(nèi)容變化,可以使用ViewObservable.text()

ViewObservable.clicks(mCardNameEditText, false)
    .subscribe(view -> handleClick(view));

RxAndroid的一些使用場(chǎng)景

這里總結(jié)了一些很合適使用RxAndroid的場(chǎng)景,供大家打開(kāi)腦洞~分享時(shí)候有時(shí)間給大家看看demo

  1. 界面需要等到多個(gè)接口并發(fā)取完數(shù)據(jù),再更新
Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("haha");
        }
    }).subscribeOn(Schedulers.newThread());

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("hehe");
        }
    }).subscribeOn(Schedulers.newThread());


    Observable.merge(observable1, observable2)
            .subscribeOn(Schedulers.newThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String value) {
                    Log.d(TAG,value);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
  1. 界面按鈕需要防止連續(xù)點(diǎn)擊的情況
RxView.clicks(button)
        .throttleFirst(1, TimeUnit.SECONDS)
        .subscribe(new Observer<Object>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {
                Log.i(TAG, "do clicked!");
            }
        });
  1. 響應(yīng)式的界面 比如勾選了某個(gè)checkbox,自動(dòng)更新對(duì)應(yīng)的preference

SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(context);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);

Preference<String> username = rxPreferences.getString("username");
Preference<Boolean> showWhatsNew = rxPreferences.getBoolean("show-whats-new", true);

username.asObservable().subscribe(new Action1<String>() {
  @Override public void call(String username) {
    Log.d(TAG, "Username: " + username);  讀取到當(dāng)前值
  }
}

RxCompoundButton.checks(showWhatsNewView)
    .subscribe(showWhatsNew.asAction());

最后的話

通過(guò)本篇文章,大家應(yīng)該對(duì)RxAndroid有個(gè)大體的認(rèn)識(shí)了,也應(yīng)該體會(huì)到它在異步操作,代碼鏈?zhǔn)綍?shū)寫等方面的優(yōu)勢(shì)了。需要注意的是由于RxJava存在理解的門檻,貿(mào)然引入項(xiàng)目要確保協(xié)同開(kāi)發(fā)的人員也都對(duì)Rxjava有所了解~


[參考資料]

  1. 響應(yīng)式編程簡(jiǎn)介
  2. 深入淺出RxJava
  3. RxPreferences 簡(jiǎn)單整理
  4. RxBinding安卓UI響應(yīng)式編程
  5. 給 Android 開(kāi)發(fā)者的 RxJava 詳解
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容