OkHttp3源碼和設(shè)計模式-1

《打車APP實戰(zhàn)》課程中,我們使用 OkHttp3 簡單搭建了一個網(wǎng)絡(luò)框架, 實踐了 OkHttp3 的用法。不過課程本身的重點是 MVP 架構(gòu)的實踐,所以沒有進一步 OkHttp3 底層的實現(xiàn)細節(jié)。本文來探究一下 OkHttp3 的源碼和其中的設(shè)計思想。

關(guān)于 OkHttp3 的源碼分析的文章挺多,不過大多還是在為了源碼而源碼。個人覺得如果讀源碼不去分析源碼背后的設(shè)計模式或設(shè)計思想,那么讀源碼的意義不大。 同時,如果熟悉的設(shè)計模式越多,那么讀某個框架的源碼的時候就越容易,兩者是相輔相成的,這也是許多大牛認為多讀源碼能提高編程能力的原因。

整體架構(gòu)

整體架構(gòu)

為了方面后面的理解,我這里簡單畫了個架構(gòu)圖,圖中畫出了 OkHttp3 核心的功能模塊。為了方便整體理解,這里分了三個層次: 客戶層、執(zhí)行層和連接層。
首先,客戶層的OkHttpClient ,使用過 OkHttp 網(wǎng)絡(luò)庫的同學應(yīng)該都熟悉,在發(fā)送網(wǎng)絡(luò)請求,執(zhí)行層決定怎么處理請求,比如同步還是異步,同步請求的話直接在當前線程完成請求, 請求要經(jīng)過多層攔截器處理; 如果是異步處理,需要 Dispatcher 執(zhí)行分發(fā)策略, 線程池管理執(zhí)行任務(wù); 又比如,一個請求下來,要不要走緩存,如果不走緩存,進行網(wǎng)絡(luò)請求。最后執(zhí)行層將從連接層進行網(wǎng)絡(luò) IO 獲取數(shù)據(jù)。

OkHttpClient

使用過 OkHttp 網(wǎng)絡(luò)庫的同學應(yīng)該都熟悉 OkHttpClient , 許多第三方框架都會提供一個類似的類作為客戶訪問的一個入口。 關(guān)于 OkHttpClient 代碼注釋上就說的很清楚:

   /**
 * Factory for {@linkplain Call calls}, which can be used to send 
   HTTP requests and read their
 * responses.
 *
 * <h3>OkHttpClients should be shared</h3>
 *
 * <p>OkHttp performs best when you create a single {@code 
 OkHttpClient} instance and reuse it for
 * all of your HTTP calls. This is because each client holds its own 
connection pool and thread
 * pools. Reusing connections and threads reduces latency and 
saves memory. Conversely, creating a
 * client for each request wastes resources on idle pools.
 *
 * <p>Use {@code new OkHttpClient()} to create a shared instance 
with the default settings:
 * <pre>   {@code
 *
 *   // The singleton HTTP client.
 *   public final OkHttpClient client = new OkHttpClient();
 * }</pre>
 *
 * <p>Or use {@code new OkHttpClient.Builder()} to create a shared 
  instance with custom settings:
 * <pre>   {@code
 *
 *   // The singleton HTTP client.
 *   public final OkHttpClient client = new OkHttpClient.Builder()
 *       .addInterceptor(new HttpLoggingInterceptor())
 *       .cache(new Cache(cacheDir, cacheSize))
 *       .build();
 * }</pre>
 *
 ....  省略
*/

簡單提煉:
1、OkHttpClient, 可以通過 new OkHttpClient() 或 new OkHttpClient.Builder() 來創(chuàng)建對象, 但是---特別注意, OkHttpClient() 對象最好是共享的, 建議使用單例模式創(chuàng)建。 因為每個 OkHttpClient 對象都管理自己獨有的線程池和連接池。 這一點很多同學,甚至在我經(jīng)歷的團隊中就有人踩過坑, 每一個請求都創(chuàng)建一個 OkHttpClient 導(dǎo)致內(nèi)存爆掉。

2、 從上面的整體框架圖,其實執(zhí)行層有很多屬性功能是需要OkHttpClient 來制定,例如緩存、線程池、攔截器等。如果你是設(shè)計者你會怎樣設(shè)計 OkHttpClient ? 建造者模式,OkHttpClient 比較復(fù)雜, 太多屬性, 而且客戶的組合需求多樣化, 這種情況下就考慮使用建造者模式。 new OkHttpClien() 創(chuàng)建對象, 內(nèi)部默認指定了很多屬性:

 public OkHttpClient() {
   this(new Builder());
}

在看看 new Builder() 的默認實現(xiàn):

