參考:
源碼版本:3.14.9
源碼地址:https://github.com/square/okhttp
此篇文章主要講解OkHttp的請(qǐng)求過(guò)程,以及源碼中一些比較難懂的知識(shí)點(diǎn)的使用,并把這些知識(shí)點(diǎn)抽取成一個(gè)Demo,可以比較深刻的去理解源碼的執(zhí)行過(guò)程。Demo 包含 :
- 源碼中 Dispatcher類(lèi)的調(diào)度過(guò)程
- 以及RealCall類(lèi)中getResponseWithInterceptorChain()方法中責(zé)任鏈模式的調(diào)度過(guò)程。

簡(jiǎn)單調(diào)用
String url = "www.baidu.com";
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url(url).build();
Call call = client.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
源碼解析流程
此處不講解OkHttpClient的初始化過(guò)程,以及Request的構(gòu)建過(guò)程,用到的是 Builder設(shè)計(jì)模式,直接從enqueue()方法開(kāi)始解析。
1. 請(qǐng)求發(fā)送過(guò)程
- 注意:call.execute()是同步方法,call.enqueue()是異步方法,一般調(diào)用call.enqueue()
點(diǎn)擊查看call.enqueue()方法,進(jìn)入到 RealCall類(lèi)中的call.enqueue()方法,如下:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.callStart();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
這個(gè) RealCall里面執(zhí)行的方法,首先會(huì)把這個(gè)Call加入到Dispatcher這個(gè)調(diào)度器里面,進(jìn)入Dispatcher類(lèi)中的 enqueue() 方法,如下:
void enqueue(AsyncCall call) {
synchronized (this) {
//將此次請(qǐng)求加入準(zhǔn)備請(qǐng)求隊(duì)列中
readyAsyncCalls.add(call);
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
promoteAndExecute();
}
在Dispatcher類(lèi)中包含了三個(gè)隊(duì)列,一個(gè)線程池:
為了管理所有的請(qǐng)求,Dispatcher采用了隊(duì)列+生產(chǎn)+消費(fèi)的模式。
- 為同步執(zhí)行提供了runningSyncCalls來(lái)管理所有的同步請(qǐng)求;
- 為異步執(zhí)行提供了runningAsyncCalls和readyAsyncCalls來(lái)管理所有的異步請(qǐng)求。其中readyAsyncCalls是在當(dāng)前可用資源不足時(shí),用于緩存請(qǐng)求的。
- ExecutorService線程池
查看 promoteAndExecute()方法,
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
//調(diào)用 AsyncCall 類(lèi) executeOn的方法,在executeOn方法中開(kāi)啟線程池
asyncCall.executeOn(executorService());//1
}
return isRunning;
}
在1處會(huì)調(diào)用AsyncCall中的executeOn()方法
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);//開(kāi)啟線程池
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
transmitter.noMoreExchanges(ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
此時(shí)會(huì)調(diào)用AsyncCall中的execute()方法,
@Override protected void execute() {
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
//核心函數(shù)
Response response = getResponseWithInterceptorChain();
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} catch (Throwable t) {
cancel();
if (!signalledCallback) {
IOException canceledException = new IOException("canceled due to " + t);
canceledException.addSuppressed(t);
responseCallback.onFailure(RealCall.this, canceledException);
}
throw t;
} finally {
client.dispatcher().finished(this);
}
}
}
getResponseWithInterceptorChain()方法就是執(zhí)行攔截器鏈,直到返回 Response。通過(guò)責(zé)任鏈模式循環(huán)調(diào)用所有攔截器,每個(gè)攔截器可以根據(jù)Request和Reponse前后執(zhí)行相應(yīng)的邏輯。
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(new RetryAndFollowUpInterceptor(client));
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
boolean calledNoMoreExchanges = false;
try {
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}
這里把自定義的攔截器,還有內(nèi)置的必要的攔截器都添加到一個(gè)List里面了,最后用List的攔截器創(chuàng)建一個(gè)攔截器的鏈—— RealInterceptorChain。而這個(gè)RealInterceptorChain 其實(shí)是實(shí)現(xiàn)了Interceptor 里面的一個(gè)Chain 接口。
我們先了解一下RealInterceptorChain 這個(gè)類(lèi)是如何進(jìn)行工作的。
首先RealInterceptorChain,是實(shí)現(xiàn)了Interceptor.Chain 接口的一個(gè)類(lèi),這個(gè)接口有是三個(gè)方法,后面會(huì)介紹到,其中一個(gè)就是前面調(diào)用的proceed方法。
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException {
...
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
return response;
}
省略了前后的一些代碼,發(fā)現(xiàn),其實(shí)這里又新建一個(gè) RealInterceptorChain,然后調(diào)用了這個(gè)攔截器的 intercept 的方法。那么在每個(gè)攔截器里面做了什么,這個(gè)攔截的方法又是做了什么呢?
首先看一下攔截器這個(gè)接口:
public interface Interceptor {
Response intercept(Chain chain) throws IOException;
interface Chain {
Request request();
Response proceed(Request request) throws IOException;
...
}
}
很明顯,每個(gè)實(shí)現(xiàn)Interceptor的攔截器。都會(huì)實(shí)現(xiàn)了intercept,這樣一個(gè)攔截的方法。而傳入的就是一個(gè)Chain。符合前面RealInterceptorChain的調(diào)用。
以一個(gè)攔截器的代碼為例:BridgeInterceptor,這個(gè)類(lèi)實(shí)現(xiàn)的intercept方法,而這個(gè)方法里面,除了那些前置或后續(xù)的操作之外,在中間你會(huì)發(fā)現(xiàn) 下面一句代碼:
Response networkResponse = chain.proceed(requestBuilder.build());
又通過(guò)傳進(jìn)來(lái)的Chain ,重新調(diào)用了proceed的方法。好吧,到這里應(yīng)該可以了解到這些攔截器,是如何一個(gè)個(gè)調(diào)用被調(diào)用的了,原理都是通過(guò)了中間的一個(gè)RealInterceptorChain,一個(gè)接一個(gè)的向下調(diào)用。然后得到的結(jié)果又一個(gè)一個(gè)向上傳。
總結(jié)一下,這些攔截器的調(diào)用:
剛開(kāi)始getResponseWithInterceptorChain的最后開(kāi)始調(diào)用了 chain.proceed,然后在RealInterceptorChain的里面,又新建一個(gè) RealInterceptorChain,并且調(diào)用當(dāng)前的攔截器的 intercept,并且把新建的Chain傳遞進(jìn)去,然后攔截器的 intercept方法里面,除了一些自身的操作外,又調(diào)用Chain的proceed方法。就這樣一直調(diào)用新建Chian, 一直調(diào)用到最后一個(gè)攔截器。
這里的調(diào)用其實(shí)是一個(gè)迭代,遞歸的結(jié)構(gòu)。請(qǐng)求做了前一步的處理才能到下一級(jí),一級(jí)級(jí)下去,到最后真正發(fā)出去請(qǐng)求,相對(duì)做了攔截。而得到結(jié)果,只有當(dāng)最后一個(gè)攔截器完全了請(qǐng)求,拿到結(jié)果之后,才會(huì)把結(jié)果往上一級(jí),上一級(jí)拿到結(jié)果,做了相應(yīng)的處理之后,再到上一級(jí),這就是對(duì)結(jié)果的攔截處理。
Demo
結(jié)構(gòu):

