OkHttp源碼剖析

大家好,我是Cooper,一名熱愛技術(shù)的 Android 開發(fā),本文宗旨在于幫助大家快速梳理OkHttp的源碼流程, 本文基于okhttp-4.9.0

0x01 OkHttpClient

解釋OkHttpClient之前,我們先了解下 Call 的定義:

interface Call : Cloneable {
    fun interface Factory { 
        fun newCall(request: Request): Call
    }
}

fun interface 是kotlin 1.4 新加的函數(shù)式接口, OkHttpClient 實(shí)現(xiàn)了此接口

Call 是一個(gè)已經(jīng)準(zhǔn)備好執(zhí)行的請(qǐng)求,可以取消,因?yàn)檫@個(gè)對(duì)象表示單個(gè)請(qǐng)求或者響應(yīng)對(duì)(流),因此無法執(zhí)行兩次

OkHttpClient其實(shí)就是Call的工廠,它可以用來發(fā)送HTTP請(qǐng)求和讀取其響應(yīng)

注意,OkHttpClients應(yīng)該被共享,原因如下:

當(dāng)你創(chuàng)建單個(gè)OkHttpClient實(shí)例并將其用于所有HTTP調(diào)用時(shí),OkHttp的性能最佳。這是因?yàn)槊總€(gè)客戶端都擁有自己的連接池和線程池。復(fù)用連接和線程可減少延遲并節(jié)省內(nèi)存。相反,為每個(gè)請(qǐng)求創(chuàng)建客戶端都會(huì)浪費(fèi)空閑池上的資源

另外,通過 newBuilder() 方法可以自定義共享的OkHttpClient實(shí)例,這樣可以構(gòu)建共享相同連接池,線程池和配置的客戶端。使用此方法可以為特定目的配置派生的客戶端

Shutdown 不是必要的

如果保留的線程和連接保持空閑狀態(tài),他們會(huì)自動(dòng)釋放。但是如果應(yīng)用程序需要主動(dòng)釋放資源,那么可以如下做:

client.dispatcher().executorService().shutdown()

client.connectionPool().evictAll()

client.cache().close()

OkHttp還使用守護(hù)程序線程進(jìn)行HTTP / 2連接。 如果它們保持空閑狀態(tài),它們將自動(dòng)退出。

/* Builder 主要源碼 */
class Builder constructor() {
    internal var dispatcher: Dispatcher = Dispatcher()
    internal var connectionPool: ConnectionPool = ConnectionPool()
    internal val interceptors: MutableList<Interceptor> = mutableListOf()
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
    internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
    internal var retryOnConnectionFailure = true
    internal var authenticator: Authenticator = Authenticator.NONE
    internal var followRedirects = true
    internal var followSslRedirects = true
    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
    internal var cache: Cache? = null
    internal var dns: Dns = Dns.SYSTEM
    internal var proxy: Proxy? = null
    internal var proxySelector: ProxySelector? = null
    internal var proxyAuthenticator: Authenticator = Authenticator.NONE
    internal var socketFactory: SocketFactory = SocketFactory.getDefault()
    internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
    internal var x509TrustManagerOrNull: X509TrustManager? = null
    internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
    internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
    internal var certificateChainCleaner: CertificateChainCleaner? = null
    internal var callTimeout = 0
    internal var connectTimeout = 10_000
    internal var readTimeout = 10_000
    internal var writeTimeout = 10_000
    internal var pingInterval = 0
    internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
    internal var routeDatabase: RouteDatabase? = null
    // ...
}

源碼中使用Builder設(shè)計(jì)模式構(gòu)建OkHttpClient對(duì)象,所以這些成員,OkHttpClient也是一一對(duì)應(yīng)的,這些組件下文中會(huì)找?guī)讉€(gè)重要的展開分析。

0x02 從newCall出發(fā)

/** Prepares the [request] to be executed at some point in the future. */
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

Request 比較簡(jiǎn)單,主要包括 url,method,headers,body的定義

重點(diǎn)分析一下RealCall:

