okhttp源碼分析(四)-ConnectInterceptor過濾器

1.okhttp源碼分析(一)——基本流程(超詳細(xì))
2.okhttp源碼分析(二)——RetryAndFollowUpInterceptor過濾器
3.okhttp源碼分析(三)——CacheInterceptor過濾器
4.okhttp源碼分析(四)——ConnectInterceptor過濾器
5.okhttp源碼分析(五)——CallServerInterceptor過濾器

前言

前一篇博客分析了CacheInterceptor過濾器,這篇博客主要分析下一個過濾器ConnectInterceptor。其實分析了OkHttp看了這么多代碼,學(xué)習(xí)的不僅僅是OkHttp的處理邏輯和思路,從OkHttp的編程規(guī)范和命名規(guī)則其實也可以學(xué)習(xí)很多,就像過濾器,慢慢會發(fā)現(xiàn)每個過濾器的名字準(zhǔn)確的定位了每個過濾器的任務(wù)。

ConnectInterceptor正如名字所示,是OkHttp中負(fù)責(zé)和服務(wù)器建立連接的過濾器,其實到這里已經(jīng)可以慢慢意識到OkHttp已經(jīng)和Android已有的網(wǎng)絡(luò)框架Volley,Android-async-http等的不同,Volley的底層是提供HttpStack的接口,利用策略模式,這里對版本進(jìn)行了判斷,大于等于2.3則創(chuàng)建HttpURLConnection,小于則創(chuàng)建HttpClientStack,也就是說實際上與服務(wù)器建立連接的是利用Google提供的兩種連接服務(wù)器的類(具體可以看我原來分析過的Volley源碼系列)。也就是說Volley開發(fā)的層次面到此也就結(jié)束了。但是這里OkHttp的不同點(diǎn)就很明顯,OkHttp沒有單純的直接使用上面提到的Google提供的現(xiàn)有的HttpURLConnection等類來直接建立連接,而是專門使用一個過濾器用于建立連接,也就是說在建立連接的層面也有開發(fā)和優(yōu)化(Okio)。我的理解,從開發(fā)歸屬層面來說其實Okhttp是更深層次的,將建立網(wǎng)絡(luò)請求優(yōu)化一直衍生到Socket連接的。

分析

1.宏觀流程

一樣的配方,先從大體流程上對這個過濾器進(jìn)行理解,當(dāng)然就是看這個過濾器的intercept方法。出乎意料的是這個過濾器單從這個方法來看沒有成噸的方法和代碼行,不需要我們做過多的刪減。

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    //建立HttpCodec
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    //獲取連接
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

可以看到這里流程上看很簡單:

1.建立HttpCodec對象。
2.調(diào)用streamAllocation.connection()獲取連接。

所以大體的流程上來看可以看出這個過濾器的作用就是來建立連接的。

2.過程細(xì)節(jié)

(1)HttpCodec

這里第一步是HttpCodec的建立過程。所以理所當(dāng)然首先要了解一下HttpCodec是個什么東西。

/** Encodes HTTP requests and decodes HTTP responses. */
public interface HttpCodec {
  int DISCARD_STREAM_TIMEOUT_MILLIS = 100;
  Sink createRequestBody(Request request, long contentLength);
  void writeRequestHeaders(Request request) throws IOException;
  void flushRequest() throws IOException;
  void finishRequest() throws IOException;
  Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;
  ResponseBody openResponseBody(Response response) throws IOException;
  void cancel();
}

不出意外這是個借口,不得不說Okhttp面向接口編程的思想體現(xiàn)的真的很好。這里我特意把這個類的注釋留了下來,通過注釋我們知道了這個接口的作用是編碼和解碼HTTP響應(yīng)HTTP請求。順便看一個方法其實也可以看出個大概。

(2)streamAllocation.newStream

public HttpCodec newStream(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      //建立HttpCodec
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

接下來就要看這個HttpCodec的建立過程,可以看到這里其實就兩步。

1.findHealthyConnection找到一條“健康”的連接
2.建立HttpCodec

這里先看你findHealthyConnection這個方法,一開始我是很懵的,何為“健康”。

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) { //循環(huán)查找一個鏈接
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }
      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      //如果這條連接不健康
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        //禁止這條連接
        noNewStreams();
        continue;
      }
      return candidate;
    }
  }