public Builder() {
  dispatcher = new Dispatcher();
  protocols = DEFAULT_PROTOCOLS;
  connectionSpecs = DEFAULT_CONNECTION_SPECS;
  eventListenerFactory = EventListener.factory(EventListener.NONE);
  proxySelector = ProxySelector.getDefault();
  cookieJar = CookieJar.NO_COOKIES;
  socketFactory = SocketFactory.getDefault();
  hostnameVerifier = OkHostnameVerifier.INSTANCE;
  certificatePinner = CertificatePinner.DEFAULT;
  proxyAuthenticator = Authenticator.NONE;
  authenticator = Authenticator.NONE;
  connectionPool = new ConnectionPool();
  dns = Dns.SYSTEM;
  followSslRedirects = true;
  followRedirects = true;
  retryOnConnectionFailure = true;
  connectTimeout = 10_000;
  readTimeout = 10_000;
  writeTimeout = 10_000;
  pingInterval = 0;
}

默認指定 Dispatcher (管理線程池)、鏈接池、超時時間等。

3、 內(nèi)部對于線程池、鏈接池管理有默認的管理策略,例如空閑時候的線程池、連接池會在一定時間自動釋放,但如果你想主動去釋放也可以通過客戶層去釋放。(很少)

執(zhí)行層

 Response response = mOkHttpClient.newCall(request).execute();

這是應(yīng)用程序中發(fā)起網(wǎng)絡(luò)請求最頂端的調(diào)用,newCall(request) 方法返回 RealCall 對象。RealCall 封裝了一個 request 代表一個請求調(diào)用任務(wù),RealCall 有兩個重要的方法 execute() 和 enqueue(Callback responseCallback)。 execute() 是直接在當前線程執(zhí)行請求,enqueue(Callback responseCallback) 是將當前任務(wù)加到任務(wù)隊列中,執(zhí)行異步請求。

同步請求

 @Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  try {
    // client.dispatcher().executed(this) 內(nèi)部只是記錄下執(zhí)行狀態(tài),
    client.dispatcher().executed(this);
    // 真正執(zhí)行發(fā)生在這里
    Response result = getResponseWithInterceptorChain();
    if (result == null) throw new IOException("Canceled");
    return result;
  } finally {
    // 后面再解釋
    client.dispatcher().finished(this);
  }
}

執(zhí)行方法關(guān)鍵在 getResponseWithInterceptorChain() 這個方法中, 關(guān)于 client.dispatcher().executed(this) 和 client.dispatcher().finished(this); 這里先忽略 ,后面再看。

請求過程要從執(zhí)行層說到連接層,涉及到 getResponseWithInterceptorChain 方法中組織的各個攔截器的執(zhí)行過程,內(nèi)容比較多,后面章節(jié)在說。先說說 RealCall 中 enqueue(Callback responseCallback) 方法涉及的異步請求和線程池。

Dispatcher 和線程池

 @Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
  if (executed) throw new IllegalStateException("Already Executed");
  executed = true;
}
 captureCallStackTrace();
 client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

調(diào)用了 dispatcher 的 enqueue()方法
dispatcher 結(jié)合線程池完成了所有異步請求任務(wù)的調(diào)配。

synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}

Dispatcher調(diào)度

dispatcher 主要維護了三兩個隊列 readyAsyncCalls、runningAsyncCalls 和 runningSyncCalls,分別代表了準備中隊列, 正在執(zhí)行的異步任務(wù)隊列和正在執(zhí)行的同步隊列, 重點關(guān)注下前面兩個。
現(xiàn)在我們可以回頭來看看前面 RealCall 方法 client.dispatcher().finished(this) 這個疑點了。 在每個任務(wù)執(zhí)行完之后要回調(diào) client.dispatcher().finished(this) 方法, 主要是要將當前任務(wù)從 runningAsyncCalls 或 runningSyncCalls 中移除, 同時把 readyAsyncCalls 的任務(wù)調(diào)度到 runningAsyncCalls 中并執(zhí)行。

線程池

public synchronized ExecutorService executorService() {
  if (executorService == null) {
  executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
 }
  return executorService;
 }

默認實現(xiàn)是一個不限容量的線程池 , 線程空閑時存活時間為 60 秒。線程池實現(xiàn)了對象復(fù)用,降低線程創(chuàng)建開銷,從設(shè)計模式上來講,使用了享元模式。

責任鏈 (攔截器執(zhí)行過程)

  Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
  interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));

Interceptor.Chain chain = new RealInterceptorChain(
    interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
  }
}

