OkHttp源碼(三)

簡單的梳理一下五個攔截器的邏輯:

  • RetryAndFollowUpInterceptior
  • BridgeInterceptor
  • CacheInterceptor
  • ConnectionInterceptor
  • CallServerInterceptor

RetryAndFollowUpInterceptor

這是OkHttp代碼中的第一個攔截器,也就是說除了自定義的應用攔截器外第一個處理request的攔截器,從名字來猜測它主要負責請求的重試操作和重定向操作。攔截器的重點都在于其intercept()的實現(xiàn),所以來看RetryAndFollowUpInterceptor的intercept()是如何實現(xiàn)的。

初始

Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();

從參數(shù)攔截器連中拿到request,將Chain強制準換成RealInterceptorChain,獲取到call對象和監(jiān)聽回調,這個realChain是之后調用proceed()的。

StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;

這里創(chuàng)建了一個StreamAllocation對象,在 RetryAndFollowUpInterceptor中只是做了初始工作,并沒有用到它,在之后會有攔截器用到。

重試循環(huán)

一個while(true)的循環(huán),先看下面的代碼塊。

    boolean releaseConnection = true;
    try {
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
    } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
    } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
    } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
    }

releaseConnection先設置為true,鏈接是釋放的。try塊調用了realChain的proceed(),調用了下一個攔截器,也傳入了初始化過的streamAllocation對象。因為在之后的攔截器鏈中會有鏈接網(wǎng)絡的工作,所以此時設置releaseConnection為false。接下來是對異常的捕獲,捕獲到異常說明在攔截器調用中的某個地方出現(xiàn)了問題導致,connection未釋放,設置為false之后進入下一次循環(huán)。

但是在跳轉到下一次循環(huán)之前還會執(zhí)行finally代碼塊,我們來分析一下。如果在proced()中出現(xiàn)異常,則不會執(zhí)行后一句releaseConnection = false;而在捕獲異常中,如果是RouteException或IOException的話,操作之后就會賦值false,所以finally中主要是針對proceed()中出現(xiàn)其他異常的情況處理。

if (++followUpCount > MAX_FOLLOW_UPS) {
    streamAllocation.release();
    throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}

MAX_FOLLOW_UPS這個常量定為20,循環(huán)每重試一次followUpCount就會自增,所以重試是有次數(shù)限制的。

BridgeInterceptor

BridgeInterceptor主要是負責添加頭部的任務,和對返回來的response進行解壓的工作。核心的intercept()如下,我們逐步分析。

下面代碼主要為request添加Content-Type、Content-Length或Transfer-Encoding,從這里我們也可以發(fā)現(xiàn)其實這些頭信息是不需要我們手動添加的,BridgeInterceptor都會根據(jù)body獲取這些信息自動幫我們添加,就算我們自己添加了,它也會幫我們覆蓋掉。

    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

下面這段代碼,如果我們沒有手動添加Host、Connection和User-Agent字段,OkHttp會幫我們添加默認,也就是說不想之前的Content-Type等會幫我們覆蓋,就算寫錯了也不影響,這些字段如果寫了就不能寫錯。

if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}

if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive");
}
if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
}

默認支持gzip壓縮

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
}

再來看關于cookie的部分

List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies));
}
  /** Returns a 'Cookie' HTTP request header with all cookies, like {@code a=b; c=d}. */
  private String cookieHeader(List<Cookie> cookies) {
    StringBuilder cookieHeader = new StringBuilder();
    for (int i = 0, size = cookies.size(); i < size; i++) {
      if (i > 0) {
        cookieHeader.append("; ");
      }
      Cookie cookie = cookies.get(i);
      cookieHeader.append(cookie.name()).append('=').append(cookie.value());
    }
    return cookieHeader.toString();
  }

cookieJar一直可以追蹤到client的Builder中有一個默認賦值,cookieJar = CookieJar.NO_COOKIES;,所以如果在client初始時沒有傳入cookieJar此時這里給cookies賦值為一個空集合,就不會默認添加Cookie的header,如果初始傳入過cookieJar,這里就會將cookies中的cookie寫到header中。

