Retrofit 2.1 + Rxjava 源碼解析(一)

1.創(chuàng)建Retrofit對象


OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder();
retrofit = new Retrofit.Builder()
                .client(okHttpClient.build())
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .baseUrl(base_url)
                .build();

這里是普通的 Retrofit 對象創(chuàng)建過程,傳入一些必要的參數(shù):okHttpClientconverterFactory,callAdapterFactory(不搭配 Rxjava 的時候使用 Retrofit 默認(rèn)的 callAdapterFactory,什么都不做),baseUrl 。

這里特別要注意的是傳入了 RxJavaCallAdapterFactory.create() 這個RxjavaCallAdapter 對象,這個對象將徹底改變 Retrofit 的使用。使得 Retrofit 搭配 Rxjava 變成可能,不得不佩服 Retrofit 作者的編程功底,開放 CallAdapterFactory 這個接口,使 Retrofit 的靈活性更高。

2.創(chuàng)建接口的動態(tài)代理對象

給出實驗的接口

public interface NetApiService {

    //post請求
    @FormUrlEncoded
    @POST("{url}")
    Observable<ResponseBody> executePost(
            @Path("url") String url,
            @Field("params") String params,
            @Field("signature") String signature
    );

}
netApiService = retrofit.create(NetApiService .class);  //返回一個動態(tài)代理對象

這里也是 Retrofit 神奇的地方,傳入一個接口,就可以生成實現(xiàn)了這個接口的對象,當(dāng)然這個只是 Java 代碼生成的動態(tài)代理對象。下面我們進(jìn)
create() 方法看看。

public <T> T create(final Class<T> service) {
    Utils.validateServiceInterface(service);  //驗證外部傳進(jìn)的“服務(wù)”接口是否合法
    if (validateEagerly) {
      eagerlyValidateMethods(service);  //根據(jù)validateEagerly判斷是否對接口中的全部方法進(jìn)行緩存
    }
    //使用Proxy工廠類返回一個泛型動態(tài)代理實例。
    return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
        new InvocationHandler() {
          private final Platform platform = Platform.get();

          @Override public Object invoke(Object proxy, Method method, Object... args)
              throws Throwable {
            // If the method is a method from Object then defer to normal invocation.
            if (method.getDeclaringClass() == Object.class) {
              return method.invoke(this, args);
            }
            if (platform.isDefaultMethod(method)) {
              return platform.invokeDefaultMethod(method, service, proxy, args);
            }
            ServiceMethod serviceMethod = loadServiceMethod(method);
            OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
            return serviceMethod.callAdapter.adapt(okHttpCall);
          }
        });
  }

在這里我們首先對傳入的接口進(jìn)行檢驗是否是接口,然后根據(jù) validateEagerly 判斷是否對接口中的全部方法進(jìn)行緩存,最后我們用 java.lang.reflect.Proxy; 創(chuàng)建一個泛型的動態(tài)代理對象,返回這個對象。(不懂 java動態(tài)代理技術(shù) 的同學(xué)別著急,我會在文末給出參考資料)

3.創(chuàng)建Observable

Observable<ResponseBody> observable = netApiService.executePost(url, params, signature);

調(diào)用動態(tài)代理對象的接口方法,這時候會調(diào)用

new InvocationHandler() {
          private final Platform platform = Platform.get();

          @Override public Object invoke(Object proxy, Method method, Object... args)
              throws Throwable {
            // If the method is a method from Object then defer to normal invocation.
            if (method.getDeclaringClass() == Object.class) {
              return method.invoke(this, args);
            }
            if (platform.isDefaultMethod(method)) {
              return platform.invokeDefaultMethod(method, service, proxy, args);
            }
            ServiceMethod serviceMethod = loadServiceMethod(method);
            OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
            return serviceMethod.callAdapter.adapt(okHttpCall);
          }
        }