調(diào)用:
public static void main(String[] args) throws IOException {
//Dispatcher類(lèi)的調(diào)度過(guò)程
new Dispatcher().enqueue(new AsyncCall("22","33"));
//demo02調(diào)用RealCall類(lèi)中g(shù)etResponseWithInterceptorChain()
//方法中責(zé)任鏈模式的調(diào)度過(guò)程
Request request = new Request();
List<Interceptor> interceptors = new ArrayList<>();
interceptors.add(new RetryAndFollowUpInterceptor());
interceptors.add(new BridgeInterceptor());
interceptors.add(new CacheInterceptor());
interceptors.add(new ConnectInterceptor());
Interceptor.Chain chain = new RealInterceptorChain(interceptors,0,request);
Response response = chain.proceed(request);
}
運(yùn)行結(jié)果:
---Dispatcher---enqueue-
--AsyncCall執(zhí)行execute方法--
1.失敗重試攔截器
2.請(qǐng)求和響應(yīng)轉(zhuǎn)換攔截器
3.緩存攔截器
4.與服務(wù)器建立連接攔截器
RealInterceptorChain 第 3 次執(zhí)行
RealInterceptorChain 第 2 次執(zhí)行
RealInterceptorChain 第 1 次執(zhí)行
RealInterceptorChain 第 0 次執(zhí)行
Demo挺簡(jiǎn)單的,可以自行下載:
https://github.com/Ityang/OKHttpDemo