系列索引
本系列文章基于 OkHttp3.14
OkHttp 源碼剖析系列(一)——請求的發(fā)起及攔截器機制概述
OkHttp 源碼剖析系列(六)——連接復用機制及連接的建立
OkHttp 源碼剖析系列(七)——請求的發(fā)起及響應的讀取
前言
前面的文章分析完了 OkHttp 中的緩存機制,現(xiàn)在讓我們繼續(xù)來研究其在 ConnectInterceptor 中所進行的連接建立的相關(guān)原理。由于連接建立的過程涉及到很多在 OkHttp 中非常重要的機制,因此將分為多篇文章進行介紹,這篇文章主要是對連接建立的大體流程進行介紹。
連接建立流程概述
在 ConnectInterceptor.intercept 方法中真正實現(xiàn)了連接的建立的代碼如下:
// 如果請求是GET格式,需要一些額外的檢查
boolean doExtensiveHealthChecks = !request.method().equals("GET");
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
根據(jù)上面的代碼我們可以推測,這個 Exchange 類與我們的連接是有一些關(guān)系的,真正連接的建立過程在 transmitter.newExchange 中實現(xiàn)。
我們看到 transmitter.newExchange 方法:
/**
* Returns a new exchange to carry a new request and response.
*/
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
synchronized (connectionPool) {
if (noMoreExchanges) {
throw new IllegalStateException("released");
}
if (exchange != null) {
throw new IllegalStateException("cannot make a new request because the previous response "
+ "is still open: please call response.close()");
}
}
// 尋找ExchangeCodec對象
ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
// 通過找到的codec對象構(gòu)建Exchange對象
Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
// 進行一些變量的賦值
synchronized (connectionPool) {
this.exchange = result;
this.exchangeRequestDone = false;
this.exchangeResponseDone = false;
return result;
}
}
獲取連接
上面首先通過 exchangeFinder.find 方法進行了對 ExchangeCodec 的查找,找到對應的 ExchangeCodec 對象,之后通過這個 codec 對象構(gòu)建了一個 Exchange 對象并返回
那么什么是 ExchangeCodec 對象呢?我們先看到 exchangeFinder.find 方法:
public ExchangeCodec find(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
return resultConnection.newCodec(client, chain);
} catch (RouteException e) {
trackFailure();
throw e;
} catch (IOException e) {
trackFailure();
throw new RouteException(e);
}
}
可以看到這里調(diào)用到了 findHealthyConnection 方法從而獲取 RealConnection 對象,看來這個就是我們的連接了,之后調(diào)用了 RealConnection.newCodec 方法獲取 ExchangeCodec 對象。
尋找可用連接
我們先看到 findHealthyConnection 方法:
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges();
continue;
}
return candidate;
}
}
可以看到這里是一個循環(huán),不斷地在調(diào)用 findConnection 方法尋找連接,若找不到 Healthy(可用)的連接,則繼續(xù)循環(huán)直到找到為止。
尋找連接
我們先看到 findConnection 方法:
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
RealConnection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");
hasStreamFailure = false; // This is a fresh attempt.
// 嘗試使用之前已分配的連接,但可能該連接不能用來創(chuàng)建新的Exchange
releasedConnection = transmitter.connection;
// 如果當前的連接不能被用來創(chuàng)建新的Exchange,則將連接釋放并返回對應Socket準備close
toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
? transmitter.releaseConnectionNoEvents()
: null;
if (transmitter.connection != null) {
// 存在已分配的連接,將其置為result,并置releasedConnection為null
result = transmitter.connection;
releasedConnection = null;
}
if (result == null) {
// 如果不存在已經(jīng)分配的連接,則嘗試從連接池中獲取連接
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true;
result = transmitter.connection;
} else if (nextRouteToTry != null) {
// 修改當前選擇路由為下一個路由
selectedRoute = nextRouteToTry;
nextRouteToTry = null;
} else if (retryCurrentRoute()) {
// 如果當前Connection的路由應當重試,則將選擇的路由設(shè)置為當前路由
selectedRoute = transmitter.connection.route();
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// 如果已經(jīng)找到了已分配的或從連接池中取出的Connection,則直接返回
return result;
}
// 如果需要進行路由選擇,則進行一次路由選擇
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
List<Route> routes = null;
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");
if (newRouteSelection) {
// 路由選擇過后如今有了一組IP地址,我們再次嘗試從連接池中獲取連接
routes = routeSelection.getAll();
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true;
result = transmitter.connection;
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// 如果第二次嘗試從連接池獲取連接仍然失敗,則創(chuàng)建新的連接。
result = new RealConnection(connectionPool, selectedRoute);
connectingConnection = result;
}
}
if (foundPooledConnection) {
// 如果第二次嘗試從連接池獲取連接成功,則將其返回
eventListener.connectionAcquired(call, result);
return result;
}
// 執(zhí)行TCP+TLS握手,這是個阻塞的過程
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
connectionPool.routeDatabase.connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
connectingConnection = null;
// 最后一次嘗試從連接池中獲取連接,這種情況只可能在一個host下多個并發(fā)連接這種情況下
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// 如果成功拿到則關(guān)閉我們前面創(chuàng)建的連接的Socket,并返回連接池中的連接
result.noNewExchanges = true;
socket = result.socket();
result = transmitter.connection;
} else {
// 如果失敗則在連接池中放入我們剛剛創(chuàng)建的連接,并將其設(shè)置為transmitter中的連接
connectionPool.put(result);
transmitter.acquireConnectionNoEvents(result);
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
這個尋找連接的過程是非常復雜的,主要是下列幾個步驟:
- 嘗試獲取
transmitter中已經(jīng)存在的連接,也就是當前Call之前創(chuàng)建的連接。 - 若獲取不到,則嘗試從連接池中調(diào)用
transmitterAcquirePooledConnection方法獲取連接,傳入的routes參數(shù)為 null - 若仍獲取不到連接,判斷是否需要路由選擇,如果需要,調(diào)用
routeSelector.next進行路由選擇 - 如果進行了路由選擇,則再次嘗試從連接池中調(diào)用
transmitterAcquirePooledConnection方法獲取連接,傳入的routes為剛剛路由選擇后所獲取的路由列表 - 若仍然獲取不到連接,則調(diào)用
RealConnection的構(gòu)造函數(shù)創(chuàng)建新的連接,并對其執(zhí)行 TCP + TLS握手。 - TCP + TSL握手之后,會再次嘗試從連接池中通過
transmitterAcquirePooledConnection方法獲取連接,這種情況只會出現(xiàn)在一個 Host 對應多個并發(fā)連接的情況下(因為 HTTP/2 支持了多路復用,使得多個請求可以并發(fā)執(zhí)行,此時可能有其他使用該 TCP 連接的請求也創(chuàng)建了連接,就不需要重新創(chuàng)建了)。 - 若最后一次從連接池中獲取連接獲取成功,會釋放之前創(chuàng)建的連接的相關(guān)資源。
- 若仍獲取不到,則將該連接放入連接池,并將其設(shè)置為
transmitter的連接。
可以看到,尋找連接的過程主要被分成了三種行為,分別是
- 嘗試獲取
transmitter中已經(jīng)分配的連接 - 嘗試從線程池中調(diào)用
transmitterAcquirePooledConnection獲取連接 - 創(chuàng)建新連接。
有點類似圖片加載的三級緩存,顯然自上而下是越來越消耗資源的,因此 OkHttp 更偏向于前面直接能夠獲取到連接,尤其是嘗試從連接池進行獲取連接這一操作進行了三次。
不過我們現(xiàn)在只是知道了大體流程,還有許多疑問沒有解開。比如路由選擇是怎樣的?OkHttp 中的連接池是如何實現(xiàn)的?連接的建立過程是如何實現(xiàn)的?等等疑問都還沒有解開,我們將在后續(xù)文章中介紹到。
判斷連接是否可用
我們接著看看 RealConnection.isHealthy 的實現(xiàn),看看它是如何判斷一個連接是否可用的:
/**
* Returns true if this connection is ready to host new streams.
*/
public boolean isHealthy(boolean doExtensiveChecks) {
// 判斷Socket是否可用
if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
return false;
}
// 如果包含Http2的連接,檢測是否shutdown
if (http2Connection != null) {
return !http2Connection.isShutdown();
}
if (doExtensiveChecks) {
try {
int readTimeout = socket.getSoTimeout();
try {
// 設(shè)置一秒延時,檢測Stream是否枯竭,若枯竭則該連接不可用
socket.setSoTimeout(1);
if (source.exhausted()) {
return false; // Stream is exhausted; socket is closed.
}
return true;
} finally {
socket.setSoTimeout(readTimeout);
}
} catch (SocketTimeoutException ignored) {
// Read timed out; socket is good.
} catch (IOException e) {
return false; // Couldn't read; socket is closed.
}
}
return true;
}
可以看到,上面主要是對 Socket、HTTP2連接、Stream 進行了檢測,從而判斷該連接是否可用。
什么是 Exchange
現(xiàn)在我們已經(jīng)知道了連接究竟是如何尋找到的,現(xiàn)在讓我們回到 Exchange 類,讓我們研究一下究竟什么是 Exchange,它是用來做什么的。
讓我們先從它的 JavaDoc 看到:
Transmits a single HTTP request and a response pair. This layers connection management and events on {@link ExchangeCodec}, which handles the actual I/O.
可以看到,這里講到,Exchange 是一個用于發(fā)送 HTTP 請求和讀取響應的類,而真正進行 I/O 的類是它的一個成員變量——ExchangeCodec 。在 Exchange 中暴露了許多對 Stream 進行讀寫的方法,如 writeRequestHeaders、createRequestBody 等等,在 CallServerInterceptor 中就會通過 Exchange ,向服務器發(fā)起請求,并讀取其所返回的響應。
什么是 ExchangeCodec
讓我們看看 ExchangeCodec 又是什么:
/**
* Encodes HTTP requests and decodes HTTP responses.
*/
public interface ExchangeCodec {
int DISCARD_STREAM_TIMEOUT_MILLIS = 100;
RealConnection connection();
Sink createRequestBody(Request request, long contentLength) throws IOException;
void writeRequestHeaders(Request request) throws IOException;
void flushRequest() throws IOException;
void finishRequest() throws IOException;
@Nullable
Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;
long reportedContentLength(Response response) throws IOException;
Source openResponseBodySource(Response response) throws IOException;
Headers trailers() throws IOException;
void cancel();
}
可以看到,它僅僅是個接口,根據(jù)上面的 JavaDoc 可以看出,它的作用是用于對請求進行編碼,以及對響應進行解碼。
我們看看它有哪些實現(xiàn)類,通過 Android Studio 我們可以很容易找到它有如下兩個實現(xiàn)類:
Http1ExchangeCodecHttp2ExchangeCodec
看得出來,OkHttp 采用了一種非常典型的面向接口編程,將對 Http 請求的編碼及解碼等功能抽象成了接口,再通過不同的實現(xiàn)類來實現(xiàn)將相同的 Request 對象編碼為 HTTP1 及 HTTP2 的格式的數(shù)據(jù),將 HTTP1 及 HTTP2 格式的數(shù)據(jù)解碼為相同格式的 Response 對象。通過這樣的一種面向接口的設(shè)計,大大地提高了 OkHttp 的可擴展性,可以通過實現(xiàn)接口的形式對更多的應用層進行支持。
什么是 Transmitter
接下來我們看看貫穿了我們整個請求流程的 Transimitter,究竟是一個用來做什么的類。我們先從 JavaDoc 入手:
Bridge between OkHttp's application and network layers. This class exposes high-level application
layer primitives: connections, requests, responses, and streams.
根據(jù)上面的注釋可以看出,Transmitter 是一座 OkHttp 中應用層與網(wǎng)絡(luò)層溝通的橋梁。就像我們之前的連接創(chuàng)建,就是在應用層通過了 transmitter.newExchange 方法來通知網(wǎng)絡(luò)層進行 Exchange 的獲取,并返回給應用層。那么 Transmitter 是什么時候創(chuàng)建的呢?
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.transmitter = new Transmitter(client, call);
return call;
}
可以看到,它是在 RealCall 被創(chuàng)建的時候進行創(chuàng)建的,也就是說一個 Trasmitter 對應了一個 Call。這個 Call 在應用層通過 trasnmitter 與它的網(wǎng)絡(luò)層進行通信。