Rx系列之Rxjava操作符進(jìn)階-使用場景

通過上一篇《Rx系列之RxJava操作符》,相信已經(jīng)能夠熟練的使用一些基本的操作符了。但是對于我們大家而言,其實(shí)最傳統(tǒng)的命令式編程已經(jīng)是我們順手就可以拈來的,但是,現(xiàn)在用響應(yīng)式編程,突然發(fā)現(xiàn):臥槽,這個(gè)地方用響應(yīng)式怎么寫,這樣寫對么?估計(jì)很多人才開始接觸RxJava的時(shí)候應(yīng)該都有這樣的疑慮。不用擔(dān)心,這一篇就給大家講講RxJava到底該怎么用,在什么情況下用!

RxJava的使用場景

眼尖的小伙伴,可能已經(jīng)發(fā)現(xiàn),在上一篇中,很多那么重要的操作符怎么都沒講!哈哈哈,答案在這里。好廢話不多說,來看看Rxjava到底在哪些情況下可以使用。

動(dòng)態(tài)搜索的場景

我們先來看一個(gè)動(dòng)態(tài)搜索的場景:

搜索

假設(shè),我要在這進(jìn)行網(wǎng)絡(luò)搜索,那么,我就要在這里面進(jìn)行網(wǎng)絡(luò)訪問,如果是輸入完成之后點(diǎn)擊確定進(jìn)行搜索還好,但是如果是動(dòng)態(tài)收索呢?只要搜索框中的搜索內(nèi)容一改變,那么是不是就要進(jìn)行網(wǎng)絡(luò)請求呢?那這樣就不是那么友好了。為了解決這樣的問題,rxjava為我們提供了一個(gè)很好的解決方案:

  • 使用debounce作為textSearch
    debounce()函數(shù)過濾掉由Observable發(fā)射的速率過快的數(shù)據(jù);如果在一個(gè)指定的時(shí)間間隔過去了仍舊沒有發(fā)射一個(gè),那么它將發(fā)射最后的那個(gè)。
    debounce()使用TimeUnit對象指定時(shí)間間隔。
    是不是感覺棒棒噠,昂,不管你喜不喜歡,反正我是愛死它了。

來看一下示意圖


debounce示意圖
debounce示意圖

由上圖我們可以看出,在比較密集的數(shù)據(jù)(2,3,4,5)發(fā)射之后,其實(shí)最終只是發(fā)射5。

附上代碼:

        RxTextView.textChanges(editText)
                .debounce(5000,TimeUnit.MILLISECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<CharSequence>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted: onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: onError");
                    }

                    @Override
                    public void onNext(CharSequence charSequence) {
                        Log.d(TAG, "onNext: "+charSequence.toString());
                    }
                });

在這5s內(nèi),我輸入了2,3,4,5(出最后一個(gè)5,其他輸入之后就刪除哈),但是最后得到的結(jié)果卻是:

onNext: 5

注意:這個(gè)操作符會(huì)會(huì)接著最后一項(xiàng)數(shù)據(jù)發(fā)射原始Observable的onCompleted通知,即使這個(gè)通知發(fā)生在你指定的時(shí)間窗口內(nèi)(從最后一項(xiàng)數(shù)據(jù)的發(fā)射算起)。也就是說,onCompleted通知不會(huì)觸發(fā)限流。

在上面可能會(huì)存在一個(gè)疑問,那就是
RxTextView.textChanges(editText)
這是個(gè)什么東西?你丫怎么沒講,哈哈,這個(gè)呀,要在后面的Rx系列中單獨(dú)來講,所以不要著急,暫時(shí)說一下這個(gè)的功能,這個(gè)RxTextView.textChanges(editText)其實(shí)是RxBinding里面的一個(gè)對控件的操作,其功能就跟TextWatcher一樣,就是對數(shù)據(jù)的變更進(jìn)行監(jiān)聽,所以上面的數(shù)據(jù)變化之后5s后將數(shù)據(jù)發(fā)射出去。嗯嗯,到這里就把動(dòng)態(tài)搜索場景講解了。

緩存檢測場景

在請求取數(shù)據(jù)的處理過程中,我們的操作一般是這樣一個(gè)原理:

  • ** 首先檢查內(nèi)存是否有緩存**
  • 然后檢查文件緩存中是否有
  • 最后才從網(wǎng)絡(luò)中取
    任何一步一旦發(fā)現(xiàn)數(shù)據(jù)后面的操作都不執(zhí)行
    在rxjava中為我們提供了兩個(gè)解決這個(gè)問題的操作符,分別是: concatfirst

