OKHttp源碼解析(九):OKHTTP連接中三個(gè)"核心"RealConnection、ConnectionPool、StreamAllocation

本片文章終于講到OKHTTP中的核心了,復(fù)用連接池,本片文章的順序是

  • 1、RealConnection類
  • 2、ConnectionPool類
  • 3、StreamAllocation類

一、RealConnection

RealConnection是Connection的實(shí)現(xiàn)類,代表著鏈接socket的鏈路,如果擁有了一個(gè)RealConnection就代表了我們已經(jīng)跟服務(wù)器有了一條通信鏈路,而且通過
RealConnection代表是連接socket鏈路,RealConnection對(duì)象意味著我們已經(jīng)跟服務(wù)端有了一條通信鏈路了。很多朋友這時(shí)候會(huì)想到,有通信鏈路了,是不是與意味著在這個(gè)類實(shí)現(xiàn)的三次握手,你們猜對(duì)了,的確是在這個(gè)類里面實(shí)現(xiàn)的三次握手。在講握手的之前,看下它的屬性和構(gòu)造函數(shù),對(duì)他有個(gè)大概的了解。

  private final ConnectionPool connectionPool;
  private final Route route;

  // The fields below are initialized by connect() and never reassigned.
  //下面這些字段,通過connect()方法開始初始化,并且絕對(duì)不會(huì)再次賦值
  /** The low-level TCP socket. */
  private Socket rawSocket; //底層socket
  /**
   * The application layer socket. Either an {@link SSLSocket} layered over {@link #rawSocket}, or
   * {@link #rawSocket} itself if this connection does not use SSL.
   */
  private Socket socket;  //應(yīng)用層socket
  //握手
  private Handshake handshake;
   //協(xié)議
  private Protocol protocol;
   // http2的鏈接
  private Http2Connection http2Connection;
  //通過source和sink,大家可以猜到是與服務(wù)器交互的輸入輸出流
  private BufferedSource source;
  private BufferedSink sink;

  // The fields below track connection state and are guarded by connectionPool.
  //下面這個(gè)字段是 屬于表示鏈接狀態(tài)的字段,并且有connectPool統(tǒng)一管理
  /** If true, no new streams can be created on this connection. Once true this is always true. */
  //如果noNewStreams被設(shè)為true,則noNewStreams一直為true,不會(huì)被改變,并且表示這個(gè)鏈接不會(huì)再創(chuàng)新的stream流
  public boolean noNewStreams;
  
  //成功的次數(shù)
  public int successCount;

  /**
   * The maximum number of concurrent streams that can be carried by this connection. If {@code
   * allocations.size() < allocationLimit} then new streams can be created on this connection.
   */
  //此鏈接可以承載最大并發(fā)流的限制,如果不超過限制,可以隨意增加
  public int allocationLimit = 1;

通過上面代碼,我們可以得出以下結(jié)論:

1、里面除了route 字段,部分的字段都是在connect()方法里面賦值的,并且不會(huì)再次賦值
2、這里含有source和sink,所以可以以流的形式對(duì)服務(wù)器進(jìn)行交互
3、noNewStream可以簡(jiǎn)單理解為它表示該連接不可用。這個(gè)值一旦被設(shè)為true,則這個(gè)conncetion則不會(huì)再創(chuàng)建stream。
4、allocationLimit是分配流的數(shù)量上限,一個(gè)connection最大只能支持一個(gè)1并發(fā)
5、allocations是關(guān)聯(lián)StreamAllocation,它用來統(tǒng)計(jì)在一個(gè)連接上建立了哪些流,通過StreamAllocation的acquire方法和release方法可以將一個(gè)allcation對(duì)方添加到鏈表或者移除鏈表,

其實(shí)大家估計(jì)已經(jīng)猜到了connect()里面進(jìn)行了三次握手,大家也猜對(duì)了,那咱們就簡(jiǎn)單的介紹下connect()方法:

public void connect( int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
    if (protocol != null) throw new IllegalStateException("already connected");
     // 線路的選擇
    RouteException routeException = null;
    List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

    if (route.address().sslSocketFactory() == null) {
      if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication not enabled for client"));
      }
      String host = route.address().url().host();
      if (!Platform.get().isCleartextTrafficPermitted(host)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication to " + host + " not permitted by network security policy"));
      }
    }
    // 連接開始
    while (true) {
      try {
        // 如果要求隧道模式,建立通道連接,通常不是這種
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout);
        } else {
           // 一般都走這條邏輯了,實(shí)際上很簡(jiǎn)單就是socket的連接
          connectSocket(connectTimeout, readTimeout);
        }
        // https的建立
        establishProtocol(connectionSpecSelector);
        break;
      } catch (IOException e) {
        closeQuietly(socket);
        closeQuietly(rawSocket);
        socket = null;
        rawSocket = null;
        source = null;
        sink = null;
        handshake = null;
        protocol = null;
        http2Connection = null;

        if (routeException == null) {
          routeException = new RouteException(e);
        } else {
          routeException.addConnectException(e);
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException;
        }
      }
    }

    if (http2Connection != null) {
      synchronized (connectionPool) {
        allocationLimit = http2Connection.maxConcurrentStreams();
      }
    }
  }

這里的執(zhí)行過程大體如下;

  • 1、檢查連接是否已經(jīng)建立,若已經(jīng)建立,則拋出異常,否則繼續(xù),連接的是否簡(jiǎn)歷由protocol標(biāo)示,它表示在整個(gè)連接建立,及可能的協(xié)商過程中選擇所有要用到的協(xié)議。
  • 2、用 集合connnectionspecs構(gòu)造ConnectionSpecSelector。
  • 3、如果請(qǐng)求是不安全的請(qǐng)求,會(huì)對(duì)請(qǐng)求執(zhí)行一些額外的限制:
    3.1、ConnectionSpec集合必須包含ConnectionSpec.CLEARTEXT。也就是說OkHttp用戶可以通過OkHttpClient設(shè)置不包含ConnectionSpec.CLEARTEXT的ConnectionSpec集合來禁用所有的明文要求。
    3.2、平臺(tái)本身的安全策略允向相應(yīng)的主機(jī)發(fā)送明文請(qǐng)求。對(duì)于Android平臺(tái)而言,這種安全策略主要由系統(tǒng)的組件android.security.NetworkSecurityPolicy執(zhí)行。平臺(tái)的這種安全策略不是每個(gè)Android版本都有的。Android6.0之后存在這種控制。
    (okhttp/okhttp/src/main/java/okhttp3/internal/platform/AndroidPlatform.java 里面的isCleartextTrafficPermitted()方法)
  • 4、根據(jù)請(qǐng)求判斷是否需要建立隧道連接,如果建立隧道連接則調(diào)用
    connectTunnel(connectTimeout, readTimeout, writeTimeout);
  • 5、如果不是隧道連接則調(diào)用connectSocket(connectTimeout, readTimeout);建立普通連接。
  • 6、通過調(diào)用establishProtocol建立協(xié)議
  • 7、如果是HTTP/2,則設(shè)置相關(guān)屬性。