InvocationHandlerinvoke() 方法,在這里我們可以看到有三個參數(shù):
proxy 表示通過 Proxy.newProxyInstance() 生成的代理類對象。
method 表示代理對象被調(diào)用的函數(shù)。
args 表示代理對象被調(diào)用的函數(shù)的參數(shù)。
調(diào)用代理對象的每個函數(shù)實際最終都是調(diào)用了 InvocationHandlerinvoke 函數(shù)。

由于這個是接口的方法,所以不會進(jìn)第一個 if ,因為也不是默認(rèn)方法,所以也不會進(jìn)第二個 if 。這樣就可以看到我們的代理對象在調(diào)用了接口的方法后實際上是 new 了一個 okHttpCall<> 對象,然后將這個對象作為參數(shù)傳進(jìn)了 callAdapter.adapt(); 方法中。

由于我們之前傳入的是 RxJavaCallAdapterFactory.create() ,所以我們深入 RxJavaCallAdapterFactory.java 看看構(gòu)造 Observable 的方法,在adapt()可以看到:

static final class ResponseCallAdapter implements CallAdapter<Observable<?>> {
    private final Type responseType;
    private final Scheduler scheduler;

    ResponseCallAdapter(Type responseType, Scheduler scheduler) {
      this.responseType = responseType;
      this.scheduler = scheduler;
    }

    @Override public Type responseType() {
      return responseType;
    }

    @Override public <R> Observable<Response<R>> adapt(Call<R> call) {
      Observable<Response<R>> observable = Observable.create(new CallOnSubscribe<>(call));
      if (scheduler != null) {
        return observable.subscribeOn(scheduler);
      }
      return observable;
    }
  }

在這里我們看到,這里將傳入的 okHttp 對象 作為參數(shù),構(gòu)造了 CallOnSubscribe 對象。
CallOnSubscribe 是何方神圣???按照 Rxjava 構(gòu)造 Observable 方法來說,這個 CallOnSubscribe 應(yīng)該是一個實現(xiàn)了Observable.OnSubscribe<T> 接口的對象。

我們看看源碼,果然如此。

static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response<T>> {
    private final Call<T> originalCall;

    CallOnSubscribe(Call<T> originalCall) {
      this.originalCall = originalCall;
    }

    @Override public void call(final Subscriber<? super Response<T>> subscriber) {
      // Since Call is a one-shot type, clone it for each new subscriber.
      Call<T> call = originalCall.clone();

      // Wrap the call in a helper which handles both unsubscription and backpressure.
      RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
      subscriber.add(requestArbiter);
      subscriber.setProducer(requestArbiter);
    }
  }

看到這里,相信你大概也懂了為什么調(diào)用生成的動態(tài)代理對象的接口方法不像只使用 Retrofit 那樣返回一個 okHttpCall<> 對象,而是返回一個 Observable<ResponseBody> 對象。其實這就是
RxJavaCallAdapterFactory 做的轉(zhuǎn)換。

仔細(xì)看 CallOnSubscribe<T>call() 方法,我們發(fā)現(xiàn)這里的 subscriber (其實是調(diào)用 subscribe() 方法時傳進(jìn)來的 subscriber ,就是外部的觀察者)添加了一個 requestArbiter 對象。這個對象很重要,在 subscriber.setProducer(requestArbiter); 時,它會控制
okHttpCall對象 直接聯(lián)網(wǎng)獲取數(shù)據(jù),然后回調(diào)給觀察者 subscriber 。

4.observable.subscribe(subscriber);訂閱

這里的代碼不多,就一行 observable.subscribe(subscriber); 。我們仔細(xì)看看在 subscribe 方法里面發(fā)生了什么神奇的事?(提前劇透一下,Observable.OnSubscribe<T>對象 很棒,它就相當(dāng)于橋梁,將 Observable 和 Observer 連接起來)

 public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
    
    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
     // validate and proceed
    //判斷傳進(jìn)來的參數(shù),即是觀察者對象,被觀察者對象,是否為空。
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }
        
        // new Subscriber so onStart it
       //重要的操作,可以在訂閱之前做一些準(zhǔn)備工作
        subscriber.onStart();
        
        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would 
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(hook.onSubscribeError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    hook.onSubscribeError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r;
                }
            }
            return Subscriptions.unsubscribed();
        }
    }

