All RxJava - 為Retrofit添加重試

在我們的日常開發(fā)中離不開I/O操作,尤其是網(wǎng)絡(luò)請(qǐng)求,但并不是所有的請(qǐng)求都是可信賴的,因此我們必須為APP添加請(qǐng)求重試功能。

對(duì)于一個(gè)網(wǎng)絡(luò)請(qǐng)求重試而言,我認(rèn)為它至少應(yīng)該做到以下兩點(diǎn):

  • 可配置次數(shù)的重試
    因?yàn)椴⒉皇撬械木W(wǎng)絡(luò)請(qǐng)求都需要頻繁地重試,比如說一個(gè)重要的表單提交,它應(yīng)該盡可能多失敗重連,相反地,埋點(diǎn)上報(bào)等統(tǒng)計(jì)功能,它可能最多只需要重試一次就足夠了。因此針對(duì)不同的場(chǎng)景,我們需要不同的重試次數(shù)。

  • 退避策略。
    我們應(yīng)該為請(qǐng)求重試加入一個(gè)合理的退避算法,而不是一旦遭遇了失敗就立即無腦般的再次發(fā)起請(qǐng)求,這樣做沒有一點(diǎn)好處,不但降低了用戶體驗(yàn),甚至還在浪費(fèi)網(wǎng)絡(luò)資源。一個(gè)合理的重試策略應(yīng)該是:遇到網(wǎng)絡(luò)異常時(shí)應(yīng)該等待一段時(shí)間后再重試,若遇到的異常次數(shù)越多,等待(退避)的時(shí)間就應(yīng)該越長(zhǎng)。

我一直使用SquareretrofitReactiveXRxJava,接下來我就來分享一下我是如何使用這兩個(gè)庫來實(shí)現(xiàn)一個(gè)可配置次數(shù)的退避重試策略的。

Repeat? Retry!

RxJava中有兩個(gè)操作符能夠觸發(fā)重訂閱,分別是:

從上面的彈珠圖中,我們可以了解到,這兩個(gè)操作符的區(qū)別僅僅是針對(duì)不同的“終止事件”來會(huì)觸發(fā)重訂閱:.repeat()接收到onCompleted后觸發(fā)重訂閱;而.retry()則是接收到OnError后觸發(fā)重訂閱。

需要注意的是,千萬不要使用這兩個(gè)操作符無限地重訂閱源Observable,一定要在恰當(dāng)?shù)臅r(shí)候通過取消訂閱的方式來停止它們,避免陷入無限循環(huán),從而導(dǎo)致系統(tǒng)崩潰。除此之外還可以使用它們的重載函數(shù).repeat(n).retry(n),來設(shè)置一個(gè)合適的重訂閱次數(shù)n。

ps : 寫這篇博客的時(shí)候我參照了RxJava-1.2.10的源碼,.repeat().retry()的內(nèi)部實(shí)現(xiàn)幾乎是一模一樣的,一點(diǎn)細(xì)微不同是:除了取消訂閱能夠同時(shí)終止它倆的重訂閱之外,.repeat()還能被OnError終止,相對(duì)的.retry()能被onCompleted終止。

回到本篇文章的主題上,我們需要的是在遭遇I/O異常時(shí),發(fā)起重試,而不是請(qǐng)求成功時(shí),很明顯的.retry()勝出!

Retry?RetryWhen!

首先,我們需要認(rèn)清的事實(shí)是:所有的網(wǎng)絡(luò)異常都屬于I/O異常。

我們的重點(diǎn)是,只有遭遇了IOException時(shí)才重試網(wǎng)絡(luò)請(qǐng)求,也就是說那些IllegalStateException,NullPointerException或者當(dāng)你使用gson來解析json時(shí)還可能出現(xiàn)的JsonParseException等非I/O異常均不在重試的范圍內(nèi)。

因此.retry()以及它的重載函數(shù)已經(jīng)不能滿足我們的需求了,好在RxJava為我們提供了另一個(gè)非常有用的操作符.retryWhen(),我們可以通過判斷異常類型,來決定是否發(fā)起重試(重訂閱)。