整個(gè)流程已經(jīng)梳理完,咱們就摳一下具體的細(xì)節(jié),首先來看下建立普通連接,因?yàn)樗淼肋B接也會(huì)用到普通連接的代碼:
看下connectSocket()方法

/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
  private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();
     // 根據(jù)代理類型來選擇socket類型,是代理還是直連
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
      // 連接socket,之所以這樣寫是因?yàn)橹С植煌钠脚_(tái)
      //里面實(shí)際上是  socket.connect(address, connectTimeout);
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }
     // 得到輸入/輸出流
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
  }

有3種情況需要建立普通連接:

  • 無代理
  • 明文的HTTP代理
  • SOCKS代理

普通連接的建立過程為建立TCP連接,建立TCP連接的過程為:

  • 1、創(chuàng)建Socket,非SOCKS代理的情況下,通過SocketFactory創(chuàng)建;在SOCKS代理則傳入proxy手動(dòng)new一個(gè)出來。
  • 2、為Socket設(shè)置超時(shí)
  • 3、完成特定于平臺(tái)的連接建立
  • 4、創(chuàng)建用于I/O的source和sink

下面我來看下connectSocket()的具體實(shí)現(xiàn),connectSocket()具體實(shí)現(xiàn)是AndroidPlatform.java里面的connectSocket()。
關(guān)于AndroidPlatform.java請(qǐng)看上一篇文章。

設(shè)置了SOCKS代理的情況下,僅有的特別之處在于,是通過傳入proxy手動(dòng)創(chuàng)建Socket。route的socketAddress包含目標(biāo)HTTP服務(wù)器的域名。由此可見SOCKS協(xié)議的處理,主要是在Java標(biāo)準(zhǔn)庫的java.net.Socket中處理,對(duì)于外界而言,就好像是HTTP服務(wù)器直接建立連接一樣,因此連接時(shí)傳入的地址都是HTTP服務(wù)器的域名。

而對(duì)于明文的HTTP代理的情況下,這里滅有任何特殊處理。route的socketAddress包含著代理服務(wù)器的IP地址。HTTP代理自身會(huì)根據(jù)請(qǐng)求及相應(yīng)的實(shí)際內(nèi)容,建立與HTTP服務(wù)器的TCP連接,并轉(zhuǎn)發(fā)數(shù)據(jù)。

這時(shí)候我們?cè)賮砜聪陆⑺淼肋壿嫞?/p>

  /**
   * Does all the work to build an HTTPS connection over a proxy tunnel. The catch here is that a
   * proxy server can issue an auth challenge and then close the connection.
   */
  private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout)
      throws IOException {
    Request tunnelRequest = createTunnelRequest();
    HttpUrl url = tunnelRequest.url();
    int attemptedConnections = 0;
    int maxAttempts = 21;
    while (true) {
      if (++attemptedConnections > maxAttempts) {
        throw new ProtocolException("Too many tunnel connections attempted: " + maxAttempts);
      }

      connectSocket(connectTimeout, readTimeout);
      tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);

      if (tunnelRequest == null) break; // Tunnel successfully created.

      // The proxy decided to close the connection after an auth challenge. We need to create a new
      // connection, but this time with the auth credentials.
      closeQuietly(rawSocket);
      rawSocket = null;
      sink = null;
      source = null;
    }
  }

建立隧道連接的過程又分為幾個(gè)步驟:

  • 創(chuàng)建隧道請(qǐng)求
  • 建立Socket連接
  • 發(fā)送請(qǐng)求建立隧道

隧道請(qǐng)求是一個(gè)常規(guī)的HTTP請(qǐng)求,只是請(qǐng)求的內(nèi)容有點(diǎn)特殊。最初創(chuàng)建的隧道請(qǐng)求如:

  /**
   * Returns a request that creates a TLS tunnel via an HTTP proxy. Everything in the tunnel request
   * is sent unencrypted to the proxy server, so tunnels include only the minimum set of headers.
   * This avoids sending potentially sensitive data like HTTP cookies to the proxy unencrypted.
   */
  private Request createTunnelRequest() {
    return new Request.Builder()
        .url(route.address().url())
        .header("Host", Util.hostHeader(route.address().url(), true))
        .header("Proxy-Connection", "Keep-Alive") // For HTTP/1.0 proxies like Squid.
        .header("User-Agent", Version.userAgent())
        .build();
  }

一個(gè)隧道請(qǐng)求的例子如下:

隧道.png

請(qǐng)求的"Host" header中包含了目標(biāo)HTTP服務(wù)器的域名。建立socket連接的過程這里就不細(xì)說了

創(chuàng)建隧道的過程是這樣子的:

/**
   * To make an HTTPS connection over an HTTP proxy, send an unencrypted CONNECT request to create
   * the proxy connection. This may need to be retried if the proxy requires authorization.
   */
  private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest,
      HttpUrl url) throws IOException {
    // Make an SSL Tunnel on the first message pair of each SSL + proxy connection.
    String requestLine = "CONNECT " + Util.hostHeader(url, true) + " HTTP/1.1";
    while (true) {
      Http1Codec tunnelConnection = new Http1Codec(null, null, source, sink);
      source.timeout().timeout(readTimeout, MILLISECONDS);
      sink.timeout().timeout(writeTimeout, MILLISECONDS);
      tunnelConnection.writeRequest(tunnelRequest.headers(), requestLine);
      tunnelConnection.finishRequest();
      Response response = tunnelConnection.readResponseHeaders(false)
          .request(tunnelRequest)
          .build();
      // The response body from a CONNECT should be empty, but if it is not then we should consume
      // it before proceeding.
      long contentLength = HttpHeaders.contentLength(response);
      if (contentLength == -1L) {
        contentLength = 0L;
      }
      Source body = tunnelConnection.newFixedLengthSource(contentLength);
      Util.skipAll(body, Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
      body.close();

      switch (response.code()) {
        case HTTP_OK:
          // Assume the server won't send a TLS ServerHello until we send a TLS ClientHello. If
          // that happens, then we will have buffered bytes that are needed by the SSLSocket!
          // This check is imperfect: it doesn't tell us whether a handshake will succeed, just
          // that it will almost certainly fail because the proxy has sent unexpected data.
          if (!source.buffer().exhausted() || !sink.buffer().exhausted()) {
            throw new IOException("TLS tunnel buffered too many bytes!");
          }
          return null;

        case HTTP_PROXY_AUTH:
          tunnelRequest = route.address().proxyAuthenticator().authenticate(route, response);
          if (tunnelRequest == null) throw new IOException("Failed to authenticate with proxy");

          if ("close".equalsIgnoreCase(response.header("Connection"))) {
            return tunnelRequest;
          }
          break;

        default:
          throw new IOException(
              "Unexpected response code for CONNECT: " + response.code());
      }
    }
  }

在前面創(chuàng)建的TCP連接值上,完成代理服務(wù)器的HTTP請(qǐng)求/響應(yīng)交互。請(qǐng)求的內(nèi)容類似下面這樣:

"CONNECT m.taobao.com:443 HTTP/1.1"

這里可能會(huì)根據(jù)HTTP代理是否需要認(rèn)證而有多次HTTP請(qǐng)求/響應(yīng)交互。
總結(jié)一下OkHttp3中代理相關(guān)的處理;

  • 1、沒有設(shè)置代理的情況下,直接與HTTP服務(wù)器建立TCP連接,然后進(jìn)行HTTP請(qǐng)求/響應(yīng)的交互。
  • 2、設(shè)置了SOCKS代理的情況下,創(chuàng)建Socket時(shí),為其傳入proxy,連接時(shí)還是以HTTP服務(wù)器為目標(biāo)。在標(biāo)準(zhǔn)庫的Socket中完成SOCKS協(xié)議相關(guān)的處理。此時(shí)基本上感知不到代理的存在。
  • 3、設(shè)置了HTTP代理時(shí)的HTTP請(qǐng)求,與HTTP代理服務(wù)器建立TCP連接。HTTP代理服務(wù)器解析HTTP請(qǐng)求/響應(yīng)的內(nèi)容,并根據(jù)其中的信息來完成數(shù)據(jù)的轉(zhuǎn)發(fā)。也就是說,如果HTTP請(qǐng)求中不包含"Host"header,則有可能在設(shè)置了HTTP代理的情況下無法與HTTP服務(wù)器建立連接。
  • 4、HTTP代理時(shí)的HTTPS/HTTP2請(qǐng)求,與HTTP服務(wù)器建立通過HTTP代理的隧道連接。HTTP代理不再解析傳輸?shù)臄?shù)據(jù),僅僅完成數(shù)據(jù)轉(zhuǎn)發(fā)的功能。此時(shí)HTTP代理的功能退化為如同SOCKS代理類似。
  • 5、設(shè)置了代理類時(shí),HTTP的服務(wù)器的域名解析會(huì)交給代理服務(wù)器執(zhí)行。其中設(shè)置了HTTP代理時(shí),會(huì)對(duì)HTTP代理的域名做域名解析。

上述流程弄明白后,來看下建立協(xié)議
不管是建立隧道連接,還是建立普通連接,都少不了建立協(xié)議這一步驟,這一步是建立好TCP連接之后,而在該TCP能被拿來手法數(shù)據(jù)之前執(zhí)行的。它主要為了數(shù)據(jù)的加密傳輸做一些初始化,比如TCL握手,HTTP/2的協(xié)商。

  private void establishProtocol(ConnectionSpecSelector connectionSpecSelector) throws IOException {
    //如果不是ssl
    if (route.address().sslSocketFactory() == null) {
      protocol = Protocol.HTTP_1_1;
      socket = rawSocket;
      return;
    }
    //如果是sll
    connectTls(connectionSpecSelector);
   //如果是HTTP2
    if (protocol == Protocol.HTTP_2) {
      socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
      http2Connection = new Http2Connection.Builder(true)
          .socket(socket, route.address().url().host(), source, sink)
          .listener(this)
          .build();
      http2Connection.start();
    }
  }

上面的代碼大體上可以歸納為兩點(diǎn)

  • 1、對(duì)于加密的數(shù)據(jù)傳輸,創(chuàng)建TLS連接。對(duì)于明文傳輸,則設(shè)置protocol和socket。socket直接指向應(yīng)用層,如HTTP或HTTP/2,交互的Socket。
    1.1對(duì)于明文傳輸沒有設(shè)置HTTP代理的HTTP請(qǐng)求,它是與HTTP服務(wù)器之間的TCP socket。
    1.2對(duì)于加密傳輸沒有設(shè)置HTTP代理服務(wù)器的HTTP或HTTP2請(qǐng)求,它是與HTTP服務(wù)器之間的SSLSocket。
    1.3對(duì)于加密傳輸設(shè)置了HTTP代理服務(wù)器的HTTP或HTTP2請(qǐng)求,它是與HTTP服務(wù)器之間經(jīng)過代理服務(wù)器的SSLSocket,一個(gè)隧道連接;
    1.4對(duì)于加密傳輸設(shè)置了SOCKS代理的HTTP或HTTP2請(qǐng)求,它是一條經(jīng)過了代理服務(wù)器的SSLSocket連接。
  • 2、對(duì)于HTTP/2,通過new 一個(gè)Http2Connection.Builder會(huì)建立HTTP/2連接 Http2Connection,然后執(zhí)行http2Connection.start()和服務(wù)器建立協(xié)議。我們先來看下建立TLS連接的connectTls()方法
 private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException {
    Address address = route.address();
    SSLSocketFactory sslSocketFactory = address.sslSocketFactory();
    boolean success = false;
    SSLSocket sslSocket = null;
    try {
      // Create the wrapper over the connected socket.
      //在原來的Socket加一層ssl
      sslSocket = (SSLSocket) sslSocketFactory.createSocket(
          rawSocket, address.url().host(), address.url().port(), true /* autoClose */);

      // Configure the socket's ciphers, TLS versions, and extensions.
      ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket);
      if (connectionSpec.supportsTlsExtensions()) {
        Platform.get().configureTlsExtensions(
            sslSocket, address.url().host(), address.protocols());
      }

      // Force handshake. This can throw!
      sslSocket.startHandshake();
      Handshake unverifiedHandshake = Handshake.get(sslSocket.getSession());

      // Verify that the socket's certificates are acceptable for the target host.
      if (!address.hostnameVerifier().verify(address.url().host(), sslSocket.getSession())) {
        X509Certificate cert = (X509Certificate) unverifiedHandshake.peerCertificates().get(0);
        throw new SSLPeerUnverifiedException("Hostname " + address.url().host() + " not verified:"
            + "\n    certificate: " + CertificatePinner.pin(cert)
            + "\n    DN: " + cert.getSubjectDN().getName()
            + "\n    subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert));
      }

      // Check that the certificate pinner is satisfied by the certificates presented.
      address.certificatePinner().check(address.url().host(),
          unverifiedHandshake.peerCertificates());

      // Success! Save the handshake and the ALPN protocol.
      String maybeProtocol = connectionSpec.supportsTlsExtensions()
          ? Platform.get().getSelectedProtocol(sslSocket)
          : null;
      socket = sslSocket;
      source = Okio.buffer(Okio.source(socket));
      sink = Okio.buffer(Okio.sink(socket));
      handshake = unverifiedHandshake;
      protocol = maybeProtocol != null
          ? Protocol.get(maybeProtocol)
          : Protocol.HTTP_1_1;
      success = true;
    } catch (AssertionError e) {
      if (Util.isAndroidGetsocknameError(e)) throw new IOException(e);
      throw e;
    } finally {
      if (sslSocket != null) {
        Platform.get().afterHandshake(sslSocket);
      }
      if (!success) {
        closeQuietly(sslSocket);
      }
    }
  }