在這段代碼中,我們看到 subscribe() 過程中,會先調(diào)用 onStart() ,一般這個方法在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作,例如數(shù)據(jù)的清零或重置。這是一個可選方法,默認(rèn)情況下它的實現(xiàn)為空。需要注意的是,如果對準(zhǔn)備工作的線程有要求(例如彈出一個顯示進(jìn)度的對話框,這必須在主線程執(zhí)行), onStart()就不適用了(因為它總是在 subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程。要在指定的線程來做準(zhǔn)備工作,可以使用 doOnSubscribe()
方法)(先記住這個知識點,請保留,現(xiàn)在先不管線程切換)

高能來了?。?!
我們重點看看這句代碼:

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
 public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass through by default
        return onSubscribe;
    }

其實 onSubscribeStart() 方法直接返回了 onSubscribe 對象,然后直接調(diào)用 onSubscribecall(subscriber) 方法。記得我們剛剛分析,這里其實是調(diào)用了 CallOnSubscribe<T>對象call() 方法。也就是在這里進(jìn)行了聯(lián)網(wǎng)獲取數(shù)據(jù),然后回調(diào) Subscriber 觀察者的方法。(具體的代碼就是 call() 方法的subscriber.setProducer(requestArbiter);)。

public void setProducer(Producer p) {
        long toRequest;
        boolean passToSubscriber = false;
        synchronized (this) {
            toRequest = requested;
            producer = p;
            if (subscriber != null) {
                // middle operator ... we pass through unless a request has been made
                if (toRequest == NOT_SET) {
                    // we pass through to the next producer as nothing has been requested
                    passToSubscriber = true;
                }
            }
        }
        // do after releasing lock
        if (passToSubscriber) {
            subscriber.setProducer(producer);
        } else {
            // we execute the request with whatever has been requested (or Long.MAX_VALUE)
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE);
            } else {
                producer.request(toRequest);
            }
        }
    }

最后會調(diào)用 producer.request(toRequest); 方法。
這個 request() 方法,就是 RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);request() 。

static final class RequestArbiter<T> extends AtomicBoolean implements Subscription, Producer {
    private final Call<T> call;
    private final Subscriber<? super Response<T>> subscriber;

    RequestArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
      this.call = call;
      this.subscriber = subscriber;
    }

    @Override public void request(long n) {
      if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
      if (n == 0) return; // Nothing to do when requesting 0.
      if (!compareAndSet(false, true)) return; // Request was already triggered.

      try {
        Response<T> response = call.execute();
        if (!subscriber.isUnsubscribed()) {
          subscriber.onNext(response);
        }
      } catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        if (!subscriber.isUnsubscribed()) {
          subscriber.onError(t);
        }
        return;
      }

      if (!subscriber.isUnsubscribed()) {
        subscriber.onCompleted();
      }
    }

    @Override public void unsubscribe() {
      call.cancel();
    }

    @Override public boolean isUnsubscribed() {
      return call.isCanceled();
    }
  }

Response<T> response = call.execute(); 這里進(jìn)行了網(wǎng)絡(luò)請求;
if (!subscriber.isUnsubscribed()) { subscriber.onNext(response); } 這里進(jìn)行回調(diào)。

接下來的 onError()onCompleted() 方法的回調(diào)一樣的,就不分析了。

至此,我們就完整地了解了 Retrofit + Rxjava 中從創(chuàng)建 Observable 和 Observer 到 Observable 訂閱 Observer 的流程,以及中間隱藏的聯(lián)網(wǎng)和回調(diào)的過程。

參考資料

Retrofit 2.1 源碼分析
Java 動態(tài)代理技術(shù)
Rxjava 源碼分析一

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容