OkHttp 4.2.2 源碼分析
原文鏈接
注意:OkHttp 3.12.0 以上版本需要 Android 21+,否則會(huì)奔潰,這個(gè)問題之前遇見過,但是具體什么錯(cuò)誤因?yàn)楫?dāng)時(shí)沒有記錄所以遺忘了,但是這里分析的代碼還是采用當(dāng)前最新的版本 4.2.2,并且這個(gè)版本的代碼已經(jīng)采用了 Kotlin 了。
基本使用
val okHttpClient = OkHttpClient.Builder().build()
val request = Request.Builder().get().url("https://www.baidu.com").build()
val call = okHttpClient.newCall(request)
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
Log.d("OkHttpClient", e.toString())
}
override fun onResponse(call: Call, response: Response) {
Log.d("OkHttpClient", response.toString())
}
})
以上就是 OkHttp 的基本使用,里面涉及到了 OkHttpClient、Request、Call、Callback 對象,為了最快的理解 OkHttp 是如何工作的我們可以從 call.enqueue() 入手,但是 Call 僅僅是一個(gè)接口,所以我們要看看它的實(shí)現(xiàn)類和這個(gè)實(shí)現(xiàn)類是怎么來的。
一、 OkHttpClient.newCall
// OkHttpClient.kt
override fun newCall(request: Request): Call {
return RealCall.newRealCall(this, request, forWebSocket = false)
}
// RealCall.kt
fun newRealCall(
client: OkHttpClient,
originalRequest: Request,
forWebSocket: Boolean
): RealCall {
// Safely publish the Call instance to the EventListener.
return RealCall(client, originalRequest, forWebSocket).apply {
transmitter = Transmitter(client, this)
}
}
僅僅是調(diào)用了 RealCall 的方法,把 OkHttpClient 和 Request 對象當(dāng)作參數(shù),新建了一個(gè) RealCall,然后初始化 Transmitter(發(fā)射器)對象。
二、RealCall.execute
我們發(fā)起請求的方式有同步和異步兩種,execute 就是同步請求,而 enqueue 為異步請求。
override fun execute(): Response {
//1、
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
//2、
transmitter.timeoutEnter()
//3、
transmitter.callStart()
try {
//4、
client.dispatcher.executed(this)
//5、
return getResponseWithInterceptorChain()
} finally {
//6、
client.dispatcher.finished(this)
}
}
首先通過 executed 判斷是否已經(jīng)調(diào)用過,如果多次調(diào)用則拋出異常。
請求超時(shí)的判斷處理。
在初始化 OkHttpClient 的時(shí)候我們可以創(chuàng)建 EventListener.Factory 對象,為每個(gè) RealCall 對象創(chuàng)建監(jiān)聽對象,在 transmitter.callStart 中會(huì)調(diào)用 EventListener.callStart。
-
調(diào)用 Dispatcher 對象的 executed 方法,把 RealCall 傳入,事實(shí)上僅僅是加入到一個(gè)列表當(dāng)中。
@Synchronized internal fun executed(call: RealCall) { runningSyncCalls.add(call) } getResponseWithInterceptorChain 是真正開始處理請求的地方,這里我們后面單獨(dú)講。
請求結(jié)束后從列表中移除,如果發(fā)現(xiàn) Dispatcher 空閑還會(huì)調(diào)用空閑的回調(diào),
三、RealCall.enqueue
enqueue 為異步請求。
// RealCall.kt
override fun enqueue(responseCallback: Callback) {
//1、
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
//2、
transmitter.callStart()
//3、
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
- 防止重復(fù)調(diào)用。
- 事件開始分發(fā)。
- 與同步調(diào)用不同的地方,這里把我們使用的時(shí)候傳入的 Callback 對象,進(jìn)一步用 AsyncCall 封裝,然后調(diào)用 Dispatcher.enqueue 方法。
//Dispatcher.kt
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//4、
readyAsyncCalls.add(call)
if (!call.get().forWebSocket) {
//5、
val existingCall = findExistingCallWithHost(call.host())
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//6、
promoteAndExecute()
}
- 把 AsycCall 對象添加到等待隊(duì)列。
- 如果不是 WebScoket 的請求(一般都不是),會(huì)進(jìn)一步從正在執(zhí)行的和等待中的異步請求列表中查找 Host 相同的請求,把它的 callsPerHost 對象的引用拷貝過來,用來統(tǒng)計(jì)相同 Host 的請求數(shù)。reuseCallsPerHostFrom 方法實(shí)現(xiàn)見下。
// RealCall.kt 中內(nèi)部類 AsyncCall
@Volatile private var callsPerHost = AtomicInteger(0)
fun callsPerHost(): AtomicInteger = callsPerHost
fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}
- promoteAndExecute 方法見下。
private fun promoteAndExecute(): Boolean {
assert(!Thread.holdsLock(this))
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
// 7、
while (i.hasNext()) {
val asyncCall = i.next()
//到達(dá)最大請求數(shù),默認(rèn)為 64
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
//到達(dá)單個(gè) host 最大請求數(shù),默認(rèn)為 5
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
// 8、
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
- 遍歷等待執(zhí)行的異步請求列表 readyAsyncCalls,將符合要求的請求收集到 executableCalls 列表中,同時(shí)在 readyAsyncCalls 中移除。這里有兩種情況為暫時(shí)不處理的請求:一個(gè)是到達(dá)單個(gè) host 最大請求數(shù),另一個(gè)是總體請求到達(dá)最大請求數(shù)。
- 遍歷收集到的請求,調(diào)用 AsyncCall.executeOn,參數(shù)為線程池。
四、AsyncCall.executeOn
AsyncCall 實(shí)現(xiàn)了 Runnable 接口,這里又把 executorService 傳入,不難想到后面的邏輯就是用線程池執(zhí)行 Runnable 的 run 方法,下面我們來看下。
fun executeOn(executorService: ExecutorService) {
assert(!Thread.holdsLock(client.dispatcher))
var success = false
try {
//1、
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
//2、
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
transmitter.noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
//3、
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
- 用線程池執(zhí)行,后面馬上就會(huì)看 run 的實(shí)現(xiàn)。
- 異常處理,并分發(fā)。
- 異常結(jié)束的結(jié)束處理。
override fun run() {
// 4、
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
// 5、
transmitter.timeoutEnter()
try {
// 6、
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
// 7、
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} finally {
// 8、
client.dispatcher.finished(this)
}
}
}
- 修改線程名,在執(zhí)行完內(nèi)部代碼之后修改回來,贊美 kotlin 的便捷性。
- 請求超時(shí)的判斷處理。
- 殊途同歸的 getResponseWithInterceptorChain,后面分析,緊跟著調(diào)用回調(diào)。
- 異常處理,以及回調(diào)。
- 處理結(jié)束把 AsyncCall 從 Dispatcher 的列表中移除,相同 Host 的請求計(jì)數(shù)減去一。
五、getResponseWithInterceptorChain
OkHttp 以責(zé)任鏈模式通過攔截器把需要進(jìn)行的網(wǎng)絡(luò)請求進(jìn)行了責(zé)任的隔離,各個(gè)攔截器承擔(dān)著各自的職責(zé)。
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
// 1、
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
// 2、
val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
var calledNoMoreExchanges = false
try {
// 3、
val response = chain.proceed(originalRequest)
if (transmitter.isCanceled) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw transmitter.noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null)
}
}
}
- 把各個(gè)攔截器添加到列表,包括:自定義攔截器,重試重定向攔截器,橋接攔截器,緩存攔截器,連接攔截器,自定義網(wǎng)絡(luò)攔截器,網(wǎng)絡(luò)請求攔截器。
- 把前面的攔截器列表封裝成 RealInterceptorChain 對象。
- 調(diào)用 RealInterceptorChain 對象的 proceed 方法,獲取 Response 對象,最后進(jìn)行一些收尾的工作。
// RealInterceptorChain.kt
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
if (index >= interceptors.size) throw AssertionError()
// 省略一些檢查代碼
calls++
// Call the next interceptor in the chain.
//4、
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
//5、
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
return response
}
新建 RealInterceptorChain 對象,把當(dāng)前的對象的參數(shù)傳入,index + 1。
獲取當(dāng)前 index 的攔截器,調(diào)用 intercept 方法,傳入步驟 4 新建的 RealInterceptorChain 對象,返回結(jié)果 Response。
那么這樣能清楚攔截器的調(diào)用過程了,每個(gè)攔截器的 intercept 會(huì)被 RealInterceptorChain.proceed 觸發(fā),然后接收到一個(gè)新的 RealInterceptorChain 對象,只要在自身的 intercept 方法中調(diào)用接收到的對象的 proceed 方法,就能觸發(fā)下一個(gè)動(dòng)作,拿到一個(gè) Response 對象。這樣層層調(diào)用,就能走完我們一開始收集的攔截器列表中所有的攔截方法。這個(gè)就是核心思想。
六、攔截器
1、重試重定向攔截器 RetryAndFollowUpInterceptor
重試重定向攔截器,主要是請求失敗或者請求需要重定向的時(shí)候進(jìn)行新的請求。
// RetryAndFollowUpInterceptor.kt
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
var request = chain.request()
val realChain = chain as RealInterceptorChain
val transmitter = realChain.transmitter()
var followUpCount = 0
var priorResponse: Response? = null
// 循環(huán)操作
while (true) {
//1、
transmitter.prepareToConnect(request)
if (transmitter.isCanceled) {
throw IOException("Canceled")
}
var response: Response
var success = false
try {
//2、
response = realChain.proceed(request, transmitter, null)
success = true
} catch (e: RouteException) {
// The attempt to connect via a route failed. The request will not have been sent.
// 路由錯(cuò)誤,如果不可以恢復(fù)則直接拋出異常
if (!recover(e.lastConnectException, transmitter, false, request)) {
throw e.firstConnectException
}
continue
} catch (e: IOException) {
// An attempt to communicate with a server failed. The request may have been sent.
//
val requestSendStarted = e !is ConnectionShutdownException
// IO 錯(cuò)誤,如果不可以恢復(fù)則直接拋出異常
if (!recover(e, transmitter, requestSendStarted, request)) throw e
continue
} finally {
// The network call threw an exception. Release any resources.
// 3、
if (!success) {
transmitter.exchangeDoneDueToException()
}
}
// Attach the prior response if it exists. Such responses never have a body.
// 4、第一次 priorResponse 肯定是空的,如果經(jīng)過重試的話則會(huì)把之前的 Response 賦值給新的 Response
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build()
}
val exchange = response.exchange
val route = exchange?.connection()?.route()
// 5、
val followUp = followUpRequest(response, route)
// 6、
if (followUp == null) {
if (exchange != null && exchange.isDuplex) {
transmitter.timeoutEarlyExit()
}
return response
}
val followUpBody = followUp.body
// 7、一次性請求的話直接返回
if (followUpBody != null && followUpBody.isOneShot()) {
return response
}
response.body?.closeQuietly()
// 8、
if (transmitter.hasExchange()) {
exchange?.detachWithViolence()
}
// 9、次數(shù)判斷,最大為20
if (++followUpCount > MAX_FOLLOW_UPS) {
throw ProtocolException("Too many follow-up requests: $followUpCount")
}
// 10、
request = followUp
priorResponse = response
}
}
transmitter 對象是在創(chuàng)建 RealCall 的時(shí)候初始化的,判斷是否有可以重用的連接或者對沒用的連接的回收,如果沒有則創(chuàng)建新的 ExchangeFinder 對象。
realChain.proceed 調(diào)用到下一個(gè)攔截器,該行代碼之后都是請求回來后的處理。
-
catch 和 finally,catch 中主要是出現(xiàn)異常后,判斷是否可以重試,如果可以就 continue 否則拋出異常并進(jìn)入 finally 做回收操作。稍微看下 recover 方法。
private fun recover( e: IOException, transmitter: Transmitter, requestSendStarted: Boolean, userRequest: Request ): Boolean { // 應(yīng)用層面不允許重試,我們可以在 OkHttpClient 初始化時(shí)候設(shè)置 if (!client.retryOnConnectionFailure) return false // 已經(jīng)發(fā)起過請求,且只允許發(fā)送一次 if (requestSendStarted && requestIsOneShot(e, userRequest)) return false // 一些致命問題,比如證書認(rèn)證錯(cuò)誤等 if (!isRecoverable(e, requestSendStarted)) return false // 沒有更多的路由去嘗試 if (!transmitter.canRetry()) return false // For failure recovery, use the same route selector with a new connection. return true } 第一次 priorResponse 肯定是空的,如果經(jīng)過重試的話則會(huì)把之前的 Response 賦值給新的 Response 的 priorResponse 成員變量,并且顯示的將 priorResponse 的 body 置空。這個(gè)對象的賦值意義是什么?其實(shí)作用是在步驟 5 中,作為停止重試的判斷使用:如果先后兩次 Http 請求結(jié)果獲取的狀態(tài)碼都是 408(請求超時(shí))或者都是 503(服務(wù)不可用)則直接停止重試。
followUpRequest 根據(jù)請求回來的 Response 的響應(yīng)碼構(gòu)建新的 Request,其中就包含了 301,302 等重定向的處理,步驟 4 提到的處理,以及我們一開始初始化 OkHttpClient 對象的是否允許重試的處理等。如果需要重新請求則 followUp 對象不為空,否則為 null 停止繼續(xù)請求。
如果 followUp 為 null 則可以直接返回請求結(jié)果,這里還涉及到一個(gè) Exchange 對象,這個(gè)我們后面再講。
一次性請求的話直接返回。
走到這里表示,我們需要跟進(jìn)一個(gè)新的請求,但是之前請求的一些操作還沒有結(jié)束,則需要在這里停止。
重試次數(shù)判斷,最大為20。
更新 Request 對象為新生成的對象,priorResponse 賦值(作用在步驟 4 中已經(jīng)說明),進(jìn)入下一次循環(huán)。
2、橋接攔截器 BridgeInterceptor
在發(fā)起請求之前,用應(yīng)用層發(fā)起的請求將 Http 請求需要的請求頭填充完整,在請求之后,將 Http 響應(yīng)解析為應(yīng)用層的響應(yīng)。
// BridgeInterceptor.kt
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
if (body != null) {
// 1、
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
// 2、
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
// 3、
val cookies = cookieJar.loadForRequest(userRequest.url)
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
val networkResponse = chain.proceed(requestBuilder.build())
// 4、
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
val responseBuilder = networkResponse.newBuilder()
.request(userRequest)
// 5、
if (transparentGzip &&
"gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
networkResponse.promisesBody()) {
val responseBody = networkResponse.body
if (responseBody != null) {
val gzipSource = GzipSource(responseBody.source())
val strippedHeaders = networkResponse.headers.newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build()
responseBuilder.headers(strippedHeaders)
val contentType = networkResponse.header("Content-Type")
responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
}
}
return responseBuilder.build()
}
- 如果 Request.body 不為空,則添加 Content-Type、Content-Length 或者 Transfer-Encoding 請求頭。
- 添加 Host、Connection 請求頭,如果支持壓縮還需要添加 Accept-Encoding:gzip,相應(yīng)的需要在請求回來之后進(jìn)行解壓的操作。
- CookJar 是對 cookie 的支持,但是它默認(rèn)是一個(gè)空實(shí)現(xiàn)(具體可以看 OkHttpClient.Builder 中的代碼),如果我們需要?jiǎng)t可以自己實(shí)現(xiàn)一個(gè) CookJar 來保存和讀取 cookie。
- 這一步,以及之后的邏輯都是響應(yīng)后的處理了,這里首先是 cookie 的保持處理,默認(rèn)是沒有的。
- 如果之前請求的時(shí)候在請求頭中添加了 Accept-Encoding:gzip,而且響應(yīng)頭中也的確說明了是 gzip 的,則進(jìn)行解壓。
3、緩存攔截器 CacheInterceptor
主要是用于從緩存讀取和寫入請求所對應(yīng)的響應(yīng)。這里緩存的策略事實(shí)上和 Http 的緩存機(jī)制有很大的關(guān)聯(lián)。
// CacheInterceptor.kt
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
// 1、
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
// 2、
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body?.closeQuietly()
}
// If we're forbidden from using the network and the cache is insufficient, fail.
// 3、
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
}
// If we don't need the network, we're done.
// 4、
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build()
}
// 5、
var networkResponse: Response? = null
try {
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// If we have a cache response too, then we're doing a conditional get.
// 6、
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)
return response
} else {
cacheResponse.body?.closeQuietly()
}
}
// 7、
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response)
}
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
- Cache 對象可以在 OkHttpClient 初始化的時(shí)候構(gòu)建,里面是通過 DiskLruCache 進(jìn)行緩存的,以 url 生成 key 在緩存中查找到目標(biāo)之后重新構(gòu)造成 Response,然后進(jìn)一步對比 Request 和從緩存恢復(fù) Response,如果沒命中則會(huì)返回 null,否則返回 Response。
- 獲取緩存策略,根據(jù)策略的 networkRequest 和 cacheResponse 是否為 null 會(huì)有不同的處理。具體緩存策略的邏輯稍后再講。
- 如果 networkRequest 和 cacheResponse 都為 null,則不再進(jìn)行網(wǎng)絡(luò)請求,直接構(gòu)造失敗的 Response, 結(jié)束。
- 如果只有 networkRequest 為 null,則表示緩存可用,結(jié)束。
- 如果 networkRequest 不為 null,則進(jìn)行網(wǎng)絡(luò)請求。
- 從這里開始是網(wǎng)絡(luò)請求回來后的邏輯,當(dāng) cacheResponse 不為 null 且 networkResponse 的相應(yīng)碼為 304(客戶端有緩存。http 請求時(shí)候發(fā)現(xiàn)自己緩存的文件有 Last Modified ,那么在請求中會(huì)包含 If Modified Since,服務(wù)器通過對比,如果沒有改變,就會(huì)返回 304,響應(yīng)體就不需要繼續(xù)發(fā)送了以此減少帶寬的消化。),則合并兩個(gè)響應(yīng)頭,更新緩存,結(jié)束。
- 如果 cache 不為 null,驗(yàn)證 networkResponse 是否需要緩存,并緩存,結(jié)束;否則返回結(jié)果,結(jié)束。
CacheStrategy
前面講到了緩存策略,事實(shí)證明返回的 CacheStrategy 的 networkRequest 和 cacheResponse 直接影響到了后續(xù)的調(diào)用邏輯。緩存策略是怎么產(chǎn)生的,主要還是要看 CacheStrategy.compute 方法:
//CacheStrategy.kt
fun compute(): CacheStrategy {
val candidate = computeCandidate()
// We're forbidden from using the network and the cache is insufficient.
// 如果獲得的 networkRequest 為 null,且只支持從緩存讀取,那么就可以直接構(gòu)造失敗的響應(yīng)了
// 對應(yīng)前面步驟 3
if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
return CacheStrategy(null, null)
}
return candidate
}
private fun computeCandidate(): CacheStrategy {
// 緩存沒有命中,發(fā)起網(wǎng)絡(luò)請求,對應(yīng)前面步驟 5
if (cacheResponse == null) {
return CacheStrategy(request, null)
}
// 請求為 https,緩存沒有握手信息,發(fā)起網(wǎng)絡(luò)請求,對應(yīng)前面步驟 5
if (request.isHttps && cacheResponse.handshake == null) {
return CacheStrategy(request, null)
}
// isCacheable 中對 cacheResponse 的響應(yīng)碼以及 request 和 cacheResponse
// Cache-Control 頭做了判斷,如果不滿足,則對應(yīng)前面步驟 5
if (!isCacheable(cacheResponse, request)) {
return CacheStrategy(request, null)
}
val requestCaching = request.cacheControl
// hasConditions 中對請求頭中是否存在 If-Modified-Since 和 If-None-Match 做了判斷,
// 事實(shí)上如果有這兩個(gè)字段,也可以發(fā)起請求,如果服務(wù)端驗(yàn)證沒有過期,則只發(fā)送響應(yīng)頭回來
// 對應(yīng)步驟 5 以及后續(xù)驗(yàn)證的步驟 6
if (requestCaching.noCache || hasConditions(request)) {
return CacheStrategy(request, null)
}
val responseCaching = cacheResponse.cacheControl
val ageMillis = cacheResponseAge()
var freshMillis = computeFreshnessLifetime()
if (requestCaching.maxAgeSeconds != -1) {
freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
}
var minFreshMillis: Long = 0
if (requestCaching.minFreshSeconds != -1) {
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
}
var maxStaleMillis: Long = 0
if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
}
// 有緩存,不新鮮則進(jìn)行標(biāo)記,但是沒必要發(fā)起網(wǎng)絡(luò)請求,對應(yīng)步驟 4
if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
val builder = cacheResponse.newBuilder()
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
}
val oneDayMillis = 24 * 60 * 60 * 1000L
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
}
return CacheStrategy(null, builder.build())
}
val conditionName: String
val conditionValue: String?
when {
etag != null -> {
// 對應(yīng) cacheResponse 響應(yīng)頭中的 ETag 字段,
// 如果不為空則給新的請求頭添加 If-None-Match
conditionName = "If-None-Match"
conditionValue = etag
}
lastModified != null -> {
// 對應(yīng) cacheResponse 響應(yīng)頭中的 Last-Modified 字段,
// 如果不為空則給新的請求頭添加 If-Modified-Since
conditionName = "If-Modified-Since"
conditionValue = lastModifiedString
}
servedDate != null -> {
// 對應(yīng) cacheResponse 響應(yīng)頭中的 Date 字段,
// 如果不為空則給新的請求頭添加 If-Modified-Since
conditionName = "If-Modified-Since"
conditionValue = servedDateString
}
// 如果不滿足以上情況,則直接發(fā)起請求,對應(yīng)步驟 5
else -> return CacheStrategy(request, null) // No condition! Make a regular request.
}
// 如果有滿足上面 when 的條件,則添加進(jìn)相應(yīng)的請求頭中,對應(yīng)步驟 5以及后續(xù)驗(yàn)證的步驟 6
val conditionalRequestHeaders = request.headers.newBuilder()
conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)
val conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build()
return CacheStrategy(conditionalRequest, cacheResponse)
}
那么分析到這里,了解過 Http 緩存機(jī)制的同學(xué)應(yīng)該也發(fā)現(xiàn)了,這里的代碼和 Http 緩存機(jī)制完全一致。而且緩存策略中的處理,也和緩存攔截器里面的操作都能相互對應(yīng)上。
4、連接攔截器 ConnectInterceptor
用于開啟一個(gè)連接。
// ConnectInterceptor.kt
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val request = realChain.request()
val transmitter = realChain.transmitter()
// 1、
val doExtensiveHealthChecks = request.method != "GET"
val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
return realChain.proceed(request, transmitter, exchange)
}
- 當(dāng)請求方式不是 GET 的時(shí)候需要更全面的檢查,獲取 Exchange 對象(就叫他轉(zhuǎn)換器),transmitter 之前提到過在創(chuàng)建 RealCall 的時(shí)候創(chuàng)建的。
//Transmitter.kt
internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
synchronized(connectionPool) {
// 2、
check(!noMoreExchanges) { "released" }
check(exchange == null) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
}
}
// 3、
val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
// 4、
val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)
synchronized(connectionPool) {
this.exchange = result
this.exchangeRequestDone = false
this.exchangeResponseDone = false
return result
}
}
- noMoreExchanges 如果為 true 表示 Transmitter 已經(jīng)要釋放不再接受新的處理,exchange 不為 null 表示之前的響應(yīng)還沒有關(guān)閉。
- 通過重試攔截器里面初始化的 ExchangeFinder 對象,查找或者新建一個(gè)健康的連接并封裝到一個(gè) ExchangeCodec(編碼器)返回,根據(jù) http 的不同版本,編碼器也不同。
- 將 ExchangeCodec 編碼器等參數(shù)封裝到 Exchange 對象返回,通過最終通過 RealInterceptorChain.proceed 方法傳遞到下一個(gè)攔截器(如果我們有自定義的網(wǎng)絡(luò)攔截器那么就會(huì)被調(diào)用到,如果沒有則到了最后一個(gè)攔截器:網(wǎng)絡(luò)請求攔截器)。
如何獲取一個(gè)健康的連接?
連接攔截器自身的代碼看起來很簡潔,但核心點(diǎn)是怎么查找連接。所以我們還需要深入看下 ExchangeFinder.find 方法。
// ExchangeFinder.kt
fun find(
client: OkHttpClient,
chain: Interceptor.Chain,
doExtensiveHealthChecks: Boolean
): ExchangeCodec {
// 省略一些代碼
try {
val resultConnection = findHealthyConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled,
doExtensiveHealthChecks = doExtensiveHealthChecks
)
return resultConnection.newCodec(client, chain)
}
// 省略一些代碼
}
@Throws(IOException::class)
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
// 循環(huán)查找可用的連接
while (true) {
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// 如果成功的次數(shù)是0,表示找到的連接為新的,就不需要做全面的檢查直接返回
synchronized(connectionPool) {
if (candidate.successCount == 0) {
return candidate
}
}
// 檢查連接池里面的連接是否可用,如果可用則直接返回,否則從連接池中移除,進(jìn)入下一次循環(huán)查找
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges()
continue
}
return candidate
}
}
// ExchangeFinder.kt
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {
// 如果已經(jīng)通過發(fā)射器標(biāo)記了取消,則直接拋異常退出
if (transmitter.isCanceled) throw IOException("Canceled")
// 新嘗試的標(biāo)記
hasStreamFailure = false // This is a fresh attempt.
// 第一次進(jìn)來應(yīng)該為 null,因?yàn)樵谘h(huán)中所以可以先看下邊
releasedConnection = transmitter.connection
// 如果不為空,且持有的連接被標(biāo)記不能做更多的處理,則釋放資源(怎么釋放呢?不著急,我們先看怎么獲?。? toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
transmitter.releaseConnectionNoEvents()
} else {
null
}
if (transmitter.connection != null) {
// 如果能走到這里表示,是復(fù)用的連接,而且沒有被上面的操作回收掉,即 noNewExchanges 為 false
result = transmitter.connection
releasedConnection = null
}
if (result == null) {
// result 為 null 需要嘗試從連接池里面獲取一個(gè)
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
// 走到這里表示已經(jīng)從連接池里面獲取到
foundPooledConnection = true
result = transmitter.connection
} else if (nextRouteToTry != null) {
// 標(biāo)記1、第一次應(yīng)該不會(huì)到這里,目前看不明白,我們一會(huì)回來看就清楚了
selectedRoute = nextRouteToTry
nextRouteToTry = null
} else if (retryCurrentRoute()) {
// 第一次應(yīng)該不會(huì)到這里
selectedRoute = transmitter.connection!!.route()
}
}
}
// 關(guān)閉上面需要釋放的 socket
toClose?.closeQuietly()
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection!!)
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
}
if (result != null) {
// 復(fù)用的或者是剛剛從連接池里面查找到的
return result!!
}
// 創(chuàng)建一個(gè)路由選擇器,第一次進(jìn)來的話會(huì)從這里獲取
var newRouteSelection = false
if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
newRouteSelection = true
// 如果我們沒有自己給 OkHttpClient 設(shè)置額外的 Proxy 或者 ProxySelector 的話,
// routeSelector 創(chuàng)建的時(shí)候,會(huì)走到 sun.net.spi.DefaultProxySelector 獲取 Routes
// next 里面會(huì)把 Routes 依次通過 dns 解析地址之后重新封裝成 Route,
// 再經(jīng)過之前是否存在失敗的情況篩選,
// 最終保存在 Selection 對象里,返回。
routeSelection = routeSelector.next()
}
var routes: List<Route>? = null
synchronized(connectionPool) {
if (transmitter.isCanceled) throw IOException("Canceled")
if (newRouteSelection) {
// 通過上一步創(chuàng)建 routeSelection 的操作,我們獲取了一個(gè) ip 地址的集合
// 再次嘗試從連接池中匹配是否有合適的連接
// 與上一次不同的是 routes 對象不為空
routes = routeSelection!!.routes
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true
result = transmitter.connection
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
// 連接池沒有找到,那么我們從路由選擇中選中一個(gè)路由
selectedRoute = routeSelection!!.next()
}
// 創(chuàng)建一個(gè)全新的連接,
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
}
}
// 如果是復(fù)用的那么到這里就結(jié)束了,如果是新建的還需要進(jìn)行其他的操作
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
return result!!
}
// 新建的連接,建立 TCP 連接,如果是 Https 那么還需要進(jìn)行密鑰的協(xié)商
result!!.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
// 從失敗的路由集合中移除成功的路由
connectionPool.routeDatabase.connected(result!!.route())
var socket: Socket? = null
synchronized(connectionPool) {
connectingConnection = null
// 最終再次從連接池中獲取可以多路復(fù)用的連接,注意和之前兩次不同的是最后一個(gè)參數(shù)為 true
// 如果有多個(gè)并發(fā)的連接到同一個(gè) host 這時(shí)候后來的就能找到前面的
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// 走到這里,就表示可以從連接池中獲取的現(xiàn)成的,那么剛剛新建的就可以回收掉了
result!!.noNewExchanges = true
socket = result!!.socket()
result = transmitter.connection
// 有可能剛獲取到的之后馬上這個(gè)連接就不可用了,那么我們把之前能成功連接的路由暫時(shí)保存一下
// 見前面注釋中標(biāo)記 1 的位置
nextRouteToTry = selectedRoute
} else {
// 沒有在連接池中找到,那么我們就把新建的添加進(jìn)去
connectionPool.put(result!!)
transmitter.acquireConnectionNoEvents(result!!)
}
}
socket?.closeQuietly()
eventListener.connectionAcquired(call, result!!)
return result!!
}
上面的步驟比較多,但是已經(jīng)詳細(xì)注釋了,有些地方可以第一次不能理解,但是繼續(xù)看下去就能發(fā)現(xiàn)什么情況才能調(diào)用到了,畢竟整個(gè)過程在一個(gè)循環(huán)中。
下面單獨(dú)提一下 connectionPool.transmitterAcquirePooledConnection、transmitter.acquireConnectionNoEvents 以及 transmitter.releaseConnectionNoEvents 三個(gè)方法,分別是 transmitter 從連接池獲取連接,取得后的綁定,和釋放的過程。
// RealConnectionPool.kt
fun transmitterAcquirePooledConnection(
address: Address,
transmitter: Transmitter,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
assert(Thread.holdsLock(this))
// 遍歷所有連接
for (connection in connections) {
// 如果是多路復(fù)用,但是連接不支持則跳過
if (requireMultiplexed && !connection.isMultiplexed) continue
// 檢查是否合適,不合適則跳過
if (!connection.isEligible(address, routes)) continue
// 找到了,綁定一下
transmitter.acquireConnectionNoEvents(connection)
return true
}
return false
}
// Transmitter.kt
fun acquireConnectionNoEvents(connection: RealConnection) {
assert(Thread.holdsLock(connectionPool))
check(this.connection == null)
this.connection = connection
// 處理上很簡單,把 Transmitter 自身用弱引用添加進(jìn) RealConnection 的集合中
connection.transmitters.add(TransmitterReference(this, callStackTrace))
}
fun releaseConnectionNoEvents(): Socket? {
assert(Thread.holdsLock(connectionPool))
// 從 RealConnection 的請求集合中查找的自己的索引,然后移除并置空 this.connection 的引用
val index = connection!!.transmitters.indexOfFirst { it.get() == this@Transmitter }
check(index != -1)
val released = this.connection
released!!.transmitters.removeAt(index)
this.connection = null
// 進(jìn)一步判斷 RealConnection.transmitters 已經(jīng)空了,則讓連接池把這個(gè)連接移除
// 最終返回 socket 等待回收資源
if (released.transmitters.isEmpty()) {
released.idleAtNanos = System.nanoTime()
if (connectionPool.connectionBecameIdle(released)) {
return released.socket()
}
}
return null
}
通過前面三個(gè)方法,發(fā)射器對象通過持有連接的引用,然后持有的請求就會(huì)在這個(gè)連接處理;
而連接很可能是處理多個(gè)請求的,所以用集合保存了發(fā)射器對象的弱引用;
而每個(gè)請求完成的時(shí)候那么就需要從這個(gè)弱引用集合中移除,當(dāng)集合中所有的發(fā)射器對象都請求完畢之后,那么就可以考慮從連接池中移除這個(gè)連接釋放資源了。
5、網(wǎng)絡(luò)請求攔截器 CallServerInterceptor
開始真正的向服務(wù)器發(fā)送請求,讀取響應(yīng),數(shù)據(jù)交換。
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange()
val request = realChain.request()
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
// 1、
exchange.writeRequestHeaders(request)
var responseHeadersStarted = false
var responseBuilder: Response.Builder? = null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// 2、
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseHeadersStarted = true
exchange.responseHeadersStart()
responseBuilder = exchange.readResponseHeaders(true)
}
// 3、
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// 4、
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
// 5、
exchange.noRequestBody()
if (!exchange.connection()!!.isMultiplexed) {
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
if (!responseHeadersStarted) {
exchange.responseHeadersStart()
}
if (responseBuilder == null) {
// 6、
responseBuilder = exchange.readResponseHeaders(false)!!
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection()!!.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// 7、
response = exchange.readResponseHeaders(false)!!
.request(request)
.handshake(exchange.connection()!!.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
exchange.responseHeadersEnd(response)
response = if (forWebSocket && code == 101) {
// 8、
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
// 9、
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
// 10、
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
return response
}
- 寫入請求頭。
- 如果不為 GET 或者 HEAD 請求,而且請求體不為空,檢查到 Expect: 100-continue 包含在請求頭中,則先發(fā)送請求頭,暫時(shí)不發(fā)送請求體。等待讀取響應(yīng)頭,如果這里得到服務(wù)器的響應(yīng)碼為 100,則獲得的 responseBuilder 為 null,否則不為 null。
- responseBuilder 為 null 表示服務(wù)器響應(yīng) 100,那么我就可以繼續(xù)發(fā)送請求體(先發(fā)響應(yīng)頭的操作就是為了減少帶寬消耗),ps:暫不討論請求體支持雙工的情況,因?yàn)闆]有看到支持雙工的子類。
- 根據(jù)步驟 2、3 那么現(xiàn)在就可以開始發(fā)送請求體了。
- 走到這一步,表示在步驟 2 中 Expect: 100-continue 的請求沒有被服務(wù)器同意,那么就不發(fā)送請求體,并標(biāo)記請求完成,針對不可以多路復(fù)用的連接則直接標(biāo)記使用完成。
- 沒有響應(yīng)頭的,就再次讀取響應(yīng)頭,經(jīng)歷過步驟 5 的不會(huì)走到這里。
- 如果步驟 6 中讀取的響應(yīng)碼是 100,就直接嘗試讀取真正的響應(yīng)。
- 如果是 WebSocket 且響應(yīng)碼為 101(升級(jí)協(xié)議),則給一個(gè)空的響應(yīng)體,準(zhǔn)備升級(jí)協(xié)議。
- 解析響應(yīng)體的類型、長度以及準(zhǔn)備字節(jié)流。
- 如果請求或者響應(yīng)頭里面包含 Connection:close,則標(biāo)記連接使用完畢,防止被重用。
- 針對響應(yīng)碼為 204(沒有新文檔)205(沒有新內(nèi)容),但是內(nèi)容長度又大于 0 的響應(yīng),直接拋出異常。
小結(jié)
沒有小結(jié),畢竟是源碼分析,看完了自己能理清流程才是真的收獲,不能建立整體的概念,太注重別人的總結(jié)的話最終會(huì)忽略很多細(xì)節(jié)。
結(jié)語
簡單分析了 OkHttp 的調(diào)用流程,以及各個(gè)攔截器的實(shí)現(xiàn),還有很多細(xì)節(jié)沒有提到,如果有興趣可以自己再鉆研一下,復(fù)雜的東西拆解了就不會(huì)那么困難。很多時(shí)候閱讀源碼第一次閱讀可能會(huì)的復(fù)雜,先粗略的了解建立整體的輪廓,再各個(gè)擊破才是閱讀源碼的方法。
另外,喜歡的同學(xué),覺得對自己有幫助的同學(xué),務(wù)必請花一點(diǎn)點(diǎn)時(shí)間幫我點(diǎn)個(gè)贊!點(diǎn)贊之交淡如水,但這個(gè)很重要!