TLS連接是對(duì)原始TCP連接的一個(gè)封裝,以及聽過TLS握手,及數(shù)據(jù)手法過程中的加解密等功能。在Java中,用SSLSocket來描述。上面建立的TLS連接的過程大體為:

  • 1、用SSLSocketFactory基于原始的TCP Socket,創(chuàng)建一個(gè)SSLSocket。
  • 2、并配置SSLSocket。
  • 3、在前面選擇的ConnectionSpec支持TLS擴(kuò)展參數(shù)時(shí),配置TLS擴(kuò)展參數(shù)。
  • 4、啟動(dòng)TLS握手
  • 5、TLS握手完成之后,獲取證書信息。
  • 6、對(duì)TLS握手過程中傳回來的證書進(jìn)行驗(yàn)證。
  • 7、在前面選擇的ConnectionSpec支持TLS擴(kuò)展參數(shù)時(shí),獲取TLS握手過程中順便完成的協(xié)議協(xié)商過程所選擇的協(xié)議。這個(gè)過程主要用于HTTP/2的ALPN擴(kuò)展。
  • 8、OkHttp主要使用Okio來做IO操作,這里會(huì)基于前面獲取到SSLSocket創(chuàng)建于執(zhí)行的IO的BufferedSource和BufferedSink等,并保存握手信息以及所選擇的協(xié)議。

至此連接已經(jīng)建立連接已經(jīng)結(jié)束了。

這里說一下isHealthy(boolean doExtensiveChecks)方法,入?yún)⑹且粋€(gè)布爾類,表示是否需要額外的檢查。這里主要是檢查,判斷這個(gè)連接是否是健康的連接,即是否可以重用。那我們來看下

/** Returns true if this connection is ready to host new streams. */
  public boolean isHealthy(boolean doExtensiveChecks) {
    if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
      return false;
    }

    if (http2Connection != null) {
      return !http2Connection.isShutdown();
    }

    if (doExtensiveChecks) {
      try {
        int readTimeout = socket.getSoTimeout();
        try {
          socket.setSoTimeout(1);
          if (source.exhausted()) {
            return false; // Stream is exhausted; socket is closed.
          }
          return true;
        } finally {
          socket.setSoTimeout(readTimeout);
        }
      } catch (SocketTimeoutException ignored) {
        // Read timed out; socket is good.
      } catch (IOException e) {
        return false; // Couldn't read; socket is closed.
      }
    }
    return true;
  }

看上述代碼可知,同時(shí)滿足如下條件才是健康的連接,否則返回false

  • 1、socket已經(jīng)關(guān)閉
  • 2、輸入流關(guān)閉
  • 3、輸出流關(guān)閉
  • 4、如果是HTTP/2連接,則HTTP/2連接也要關(guān)閉。

讓我們?cè)賮砜聪耰sEligible(Address, Route)方法,這個(gè)方法主要是判斷面對(duì)給出的addres和route,這個(gè)realConnetion是否可以重用。

/**
   * Returns true if this connection can carry a stream allocation to {@code address}. If non-null
   * {@code route} is the resolved route for a connection.
   */
  public boolean isEligible(Address address, Route route) {
    // If this connection is not accepting new streams, we're done.
    if (allocations.size() >= allocationLimit || noNewStreams) return false;

    // If the non-host fields of the address don't overlap, we're done.
    if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;

    // If the host exactly matches, we're done: this connection can carry the address.
    if (address.url().host().equals(this.route().address().url().host())) {
      return true; // This connection is a perfect match.
    }

    // At this point we don't have a hostname match. But we still be able to carry the request if
    // our connection coalescing requirements are met. See also:
    // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
    // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/

    // 1. This connection must be HTTP/2.
    if (http2Connection == null) return false;

    // 2. The routes must share an IP address. This requires us to have a DNS address for both
    // hosts, which only happens after route planning. We can't coalesce connections that use a
    // proxy, since proxies don't tell us the origin server's IP address.
    if (route == null) return false;
    if (route.proxy().type() != Proxy.Type.DIRECT) return false;
    if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
    if (!this.route.socketAddress().equals(route.socketAddress())) return false;

    // 3. This connection's server certificate's must cover the new host.
    if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
    if (!supportsUrl(address.url())) return false;

    // 4. Certificate pinning must match the host.
    try {
      address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
    } catch (SSLPeerUnverifiedException e) {
      return false;
    }

    return true; // The caller's address can be carried by this connection.
  }

判斷邏輯如下:

  • 如果連接達(dá)到共享上限,則不能重用
  • 非host域必須完全一樣,如果不一樣不能重用
  • 如果此時(shí)host域也相同,則符合條件,可以被復(fù)用
  • 如果host不相同,在HTTP/2的域名切片場(chǎng)景下一樣可以復(fù)用
    關(guān)于HTTP/2的可以參考下面的文章
    https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding

最后再來看下newCodec(OkHttpClient, StreamAllocation)方法

    if (http2Connection != null) {
      return new Http2Codec(client, streamAllocation, http2Connection);
    } else {
      socket.setSoTimeout(client.readTimeoutMillis());
      source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);
    }

里面主要是判斷是否是HTTP/2,如果是HTTP/2則new一個(gè)Http2Codec。如果不是HTTP/2則new一個(gè)Http1Codec。

