響應(yīng)式編程在Android中的應(yīng)用

響應(yīng)式編程簡(jiǎn)介

  • 響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式。數(shù)據(jù)流就像一條河:它可以被觀測(cè),被過濾,被操作,或者為新的消費(fèi)者與另外一條流合并為一條新的流。
  • 響應(yīng)式編程的一個(gè)關(guān)鍵概念是事件。事件可以被等待,可以觸發(fā)過程,也可以觸發(fā)其它事件。事件是唯一的以合適的方式將我們的現(xiàn)實(shí)世界映射到我們的軟件中:如果屋里太熱了我們就打開一扇窗戶。同樣的,當(dāng)我們更改電子表(變化的傳播)中的一些數(shù)值時(shí),我們需要更新整個(gè)表格或者我們的機(jī)器人碰到墻時(shí)會(huì)轉(zhuǎn)彎(響應(yīng)事件)。
  • 今天,響應(yīng)式編程最通用的一個(gè)場(chǎng)景是UI:我們的移動(dòng)App必須做出對(duì)網(wǎng)絡(luò)調(diào)用、用戶觸摸輸入和系統(tǒng)彈框的響應(yīng)。在這個(gè)世界上,軟件之所以是事件驅(qū)動(dòng)并響應(yīng)的是因?yàn)楝F(xiàn)實(shí)生活也是如此。

響應(yīng)式編程的具體實(shí)現(xiàn) - RxJava

基本概念

RxJava的四種角色

  • Observable
  • Observer
  • Subscriber
  • Subject

Observable和Subject是兩個(gè)“生產(chǎn)”實(shí)體,Observer和Subscriber是兩個(gè)“消費(fèi)”實(shí)體。

熱Observable和冷Observable

從發(fā)射物的角度來看,有兩種不同的Observable:熱的和冷的。一個(gè)"熱"的Observable典型的只要一創(chuàng)建完就開始發(fā)射數(shù)據(jù),因此所有后續(xù)訂閱它的觀察者可能從序列中間的某個(gè)位置開始接受數(shù)據(jù)(有一些數(shù)據(jù)錯(cuò)過了)。一個(gè)"冷"的Observable會(huì)一直等待,直到有觀察者訂閱它才開始發(fā)射數(shù)據(jù),因此這個(gè)觀察者可以確保會(huì)收到整個(gè)數(shù)據(jù)序列。

Observable創(chuàng)建符

  • Observable.create()
Observable.create(new Observable.OnSubscribe<Object>(){
    @Override
    public void call(Subscriber<? super Object> subscriber{
    }
});
  • Observable.from()
    from() 創(chuàng)建符可以從一個(gè)列表/數(shù)組來創(chuàng)建Observable,并一個(gè)接一個(gè)的從列表/數(shù)組中發(fā)射出來每一個(gè)對(duì)象,或者也可以從Java Future 類來創(chuàng)建Observable,并發(fā)射Future對(duì)象的 .get() 方法返回的結(jié)果值。傳入 Future 作為參數(shù)時(shí),我們可以指定一個(gè)超時(shí)的值。Observable將等待來自 Future 的結(jié)果;如果在超時(shí)之前仍然沒有結(jié)果返回,Observable將會(huì)觸發(fā) onError() 方法通知觀察者有錯(cuò)誤發(fā)生了。

    List<Integer> items = new ArrayList<Integer>();
    items.add(1);
    items.add(10);
    items.add(100);
    items.add(200);
    
    Observable<Integer> observableString = Observable.from(items);
    Subscription subscriptionPrint = observableString.subscribe(new        Observer<Integer>() {
      @Override
      public void onCompleted() {
      System.out.println("Observable completed");
      }
      @Override
      public void onError(Throwable e) {
      System.out.println("Oh,no! Something wrong happened!");
      }
      @Override
      public void onNext(Integer item) {
      System.out.println("Item is " + item);
      }
    });
    
  • Observable.just()
    just() 方法可以傳入一到九個(gè)參數(shù),它們會(huì)按照傳入的參數(shù)的順序來發(fā)射它們。 just() 方法也可以接受列表或數(shù)組,就像 from() 方法,但是它不會(huì)迭代列表發(fā)射每個(gè)值,它將會(huì)發(fā)射整個(gè)列表。通常,當(dāng)我們想發(fā)射一組已經(jīng)定義好的值時(shí)會(huì)用到它。但是如果我們的函數(shù)不是時(shí)變性的,我們可以用just來創(chuàng)建一個(gè)更有組織性和可測(cè)性的代碼庫。

Observable<String> observableString = Observable.just(helloWorld
());
Subscription subscriptionPrint = observableString.subscribe(new
Observer<String>() {
    @Override
    public void onCompleted() {
    System.out.println("Observable completed");
    }
    @Override
    public void onError(Throwable e) {
    System.out.println("Oh,no! Something wrong happened!");
    }
    @Override
    public void onNext(String message) {
    System.out.println(message);
    }
});

helloWorld() 方法比較簡(jiǎn)單,像這樣:

private String helloWorld(){
    return "Hello World";
}

Subject

Subject 既可以是 Observable,也可以是 Observer。
RxJava 提供四種不同的 Subject :

  • PublishSubject
  • BehaviorSubject
    BehaviorSubject會(huì)首先向他的訂閱者發(fā)送截至訂閱前最新的一個(gè)數(shù)據(jù)對(duì)象(或初始值),然后正常發(fā)送訂閱后的數(shù)據(jù)流。

BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);
```
在這個(gè)短例子中,我們創(chuàng)建了一個(gè)能發(fā)射整形(Integer)的BehaviorSubject。由于每當(dāng)Observes訂閱它時(shí)就會(huì)發(fā)射最新的數(shù)據(jù),所以它需要一個(gè)初始值。

  • ReplaySubject
    ReplaySubject 會(huì)緩存它所訂閱的所有數(shù)據(jù),向任意一個(gè)訂閱它的觀察者重發(fā):

ReplaySubject<Integer> replaySubject = ReplaySubject.create();
```

  • AsyncSubject

    當(dāng)Observable完成時(shí)AsyncSubject只會(huì)發(fā)布最后一個(gè)數(shù)據(jù)給已經(jīng)訂閱的每一個(gè)觀察者。

    AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
    