concat
不交錯(cuò)的發(fā)射兩個(gè)或多個(gè)Observable
concat操作符連接多個(gè)Observable的輸出,就好像它們是一個(gè)Observable,第一個(gè)Observable發(fā)射的所有數(shù)據(jù)在第二個(gè)Observable發(fā)射的任何數(shù)據(jù)前面,以此類推。直到前面一個(gè)Observable終止,Concat
才會(huì)訂閱額外的一個(gè)Observable

請注意上面所說的“就好像它們是一個(gè)Observable”,其實(shí)并不是一個(gè)Observable,是前面一個(gè)停止之后才會(huì)訂閱下一個(gè),所以說他們并不是一個(gè),請君注意咯。

concat示意圖
concat示意圖

如上所示,就是將兩個(gè)Observable連接起來了。
還有一個(gè)實(shí)例方法concatWith,它是和concat等價(jià)的:Observable.concat(a,b)==a.concatWith(b)

來看一下是不是這個(gè)樣子的:

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: onError");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
        };

        Observable a = Observable.just(1, 2, 3, 4, 5);
        Observable b = Observable.just(6, 7, 8, 9, 10);

        Observable.concat(a, b)
                .subscribe(subscriber);

然后我們得到:

onNext: 1
 ...
onNext: 10
onCompleted: onCompleted

這時(shí)估計(jì)就會(huì)有人說了:你不是說這個(gè)操作符其實(shí)是將兩個(gè)訂閱連接起來了嘛!那么,為什么只是在最后打印了onCompleted,在onNext: 5后面不是也應(yīng)該打印一個(gè)嗎?
我們都知道觀察者和被觀察者之間,是由訂閱建立關(guān)系的,那么對于被觀察者來說,確實(shí)我發(fā)射了兩個(gè)數(shù)據(jù)源,但是對于觀察者來說,我不知道你有幾個(gè)數(shù)據(jù)源,我的職責(zé)就只是,數(shù)據(jù)發(fā)射過來后,我打印而已。所以,只有當(dāng)onNext沒有接收到數(shù)據(jù)時(shí),才會(huì)調(diào)用onCompleted。

最后對這個(gè)操作符,再補(bǔ)充一點(diǎn):如果當(dāng)?shù)谝粋€(gè)Observable a拋異常,那么將不會(huì)繼續(xù)執(zhí)行后面的Observable b了。
如果想測試請將上面的

Observable a = Observable.just(1, 2, 3, 4, 5);

變成

Observable a = Observable.just(1, 2, 3, 4, new RuntimeException());

進(jìn)行測試。

first
只發(fā)射第一項(xiàng)(或者滿足某個(gè)條件的第一項(xiàng))數(shù)據(jù)

first示意圖
first示意圖

由上圖我們可以看出,這個(gè)只要第一項(xiàng)滿足條件,后面的將不會(huì)再進(jìn)行發(fā)射,所以只是得到了1這個(gè)數(shù)字。

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: onError");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
        };

        Observable a = Observable.just(1, 2, 3, 4, 5);
        a.first().subscribe(subscriber);

得到結(jié)果:

onNext: 1
onCompleted: onCompleted

這個(gè)應(yīng)該很容易就看出來了。就是只是打印了第一個(gè)數(shù)據(jù)!
在這兒必須為大家區(qū)別一個(gè)操作符:single(),這個(gè)操作符也是只打印一個(gè)數(shù)據(jù)的,但是single()和first()最大的區(qū)別在于:前者只會(huì)發(fā)射一個(gè)數(shù)據(jù),不能發(fā)射多個(gè),否則會(huì)報(bào)錯(cuò);而first確實(shí)滿足條件的那一個(gè)。
如下:

    Observable a = Observable.just(1);
    a.single().subscribe(subscriber);

估計(jì)到這兒應(yīng)該已經(jīng)有人知道了上面的3個(gè)步驟改真沒寫了,來我們來看看代碼:

 final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                if (memoryCache != null) {
                    subscriber.onNext(memoryCache);
                } else {
                    subscriber.onCompleted();
                }
            }
        });
        Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                String cachePref = rxPreferences.getString("cache").get();
                if (!TextUtils.isEmpty(cachePref)) {
                    subscriber.onNext(cachePref);
                } else {
                    subscriber.onCompleted();
                }
            }
        });

        Observable<String> network = Observable.just("network");

        //依次檢查memory、disk、network  
        Observable
                .concat(memory, disk, network)
                .first()
                .subscribeOn(Schedulers.newThread())
                .subscribe(s -> {
                    memoryCache = "memory";
                    System.out.println("--------------subscribe: " + s);
                });

現(xiàn)在看上面的代碼是不是就知道它在干什么了,是不是很簡單!這個(gè)緩存檢測場景就講到這里。

輸入合法場景