上面提到了connection的跟蹤狀態(tài)由ConncetionPool來管理。

二、ConnectionPool

大家先來看下一個(gè)類的注釋

/**
 * Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that
 * share the same {@link Address} may share a {@link Connection}. This class implements the policy
 * of which connections to keep open for future use.
 */

簡(jiǎn)單的翻譯下,如下:
管理http和http/2的鏈接,以便減少網(wǎng)絡(luò)請(qǐng)求延遲。同一個(gè)address將共享同一個(gè)connection。該類實(shí)現(xiàn)了復(fù)用連接的目標(biāo)。

然后看下這個(gè)類的字段:

/**
   * Background threads are used to cleanup expired connections. There will be at most a single
   * thread running per connection pool. The thread pool executor permits the pool itself to be
   * garbage collected.
   */
  //這是一個(gè)用于清楚過期鏈接的線程池,每個(gè)線程池最多只能運(yùn)行一個(gè)線程,并且這個(gè)線程池允許被垃圾回收
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  /** The maximum number of idle connections for each address. */
  //每個(gè)address的最大空閑連接數(shù)。
  private final int maxIdleConnections;
  private final long keepAliveDurationNs;
  //清理任務(wù)
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };
  //鏈接的雙向隊(duì)列
  private final Deque<RealConnection> connections = new ArrayDeque<>();
  //路由的數(shù)據(jù)庫
  final RouteDatabase routeDatabase = new RouteDatabase();
   //清理任務(wù)正在執(zhí)行的標(biāo)志
  boolean cleanupRunning;

來看下它的屬性,

  • 1、主要就是connections,可見ConnectionPool內(nèi)部以隊(duì)列方式存儲(chǔ)連接;
  • 2、routDatabase是一個(gè)黑名單,用來記錄不可用的route,但是看代碼貌似ConnectionPool并沒有使用它。所以此處不做分析。
  • 3、剩下的就是和清理有關(guān)了,所以executor是清理任務(wù)的線程池,cleanupRunning是清理任務(wù)的標(biāo)志,cleanupRunnable是清理任務(wù)。

再來看下他的構(gòu)造函數(shù)

  /**
   * Create a new connection pool with tuning parameters appropriate for a single-user application.
   * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
   * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
   */
 //創(chuàng)建一個(gè)適用于單個(gè)應(yīng)用程序的新連接池。
 //該連接池的參數(shù)將在未來的okhttp中發(fā)生改變
 //目前最多可容乃5個(gè)空閑的連接,存活期是5分鐘
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    //保持活著的時(shí)間,否則清理將旋轉(zhuǎn)循環(huán)
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

通過這個(gè)構(gòu)造器我們知道了這個(gè)連接池最多維持5個(gè)連接,且每個(gè)鏈接最多活5分鐘。并且包含一個(gè)線程池包含一個(gè)清理任務(wù)。
所以maxIdleConnections和keepAliveDurationNs則是清理中淘汰連接的的指標(biāo),這里需要說明的是maxIdleConnections是值每個(gè)地址上最大的空閑連接數(shù)。所以O(shè)kHttp只是限制與同一個(gè)遠(yuǎn)程服務(wù)器的空閑連接數(shù)量,對(duì)整體的空閑連接并沒有限制。

PS:

這時(shí)候說下ConnectionPool的實(shí)例化的過程,一個(gè)OkHttpClient只包含一個(gè)ConnectionPool,其實(shí)例化也是在OkHttpClient的過程。這里說一下ConnectionPool各個(gè)方法的調(diào)用并沒有直接對(duì)外暴露,而是通過OkHttpClient的Internal接口統(tǒng)一對(duì)外暴露。

然后我們來看下他的get和put方法

  /**
   * Returns a recycled connection to {@code address}, or null if no such connection exists. The
   * route is null if the address has not yet been routed.
   */
  RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    //斷言,判斷線程是不是被自己鎖住了
    assert (Thread.holdsLock(this));
    // 遍歷已有連接集合
    for (RealConnection connection : connections) { 
       //如果connection和需求中的"地址"和"路由"匹配
      if (connection.isEligible(address, route)) {
       //復(fù)用這個(gè)連接
        streamAllocation.acquire(connection);
        //返回這個(gè)連接
        return connection;
      }
    }
    return null;
  }

get() 方法遍歷 connections 中的所有 RealConnection 尋找同時(shí)滿足條件的RealConnection。

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

put方法更為簡(jiǎn)單,就是異步觸發(fā)清理任務(wù),然后將連接添加到隊(duì)列中。那么下面開始重點(diǎn)分析他的清理任務(wù)。

  private final long keepAliveDurationNs;
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

這個(gè)邏輯也很簡(jiǎn)單,就是調(diào)用cleanup方法執(zhí)行清理,并等待一段時(shí)間,持續(xù)清理,其中cleanup方法返回的值來來決定而等待的時(shí)間長(zhǎng)度。那我們繼續(xù)來看下cleanup函數(shù):

  /**
   * Performs maintenance on this pool, evicting the connection that has been idle the longest if
   * either it has exceeded the keep alive limit or the idle connections limit.
   *
   * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
   * -1 if no further cleanups are required.
   */
  long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // If the connection is in use, keep searching.
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }
        //統(tǒng)計(jì)空閑連接數(shù)量
        idleConnectionCount++;

        // If the connection is ready to be evicted, we're done.
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          //找出空閑時(shí)間最長(zhǎng)的連接以及對(duì)應(yīng)的空閑時(shí)間
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // We've found a connection to evict. Remove it from the list, then close it below (outside
        // of the synchronized block).
       //在符合清理?xiàng)l件下,清理空閑時(shí)間最長(zhǎng)的連接
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // A connection will be ready to evict soon.
        //不符合清理?xiàng)l件,則返回下次需要執(zhí)行清理的等待時(shí)間,也就是此連接即將到期的時(shí)間
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // All connections are in use. It'll be at least the keep alive duration 'til we run again.
       //沒有空閑的連接,則隔keepAliveDuration(分鐘)之后再次執(zhí)行
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
       //清理結(jié)束
        cleanupRunning = false;
        return -1;
      }
    }
    //關(guān)閉socket資源
    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
     //這里是在清理一個(gè)空閑時(shí)間最長(zhǎng)的連接以后會(huì)執(zhí)行到這里,需要立即再次執(zhí)行清理
    return 0;
  }

  

