All RxJava - 為Retrofit添加重試

在我們的日常開發(fā)中離不開I/O操作,尤其是網絡請求,但并不是所有的請求都是可信賴的,因此我們必須為APP添加請求重試功能。

對于一個網絡請求重試而言,我認為它至少應該做到以下兩點:

  • 可配置次數(shù)的重試。
    因為并不是所有的網絡請求都需要頻繁地重試,比如說一個重要的表單提交,它應該盡可能多失敗重連,相反地,埋點上報等統(tǒng)計功能,它可能最多只需要重試一次就足夠了。因此針對不同的場景,我們需要不同的重試次數(shù)。

  • 退避策略。
    我們應該為請求重試加入一個合理的退避算法,而不是一旦遭遇了失敗就立即無腦般的再次發(fā)起請求,這樣做沒有一點好處,不但降低了用戶體驗,甚至還在浪費網絡資源。一個合理的重試策略應該是:遇到網絡異常時應該等待一段時間后再重試,若遇到的異常次數(shù)越多,等待(退避)的時間就應該越長。

我一直使用SquareretrofitReactiveXRxJava,接下來我就來分享一下我是如何使用這兩個庫來實現(xiàn)一個可配置次數(shù)的退避重試策略的。

Repeat? Retry!

RxJava中有兩個操作符能夠觸發(fā)重訂閱,分別是:

從上面的彈珠圖中,我們可以了解到,這兩個操作符的區(qū)別僅僅是針對不同的“終止事件”來會觸發(fā)重訂閱:.repeat()接收到onCompleted后觸發(fā)重訂閱;而.retry()則是接收到OnError后觸發(fā)重訂閱。

需要注意的是,千萬不要使用這兩個操作符無限地重訂閱源Observable,一定要在恰當?shù)臅r候通過取消訂閱的方式來停止它們,避免陷入無限循環(huán),從而導致系統(tǒng)崩潰。除此之外還可以使用它們的重載函數(shù).repeat(n).retry(n),來設置一個合適的重訂閱次數(shù)n。

ps : 寫這篇博客的時候我參照了RxJava-1.2.10的源碼,.repeat().retry()的內部實現(xiàn)幾乎是一模一樣的,一點細微不同是:除了取消訂閱能夠同時終止它倆的重訂閱之外,.repeat()還能被OnError終止,相對的.retry()能被onCompleted終止。

回到本篇文章的主題上,我們需要的是在遭遇I/O異常時,發(fā)起重試,而不是請求成功時,很明顯的.retry()勝出!

Retry?RetryWhen!

首先,我們需要認清的事實是:所有的網絡異常都屬于I/O異常。

我們的重點是,只有遭遇了IOException時才重試網絡請求,也就是說那些IllegalStateException,NullPointerException或者當你使用gson來解析json時還可能出現(xiàn)的JsonParseException等非I/O異常均不在重試的范圍內。

因此.retry()以及它的重載函數(shù)已經不能滿足我們的需求了,好在RxJava為我們提供了另一個非常有用的操作符.retryWhen(),我們可以通過判斷異常類型,來決定是否發(fā)起重試(重訂閱)。

.retryWhen()的函數(shù)簽名如下:

public final Observable<T> retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)

其中notificationHandler是我們需要實現(xiàn)的函數(shù),它有兩個概念必須弄清:

  • 參數(shù)Observable<Throwable>,其中的泛型意指上游操作符拋出的異常,我們可以通過這個條件來判斷異常的類型。

  • 返回值Observable<?>,通配符(泛型)表示我們可以返回任意類型的Observable,它的作用是:一旦這個Observable通過onNext()發(fā)送事件,則重訂閱(重試)發(fā)生一次,如果這個Observable調用了onComplete或者onError那么將跳過重訂閱,最終這些終止事件將會向下傳遞,從此這個操作符的重訂閱功能也就失效了。

RX-CODE!

下面這段代碼是我使用的notificationHandler的實現(xiàn)類RetryWhenHandler,它基本滿足了我的重試要求。

final class RetryWhenHandler implements Func1<Observable<? extends Throwable>, Observable<Long>> {

  private static final int INITIAL = 1;
  private int maxConnectCount = 1;

  RetryWhenHandler(int retryCount) {
    this.maxConnectCount += retryCount;
  }

  @Override public Observable<Long> call(Observable<? extends Throwable> errorObservable) {
    return errorObservable.zipWith(Observable.range(INITIAL, maxConnectCount),
        new Func2<Throwable, Integer, ThrowableWrapper>() {
          @Override public ThrowableWrapper call(Throwable throwable, Integer i) {

            //①
            if (throwable instanceof IOException) return new ThrowableWrapper(throwable, i);

            return new ThrowableWrapper(throwable, maxConnectCount);
          }
        }).concatMap(new Func1<ThrowableWrapper, Observable<Long>>() {
      @Override public Observable<Long> call(ThrowableWrapper throwableWrapper) {

        final int retryCount = throwableWrapper.getRetryCount();

        //②
        if (maxConnectCount == retryCount) {
          return Observable.error(throwableWrapper.getSourceThrowable());
        }

        //③
        return Observable.timer((long) Math.pow(2, retryCount), TimeUnit.SECONDS,
            Schedulers.immediate());
      }
    });
  }