.retryWhen()的函數(shù)簽名如下:

public final Observable<T> retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)

其中notificationHandler是我們需要實(shí)現(xiàn)的函數(shù),它有兩個(gè)概念必須弄清:

  • 參數(shù)Observable<Throwable>,其中的泛型意指上游操作符拋出的異常,我們可以通過這個(gè)條件來判斷異常的類型。

  • 返回值Observable<?>,通配符(泛型)表示我們可以返回任意類型的Observable,它的作用是:一旦這個(gè)Observable通過onNext()發(fā)送事件,則重訂閱(重試)發(fā)生一次,如果這個(gè)Observable調(diào)用了onComplete或者onError那么將跳過重訂閱,最終這些終止事件將會(huì)向下傳遞,從此這個(gè)操作符的重訂閱功能也就失效了。

RX-CODE!

下面這段代碼是我使用的notificationHandler的實(shí)現(xiàn)類RetryWhenHandler,它基本滿足了我的重試要求。

final class RetryWhenHandler implements Func1<Observable<? extends Throwable>, Observable<Long>> {

  private static final int INITIAL = 1;
  private int maxConnectCount = 1;

  RetryWhenHandler(int retryCount) {
    this.maxConnectCount += retryCount;
  }

  @Override public Observable<Long> call(Observable<? extends Throwable> errorObservable) {
    return errorObservable.zipWith(Observable.range(INITIAL, maxConnectCount),
        new Func2<Throwable, Integer, ThrowableWrapper>() {
          @Override public ThrowableWrapper call(Throwable throwable, Integer i) {

            //①
            if (throwable instanceof IOException) return new ThrowableWrapper(throwable, i);

            return new ThrowableWrapper(throwable, maxConnectCount);
          }
        }).concatMap(new Func1<ThrowableWrapper, Observable<Long>>() {
      @Override public Observable<Long> call(ThrowableWrapper throwableWrapper) {

        final int retryCount = throwableWrapper.getRetryCount();

        //②
        if (maxConnectCount == retryCount) {
          return Observable.error(throwableWrapper.getSourceThrowable());
        }

        //③
        return Observable.timer((long) Math.pow(2, retryCount), TimeUnit.SECONDS,
            Schedulers.immediate());
      }
    });
  }

  private static final class ThrowableWrapper {

    private Throwable sourceThrowable;
    private Integer retryCount;

    ThrowableWrapper(Throwable sourceThrowable, Integer retryCount) {
      this.sourceThrowable = sourceThrowable;
      this.retryCount = retryCount;
    }

    Throwable getSourceThrowable() {
      return sourceThrowable;
    }

    Integer getRetryCount() {
      return retryCount;
    }
  }
}

有三點(diǎn)地方需要注意:

① 只在IOException的情況下記錄本次請(qǐng)求在最大請(qǐng)求次數(shù)中的位置,否則視為最后一次請(qǐng)求,避免多余的請(qǐng)求重試。

②如果最后一次網(wǎng)絡(luò)請(qǐng)求依然遭遇了異常,則將此異常繼續(xù)向下傳遞,以便在最后的onError()函數(shù)中處理。

③使用.timer()操作符實(shí)現(xiàn)一個(gè)簡(jiǎn)單的二進(jìn)制指數(shù)退避算法,需要注意的是.timer()操作符默認(rèn)執(zhí)行在Schedulers.computation(),我們并不希望它切換到別的線程去執(zhí)行重試邏輯,因此使用了它的重載函數(shù),并指定在當(dāng)前線程立即執(zhí)行。

@Retry

由于retrofit的請(qǐng)求參數(shù)是基于函數(shù)描述的,因此我們創(chuàng)建一個(gè)注解Retry用來描述重試次數(shù)。代碼如下:

@Documented
@Retention(RUNTIME)
@Target(METHOD)
public @interface Retry {
  //retry times when an IOException is encountered
  int count() default 0;
}