這里的首先統(tǒng)計(jì)空閑連接數(shù)量,然后通過for循環(huán)查找最長(zhǎng)空閑時(shí)間的連接以及對(duì)應(yīng)空閑時(shí)長(zhǎng),然后判斷是否超出最大空閑連接數(shù)(maxIdleConnections)或者或者超過最大空閑時(shí)間(keepAliveDurationNs),滿足其一則清除最長(zhǎng)空閑時(shí)長(zhǎng)的連接。如果不滿足清理?xiàng)l件,則返回一個(gè)對(duì)應(yīng)等待時(shí)間。
這個(gè)對(duì)應(yīng)等待的時(shí)間又分二種情況:

  • 1 有連接則等待下次需要清理的時(shí)間去清理:keepAliveDurationNs-longestIdleDurationNs;
  • 2 沒有空閑的連接,則等下一個(gè)周期去清理:keepAliveDurationNs

如果清理完畢返回-1。
綜上所述,我們來梳理一下清理任務(wù),清理任務(wù)就是異步執(zhí)行的,遵循兩個(gè)指標(biāo),最大空閑連接數(shù)量和最大空閑時(shí)長(zhǎng),滿足其一則清理空閑時(shí)長(zhǎng)最大的那個(gè)連接,然后循環(huán)執(zhí)行,要么等待一段時(shí)間,要么繼續(xù)清理下一個(gè)連接,知道清理所有連接,清理任務(wù)才結(jié)束,下一次put的時(shí)候,如果已經(jīng)停止的清理任務(wù)則會(huì)被再次觸發(fā)

/**
   * Prunes any leaked allocations and then returns the number of remaining live allocations on
   * {@code connection}. Allocations are leaked if the connection is tracking them but the
   * application code has abandoned them. Leak detection is imprecise and relies on garbage
   * collection.
   */
  private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
     //遍歷弱引用列表
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);
       //若StreamAllocation被使用則接著循環(huán)
      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked allocation. This is an application bug.
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
       //若StreamAllocation未被使用則移除引用,這邊注釋為泄露
      references.remove(i);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      //如果列表為空則說明此連接沒有被引用了,則返回0,表示此連接是空閑連接
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }
    return references.size();
  }

pruneAndGetAllocationCount主要是用來標(biāo)記泄露連接的。內(nèi)部通過遍歷傳入進(jìn)來的RealConnection的StreamAllocation列表,如果StreamAllocation被使用則接著遍歷下一個(gè)StreamAllocation。如果StreamAllocation未被使用則從列表中移除,如果列表中為空則說明此連接連接沒有引用了,返回0,表示此連接是空閑連接,否則就返回非0表示此連接是活躍連接。
接下來讓我看下ConnectionPool的connectionBecameIdle()方法,就是當(dāng)有連接空閑時(shí),喚起cleanup線程清洗連接池

  /**
   * Notify this pool that {@code connection} has become idle. Returns true if the connection has
   * been removed from the pool and should be closed.
   */
  boolean connectionBecameIdle(RealConnection connection) {
    assert (Thread.holdsLock(this));
     //該連接已經(jīng)不可用
    if (connection.noNewStreams || maxIdleConnections == 0) {
      connections.remove(connection);
      return true;
    } else {
      //歡迎clean 線程
      notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
      return false;
    }
  }

connectionBecameIdle標(biāo)示一個(gè)連接處于空閑狀態(tài),即沒有流任務(wù),那么久需要調(diào)用該方法,由ConnectionPool來決定是否需要清理該連接。
再來看下deduplicate()方法

  /**
   * Replaces the connection held by {@code streamAllocation} with a shared connection if possible.
   * This recovers when multiple multiplexed connections are created concurrently.
   */
  Socket deduplicate(Address address, StreamAllocation streamAllocation) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, null)
          && connection.isMultiplexed()
          && connection != streamAllocation.connection()) {
        return streamAllocation.releaseAndAcquire(connection);
      }
    }
    return null;
  }

該方法主要是針對(duì)HTTP/2場(chǎng)景下多個(gè)多路復(fù)用連接清除的場(chǎng)景。如果是當(dāng)前連接是HTTP/2,那么所有指向該站點(diǎn)的請(qǐng)求都應(yīng)該基于同一個(gè)TCP連接。這個(gè)方法比較簡(jiǎn)單就不詳細(xì)說了,再說下另外一個(gè)方法

  /** Close and remove all idle connections in the pool. */
  public void evictAll() {
    List<RealConnection> evictedConnections = new ArrayList<>();
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
        if (connection.allocations.isEmpty()) {
          connection.noNewStreams = true;
          evictedConnections.add(connection);
          i.remove();
        }
      }
    }
    for (RealConnection connection : evictedConnections) {
      closeQuietly(connection.socket());
    }
  }

該方法是刪除所有空閑的連接,比較簡(jiǎn)單,不說了

三、 StreamAllocation

這個(gè)類很重要,我們先來看下類的注釋

/**
 * This class coordinates the relationship between three entities:
 *
 * <ul>
 *     <li><strong>Connections:</strong> physical socket connections to remote servers. These are
 *         potentially slow to establish so it is necessary to be able to cancel a connection
 *         currently being connected.
 *     <li><strong>Streams:</strong> logical HTTP request/response pairs that are layered on
 *         connections. Each connection has its own allocation limit, which defines how many
 *         concurrent streams that connection can carry. HTTP/1.x connections can carry 1 stream
 *         at a time, HTTP/2 typically carry multiple.
 *     <li><strong>Calls:</strong> a logical sequence of streams, typically an initial request and
 *         its follow up requests. We prefer to keep all streams of a single call on the same
 *         connection for better behavior and locality.
 * </ul>
 *
 * <p>Instances of this class act on behalf of the call, using one or more streams over one or more
 * connections. This class has APIs to release each of the above resources:
 *
 * <ul>
 *     <li>{@link #noNewStreams()} prevents the connection from being used for new streams in the
 *         future. Use this after a {@code Connection: close} header, or when the connection may be
 *         inconsistent.
 *     <li>{@link #streamFinished streamFinished()} releases the active stream from this allocation.
 *         Note that only one stream may be active at a given time, so it is necessary to call
 *         {@link #streamFinished streamFinished()} before creating a subsequent stream with {@link
 *         #newStream newStream()}.
 *     <li>{@link #release()} removes the call's hold on the connection. Note that this won't
 *         immediately free the connection if there is a stream still lingering. That happens when a
 *         call is complete but its response body has yet to be fully consumed.
 * </ul>
 *
 * <p>This class supports {@linkplain #cancel asynchronous canceling}. This is intended to have the
 * smallest blast radius possible. If an HTTP/2 stream is active, canceling will cancel that stream
 * but not the other streams sharing its connection. But if the TLS handshake is still in progress
 * then canceling may break the entire connection.
 */