這里的流程是這樣的,其實從方法層面上來看流程還是比較好理解的。

RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,connectionRetryEnabled);

1).while循環(huán)遍歷尋找一個連接,既然是遍歷就會發(fā)現(xiàn)OkHttp中是存在連接池的概念的,這也是OkHttp中的一個特有的優(yōu)化。

synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

2)日常線程安全措施,如果建立的連接candidate是新建立的(新的當(dāng)然還沒有用過,所以successCount=0),直接返回,不再需要后面的“健康檢查”。這里的線程安全當(dāng)然是保證當(dāng)兩個線程同事進(jìn)行檢查的時候發(fā)生的情況,保證線程安全。

//如果這條連接不健康
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        //禁止這條連接
        noNewStreams();
        continue;
      }

3)進(jìn)行安全檢查,如果不健康了,禁止這條連接,繼續(xù)執(zhí)行循環(huán),繼續(xù)在連接池中查找能用的連接。
4)返回得到的連接。

這里在來詳細(xì)看一下1)個步驟,也就是查找健康的連接這個過程findConnection。

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    Connection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      // Attempt to use an already-allocated connection. We need to be careful here because our
      // already-allocated connection may have been restricted from creating new streams.
      releasedConnection = this.connection;
      toClose = releaseIfNoNewStreams();
      if (this.connection != null) {
        //如果當(dāng)前connection不為空可以直接使用
        // We had an already-allocated connection and it's good.
        result = this.connection;
        releasedConnection = null;
      }
      if (!reportedAcquired) {
        // If the connection was never reported acquired, don't report it as released!
        releasedConnection = null;
      }

      //當(dāng)前這個connection不能使用,嘗試從連接池里面獲取一個請求
      if (result == null) {
        // Attempt to get a connection from the pool.
        //Internal是一個抽象類,instance是在OkHttpClient中實現(xiàn)的,get方法實現(xiàn)的時候從pool的get方法
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }
    }
    closeQuietly(toClose);
    //釋放一條連接,回調(diào)
    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);
    }
    //如果找到復(fù)用的,則使用這條連接,回調(diào)
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
    }
    if (result != null) {
      //找到一條可復(fù)用的連接
      // If we found an already-allocated or pooled connection, we're done.
      return result;
    }

    // If we need a route selection, make one. This is a blocking operation.
    boolean newRouteSelection = false;
    //切換路由再在連接池里面找下,如果有則返回
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        //遍歷RooteSelector
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size; i++) {
          Route route = routes.get(i);
          Internal.instance.get(connectionPool, address, this, route);
          if (connection != null) {
            foundPooledConnection = true;
            result = connection;
            this.route = route;
            break;
          }
        }
      }

      if (!foundPooledConnection) {
        //沒找到則創(chuàng)建一條
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        //往連接中增加流
        acquire(result, false);
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    //如果第二次找到了可以復(fù)用的,則返回
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    // 建立連接,開始握手
    result.connect(
        connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, call, eventListener);
    //將這條路由從錯誤緩存中清除
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      reportedAcquired = true;

      // Pool the connection.
      //將這個請求加入連接池
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      // 如果是多路復(fù)用,則合并
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
  }

這里我們一步一步看。首先這里需要提前這個函數(shù)體中的相關(guān)變量,便于后面對過程的理解。

    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    Connection releasedConnection;
    Socket toClose;

foundPooledConnection對應(yīng)是否在連接池中找到Connection。
result對應(yīng)找到的可用的連接。
seletedRoute對應(yīng)找到的路由。
releasedConnection對應(yīng)可以釋放的連接、
toClose對應(yīng)需要關(guān)閉的連接。
下面開始看流程:

      releasedConnection = this.connection;
      toClose = releaseIfNoNewStreams();
      if (this.connection != null) {
        //如果當(dāng)前connection不為空可以直接使用
        // We had an already-allocated connection and it's good.
        result = this.connection;
        releasedConnection = null;
      }