class RealCall(
  val client: OkHttpClient,
  /** 
   * The application's original request unadulterated by redirects or auth headers. 
   * 應(yīng)用程序的原始請(qǐng)求不受重定向或auth標(biāo)頭的影響
   * 一般情況下,就是我們上面說的Request對(duì)象
   */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
    private val connectionPool: RealConnectionPool = client.connectionPool.delegate
    // ...
}

之前我們已經(jīng)說過Call的作用了,RealCall也是Call的唯一實(shí)現(xiàn)

RealCall是OkHttp的應(yīng)用程序和網(wǎng)絡(luò)層之間的橋梁。RealCall暴露了高級(jí)應(yīng)用程序?qū)拥脑冀M成:連接,請(qǐng)求,響應(yīng)和流

RealCall支持異步取消,如果HTTP/2處于活動(dòng)狀態(tài),則取消操作將取消該流,但不會(huì)取消共享其連接的其他流。 但是,如果TLS握手仍在進(jìn)行中,則取消操作可能會(huì)中斷整個(gè)連接。

超時(shí)處理:

private val timeout = object : AsyncTimeout() {
    override fun timedOut() {
      cancel()
    }
  }.apply {
    timeout(client.callTimeoutMillis.toLong(), MILLISECONDS)
  }

 /**
  * 等待最多timeout時(shí)間,然后中止操作。 使用每個(gè)操作超時(shí)意味著只要向前取得進(jìn)展,操作序列就不會(huì)失敗。
  * 如果timeout == 0 ,則操作將無限期運(yùn)行。 (操作系統(tǒng)超時(shí)可能仍然適用)
  */
  open fun timeout(timeout: Long, unit: TimeUnit): Timeout {
    require(timeout >= 0) { "timeout < 0: $timeout" }
    timeoutNanos = unit.toNanos(timeout)
    return this
  }

發(fā)起請(qǐng)求的入口:

override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

  override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

首先,回過頭,看一下Client中的dispatcher:

class Dispatcher constructor() {
  // 同時(shí)執(zhí)行的最大請(qǐng)求數(shù)
  @get:Synchronized var maxRequests = 64

  //每個(gè)主機(jī)要同時(shí)執(zhí)行的最大請(qǐng)求數(shù)。 這通過URL的主機(jī)名限制了請(qǐng)求。 請(qǐng)注意,對(duì)單個(gè)IP地址的并發(fā)請(qǐng)求可能仍會(huì)超出此限制:多個(gè)主機(jī)名可能共享一個(gè)IP地址或通過同一HTTP代理路由
  @get:Synchronized var maxRequestsPerHost = 5

  //每次調(diào)度程序空閑時(shí)(運(yùn)行的調(diào)用數(shù)返回零時(shí))將調(diào)用的回調(diào)
  @set:Synchronized
  @get:Synchronized
  var idleCallback: Runnable? = null

  private var executorServiceOrNull: ExecutorService? = null

  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

  /** Ready async calls in the order they'll be run. */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningSyncCalls = ArrayDeque<RealCall>()

  constructor(executorService: ExecutorService) : this() {
    this.executorServiceOrNull = executorService
  }

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

  /** Used by [Call.execute] to signal it is in-flight. */
  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }
}

對(duì)于同步請(qǐng)求的情況,直接就是把RealCall對(duì)象加到runningSyncCalls中,然后執(zhí)行g(shù)etResponseWithInterceptorChain(),這個(gè)方法直接返回的就是Response對(duì)象,并且執(zhí)行一系列的攔截器,最后調(diào)用dispatcher的finish方法,移除RealCall對(duì)象。

@Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
    } finally {
    }
  }

我們可以看到,這里是RealChain調(diào)用proceed的入口,并且如果cancel的話,拋出IO異常

對(duì)于攔截器的說明:

fun interface Interceptor {
  @Throws(IOException::class)
  fun intercept(chain: Chain): Response

  interface Chain {
      // ...
  }
}

簡(jiǎn)單說,攔截器是觀察,修改并可能使發(fā)出的請(qǐng)求和相應(yīng)的請(qǐng)求短路返回。該接口的實(shí)現(xiàn)拋出[IOException]以表示連接失敗。

interface Chain 的唯一實(shí)現(xiàn)是RealInterceptorChain,這里是攔截器調(diào)用的關(guān)鍵入口,重點(diǎn)分析下proceed方法:

@Throws(IOException::class)
  override fun proceed(request: Request): Response {
    calls++

    // Call the next interceptor in the chain.
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")
    return response
  }

我們只看check除外的代碼,其實(shí)就是每次執(zhí)行proceed后,就從interceptors中拿下一個(gè)攔截器,并調(diào)用intercept方法

至此我們知道,Client中的我們自定義的攔截器會(huì)先調(diào)用,這也就是為什么,我們一定會(huì)在自定義攔截器中調(diào)用proceed的原因,那么方法返回呢,其實(shí)順序正好反過來,按照源碼順序,當(dāng)我們自定義的最后一個(gè)攔截器走完后:

RetryAndFollowUpInterceptor:此攔截器從故障中恢復(fù),并根據(jù)需要進(jìn)行重定向。

BridgeInterceptor:從應(yīng)用程序代碼到網(wǎng)絡(luò)代碼的橋梁。 首先,它根據(jù)用戶請(qǐng)求構(gòu)建網(wǎng)絡(luò)請(qǐng)求。 然后,它繼續(xù)呼叫網(wǎng)絡(luò)。 最后,它根據(jù)網(wǎng)絡(luò)響應(yīng)建立用戶響應(yīng)。

CacheInterceptor:從緩存中獲取服務(wù)器請(qǐng)求數(shù)據(jù),和將響應(yīng)寫入緩存的功能

ConnectInterceptor:打開與目標(biāo)服務(wù)器的連接,然后進(jìn)入下一個(gè)攔截器。 該網(wǎng)絡(luò)可能用于返回的響應(yīng),或者用于使用條件GET驗(yàn)證緩存的響應(yīng)。

object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }
}

其實(shí)到這里,已經(jīng)是最后一個(gè)攔截器了,proceed方法會(huì)直接返回Response,然后從這里向上,把Response對(duì)象逐一的返回給CacheInterceptor,BridgeInterceptor,RetryAndFollowUpInterceptor 和我們自定義的攔截器。這里我們重點(diǎn)看下initExchange方法:

internal fun initExchange(chain: RealInterceptorChain): Exchange {
    val exchangeFinder = this.exchangeFinder!!
    val codec = exchangeFinder.find(client, chain)
    val result = Exchange(this, eventListener, exchangeFinder, codec)
    return result
  }

我們跟進(jìn)一下exchangeFinder.find(client, chain):

fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
    }
  }

繼續(xù)跟進(jìn):

@Throws(SocketException::class)
  internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
    val socket = this.socket!!
    val source = this.source!!
    val sink = this.sink!!
    val http2Connection = this.http2Connection

    return if (http2Connection != null) {
      Http2ExchangeCodec(client, this, chain, http2Connection)
    } else {
      socket.soTimeout = chain.readTimeoutMillis()
      source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)
      sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)
      Http1ExchangeCodec(client, this, source, sink)
    }
  }

到這里已經(jīng)真相大白了,最終http的網(wǎng)絡(luò)實(shí)現(xiàn)就是Http2ExchangeCodec或者Http1ExchangeCodec

大家如果認(rèn)真思考,我們是否有遺漏的地方?沒錯(cuò),就是OkHttp的連接復(fù)用機(jī)制,我們回頭看下源碼,我們講dispatcher的時(shí)候,緊挨著的那個(gè)成員,就是ConnectionPool:

class ConnectionPool internal constructor(
  internal val delegate: RealConnectionPool
) {
  constructor(
    maxIdleConnections: Int,
    keepAliveDuration: Long,
    timeUnit: TimeUnit
  ) : this(RealConnectionPool(
      taskRunner = TaskRunner.INSTANCE,
      maxIdleConnections = maxIdleConnections,
      keepAliveDuration = keepAliveDuration,
      timeUnit = timeUnit
  ))

  constructor() : this(5, 5, TimeUnit.MINUTES)

  /** Returns the number of idle connections in the pool. */
  fun idleConnectionCount(): Int = delegate.idleConnectionCount()

  /** Returns total number of connections in the pool. */
  fun connectionCount(): Int = delegate.connectionCount()

  /** Close and remove all idle connections in the pool. */
  fun evictAll() {
    delegate.evictAll()
  }
}