CookieJar NO_COOKIES = new CookieJar() {
    @Override public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
    }

    @Override public List<Cookie> loadForRequest(HttpUrl url) {
      return Collections.emptyList();
    }
  };

再之后就是下一個攔截器的連接了。

Response networkResponse = chain.proceed(requestBuilder.build());

在獲取到response后,如果response的header中有有cookie,那么就將它們存到cookieJar。

HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
  public static void receiveHeaders(CookieJar cookieJar, HttpUrl url, Headers headers) {
    if (cookieJar == CookieJar.NO_COOKIES) return;

    List<Cookie> cookies = Cookie.parseAll(url, headers);
    if (cookies.isEmpty()) return;

    cookieJar.saveFromResponse(url, cookies);
  }

下面是對response的解壓工作,將流轉化成解壓過直接能使用的response,然后對header進行了一些處理構建了一個response返回給上一個攔截器。

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }
    return responseBuilder.build();

CacheInterceptor

名字來看就是處理緩存的攔截器了??梢钥吹紺acheInterceptor唯一一個成員:

final InternalCache cache;

InternalCache是一個接口,注釋有這樣一段話:

/**
 * OkHttp's internal cache interface. Applications shouldn't implement this: instead use {@link
 * okhttp3.Cache}.
 */

點進Cache類發(fā)現(xiàn)里面有InternalCache的實現(xiàn),而實現(xiàn)都是直接調用外部類的方法的。

  final InternalCache internalCache = new InternalCache() {
    @Override public Response get(Request request) throws IOException {
      return Cache.this.get(request);
    }

    @Override public CacheRequest put(Response response) throws IOException {
      return Cache.this.put(response);
    }
    ...
  };

所以我們先看看Cache類是什么。

Cache類

put()

看到這一個判斷,OkHttp不緩存非GET方法的響應。

if (!requestMethod.equals("GET")) {
      // Don't cache non-GET responses. We're technically allowed to cache
      // HEAD requests and some POST requests, but the complexity of doing
      // so is high and the benefit is low.
      return null;
}

創(chuàng)建了一個Entry對象,看到Entry成員就差不多明白其實OkHttp就是把所有屬性封裝成了Entry類,方便進行緩存。