這里如果當(dāng)前的StreamAllocation持有的Connection先賦值給releasedConnection,執(zhí)行releaseIfNoNewStreams()方法獲得需要關(guān)閉的Socket。如果當(dāng)前的Connection不為空,則非常棒(注釋...),暫且將這個連接賦值個result,將releasedConnection賦值為空。這里看一下releaseIfNoNewStreams()方法。

/**
   * Releases the currently held connection and returns a socket to close if the held connection
   * restricts new streams from being created. With HTTP/2 multiple requests share the same
   * connection so it's possible that our connection is restricted from creating new streams during
   * a follow-up request.
   */
  private Socket releaseIfNoNewStreams() {
    assert (Thread.holdsLock(connectionPool));
    RealConnection allocatedConnection = this.connection;
    if (allocatedConnection != null && allocatedConnection.noNewStreams) {
      return deallocate(false, false, true);
    }
    return null;
  }

這里先從看注釋看一下,其實注釋寫的很清楚

釋放當(dāng)前的連接,返回一個socket為了防止當(dāng)前的連接限制了新的連接被create。由于Http2多個請求可以用一條連接的特性,所以我們連接可能會限制后續(xù)的請求。
從代碼上看,先將當(dāng)前的Connection賦值給需要回收的連接allocatedConnection,如果allocatedConnection不為空(也就是當(dāng)前的連接不為空),并且當(dāng)前的連接沒有新的流可以創(chuàng)建,則釋放這條連接。否則返回空。所以這個方法的作用其實可以歸結(jié)到以下幾點(diǎn):
1.如果當(dāng)前這條連接為空,也就是沒有連接,直接返回null.
2.如果當(dāng)期這條連接不為空,并且還可以創(chuàng)建流(也就是還可以用),返回null.
3.如果當(dāng)前這條連接不為空,并且不能再創(chuàng)建流了(不能用了),回收。

這里看一下回收的方法deallocate。

/**
   * Releases resources held by this allocation. If sufficient resources are allocated, the
   * connection will be detached or closed. Callers must be synchronized on the connection pool.
   *
   * <p>Returns a closeable that the caller should pass to {@link Util#closeQuietly} upon completion
   * of the synchronized block. (We don't do I/O while synchronized on the connection pool.)
   */
  private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
    assert (Thread.holdsLock(connectionPool));

    if (streamFinished) {
      this.codec = null;
    }
    if (released) {
      this.released = true;
    }
    Socket socket = null;
    if (connection != null) {
      if (noNewStreams) {
        connection.noNewStreams = true;
      }
      if (this.codec == null && (this.released || connection.noNewStreams)) {
        release(connection);
        if (connection.allocations.isEmpty()) {
          connection.idleAtNanos = System.nanoTime();
          if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
            socket = connection.socket();
          }
        }
        connection = null;
      }
    }
    return socket;
  }

從注釋上看,其實就可以發(fā)現(xiàn)這個方法的作用其實就是回收資源,也就是將所持有的資源至空,關(guān)閉。
這里可以看一下做了哪些。

1.codec = null
2.released = true
3.noNewStreams = true
4.connection = null
5.返回connection對應(yīng)的socket

其實可以看到,當(dāng)這一系列方法執(zhí)行完后,如果有可以回收關(guān)閉的Connection,則最后釋放Connection持有的資源后,返回了這個Connection對應(yīng)的Socket給toClose。接下來看下一步。

      //當(dāng)前這個connection不能使用,嘗試從連接池里面獲取一個請求
      if (result == null) {
        // Attempt to get a connection from the pool.
        //Internal是一個抽象類,instance是在OkHttpClient中實現(xiàn)的,get方法實現(xiàn)的時候從pool的get方法
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }

可以看到這里如果上面的第一個沒有合適的連接,result==null,這時候就是OkHttp的獨(dú)特之處:連接池。

Internal.instance.get(connectionPool, address, this, null);