在講解這個(gè)類的時(shí)候不得不說下背景:
HTTP的版本:
HTTP的版本從最初的1.0版本,到后續(xù)的1.1版本,再到后續(xù)的google推出的SPDY,后來再推出2.0版本,http協(xié)議越來越完善。(ps:okhttp也是根據(jù)2.0和1.1/1.0作為區(qū)分,實(shí)現(xiàn)了兩種連接機(jī)制)這里要說下http2.0和http1.0,1.1的主要區(qū)別,2.0解決了老版本(1.1和1.0)最重要兩個(gè)問題:連接無法復(fù)用和head of line blocking (HOL)問題.2.0使用多路復(fù)用的技術(shù),多個(gè)stream可以共用一個(gè)socket連接,每個(gè)tcp連接都是通過一個(gè)socket來完成的,socket對(duì)應(yīng)一個(gè)host和port,如果有多個(gè)stream(也就是多個(gè)request)都是連接在一個(gè)host和port上,那么它們就可以共同使用同一個(gè)socket,這樣做的好處就是可以減少TCP的一個(gè)三次握手的時(shí)間。在OKHttp里面,記錄一次連接的是RealConnection,這個(gè)負(fù)責(zé)連接,在這個(gè)類里面用socket來連接,用HandShake來處理握手。

在講解這個(gè)類的之前我們先熟悉3個(gè)概念:請(qǐng)求、連接、流。我們要明白HTTP通信執(zhí)行網(wǎng)絡(luò)"請(qǐng)求"需要在"連接"上建立一個(gè)新的"流",我們將StreamAllocation稱之流的橋梁,它負(fù)責(zé)為一次"請(qǐng)求"尋找"連接"并建立"流",從而完成遠(yuǎn)程通信。所以說StreamAllocation與"請(qǐng)求"、"連接"、"流"都有關(guān)。

從注釋我們看到。Connection是建立在Socket之上的物流通信信道,而Stream則是代表邏輯的流,至于Call是對(duì)一次請(qǐng)求過程的封裝。之前也說過一個(gè)Call可能會(huì)涉及多個(gè)流(比如重定向或者auth認(rèn)證等情況)。所以我們想一下,如果StreamAllocation要想解決上述問題,需要兩個(gè)步驟,一是尋找連接,二是獲取流。所以StreamAllocation里面應(yīng)該包含一個(gè)Stream(上文已經(jīng)說到了,OKHttp里面的流是HttpCodec);還應(yīng)該包含連接Connection。如果想找到合適的劉姐,還需要一個(gè)連接池ConnectionPool屬性。所以應(yīng)該有一個(gè)獲取流的方法在StreamAllocation里面是newStream();找到合適的流的方法findConnection();還應(yīng)該有完成請(qǐng)求任務(wù)的之后finish()的方法來關(guān)閉流對(duì)象,還有終止和取消等方法,以及釋放資源的方法。

1、那咱們先就看下他的屬性

  public final Address address;//地址
  private Route route; //路由
  private final ConnectionPool connectionPool;  //連接池
  private final Object callStackTrace; //日志

  // State guarded by connectionPool.
  private final RouteSelector routeSelector; //路由選擇器
  private int refusedStreamCount;  //拒絕的次數(shù)
  private RealConnection connection;  //連接
  private boolean released;  //是否已經(jīng)被釋放
  private boolean canceled  //是否被取消了

看完屬性,我們來看下構(gòu)造函數(shù)


  public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
    this.connectionPool = connectionPool;
    this.address = address;
    this.routeSelector = new RouteSelector(address, routeDatabase());
    this.callStackTrace = callStackTrace;
  }
 

這時(shí)候我們?cè)賮砜聪滤囊粋€(gè)比較重要的方法

  public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      //獲取一個(gè)連接
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
   //實(shí)例化HttpCodec,如果是HTTP/2則是Http2Codec否則是Http1Codec
      HttpCodec resultCodec = resultConnection.newCodec(client, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

這里面兩個(gè)重要方法
1是通過findHealthyConnection獲取一個(gè)連接、2是通過resultConnection.newCodec獲取流。
我們接著來看findHealthyConnection()方法

  /**
   * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
   */
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }

我們看到里面調(diào)用findConnection來獲取一個(gè)RealConnection,然后通過RealConnection自己的方法isHealthy,去判斷是否是健康的連接,如果是健康的連接,則重用,否則就繼續(xù)查找。那我們繼續(xù)看下findConnection()方法

 /**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   */
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");
      //獲取存在的連接
      // Attempt to use an already-allocated connection.
      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
          // 如果已經(jīng)存在的連接滿足要求,則使用已存在的連接
        return allocatedConnection;
      }
      //從緩存中去取
      // Attempt to get a connection from the pool.
      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;
      }

      selectedRoute = route;
    }
       // 線路的選擇,多ip的支持
    // If we need a route, make one. This is a blocking operation.
    if (selectedRoute == null) {
      //里面是個(gè)遞歸
      selectedRoute = routeSelector.next();
    }

    RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      // Now that we have an IP address, make another attempt at getting a connection from the pool.
      // This could match due to connection coalescing.
      //更換路由再次嘗試
      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) return connection;

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      route = selectedRoute;
      refusedStreamCount = 0;
     // 以上都不符合,創(chuàng)建一個(gè)連接
      result = new RealConnection(connectionPool, selectedRoute);
      acquire(result);
    }
     //連接并握手
    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    //更新本地?cái)?shù)據(jù)庫
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      // Pool the connection.
      //把連接放到連接池中
      Internal.instance.put(connectionPool, result);
      //如果這個(gè)連接是多路復(fù)用
      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        //調(diào)用connectionPool的deduplicate方法去重。
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    //如果是重復(fù)的socket則關(guān)閉socket,不是則socket為nul,什么也不做
    closeQuietly(socket);
    //返回整個(gè)連接
    return result;
  }

上面代碼大概的邏輯是:

  • 1、先找是否有已經(jīng)存在的連接,如果有已經(jīng)存在的連接,并且可以使用(!noNewStreams)則直接返回。
  • 2、根據(jù)已知的address在connectionPool里面找,如果有連接,則返回
  • 3、更換路由,更換線路,在connectionPool里面再次查找,如果有則返回。
  • 4、如果以上條件都不滿足則直接new一個(gè)RealConnection出來
  • 5、new出來的RealConnection通過acquire關(guān)聯(lián)到connection.allocations上
  • 6、做去重判斷,如果有重復(fù)的socket則關(guān)閉