直接創(chuàng)建 Observable

在我們的第一個(gè)列子里,我們將檢索安裝的應(yīng)用列表并填充RecycleView的item來展示它們。我們也設(shè)想一個(gè)下拉刷新的功能和一個(gè)進(jìn)度條來告知用戶當(dāng)前任務(wù)正在執(zhí)行。

首先,我們創(chuàng)建Observable。我們需要一個(gè)函數(shù)來檢索安裝的應(yīng)用程序列表并把它提供給我們的觀察者。我們一個(gè)接一個(gè)的發(fā)射這些應(yīng)用程序數(shù)據(jù),將它們分組到一個(gè)單獨(dú)的列表中,以此來展示響應(yīng)式方法的靈活性。

private Observable<AppInfo> getApps(){
    return Observable.create(subscriber -> {
        List<AppInfoRich> apps = new ArrayList<AppInfoRich>();
        final Intent mainIntent = new Intent(Intent.ACTION_MAIN, null);
        mainIntent.addCategory(Intent.CATEGORY_LAUNCHER);
        List<ResolveInfo> infos = getActivity().queryIntentActivities(mainIntent, 0);
        for(ResolveInfo info : infos){
            apps.add(new AppInfoRich(getActivity(),info));
        }
        for (AppInfoRich appInfo:apps) {
            Bitmap icon = Utils.drawableToBitmap(appInfo.getIcon());
            String name = appInfo.getName();
            String iconPath = mFilesDir + "/" + name;
            Utils.storeBitmap(App.instance, icon,name);
            if (subscriber.isUnsubscribed()){
                return;
            }
            subscriber.onNext(new AppInfo(name, iconPath, appInfo.getLastUpdateTime()));
        }
        if (!subscriber.isUnsubscribed()){
            subscriber.onCompleted();
        }
    });
}

AppInfo為App信息的實(shí)體類,包括上次更新時(shí)間、圖標(biāo)、名字三個(gè)屬性,此處省略。

需要重點(diǎn)注意的是在發(fā)射新的數(shù)據(jù)或者完成序列之前要檢測(cè)觀察者的訂閱情況。這樣的話代碼會(huì)更高效,因?yàn)槿绻麤]有觀察者等待時(shí)我們就不生成沒有必要的數(shù)據(jù)項(xiàng)。

接下來,我們來定義下拉刷新的方法:

private void refreshTheList() {
    getApps().toSortedList()
    .subscribe(new Observer<List<AppInfo>>() {
    @Override
    public void onCompleted() {
        Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
    }
    @Override
    public void onError(Throwable e) {
        Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
        mSwipeRefreshLayout.setRefreshing(false);
    }
    @Override
    public void onNext(List<AppInfo> appInfos) {
        mRecyclerView.setVisibility(View.VISIBLE);
        mAdapter.addApplications(appInfos);
        mSwipeRefreshLayout.setRefreshing(false);
    }
    });
}

從列表創(chuàng)建 Observable