在某些時(shí)候,我們需要所以的輸入都合法后,我們的某些按鈕才亮起來,或者才能點(diǎn)擊,如下圖:


輸入合法示意圖

在這個(gè)場景中,我們得掌握兩個(gè)操作符:skipcombineLatest

skip
抑制Observable發(fā)射的前N項(xiàng)數(shù)據(jù)

skip示意圖
skip示意圖

從上圖可以看到,總共發(fā)射了4個(gè)數(shù)據(jù),只有最后兩個(gè)發(fā)射出去了,這就是skip(2)的作用。

        Observable.just(1,2,3,4).skip(2).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted: onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: onError");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: "+integer);
            }
        });

得到如下結(jié)果:

onNext: 3
onNext: 4
onCompleted: onCompleted

combineLatest
當(dāng)多個(gè)Observables中的任何一個(gè)發(fā)射了數(shù)據(jù)時(shí),使用一個(gè)函數(shù)結(jié)合每個(gè)Observable發(fā)射的最近數(shù)據(jù)項(xiàng),并且基于這個(gè)函數(shù)的結(jié)果發(fā)射數(shù)據(jù)。
CombineLatest在原始的Observable中任意一個(gè)發(fā)射了數(shù)據(jù)時(shí)發(fā)射一條數(shù)據(jù)。當(dāng)原始Observables的任何一個(gè)發(fā)射了一條數(shù)據(jù)時(shí),CombineLatest
使用一個(gè)函數(shù)結(jié)合它們最近發(fā)射的數(shù)據(jù),然后發(fā)射這個(gè)函數(shù)的返回值。
一開始看到這句話,我又懵b了,這tm幾個(gè)意思?我們先來看看這個(gè)場景的實(shí)現(xiàn)代碼,然后再解釋:

        private void combineLatestEvent() {

        Observable<CharSequence> usernameObservable = RxTextView.textChanges(mUsername).skip(1);
        Observable<CharSequence> emailObservable = RxTextView.textChanges(mEmail).skip(1);
        Observable<CharSequence> passwordObservable = RxTextView.textChanges(mPassword).skip(1);

       Subscription subscription = Observable.combineLatest(usernameObservable, emailObservable,
                passwordObservable,
                new Func3<CharSequence, CharSequence, CharSequence, Boolean>() {
                    @Override
                    public Boolean call(CharSequence userName, CharSequence email, CharSequence
                            password) {

                        boolean isUserNameValid = !TextUtils.isEmpty(userName) && (userName
                                .toString().length() > 2 && userName.toString().length() < 9);

                        if (!isUserNameValid) {
                            mUsername.setError("用戶名無效");
                        }


                        boolean isEmailValid = !TextUtils.isEmpty(email) && Patterns
                                .EMAIL_ADDRESS.matcher(email).matches();

                        if (!isEmailValid) {
                            mEmail.setError("郵箱無效");
                        }

                        boolean isPasswordValid = !TextUtils.isEmpty(password) && (password
                                .toString().length() >5 && password.toString().length() < 11);

                        if (!isPasswordValid) {
                            mPassword.setError("密碼無效");
                        }


                        return isUserNameValid && isEmailValid && isPasswordValid;
                    }
                })
                .subscribe(getObserver());
    }

  
    private Observer<Boolean> getObserver() {
        return new Observer<Boolean>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Boolean aBoolean) {
                //更改注冊按鈕是否可用的狀態(tài)
                register.setEnabled(aBoolean);
            }
        };
    }

這個(gè)場景,有3個(gè)edittext,分別是mUsername,mEmail,mPassword,通過輸入合法的內(nèi)容進(jìn)行判定注冊按鈕是否亮起來。
當(dāng)我點(diǎn)擊其中的任何一個(gè)進(jìn)行編寫的時(shí)候,就會(huì)發(fā)射數(shù)據(jù),發(fā)射的是什么?是我們編輯的內(nèi)容嗎?其實(shí)不是的,發(fā)射的是結(jié)合Func3這個(gè)方法的返回值,在這里這個(gè)返回值是Boolean型的。返回了boolean型之后,就可以在觀察者里面設(shè)置注冊按鈕是否亮起來。現(xiàn)在再看上面那句高深莫測的話,是不是簡單多了!

這兒可能有人有疑問了:這3個(gè)edittext為什么要使用skip(1)呢?
答案其實(shí)很簡答啊,那就是當(dāng)我們不寫skip(1)的時(shí)候,edittext中沒有輸入任何值的時(shí)候,會(huì)把它當(dāng)作第一個(gè)數(shù)據(jù)進(jìn)行發(fā)射,雖然發(fā)射的是個(gè)空數(shù)據(jù),但是還是會(huì)發(fā)射啊!
奧偶,這個(gè)場景解釋完了!

