【譯】對(duì)RxJava中.repeatWhen()和.retryWhen()操作符的思考

第一次見(jiàn)到.repeatWhen().retryWhen()這兩個(gè)操作符的時(shí)候就非常困惑了。不得不說(shuō),它們絕對(duì)是“最令人困惑彈珠圖”的有力角逐者。

然而它們都是非常有用的操作符:允許你有條件的重新訂閱已經(jīng)結(jié)束的Observable。我最近研究了它們的工作原理,現(xiàn)在我希望嘗試著去解釋它們(因?yàn)?,我也是耗費(fèi)了一些精力才參透它們)。

Repeat與Retry的對(duì)比

首先,來(lái)了解一下.repeat().retry()之間最直觀的區(qū)別是什么?這個(gè)問(wèn)題并不難:區(qū)別就在于什么樣的終止事件會(huì)觸發(fā)重訂閱。

當(dāng).repeat()接收到.onCompleted()事件后觸發(fā)重訂閱。

當(dāng).retry()接收到.onError()事件后觸發(fā)重訂閱。

然而,這種簡(jiǎn)單的敘述尚不能令人滿意。試想如果你要實(shí)現(xiàn)一個(gè)延遲數(shù)秒的重訂閱該如何去做?或者想通過(guò)觀察錯(cuò)誤來(lái)決定是否應(yīng)該重訂閱呢?這種情況下就需要.repeatWhen().retryWhen()的介入了,因?yàn)樗鼈冊(cè)试S你為重試提供自定義邏輯。

Notification Handler

你可以通過(guò)一個(gè)叫做notificationHandler的函數(shù)來(lái)實(shí)現(xiàn)重試邏輯。這是.retryWhen()的方法簽名(譯者注:方法簽名,指方法名稱、參數(shù)類型和參數(shù)數(shù)量等):

retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler) 

簽名很長(zhǎng),甚至不能一口氣讀完。我發(fā)現(xiàn)它很難理解的原因是因?yàn)榇嬖谝淮蠖训姆盒图s定。

簡(jiǎn)化后,它包括三個(gè)部分:

  1. Func1像個(gè)工廠類,用來(lái)實(shí)現(xiàn)你自己的重試邏輯。
  2. 輸入的是一個(gè)Observable<Throwable>。
  3. 輸出的是一個(gè)Observable<?>。

首先,讓我們來(lái)看一下最后一部分。被返回的Observable<?>所要發(fā)送的事件決定了重訂閱是否會(huì)發(fā)生。如果發(fā)送的是onCompleted或者onError事件,將不會(huì)觸發(fā)重訂閱。相對(duì)的,如果它發(fā)送onNext事件,則觸發(fā)重訂閱(不管onNext實(shí)際上是什么事件)。這就是為什么使用了通配符作為泛型類型:這僅僅是個(gè)通知(next, error或者completed),一個(gè)很重要的通知而已。

source每次一調(diào)用onError(Throwable),Observable<Throwable>都會(huì)被作為輸入傳入方法中。換句話說(shuō)就是,它的每一次調(diào)用你都需要決定是否需要重訂閱。

當(dāng)訂閱發(fā)生的時(shí)候,工廠Func1被調(diào)用,從而準(zhǔn)備重試邏輯。那樣的話,當(dāng)onError被調(diào)用后,你已經(jīng)定義的重試邏輯就能夠處理它了。

這里有個(gè)例子展示了我們應(yīng)該在哪些場(chǎng)景下訂閱source,比如,只有在ThrowableIOException的情況下請(qǐng)求重訂閱,否則不(重訂閱)。

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
          @Override public Observable<?> call(Observable<? extends Throwable> errors) {

            return errors.flatMap(new Func1<Throwable, Observable<?>>() {
              @Override public Observable<?> call(Throwable error) {

                // For IOExceptions, we  retry
                if (error instanceof IOException) {
                  return Observable.just(null);
                }

                // For anything else, don't retry
                return Observable.error(error);
              }
            });
          }  
        })

由于每一個(gè)error都被flatmap過(guò),因此我們不能通過(guò)直接調(diào)用.onNext(null)觸發(fā)重訂閱或者.onError(error)來(lái)避免重訂閱。

經(jīng)驗(yàn)之談

這里有一些關(guān)于.repeatWhen().retryWhen()的要點(diǎn),我們應(yīng)該牢記于心。

  • .repeatWhen().retryWhen()非常相似,只不過(guò)不再響應(yīng)onError作為重試條件,而是onCompleted。因?yàn)?code>onCompleted沒(méi)有類型,所有輸入變?yōu)?code>Observable<Void>。

  • 每一次事件流的訂閱notificationHandler(也就是Func1)只會(huì)調(diào)用一次。這也是講得通的,因?yàn)槟阌幸粋€(gè)可觀測(cè)的Observable<Throwable>,它能夠發(fā)送任意數(shù)量的error。

  • 輸入的Observable必須作為輸出Observable的源。你必須對(duì)Observable<Throwable>做出反應(yīng),然后基于它發(fā)送事件;你不能只返回一個(gè)通用泛型流。