在這個(gè)例子中,我們將引入 from() 函數(shù)。使用這個(gè)特殊的“創(chuàng)建”函數(shù),我們可以從一個(gè)列表中創(chuàng)建一個(gè)Observable。Observable將發(fā)射出列表中的每一個(gè)元素,我們可以通過訂閱它們來對(duì)這些發(fā)出的元素做出響應(yīng)。

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable.from(apps).subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            mSwipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), "Here is the list!", Toast.LENGTH_LONG).show();
        }
        @Override
        public void onError(Throwable e) {
            Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
            mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onNext(AppInfo appInfo) {
            mAddedApps.add(appInfo);
            mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
        }
    });
}

和第一個(gè)例子一個(gè)主要的不同是我們?cè)?onCompleted() 函數(shù)中停掉進(jìn)度條是因?yàn)槲覀円粋€(gè)一個(gè)的發(fā)射元素;
第一個(gè)例子中的Observable發(fā)射的是整個(gè)list,因此在 onNext() 函數(shù)中停掉進(jìn)度條的做法是安全的。

具有特殊功能的創(chuàng)建符

  • just()

    你可以將一個(gè)函數(shù)作為參數(shù)傳給 just() 方法,你將會(huì)得到一個(gè)已存在代碼的原始Observable版本。在一個(gè)新的響應(yīng)式架構(gòu)的基礎(chǔ)上遷移已存在的代碼,這個(gè)方法可能是一個(gè)有用的開始點(diǎn)。

  • repeat()

    假如你想對(duì)一個(gè)Observable重復(fù)發(fā)射三次數(shù)據(jù) :

    Observable.just(appOne,appTwo,appThree)
        .repeat(3)
        .subscribe();
    

    我們?cè)?just() 創(chuàng)建Observable后追加了 repeat(3) ,它將會(huì)創(chuàng)建9個(gè)元素的序列,每一個(gè)都單獨(dú)發(fā)射。

  • defer()

    有這樣一個(gè)場(chǎng)景,你想在這聲明一個(gè)Observable但是你又想推遲這個(gè)Observable的創(chuàng)建直到觀察者訂閱時(shí)??聪旅娴?getInt() 函數(shù):

    private Observable<Integer> getInt(){
        return Observable.create(subscriber -> {
            if(subscriber.isUnsubscribed()){
                return;
            }
            App.L.debug("GETINT");
            subscriber.onNext(42);
            subscriber.onCompleted();
        });
    }
    

    這比較簡(jiǎn)單,并且它沒有做太多事情,但是它正好為我們服務(wù)?,F(xiàn)在,我們可以創(chuàng)建一個(gè)新的Observable并且應(yīng)用 defer() :

    Observable<Integer> deferred = Observable.defer(this::getInt);
    

    這次, deferred 存在,但是 getInt() create() 方法還沒有調(diào)用 : logcat日志也沒有“GETINT”打印出來 :

    deferred.subscribe(number -> {
        App.L.debug(String.valueOf(number));
    });
    

    但是一旦我們訂閱了, create() 方法就會(huì)被調(diào)用并且我們也可以在logcat日志中打印出兩個(gè)值:GETINT 和 42。

  • range()

    從一個(gè)指定的數(shù)字X開始發(fā)射N個(gè)數(shù)字。range() 函數(shù)用兩個(gè)數(shù)字作為參數(shù):第一個(gè)是起始點(diǎn),第二個(gè)是我們想發(fā)射數(shù)字的個(gè)數(shù)。

  • interval()

    interval() 函數(shù)在你需要?jiǎng)?chuàng)建一個(gè)輪詢程序時(shí)非常好用。interval() 函數(shù)的兩個(gè)參數(shù):一個(gè)指定兩次發(fā)射的時(shí)間間隔,另一個(gè)是用到的時(shí)間單位。

  • timer()

    如果你需要一個(gè)一段時(shí)間之后才發(fā)射的Observable,你可以使用 timer()。

過濾Observables

過濾序列

RxJava讓我們使用 filter() 方法來過濾我們觀測(cè)序列中不想要的值。

我們從發(fā)出的每個(gè)元素中過濾掉開頭字母不是C的 :

.filter(new Func1<AppInfo,Boolean>(){
    @Override
    public Boolean call(AppInfo appInfo){
        return appInfo.getName().startsWith("C");
    }
})

我們傳一個(gè)新的 Func1 對(duì)象給 filter() 函數(shù),即只有一個(gè)參數(shù)的函數(shù)。 Func1 有一個(gè) AppInfo 對(duì)象來作為它的參數(shù)類型并且返回 Boolean 對(duì)象。只要條件符合 filter() 函數(shù)就會(huì)返回 true 。此時(shí),值會(huì)發(fā)射出去并且所有的觀察者都會(huì)接收到。

filter() 函數(shù)最常用的用法之一時(shí)過濾 null 對(duì)象:

.filter(new Func1<AppInfo,Boolean>(){
    @Override
    public Boolean call(AppInfo appInfo){
        return appInfo != null;
    }
})