//Internal.java
public abstract class Internal {
  ...
  public static Internal instance;
  ...
}

這里可以看到用到了Internal這個對象,這個對象通過查看源碼可以發(fā)現(xiàn)是一個抽象類,并且實際上調(diào)用的是instance對象,看到這個名詞其實第一個反應(yīng)就是單例模式,源碼上看也沒錯,這里確實是單例模式中的類似餓漢模式。但是不同的是這里的初始化并沒有在這里寫,其實也難怪,這個類是抽象類,是不能初始化的,所以這里我們就需要找到這個抽象類的實現(xiàn)類。通過尋找可以發(fā)現(xiàn)這個類的實現(xiàn)類的初始化是在OkHttpClient中,這里進(jìn)到源碼中看一看。

static {
    Internal.instance = new Internal() {
    ...
      @Override public RealConnection get(ConnectionPool pool, Address address,
          StreamAllocation streamAllocation, Route route) {
        return pool.get(address, streamAllocation, route);
      }
    ...
    }

這里可以看到這里初始化是在靜態(tài)代碼塊中寫的,也就是在類加載的時候初始化的,這里我們調(diào)用了get方法,其實可以看到實際上調(diào)用的是ConnectionPool.get()方法。所以繼續(xù)看源碼。

/**
   * Returns a recycled connection to {@code address}, or null if no such connection exists. The
   * route is null if the address has not yet been routed.
   */
  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    //這種方法的目的是允許一個程序斷言當(dāng)前線程已經(jīng)持有指定的鎖
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        //連接池里面存在可以復(fù)用的連接
        //往連接池中這條可以復(fù)用的連接增加一條流
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }

注釋其實也可以幫助我們理解,這里就不翻譯了,其實代碼也比較清楚,遍歷pool中的connections(ArrayQueue),如果連接是可以復(fù)用的,則將這個連接返回。
這里看一下判斷連接可以復(fù)用的isEligible()方法。

/**
   * Returns true if this connection can carry a stream allocation to {@code address}. If non-null
   * {@code route} is the resolved route for a connection.
   */
  public boolean isEligible(Address address, @Nullable Route route) {
    // If this connection is not accepting new streams, we're done.
    //如果當(dāng)前這次連接的最大并發(fā)數(shù)達(dá)到上限,false
    if (allocations.size() >= allocationLimit || noNewStreams) return false;

    // If the non-host fields of the address don't overlap, we're done.
    //如果兩個address的其他參數(shù)不相同,false
    if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;

    // If the host exactly matches, we're done: this connection can carry the address.
    //如果兩個address的url的host相同,true,復(fù)用這條連接
    if (address.url().host().equals(this.route().address().url().host())) {
      return true; // This connection is a perfect match.
    }
    //如果上面的不符合,在下面的情況下可以合并連接
    // At this point we don't have a hostname match. But we still be able to carry the request if
    // our connection coalescing requirements are met. See also:
    // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
    // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
    //首先這個連接需要時HTTP/2
    // 1. This connection must be HTTP/2.
    if (http2Connection == null) return false;

    // 2. The routes must share an IP address. This requires us to have a DNS address for both
    // hosts, which only happens after route planning. We can't coalesce connections that use a
    // proxy, since proxies don't tell us the origin server's IP address.
    if (route == null) return false;
    //代理不可以
    if (route.proxy().type() != Proxy.Type.DIRECT) return false;
    if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
    //IP address需要相同
    if (!this.route.socketAddress().equals(route.socketAddress())) return false;

    // 3. This connection's server certificate's must cover the new host.
    //這個連接的服務(wù)器證書必須覆蓋新的主機(jī)。
    if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
    if (!supportsUrl(address.url())) return false;

    // 4. Certificate pinning must match the host.
    //證書將必須匹配主機(jī)
    try {
      address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
    } catch (SSLPeerUnverifiedException e) {
      return false;
    }

    return true; // The caller's address can be carried by this connection.
  }

