A Walkthrough of the OkHttp Request Flow
Calling code
// 1. Create an OkHttpClient object
OkHttpClient client = new OkHttpClient();
// 2. Build a request from the URL
Request request = new Request.Builder().url(url).build();
// 3. Create a Call object
final Call call = client.newCall(request);
// 4. Fetch the result from the server asynchronously
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, final IOException e) {
}
@Override
public void onResponse(Call call, final Response response) throws IOException {
}
});
// The no-arg OkHttpClient constructor delegates to a default Builder
public OkHttpClient() {
this(new Builder());
}
// The Builder constructor initializes every field with its default
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
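To make the "constructor delegates to a default Builder" idea concrete, here is a minimal sketch of the same pattern in plain Java. MiniClient and its three fields are hypothetical names invented for illustration; only the default values (10-second timeouts, redirects followed) are taken from the listing above. It is not OkHttp's actual class.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical miniature of the builder-with-defaults pattern; not OkHttp's real class. */
final class MiniClient {
    final int connectTimeoutMillis;
    final int readTimeoutMillis;
    final boolean followRedirects;

    private MiniClient(Builder b) {
        this.connectTimeoutMillis = b.connectTimeoutMillis;
        this.readTimeoutMillis = b.readTimeoutMillis;
        this.followRedirects = b.followRedirects;
    }

    /** Plays the role of `this(new Builder())`: every field keeps its default. */
    static MiniClient withDefaults() {
        return new Builder().build();
    }

    static final class Builder {
        // Defaults mirrored from the listing: 10 s timeouts, redirects followed.
        int connectTimeoutMillis = 10_000;
        int readTimeoutMillis = 10_000;
        boolean followRedirects = true;

        Builder connectTimeout(long timeout, TimeUnit unit) {
            this.connectTimeoutMillis = (int) unit.toMillis(timeout);
            return this;
        }

        Builder followRedirects(boolean follow) {
            this.followRedirects = follow;
            return this;
        }

        MiniClient build() {
            return new MiniClient(this);
        }
    }
}
```

A caller that wants different settings overrides only the fields it cares about, for example `new MiniClient.Builder().connectTimeout(3, TimeUnit.SECONDS).build()`, and everything else stays at its default.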
The Request object is also created with the builder pattern.
// Defaults to the GET method
public Builder() {
this.method = "GET";
this.headers = new Headers.Builder();
}
// Parse the URL string and wrap it in an HttpUrl
public Builder url(String url) {
if (url == null) throw new NullPointerException("url == null");
// Silently replace web socket URLs with HTTP URLs.
if (url.regionMatches(true, 0, "ws:", 0, 3)) {
url = "http:" + url.substring(3);
} else if (url.regionMatches(true, 0, "wss:", 0, 4)) {
url = "https:" + url.substring(4);
}
HttpUrl parsed = HttpUrl.parse(url);
if (parsed == null) throw new IllegalArgumentException("unexpected url: " + url);
return url(parsed);
}
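The scheme rewrite in url(String) is easy to try in isolation. The sketch below copies just the two regionMatches checks into a standalone helper; UrlSchemeRewrite and rewrite are hypothetical names, and the HttpUrl parsing step is left out.

```java
/** Standalone copy of the ws:/wss: rewrite from Request.Builder.url(String); names are illustrative. */
final class UrlSchemeRewrite {
    /** Returns the URL with a leading ws: or wss: scheme replaced by http: or https:. */
    static String rewrite(String url) {
        if (url == null) throw new NullPointerException("url == null");
        // regionMatches(true, ...) compares case-insensitively, so "WS:" matches too.
        if (url.regionMatches(true, 0, "ws:", 0, 3)) {
            return "http:" + url.substring(3);
        } else if (url.regionMatches(true, 0, "wss:", 0, 4)) {
            return "https:" + url.substring(4);
        }
        return url;
    }

    public static void main(String[] args) {
        System.out.println(rewrite("ws://example.com/chat"));
        System.out.println(rewrite("wss://example.com/chat"));
        System.out.println(rewrite("https://example.com/"));
    }
}
```

Because "wss:" does not match the three-character region "ws:", the order of the two checks does not matter here.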
// newCall creates a RealCall object
@Override
public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
RealCall's enqueue method calls the enqueue method of the Dispatcher held by the OkHttpClient.
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
// Wrap the callback in an AsyncCall
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
Dispatcher's enqueue method. runningAsyncCalls is an ArrayDeque, i.e. a double-ended queue.
synchronized void enqueue(AsyncCall call) {
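// If fewer than maxRequests calls are running (64 by default) and fewer than maxRequestsPerHost of them target this host (5 by default)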
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
// add the call to the running queue
runningAsyncCalls.add(call);
// and run it on the thread pool
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
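The two-queue bookkeeping can be sketched without any networking. In the model below a call is represented only by its host name, and MiniDispatcher is a hypothetical class. The enqueue logic follows the listing above; the promotion logic in finished is my own assumption about what happens when a slot frees up, since the source only shows that finished(this) is called.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Hypothetical model of the running/ready queues; a "call" is just its host name. */
final class MiniDispatcher {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    private final Deque<String> running = new ArrayDeque<>();
    private final Deque<String> ready = new ArrayDeque<>();

    MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    /** Returns true if the call may start now, false if it was parked in the ready queue. */
    synchronized boolean enqueue(String host) {
        if (running.size() < maxRequests && runningFor(host) < maxRequestsPerHost) {
            running.add(host);
            return true;
        }
        ready.add(host);
        return false;
    }

    /** Assumption: when a call finishes, waiting calls that now fit are promoted in FIFO order. */
    synchronized void finished(String host) {
        running.remove(host); // removes the first matching entry
        Iterator<String> it = ready.iterator();
        while (it.hasNext() && running.size() < maxRequests) {
            String next = it.next();
            if (runningFor(next) < maxRequestsPerHost) {
                it.remove();
                running.add(next);
            }
        }
    }

    private int runningFor(String host) {
        int count = 0;
        for (String h : running) {
            if (h.equals(host)) count++;
        }
        return count;
    }

    synchronized int runningCount() { return running.size(); }
    synchronized int readyCount() { return ready.size(); }
}
```

With limits of 3 total and 2 per host, a third call to the same host is parked even though a global slot is still free, which is the per-host check in the if condition at work.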
AsyncCall extends NamedRunnable. NamedRunnable implements Runnable and calls execute() from its run() method, so execute() is the method to read.
final class AsyncCall extends NamedRunnable {
@Override
protected void execute() {
boolean signalledCallback = false;
try {
// Calls getResponseWithInterceptorChain()
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
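The run-calls-execute relationship is a small template-method pattern. The sketch below shows one plausible shape for it: run() temporarily renames the current thread and delegates to an abstract execute(). NamedTask and nameSeenInside are hypothetical names, and the renaming detail is an assumption suggested by the class name NamedRunnable, not something shown in the listing.

```java
/** Hypothetical sketch of a Runnable whose run() wraps an abstract execute(). */
final class NamedRunnableSketch {
    abstract static class NamedTask implements Runnable {
        private final String name;

        NamedTask(String name) {
            this.name = name;
        }

        @Override
        public final void run() {
            Thread current = Thread.currentThread();
            String oldName = current.getName();
            current.setName(name); // assumption: makes the task easy to spot in thread dumps
            try {
                execute();
            } finally {
                current.setName(oldName); // always restore the pool thread's own name
            }
        }

        /** Subclasses put the real work here, as AsyncCall does. */
        protected abstract void execute();
    }

    /** Runs a task on the calling thread and reports the thread name it observed. */
    static String nameSeenInside(String taskName) {
        final String[] seen = new String[1];
        new NamedTask(taskName) {
            @Override
            protected void execute() {
                seen[0] = Thread.currentThread().getName();
            }
        }.run();
        return seen[0];
    }
}
```

Making run() final keeps subclasses from bypassing the wrapper, so the only extension point is execute().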
execute() calls getResponseWithInterceptorChain(), which assembles the list of interceptors and then calls proceed() on a RealInterceptorChain:
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
...
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
// Run the interceptor at the current index
Response response = interceptor.intercept(next);
...
return response;
}
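The index-based hand-off in proceed is the heart of the design, so here is a minimal, self-contained sketch of it. Requests and responses are plain strings, and Interceptor, Chain, and the three lambdas are simplified stand-ins rather than OkHttp's real types.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the interceptor chain; requests and responses are plain strings. */
final class ChainSketch {
    interface Interceptor {
        String intercept(Chain chain);
    }

    static final class Chain {
        private final List<Interceptor> interceptors;
        private final int index;
        private final String request;

        Chain(List<Interceptor> interceptors, int index, String request) {
            this.interceptors = interceptors;
            this.index = index;
            this.request = request;
        }

        String request() {
            return request;
        }

        String proceed(String request) {
            if (index >= interceptors.size()) throw new AssertionError();
            // Each step builds a new chain that points at the NEXT interceptor...
            Chain next = new Chain(interceptors, index + 1, request);
            // ...and hands it to the CURRENT one, which decides when to call next.proceed().
            return interceptors.get(index).intercept(next);
        }
    }

    static String run(String request) {
        List<Interceptor> interceptors = new ArrayList<>();
        // Wraps whatever comes back from the rest of the chain.
        interceptors.add(chain -> "[retry " + chain.proceed(chain.request()) + "]");
        // Modifies the request on the way down and wraps the response on the way up.
        interceptors.add(chain -> "[bridge " + chain.proceed(chain.request() + "+headers") + "]");
        // Terminal interceptor: produces a response and never calls proceed().
        interceptors.add(chain -> "response(" + chain.request() + ")");
        return new Chain(interceptors, 0, request).proceed(request);
    }

    public static void main(String[] args) {
        System.out.println(run("GET /")); // prints "[retry [bridge response(GET /+headers)]]"
    }
}
```

The nesting in the output shows the two directions of travel: the request is modified on the way down, and each interceptor gets to wrap the response on the way back up.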
We did not add any interceptors of our own, but getResponseWithInterceptorChain adds several built-in ones, and they run one after another in order.
At this point index = 0, so the first to run is retryAndFollowUpInterceptor's intercept method. The method is fairly long, so the parts unrelated to the flow are omitted for now. It calls proceed on the Chain object that was passed in.
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
int followUpCount = 0;
Response priorResponse = null;
while (true) {
...
Response response = null;
boolean releaseConnection = true;
try {
// Call proceed on the chain that was passed in
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
releaseConnection = false;
...
}
}
We are back in proceed. Note that index = 1 this time. Another RealInterceptorChain is created, again with index + 1.
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
...
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
// Run the interceptor at the current index
Response response = interceptor.intercept(next);
...
return response;
}
Look again at the order in which the interceptors were added. The interceptor at this index is the BridgeInterceptor.
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
BridgeInterceptor's intercept method mainly fills in the request headers, then hands off to the interceptors that follow.
@Override
public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
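The transparent gzip branch at the end of BridgeInterceptor can be illustrated with the JDK's own gzip streams. The sketch below is not OkHttp code: GzipSketch, gzip, and decode are hypothetical helpers, and java.util.zip stands in for Okio's GzipSource. It only shows the decision rule: decompress when we added Accept-Encoding: gzip ourselves and the response declares Content-Encoding: gzip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical helpers illustrating the transparent-gzip rule with JDK streams. */
final class GzipSketch {
    /** Compresses text the way a server might before sending it. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** Decompresses only when gzip was requested transparently AND the server used it. */
    static String decode(byte[] body, String contentEncoding, boolean transparentGzip) {
        if (transparentGzip && "gzip".equalsIgnoreCase(contentEncoding)) {
            try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(body))) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buffer = new byte[4096];
                int n;
                while ((n = in.read(buffer)) != -1) {
                    out.write(buffer, 0, n);
                }
                return new String(out.toByteArray(), StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        // The caller set Accept-Encoding itself, or the body is not gzipped: pass it through.
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

This also explains why the listing strips Content-Encoding and Content-Length from the response: after decompression neither header describes the body the caller actually receives.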
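Next is CacheInterceptor's intercept. Note that the listing below references LogWraper, TextUtils, and maxStale, none of which are OkHttp classes, so it appears to be a custom caching interceptor (the "Novate" log tag suggests it comes from the Novate library) rather than OkHttp's built-in CacheInterceptor. In this listing the request is passed straight to the next interceptor through chain.proceed, so we move on to the interceptors that follow.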
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
okhttp3.Response originalResponse = chain.proceed(chain.request());
String cacheControl = originalResponse.header("Cache-Control");
//String cacheControl = request.cacheControl().toString();
LogWraper.d("Novate", maxStaleOnline + "s load cache:" + cacheControl);
if (TextUtils.isEmpty(cacheControl) || cacheControl.contains("no-store") || cacheControl.contains("no-cache") ||
cacheControl.contains("must-revalidate") || cacheControl.contains("max-age") || cacheControl.contains("max-stale")) {
return originalResponse.newBuilder()
.removeHeader("Pragma")
.removeHeader("Cache-Control")
.header("Cache-Control", "public, max-age=" + maxStale)
.build();
} else {
return originalResponse;
}
}
ConnectInterceptor's intercept:
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
The connection to the server is set up inside streamAllocation.newStream. newStream calls findHealthyConnection and returns an Http1Codec or Http2Codec object, corresponding to HTTP/1.1 or HTTP/2.
// Returns an Http1Codec or an Http2Codec
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
....
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
...
} catch (IOException e) {
throw new RouteException(e);
}
}
Now focus on the connection part.
findHealthyConnection calls findConnection:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
...
}
findConnection first tries to take a connection from connectionPool. If none is available, it opens a new one over a socket and, once connected, puts it into the pool so that later calls can reuse it.
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
...
// Attempt to get a connection from the pool.
// Try to get a connection from the pool; if there is one, return it directly
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
// ... otherwise create a new one
result = new RealConnection(connectionPool, selectedRoute);
acquire(result);
// Do TCP + TLS handshakes. This is a blocking operation.
// Connect
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
// Put the connection into the pool
Internal.instance.put(connectionPool, result);
...
}
closeQuietly(socket);
return result;
}
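The "look in the pool first, otherwise connect and pool it" idea can be sketched with a map of idle connections keyed by address. This is a deliberately simplified model with hypothetical names (PoolSketch, Connection, acquire, release). It assumes a connection goes back to the pool only when its user releases it, which differs from the listing above, where the new connection is put into the pool right after connecting.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of connection reuse keyed by address. */
final class PoolSketch {
    static final class Connection {
        final String address;
        final int id;

        Connection(String address, int id) {
            this.address = address;
            this.id = id;
        }
    }

    private final Map<String, Deque<Connection>> idle = new HashMap<>();
    private int created = 0;

    /** Reuses an idle connection to the same address, or "connects" a new one. */
    synchronized Connection acquire(String address) {
        Deque<Connection> candidates = idle.get(address);
        if (candidates != null && !candidates.isEmpty()) {
            return candidates.pop(); // pool hit: no new handshake needed
        }
        // Pool miss: this line stands in for the blocking TCP + TLS handshake.
        return new Connection(address, ++created);
    }

    /** Returns a connection to the pool so a later acquire() can reuse it. */
    synchronized void release(Connection connection) {
        idle.computeIfAbsent(connection.address, k -> new ArrayDeque<>()).push(connection);
    }

    synchronized int createdCount() {
        return created;
    }
}
```

Two sequential requests to the same address end up on the same Connection object, while a request to a different address forces a second "handshake".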
The connect method, with some code omitted. Both branches eventually end up in connectSocket, which connects over a socket.
public void connect(
int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
...
while (true) {
try {
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout);
} else {
connectSocket(connectTimeout, readTimeout);
}
establishProtocol(connectionSpecSelector);
...
}
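Once connected, the socket's streams are wrapped with Okio as a buffered source and sink, which are used for the later reads and writes.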
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
....
try {
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
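Back in ConnectInterceptor: by the time its intercept method has called newStream, the connection is established and the source and sink are ready. Most of the work happens in the RealConnection class, which supports both HTTP/1.1 and HTTP/2.
On to the next interceptor. Since we did not add any network interceptors, CallServerInterceptor runs next.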
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
CallServerInterceptor's intercept method:
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
httpCodec.writeRequestHeaders(request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
// Write the request body out to the server through the sink
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
// being reused. Otherwise we're still obligated to transmit the request body to leave the
// connection in a consistent state.
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
if (responseBuilder == null) {
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
....
return response;
}
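RetryAndFollowUpInterceptor->BridgeInterceptor->CacheInterceptor->ConnectInterceptor->CallServerInterceptor
With this long chain of calls finished, we now trace the return values back up.
CallServerInterceptor writes the request headers (and the body, if there is one) to the server, reads the response headers, builds a Response that references the Request, and returns it to its caller.
ConnectInterceptor does nothing to the response and returns it as is.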
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
The CacheInterceptor shown here inspects the response's Cache-Control header and may rewrite it, but it leaves the data returned by the server untouched.
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
okhttp3.Response originalResponse = chain.proceed(chain.request());
String cacheControl = originalResponse.header("Cache-Control");
//String cacheControl = request.cacheControl().toString();
LogWraper.d("Novate", maxStaleOnline + "s load cache:" + cacheControl);
if (TextUtils.isEmpty(cacheControl) || cacheControl.contains("no-store") || cacheControl.contains("no-cache") ||
cacheControl.contains("must-revalidate") || cacheControl.contains("max-age") || cacheControl.contains("max-stale")) {
return originalResponse.newBuilder()
.removeHeader("Pragma")
.removeHeader("Cache-Control")
.header("Cache-Control", "public, max-age=" + maxStale)
.build();
} else {
return originalResponse;
}
}
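BridgeInterceptor's intercept method. If transparentGzip is set and the response's Content-Encoding is gzip, the body is wrapped in a GzipSource so that it gets decompressed; otherwise the if block is skipped. The response is then returned.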
@Override
public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
...
// The result produced by the next interceptor
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
RetryAndFollowUpInterceptor's intercept method:
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response = null;
boolean releaseConnection = true;
try {
// Take the response produced by the next interceptor
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
releaseConnection = false;
} ...
Request followUp = followUpRequest(response);
// No follow-up request (such as a redirect) is needed: return the response
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
...
}
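The while (true) loop above keeps issuing follow-up requests until followUpRequest returns null. The sketch below models that control flow with a map standing in for the server. FollowUpSketch, fetch, and the "REDIRECT " prefix are hypothetical, and the cap of 20 follow-ups is an assumption on my part, since the limit sits in the elided part of the listing.

```java
import java.util.Map;

/** Hypothetical model of the follow-up loop; a map plays the role of the server. */
final class FollowUpSketch {
    // Assumption: some upper bound exists so that a redirect cycle cannot loop forever.
    static final int MAX_FOLLOW_UPS = 20;

    /** Each URL maps either to "REDIRECT <target>" or to a final response body. */
    static String fetch(Map<String, String> server, String url) {
        int followUpCount = 0;
        String current = url;
        while (true) {
            // Stands in for chain.proceed(request, ...).
            String response = server.get(current);
            if (response == null) {
                throw new IllegalStateException("no response for " + current);
            }
            // Stands in for followUpRequest(response) == null: nothing more to do.
            if (!response.startsWith("REDIRECT ")) {
                return response;
            }
            if (++followUpCount > MAX_FOLLOW_UPS) {
                throw new IllegalStateException("Too many follow-up requests: " + followUpCount);
            }
            current = response.substring("REDIRECT ".length());
        }
    }
}
```

A chain such as /a redirecting to /b resolves to /b's body after one extra iteration, while a URL that redirects to itself hits the cap and fails instead of spinning.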
Back to AsyncCall's execute method. Once all the interceptors have run, the response is handed to the caller.
@Override
protected void execute() {
boolean signalledCallback = false;
try {
// The response after every interceptor has processed it
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
// Deliver it to the caller
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
This responseCallback is the Callback we passed to enqueue.
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, final IOException e) {
}
@Override
public void onResponse(Call call, final Response response) throws IOException {
}
});
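Summary:
1 client.newCall creates a RealCall. When we call enqueue, our callback is wrapped in an AsyncCall, which is added to the Dispatcher's running queue and executed on the thread pool (or parked in the ready queue if the limits have been reached).
2 On the pool thread, run calls the execute method.
3 execute runs our custom interceptors (if any), followed by the intercept methods of RetryAndFollowUpInterceptor->BridgeInterceptor->CacheInterceptor->ConnectInterceptor->CallServerInterceptor.
4 ConnectInterceptor's intercept method establishes the connection, CallServerInterceptor sends the request and reads the response, and the response is then returned back up layer by layer.
5 Finally, execute delivers the result to the caller's callback. This happens on a worker thread, not on the UI thread.
The interceptor design is also rather neat: all the interceptors are put into one list, an index is advanced to run each of them in turn, and because they all implement the same interface, each one's result is returned back up the chain.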
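Point 5 of the summary is easy to verify with a plain executor. The sketch below submits a task to a single-thread pool and reports which thread ran the "callback"; CallbackThreadSketch and the thread name "worker-1" are made up for the demonstration, and no OkHttp classes are involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: a callback invoked from a pool runs on the pool's thread. */
final class CallbackThreadSketch {
    /** Returns the name of the thread on which the callback body executed. */
    static String callbackThreadName() {
        ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        CompletableFuture<String> seen = new CompletableFuture<>();
        // The lambda plays the role of onResponse: it runs wherever the pool runs it.
        executor.execute(() -> seen.complete(Thread.currentThread().getName()));
        try {
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("callback: " + callbackThreadName());
    }
}
```

On Android this is why code in onResponse that touches views has to hop back to the main thread first, for example by posting to a main-thread Handler.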