要跟蹤 Okhttp3 的網(wǎng)絡(luò)請求任務(wù)執(zhí)行過程 ,需要看懂以上代碼,看懂以上代碼必須理解設(shè)計模式-責任鏈。在責任鏈模式里,很多對象由每一個對象對其下家的引用而連接起來形成一條鏈。請求在這個鏈上傳遞,直到鏈上的某一個對象決定處理此請求。發(fā)出這個請求的客戶端并不知道鏈上的哪一個對象最終處理這個請求,這使得系統(tǒng)可以在不影響客戶端的情況下動態(tài)地重新組織和分配責任。 網(wǎng)絡(luò)請求過程,是比較典型的復(fù)合責任鏈的場景,比如請求傳遞過程,我們需要做請求重試, 需要執(zhí)行緩存策略, 需要建立連接等, 每一個處理節(jié)點可以由一個鏈上的對象來處理; 同時客戶端使用的時候可能也會在請求過程中做一些應(yīng)用層需要的事情,比如我要記錄網(wǎng)絡(luò)請求的耗時、日志等, 責任鏈還可以動態(tài)的擴展到客戶業(yè)務(wù)方。

攔截器

在 OkHttp3 的攔截器鏈中, 內(nèi)置了5個默認的攔截器,分別用于重試、請求對象轉(zhuǎn)換、緩存、鏈接、網(wǎng)絡(luò)讀寫。
以上方法中先是添加了客戶端自定義的連接器,然后在分別添加內(nèi)置攔截器。

Okhttp3 攔截器類圖

攔截器類圖

現(xiàn)在我們把對 OkHttp 網(wǎng)絡(luò)請求執(zhí)行過程的研究轉(zhuǎn)化對每個攔截器處理的研究。

retryAndFollowUpInterceptor 重試機制

重試流程

retryAndFollowUpInterceptor 處于內(nèi)置攔截器鏈的最頂端,在一個循環(huán)中執(zhí)行重試過程:
1、首先下游攔截器在處理網(wǎng)絡(luò)請求過程如拋出異常,則通過一定的機制判斷一下當前鏈接是否可恢復(fù)的(例如,異常是不是致命的、有沒有更多的線路可以嘗試等),如果可恢復(fù)則重試,否則跳出循環(huán)。
2、 如果沒什么異常則校驗下返回狀態(tài)、代理鑒權(quán)、重定向等,如果需要重定向則繼續(xù),否則直接跳出循環(huán)返回結(jié)果。
3、 如果重定向,則要判斷下是否已經(jīng)達到最大可重定向次數(shù), 達到則拋出異常,跳出循環(huán)。

@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
// 創(chuàng)建連接池管理對象
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);

int followUpCount = 0;
Response priorResponse = null;
while (true) {
  if (canceled) {
    streamAllocation.release();
    throw new IOException("Canceled");
  }

  Response response = null;
  boolean releaseConnection = true;
  try {
  // 將請求處理傳遞下游攔截器處理
    response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
    releaseConnection = false;
  } catch (RouteException e) {
    // The attempt to connect via a route failed. The request will not have been sent.
     //  線路異常,判斷滿足可恢復(fù)條件,滿足則繼續(xù)循環(huán)重試
    if (!recover(e.getLastConnectException(), false, request)) {
      throw e.getLastConnectException();
    }
    releaseConnection = false;
    continue;
  } catch (IOException e) {
    // An attempt to communicate with a server failed. The request may have been sent.

// IO異常,判斷滿足可恢復(fù)條件,滿足則繼續(xù)循環(huán)重試
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}

  // Attach the prior response if it exists. Such responses never have a body.
  if (priorResponse != null) {
    response = response.newBuilder()
        .priorResponse(priorResponse.newBuilder()
                .body(null)
                .build())
        .build();
  }
 //  是否需要重定向
  Request followUp = followUpRequest(response);

  if (followUp == null) {
    if (!forWebSocket) {
      streamAllocation.release();
    }
    // 不需要重定向,正常返回結(jié)果
    return response;
  }

  closeQuietly(response.body());

  if (++followUpCount > MAX_FOLLOW_UPS) {
   // 達到次數(shù)限制
    streamAllocation.release();
    throw new ProtocolException("Too many follow-up requests: " + followUpCount);
  }

  if (followUp.body() instanceof UnrepeatableRequestBody) {
    streamAllocation.release();
    throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
  }

  if (!sameConnection(response, followUp.url())) {
    streamAllocation.release();
    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(followUp.url()), callStackTrace);
  } else if (streamAllocation.codec() != null) {
    throw new IllegalStateException("Closing the body of " + response
        + " didn't close its backing stream. Bad interceptor?");
  }

  request = followUp;
  priorResponse = response;
 }
}

BridgeInterceptor

  /**
  * Bridges from application code to network code. First it builds a 
network request from a user
 * request. Then it proceeds to call the network. Finally it builds a 
user response from the network
 * response.
 */

