
okhttp 流程圖
val client = OkHttpClient.Builder().build()
val request = Request.Builder().get().url("https://www.baidu.com/").build()
// 同步請(qǐng)求
val response:Response = client.newCall(request).execute()
Log.e("===",response.body?.string()?:"Null")
//異步請(qǐng)求
client.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {}
override fun onResponse(call: Call, response: Response) {}
})
1、OkHttpClient 的創(chuàng)建
val client = OkHttpClient.Builder().build()
通過(guò) Builder 模式設(shè)置OkHttpClient 的參數(shù),比如超時(shí)、dns等,可以設(shè)置 dns 解決 IPV6 訪問(wèn)慢問(wèn)題解決 解決方案
2、創(chuàng)建 Request
- 設(shè)置 url url(url: String)
- 設(shè)置 header addHeader(name: String, value: String)
- 設(shè)置 method
- get() = method("GET", null)
- post(body: RequestBody) = method("POST", body)
- method(method: String, body: RequestBody?)
- 設(shè)置 body method(method: String, body: RequestBody?)
3、創(chuàng)建 RealCall
用來(lái)包裹一個(gè)請(qǐng)求,這個(gè)請(qǐng)求可以被取消,一個(gè)請(qǐng)求只能被執(zhí)行一次
/**
* 用來(lái)包裹一個(gè)請(qǐng)求,這個(gè)請(qǐng)求可以被取消,一個(gè)請(qǐng)求只能被執(zhí)行一次
*/
interface Call : Cloneable {
/** 真正請(qǐng)求包含 url、header、method、body */
fun request(): Request
/**
* 同步請(qǐng)求 為了防止內(nèi)存泄露 需要關(guān)閉 Response
*/
@Throws(IOException::class)
fun execute(): Response
/**
* 異步請(qǐng)求
* @param responseCallback 請(qǐng)求回調(diào)
*/
fun enqueue(responseCallback: Callback)
/**
* 取消請(qǐng)求,但是已經(jīng)執(zhí)行完成的請(qǐng)求無(wú)法取消
*/
fun cancel()
/**
* 如果執(zhí)行了 executed 或 enqueued 返回 true
* 如果一個(gè) call 被執(zhí)行的次數(shù)多余1次就會(huì)拋出異常
*/
fun isExecuted(): Boolean
/**
* 判定 這個(gè)call 是否被被取消了
*/
fun isCanceled(): Boolean
/**
* 通過(guò) OkHttpClient.Builder.callTimeout 設(shè)置
* 包含 DNS解析、鏈接、寫入請(qǐng)求、服務(wù)器處理、讀取響應(yīng)
* 如果包含重定向和重試 這些用時(shí)也包含其中
*/
fun timeout(): Timeout
/**
* 重新復(fù)制一個(gè)新的 call ,可以被再次執(zhí)行
*/
public override fun clone(): Call
/**
* 創(chuàng)建一個(gè)新的Call
*/
fun interface Factory {
fun newCall(request: Request): Call
}
}
4、同步流程

同步流程
override fun execute(): Response {
// CAS 判定這個(gè) Call 是否被執(zhí)行,如果被執(zhí)行就 返回異常
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter() //超時(shí)計(jì)時(shí)開始
callStart() //回調(diào)請(qǐng)求監(jiān)聽器的請(qǐng)求開始
try {
client.dispatcher.executed(this) //放入隊(duì)列
return getResponseWithInterceptorChain() // 執(zhí)行請(qǐng)求獲取結(jié)果
} finally {
client.dispatcher.finished(this) //請(qǐng)求結(jié)束
}
}
- 這是一個(gè)同步的方法,無(wú)論多少線程調(diào)用 execute 方法都被阻塞掉
- runningSyncCalls 是一個(gè)雙向列表
@Synchronized
internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
//異步、同步的結(jié)束都會(huì)走到這里:從running中移除,并調(diào)用promoteAndExecute
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//從隊(duì)列中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
val isRunning = promoteAndExecute() //當(dāng)一個(gè)請(qǐng)求執(zhí)行完成 會(huì)調(diào)用這個(gè)方法 執(zhí)行新的任務(wù)
//如果異步調(diào)用都執(zhí)行完成 網(wǎng)絡(luò)空隙的回調(diào)
// 通過(guò) client.dispatcher.idleCallback 可以設(shè)置回調(diào)
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
5、異步流程

異步流程
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart() //回調(diào)請(qǐng)求監(jiān)聽器的請(qǐng)求開始
client.dispatcher.enqueue(AsyncCall(responseCallback)) //請(qǐng)求調(diào)度
}
// 繼承自 Runable 在線程池中執(zhí)行
internal inner class AsyncCall(
private val responseCallback: Callback // 執(zhí)行完成的回調(diào)函數(shù)
) : Runnable {
//相同的 host 的請(qǐng)求通用一個(gè) AtomicInteger 一個(gè) socket 最多有5個(gè)鏈接
@Volatile
var callsPerHost = AtomicInteger(0)
private set
fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}
val host: String
get() = originalRequest.url.host
val request: Request
get() = originalRequest
val call: RealCall
get() = this@RealCall
/**
* 通過(guò)線程池執(zhí)行
*/
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
/**
* 執(zhí)行的方法
*/
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain() // 和同步RealCall的方法一直
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next() //取出一個(gè) Call
// runningAsyncCalls 的個(gè)數(shù)大于64 就停止加入
if (runningAsyncCalls.size >= this.maxRequests) break
// 一個(gè)socket 最多有 5個(gè)請(qǐng)求 如果大于5個(gè)請(qǐng)求就會(huì)繼續(xù)添加
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove() //從 readyAsyncCalls 中移除
asyncCall.callsPerHost.incrementAndGet() //callsPerHost 增加
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall) //加入到 執(zhí)行中的列表
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService) // 放入到線程池中執(zhí)行
}
return isRunning //返回執(zhí)行中的 call 不為空 不是空標(biāo)識(shí) 有任務(wù)需要執(zhí)行
}
6、getResponseWithInterceptorChain
無(wú)論是異步請(qǐng)求還是同步請(qǐng)求都會(huì)走到 RealCall中的這個(gè)方法,是責(zé)任鏈模式
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
//使用者配置的 應(yīng)用攔截器,最先攔截 使用 OkHttpClient.Builder().addInterceptor(interceptor: Interceptor)
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client) //重試跟進(jìn)攔截器
interceptors += BridgeInterceptor(client.cookieJar) //橋攔截器
interceptors += CacheInterceptor(client.cache) //緩存攔截器
interceptors += ConnectInterceptor //連接攔截器
if (!forWebSocket) {
//使用者配置的網(wǎng)絡(luò)攔截器 OkHttpClient.Builder().addNetworkInterceptor(interceptor: Interceptor)
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket) //請(qǐng)求服務(wù)攔截器
//攔截器鏈
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}