換言之就是,你不能做類似的操作:

 .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Throwable> errors) {

                return Observable.just(null);}
            })

因?yàn)樗粌H不能奏效,而且還會(huì)打斷你的鏈?zhǔn)浇Y(jié)構(gòu)。你應(yīng)該做的是,而且至少應(yīng)該做的是,把輸入作為結(jié)果返回,就像這樣:

.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Throwable> errors) {

                return errors;
              }
            })

(順便提一下,這在邏輯上與單純使用.retry()操作符的效果是一樣噠)

  • 輸入Observable只在終止事件發(fā)生的時(shí)候才會(huì)觸發(fā)(對(duì)于.repeatWhen()來(lái)說(shuō)是onCompleted,而對(duì)于.retryWhen()來(lái)說(shuō)是onError)。它不會(huì)從源中接收到任何onNext的通知,所以你不能通過(guò)觀察被發(fā)送的事件來(lái)決定重訂閱。如果你真的需要這樣做,你應(yīng)該添加像.takeUntil()這樣的操作符,來(lái)攔截事件流。

使用方式

現(xiàn)在,假設(shè)你已大概了解了.repeatWhen().retryWhen(),那么你能將一些什么樣的精簡(jiǎn)邏輯放入到notificationHandler中呢?

使用.repeatWhen() + .delay()定期輪詢數(shù)據(jù):

source.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Void> completed) {

                return completed.delay(5, TimeUnit.SECONDS);
              }
            })

直到notificationHandler發(fā)送onNext()才會(huì)重訂閱到source。因?yàn)樵诎l(fā)送onNext()之前delay了一段時(shí)間,所以優(yōu)雅的實(shí)現(xiàn)了延遲重訂閱,從而避免了不間斷的數(shù)據(jù)輪詢。

非此即彼,使用.flatMap() + .timer()實(shí)現(xiàn)延遲重訂閱:
(譯者注:在RxJava 1.0.0及其之后的版本,官方已不再提倡使用.timer()操作符,因?yàn)?code>.interval()具有同樣的功能)

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Throwable> errors) {

                return errors.flatMap(new Func1<Throwable, Observable<?>>() {
                  @Override public Observable<?> call(Throwable error) {

                    return Observable.timer(5, TimeUnit.SECONDS);
                  }
                });
              }
            })

當(dāng)需要與其他邏輯協(xié)同的時(shí)候,這種替代方案就變得非常有用了,比如。。。

使用.zip() + .range()實(shí)現(xiàn)有限次數(shù)的重訂閱

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Throwable> errors) {

                return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
                  @Override public Integer call(Throwable throwable, Integer i) {

                    return i;
                  }
                });
              }
            })

最后的結(jié)果就是每個(gè)error都與range中一個(gè)輸出配對(duì)出現(xiàn),就像這樣:

zip(error1, 1) -> onNext(1)  <-- Resubscribe  
zip(error2, 2) -> onNext(2)  <-- Resubscribe  
zip(error3, 3) -> onNext(3)  <-- Resubscribe  
onCompleted()                <-- No resubscription  

因?yàn)楫?dāng)?shù)谒拇蝒rror出現(xiàn)的時(shí)候,range(1,3)中的數(shù)字已經(jīng)耗盡了,所以它隱式調(diào)用了onCompleted(),從而導(dǎo)致整個(gè)zip的結(jié)束。防止了進(jìn)一步的重試。

將可變延遲策略與次數(shù)限制的重試機(jī)制結(jié)合起來(lái)

source.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
              @Override public Observable<?> call(Observable<? extends Throwable> errors) {

                return errors.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
                  @Override public Integer call(Throwable throwable, Integer i) {

                    return i;
                  }
                }).flatMap(new Func1<Integer, Observable<? extends Long>>() {
                  @Override public Observable<? extends Long> call(Integer retryCount) {
                    
                    return Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS);
                  }
                });
              }
            })

在這種用例的比較上,我認(rèn)為.flatMap()+.timer()的組合比單純使用.delay()更可取,因?yàn)槲覀兛梢酝ㄟ^(guò)重試次數(shù)來(lái)修改延遲時(shí)間。重試三次,并且每一次的重試時(shí)間都是5 ^ retryCount,僅僅通過(guò)一些操作符的組合就幫助我們實(shí)現(xiàn)了指數(shù)退避算法(譯者注:可參考二進(jìn)制指數(shù)退避算法)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 注:只包含標(biāo)準(zhǔn)包中的操作符,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 1,059評(píng)論 0 3
  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應(yīng)式編程作為結(jié)合使用的,對(duì)什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,994評(píng)論 0 10
  • 我從去年開(kāi)始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy閱讀 5,777評(píng)論 7 62
  • 作者: maplejaw本篇只解析標(biāo)準(zhǔn)包中的操作符。對(duì)于擴(kuò)展包,由于使用率較低,如有需求,請(qǐng)讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 46,219評(píng)論 8 93
  • 文章轉(zhuǎn)自:http://gank.io/post/560e15be2dca930e00da1083作者:扔物線在正...
    xpengb閱讀 7,148評(píng)論 9 73

友情鏈接更多精彩內(nèi)容