這個攔截器比較簡單, 一個實現(xiàn)應(yīng)用層和網(wǎng)絡(luò)層直接的數(shù)據(jù)格式編碼的橋。 第一: 把應(yīng)用層客戶端傳過來的請求對象轉(zhuǎn)換為 Http 網(wǎng)絡(luò)協(xié)議所需字段的請求對象。 第二, 把下游網(wǎng)絡(luò)請求結(jié)果轉(zhuǎn)換為應(yīng)用層客戶所需要的響應(yīng)對象。 這個設(shè)計思想來自適配器設(shè)計模式,大家可以去體會一下。

CacheInterceptor 數(shù)據(jù)策略(策略模式)

CacheInterceptor 實現(xiàn)了數(shù)據(jù)的選擇策略, 來自網(wǎng)絡(luò)還是來自本地? 這個場景也是比較契合策略模式場景, CacheInterceptor 需要一個策略提供者提供它一個策略(錦囊), CacheInterceptor 根據(jù)這個策略去選擇走網(wǎng)絡(luò)數(shù)據(jù)還是本地緩存。

緩存策略

緩存的策略過程:
1、 請求頭包含 "If-Modified-Since" 或 "If-None-Match" 暫時不走緩存
2、 客戶端通過 cacheControl 指定了無緩存,不走緩存
3、客戶端通過 cacheControl 指定了緩存,則看緩存過期時間,符合要求走緩存。
4、 如果走了網(wǎng)絡(luò)請求,響應(yīng)狀態(tài)碼為 304(只有客戶端請求頭包含 "If-Modified-Since" 或 "If-None-Match" ,服務(wù)器數(shù)據(jù)沒變化的話會返回304狀態(tài)碼,不會返回響應(yīng)內(nèi)容), 表示客戶端繼續(xù)用緩存。

@Override public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
// 獲取緩存策略
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// 走緩存
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
// 執(zhí)行網(wǎng)絡(luò)
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}

// 返回 304 仍然走本地緩存
if (cacheResponse != null) {
  if (networkResponse.code() == HTTP_NOT_MODIFIED) {
    Response response = cacheResponse.newBuilder()
        .headers(combine(cacheResponse.headers(), networkResponse.headers()))
        .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
        .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();
    networkResponse.body().close();

    // Update the cache after combining headers but before stripping the
    // Content-Encoding header (as performed by initContentStream()).
    cache.trackConditionalCacheHit();
    cache.update(cacheResponse, response);
    return response;
  } else {
    closeQuietly(cacheResponse.body());
  }
}
Response response = networkResponse.newBuilder()
    .cacheResponse(stripBody(cacheResponse))
    .networkResponse(stripBody(networkResponse))
    .build();
if (cache != null) {
  if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
    //  存儲緩存
    CacheRequest cacheRequest = cache.put(response);
    return cacheWritingResponse(cacheRequest, response);
  }
  if (HttpMethod.invalidatesCache(networkRequest.method())) {
    try {
      cache.remove(networkRequest);
    } catch (IOException ignored) {
      // The cache cannot be written.
    }
  }
}
return response;
}

緩存實現(xiàn)

OkHttp3 內(nèi)部緩存默認實現(xiàn)是使用的 DiskLruCache, 這部分代碼有點繞:

interceptors.add(new CacheInterceptor(client.internalCache()));
初始化 CacheInterceptor 時候 client.internalCache() 這里獲取OkHttpClient的緩存。

InternalCache internalCache() {
  return cache != null ? cache.internalCache : internalCache;
}

注意到, 這個方法是非公開的。 客戶端只能通過 OkhttpClient.Builder的 cache(cache) 定義緩存, cache 是一個 Cache 對實例。 在看看 Cache 的內(nèi)部實現(xiàn), 內(nèi)部有一個 InternalCache 的內(nèi)部類實現(xiàn)。 內(nèi)部調(diào)用時使用 InternalCache 實例提供接口,而存儲邏輯在 Cache 中實現(xiàn)。

緩存

Cache 為什么不直接實現(xiàn) InternalCache ,而通過持有 InternalCache 的一個內(nèi)部類對象來實現(xiàn)方法? 是希望控制緩存實現(xiàn), 不希望用戶外部去實現(xiàn)緩存,同時對內(nèi)保持一定的擴展。

鏈接層

RealCall 封裝了請求過程, 組織了用戶和內(nèi)置攔截器,其中內(nèi)置攔截器 retryAndFollowUpInterceptor -> BridgeInterceptor -> CacheInterceptor 完執(zhí)行層的大部分邏輯 ,ConnectInterceptor -> CallServerInterceptor 兩個攔截器開始邁向連接層最終完成網(wǎng)絡(luò)請求。 關(guān)于 ConnectInterceptor -> CallServerInterceptor 要結(jié)合連接層一起說明,限于篇幅, 下一篇文章:《OkHttp3源碼和設(shè)計模式-2 》接著分析。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容