OkHttp3源碼解析

OkHttp是一款非常常用的網(wǎng)絡(luò)框架,本文試圖對(duì)其源碼進(jìn)行解析,為方便大家閱讀,先列目錄如下:
1.基本用法
2.Dispatcher解析
3.攔截器執(zhí)行流程
4.RetryAndFollowUpInterceptor、BridgeInterceptor和CacheInterceptor
5.ConnectInterceptor
6.CallServerInterceptor

流程圖如下:
okhttp_full_process.png

首先構(gòu)造RealCall對(duì)象,若是異步請(qǐng)求則由Dispatcher分發(fā),同步則直接進(jìn)行請(qǐng)求,最后依次執(zhí)行五個(gè)攔截器的intercept方法完成請(qǐng)求。

1. 基本用法首先是創(chuàng)建OkHttpClient對(duì)象。

OkHttpClient client = new OkHttpClient.Builder().readTimeout(5, TimeUnit.SECONDS).build();   

這里采用建造者模式,我們可以很方便的設(shè)置各個(gè)參數(shù)。下面我詳細(xì)看下OkHttpClient.Builder()這個(gè)內(nèi)部類的構(gòu)造方法里有哪些參數(shù)。

public Builder() { dispatcher = new Dispatcher(); protocols = DEFAULT_PROTOCOLS; connectionSpecs = DEFAULT_CONNECTION_SPECS; eventListenerFactory = EventListener.factory(EventListener.NONE); proxySelector = ProxySelector.getDefault(); cookieJar = CookieJar.NO_COOKIES; socketFactory = SocketFactory.getDefault(); hostnameVerifier = OkHostnameVerifier.INSTANCE; certificatePinner = CertificatePinner.DEFAULT; proxyAuthenticator = Authenticator.NONE; authenticator = Authenticator.NONE; connectionPool = new ConnectionPool(); dns = Dns.SYSTEM; followSslRedirects = true; followRedirects = true; retryOnConnectionFailure = true; connectTimeout = 10_000; readTimeout = 10_000; writeTimeout = 10_000; pingInterval = 0; }  

這里有兩個(gè)參數(shù)比較重要,一個(gè)是dispatcher,由它來決定異步請(qǐng)求是直接執(zhí)行還是放入等待隊(duì)列;另一個(gè)是connectionPool,OkHttp把請(qǐng)求都抽象成Connection,而connectionPool就是來管理這些Connection的。這兩者后文都會(huì)詳細(xì)講解。 第二步就是創(chuàng)建Request對(duì)象。

Request request = new Request.Builder().url("http://www.baidu.com").get().build(); 

同樣采用了建造者模式。這里封裝了請(qǐng)求的URL、請(qǐng)求方式等信息。 第三步是創(chuàng)建Call對(duì)象。

Call call = client.newCall(request); 

由于Call是個(gè)接口,具體的操作都是在其實(shí)現(xiàn)類RealCall里完成的,來看RealCall的構(gòu)造方法。

RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { final EventListener.Factory eventListenerFactory = client.eventListenerFactory(); this.client = client; this.originalRequest = originalRequest; this.forWebSocket = forWebSocket; this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);// TODO(jwilson): this is unsafe publication and not threadsafe. this.eventListener = eventListenerFactory.create(this); } 

可以看到它將前面創(chuàng)建的OkHttpClient和Request對(duì)象都傳給了RealCall。 第四步,根據(jù)是同步請(qǐng)求還是異步請(qǐng)求調(diào)用不同的方法。 同步請(qǐng)求:

Response response = call.execute(); 

我們來具體看下execute()方法。

@Override public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); try { client.dispatcher().executed(this); Response result = getResponseWithInterceptorChain(); if (result == null) throw new IOException("Canceled"); return result; } finally { client.dispatcher().finished(this); } } 

我們直接看重點(diǎn)代碼,首先看client.dispatcher().executed(this);,client.dispatcher()是獲取到Dispatcher對(duì)象,然后執(zhí)行其executed()方法,我們看下這個(gè)方法的實(shí)現(xiàn)。

 /** Used by {@code Call#execute} to signal it is in-flight. */ synchronized void executed(RealCall call) { runningSyncCalls.add(call); } 

就是將當(dāng)前的請(qǐng)求call添加到runningSyncCalls這個(gè)隊(duì)列中去,而runningSyncCalls就是同步請(qǐng)求的隊(duì)列。 下面一行Response result = getResponseWithInterceptorChain();就是具體的網(wǎng)絡(luò)請(qǐng)求操作,這個(gè)后文會(huì)細(xì)講。然后我們?cè)賮砜磃inally里面的代碼client.dispatcher().finished(this);,finished()方法代碼如下:

