Anroid OKhttp筆記1 流程分析
Android OkhttpInterceptor 筆記:RetryAndFollowUpInterceptor
Android OkhttpInterceptor 筆記:BridgeInterceptor
Android OkhttpInterceptor 筆記:ConnectInterceptor
Android OkhttpInterceptor 筆記:CacheInterceptor
Android OkhttpInterceptor 筆記:CallServerInterceptor
Android Okhttp筆記:ConnectionPool
Android Okhttp3:Dispatcher分析筆記
一、概述
Dispatcher是異步請求的執(zhí)行策略。在OkHttp中承擔(dān)著對同步和異步請求的分發(fā)和回調(diào).
1.1構(gòu)造方法
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
1.2成員變量
private int maxRequests = 64;//最多并發(fā)請求的個(gè)數(shù)
private int maxRequestsPerHost = 5;//每個(gè)主機(jī)最大請求數(shù)
//執(zhí)行請求的線程池
private @Nullable ExecutorService executorService;
//準(zhǔn)備執(zhí)行的異步請求隊(duì)列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在執(zhí)行的異步請求隊(duì)列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//正在執(zhí)行的同步請求隊(duì)列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
//閑置接口,用來處理當(dāng)dispatcher變?yōu)榭臻e狀態(tài)(即所有的請求數(shù)為0)時(shí)的回調(diào)。
private @Nullable Runnable idleCallback;
1.3線程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
在Call去調(diào)用到Dispatcher的對應(yīng)的方法的時(shí)候才會(huì)去創(chuàng)建該實(shí)例,核心線程的個(gè)數(shù)為0,線程池的線程數(shù)的最大值為 Integer.MAX_VALUE,超時(shí)時(shí)間為60s,阻塞隊(duì)列為同步隊(duì)列。
二、分析
Dispatcher中共存有三個(gè)隊(duì)列:
readyAsyncCalls 等待執(zhí)行異步隊(duì)列
runningAsyncCalls 正在執(zhí)行異步隊(duì)列
runningSyncCalls 正在執(zhí)行同步隊(duì)列
RealCall在執(zhí)行execute/enqueue時(shí),就是將這次請求加入到Dispatcher的三個(gè)集合中。而 Dispatcher通過管理隊(duì)列集合元素對同步和異步請求進(jìn)行分發(fā)和回調(diào)。
2.1同步請求執(zhí)行過程
Call.execute()
繼續(xù)調(diào)用 RealCall. execute()方法
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
timeout.enter();
eventListener.callStart(this);
try {
//此處請求將進(jìn)入Dispatcher對象內(nèi)做相應(yīng)的調(diào)度處理
client.dispatcher().executed(this);
//做真正的網(wǎng)絡(luò)請求
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
e = timeoutExit(e);
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
client.dispatcher().executed(this):
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
將RealCall加入到runningSyncCalls集合中。這個(gè)Dispatcher管理的線程池并沒有對同步請求分發(fā)進(jìn)行管理,當(dāng)處理完返回結(jié)果的時(shí)候,會(huì)調(diào)用Dispatcher的finish()來進(jìn)行將該次的RealCall從隊(duì)列集合中清空,然后在重新計(jì)算Dispatcher是否為空閑狀態(tài)。
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//將該call從集合中清空
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//如果是true才會(huì)調(diào)用,也就是異步請求的時(shí)候才會(huì)調(diào)用
if (promoteCalls) promoteCalls();
//計(jì)算里面正在運(yùn)行中請求的數(shù)量
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
//如果Dispatcher空閑,則回調(diào)idleCallback
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
2.2異步
異步請求call.enqueue();也是調(diào)用到了RealCall中的enqueue(),進(jìn)入到源碼中可以看到
@Override public void enqueue(Callback responseCallback) {
//.......省略代碼
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
進(jìn)入到Dispatcher的 enqueue()中可以看到在Dispatcher里面會(huì)去判斷是否到達(dá)最大請求數(shù),如果沒有的話,直接將AsyncCall加入到runningAsyncCalls集合中,并執(zhí)行該AsyncCall的run();否則就將AsyncCall加入到緩存隊(duì)列readyAsyncCalls中。
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
2.3AsyncCall
AsyncCall extends NamedRunnable ,NamedRunnable 的run方法為:
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
所以AsyncCall實(shí)現(xiàn)了NamedRunnable 的execute();
@Override protected void execute() {
try {
//請求的過程,注意這里也是阻塞的
Response response = getResponseWithInterceptorChain();
//先不管這個(gè)Interceptor是干嘛的,下面的代碼可以理解為:
//如果沒有被取消,并且沒有發(fā)生異常,回調(diào)onResponse方法。
//如果發(fā)生了異?;蛘弑蝗∠卣{(diào)onFailure方法。
if (retryAndFollowUpInterceptor.isCanceled()) {
//此請求被取消了,回調(diào)onFailure
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
//此請求成功了,回調(diào)onResponse
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
//發(fā)生了異常,回調(diào)onFailure
responseCallback.onFailure(RealCall.this, e);
} finally {
//通知Dispatcher Call被執(zhí)行完畢了
client.dispatcher().finished(this);
}
}
我們可以看到同樣通過 Response result = getResponseWithInterceptorChain();對發(fā)送同步請求和處理請求返回的結(jié)果。最后調(diào)用Dispatcher的finish()。但是和同步請求不同的是,此時(shí)傳入的參數(shù)發(fā)生了變化
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
所以最終會(huì)比同步請求多一個(gè)調(diào)用promoteCalls();
private void promoteCalls() {
//正在運(yùn)行的隊(duì)列中如果超出了最大鏈接數(shù),直接返回
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
//沒有緩存的請求,也直接返回
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
//從請求緩存隊(duì)列中取出一個(gè)元素,加入到運(yùn)行請求隊(duì)列中,然后執(zhí)行該次請求
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
promoteCalls()方法就是試圖去readyAsyncCalls中取出Call來加入runningAsyncCalls中執(zhí)行。

Q/?為什么這么做?
同步調(diào)用結(jié)束因?yàn)椴]有涉及到runningAsyncCalls中的任何東西,對runningAsyncCalls沒任何影響,所以不需要調(diào)用promoteCalls。而異步的調(diào)用結(jié)束意味著runningAsyncCalls中會(huì)出現(xiàn)一個(gè)空位值,所以它會(huì)調(diào)用promoteCalls去嘗試從readyAsyncCalls中拉一個(gè)進(jìn)來。
三、總結(jié)
1)Dispatcher只是把同步請求放入了Dispatcher中的同步請求的緩存隊(duì)列中,這個(gè)執(zhí)行并不是由Dispatcher調(diào)度的。僅僅用來統(tǒng)計(jì)正在運(yùn)行的請求個(gè)數(shù)。
2)Dispatcher會(huì)對異步請求進(jìn)行緩存管理,并且有正在運(yùn)行的異步請求執(zhí)行完畢,只要沒有超出最大鏈接數(shù),就會(huì)自動(dòng)從緩存中取出一個(gè)請求加入到正在運(yùn)行的請求隊(duì)列中,等待執(zhí)行。
3)同步請求沒有另開線程去執(zhí)行,所以需要將代碼放到子線程中去執(zhí)行;而異步請求已經(jīng)在子線程中執(zhí)行了,不在需要開啟子線程執(zhí)行代碼,所有異步執(zhí)行的請求都會(huì)通過executorService線程池。
Q/?為什么不直接使用線程池來維護(hù)請求隊(duì)列?
個(gè)人理解dispatch配合阻塞式線程池的優(yōu)點(diǎn):
1.不管是同步還是異步請求,在try/finally中調(diào)用了finished函數(shù),是主動(dòng)控制等待隊(duì)列的移動(dòng),而不是采用鎖或者wait/notify,操作更加簡易,更容易控制隊(duì)列。
2.使用的ArrayDeque隊(duì)列,使用了可變數(shù)組實(shí)現(xiàn),效率高于LinkedList。dispatch主動(dòng)控制隊(duì)列然后提交給線程池執(zhí)行,線程池的核心線程數(shù)為0,最大線程數(shù)為Integer.MAX_VALUE,非核心線程的最大空閑時(shí)間為60秒, 任務(wù)隊(duì)列用SynchronousQueue隊(duì)列,這是一個(gè)無緩沖的阻塞隊(duì)列,有任務(wù)到達(dá)的時(shí)候,只要沒有空閑線程,就會(huì)創(chuàng)建一個(gè)新的線程來執(zhí)行,這樣的設(shè)計(jì)達(dá)到了高并發(fā),低阻塞的運(yùn)行。
@date 2020年4月6日 17:54:40
拖了很久的學(xué)習(xí)筆記總算填完坑了。
Anroid OKhttp筆記1 流程分析
Android OkhttpInterceptor 筆記:RetryAndFollowUpInterceptor
Android OkhttpInterceptor 筆記:BridgeInterceptor
Android OkhttpInterceptor 筆記:ConnectInterceptor
Android OkhttpInterceptor 筆記:CacheInterceptor
Android OkhttpInterceptor 筆記:CallServerInterceptor
Android Okhttp筆記:ConnectionPool
Android Okhttp3:Dispatcher分析筆記