值得一提的是,我們只希望這個(gè)注解能夠被聲明在方法上,而且必須是RuntimeVisibleAnnotations,否則我們無法在運(yùn)行時(shí)拿到。

假設(shè)你已經(jīng)閱讀過了retrofit的源碼,至少知道如何使用CallAdapter.Factory來定義一個(gè)CallAdapter。如果對(duì)它不了解,則只需要記住,在CallAdapter.Factory中我們必須實(shí)現(xiàn)的抽象方法,其中第二個(gè)參數(shù)annotations包含了我們定義在方法上的所有RUNTIME注解。:

 public abstract @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations,
 Retrofit retrofit);

接下來,稍微改造一下RxJavaCallAdapter的構(gòu)造函數(shù),添加一個(gè)重試變量,并在Observable調(diào)用鏈中添加我們之前已經(jīng)寫好的RetryWhenHandler:

final class RxJavaCallAdapter<R> implements CallAdapter<R, Object> {
  private final Type responseType;
  private final @Nullable Scheduler scheduler;
  private final int retryCount;
  private final boolean isAsync;
  private final boolean isResult;
  private final boolean isBody;
  private final boolean isSingle;
  private final boolean isCompletable;

  RxJavaCallAdapter(Type responseType, @Nullable Scheduler scheduler, int retryCount,
      boolean isAsync, boolean isResult, boolean isBody, boolean isSingle, boolean isCompletable) {
    this.responseType = responseType;
    this.scheduler = scheduler;
    this.retryCount = retryCount
    this.isAsync = isAsync;
    this.isResult = isResult;
    this.isBody = isBody;
    this.isSingle = isSingle;
    this.isCompletable = isCompletable;
  }

  @Override public Type responseType() {
    return responseType;
  }

  @Override public Object adapt(Call<R> call) {
    OnSubscribe<Response<R>> callFunc = isAsync
        ? new CallEnqueueOnSubscribe<>(call)
        : new CallExecuteOnSubscribe<>(call);

    OnSubscribe<?> func;
    if (isResult) {
      func = new ResultOnSubscribe<>(callFunc);
    } else if (isBody) {
      func = new BodyOnSubscribe<>(callFunc);
    } else {
      func = callFunc;
    }
    Observable<?> observable = Observable.create(func).retryWhen(new RetryWhenHandler(retryCount));

    if (scheduler != null) {
      observable = observable.subscribeOn(scheduler);
    }

    if (isSingle) {
      return observable.toSingle();
    }
    if (isCompletable) {
      return observable.toCompletable();
    }
    return observable;
  }
}

解析@Retry注解的操作需要放在RxJavaCallAdapterFactory#Line104中:

int count;
for (Annotation annotation : annotations) {
  if (!Retry.class.isAssignableFrom(annotation.getClass())) continue;
  count = Retry.class.cast(annotation).count();
  if (count<0) throw new IllegalArgumentException(
      "The count in the \'@Retry\' is less than zero");
}

總結(jié)

至此,我們基本完成了通過RxJava為retrofit添加重試的功能,它利用retrofit本身的“基于方法描述的特性”,因此足夠靈活,而且擴(kuò)展性也很高 : )

當(dāng)然,不局限于此,如果你使用了okhttp,還可以通過自定義Interceptor的方式,為你的網(wǎng)絡(luò)請(qǐng)求添加失敗重試功能。

這篇文章只是提供一個(gè)簡(jiǎn)單的思路,對(duì)于健壯應(yīng)用程序,我們?nèi)匀恍枰粩嗟膰L試與探索,如果你有更好的經(jīng)驗(yàn),歡迎分享,如果你喜歡這篇文章,請(qǐng)點(diǎn)個(gè)贊。

文中所有代碼,都可以從github上獲取Forked from retrofit,希望這篇文章能夠?qū)δ闼袔椭?。Happy coding, enjoy it.

參考

【譯】對(duì)RxJava中.repeatWhen()和.retryWhen()操作符的思考 - 小鄧子

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容