它幫我們免去了在 onNext() 函數(shù)調(diào)用中再去檢測(cè) null 值,讓我們把注意力集中在應(yīng)用業(yè)務(wù)邏輯上。

獲取我們需要的數(shù)據(jù)

當(dāng)我們不需要整個(gè)序列時(shí),而是只想取開頭或結(jié)尾的幾個(gè)元素,我們可以用 take() 或 takeLast() 。

  • take()

    take() 函數(shù)用整數(shù)N來作為一個(gè)參數(shù),從原始的序列中發(fā)射前N個(gè)元素,然后完成:

    Observable.from(apps)
        .take(3)
        .subscribe(...);
    
  • takeLast()

    如果我們想要最后N個(gè)元素,我們只需使用 takeLast() 函數(shù):

    Observable.from(apps)
        .takeLast(3)
        .subscribe(...);
    

有且僅有一次

  • distinct()

    就像 takeLast() 一樣, distinct() 作用于一個(gè)完整的序列,然后得到重復(fù)的過濾項(xiàng),它需要記錄每一個(gè)發(fā)射的值。如果你在處理一大堆序列或者大的數(shù)據(jù)記得關(guān)注內(nèi)存使用情況。

    Observable<AppInfo> fullOfDuplicates = Observable.from(apps)
        .take(3)
        .repeat(3);
    fullOfDuplicates.distinct()
        .subscribe(...);
    
  • ditinctUntilChanged()

    如果在一個(gè)可觀測(cè)序列發(fā)射一個(gè)不同于之前的一個(gè)新值時(shí)讓我們得到通知這時(shí)候該怎么做?ditinctUntilChanged() 過濾函數(shù)能做到這一點(diǎn)。它能輕易的忽略掉所有的重復(fù)并且只發(fā)射出新的值。

First and last

first() 方法和 last() 方法很容易弄明白。它們從Observable中只發(fā)射第一個(gè)元素或者最后一個(gè)元素。這兩個(gè)都可以傳 Func1 作為參數(shù)。
與 first() 和 last() 相似的變量有: firstOrDefault() 和 lastOrDefault() 。這兩個(gè)函數(shù)當(dāng)可觀測(cè)序列完成時(shí)不再發(fā)射任何值時(shí)用得上。在這種場(chǎng)景下,如果Observable不再發(fā)射任何值時(shí)我們可以指定發(fā)射一個(gè)默認(rèn)的值。

Skip and SkipLast

skip() 和 skipLast() 函數(shù)與 take() 和 takeLast() 相對(duì)應(yīng)。它們用整數(shù)N作參數(shù),從本質(zhì)上來說,它們不讓Observable發(fā)射前N個(gè)或者后N個(gè)值。

ElementAt

如果我們只想要可觀測(cè)序列發(fā)射的第五個(gè)元素該怎么辦? elementAt() 函數(shù)僅從一個(gè)序列中發(fā)射第n個(gè)元素然后就完成了。
如果我們想查找第五個(gè)元素但是可觀測(cè)序列只有三個(gè)元素可供發(fā)射時(shí)該怎么辦?我們可以使用 elementAtOrDefault() 。

Sampling

在Observable后面加一個(gè) sample() ,我們將創(chuàng)建一個(gè)新的可觀測(cè)序列,它將在一個(gè)指定的時(shí)間間隔里由Observable發(fā)射最近一次的數(shù)值:

Observable<Integer> sensor = [...]
sensor.sample(30,TimeUnit.SECONDS)
    .subscribe(...);

如果我們想讓它定時(shí)發(fā)射第一個(gè)元素而不是最近的一個(gè)元素,我們可以使用 throttleFirst() 。

Timeout

我們可以使用 timeout() 函數(shù)來監(jiān)聽源可觀測(cè)序列,就是在我們?cè)O(shè)定的時(shí)間間隔內(nèi)如果沒有得到一個(gè)值則發(fā)射一個(gè)錯(cuò)誤。我們可以認(rèn)為 timeout() 為一個(gè)Observable的限時(shí)的副本。如果在指定的時(shí)間間隔內(nèi)Observable不發(fā)射值的話,它監(jiān)聽的原始的Observable時(shí)就會(huì)觸發(fā) onError() 函數(shù)。

Subscription subscription = getCurrentTemperature()
    .timeout(2,TimeUnit.SECONDS)
    .subscribe(...);

Debounce

debounce() 函數(shù)過濾掉由Observable發(fā)射的速率過快的數(shù)據(jù);如果在一個(gè)指定的時(shí)間間隔過去了仍舊沒有發(fā)射一個(gè),那么它將發(fā)射最后的那個(gè)。

下圖展示了多久從Observable發(fā)射一次新的數(shù)據(jù), debounce() 函數(shù)開啟一個(gè)內(nèi)部定時(shí)器,如果在這個(gè)時(shí)間間隔內(nèi)沒有新的據(jù)發(fā)射,則新的Observable發(fā)射出最后一個(gè)數(shù)據(jù):

