前言
幾個月前,跟過 OkHttp 的流程源碼,但是時間久了,現(xiàn)在能夠回想起來的的,只有幾個攔截器了,那我豈不是沒什么收獲了。所以,好好想想,我從 OkHttp 中能夠?qū)W到什么
疑問
- OkHttp 是怎么拆分功能的,大概有幾個模塊
- 它所用到的責(zé)任鏈模式,在實際開發(fā)中適合哪些場景
- 它是怎么使用線程池的,這么用有什么好處
OkHttp 是怎么拆分功能的,大概有幾個模塊
作為一個網(wǎng)絡(luò)框架,最核心的功能就是發(fā)起請求,處理響應(yīng)了,這倆個是功能部分, OkHttp 使用 Dispatcher 執(zhí)行任務(wù),內(nèi)部是一個高并發(fā)的線程池,另外整個流程的處理使用到了Interceptor 攔截器
請求執(zhí)行的調(diào)用流程
- Recall.enqueue(Callback)
- client.dispatcher().enqueue(new AsyncCall(responseCallback))
停一下,跟進(jìn)一下分發(fā)器器的 enqueue 方法,內(nèi)部時怎么處理異步請求的
// 創(chuàng)建 AsyncCall 對象
void enqueue(AsyncCall call) {
synchronized (this) {
// 添加到待執(zhí)行隊列中,雙向隊列
readyAsyncCalls.add(call);
}
// 執(zhí)行的重要代碼
promoteAndExecute();
}
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
// 創(chuàng)建一個可執(zhí)行隊列,目的是限制 64 個最大連接數(shù),每個 Host 最多 5 個連接的限制
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
// 符合條件的,從準(zhǔn)備中的隊列挪到其他兩個隊列中
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
// 放到線程池中執(zhí)行
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
首先 AsyncCall 是實現(xiàn)了 Runnable 的一個類,也難怪,畢竟是要放到線程池中執(zhí)行的
響應(yīng)執(zhí)行的調(diào)用過程
剛才看了請求入隊后,經(jīng)過篩選,開始遍歷放到線程池中去執(zhí)行,那下來就是等待響應(yīng)了
具體代碼在 Recall 類的 run() 方法中,里面就是我們之前了解到的使用責(zé)任鏈模式-攔截器的代碼了
為什么要使用雙端隊列 ArrayDeque ?
OkHttp 源碼系列 之 ArrayDeque - 雙端隊列
首先 Jdk 提供的雙端隊列主要有兩個:
- LinkedList 鏈表實現(xiàn)的雙端隊列
- ArrayDeque 循環(huán)數(shù)組實現(xiàn)的雙端隊列
那為什么偏偏就選了 ArrayDeque 了,因為效率高.
Jdk 的說明中就說了,ArrayDeque 作為隊列使用時,將比 LinkedList 更快
ArrayQueue 是線程不安全的,Okhttp 是怎樣保證同步問題的? synchronized 關(guān)鍵字
OkHttp 是怎樣使用責(zé)任鏈模式的,Android 源碼中還有其他地方用到了嘛?
那就首先要補(bǔ)習(xí)下責(zé)任鏈模式是什么,以及怎樣用代碼實現(xiàn)
在網(wǎng)上查了很多文檔,有一種表述我覺得很形象,小張去外地出差回來,其中 2w 要去招公司報銷,他去找到組長
- 組長看到發(fā)票,面值超過了權(quán)限,說讓小張去找主管
- 主管一看,自己最大只能簽 3k 的,讓其去找經(jīng)理
- 經(jīng)理最大只能批 1w 的,讓小張去找老板
- 最終老板簽字處理
整個流程涉及到多個類(組長、主管、經(jīng)理等),一級一級的處理,最終處理結(jié)束
Android 源碼中 View 的事件分發(fā)也是使用了責(zé)任鏈模式,其中被分發(fā)的 MotionEvent 經(jīng)過 ViewGroup 層層分發(fā),最終被消費或者重新返回到最上層的 View
- 那 OkHttp 是怎樣實現(xiàn)責(zé)任鏈的呢?
Let is see fucking code ,Woohooo
// 同步執(zhí)行的代碼中
@Override protected void execute() {
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
// 使用責(zé)任鏈處理請求響應(yīng),重點看這里
Response response = getResponseWithInterceptorChain();
......
}
}
Response getResponseWithInterceptorChain() throws IOException {
// 創(chuàng)建一個攔截器隊列
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(new RetryAndFollowUpInterceptor(client));
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
// 添加完了,開始逐個處理了
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
boolean calledNoMoreExchanges = false;
try {
// proceed 執(zhí)行后得到響應(yīng)結(jié)果
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}
這里使用了隊列保存所有的攔截器,然后一股腦傳進(jìn)了 RealInterceptorChain 對象中,最后調(diào)用 proceed 就有個返回結(jié)果,停,一下子就獲取到了結(jié)果了嘛? 進(jìn)去看看
RealInterceptorChain 里面是怎么處理的
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException {
... 省略部分代碼
// 看到 next 我就想起來鏈表里的 next,這個 next 是干嘛的呢?
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
// 根據(jù) index 獲取對應(yīng)位置的攔截器
Interceptor interceptor = interceptors.get(index);
// 調(diào)用攔截器的攔截方法
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed()
... 省略部分代碼
return response;
}
有一個地方?jīng)]看明白,創(chuàng)建 Chain 后,調(diào)用 proceed ,剛開始 index 為 0,那整個隊列是怎么遍歷的,沒看到有循環(huán)遍歷語句啊
進(jìn)入 interceptor.intercept(next) 后,一切截然而止了,懷著好奇心,點開了 interceptor 的實現(xiàn)類 CacheInterceptor ,果不其然,在 intercept 方法中,再次看到了 proceed 的身影,終于破案了
@Override public Response intercept(Chain chain) throws IOException {
... 省略部分代碼
// 每調(diào)用一次,內(nèi)部的 index 自增,就意味著不斷的傳遞到下一級攔截器
networkResponse = chain.proceed(networkRequest);
... 省略部分代碼
return response;
OkHttp 是怎么使用線程池的
最后一個問題,怎么使用線程池的,首先補(bǔ)習(xí)下線程池的各個參數(shù)的含義,以及線程池的工作原理
// OkHttp 中的線程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
// 線程池的構(gòu)造方法
// corePoolSize – the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
// maximumPoolSize – the maximum number of threads to allow in the pool
// keepAliveTime – when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
// unit – the time unit for the keepAliveTime argument
// workQueue – the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
// threadFactory – the factory to use when the executor creates a new thread
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
創(chuàng)建參數(shù):
- 核心線程數(shù),核心線程將保持存活,及時是空閑的,這里是 0
- 池中允許的最大線程數(shù),這里是 Int 的最大值,實際到不了這么多,OkHttp 有 64 最大連接數(shù)的限制
- 等待時間 可以理解為非核心線程等待任務(wù)時的超時時間 ,這里為 60 秒
- 等待時間的單位
- 工作隊列,這里是一個同步的阻塞隊列,內(nèi)部沒有容器,傳入一個時就會阻塞下一個的傳入
- 線程工廠
- 為什么 OkHttp 要這么設(shè)置線程池,有什么好處呢?
其實這種參數(shù)設(shè)置,就是 Excutor.newCachedThreadPool() ,
首先一個阻塞的同步隊列,內(nèi)部沒有容器,意味著什么呢,每當(dāng)一個網(wǎng)絡(luò)請求發(fā)起,只要核心線程滿了,就會在池中創(chuàng)建新的線程
如果線程池中的線程數(shù)大于核心線程數(shù)且隊列滿了,且線程數(shù)小于最大線程數(shù),則會創(chuàng)建新的線程,剛好 Okhttp 的最大線程數(shù)時是一個極大值,那就會不斷創(chuàng)建線程,是一個高并發(fā)的線程池