privatevoid finished(Dequecalls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; } if (runningCallsCount == 0 && idleCallback != null) { idleCallback.run(); } } 

重點(diǎn)看這一行if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");,將call從runningSyncCalls移除了,如果不能移除就拋出異常。 所以總結(jié)一下,同步方法所做的工作就是先將call添加到Dispatcher的runningSyncCalls隊(duì)列中去,完成請(qǐng)求后再將其移除。 異步請(qǐng)求:

 call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { System.out.println("Response:onFailure"); } @Override public void onResponse(Call call, Response response) throws IOException { System.out.println("Response:" + response.body().string()); } }); 

這里enqueue最終會(huì)調(diào)用Dispatcher的enqueue方法,代碼如下:

synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } } 

判斷進(jìn)行中的異步請(qǐng)求隊(duì)列runningAsyncCalls的size是否小于最大請(qǐng)求數(shù)及進(jìn)行中的請(qǐng)求數(shù)是否小于每個(gè)主機(jī)最大請(qǐng)求數(shù),若都滿足,則將call添加到異步請(qǐng)求隊(duì)列runningAsyncCalls,并放入線程池中執(zhí)行;若不滿足,則將call添加到就緒異步請(qǐng)求隊(duì)列readyAsyncCalls中。 接下來我們具體看下executorService().execute(call);,也就是將call放入線程池中執(zhí)行時(shí)具體做了哪些操作。

 @Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } } 

首先在Response response = getResponseWithInterceptorChain();這一行進(jìn)行實(shí)際的網(wǎng)絡(luò)請(qǐng)求,這與同步請(qǐng)求是一樣的,在finally中執(zhí)行client.dispatcher().finished(this);,也與同步請(qǐng)求一樣。通過判斷retryAndFollowUpInterceptor.isCanceled()分別回調(diào)onFailure和onResponse,這兩個(gè)回調(diào)方法正是我們傳入的callback。 ###2. Dispatcher解析 上面我們已經(jīng)反復(fù)提到Dispatcher了,現(xiàn)在我們來具體分析Dispatcher這個(gè)類,總的來說,它的作用就是管理請(qǐng)求隊(duì)列,并用線程池執(zhí)行請(qǐng)求。

 /** Ready async calls in the order they'll be run. */ private final DequereadyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ private final DequerunningAsyncCalls = new ArrayDeque<>(); /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private final DequerunningSyncCalls = new ArrayDeque<>(); 

首先Dispatcher里維護(hù)了這三個(gè)隊(duì)列,readyAsyncCalls是異步請(qǐng)求就緒隊(duì)列,也就是等待執(zhí)行的異步請(qǐng)求隊(duì)列。runningAsyncCalls是進(jìn)行中的異步請(qǐng)求隊(duì)列。runningSyncCalls是進(jìn)行中的同步請(qǐng)求隊(duì)列。 當(dāng)執(zhí)行同步請(qǐng)求時(shí)比較簡單,首先調(diào)用synchronized void executed(RealCall call) { runningSyncCalls.add(call); }添加到同步請(qǐng)求隊(duì)列中,執(zhí)行完后調(diào)用if (!calls.remove(call))將call移除。 當(dāng)執(zhí)行異步請(qǐng)求時(shí),方法如下

synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } } 

如上文所述,通過判斷runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost是否滿足來決定將call添加到runningAsyncCalls還是readyAsyncCalls,那么問題來了,如果條件不滿足,call被添加到readyAsyncCalls中等待了,待條件線程池中有空余的線程時(shí),如何執(zhí)行call呢。 上文也有提到,上述方法中的executorService().execute(call);執(zhí)行時(shí),最終會(huì)調(diào)用client.dispatcher().finished(this);方法,方法如下

privatevoid finished(Dequecalls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; } if (runningCallsCount == 0 && idleCallback != null) { idleCallback.run(); } } 

if (!calls.remove(call))runningAsyncCalls進(jìn)行中的異步請(qǐng)求隊(duì)列從中將call移除掉,然后執(zhí)行if (promoteCalls) promoteCalls();,這里promoteCalls這個(gè)標(biāo)志位,同步為false,即不執(zhí)行promoteCalls(),異步為true,會(huì)執(zhí)行,下面我們來看下這個(gè)方法的具體實(shí)現(xiàn)。

private void promoteCalls() { if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iteratori = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } } 