Entry entry = new Entry(response);
private static final class Entry {
    /** Synthetic response header: the local time when the request was sent. */
    private static final String SENT_MILLIS = Platform.get().getPrefix() + "-Sent-Millis";
    /** Synthetic response header: the local time when the response was received. */
    private static final String RECEIVED_MILLIS = Platform.get().getPrefix() + "-Received-Millis";
    private final String url;
    private final Headers varyHeaders;
    private final String requestMethod;
    private final Protocol protocol;
    private final int code;
    private final String message;
    private final Headers responseHeaders;
    private final @Nullable Handshake handshake;
    ...

創(chuàng)建了DiskLruCache的Editor對象,猜測OkHttp的緩存寫入工作都是交給它來實現(xiàn)的。

DiskLruCache.Editor editor = null;

通過DiskLruCache的edit()方法拿到editor。調用entry的writeTo()緩存了請求和響應等一些信息,到這里,我們發(fā)現(xiàn)還沒有緩存body的信息,那么看最后返回的CacheRequestImpl的實現(xiàn),發(fā)現(xiàn)內部有一個editor和一個body,body的緩存和它有關系,它主要用于返回給CacheInterceptor用來更新和寫入緩存的。

try {
      editor = cache.edit(key(response.request().url()));
      if (editor == null) {
        return null;
      }
      entry.writeTo(editor);
      return new CacheRequestImpl(editor);
    } catch (IOException e) {
      abortQuietly(editor);
      return null;
    }

key()的實現(xiàn),實際就是將請求的url做md5加密處理再得到其十六進制的表示形式。

public static String key(HttpUrl url) {
    return ByteString.encodeUtf8(url.toString()).md5().hex();
}

writeTo()的實現(xiàn),可以看到緩存的內容。
緩存的不僅僅是相應的頭部信息,還包括請求的頭部信息,如果是https的請求還會緩一些握手的信息。

public void writeTo(DiskLruCache.Editor editor) throws IOException {
      BufferedSink sink = Okio.buffer(editor.newSink(ENTRY_METADATA));

      sink.writeUtf8(url)
          .writeByte('\n');
      sink.writeUtf8(requestMethod)
          .writeByte('\n');
      ...
      sink.writeUtf8(new StatusLine(protocol, code, message).toString())
          .writeByte('\n');
      sink.writeDecimalLong(responseHeaders.size() + 2)
          .writeByte('\n');
      ...
      if (isHttps()) {
        sink.writeByte('\n');
        sink.writeUtf8(handshake.cipherSuite().javaName())
            .writeByte('\n');
        ...
      }
      sink.close();
    }
get()

用來從緩存中讀取響應體response的。
拿到url的key值。

String key = key(request.url());

聲明緩存快照對象和Entry對象。

DiskLruCache.Snapshot snapshot;
Entry entry;

通過DiskLruCache的get()拿到快照。

try {
  snapshot = cache.get(key);
  if (snapshot == null) {
    return null;
  }
} catch (IOException e) {
  // Give up because the cache cannot be read.
  return null;
}

創(chuàng)建Entry對象,如果創(chuàng)建不成功就關閉資源。

try {
  entry = new Entry(snapshot.getSource(ENTRY_METADATA));
} catch (IOException e) {
  Util.closeQuietly(snapshot);
  return null;
}

調用entry的response()構建了response對象。

Response response = entry.response(snapshot);

看看response()的實現(xiàn),大部分是從Entry里面拿值,都是在構造Entry的時候就拿到緩存了。這里拿到了body,builder構建了response并返回。

public Response response(DiskLruCache.Snapshot snapshot) {
  String contentType = responseHeaders.get("Content-Type");
  String contentLength = responseHeaders.get("Content-Length");
  Request cacheRequest = new Request.Builder()
      .url(url)
      .method(requestMethod, null)
      .headers(varyHeaders)
      .build();
  return new Response.Builder()
      .request(cacheRequest)
      .protocol(protocol)
      .code(code)
      .message(message)
      .headers(responseHeaders)
      .body(new CacheResponseBody(snapshot, contentType, contentLength))
      .handshake(handshake)
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(receivedResponseMillis)
      .build();
}

最后做了一個request和response匹配判斷的工作,并返回了Response。

if (!entry.matches(request, response)) {
  Util.closeQuietly(response.body());
  return null;
}
return response;
public boolean matches(Request request, Response response) {
  return url.equals(request.url().toString())
      && requestMethod.equals(request.method())
      && HttpHeaders.varyMatches(response, varyHeaders, request);
}

CacheInterceptor

還是看intercept()。

先嘗試獲取緩存。

Response cacheCandidate = cache != null
    ? cache.get(chain.request())
    : null;

CacheStrategy緩存策略類,再看get方法。

CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();

邏輯其實在getCandidate()方法中。

public CacheStrategy get() {
  CacheStrategy candidate = getCandidate();
  if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
    // We're forbidden from using the network and the cache is insufficient.
    return new CacheStrategy(null, null);
  }
  return candidate;
}

getCandidate()真的是很長,分了很多種情況來判斷是使用緩存網(wǎng)絡請求還是直接使用緩存好的response,大概看一下實現(xiàn)。

如果沒有response的緩存,那就使用請求。

if (cacheResponse == null) {
    return new CacheStrategy(request, null);
}

如果請求是https的并且沒有握手,那么重新請求。

if (request.isHttps() && cacheResponse.handshake() == null) {
    return new CacheStrategy(request, null);
}

如果response是不該被緩存的,就請求,isCacheable()內部是根據(jù)狀態(tài)碼判斷的。

// If this response shouldn't have been stored, it should never be used
// as a response source. This check should be redundant as long as the
// persistence store is well-behaved and the rules are constant.
if (!isCacheable(cacheResponse, request)) {
return new CacheStrategy(request, null);
}

如果請求指定不使用緩存響應,或者是可選擇的,就重新請求。

CacheControl requestCaching = request.cacheControl();
  if (requestCaching.noCache() || hasConditions(request)) {
    return new CacheStrategy(request, null);
}

是否是不容易影響的,傳入request=null,直接從緩存的response拿數(shù)據(jù)。

