1.創(chuàng)建Retrofit對象
OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder();
retrofit = new Retrofit.Builder()
.client(okHttpClient.build())
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.baseUrl(base_url)
.build();
這里是普通的 Retrofit 對象創(chuàng)建過程,傳入一些必要的參數(shù):okHttpClient,converterFactory,callAdapterFactory(不搭配 Rxjava 的時候使用 Retrofit 默認(rèn)的 callAdapterFactory,什么都不做),baseUrl 。
這里特別要注意的是傳入了 RxJavaCallAdapterFactory.create() 這個RxjavaCallAdapter 對象,這個對象將徹底改變 Retrofit 的使用。使得 Retrofit 搭配 Rxjava 變成可能,不得不佩服 Retrofit 作者的編程功底,開放 CallAdapterFactory 這個接口,使 Retrofit 的靈活性更高。
2.創(chuàng)建接口的動態(tài)代理對象
給出實驗的接口
public interface NetApiService {
//post請求
@FormUrlEncoded
@POST("{url}")
Observable<ResponseBody> executePost(
@Path("url") String url,
@Field("params") String params,
@Field("signature") String signature
);
}
netApiService = retrofit.create(NetApiService .class); //返回一個動態(tài)代理對象
這里也是 Retrofit 神奇的地方,傳入一個接口,就可以生成實現(xiàn)了這個接口的對象,當(dāng)然這個只是 Java 代碼生成的動態(tài)代理對象。下面我們進(jìn)
create() 方法看看。
public <T> T create(final Class<T> service) {
Utils.validateServiceInterface(service); //驗證外部傳進(jìn)的“服務(wù)”接口是否合法
if (validateEagerly) {
eagerlyValidateMethods(service); //根據(jù)validateEagerly判斷是否對接口中的全部方法進(jìn)行緩存
}
//使用Proxy工廠類返回一個泛型動態(tài)代理實例。
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
new InvocationHandler() {
private final Platform platform = Platform.get();
@Override public Object invoke(Object proxy, Method method, Object... args)
throws Throwable {
// If the method is a method from Object then defer to normal invocation.
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
ServiceMethod serviceMethod = loadServiceMethod(method);
OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);
}
});
}
在這里我們首先對傳入的接口進(jìn)行檢驗是否是接口,然后根據(jù) validateEagerly 判斷是否對接口中的全部方法進(jìn)行緩存,最后我們用 java.lang.reflect.Proxy; 創(chuàng)建一個泛型的動態(tài)代理對象,返回這個對象。(不懂 java動態(tài)代理技術(shù) 的同學(xué)別著急,我會在文末給出參考資料)
3.創(chuàng)建Observable
Observable<ResponseBody> observable = netApiService.executePost(url, params, signature);
調(diào)用動態(tài)代理對象的接口方法,這時候會調(diào)用
new InvocationHandler() {
private final Platform platform = Platform.get();
@Override public Object invoke(Object proxy, Method method, Object... args)
throws Throwable {
// If the method is a method from Object then defer to normal invocation.
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
if (platform.isDefaultMethod(method)) {
return platform.invokeDefaultMethod(method, service, proxy, args);
}
ServiceMethod serviceMethod = loadServiceMethod(method);
OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
return serviceMethod.callAdapter.adapt(okHttpCall);
}
}
InvocationHandler 的 invoke() 方法,在這里我們可以看到有三個參數(shù):
proxy 表示通過 Proxy.newProxyInstance() 生成的代理類對象。
method 表示代理對象被調(diào)用的函數(shù)。
args 表示代理對象被調(diào)用的函數(shù)的參數(shù)。
調(diào)用代理對象的每個函數(shù)實際最終都是調(diào)用了 InvocationHandler 的 invoke 函數(shù)。
由于這個是接口的方法,所以不會進(jìn)第一個 if ,因為也不是默認(rèn)方法,所以也不會進(jìn)第二個 if 。這樣就可以看到我們的代理對象在調(diào)用了接口的方法后實際上是 new 了一個 okHttpCall<> 對象,然后將這個對象作為參數(shù)傳進(jìn)了 callAdapter.adapt(); 方法中。
由于我們之前傳入的是 RxJavaCallAdapterFactory.create() ,所以我們深入 RxJavaCallAdapterFactory.java 看看構(gòu)造 Observable 的方法,在adapt()可以看到:
static final class ResponseCallAdapter implements CallAdapter<Observable<?>> {
private final Type responseType;
private final Scheduler scheduler;
ResponseCallAdapter(Type responseType, Scheduler scheduler) {
this.responseType = responseType;
this.scheduler = scheduler;
}
@Override public Type responseType() {
return responseType;
}
@Override public <R> Observable<Response<R>> adapt(Call<R> call) {
Observable<Response<R>> observable = Observable.create(new CallOnSubscribe<>(call));
if (scheduler != null) {
return observable.subscribeOn(scheduler);
}
return observable;
}
}
在這里我們看到,這里將傳入的 okHttp 對象 作為參數(shù),構(gòu)造了 CallOnSubscribe 對象。
CallOnSubscribe 是何方神圣???按照 Rxjava 構(gòu)造 Observable 方法來說,這個 CallOnSubscribe 應(yīng)該是一個實現(xiàn)了Observable.OnSubscribe<T> 接口的對象。
我們看看源碼,果然如此。
static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response<T>> {
private final Call<T> originalCall;
CallOnSubscribe(Call<T> originalCall) {
this.originalCall = originalCall;
}
@Override public void call(final Subscriber<? super Response<T>> subscriber) {
// Since Call is a one-shot type, clone it for each new subscriber.
Call<T> call = originalCall.clone();
// Wrap the call in a helper which handles both unsubscription and backpressure.
RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
subscriber.add(requestArbiter);
subscriber.setProducer(requestArbiter);
}
}
看到這里,相信你大概也懂了為什么調(diào)用生成的動態(tài)代理對象的接口方法不像只使用 Retrofit 那樣返回一個 okHttpCall<> 對象,而是返回一個 Observable<ResponseBody> 對象。其實這就是
RxJavaCallAdapterFactory 做的轉(zhuǎn)換。
仔細(xì)看 CallOnSubscribe<T> 的 call() 方法,我們發(fā)現(xiàn)這里的 subscriber (其實是調(diào)用 subscribe() 方法時傳進(jìn)來的 subscriber ,就是外部的觀察者)添加了一個 requestArbiter 對象。這個對象很重要,在 subscriber.setProducer(requestArbiter); 時,它會控制
okHttpCall對象 直接聯(lián)網(wǎng)獲取數(shù)據(jù),然后回調(diào)給觀察者 subscriber 。
4.observable.subscribe(subscriber);訂閱
這里的代碼不多,就一行 observable.subscribe(subscriber); 。我們仔細(xì)看看在 subscribe 方法里面發(fā)生了什么神奇的事?(提前劇透一下,Observable.OnSubscribe<T>對象 很棒,它就相當(dāng)于橋梁,將 Observable 和 Observer 連接起來)
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
//判斷傳進(jìn)來的參數(shù),即是觀察者對象,被觀察者對象,是否為空。
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
//重要的操作,可以在訂閱之前做一些準(zhǔn)備工作
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
}
return Subscriptions.unsubscribed();
}
}
在這段代碼中,我們看到 subscribe() 過程中,會先調(diào)用 onStart() ,一般這個方法在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作,例如數(shù)據(jù)的清零或重置。這是一個可選方法,默認(rèn)情況下它的實現(xiàn)為空。需要注意的是,如果對準(zhǔn)備工作的線程有要求(例如彈出一個顯示進(jìn)度的對話框,這必須在主線程執(zhí)行), onStart()就不適用了(因為它總是在 subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程。要在指定的線程來做準(zhǔn)備工作,可以使用 doOnSubscribe()
方法)(先記住這個知識點,請保留,現(xiàn)在先不管線程切換)
高能來了?。?!
我們重點看看這句代碼:
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass through by default
return onSubscribe;
}
其實 onSubscribeStart() 方法直接返回了 onSubscribe 對象,然后直接調(diào)用 onSubscribe 的 call(subscriber) 方法。記得我們剛剛分析,這里其實是調(diào)用了 CallOnSubscribe<T>對象 的 call() 方法。也就是在這里進(jìn)行了聯(lián)網(wǎng)獲取數(shù)據(jù),然后回調(diào) Subscriber 觀察者的方法。(具體的代碼就是 call() 方法的subscriber.setProducer(requestArbiter);)。
public void setProducer(Producer p) {
long toRequest;
boolean passToSubscriber = false;
synchronized (this) {
toRequest = requested;
producer = p;
if (subscriber != null) {
// middle operator ... we pass through unless a request has been made
if (toRequest == NOT_SET) {
// we pass through to the next producer as nothing has been requested
passToSubscriber = true;
}
}
}
// do after releasing lock
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
}
最后會調(diào)用 producer.request(toRequest); 方法。
這個 request() 方法,就是 RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber); 的 request() 。
static final class RequestArbiter<T> extends AtomicBoolean implements Subscription, Producer {
private final Call<T> call;
private final Subscriber<? super Response<T>> subscriber;
RequestArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
this.call = call;
this.subscriber = subscriber;
}
@Override public void request(long n) {
if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
if (n == 0) return; // Nothing to do when requesting 0.
if (!compareAndSet(false, true)) return; // Request was already triggered.
try {
Response<T> response = call.execute();
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(response);
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
return;
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
@Override public void unsubscribe() {
call.cancel();
}
@Override public boolean isUnsubscribed() {
return call.isCanceled();
}
}
Response<T> response = call.execute(); 這里進(jìn)行了網(wǎng)絡(luò)請求;
if (!subscriber.isUnsubscribed()) { subscriber.onNext(response); } 這里進(jìn)行回調(diào)。
接下來的 onError() 和 onCompleted() 方法的回調(diào)一樣的,就不分析了。
至此,我們就完整地了解了 Retrofit + Rxjava 中從創(chuàng)建 Observable 和 Observer 到 Observable 訂閱 Observer 的流程,以及中間隱藏的聯(lián)網(wǎng)和回調(diào)的過程。