Okhttp設(shè)計與源碼解析
本文針對OkHttp的設(shè)計及源碼作一下解析:
**Okhttp如何做到請求的控制?
Okhttp核心設(shè)計Interceptor
Okhttp如何支持https? **
1.核心類庫、簡單使用、主要流程

(此圖片來自網(wǎng)絡(luò),作者不詳)
從上述UML類圖可以看出,Okhttp在設(shè)計時采用的門面模式,將整個系統(tǒng)的復(fù)雜性給隱藏起來,將子系統(tǒng)接口通過一個客戶端OkHttpClient統(tǒng)一暴露出來,使用戶與內(nèi)部系統(tǒng)之間達(dá)到解構(gòu)的目的,這種方式在很多開源框架上都有所體現(xiàn),像Glide、Picaso、ImageLoader、EventBus之類的,都有所體現(xiàn)。
Okhttp使用上和HttpClient基本一致,沒啥好說的。下面是基本的get異步請求:
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url("http://www.baidu.com").build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.i("ggg",e.getMessage());
}
@Override
public void onResponse(Call call,Response response) throws IOException {
// NOT UI Thread
if (response.isSuccessful()) {
Log.i("ggg",response.code()+"");
Log.i("ggg",response.headers()+"");
Log.i("ggg",response.body().string());
}
response.close();
}
});
有兩點是需要注意的:一是Okhttp建議使用一個OkHttpClient,二是Reponse使用完需要關(guān)閉。
Okhttp的基本加載流程如下:

上圖中的攔截器即Interceptor,后面會著重講。
2.請求的分發(fā)處理,如何做到對請求數(shù)量控制前提下做到高并發(fā)、低阻塞?
想到并發(fā)必然想到多線程,繼而就自然想到線程池,通常我們在使用線程池的時候,通過ThreadPoolExecutor構(gòu)造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
來創(chuàng)建線程池,那么要想做到高并發(fā)低阻塞,就意味著maximumPoolSize的值需要比較大,Okhttp在構(gòu)造線程池時,使用了如下參數(shù):
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
可以看到,maximumPoolSize基本沒有限制大小,工作隊列上,采用了SynchronousQueue,這是一個沒有容量的隊列。當(dāng)執(zhí)行enqueue的時候,事實上,Okhttp并沒有直接將任務(wù)交給線程池來執(zhí)行,而是通過講任務(wù)調(diào)度交給了Dispatcher這個類似代理服務(wù)器,來進行調(diào)度,這樣才做到了在控制同一個host請求數(shù)量的前提下,達(dá)到高并發(fā)低阻塞,我們來看看enqueue的實現(xiàn):
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed)
throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
當(dāng)調(diào)用Realcall的enqueue時,會重新創(chuàng)建一個AsyncCall,AsyncCall是RealCall的一個內(nèi)部類,因此可以看作是RealCall的一個超集。然后交給了Dispatcher來進行調(diào)度:
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
這段代碼邏輯很清晰,如果正在執(zhí)行請求與同一個Host請求沒有達(dá)到上限,那么將該次請求加入到正在執(zhí)行集合中,并交由線程池立刻執(zhí)行,否則講該次請求加入到等待隊列中,然后我們繼續(xù)看AsyncCall的執(zhí)行代碼:
@Override
protected void execute() {
//這個標(biāo)記用于確?;卣{(diào)只得到一次執(zhí)行
boolean signalledCallback = false;
try {
//這句代碼會通過攔截鏈來獲取響應(yīng)
Response response = getResponseWithInterceptorChain();
//請求被取消
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//無論執(zhí)行過程發(fā)生了什么,通知Dispatcher該次請求已執(zhí)行完畢
client.dispatcher().finished(this);
}
}
再看看finished方法:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call))
throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls)
//異步請求會執(zhí)行這里
promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
//如果沒有正在執(zhí)行的請求并且設(shè)置了空閑回調(diào),那么會執(zhí)行此分發(fā)器的空閑回掉
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
如果是異步的請求,那么當(dāng)這個請求走到finally中的finished方法時,會調(diào)用promoteCalls:
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests)
return; // Already running max capacity.
if (readyAsyncCalls.isEmpty())
return; // No ready calls to promote.
//注意這里的等待請求是一個先進先出的隊列結(jié)構(gòu)
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext();) {
AsyncCall call = i.next();
//如果主機請求限制沒到上限就會取出執(zhí)行
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
//一直遍歷到達(dá)到總數(shù)上限為止
if (runningAsyncCalls.size() >= maxRequests)
return; // Reached max capacity.
}
}
也就是說,當(dāng)一個請求結(jié)束時,會馬上使用該線程繼續(xù)執(zhí)行等待中的請求,避免了線程的閑置與浪費,這里,finally的使用是一個亮點,這在AsyncCall的父類NamedRunable中,同樣有所體現(xiàn):
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override
public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
這個類實現(xiàn)了,在該線程執(zhí)行該Runable時,線程名字與該Runable一致。
3.Okhttp的核心設(shè)計,Interceptor
不管是同步還是異步的請求,最終都會走到方法getResponseWithInterceptorChain來獲得Reponse,從字面意思,這個方法可以理解為根據(jù)攔截鏈獲得Reponse,后面我們會說到怎么自定義攔截器:
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
//如果使用時我們沒有自定義過攔截器,那么這里addAll的集合其實是空的
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
//如果使用時我們沒有自定義過攔截器,那么這里addAll的集合其實是空的
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
//這里構(gòu)造方法接收五個參數(shù),目前除了攔截器,當(dāng)前攔截器位置,初始的請求,其它為null。
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest);
return chain.proceed(originalRequest);
}
這里代碼最終創(chuàng)建了一個RealInterceptorChain,即攔截鏈,并調(diào)用了其proceed方法,來獲取Reponse,繼續(xù)跟進去:
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size())
throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException(
"network interceptor " + interceptor + " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
return response;
}
這段代碼比較長,但是核心代碼其實就三行:
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
創(chuàng)建一個攔截鏈,將其index+1,并拿到當(dāng)前的攔截器,然后調(diào)用攔截器的intercept方法,根據(jù)我們前面的getResponseWithInterceptorChain方法可以知道,這第一個應(yīng)該是retryAndFollowUpInterceptor(前提是沒有自定義攔截器),即用來作重試與重定向處理的攔截器:
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()),
callStackTrace);
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response = null;
boolean releaseConnection = true;
try {
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
//如果上面這句代碼執(zhí)行時沒有發(fā)生異常,那么標(biāo)記置為false,即連接是正??捎玫? releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), false, request)) {
//如果無法恢復(fù)連接 直接拋異常結(jié)束循環(huán)
throw e.getLastConnectException();
}
//可恢復(fù),標(biāo)記置為false,即連接是正??捎玫?,不需要釋放
releaseConnection = false;
//循環(huán)繼續(xù),即重試
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request))
throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
//最終如果連接不可用,那么需要釋放掉,因為catch塊拋出了異常,這里需要在finally處理
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
//如果有重定向之前的reponse
response = response.newBuilder().priorResponse(priorResponse.newBuilder().body(null).build())
.build();
}
Request followUp = followUpRequest(response);
if (followUp == null) {
//如果沒有重定向請求
if (!forWebSocket) {
streamAllocation.release();
}
//大部分情況下,會以返回response結(jié)束
return response;
}
closeQuietly(response.body());
//重定向
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), callStackTrace);
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
//
request = followUp;
priorResponse = response;
}
}
方法也很長,但是流程比較簡單,其中有一個StreamAllocation比較重要,但在這里我們先不講。在try語塊中,重新又回到了RealInterceptorChain的proceed方法之中,這個方法執(zhí)行完畢后,會根據(jù)結(jié)果做不同處理,如失敗、返回Reponse、重試、重定向。值得注意的是,這里的RealInterceptorChain是我們在之前new出來的,index為加1之后的,對應(yīng)的Interceptor為BridgeInterceptor:
@Override
public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder().request(userRequest);
if (transparentGzip && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder().removeAll("Content-Encoding")
.removeAll("Content-Length").build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
代碼雖長,但是很簡明,主要是對請求的header、body與響應(yīng)的header、body做了一些處理,然后繼續(xù)調(diào)用攔截鏈的proceed方法來處理對request處理后的request,這一次,我們來到了CacheInterceptor:
@Override
public Response intercept(Chain chain) throws IOException {
//拿到該request的緩存Response
Response cacheCandidate = cache != null ? cache.get(chain.request()) : null;
long now = System.currentTimeMillis();
//CacheStrategy即決定了發(fā)網(wǎng)絡(luò)請求還是使用緩存
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
//如果沒有緩存,或者緩存無效,并且請求頭沒有設(shè)置“Cache-Control: only-if-cached”,networkRequest則不為null
Request networkRequest = strategy.networkRequest;
//如果緩存存在或者有效,那么cacheResponse不為null
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
//統(tǒng)計被請求、發(fā)出網(wǎng)絡(luò)請求、使用緩存的數(shù)目
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
//緩存雖存在,但是已經(jīng)失效了,關(guān)閉資源
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
//直接504超時
return new Response.Builder().request(chain.request()).protocol(Protocol.HTTP_1_1).code(504)
.message("Unsatisfiable Request (only-if-cached)").body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L).receivedResponseAtMillis(System.currentTimeMillis()).build();
}
// If we don't need the network, we're done.
if (networkRequest == null) {
//緩存有效,自此,整個攔截鏈調(diào)用就結(jié)束了,這里就會回到我們之前的RetryAndFollowUpInterceptor的循環(huán)中去了
return cacheResponse.newBuilder().cacheResponse(stripBody(cacheResponse)).build();
}
Response networkResponse = null;
try {
//緩存無效,那么繼續(xù)走我們的攔截鏈
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
//如果攔截鏈繼續(xù)走執(zhí)行完返回的header有not modified,那么取緩存更新并返回
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse)).networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
//如果是新數(shù)據(jù),做緩存處理后返回
Response response = networkResponse.newBuilder().cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse)).build();
if (HttpHeaders.hasBody(response)) {
CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
response = cacheWritingResponse(cacheRequest, response);
}
return response;
}
這里注釋比較詳細(xì),根據(jù)注釋就能明白整個緩存處理的流程,我們繼續(xù)走攔截鏈,當(dāng)沒有緩存或者緩存已經(jīng)失效時,上述代碼會執(zhí)行:
networkResponse = chain.proceed(networkRequest);
繼續(xù)我們的攔截鏈,這一次,來到了ConnectInterceptor:
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
終于有個比較短的intercept了,但其實這里代碼確是最復(fù)雜的。注意上述代碼中,從RealInterceptorChain中拿到了一個StreamAllocation,這個我們之前沒有提及,事實上,可以將它理解為一個協(xié)調(diào)器,用來協(xié)調(diào)connection、stream、call三者之間的關(guān)系,在http/2中每個connection可能對應(yīng)多個stream,而同一個call可能也對應(yīng)著多個stream。它是在第一個攔截器,即retryAndFollowUpInterceptor被初始化的,我們先看看它時如何初始化的:
public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
this.connectionPool = connectionPool;
this.address = address;
this.routeSelector = new RouteSelector(address, routeDatabase());
this.callStackTrace = callStackTrace;
}
其初始化需要一個連接池、物理地址、路由選擇器,新的類比較多,我們用到的時候再解釋每個是干嘛用的,這里我們只看看Address這個參數(shù)是怎么獲取的:
private Address createAddress(HttpUrl url) {
SSLSocketFactory sslSocketFactory = null;
HostnameVerifier hostnameVerifier = null;
CertificatePinner certificatePinner = null;
if (url.isHttps()) {
//如果是https,那么還需要初始化sslsocket、主機認(rèn)證、證書編解碼相關(guān)類
sslSocketFactory = client.sslSocketFactory();
hostnameVerifier = client.hostnameVerifier();
certificatePinner = client.certificatePinner();
}
return new Address(url.host(), url.port(), client.dns(), client.socketFactory(), sslSocketFactory,
hostnameVerifier, certificatePinner, client.proxyAuthenticator(), client.proxy(),
client.protocols(), client.connectionSpecs(), client.proxySelector());
}
回到ConnectInterceptor的intercept方法:
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
這句代碼通過調(diào)用streamAllocation的newStream拿到了一個HttpCodec,HttpCodec實質(zhì)上就是一個輔助類,主要負(fù)責(zé)在Socket連接中,遵循h(huán)ttp的規(guī)范來進行流的輸入輸出,其內(nèi)部是通過OK IO這個jar包中的相關(guān)API實現(xiàn)流的讀寫操作,OK IO我們就不去詳細(xì)介紹了,內(nèi)部通過java nio相關(guān)api實現(xiàn)的。只要記住,凡是繼承sink的就是輸出流,繼承source的就是輸入流即可,HttpCodec有兩個子類,Http1Codec 與Http2Codec,分別對應(yīng)著http/1.x和http/2.x,我們來看看newStream的具體實現(xiàn):
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
這里的關(guān)鍵代碼在于如何拿到一個RealConnection,ReConnection實質(zhì)上是一個對Socket連接的封裝類,與Sdk的HttpUrlConnection功能上是差不多的,最終獲取RealConnection的代碼走到了方法:
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released)
throw new IllegalStateException("released");
if (codec != null)
throw new IllegalStateException("codec != null");
if (canceled)
throw new IOException("Canceled");
// 嘗試去獲取之前已經(jīng)獲取的connection
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
//如果這個連接存在并且沒有新的請求直接拿來用
return allocatedConnection;
}
// 嘗試從連接池中獲取連接
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
//這里阻塞去獲取能用的服務(wù)器,如果遍歷完沒有可用的服務(wù)器,會拋出異常結(jié)束程序
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
}
RealConnection result;
//涉及到連接池的讀寫,加上同步
synchronized (connectionPool) {
if (canceled)
throw new IOException("Canceled");
//獲取新的服務(wù)器地址之后,再次嘗試去連接池中找到連接來復(fù)用
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null)
return connection;
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
//這個方法將創(chuàng)建的connection賦值給成員變量connection
acquire(result);
}
//這里釋放了鎖
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());
Socket socket = null;
//涉及到連接池的讀寫這里又鎖上了
synchronized (connectionPool) {
// 將連接存入連接池
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
//這里意思是如果同時有一個http/2.0的connection存在,那么我們廢棄剛剛創(chuàng)建的那個改用那個連接
//之所以會存在同時創(chuàng)建的connection,注意到這段代碼中間握手的代碼并沒有加上同步,因為它是阻塞代碼
//那么很可能我們這條線程在執(zhí)行握手的時候,別的線程也在握手中,并且先于我們走到到將連接放入連接池
// 的代碼,之后我們再進來,就可能存在指向同一個地址的連接。
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
//關(guān)閉socket的代碼放在同步外面,因為這句代碼沒有必要同步,加快釋放鎖的時間
closeQuietly(socket);
return result;
}
這個方法代碼比較復(fù)雜,所以注釋的比較詳細(xì),我們再來看看RealConnection的握手過程
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) {
//協(xié)議已經(jīng)確定
if (protocol != null)
throw new IllegalStateException("already connected");
RouteException routeException = null;
//傳輸層協(xié)議(tls)與加密套件,即OkHttpClient的默認(rèn)數(shù)組中的
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
//創(chuàng)建一個傳輸協(xié)議選擇器
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
//說明不是https
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(
new UnknownServiceException("CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
//沒看懂,因為isCleartextTrafficPermitted永遠(yuǎn)返回的true
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
}
while (true) {
try {
//如果是https而又是通過http代理訪問的,最終都會走到connnectSocket
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout);
} else {
connectSocket(connectTimeout, readTimeout);
}
establishProtocol(connectionSpecSelector);
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
最終,代碼走到了connectSock中真正建立了連接:
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket() : new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
//這里,便真正地建立了連接,Platform用于平臺適配
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
}
這里主要做了三件事,創(chuàng)建了Socket、拿到了Socket建立連接后的輸入流、輸出流、有了這三者,和服務(wù)器的通訊就建立起來了,然后便通過establishProtocol(connectionSpecSelector),建立起與服務(wù)器進行通訊的協(xié)議:
private void establishProtocol(ConnectionSpecSelector connectionSpecSelector) throws IOException {
if (route.address().sslSocketFactory() == null) {
//都不是https,那只能1.x了
protocol = Protocol.HTTP_1_1;
socket = rawSocket;
return;
}
connectTls(connectionSpecSelector);
//如果支持http2.0協(xié)議,那么創(chuàng)建一個http2Connection,并開啟,這里實際是開啟一個線程去不斷地進行讀操作
if (protocol == Protocol.HTTP_2) {
socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
http2Connection = new Http2Connection.Builder(true)
.socket(socket, route.address().url().host(), source, sink).listener(this).build();
http2Connection.start();
}
}
我們再看看connectTls:
private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException {
Address address = route.address();
SSLSocketFactory sslSocketFactory = address.sslSocketFactory();
boolean success = false;
SSLSocket sslSocket = null;
try {
// Create the wrapper over the connected socket.
sslSocket = (SSLSocket) sslSocketFactory.createSocket(rawSocket, address.url().host(),
address.url().port(), true /* autoClose */);
// Configure the socket's ciphers, TLS versions, and extensions.
//這里是去驗證服務(wù)器握手傳遞過來的數(shù)據(jù),包括公鑰,傳輸協(xié)議版本等信息
ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket);
if (connectionSpec.supportsTlsExtensions()) {
Platform.get().configureTlsExtensions(sslSocket, address.url().host(), address.protocols());
}
// Force handshake. This can throw!
sslSocket.startHandshake();
//通過握手,拿到Session,并將傳輸協(xié)議版本,支持的解密套件,及SSL證書解析出來
Handshake unverifiedHandshake = Handshake.get(sslSocket.getSession());
// Verify that the socket's certificates are acceptable for the target host.
if (!address.hostnameVerifier().verify(address.url().host(), sslSocket.getSession())) {
X509Certificate cert = (X509Certificate) unverifiedHandshake.peerCertificates().get(0);
throw new SSLPeerUnverifiedException(
"Hostname " + address.url().host() + " not verified:" + "\n certificate: "
+ CertificatePinner.pin(cert) + "\n DN: " + cert.getSubjectDN().getName()
+ "\n subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert));
}
// Check that the certificate pinner is satisfied by the certificates presented.
//檢查該證書是否有效,注意這里如果沒有檢查沒通過將會拋出AssertException
//結(jié)束程序,因此,訪問自簽名證書且沒有將證書配置到OkHttpClient的https網(wǎng)站,將會導(dǎo)致程序crash
address.certificatePinner().check(address.url().host(), unverifiedHandshake.peerCertificates());
//到這里證書驗證通過
// Success! Save the handshake and the ALPN protocol.
String maybeProtocol = connectionSpec.supportsTlsExtensions()
? Platform.get().getSelectedProtocol(sslSocket) : null;
socket = sslSocket;
source = Okio.buffer(Okio.source(socket));
sink = Okio.buffer(Okio.sink(socket));
handshake = unverifiedHandshake;
//拿到服務(wù)器支持的協(xié)議
protocol = maybeProtocol != null ? Protocol.get(maybeProtocol) : Protocol.HTTP_1_1;
success = true;
} catch (AssertionError e) {
if (Util.isAndroidGetsocknameError(e))
throw new IOException(e);
throw e;
} finally {
if (sslSocket != null) {
Platform.get().afterHandshake(sslSocket);
}
if (!success) {
closeQuietly(sslSocket);
}
}
}
到這里,整個握手的階段就已經(jīng)完成了,也就是RealConnection創(chuàng)建成功,我們回到方法streamAllocation.newStream中,現(xiàn)在我們有了Connnection來初始化一個HttpCodec,來看看它的初始化過程,RealConnection通過newCodec來進行初始化HttpCodec:
public HttpCodec newCodec(OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
//根據(jù)協(xié)議版本的不同,創(chuàng)建不同的HttpCodec
if (http2Connection != null) {
return new Http2Codec(client, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(client.readTimeoutMillis());
source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
還記得我們getResponseWithInterceptorChain方法中創(chuàng)建的第一個RealInterceptorChain,那時大多數(shù)參數(shù)還是null,攔截鏈執(zhí)行到這兒,總算是把參數(shù)都湊齊了,我們繼續(xù)回到ConnectInterceptor的intercept方法中,由于隔得遠(yuǎn),這里干脆再貼下代碼:
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
終于是來到了攔截鏈的最后一環(huán),這一次的proceed將對應(yīng)著會調(diào)用到CallServerInterceptor的intercept方法:
@Override
public Response intercept(Chain chain) throws IOException {
HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
StreamAllocation streamAllocation = ((RealInterceptorChain) chain).streamAllocation();
Request request = chain.request();
long sentRequestMillis = System.currentTimeMillis();
//寫入請求頭
httpCodec.writeRequestHeaders(request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return what
// we did get (such as a 4xx response) without ever transmitting the request body.
//100-continue用于客戶端在發(fā)送POST數(shù)據(jù)給服務(wù)器前,征詢服務(wù)器情況,看服務(wù)器是否處
// 理POST的數(shù)據(jù),如果不處理,客戶端則不上傳POST數(shù)據(jù),如果處理,則POST上傳數(shù)據(jù)。在現(xiàn)實應(yīng)
// 用中,通過在POST大數(shù)據(jù)時,才會使用100-continue協(xié)議
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
responseBuilder = httpCodec.readResponseHeaders(true);
}
// Write the request body, unless an "Expect: 100-continue" expectation failed.
//如果沒有100-continue頭,那么會執(zhí)行這里
if (responseBuilder == null) {
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
}
//到這里 整個request數(shù)據(jù)就寫進去了,下面這句代碼flush了一下
httpCodec.finishRequest();
if (responseBuilder == null) {
//這里builder會記錄下這次響應(yīng)的頭信息
responseBuilder = httpCodec.readResponseHeaders(false);
}
//創(chuàng)建一個響應(yīng),依次為綁定request,獲取握手的session數(shù)據(jù),請求發(fā)出去時間,接收到響應(yīng)時間
Response response = responseBuilder.request(request)
.handshake(streamAllocation.connection().handshake()).sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis()).build();
int code = response.code();
if (forWebSocket && code == 101) {
//切換協(xié)議,這里需要確保拿到的不是空的body
response = response.newBuilder().body(Util.EMPTY_RESPONSE).build();
} else {
//注意這里responsebody是一個輸入流,意味著可能是大文件
response = response.newBuilder().body(httpCodec.openResponseBody(response)).build();
}
//響應(yīng)頭包含close,標(biāo)識關(guān)閉,此時可以復(fù)用Connection
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
//響應(yīng)成功,標(biāo)識無內(nèi)容,但是又明明有內(nèi)容
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
代碼不是很復(fù)雜,主要是一些讀流寫流的操作罷了,如果以上環(huán)節(jié)都沒有出現(xiàn)問題,那么這個Response會一路return到攔截鏈的開始,繼而通過方法getResponseWithInterceptorChain()返回,從而我們在AsyncCall中的回調(diào)得到了執(zhí)行,自此,整個Interceptor的流程就結(jié)束了。我們將整個流程作出草圖:

整個設(shè)計是非常優(yōu)雅的,可以說是責(zé)任鏈模式的完美體現(xiàn),又有點類似AOP設(shè)計思想。不但將從Request獲取Reponse整個過程從RealCall中剝離出來,而且分層設(shè)計,每個攔截器均有可能完成這一工作,每個攔截器自行決定是自己完成整個工作,還是完成部分,然后交由下一個攔截器來完成,各個攔截器之間代碼獨立,各司其職,耦合度極低,即使需要替換某一個攔截器,也完全不會影響到其他攔截器的運行,有點類似于工廠的流水線。
責(zé)任鏈模式在Android中的體現(xiàn)即觸摸事件的傳遞機制,這里我們不再展開,下面說說AOP,面向切面編程的思想。
面向切面編程(AOP是Aspect Oriented Program的首字母縮寫) ,我們知道,面向?qū)ο蟮奶攸c是繼承、多態(tài)和封裝。而封裝就要求將功能分散到不同的對象中去,這在軟件設(shè)計中往往稱為職責(zé)分配。實際上也就是說,讓不同的類設(shè)計不同的方法。這樣代碼就分散到一個個的類中去了。這樣做的好處是降低了代碼的復(fù)雜程度,使類可重用。 但是人們也發(fā)現(xiàn),在分散代碼的同時,也增加了代碼的重復(fù)性。什么意思呢?比如說,我們在兩個類中,可能都需要在每個方法中做日志。按面向?qū)ο蟮脑O(shè)計方法,我們就必須在兩個類的方法中都加入日志的內(nèi)容。也許他們是完全相同的,但就是因為面向?qū)ο蟮脑O(shè)計讓類與類之間無法聯(lián)系,而不能將這些重復(fù)的代碼統(tǒng)一起來。 也許有人會說,那好辦啊,我們可以將這段代碼寫在一個獨立的類獨立的方法里,然后再在這兩個類中調(diào)用。但是,這樣一來,這兩個類跟我們上面提到的獨立的類就有耦合了,它的改變會影響這兩個類。那么,有沒有什么辦法,能讓我們在需要的時候,隨意地加入代碼呢?這種在運行時,動態(tài)地將代碼切入到類的指定方法、指定位置上的編程思想就是面向切面的編程。 (注:本段文字來自知乎,鏈接https://www.zhihu.com/question/24863332/answer/48376158,著作權(quán)歸作者所有)
雖然真正的AOP是與動態(tài)代理密不可分的,但為什么說這里類似AOP的設(shè)計思想呢?從上述的攔截鏈,我們可以發(fā)現(xiàn),Okhttp提供了兩個位置供開發(fā)者去自定義攔截器,一個是攔截鏈的開頭,一個是與服務(wù)器數(shù)據(jù)交互之前,為什么不是其他位置呢?事實上,Okhttp將整個攔截鏈流程,分為兩個切面,連接層與通訊層,從而,開發(fā)者自定義攔截器的地方恰好就是連接點:

雖然在使用Okhttp時,我們一般比較少去對攔截器進行自定義,但是可以看到,Okhttp早就為這種擴展預(yù)留好了接口,事實上,包括java的線程池、android的asynctask對aop的設(shè)計思路均有一定體現(xiàn),從而做到了以不變應(yīng)萬變,極大地提高了程序的擴展性,在Okhttp的wiki上可以找到這樣一段話:
Since making this change we’ve been able to simplify OkHttp’s internals substantially. The code is faster and easier to understand: the whole thing is just a stack of built-in interceptors.
下面貼兩個Okhttp wiki上的兩個自定義的Interceptor,打印Log:
class LoggingInterceptor implements Interceptor {
@Override public Response intercept(Interceptor.Chain chain) throws IOException {
Request request = chain.request();
long t1 = System.nanoTime();
logger.info(String.format("Sending request %s on %s%n%s",
request.url(), chain.connection(), request.headers()));
Response response = chain.proceed(request);
long t2 = System.nanoTime();
logger.info(String.format("Received response for %s in %.1fms%n%s",
response.request().url(), (t2 - t1) / 1e6d, response.headers()));
return response;
}
}
Gzip壓縮:
final class GzipRequestInterceptor implements Interceptor {
@Override public Response intercept(Interceptor.Chain chain) throws IOException {
Request originalRequest = chain.request();
if (originalRequest.body() == null || originalRequest.header("Content-Encoding") != null) {
return chain.proceed(originalRequest);
}
Request compressedRequest = originalRequest.newBuilder()
.header("Content-Encoding", "gzip")
.method(originalRequest.method(), gzip(originalRequest.body()))
.build();
return chain.proceed(compressedRequest);
}
private RequestBody gzip(final RequestBody body) {
return new RequestBody() {
@Override public MediaType contentType() {
return body.contentType();
}
@Override public long contentLength() {
return -1; // We don't know the compressed length in advance!
}
@Override public void writeTo(BufferedSink sink) throws IOException {
BufferedSink gzipSink = Okio.buffer(new GzipSink(sink));
body.writeTo(gzipSink);
gzipSink.close();
}
};
}
}