里面涉及到的RealConnection的connect()方法,我們已經(jīng)在RealConnection里面講過,這里就不講了。不過這里說下acquire()方法

  /**
   * Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
   * {@link #release} on the same connection.
   */
  public void acquire(RealConnection connection) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    this.connection = connection;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }

這里相當(dāng)于給connection的引用計(jì)數(shù)器加1
這里說下StreamAllocationReference,StreamAllocationReference其實(shí)是弱引用的子類。具體代碼如下:

  public static final class StreamAllocationReference extends WeakReference<StreamAllocation> {
    /**
     * Captures the stack trace at the time the Call is executed or enqueued. This is helpful for
     * identifying the origin of connection leaks.
     */
    public final Object callStackTrace;

    StreamAllocationReference(StreamAllocation referent, Object callStackTrace) {
      super(referent);
      this.callStackTrace = callStackTrace;
    }
  }

下面來看下他的他的其他方法streamFinished(boolean, HttpCodec)、release(RealConnection)和deallocate(boolean, boolean, boolean)方法。

  public void streamFinished(boolean noNewStreams, HttpCodec codec) {
    Socket socket;
    synchronized (connectionPool) {
      if (codec == null || codec != this.codec) {
        throw new IllegalStateException("expected " + this.codec + " but was " + codec);
      }
      if (!noNewStreams) {
        connection.successCount++;
      }
      socket = deallocate(noNewStreams, false, true);
    }
    closeQuietly(socket);
  }

/**
   * Releases resources held by this allocation. If sufficient resources are allocated, the
   * connection will be detached or closed. Callers must be synchronized on the connection pool.
   *
   * <p>Returns a closeable that the caller should pass to {@link Util#closeQuietly} upon completion
   * of the synchronized block. (We don't do I/O while synchronized on the connection pool.)
   */
  private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
    assert (Thread.holdsLock(connectionPool));

    if (streamFinished) {
      this.codec = null;
    }
    if (released) {
      this.released = true;
    }
    Socket socket = null;
    if (connection != null) {
      if (noNewStreams) {
        connection.noNewStreams = true;
      }
      if (this.codec == null && (this.released || connection.noNewStreams)) {
        release(connection);
        if (connection.allocations.isEmpty()) {
          connection.idleAtNanos = System.nanoTime();
          if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
            socket = connection.socket();
          }
        }
        connection = null;
      }
    }
    return socket;
  }

  /** Remove this allocation from the connection's list of allocations. */
  private void release(RealConnection connection) {
    for (int i = 0, size = connection.allocations.size(); i < size; i++) {
      Reference<StreamAllocation> reference = connection.allocations.get(i);
      if (reference.get() == this) {
        connection.allocations.remove(i);
        return;
      }
    }
    throw new IllegalStateException();
  }

其中deallocate(boolean, boolean, boolean)和release(RealConnection)方法都是private,而且均在streamFinished里面調(diào)用。
release(RealConnection)方法比較簡(jiǎn)單,主要是把RealConnection對(duì)應(yīng)的allocations清除掉,把計(jì)數(shù)器歸零。
deallocate(boolean, boolean, boolean)方法也簡(jiǎn)單,根據(jù)傳入的三個(gè)布爾類型的值進(jìn)行操作,如果streamFinished為true則代表關(guān)閉流,所以要通過連接池connectionPool把這個(gè)connection設(shè)置空閑連接,如果可以設(shè)為空閑連接則返回這個(gè)socket。不能則返回null。
streamFinished()主要做了一些異常判斷,然后調(diào)用deallocate()方法
綜上所述:streamFinished(boolean, HttpCodec)主要是關(guān)閉流,release(RealConnection)主要是釋放connection的引用,deallocate(boolean, boolean, boolean)主要是根據(jù)參數(shù)做一些設(shè)置。
上面說到了release(RealConnection),為了防止大家混淆概念,這里說一下另外一個(gè)方法release()這個(gè)是無參的方法。

  public void release() {
    Socket socket;
    synchronized (connectionPool) {
      socket = deallocate(false, true, false);
    }
    closeQuietly(socket);
  }

注意這個(gè)和上面的帶有RealConnection的參數(shù)release()的區(qū)別。
然后說一下noNewStreams()方法,主要是設(shè)置防止別人在這個(gè)連接上開新的流。

  /** Forbid new streams from being created on the connection that hosts this allocation. */
  public void noNewStreams() {
    Socket socket;
    synchronized (connectionPool) {
      socket = deallocate(true, false, false);
    }
    closeQuietly(socket);
  }

還有一個(gè)方法,平時(shí)也是經(jīng)常有遇到的就是cancel()方法

  public void cancel() {
    HttpCodec codecToCancel;
    RealConnection connectionToCancel;
    synchronized (connectionPool) {
      canceled = true;
      codecToCancel = codec;
      connectionToCancel = connection;
    }
    if (codecToCancel != null) {
      codecToCancel.cancel();
    } else if (connectionToCancel != null) {
      connectionToCancel.cancel();
    }
  }

其實(shí)也比較簡(jiǎn)單的就是調(diào)用RealConnection的Cancel方法。
如果在連接中過程出現(xiàn)異常,會(huì)調(diào)用streamFailed(IOException)方法

public void streamFailed(IOException e) {
    Socket socket;
    boolean noNewStreams = false;

    synchronized (connectionPool) {
      if (e instanceof StreamResetException) {
        StreamResetException streamResetException = (StreamResetException) e;
        if (streamResetException.errorCode == ErrorCode.REFUSED_STREAM) {
          refusedStreamCount++;
        }
        // On HTTP/2 stream errors, retry REFUSED_STREAM errors once on the same connection. All
        // other errors must be retried on a new connection.
        if (streamResetException.errorCode != ErrorCode.REFUSED_STREAM || refusedStreamCount > 1) {
          noNewStreams = true;
          route = null;
        }
      } else if (connection != null
          && (!connection.isMultiplexed() || e instanceof ConnectionShutdownException)) {
        noNewStreams = true;

        // If this route hasn't completed a call, avoid it for new connections.
        if (connection.successCount == 0) {
          if (route != null && e != null) {
            routeSelector.connectFailed(route, e);
          }
          route = null;
        }
      }
      socket = deallocate(noNewStreams, false, true);
    }

    closeQuietly(socket);
  }

根據(jù)異常類型來采取不同的應(yīng)對(duì)措施。注釋已經(jīng)比較清楚了,就不細(xì)說了。
其他的方法比較簡(jiǎn)單,我這里就不細(xì)說了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容