這里其實涉及到的其實是比較多的HTTP和HTTP/2的知識,原理細(xì)節(jié)上準(zhǔn)備后期入手本書研究研究,這里其實流程上理解還是比較容易的,總結(jié)一下,這里連接池里的一個連接可以復(fù)用的判定條件有這幾個(注釋我寫的也比較清楚):

1.當(dāng)前的連接的最大并發(fā)數(shù)不能達(dá)到上限,否則不能復(fù)用
2.兩個連接的address的Host不相同,不能復(fù)用
3.1、2通過后,url的host相同則可以復(fù)用
4.如果3中url的host不相同,可以通過合并連接實現(xiàn)復(fù)用
5.但首先這個連接需要時HTTP/2
6.不能是代理
7.IP的address要相同
8.這個連接的服務(wù)器證書必須覆蓋新的主機(jī)
9.證書將必須匹配主機(jī)
10.以上都不行,則這個連接就不能復(fù)用

其實這里主要就是分為兩種復(fù)用方式:一.host相同直接復(fù)用連接。二.如果是HTTP/2,通過其特性合并連接復(fù)用。
這里看完判斷連接是否合格的方法后,就執(zhí)行acquire()方法,這里來看一下。

/**
//pool中的get方法
   streamAllocation.acquire(connection, true);
//StreamAllocation中的acquire方法
   * Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
   * {@link #release} on the same connection.
   */
  public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    //往這條連接中增加一條流
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }

可以看到這里如果通過isEligible()判斷通過后,執(zhí)行acquire方法。
這里講reportedAcquired設(shè)置為true,并向connection持有的allocations中增加了一條新new的流的弱引用,也就是往這條連接中增加了一條流。
至此從連接池中的get方法也分析結(jié)束了,要回到我們的主線方法中了,也就是findConnection()方法中。

//當(dāng)前這個connection不能使用,嘗試從連接池里面獲取一個請求
      if (result == null) {
        // Attempt to get a connection from the pool.
        //Internal是一個抽象類,instance是在OkHttpClient中實現(xiàn)的,get方法實現(xiàn)的時候從pool的get方法
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }
    }
    closeQuietly(toClose);
    //釋放一條連接,回調(diào)
    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);
    }
    //如果找到復(fù)用的,則使用這條連接,回調(diào)
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
    }
    if (result != null) {
      //找到一條可復(fù)用的連接
      // If we found an already-allocated or pooled connection, we're done.
      return result;
    }

可以看到這里執(zhí)行完get方法后,如果connection!=null,則標(biāo)記foundPooledConnection = true,將connection賦值給result,沒找到則保存當(dāng)前路由route到selectedRoute變量。執(zhí)行完這一系列東西后則是一些關(guān)閉和回調(diào)操作,最后如果找到了可用的連接,則返回這條可以復(fù)用的連接。

// If we need a route selection, make one. This is a blocking operation.
    boolean newRouteSelection = false;
    //切換路由再在連接池里面找下,如果有則返回
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

如果上面沒有找到可以復(fù)用的連接,則繼續(xù)執(zhí)行下面的步驟,可以看這里其實做的是切換路由的操作。

synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        //遍歷RooteSelector
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size; i++) {
          Route route = routes.get(i);
          Internal.instance.get(connectionPool, address, this, route);
          if (connection != null) {
            foundPooledConnection = true;
            result = connection;
            this.route = route;
            break;
          }
        }
      }

     ...
    }

可以到這里切換完路由后,其實就是遍歷路由,再執(zhí)行一次上面分析過的Internal.instance.get()方法,也就是在切換完路由后再嘗試在連接池中尋找可以復(fù)用的連接.

if (!foundPooledConnection) {
        //沒找到則創(chuàng)建一條
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        //往連接中增加流
        acquire(result, false);
      }

如果經(jīng)歷了上面的操作后還是沒有找到可以復(fù)用的連接,那么則創(chuàng)建一個新的連接,終于看到了RealConnection的構(gòu)造方法,new了一個新的RealConnection,并賦值給result,并執(zhí)行了上面分析過的acquire()方法,往new的連接中加入了流。

當(dāng)然這里還沒有說如果剛在交換路由后找到可以復(fù)用的連接怎么辦,接著往下看。