CacheControl responseCaching = cacheResponse.cacheControl();
  if (responseCaching.immutable()) {
    return new CacheStrategy(null, cacheResponse);
}

如果response有緩存,并且時間比較近,添加一些頭部信息后返回request=null的策略。

if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
    Response.Builder builder = cacheResponse.newBuilder();
    if (ageMillis + minFreshMillis >= freshMillis) {
      builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
    }
    long oneDayMillis = 24 * 60 * 60 * 1000L;
    if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
      builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
    }
    return new CacheStrategy(null, builder.build());
}

回到intercept(),分情況得到strategy之后分別拿到request和response。

Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;

判斷了如果緩存不為空,調用了trackResponse(),和前面分析的get() put()一樣,是在Cache中的。

if (cache != null) {
  cache.trackResponse(strategy);
}

每次調用這個方法requestCount都會自增,之后判斷request是否為空,如果不空,requestCount自增,如果response不空,hitCount命中率加一。

  synchronized void trackResponse(CacheStrategy cacheStrategy) {
    requestCount++;

    if (cacheStrategy.networkRequest != null) {
      // If this is a conditional request, we'll increment hitCount if/when it hits.
      networkCount++;
    } else if (cacheStrategy.cacheResponse != null) {
      // This response uses the cache and not the network. That's a cache hit.
      hitCount++;
    }
  }

如果需要使用緩存,但是緩存Response為空,就關掉。

if (cacheCandidate != null && cacheResponse == null) {
  closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.}

如果策略不是從網(wǎng)絡獲取,并且也沒緩存,就會構建一個504的response。

// If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

如果有緩存,策略也不是從網(wǎng)絡獲取,那么就直接返回緩存結果。

    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

調用攔截器進行網(wǎng)絡獲取,調用下一個攔截器ConnectInterceptor。

Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

攔截器回來拿到網(wǎng)絡請求的response之后,我們判斷了如果響應碼304的話,就直接從緩存中讀取數(shù)據(jù)。

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

如果有響應體并且可緩存,那么就將響應寫入緩存。

if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

如果request是無效的,就要從緩存中刪除。

if (HttpMethod.invalidatesCache(networkRequest.method())) {
    try {
      cache.remove(networkRequest);
    } catch (IOException ignored) {
      // The cache cannot be written.
    }
  }

ConnectInterceptor

作用就是打開與服務器之間的連接,正式開啟OkHttp的網(wǎng)絡請求。

還是從ConnectInterceptor的intercept()來看,代碼非常短。

首先從realChain中拿到了streamAllocation對象,這個對象在RetryAndFollowInterceptor中就已經(jīng)初始化過了,只不過一直沒有使用,到了ConnectTnterceptor才使用。

StreamAllocation streamAllocation = realChain.streamAllocation();

判斷是否是GET請求,之后傳入newStream()生成了一個HttpCodec對象。這個對象是用于編碼request和解碼response的一個封裝好的對象。

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);

接著獲取了一個RalConnection對象,這個對象就是用于實際的網(wǎng)絡傳輸?shù)模瑢懭雛equest、讀取response。

RealConnection connection = streamAllocation.connection();

將創(chuàng)建好的httpCodec和connection對象傳遞給下一個攔截器。

return realChain.proceed(request, streamAllocation, httpCodec, connection);

ConnectInterceptor就是這些,我們再來看一看StreamAllocation的newStream()方法。核心代碼在下面,首先使用findHealthyConnection獲取到了RealConnection,接著調用了realConnection的newCodec()獲取到了HttpCodec對象,最后在同步代碼塊中返回了httpCodec對象。

try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }

我們再來看findHealthyConnection()??磧炔康难h(huán),可以看到又封裝了一層,調用findConnect()獲得RealConnection。接著在同步代碼塊判斷了realConnetion的successCount是否為零,如果是零就直接返回這個realConnection。如果不是,那就判斷一下是否為healthy的,也就是是否是可用的鏈接,如果不是就進行回收工作并繼續(xù)循環(huán)找下一個,如果是就直接返回。

while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }

來看最終的findConnectiion(),代碼還是比較長的,但是看注釋就差不多知道邏輯了,如果連接存在,就直接使用,如果不存在就去連接池中獲取,再獲取不到的話就去新建一個連接。

  /**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   */

如果連接存在,就直接給result賦值。

if (this.connection != null) {
    // We had an already-allocated connection and it's good.
    result = this.connection;
    releasedConnection = null;
}

如果result為空,也就是沒有存在的連接,就去連接池獲取,如果獲取到了,就給result。

if (result == null) {
    // Attempt to get a connection from the pool.        
    Internal.instance.get(connectionPool, address, this, null);
    if (connection != null) {
      foundPooledConnection = true;
      result = connection;
    } else {
      selectedRoute = route;
    }
}

如果獲取到了result,那么就進行網(wǎng)絡連接。

// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener);

連接完之后,會把這個connection放到連接池中。

Internal.instance.put(connectionPool, result);

來看一看connect()的實現(xiàn),太多了,來看重點幾個地方。

檢查連接是否已經(jīng)建立了,如果建立了就拋出異常,protocol是連接中所可能用到的協(xié)議。

if (protocol != null) throw new IllegalStateException("already connected");

ConnectionSpec是一個指定連接的socket的配置。ConnectionSpecSelector用于選擇連接的,OkHttp中有兩種,一種是隧道連接,一種是socket連接。

List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

在這里while循環(huán)判斷是否要建立隧道連接,是就建立。

while (true) {
  try {
    if (route.requiresTunnel()) {
      connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
    ...
}

requiresTunnel()的實現(xiàn):

  /**
   * Returns true if this route tunnels HTTPS through an HTTP proxy. See <a
   * >RFC 2817, Section 5.2</a>.
   */
  public boolean requiresTunnel() {
    return address.sslSocketFactory != null && proxy.type() == Proxy.Type.HTTP;
  }

連接池

keep-alive和多路復用的實現(xiàn)都會引入一個連接池的概念,來維護整個OkHttp的網(wǎng)絡連接。OkHttp會把客戶端與服務端之間的連接抽先給Connection,RealConnection就是其實現(xiàn)類,為了管理這些連接,就提供了ConnectionPool這個類,為了復用連接而設計,當可以共用相同的地址,在時間范圍內就可以復用連接。還實現(xiàn)了哪些連接保持打開狀態(tài)以備后面來使用的策略。還有有有效的清理回收工作。

get()

遍歷連接集合總的所有連接,如果可用,就調用streamAllocation的aquire()。

  /**
   * Returns a recycled connection to {@code address}, or null if no such connection exists. The
   * route is null if the address has not yet been routed.
   */
  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }

來看aquire(),它將connection賦值給StreamAllocation的connection,最后將這個streamAllocation的弱應引用添加到這個connection的allocations這個集合當中去,這個集合是用來通過當前連接對象所持有的弱引用數(shù)量判斷這個網(wǎng)絡連接的負載量是否超過了它的最大值。

  /**
   * Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
   * {@link #release} on the same connection.
   */
  public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }
put()

首先執(zhí)行了一個為清理工工作,之后將connection加入到連接集合當中。這個clenupRunnable就是屬于連接池清理回收的實現(xiàn)。

  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }
  • 每次http請求都會產(chǎn)生一個StreamAllocation對象
  • 每次都會將StreamAllocation的弱引用添加到RealConnection對象的allocations集合中。(通過這個集合的大小來判斷每一個connection是否超過了其最大連接數(shù))
  • 從連接池中復用連接

Connection 清理回收

  • Gc回收算法
  • streamAllocaton的數(shù)量漸漸變?yōu)?,被線程池檢測到,回收。
  • 獨立的線程cleanupRunnable清理連接池

來看cleanupRunnable,它是一段死循環(huán)

  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

來一步一步分析,第一次清理的時候會返回下一次清理的時間間隔。

long waitNanos = cleanup(System.nanoTime());

try catch中會等待,時間過了之后會繼續(xù)循環(huán)清理。

ConnectionPool.this.wait(waitMillis, (int) waitNanos);

來看cleanup(),這個就是具體的gc算法,類似于標記清除算法。

在同步代碼塊中遍歷連接池所有的連接。

