概述
前言:前一節(jié)http://www.itdecent.cn/p/f3f228d3598c,總結(jié)了一下OkHttp3的簡單使用教程。在項(xiàng)目中使用了這個(gè)網(wǎng)絡(luò)框架,在看完基本的源碼之后,還是想總結(jié)一下OkHttp的實(shí)現(xiàn)流程。在學(xué)習(xí)框架的過程中,從使用方法出發(fā),首先是怎么使用,其次是我們使用的功能在內(nèi)部是如何實(shí)現(xiàn)的,實(shí)現(xiàn)方案上有什么技巧,有什么范式。
OkHttp的整體流程
整個(gè)流程是:通過OkHttpClient將構(gòu)建的Request轉(zhuǎn)換為Call,然后在RealCall中進(jìn)行異步或同步任務(wù),最后通過一些的攔截器interceptor發(fā)出網(wǎng)絡(luò)請(qǐng)求和得到返回的response。總體流程用下面的圖表示

拆組件
在整體流程中,主要的組件是OkHttpClient,其次有Call,RealCall,Disptcher,各種Interceptors,Request和Response組件。Request和Response已經(jīng)在上一篇的對(duì)其結(jié)構(gòu)源碼進(jìn)行了分析。
1. OkHttpClient對(duì)象:網(wǎng)絡(luò)請(qǐng)求的主要操控者
創(chuàng)建OkHttpClient對(duì)象
//通過Builder構(gòu)造OkHttpClient
OkHttpClient.Builder builder = new OkHttpClient.Builder()
.connectTimeout(20, TimeUnit.SECONDS)
.writeTimeout(20, TimeUnit.SECONDS)
.readTimeout(20, TimeUnit.SECONDS);
return builder.build();
OkHttpClient.Builder類有很多變量,OkHttpClient有很多的成員變量:
final Dispatcher dispatcher; //重要:分發(fā)器,分發(fā)執(zhí)行和關(guān)閉由request構(gòu)成的Call
final Proxy proxy; //代理
final List<Protocol> protocols; //協(xié)議
final List<ConnectionSpec> connectionSpecs; //傳輸層版本和連接協(xié)議
final List<Interceptor> interceptors; //重要:攔截器
final List<Interceptor> networkInterceptors; //網(wǎng)絡(luò)攔截器
final ProxySelector proxySelector; //代理選擇
final CookieJar cookieJar; //cookie
final Cache cache; //緩存
final InternalCache internalCache; //內(nèi)部緩存
final SocketFactory socketFactory; //socket 工廠
final SSLSocketFactory sslSocketFactory; //安全套接層socket 工廠,用于HTTPS
final CertificateChainCleaner certificateChainCleaner; // 驗(yàn)證確認(rèn)響應(yīng)證書 適用 HTTPS 請(qǐng)求連接的主機(jī)名。
final HostnameVerifier hostnameVerifier; // 主機(jī)名字確認(rèn)
final CertificatePinner certificatePinner; // 證書鏈
final Authenticator proxyAuthenticator; //代理身份驗(yàn)證
final Authenticator authenticator; // 本地身份驗(yàn)證
final ConnectionPool connectionPool; //連接池,復(fù)用連接
final Dns dns; //域名
final boolean followSslRedirects; //安全套接層重定向
final boolean followRedirects; //本地重定向
final boolean retryOnConnectionFailure; //重試連接失敗
final int connectTimeout; //連接超時(shí)
final int readTimeout; //read 超時(shí)
final int writeTimeout; //write 超時(shí)
OkHttpClient完成整個(gè)請(qǐng)求設(shè)計(jì)到很多參數(shù),都可以通過OkHttpClient.builder使用創(chuàng)建者模式構(gòu)建。事實(shí)上,你能夠通過它來設(shè)置改變一些參數(shù),因?yàn)樗峭ㄟ^建造者模式實(shí)現(xiàn)的,因此你可以通過builder()來設(shè)置。如果不進(jìn)行設(shè)置,在Builder中就會(huì)使用默認(rèn)的設(shè)置:
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
2,RealCall:真正的請(qǐng)求執(zhí)行者
2.1之前文章中的Http發(fā)起同步請(qǐng)求的代碼:
Request request = new Request.Builder()
.url(url)
.build();
Response response = client.newCall(request).execute();
client.newCall(request).execute()創(chuàng)建了Call執(zhí)行了網(wǎng)絡(luò)請(qǐng)求獲得response響應(yīng)。重點(diǎn)看一看這個(gè)執(zhí)行的請(qǐng)求者的內(nèi)部是什么鬼。
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
//OkHttpClient中的方法,可以看出RealCall的真面目
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
RealCall的構(gòu)造函數(shù):
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
//client請(qǐng)求
this.client = client;
//我們構(gòu)造的請(qǐng)求
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
//負(fù)責(zé)重試和重定向攔截器
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
Call對(duì)象其實(shí)是一個(gè)接口,Call的源碼:
public interface Call extends Cloneable {
/** Returns the original request that initiated this call. */
//用于返回Call對(duì)象中的request對(duì)象
Request request();
//用于執(zhí)行同步請(qǐng)求的方法
Response execute() throws IOException;
//用于執(zhí)行異步請(qǐng)求的方法,通過responseCallback回調(diào)結(jié)果
void enqueue(Callback responseCallback);
/** Cancels the request, if possible. Requests that are already complete cannot be canceled. */
//取消這個(gè)call,當(dāng)call被取消時(shí)請(qǐng)求不在執(zhí)行,拋出異常??梢杂糜诮K止請(qǐng)求
void cancel();
/**
* Returns true if this call has been either {@linkplain #execute() executed} or {@linkplain
* #enqueue(Callback) enqueued}. It is an error to execute a call more than once.
*/
//是否被執(zhí)行
boolean isExecuted();
//是否被取消
boolean isCanceled();
/**
* Create a new, identical call to this one which can be enqueued or executed even if this call
* has already been.
*/
Call clone();
interface Factory {
Call newCall(Request request);
}
}
Realcall是Call的實(shí)現(xiàn)類。顯然重要的執(zhí)行任務(wù)就交個(gè)RealCall對(duì)象execute()和enqueue(Callback responseCallback)方法了。
我們首先看 RealCall#execute:
@Override public Response execute() throws IOException {
//(1)
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
//事件監(jiān)聽器調(diào)
eventListener.callStart(this);
try {
//(2)
client.dispatcher().executed(this);
//(3)
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
//(4)
client.dispatcher().finished(this);
}
}
(1)檢查這個(gè) call 是否已經(jīng)被執(zhí)行了,每個(gè) call 只能被執(zhí)行一次,如果想要一個(gè)完全一樣的 call,可以利用 call#clone 方法進(jìn)行克隆。
(2)利用 client.dispatcher().executed(this) 來進(jìn)行實(shí)際執(zhí)行,分發(fā)器負(fù)責(zé)分發(fā)。dispatcher 是剛才看到的 OkHttpClient.Builder 的成員之一,它的文檔說自己是異步 HTTP 請(qǐng)求的執(zhí)行策略,現(xiàn)在看來,同步請(qǐng)求它也有摻和。
(3)調(diào)用 getResponseWithInterceptorChain() 函數(shù)獲取 HTTP 返回結(jié)果,從函數(shù)名可以看出,這一步還會(huì)進(jìn)行一系列“攔截”操作。
(4)最后還要通知 dispatcher 自己已經(jīng)執(zhí)行完畢
dispatcher 這里我們不過度關(guān)注,在同步執(zhí)行的流程中,涉及到 dispatcher 的內(nèi)容只不過是告知它我們的執(zhí)行狀態(tài),比如開始執(zhí)行了(調(diào)用 executed),比如執(zhí)行完畢了(調(diào)用 finished),在異步執(zhí)行流程中它會(huì)有更多的參與。
Dispatcher的源碼:主要在異步請(qǐng)求時(shí)參與多,這里有執(zhí)行異步請(qǐng)求的線程池
/**
* Policy on when async requests are executed.
*
* <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
* own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
* of calls concurrently.
*/
//請(qǐng)求分發(fā)器,**主要在異步請(qǐng)求時(shí)參與多,這里有執(zhí)行異步請(qǐng)求的線程池**
public final class Dispatcher {
//最大的請(qǐng)求數(shù)量
private int maxRequests = 64;
//每個(gè)主機(jī)的請(qǐng)求數(shù)量,默認(rèn)在摸個(gè)主機(jī)上同時(shí)請(qǐng)求5個(gè)
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
//執(zhí)行異步call時(shí)的線程池,就在這兒
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
//即將被執(zhí)行的異步call隊(duì)列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
//正在運(yùn)行的異步call,包括被取消的還沒有完成的
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
//正在運(yùn)行的同步call。包括被取消的還沒有完成的
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
//可以執(zhí)行自定義線程池,傳進(jìn)來
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
//構(gòu)造線程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
/**
* Set the maximum number of requests to execute concurrently. Above this requests queue in
* memory, waiting for the running calls to complete.
*
* <p>If more than {@code maxRequests} requests are in flight when this is invoked, those requests
* will remain in flight.
*/
public synchronized void setMaxRequests(int maxRequests) {
if (maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
}
this.maxRequests = maxRequests;
promoteCalls();
}
public synchronized int getMaxRequests() {
return maxRequests;
}
/**
* Set the maximum number of requests for each host to execute concurrently. This limits requests
* by the URL's host name. Note that concurrent requests to a single IP address may still exceed
* this limit: multiple hostnames may share an IP address or be routed through the same HTTP
* proxy.
*
* <p>If more than {@code maxRequestsPerHost} requests are in flight when this is invoked, those
* requests will remain in flight.
*/
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
if (maxRequestsPerHost < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
}
this.maxRequestsPerHost = maxRequestsPerHost;
promoteCalls();
}
public synchronized int getMaxRequestsPerHost() {
return maxRequestsPerHost;
}
/**
* Set a callback to be invoked each time the dispatcher becomes idle (when the number of running
* calls returns to zero).
*
* <p>Note: The time at which a {@linkplain Call call} is considered idle is different depending
* on whether it was run {@linkplain Call#enqueue(Callback) asynchronously} or
* {@linkplain Call#execute() synchronously}. Asynchronous calls become idle after the
* {@link Callback#onResponse onResponse} or {@link Callback#onFailure onFailure} callback has
* returned. Synchronous calls become idle once {@link Call#execute() execute()} returns. This
* means that if you are doing synchronous calls the network layer will not truly be idle until
* every returned {@link Response} has been closed.
*/
public synchronized void setIdleCallback(@Nullable Runnable idleCallback) {
this.idleCallback = idleCallback;
}
//分發(fā)異步執(zhí)行的call,是提交到線程池
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
//提交到線程此執(zhí)行
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
/**
* Cancel all calls currently enqueued or executing. Includes calls executed both {@linkplain
* Call#execute() synchronously} and {@linkplain Call#enqueue asynchronously}.
*/
public synchronized void cancelAll() {
for (AsyncCall call : readyAsyncCalls) {
call.get().cancel();
}
for (AsyncCall call : runningAsyncCalls) {
call.get().cancel();
}
for (RealCall call : runningSyncCalls) {
call.cancel();
}
}
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
/** Returns the number of running calls that share a host with {@code call}. */
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.host().equals(call.host())) result++;
}
return result;
}
/** Used by {@code Call#execute} to signal it is in-flight. */
//分發(fā)同步call,只加入到正在運(yùn)行同步call的隊(duì)列
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
/** Used by {@code Call#execute} to signal completion. */
//同步call已經(jīng)完成,移除隊(duì)列
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
/** Returns a snapshot of the calls currently awaiting execution. */
//返回等待執(zhí)行call的集合
public synchronized List<Call> queuedCalls() {
List<Call> result = new ArrayList<>();
for (AsyncCall asyncCall : readyAsyncCalls) {
result.add(asyncCall.get());
}
return Collections.unmodifiableList(result);
}
/** Returns a snapshot of the calls currently being executed. */
public synchronized List<Call> runningCalls() {
List<Call> result = new ArrayList<>();
result.addAll(runningSyncCalls);
for (AsyncCall asyncCall : runningAsyncCalls) {
result.add(asyncCall.get());
}
return Collections.unmodifiableList(result);
}
public synchronized int queuedCallsCount() {
return readyAsyncCalls.size();
}
public synchronized int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}
}
在上面的同步call中,真正發(fā)出網(wǎng)絡(luò)請(qǐng)求,解析返回結(jié)果的,還是getResponseWithInterceptorChain:
//重要的攔截器的責(zé)任鏈
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors()); //(1)
interceptors.add(retryAndFollowUpInterceptor); //(2)
interceptors.add(new BridgeInterceptor(client.cookieJar())); //(3)
interceptors.add(new CacheInterceptor(client.internalCache())); //(4)
interceptors.add(new ConnectInterceptor(client)); //(5)
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
3,在獲得相應(yīng)之前經(jīng)過的最后一關(guān)就是攔截器Interceptor
the whole thing is just a stack of built-in interceptors.
可見 Interceptor 是 OkHttp 最核心的一個(gè)東西,不要誤以為它只負(fù)責(zé)攔截請(qǐng)求進(jìn)行一些額外的處理(例如 cookie),實(shí)際上它把實(shí)際的網(wǎng)絡(luò)請(qǐng)求、緩存、透明壓縮等功能都統(tǒng)一了起來,每一個(gè)功能都只是一個(gè) Interceptor,它們?cè)龠B接成一個(gè) Interceptor.Chain,環(huán)環(huán)相扣,最終圓滿完成一次網(wǎng)絡(luò)請(qǐng)求。
從 getResponseWithInterceptorChain 函數(shù)我們可以看到,Interceptor.Chain 的分布依次是:

(1)在配置 OkHttpClient時(shí)設(shè)置的interceptors;
(2)負(fù)責(zé)失敗重試以及重定向的 RetryAndFollowUpInterceptor;
(3)負(fù)責(zé)把用戶構(gòu)造的請(qǐng)求轉(zhuǎn)換為發(fā)送到服務(wù)器的請(qǐng)求、把服務(wù)器返回的響應(yīng)轉(zhuǎn)換為用戶友好的響應(yīng)的BridgeInterceptor;
(4)負(fù)責(zé)讀取緩存直接返回、更新緩存的 CacheInterceptor
(5)負(fù)責(zé)和服務(wù)器建立連接的ConnectInterceptor;
(6)配置 OkHttpClient 時(shí)設(shè)置的 networkInterceptors;
(7)負(fù)責(zé)向服務(wù)器發(fā)送請(qǐng)求數(shù)據(jù)、從服務(wù)器讀取響應(yīng)數(shù)據(jù)的 CallServerInterceptor
在這里,位置決定了功能,最后一個(gè) Interceptor 一定是負(fù)責(zé)和服務(wù)器實(shí)際通訊的,重定向、緩存等一定是在實(shí)際通訊之前的
2.2 在2.1中我們深入討論了同步請(qǐng)求的過程,下面講講異步請(qǐng)求原理
代碼:
Request request = new Request.Builder()
.url("http://publicobject.com/helloworld.txt")
.build();
//用request新建的call使用enqueue異步請(qǐng)求
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
e.printStackTrace();
}
@Override
public void onResponse(Call call, Response response) throws IOException {
//相應(yīng)成功回調(diào),response,非主線程
if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
Headers responseHeaders = response.headers();
for (int i = 0, size = responseHeaders.size(); i < size; i++) {
System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
}
System.out.println(response.body().string());
}
});
由代碼中client.newCall(request).enqueue(Callback),開始我們知道client.newCall(request)方法返回的是RealCall對(duì)象,接下來繼續(xù)向下看enqueue()方法:
//異步任務(wù)使用
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
//送給分發(fā)器Dispatcher分發(fā),其實(shí)Dispatcher中有線程池,把AsyncCall這個(gè)任務(wù)提交到線程池執(zhí)行,通過responseCallback回調(diào)
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
我們先看一下上面的Dispatcher類中的enqueue(Call )方法,在看看AsyncCall類:
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
如果中的runningAsynCalls不滿,且call占用的host小于最大數(shù)量,則將call加入到runningAsyncCalls中執(zhí)行,同時(shí)利用線程池執(zhí)行call;否者將call加入到readyAsyncCalls中。runningAsyncCalls和readyAsyncCalls是什么呢?在把上面將同步Http請(qǐng)求時(shí)講過了,可以瞄一眼。
call加入到線程池中執(zhí)行了?,F(xiàn)在再看AsynCall的代碼,它是RealCall中的內(nèi)部類
//異步請(qǐng)求,顯然是繼承了NamedRunnable ,在NamedRunnable 的run方法中執(zhí)行繼承的execute() 方法
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
private AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
//還是回到這個(gè)攔截器責(zé)任鏈函數(shù)得到響應(yīng),只不過當(dāng)前這個(gè)過程是在線程池中進(jìn)行的
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
//回調(diào)異常
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
//回調(diào)成功
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
//回調(diào)失敗
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//告訴分發(fā)器Dispatcher請(qǐng)求執(zhí)行完成
client.dispatcher().finished(this);
}
}
}
AysncCall中的execute()中的方法,同樣是通過Response response = getResponseWithInterceptorChain();來獲得response,這樣異步任務(wù)也同樣通過了interceptor,剩下的就想看看上面的幾個(gè)攔截器是什么鬼。
責(zé)任鏈攔截器Interceptor
RetryAndFollowUpInterceptor:負(fù)責(zé)失敗重試以及重定向
BridgeInterceptor:負(fù)責(zé)把用戶構(gòu)造的請(qǐng)求轉(zhuǎn)換為發(fā)送到服務(wù)器的請(qǐng)求、把服務(wù)器返回的響應(yīng)轉(zhuǎn)換為用戶友好的響應(yīng)的 。
ConnectInterceptor:建立連接
NetworkInterceptors:配置OkHttpClient時(shí)設(shè)置的 NetworkInterceptors
CallServerInterceptor:發(fā)送和接收數(shù)據(jù)