對(duì)就緒異步請(qǐng)求隊(duì)列進(jìn)行了遍歷,只要滿足if (runningCallsForHost(call) < maxRequestsPerHost),就將其從就緒隊(duì)列中remove掉,然后添加到進(jìn)行中的隊(duì)列,并放入線程池中執(zhí)行。 最后我們看下線程池的實(shí)例化,

 public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; } 

核心線程數(shù)為0,這意味著線程空閑超過等待時(shí)間時(shí),所以線程都會(huì)被殺掉;最大線程數(shù)為int最大值,但實(shí)際上Dispatcher對(duì)最大請(qǐng)求數(shù)做了限制為64個(gè),所以最多只會(huì)有64個(gè)線程;等待時(shí)間為60s,即線程空閑一分鐘后會(huì)被殺掉。 ###3. 攔截器執(zhí)行流程 上文我們提到過,不管同步還是異步,最終進(jìn)行網(wǎng)絡(luò)請(qǐng)求的都是這一行Response response = getResponseWithInterceptorChain();,來看其具體實(shí)現(xiàn),

 Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. Listinterceptors = new ArrayList<>();

interceptors.addAll(client.interceptors());

interceptors.add(retryAndFollowUpInterceptor);

interceptors.add(new BridgeInterceptor(client.cookieJar()));

interceptors.add(new CacheInterceptor(client.internalCache()));

interceptors.add(new ConnectInterceptor(client));

if (!forWebSocket) {

interceptors.addAll(client.networkInterceptors());

}

interceptors.add(new CallServerInterceptor(forWebSocket));

Interceptor.Chain chain = new RealInterceptorChain(

interceptors, null, null, null, 0, originalRequest);

return chain.proceed(originalRequest);

}

OkHttp進(jìn)行網(wǎng)絡(luò)請(qǐng)求的過程就是五個(gè)攔截器依次執(zhí)行的過程,這五個(gè)攔截器分別是:

a. 負(fù)責(zé)失敗重試以及重定向的RetryAndFollowUpInterceptor;

b. 負(fù)責(zé)把封裝一些頭部信息的BridgeInterceptor

c. 負(fù)責(zé)讀取緩存直接返回、更新緩存的CacheInterceptor

d. 負(fù)責(zé)和服務(wù)器建立連接的ConnectInterceptor;

e. 負(fù)責(zé)向服務(wù)器發(fā)送請(qǐng)求數(shù)據(jù)、從服務(wù)器讀取響應(yīng)數(shù)據(jù)的CallServerInterceptor.

下面我們理一下這些攔截器是如何依次執(zhí)行的,上面的代碼首先將這些攔截器存入一個(gè)list保存起來,然后傳入RealInterceptorChain的構(gòu)造方法里,然后調(diào)用RealInterceptorChainproceed方法,在proceed里有如下代碼


// Call the next interceptor in the chain.

RealInterceptorChain next = new RealInterceptorChain(

interceptors, streamAllocation, httpCodec, connection, index + 1, request);

Interceptor interceptor = interceptors.get(index);

Response response = interceptor.intercept(next);

這里調(diào)用了攔截器的intercept方法,注意,這里將index+1了,而在每個(gè)intercept又會(huì)調(diào)用RealInterceptorChainproceed方法,而每次index都會(huì)+1,這樣就將list里的攔截器的intercept方法都執(zhí)行了。接下來我們來看這五個(gè)攔截器。

4. RetryAndFollowUpInterceptor、BridgeInterceptor和CacheInterceptor

RetryAndFollowUpInterceptor是用來進(jìn)行重試和重定向的,在這個(gè)攔截器中創(chuàng)建了StreamAllocation對(duì)象,但并未使用,一直傳到ConnectInterceptor。

BridgeInterceptor是用來封裝一些請(qǐng)求頭部信息的。

CacheInterceptor是用于處理緩存的,這三個(gè)攔截器這里不做過多延伸,重點(diǎn)來看后兩個(gè)攔截器。

5. ConnectInterceptor

這個(gè)攔截器主要的作用是


@Override public Response intercept(Chain chain) throws IOException {

RealInterceptorChain realChain = (RealInterceptorChain) chain;

Request request = realChain.request();

StreamAllocation streamAllocation = realChain.streamAllocation();

// We need the network to satisfy this request. Possibly for validating a conditional GET.

boolean doExtensiveHealthChecks = !request.method().equals("GET");

HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);

RealConnection connection = streamAllocation.connection();

return realChain.proceed(request, streamAllocation, httpCodec, connection);

}