管理HTTP和HTTP / 2連接的重用,以減少網(wǎng)絡(luò)延遲。 共享相同地址的HTTP請(qǐng)求可以共享一個(gè)Connection 。 此類實(shí)現(xiàn)了將哪些連接保持打開狀態(tài)以備將來使用的策略。

注意:我們看構(gòu)造方法的默認(rèn)參數(shù),官方有如下解釋:使用適合于單用戶應(yīng)用程序的調(diào)整參數(shù)創(chuàng)建一個(gè)新的連接池。此池中的調(diào)整參數(shù)可能會(huì)在將來的OkHttp版本中更改。當(dāng)前,該池最多可容納5個(gè)空閑連接,這些空閑連接在閑置5分鐘后將被驅(qū)逐。

然后,我們不難發(fā)現(xiàn),構(gòu)造方法最終其實(shí)構(gòu)建了RealConnectionPool,也就是delegate對(duì)象,OK,回頭看RealCall的代碼,其中第一個(gè)成員變量就是 connectionPool,而且就是這個(gè)delegate,RealConnectionPool的代碼我們暫且不去展開了,主要有如下幾個(gè)方法:

fun put(connection: RealConnection) {
    connection.assertThreadHoldsLock()

    connections.add(connection)
    cleanupQueue.schedule(cleanupTask)
}

fun evictAll() {
}

fun cleanup(now: Long): Long {
}

其實(shí),我們可以大膽猜測(cè)了,還記得創(chuàng)建HttpExchangeCodec的地方吧,應(yīng)該就在那里調(diào)用的put,把連接加進(jìn)來進(jìn)行維護(hù)吧。我們回頭看一下exchangeFinder.find(client, chain)這個(gè)方法,在newCodec之前,有一個(gè)findHealthyConnection,哈哈,“大白話就是找個(gè)身體好點(diǎn)的連接?。?!”:

/**
 * 查找連接,如果連接狀況良好,則將其返回。 如果不健康,請(qǐng)重復(fù)此過程,直到找到健康的連接為止。
 */
@Throws(IOException::class)
  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )
      // Confirm that the connection is good.
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate
      }
      // ...
      throw IOException("exhausted all routes")
    }
  }

我們繼續(xù)跟一下 findConnection:

  /**
   * 返回用于托管新流的連接。如果存在現(xiàn)有連接,則首選現(xiàn)有連接,然后是池,最后建立一個(gè)新連接。
   * 這將在每次阻止操作之前檢查取消。
   */
  @Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    //...
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
      val result = call.connection!!
      eventListener.connectionAcquired(call, result)
      return result
    }
    // ...

    // Connect. Tell the call about the connecting call so async cancels work.
    val newConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = newConnection
    try {
      newConnection.connect(
          connectTimeout,
          readTimeout,
          writeTimeout,
          pingIntervalMillis,
          connectionRetryEnabled,
          call,
          eventListener
      )
    } finally {
      call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(newConnection.route())

    synchronized(newConnection) {
      connectionPool.put(newConnection)             // put 到連接池
      call.acquireConnectionNoEvents(newConnection)
    }
    return newConnection
  }

同步網(wǎng)絡(luò)請(qǐng)求到此為止,下面我們回過頭來,看一下異步的網(wǎng)絡(luò)請(qǐng)求:

0x03 夢(mèng)回newCall

我們回頭看RealCall的enqueue方法,其實(shí)是把AsyncCall對(duì)象添加給Dispatcher組件,回頭去看Dispatcher的源碼,我們發(fā)現(xiàn)把AsyncCall添加到readyAsyncCalls,然后執(zhí)行promoteAndExecute方法:

private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()
    // ... ...
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }
    return isRunning
  }

executorService對(duì)象我們?cè)倏碊ispatcher組件的時(shí)候應(yīng)該注意到了吧,是一個(gè)自定義的線程池,我們跟一下executeOn方法:

fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()
      var success = false
      try {
        executorService.execute(this)  // AsyncCall 的 run 會(huì)被調(diào)用
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

所以最終線程池會(huì)調(diào)用AsyncCall的run方法:

override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } // ...
      }
    }
  }

所以,流程又來到了getResponseWithInterceptorChain()。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容