// If we found a pooled connection on the 2nd time around, we're done.
    //如果第二次找到了可以復(fù)用的,則返回
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

可以看到如果第二次找到了,同樣回調(diào),然后返回找到的連接。

// Do TCP + TLS handshakes. This is a blocking operation.
    // 建立連接,開始握手
    result.connect(
        connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, call, eventListener);
    //將這條路由從錯誤緩存中清除
    

    Socket socket = null;
    synchronized (connectionPool) {
      reportedAcquired = true;

      // Pool the connection.
      //將這個請求加入連接池
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      // 如果是多路復(fù)用,則合并
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;

接下來的這些代碼都是基于沒有找到可以復(fù)用的連接這一前提下的,沒有找到可以復(fù)用的,則是上面6)new出來的新的連接,所以接下來的代碼就是執(zhí)行connect()方法,里面其實就涉及到三次握手連接流程了。
后面執(zhí)行的這行代碼特意說一下,routeDatabase().connected(result.route());
單從代碼上看,可能只是理解為往數(shù)據(jù)庫記錄了下這個路由的記錄,但是詳細(xì)進(jìn)入看一下源碼。

public final class RouteDatabase {
  //這個太屌了,錯誤緩存,錯誤過的連接會被緩存,防止錯誤請求重復(fù)請求
  private final Set<Route> failedRoutes = new LinkedHashSet<>();

  /** Records a failure connecting to {@code failedRoute}. */
  public synchronized void failed(Route failedRoute) {
    failedRoutes.add(failedRoute);
  }

  /** Records success connecting to {@code route}. */
  public synchronized void connected(Route route) {
    failedRoutes.remove(route);
  }

  /** Returns true if {@code route} has failed recently and should be avoided. */
  public synchronized boolean shouldPostpone(Route route) {
    return failedRoutes.contains(route);
  }
}

代碼很簡單,思想和全面,用Set<>集合保存錯誤過的數(shù)據(jù)集,因為是new出來的連接,所有肯定沒有錯誤,所以講這個路由從set中移除,防止多余的檢測,那么對應(yīng)的就可以聯(lián)想到這里肯定有如果路由發(fā)生過錯誤的記錄,每次使用前先檢查一下,如果原來錯誤過,就不用執(zhí)行后面的流程了(考慮的很全面有木有,相當(dāng)于緩存了發(fā)生過錯誤的信息)
執(zhí)行完這個后,就將這new得到的連接加入連接池,
Internal.instance.put(connectionPool, result);
后面還有個多路合并的判斷,但是具體細(xì)節(jié)這里就不深入了(需要詳細(xì)了解HTTP+底層代碼)。
至此:findConnection()分析完了,這里大體總結(jié)一下流程吧:

1.嘗試當(dāng)前連接是否可以復(fù)用。
2.嘗試連接池中找可以復(fù)用的連接
3.切換路由,繼續(xù)在連接中嘗試找可以復(fù)用的連接
4.以上都沒有則new一個新的。

到這里其實findHealthyConnection()也分析完了,過程在上上上……上面已經(jīng)分析了。。。再回到主流程上了。

try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      //建立HttpCodec
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }

可以看到找到健康的連接后,執(zhí)行了newCodec方法,得到了HttpCodec實例,這個上面我們已經(jīng)分析過了,是一個接口,只是這里再放一下,便于回顧:

Encodes HTTP requests and decodes HTTP responses

這里就看一下newCodec方法

public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
      StreamAllocation streamAllocation) throws SocketException {
    if (http2Connection != null) {
      return new Http2Codec(client, chain, streamAllocation, http2Connection);
    } else {
      socket.setSoTimeout(chain.readTimeoutMillis());
      source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);
    }
  }

可以看到這里其實就是判斷是Http還是Http2,然后根據(jù)策略模式最后返回。
至此ConnectInterceptor過濾器的的全部流程就分析完了。再放一下主要方法的代碼:

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    //建立HttpCodec
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    //返回RealConnection
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

到此。。。結(jié)束了。。。沒有結(jié)束語。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容