前言
之前看了好多開源庫的源碼,奈何工作原因,每次看都是零零碎碎的一段段的源碼,沒有一個(gè)大局觀。最近抽空把一些源碼重新翻看了一遍,理理大致流程,并做相關(guān)的記錄,以便以后查看時(shí)能一眼就回憶起來。
okhttp使用流程
使用時(shí),無非就是先創(chuàng)建一個(gè)okhttpclient的實(shí)例,然后再通過client創(chuàng)建Call,再使用call.enqueue()方法執(zhí)行異步請(qǐng)求或者call.execute()執(zhí)行同步請(qǐng)求,代碼大概就是下面的樣子,但凡點(diǎn)進(jìn)來看到這個(gè)文章的,基本上都使用過okhttp,這里不多做贅述。
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();//注釋1
Call call = okHttpClient.newCall(new Request());//注釋2
//異步
call.enqueue(new Callback() {//注釋3
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
}
@Override
public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
}
});
// call.execute();//同步
看源碼
從使用的入口我們點(diǎn)進(jìn)去查看源碼,注釋1中使用build模式創(chuàng)建了一個(gè)okhttpclient,可以在build中進(jìn)行個(gè)性化設(shè)置,比如超時(shí)時(shí)間,代理,緩存等。
注釋2中newCall創(chuàng)建了一個(gè)繼承自Call的RealCall,源碼如下:
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
@Override
public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
通過Realcall的一個(gè)靜態(tài)方法newRealCall來創(chuàng)建RealCall的實(shí)例。
在注釋3中RealCall會(huì)執(zhí)行enqueue來進(jìn)行異步請(qǐng)求,源碼如下:
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));//注釋1
}
這個(gè)方法中被synchronized包裹的代碼是為了確保這個(gè)方法只執(zhí)行一次,如果同一個(gè)RealCall實(shí)例試圖第二次調(diào)用這個(gè)方法時(shí),就會(huì)報(bào)錯(cuò)IllegalStateException("Already Executed");,我們需要看的重點(diǎn)是注釋1這一行這一行里面使用了okhttpclient的分發(fā)器,這個(gè)分發(fā)器的作用就是將我們的各種Call放到runningCall和readyCall的隊(duì)列中。
- runningCall:正在運(yùn)行的隊(duì)列
- readyCall:就緒(準(zhǔn)備運(yùn)行的)的隊(duì)列
就此引出兩個(gè)問題:
- 一個(gè)call過來什么時(shí)候才應(yīng)該放到runningCall隊(duì)列,又是什么時(shí)候放到readyCall隊(duì)列呢?
- 還有就是在readyCall的隊(duì)列要什么時(shí)候才能運(yùn)行呢?
我們繼續(xù)往下追蹤
client.dispatcher().enqueue(new AsyncCall(responseCallback))執(zhí)行這個(gè)方法后會(huì)調(diào)用下面的方法
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {//注釋1
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
注釋1這一行是分揀call實(shí)例的核心,上面有兩個(gè)條件,第一個(gè)runningAsyncCalls.size() < maxRequests 表示在runningCall隊(duì)列中排隊(duì)的call要小于maxRequests (默認(rèn)為64);第二個(gè)條件runningCallsForHost(call) < maxRequestsPerHost 表示訪問相同主機(jī)的call的數(shù)量要小于maxRequestsPerHost(默認(rèn)為5),如果同時(shí)滿足這兩個(gè)條件就直接放到runningCall中執(zhí)行,如果不滿足就放到readyCall中就緒。
runningCallsForHost 源碼如下
/**
* Returns the number of running calls that share a host with {@code call}.
*/
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {//遍歷整個(gè)runningCall隊(duì)列
if (c.get().forWebSocket) continue;
if (c.host().equals(call.host())) result++;//如果發(fā)現(xiàn)和當(dāng)前call訪問相同的主機(jī),就將數(shù)量++
}
return result;
}
這樣第一個(gè)問題就解決了
在放入runningCall隊(duì)列后,會(huì)執(zhí)行executorService().execute(call); 這里就是獲取/創(chuàng)建線程池,并執(zhí)行runnable(call),其中executorService()方法如下:
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher",
false));
}
return executorService;
}
就是創(chuàng)建了一個(gè)線程池,這個(gè)線程池是一個(gè)高并發(fā)的寫法,保證了最高的吞吐量。這里大概講一下,線程池的構(gòu)造函數(shù)中有幾個(gè)參數(shù)第一個(gè)是核心線程數(shù),第二個(gè)是最大線程數(shù),第三個(gè)是空閑線程存活的最長時(shí)間,第四個(gè)是第三個(gè)參數(shù)的單位,第五個(gè)是隊(duì)列(一會(huì)兒講這個(gè)隊(duì)列是干嘛的),第六個(gè)是線程池的創(chuàng)建工廠。當(dāng)一個(gè)任務(wù)需要放到線程池中執(zhí)行時(shí),首先會(huì)放在核心線程中,核心線程數(shù)滿了以后會(huì)放在隊(duì)列中,隊(duì)列滿了以后,會(huì)放在最大線程數(shù)中,如果最大也滿了,就直接執(zhí)行拒絕策略。當(dāng)一個(gè)任務(wù)(runnable)放到了okhttp線程池中時(shí),首先發(fā)現(xiàn)核心線程數(shù)為0,那就直接放隊(duì)列中,這時(shí)候發(fā)現(xiàn)隊(duì)列是SynchronousQueue這么個(gè)玩意,我們直到SynchronousQueue這個(gè)隊(duì)列是壓根沒有容器的,然后就找空閑線程啊,一找發(fā)現(xiàn),哎!沒有空閑線程,那就重開一個(gè)線程來跑這個(gè)runnable。
這么寫有一個(gè)好處,就是假如我們的某個(gè)runnable1是一個(gè)需要很久才能返回結(jié)果,說極端點(diǎn)就是直接一個(gè)死循環(huán),永遠(yuǎn)執(zhí)行不完,這時(shí)候呢下一個(gè)runnable2進(jìn)來了,發(fā)現(xiàn)空閑線程沒有,核心線程數(shù)為0,隊(duì)列也是一個(gè)沒容器的玩意兒,那怎么辦?就直接new Thread出一個(gè)線程自己跑自己的。我們反過來說,如果說我們的隊(duì)列是有容器的,假如隊(duì)列的容量為1,runnable1一開始在隊(duì)列中,然后被線程執(zhí)行,換句話說,現(xiàn)在隊(duì)列為空,但是線程已經(jīng)在跑runnable1的死循環(huán)了,你再來一個(gè)runnable2也是只能塞到隊(duì)列中,默默地看runnable1在線程中瘋狂死循環(huán),永遠(yuǎn)也等不到線程空閑的時(shí)候。唯一能拯救runnable1的只有再來一個(gè)runnable3,當(dāng)runnable3進(jìn)入線程池后,發(fā)現(xiàn)runnable1在瘋狂死循環(huán),runnable2就像小鳥一樣被強(qiáng)制囚禁在籠子里,觀看runnable1的瘋狂死循環(huán),runnable3既進(jìn)不了籠子,也沒有核心線程給他跑,于是線程池就給它(runnable3)新建了一個(gè)線程3,等runnable3跑完了以后,線程3就處于空閑,那線程池能讓這個(gè)線程3閑著嗎?當(dāng)然不行,那線程池就去找啊,找啊,發(fā)現(xiàn)runnable2還在隊(duì)列中沒有被執(zhí)行,于是就吧runnable2交給了線程3來執(zhí)行,這個(gè)時(shí)候runnable2才終于被執(zhí)行了。
那么call它具體是做了什么呢?我們看看它究竟做了啥
剛才講到這里client.dispatcher().enqueue(new AsyncCall(responseCallback));我們進(jìn)去看看AsyncCall到底是個(gè)啥
final class AsyncCall extends NamedRunnable {
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override
public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
AsyncCall--繼承了-->NamedRunnable --繼承了-->Runnable,而在NamedRunnable的run方法中又執(zhí)行了 execute();,所以說,線程池要執(zhí)行AsyncCall ,那肯定是直接執(zhí)行AsyncCall 的execute方法,不然你這么寫也沒意義啊,我們帶著猜測再回源碼中打探一番
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override
protected void execute() {//注釋1
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {//注釋2
client.dispatcher().finished(this);
}
}
}
發(fā)現(xiàn)果不其然,在注釋1中確實(shí)是實(shí)現(xiàn)了execute方法,然后在方法內(nèi)就是一通try catch,但是我們注意一下注釋2的這個(gè)地方,也就是說,我不管你是請(qǐng)求成功還是失敗,你肯定得執(zhí)行注釋2的內(nèi)容,這個(gè)你跑不了,我們?cè)俅胃M(jìn)一下代碼,發(fā)現(xiàn)Dispatcher.java中有這幾個(gè)重載的finish方法
//方法1
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
/**
* Used by {@code Call#execute} to signal completion.
*/
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
//方法3
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();//注釋1
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
很明顯, client.dispatcher().finished(this);是直接執(zhí)行了方法1,然后再執(zhí)行方法3,其中要關(guān)注注釋1中的這一行,這一行執(zhí)行的方法如下
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
其中前面的兩個(gè)return就不多講了,后面有個(gè)for循環(huán),這里面首先遍歷了readyCall的隊(duì)列,發(fā)現(xiàn)如果說runningCall隊(duì)列中有位置被騰出來了,就將readyCall隊(duì)列中的任務(wù)拿出來放到runningCall隊(duì)列中執(zhí)行,如果沒有那就繼續(xù)遍歷。在這個(gè)方法中分別判斷了runningCall隊(duì)列是否是最大,readyCall隊(duì)列是否為空,遍歷循環(huán)中又判斷了runningCall隊(duì)列是不是大于了最大值,不得不說到底是大公司的程序員,代碼寫的十分嚴(yán)謹(jǐn)??。
到這里就解釋了第二個(gè)問題:在readyCall的隊(duì)列要什么時(shí)候才能運(yùn)行呢?
現(xiàn)在我們?cè)倏椿谹synCall的execute()方法:
@Override
protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();//注釋1
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
點(diǎn)進(jìn)注釋1中的這個(gè)方法
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors()); //注釋0
interceptors.add(retryAndFollowUpInterceptor);//注釋1
interceptors.add(new BridgeInterceptor(client.cookieJar()));/注釋2
interceptors.add(new CacheInterceptor(client.internalCache()));/注釋3
interceptors.add(new ConnectInterceptor(client));/注釋4
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());/注釋5
}
interceptors.add(new CallServerInterceptor(forWebSocket));/注釋6
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
這個(gè)方法就是各種okhttp的攔截器
注釋0中是用戶自定義的攔截器,注釋1~6是系統(tǒng)加的攔截器,網(wǎng)絡(luò)訪問的時(shí)候,都是靠這些攔截器來實(shí)現(xiàn)的。下一篇文章將會(huì)詳細(xì)說明這些攔截器的作用