debounce() 函數(shù)示意圖

變換Observables

*map家族

RxJava提供了幾個(gè)mapping函數(shù): map() , flatMap() , concatMap() , flatMapIterable() 以及 switchMap() .所有這些函數(shù)都作用于一個(gè)可觀測(cè)序列,然后變換它發(fā)射的值,最后用一種新的形式返回它們。

  • Map

    RxJava的 map 函數(shù)接收一個(gè)指定的 Func 對(duì)象然后將它應(yīng)用到每一個(gè)由Observable發(fā)射的值上。

    Observable.from(apps)
        .map(new Func1<AppInfo,AppInfo>(){
            @Override
            public Appinfo call(AppInfo appInfo){
                String currentName = appInfo.getName();
                String lowerCaseName = currentName.toLowerCase();
                appInfo.setName(lowerCaseName);
                return appInfo;
            }
        })
        .subscribe(...);
    

    正如你看到的,像往常一樣創(chuàng)建我們發(fā)射的Observable之后,我們追加一個(gè) map 調(diào)用,我們創(chuàng)建一個(gè)簡(jiǎn)單的函數(shù)來更新 AppInfo對(duì)象并提供一個(gè)名字小寫的新版本給觀察者。

  • FlatMap

    在復(fù)雜的場(chǎng)景中,我們有一個(gè)這樣的Observable:它發(fā)射一個(gè)數(shù)據(jù)序列,這些數(shù)據(jù)本身也可以發(fā)射Observable。RxJava的 flatMap() 函數(shù)提供一種鋪平序列的方式,然后合并這些Observables發(fā)射的數(shù)據(jù),最后將合并后的結(jié)果作為最終的Observable。

    flatMap() 函數(shù)示意圖

    當(dāng)我們?cè)谔幚砜赡苡写罅康腛bservables時(shí),重要是記住任何一個(gè)Observables發(fā)生錯(cuò)誤的情況, flatMap() 將會(huì)觸發(fā)它自己的 onError() 函數(shù)并放棄整個(gè)鏈。重要的一點(diǎn)提示是關(guān)于合并部分:它允許交叉。正如上圖所示,這意味著 flatMap() 不能夠保證在最終生成的Observable中源Observables確切的發(fā)射順序。

  • ConcatMap

    RxJava的 concatMap() 函數(shù)解決了 flatMap() 的交叉問題,提供了一種能夠把發(fā)射的值連續(xù)在一起的鋪平函數(shù),而不是合并它們,如下圖所示:

    這里寫圖片描述
  • FlatMapIterable

    作為*map家族的一員, flatMapInterable() 和 flatMap() 很像。僅有的本質(zhì)不同是它將源數(shù)據(jù)兩兩結(jié)成對(duì)并生成Iterable,而不是原始數(shù)據(jù)項(xiàng)和生成的Observables。

  • SwitchMap

    switchMap() 和 flatMap() 很像,除了一點(diǎn):每當(dāng)源Observable發(fā)射一個(gè)新的數(shù)據(jù)項(xiàng)(Observable)時(shí),它將取消訂閱并停止監(jiān)視之前那個(gè)數(shù)據(jù)項(xiàng)產(chǎn)生的Observable,并開始監(jiān)視當(dāng)前發(fā)射的這一個(gè)。

  • Scan

    RxJava的 scan() 函數(shù)可以看做是一個(gè)累積函數(shù)。 scan() 函數(shù)對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都應(yīng)用一個(gè)函數(shù),計(jì)算出函數(shù)的結(jié)果值,并將該值填充回可觀測(cè)序列,等待和下一次發(fā)射的數(shù)據(jù)一起使用。

    作為一個(gè)通用的例子,給出一個(gè)累加器:

    Observable.just(1,2,3,4,5)
        .scan((sum,item) -> sum + item)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d("RXJAVA", "Sequence completed.");
            }
            @Override
            public void onError(Throwable e) {
                Log.e("RXJAVA", "Something went south!");
            }
            @Override
            public void onNext(Integer item) {
                Log.d("RXJAVA", "item is: " + item);
            }
        });
    

    我們得到的結(jié)果是:

    RXJAVA: item is: 1
    RXJAVA: item is: 3
    RXJAVA: item is: 6
    RXJAVA: item is: 10
    RXJAVA: item is: 15
    RXJAVA: Sequence completed.

GroupBy

RxJava提供了一個(gè)有用的函數(shù)從列表中按照指定的規(guī)則: groupBy() 來分組元素。下圖中的例子展示了 groupBy() 如何將發(fā)射的值根據(jù)他們的形狀來進(jìn)行分組。

這里寫圖片描述