for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); )

判斷此連接是否有使用,如果有就繼續(xù)循環(huán)判斷下一個連接。

if (pruneAndGetAllocationCount(connection, now) > 0) {
  inUseConnectionCount++;
  continue;
}

看pruneAndGetAllocationCount()的實現(xiàn)

拿到當前connection的allocations這個allocation弱應用集合,對其每個弱引用循環(huán)遍歷。

List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); )

查看這個streamallocation是否為空,如果不為空就繼續(xù)遍歷下一個弱應用。

Reference<StreamAllocation> reference = references.get(i);
if (reference.get() != null) {
    i++;
    continue;
}

如果為空就應該刪除這個引用。

references.remove(i);

如果都刪空了,就意味著這個connection沒有任何引用,就返回零,其它情況返回引用的數(shù)值。

if (references.isEmpty()) {
    connection.idleAtNanos = now - keepAliveDurationNs;
    return 0;
}

回到cleanup方法,如果pruneAndGetAllocationCount()返回0,那么進行下面的回收工作。

如果控線連接超過數(shù)量,那么就回收這個連接。

if (longestIdleDurationNs >= this.keepAliveDurationNs
      || idleConnectionCount > this.maxIdleConnections) {
    // We've found a connection to evict. Remove it from the list, then close it below (outside
    // of the synchronized block).
    connections.remove(longestIdleConnection);
}

如果都是活躍連接,那么返回時間。

else if (idleConnectionCount > 0) {
    // A connection will be ready to evict soon.
    return keepAliveDurationNs - longestIdleDurationNs;
}

如果還可以塞下連接,但是有可能有空閑連接,直接返回keepAlive時間。

else if (inUseConnectionCount > 0) {
    // All connections are in use. It'll be at least the keep alive duration 'til we run again.
    return keepAliveDurationNs;
}

如果沒有任何連接,那就返回-1跳出cleanupRunnable的清理循環(huán)。

else {
    // No connections, idle or in use.
    cleanupRunning = false;
    return -1;
}
  • OkHttp使用了gc回收算法
  • 判斷streamAllocation數(shù)量
  • 就可以保持多個健康的keep-alive連接。

CallServerInterceptor

如果沒有添加networkInterceptor的話,ConnectInterceptor的下一步就到了CallServerInterceptor。負責向流中寫入請求并讀取響應的工作。
首先拿到上一個攔截器傳來的httpCodec對象,這個對象是整個攔截器的核心,因為寫入請求和讀取響應都是使用這個httpCodec的。

HttpCodec httpCodec = realChain.httpStream();

這些對象都是之前攔截器已經(jīng)初始化好的,connection是上一步已經(jīng)完成連接工作的連接。

StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();

使用httpCodec將請求頭寫入

httpCodec.writeRequestHeaders(request);

看看writeRequestHeaders()的實現(xiàn)。HttpCodec是一個接口,看Http1Codec中的實現(xiàn)。拿到請求行和請求頭,傳給writeRequest()。

@Override public void writeRequestHeaders(Request request) throws IOException {
  String requestLine = RequestLine.get(
      request, streamAllocation.connection().route().proxy().type());
  writeRequest(request.headers(), requestLine);
}

sink是封裝了socket的輸出流,涉及到Okio的知識,按照Http消息格式寫入請求行和請求頭。

public void writeRequest(Headers headers, String requestLine) throws IOException {
  if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
  sink.writeUtf8(requestLine).writeUtf8("\r\n");
  for (int i = 0, size = headers.size(); i < size; i++) {
    sink.writeUtf8(headers.name(i))
        .writeUtf8(": ")
        .writeUtf8(headers.value(i))
        .writeUtf8("\r\n");
  }
  sink.writeUtf8("\r\n");
  state = STATE_OPEN_REQUEST_BODY;
}

寫入請求頭之后,會有一個請求頭的判斷,如果有Expect:100-continue,就不去寫入請求body了,直接開始讀取響應頭。100-continue用于客戶端在發(fā)送POST數(shù)據(jù)給服務器前,征詢服務器情況,看服務器是否處理POST的數(shù)據(jù),如果不處理,客戶端則不上傳POST的body數(shù)據(jù),如果處理,則POST上傳數(shù)據(jù)。在現(xiàn)實應用中,當在POST大數(shù)據(jù)時,才會使用100-continue協(xié)議。