數(shù)據(jù)過期場景

其實(shí)這個(gè)場景可以和上面的數(shù)據(jù)緩存檢測場景進(jìn)行合并:在緩存檢測場景中,我們知道,如果memory中沒有數(shù)據(jù),就從disk上面尋找,然后再是網(wǎng)絡(luò)請求,那么,問題來了,如果我們的memory中一直有數(shù)據(jù),但是網(wǎng)絡(luò)數(shù)據(jù)已經(jīng)變更了,又由于緩存檢測原則的只要有一個(gè)有數(shù)據(jù)就不會(huì)進(jìn)行網(wǎng)絡(luò)請求了,這就會(huì)造成我們顯示的數(shù)據(jù)一直是一個(gè)舊數(shù)據(jù)。

哦豁

那這個(gè)該怎么辦呢?
解決方法有如下兩個(gè):

  • 采用定時(shí)進(jìn)行清除本地緩存數(shù)據(jù)
  • 采用過濾操作符

我們先來看看第一種,如果是進(jìn)行定時(shí)做本地?cái)?shù)據(jù)清空的話,那么就會(huì)用到,我們一個(gè)輪詢的操作符Interval
創(chuàng)建一個(gè)按固定時(shí)間間隔發(fā)射整數(shù)序列的Observable
Interval通俗的講,就是每隔一段時(shí)間過后做什么事情!上一篇已經(jīng)講過了,所以這里就不詳細(xì)講解了,直接上代碼:

        Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
           ...
           @Override
           public void onNext(Long aLong) {
                //清除緩存操作
           }
       });

很多人可能會(huì)想到,那既然,我能夠用清除本地緩存的方法,那么能不能用,每隔一段時(shí)間進(jìn)行請求,讓請求的結(jié)果與本地緩存進(jìn)行合并呢?
答案是肯定的,來看如下代碼:

    Observable.create(new Observable.OnSubscribe<String>() {  
            @Override  
           public void call(final Subscriber<? super String> observer) {  
 
              Schedulers.newThread().createWorker()  
                    .schedulePeriodically(new Action0() {  
                          @Override  
                          public void call() {  
                              observer.onNext(doNetworkCallAndGetStringResult());  
                         }  
                      }, INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS);  
            }  
        }).subscribe(new Action1<String>() {  
           @Override  
           public void call(String s) {  
               
           }  
      })  

這個(gè)就是使用schedulePeriodically做輪詢請求

這樣造成每過一定時(shí)間我們就會(huì),清除緩存或者網(wǎng)絡(luò)請求。讀到這兒,是不是感覺這個(gè)方法真爛,哈哈哈,不著急,我們不是還有第二種方法嘛!來接著看第二種方法

  • 采用過濾操作符
    其實(shí)這個(gè)操作符我們已經(jīng)講過了,那就是操作符first,回顧一下上面的代碼,就是我們的first就是保證,眾多的數(shù)據(jù),有一個(gè)符合條件就發(fā)射數(shù)據(jù),后面的都將不執(zhí)行。我們的是否需要更新的條件不加在這里,就沒天理咯!
Observable source = Observable
    .concat(memory, disk, network)
    .first(new Func1() {
      @Override public Boolean call(Data data) {
        return data.isUpToDate();
      }
    });

哇偶,這個(gè)操作符完美的解決了如上的問題!那你丫的還將那么多,呵呵,我只是給大家講解操作符的使用場景而已,那個(gè)適合哪個(gè)場景,取決你們自己咯!

這一篇主要講解的內(nèi)容的就到這兒了,下面還有一些其他的場景,就簡單的介紹一下。

其他的場景

合并兩個(gè)數(shù)據(jù)源場景

使用merge合并兩個(gè)數(shù)據(jù)源,代碼如下:

    Observable.merge(getInfoFromFile(), getInfoFromNet())  
           .observeOn(AndroidSchedulers.mainThread())  
              .subscribe(new Subscriber<String>() {  
                  @Override  
                  public void onCompleted() {  
                     Log.d(TAG, "onCompleted: onCompleted");
                 }  
 
                 @Override  
                  public void onError(Throwable e) {  
                    Log.d(TAG, "onError: onError");
                  }  
  
                  @Override  
                  public void onNext(String data) {  
                       Log.d(TAG, "onNext: only one ! ");
             });  

Retrofit結(jié)合RxJava場景

這個(gè)場景的話,大家可以查看扔物線大神寫的給 Android 開發(fā)者的 RxJava 詳解,其中講解到了這個(gè)場景的結(jié)合!

就操作符使用場景這一塊而言,大概就講解這么多,如果大家有其他的使用場景,我們可以一起交流哦。感謝大家的支持,謝謝!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容