這個(gè)函數(shù)將源Observable變換成一個(gè)發(fā)射Observables的新的Observable。它們中的每一個(gè)新的Observable都發(fā)射一組指定的數(shù)據(jù)。

為了創(chuàng)建一個(gè)分組了的已安裝應(yīng)用列表,我們?cè)?loadList() 函數(shù)中引入了一個(gè)新的元素:

Observable<GroupedObservable<String,AppInfo>> groupedItems = Observable.from(apps)
            .groupBy(new Func1<AppInfo,String>(){
                @Override
                public String call(AppInfo appInfo){
                    SimpleDateFormat formatter = new SimpleDateFormat("MM/yyyy");
                    return formatter.format(new Date(appInfo.getLastUpdateTime()));
                }
            });

現(xiàn)在我們創(chuàng)建了一個(gè)新的Observable, groupedItems ,它將會(huì)發(fā)射一個(gè)帶有 GroupedObservable 的序列。 GroupedObservable 是一個(gè)特殊的Observable,它源自一個(gè)分組的key。在這個(gè)例子中,key就是 String ,代表的意思是 Month/Year 格式化的最近更新日期。

Buffer

RxJava中的 buffer() 函數(shù)將源Observable變換一個(gè)新的Observable,這個(gè)新的Observable每次發(fā)射一組列表值而不是一個(gè)一個(gè)發(fā)射。

buffer() 函數(shù)有幾種變體。其中有一個(gè)是允許你指定一個(gè) skip 值:此后每 skip 項(xiàng)數(shù)據(jù),用count項(xiàng)數(shù)據(jù)填充緩沖區(qū)。另一個(gè)是buffer() 帶一個(gè) timespan 的參數(shù),會(huì)創(chuàng)建一個(gè)每隔timespan時(shí)間段就會(huì)發(fā)射一個(gè)列表的Observable。

Window

RxJava的 window() 函數(shù)和 buffer() 很像,但是它發(fā)射的是Observable而不是列表。

正如 buffer() 一樣, window() 也有一個(gè) skip 變體。

Cast

cast() 函數(shù)是 map() 操作符的特殊版本。它將源Observable中的每一項(xiàng)數(shù)據(jù)都轉(zhuǎn)換為新的類型,把它變成了不同的 Class 。

組合Observables

Merge

在”異步的世界“中經(jīng)常會(huì)創(chuàng)建這樣的場(chǎng)景,我們有多個(gè)來源但是又只想有一個(gè)結(jié)果:多輸入,單輸出。RxJava的 merge() 方法將幫助你把兩個(gè)甚至更多的Observables合并到他們發(fā)射的數(shù)據(jù)項(xiàng)里。下圖給出了把兩個(gè)序列合并在一個(gè)最終發(fā)射的Observable。

這里寫圖片描述

正如你看到的那樣,發(fā)射的數(shù)據(jù)被交叉合并到一個(gè)Observable里面。注意如果你同步的合并Observable,它們將連接在一起并且不會(huì)交叉。

Observable<AppInfo> mergedObserbable = Observable.merge(observableApps,observableReversedApps);
mergedObserbable.subscribe(...);

注意錯(cuò)誤時(shí)的toast消息,你可以認(rèn)為每個(gè)Observable拋出的錯(cuò)誤都將會(huì)打斷合并。如果你需要避免這種情況,RxJava提供了 mergeDelayError() ,它能從一個(gè)Observable中繼續(xù)發(fā)射數(shù)據(jù)即便是其中有一個(gè)拋出了錯(cuò)誤。當(dāng)所有的Observables都完成時(shí), mergeDelayError() 將會(huì)發(fā)射 onError()。

ZIP

在一種新的可能場(chǎng)景中處理多個(gè)數(shù)據(jù)來源時(shí)會(huì)帶來:多從個(gè)Observables接收數(shù)據(jù),處理它們,然后將它們合并成一個(gè)新的可觀測(cè)序列來使用。RxJava有一個(gè)特殊的方法可以完成: zip() 合并兩個(gè)或者多個(gè)Observables發(fā)射出的數(shù)據(jù)項(xiàng),根據(jù)指定的函數(shù)Func* 變換它們,并發(fā)射一個(gè)新值。下圖展示了 zip() 方法如何處理發(fā)射的“numbers”和“l(fā)etters”然后將它們合并一個(gè)新的數(shù)據(jù)項(xiàng):

這里寫圖片描述
Observable.zip(observableApp, tictoc, (AppInfo appInfo, Long time) -> updateTitle(appInfo, time))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(...);

zip() 函數(shù)有三個(gè)參數(shù):兩個(gè)Observables和一個(gè) Func2 。

Join