首先獲取在RetryAndFollowUpInterceptor中創(chuàng)建的StreamAllocation對(duì)象,然后獲取HttpCodec對(duì)象和RealConnection對(duì)象,HttpCodec用于編碼Request和解碼Response,RealConnection用于實(shí)際進(jìn)行網(wǎng)絡(luò)IO傳輸。然后將這兩個(gè)對(duì)象傳遞給下一個(gè)攔截器。接下來我們進(jìn)入streamAllocation.newStream(client, doExtensiveHealthChecks);方法詳細(xì)看看是如何獲取HttpCodec對(duì)象的。


RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,

writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);

HttpCodec resultCodec = resultConnection.newCodec(client, this);

繼續(xù)跟進(jìn)到findHealthyConnection里;


* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated

* until a healthy connection is found.

*/

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,

int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)

throws IOException {

while (true) {

RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,

connectionRetryEnabled);

// If this is a brand new connection, we can skip the extensive health checks.

synchronized (connectionPool) {

if (candidate.successCount == 0) {

return candidate;

}

}

// Do a (potentially slow) check to confirm that the pooled connection is still good. If it

// isn't, take it out of the pool and start again.

if (!candidate.isHealthy(doExtensiveHealthChecks)) {

noNewStreams();

continue;

}

return candidate;

}

}.

這是個(gè)死循環(huán),獲取到candidate后首先判斷if (candidate.successCount == 0),滿足即代表這是個(gè)全新的RealConnection,直接返回;不滿足則檢查是否健康可用,不可用則跳出此次循環(huán)繼續(xù)獲取下個(gè)RealConnection對(duì)象,直到找到可用的。

findConnection里有如下核心代碼,


// Attempt to use an already-allocated connection.

RealConnection allocatedConnection = this.connection;

if (allocatedConnection != null && !allocatedConnection.noNewStreams) {

return allocatedConnection;

}

// Attempt to get a connection from the pool.

Internal.instance.get(connectionPool, address, this, null);

if (connection != null) {

return connection;

}

首先嘗試復(fù)用RealConnection,如果不為空則直接將其返回;若為空則從connectionPool里獲取一個(gè)。如果獲取不到,就new一個(gè)并放入connectionPool。最終調(diào)用connect方法進(jìn)行打開一個(gè)socket鏈接。

6. CallServerInterceptor

具體的網(wǎng)絡(luò)請(qǐng)求就是在這個(gè)攔截器的intercept方法中執(zhí)行的,重點(diǎn)的代碼有以下幾步:

a. httpCodec.writeRequestHeaders(request);向socket中寫入頭部信息。

b. request.body().writeTo(bufferedRequestBody);向socket中寫入body信息。

c. httpCodec.finishRequest();寫入完畢。

d. responseBuilder = httpCodec.readResponseHeaders(false);讀取頭部信息。

e. response = response.newBuilder().body(httpCodec.openResponseBody(response)).build();讀取body信息。

至此本文對(duì)OkHttp的分析已全部完畢,限于篇幅,部分環(huán)節(jié)的深度還顯不夠,待日后再補(bǔ)充。如有錯(cuò)漏之處還請(qǐng)指出。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一 前言 Retrofit + Okhttp + RxJava,可以說是現(xiàn)在最火的網(wǎng)絡(luò)請(qǐng)求組合了,而它們背后的設(shè)計(jì)...
    求閑居士閱讀 816評(píng)論 0 2
  • 基本用法介紹 okhttp一直是一個(gè)應(yīng)用非常廣泛的網(wǎng)絡(luò)框架。首先看一下okhttp的基本用法 這里為了方便 我把同...
    Big_Sweet閱讀 390評(píng)論 0 0
  • 參考資源 官網(wǎng) 國內(nèi)博客 GitHub官網(wǎng) 鑒于一些關(guān)于OKHttp3源碼的解析文檔過于碎片化,本文系統(tǒng)的,由淺入...
    風(fēng)骨依存閱讀 12,693評(píng)論 11 82
  • 引用 okhttp問世以來,以其高度封裝、定制、簡潔的api調(diào)用獲得廣大使用者的喜愛,目前最流行的網(wǎng)絡(luò)請(qǐng)求框架莫過...
    luweicheng24閱讀 2,791評(píng)論 0 4
  • 在一片金黃色的麥田里,風(fēng)吹過,麥子左右搖動(dòng),像一層又一層的海浪。我趴在草叢里,手里拿著裝著消音器的M4步槍,白色的...
    立七閱讀 1,824評(píng)論 0 0

友情鏈接更多精彩內(nèi)容