  private static final class ThrowableWrapper {

    private Throwable sourceThrowable;
    private Integer retryCount;

    ThrowableWrapper(Throwable sourceThrowable, Integer retryCount) {
      this.sourceThrowable = sourceThrowable;
      this.retryCount = retryCount;
    }

    Throwable getSourceThrowable() {
      return sourceThrowable;
    }

    Integer getRetryCount() {
      return retryCount;
    }
  }
}

有三點地方需要注意:

① 只在IOException的情況下記錄本次請求在最大請求次數(shù)中的位置,否則視為最后一次請求,避免多余的請求重試。

②如果最后一次網絡請求依然遭遇了異常,則將此異常繼續(xù)向下傳遞,以便在最后的onError()函數(shù)中處理。

③使用.timer()操作符實現(xiàn)一個簡單的二進制指數(shù)退避算法,需要注意的是.timer()操作符默認執(zhí)行在Schedulers.computation(),我們并不希望它切換到別的線程去執(zhí)行重試邏輯,因此使用了它的重載函數(shù),并指定在當前線程立即執(zhí)行。

@Retry

由于retrofit的請求參數(shù)是基于函數(shù)描述的,因此我們創(chuàng)建一個注解Retry用來描述重試次數(shù)。代碼如下:

@Documented
@Retention(RUNTIME)
@Target(METHOD)
public @interface Retry {
  //retry times when an IOException is encountered
  int count() default 0;
}

值得一提的是,我們只希望這個注解能夠被聲明在方法上,而且必須是RuntimeVisibleAnnotations,否則我們無法在運行時拿到。

假設你已經閱讀過了retrofit的源碼,至少知道如何使用CallAdapter.Factory來定義一個CallAdapter。如果對它不了解,則只需要記住,在CallAdapter.Factory中我們必須實現(xiàn)的抽象方法,其中第二個參數(shù)annotations包含了我們定義在方法上的所有RUNTIME注解。:

 public abstract @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations,
 Retrofit retrofit);

接下來,稍微改造一下RxJavaCallAdapter的構造函數(shù),添加一個重試變量,并在Observable調用鏈中添加我們之前已經寫好的RetryWhenHandler:

final class RxJavaCallAdapter<R> implements CallAdapter<R, Object> {
  private final Type responseType;
  private final @Nullable Scheduler scheduler;
  private final int retryCount;
  private final boolean isAsync;
  private final boolean isResult;
  private final boolean isBody;
  private final boolean isSingle;
  private final boolean isCompletable;

  RxJavaCallAdapter(Type responseType, @Nullable Scheduler scheduler, int retryCount,
      boolean isAsync, boolean isResult, boolean isBody, boolean isSingle, boolean isCompletable) {
    this.responseType = responseType;
    this.scheduler = scheduler;
    this.retryCount = retryCount
    this.isAsync = isAsync;
    this.isResult = isResult;
    this.isBody = isBody;
    this.isSingle = isSingle;
    this.isCompletable = isCompletable;
  }

  @Override public Type responseType() {
    return responseType;
  }

  @Override public Object adapt(Call<R> call) {
    OnSubscribe<Response<R>> callFunc = isAsync
        ? new CallEnqueueOnSubscribe<>(call)
        : new CallExecuteOnSubscribe<>(call);

    OnSubscribe<?> func;
    if (isResult) {
      func = new ResultOnSubscribe<>(callFunc);
    } else if (isBody) {
      func = new BodyOnSubscribe<>(callFunc);
    } else {
      func = callFunc;
    }
    Observable<?> observable = Observable.create(func).retryWhen(new RetryWhenHandler(retryCount));

    if (scheduler != null) {
      observable = observable.subscribeOn(scheduler);
    }

    if (isSingle) {
      return observable.toSingle();
    }
    if (isCompletable) {
      return observable.toCompletable();
    }
    return observable;
  }
}

解析@Retry注解的操作需要放在RxJavaCallAdapterFactory#Line104中:

int count;
for (Annotation annotation : annotations) {
  if (!Retry.class.isAssignableFrom(annotation.getClass())) continue;
  count = Retry.class.cast(annotation).count();
  if (count<0) throw new IllegalArgumentException(
      "The count in the \'@Retry\' is less than zero");
}

總結

至此,我們基本完成了通過RxJava為retrofit添加重試的功能,它利用retrofit本身的“基于方法描述的特性”,因此足夠靈活,而且擴展性也很高 : )

當然,不局限于此,如果你使用了okhttp,還可以通過自定義Interceptor的方式,為你的網絡請求添加失敗重試功能。

這篇文章只是提供一個簡單的思路,對于健壯應用程序,我們仍然需要不斷的嘗試與探索,如果你有更好的經驗,歡迎分享,如果你喜歡這篇文章,請點個贊。

文中所有代碼,都可以從github上獲取Forked from retrofit,希望這篇文章能夠對你所有幫助。Happy coding, enjoy it.

參考

【譯】對RxJava中.repeatWhen()和.retryWhen()操作符的思考 - 小鄧子

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容