前面兩個(gè)方法, zip() 和 merge() 方法作用在發(fā)射數(shù)據(jù)的范疇內(nèi),在決定如何操作值之前有些場(chǎng)景我們需要考慮時(shí)間的。RxJava的 join() 函數(shù)基于時(shí)間窗口將兩個(gè)Observables發(fā)射的數(shù)據(jù)結(jié)合在一起。

這里寫圖片描述

為了正確的理解上一張圖,我們解釋下 join() 需要的參數(shù):

  • 第二個(gè)Observable和源Observable結(jié)合。
  • Func1 參數(shù):在指定的由時(shí)間窗口定義時(shí)間間隔內(nèi),源Observable發(fā)射的數(shù)據(jù)和從第二個(gè)Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable。
  • Func1 參數(shù):在指定的由時(shí)間窗口定義時(shí)間間隔內(nèi),第二個(gè)Observable發(fā)射的數(shù)據(jù)和從源Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable。
  • Func2 參數(shù):定義已發(fā)射的數(shù)據(jù)如何與新發(fā)射的數(shù)據(jù)項(xiàng)相結(jié)合。

combineLatest

RxJava的 combineLatest() 函數(shù)有點(diǎn)像 zip() 函數(shù)的特殊形式。正如我們已經(jīng)學(xué)習(xí)的, zip() 作用于最近未打包的兩個(gè)Observables。相反, combineLatest() 作用于最近發(fā)射的數(shù)據(jù)項(xiàng):如果 Observable1 發(fā)射了A并且 Observable2 發(fā)射了B和C, combineLatest() 將會(huì)分組處理AB和AC,如下圖所示:

這里寫圖片描述

And,Then和When

在將來還有一些 zip() 滿足不了的場(chǎng)景。如復(fù)雜的架構(gòu),或者是僅僅為了個(gè)人愛好,你可以使用And/Then/When解決方案。它們?cè)赗xJava的joins包下,使用Pattern和Plan作為中介,將發(fā)射的數(shù)據(jù)集合并到一起。

這里寫圖片描述

Switch

給出一個(gè)發(fā)射多個(gè)Observables序列的源Observable, switch() 訂閱到源Observable然后開始發(fā)射由第一個(gè)發(fā)射的Observable發(fā)射的一樣的數(shù)據(jù)。當(dāng)源Observable發(fā)射一個(gè)新的Observable時(shí), switch() 立即取消訂閱前一個(gè)發(fā)射數(shù)
據(jù)的Observable(因此打斷了從它那里發(fā)射的數(shù)據(jù)流)然后訂閱一個(gè)新的Observable,并開始發(fā)射它的數(shù)據(jù)。

StartWith

RxJava的 startWith() 是 concat() 的對(duì)應(yīng)部分。正如 concat() 向發(fā)射數(shù)據(jù)的Observable追加數(shù)據(jù)那樣,在Observable開始發(fā)射他們的數(shù)據(jù)之前,startWith() 通過傳遞一個(gè)參數(shù)來先發(fā)射一個(gè)數(shù)據(jù)序列。

Schedulers-解決Android主線程問題

Schedulers

調(diào)度器以一種最簡(jiǎn)單的方式將多線程用在你的Apps的中。它們時(shí)RxJava重要的一部分并能很好地與Observables協(xié)同工作。它們無需處理實(shí)現(xiàn)、同步、線程、平臺(tái)限制、平臺(tái)變化而可以提供一種靈活的方式來創(chuàng)建并發(fā)程序。

RxJava提供了5種調(diào)度器:

  • .io()
  • .computation()
  • .immediate()
  • .newThread()
  • .trampoline()
Schedulers.io()

這個(gè)調(diào)度器時(shí)用于I/O操作。它基于根據(jù)需要,增長或縮減來自適應(yīng)的線程池。我們將使用它來修復(fù)我們之前看到的 StrictMode 違規(guī)做法。由于它專用于I/O操作,所以并不是RxJava的默認(rèn)方法;正確的使用它是由開發(fā)者決定的。

重點(diǎn)需要注意的是線程池是無限制的,大量的I/O調(diào)度操作將創(chuàng)建許多個(gè)線程并占用內(nèi)存。一如既往的是,我們需要在性能和簡(jiǎn)捷兩者之間找到一個(gè)有效的平衡點(diǎn)。

Schedulers.computation()

這個(gè)是計(jì)算工作默認(rèn)的調(diào)度器,它與I/O操作無關(guān)。它也是許多RxJava方法的默認(rèn)調(diào)度器: buffer() , debounce() , delay() , interval() , sample() , skip()。

Schedulers.immediate()

這個(gè)調(diào)度器允許你立即在當(dāng)前線程執(zhí)行你指定的工作。它是 timeout() , timeInterval() ,以及 timestamp() 方法默認(rèn)的調(diào)度器。

Schedulers.newThread()