客戶端在發(fā)送請求數(shù)據(jù)之前去判斷服務器是否愿意接收該數(shù)據(jù),如果服務器愿意接收,客戶端才會真正發(fā)送數(shù)據(jù),這么做的原因是如果客戶端直接發(fā)送請求數(shù)據(jù),但是服務器又將該請求拒絕的話,這種行為將帶來很大的資源開銷。所以為了避免這種情況,并不是所有的server都會正確處理并且應答”100-continue“。

if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
  httpCodec.flushRequest();
  realChain.eventListener().responseHeadersStart(realChain.call());
  responseBuilder = httpCodec.readResponseHeaders(true);
}

接著判斷100-continue是否讀取到響應頭,如果沒有讀取到響應,說明服務端是想接受request body的,或者是正常沒有100-continue的情況,就繼續(xù)正常的步驟寫入request body。

if (responseBuilder == null) {
  realChain.eventListener().requestBodyStart(realChain.call());
  long contentLength = request.body().contentLength();
  CountingSink requestBodyOut =
      new CountingSink(httpCodec.createRequestBody(request, contentLength));
  BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
  request.body().writeTo(bufferedRequestBody);
  bufferedRequestBody.close();
  realChain.eventListener()
      .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} 

else if中如果已有響應,那么再去判斷一下這個連接是否為http2.0的,因為http2.0可以多路復用,這個連接可以復用。如果不是,那么這個流就已經(jīng)不需要了,調用streamAllocation的noNewStreams()來關閉這個連接,防止之后在這個連接上創(chuàng)建新的流。

else if (!connection.isMultiplexed()) {
  streamAllocation.noNewStreams();
}
/**
 * Returns true if this is an HTTP/2 connection. Such connections can be used in multiple HTTP
 * requests simultaneously.
 */
public boolean isMultiplexed() {
  return http2Connection != null;
}

到這里,寫入工作就完成了,下面就開始了讀取工作。

httpCodec.finishRequest();

如果之前還沒有讀取到響應頭,就讀,是上面不經(jīng)歷100-continue的情況。到這一步,其實不管是經(jīng)歷了100-continue還是沒經(jīng)歷,都已經(jīng)入去了一次響應頭,只不過readResponseHeaders的參數(shù)不同,100-continue是true,表示之后還有可能再讀取一次,因為最初讀到的可能不是真正響應的響應頭。

if (responseBuilder == null) {
  realChain.eventListener().responseHeadersStart(realChain.call());
  responseBuilder = httpCodec.readResponseHeaders(false);
}

獲取響應碼,如果是100,則再讀取一次響應頭,完善response對象,code賦值真正響應碼。

int code = response.code();
if (code == 100) {
  // server sent a 100-continue even though we did not request one.
  // try again to read the actual response
  responseBuilder = httpCodec.readResponseHeaders(false);
  response = responseBuilder
          .request(request)
          .handshake(streamAllocation.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
  code = response.code();
}

這一步判斷是否為websocket,如果不是就讀取響應體,繼續(xù)構建response。

if (forWebSocket && code == 101) { 
  response = response.newBuilder()
      .body(Util.EMPTY_RESPONSE)
      .build();
} else {
  response = response.newBuilder()
      .body(httpCodec.openResponseBody(response))
      .build();
}

從請求頭和響應頭判斷其中是否有表明需要保持連接打開,如果不需要就釋放連接和流。

if ("close".equalsIgnoreCase(response.request().header("Connection"))
    || "close".equalsIgnoreCase(response.header("Connection"))) {
  streamAllocation.noNewStreams();
}

判斷是否無內容,拋異常。

204:空內容,服務器成功執(zhí)行請求,但是沒有返回信息。205:重置內容,服務器成功執(zhí)行了請求,但是但是沒有返回內容,與204不同,他需要請求者重置文檔視圖(比如,清除表單內容,重新輸入)。

if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
  throw new ProtocolException(
      "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}

最后把response返回給之前的攔截器。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容