這個(gè)調(diào)度器正如它所看起來的那樣:它為指定任務(wù)啟動(dòng)一個(gè)新的線程。

Schedulers.trampoline()

當(dāng)我們想在當(dāng)前線程執(zhí)行一個(gè)任務(wù)時(shí),并不是立即,我們可以用 .trampoline() 將它入隊(duì)。這個(gè)調(diào)度器將會(huì)處理它的隊(duì)列并且按序運(yùn)行隊(duì)列中每一個(gè)任務(wù)。它是 repeat() 和 retry() 方法默認(rèn)的調(diào)度器。

非阻塞I/O操作

使用 Schedulers.io() 創(chuàng)建非阻塞的版本:

public static void storeBitmap(Context context, Bitmap bitmap, String filename) {
    Schedulers.io().createWorker().schedule(() -> {
        blockingStoreBitmap(context, bitmap, filename);
    });
}

SubscribeOn and ObserveOn

我們學(xué)到了如何在一個(gè)調(diào)度器上運(yùn)行一個(gè)任務(wù)。但是我們?nèi)绾卫盟鼇砗蚈bservables一起工作呢?RxJava提供了 subscribeOn() 方法來用于每個(gè)Observable對(duì)象。 subscribeOn() 方法用 Scheduler 來作為參數(shù)并在這個(gè)Scheduler上執(zhí)行Observable調(diào)用。

首先,我們需要一個(gè)新的 getApps() 方法來檢索已安裝的應(yīng)用列表:

private Observable<AppInfo> getApps() {
    return Observable.create(subscriber -> {
        List<AppInfo> apps = new ArrayList<>();
        SharedPreferences sharedPref = getActivity().getPreferences(Context.MODE_PRIVATE);
        Type appInfoType = new TypeToken<List<AppInfo>>(){}.getType();
        String serializedApps = sharedPref.getString("APPS", "");
        if (!"".equals(serializedApps)) {
            apps = new Gson().fromJson(serializedApps,appInfoType);
        }
        for (AppInfo app : apps) {
            subscriber.onNext(app);
        }
        subscriber.onCompleted();
    });
}

然后,我們所需要做的是指定 getApps() 需要在調(diào)度器上執(zhí)行:

getApps().subscribeOn(Schedulers.io())
    .subscribe(new Observer<AppInfo>() { [...]

最后,我們只需在 loadList() 函數(shù)添加幾行代碼,那么每一項(xiàng)就都準(zhǔn)備好了:

getApps()
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() { [...]

observeOn() 方法將會(huì)在指定的調(diào)度器上返回結(jié)果:如例子中的UI線程。 onBackpressureBuffer() 方法將告訴Observable發(fā)射的數(shù)據(jù)如果比觀察者消費(fèi)的數(shù)據(jù)要更快的話,它必須把它們存儲(chǔ)在緩存中并提供一個(gè)合適的時(shí)間給它們。

處理耗時(shí)的任務(wù)

一個(gè)與I/O無關(guān)的耗時(shí)的任務(wù):

getObservableApps(apps)
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.computation())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<AppInfo>() { [...]

總結(jié)

RxJava提供了一種以面向時(shí)序的方式考慮數(shù)據(jù)的機(jī)會(huì):所有事情都是持續(xù)變化的,數(shù)據(jù)在更新,事件在觸發(fā),然后你就可以創(chuàng)建事件響應(yīng)式的、靈活的、運(yùn)行流暢的App。

謹(jǐn)記可觀測(cè)序列就像一條河:它們是流動(dòng)的。你可以“過濾”(filter)一條河,你可以“轉(zhuǎn)換”(transform)一條河,你可以將兩條河合并(combine)成一個(gè),然后依然暢流如初。最后,它就成了你想要的那條河。

“Be Water,my friend” - Bruce Lee

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應(yīng)式編程作為結(jié)合使用的,對(duì)什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,973評(píng)論 0 10
  • 版權(quán)聲明:本文為小斑馬偉原創(chuàng)文章,轉(zhuǎn)載請(qǐng)注明出處! 上篇簡(jiǎn)單的闡述了響應(yīng)式編程的基本理論。這篇主要對(duì)響應(yīng)編程進(jìn)行詳...
    ZebraWei閱讀 3,219評(píng)論 0 2
  • Android拾萃 - RxJava最簡(jiǎn)單的入門(一)Android拾萃 - RxJava操作符列表和響應(yīng)類型(二...
    三也視界閱讀 1,288評(píng)論 1 6
  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)ObservableEm...
    rkua閱讀 1,952評(píng)論 0 1
  • 作者: maplejaw本篇只解析標(biāo)準(zhǔn)包中的操作符。對(duì)于擴(kuò)展包,由于使用率較低,如有需求,請(qǐng)讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 46,187評(píng)論 8 93

友情鏈接更多精彩內(nèi)容