從 OkHttp 中學(xué)點什么

前言

幾個月前,跟過 OkHttp 的流程源碼,但是時間久了,現(xiàn)在能夠回想起來的的,只有幾個攔截器了,那我豈不是沒什么收獲了。所以,好好想想,我從 OkHttp 中能夠?qū)W到什么

疑問

  1. OkHttp 是怎么拆分功能的,大概有幾個模塊
  2. 它所用到的責(zé)任鏈模式,在實際開發(fā)中適合哪些場景
  3. 它是怎么使用線程池的,這么用有什么好處

OkHttp 是怎么拆分功能的,大概有幾個模塊

作為一個網(wǎng)絡(luò)框架,最核心的功能就是發(fā)起請求,處理響應(yīng)了,這倆個是功能部分, OkHttp 使用 Dispatcher 執(zhí)行任務(wù),內(nèi)部是一個高并發(fā)的線程池,另外整個流程的處理使用到了Interceptor 攔截器

請求執(zhí)行的調(diào)用流程

  • Recall.enqueue(Callback)
  • client.dispatcher().enqueue(new AsyncCall(responseCallback))

停一下,跟進(jìn)一下分發(fā)器器的 enqueue 方法,內(nèi)部時怎么處理異步請求的

  // 創(chuàng)建 AsyncCall 對象 
  void enqueue(AsyncCall call) {
    synchronized (this) {
    // 添加到待執(zhí)行隊列中,雙向隊列
      readyAsyncCalls.add(call);
    }
    // 執(zhí)行的重要代碼
    promoteAndExecute();
  }
  
  
    private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));
    // 創(chuàng)建一個可執(zhí)行隊列,目的是限制 64 個最大連接數(shù),每個 Host 最多 5 個連接的限制
    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
        //  符合條件的,從準(zhǔn)備中的隊列挪到其他兩個隊列中
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

   // 放到線程池中執(zhí)行
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

  

首先 AsyncCall 是實現(xiàn)了 Runnable 的一個類,也難怪,畢竟是要放到線程池中執(zhí)行的

響應(yīng)執(zhí)行的調(diào)用過程

剛才看了請求入隊后,經(jīng)過篩選,開始遍歷放到線程池中去執(zhí)行,那下來就是等待響應(yīng)了

具體代碼在 Recall 類的 run() 方法中,里面就是我們之前了解到的使用責(zé)任鏈模式-攔截器的代碼了

為什么要使用雙端隊列 ArrayDeque ?

OkHttp 源碼系列 之 ArrayDeque - 雙端隊列

首先 Jdk 提供的雙端隊列主要有兩個:

  1. LinkedList 鏈表實現(xiàn)的雙端隊列
  2. ArrayDeque 循環(huán)數(shù)組實現(xiàn)的雙端隊列

那為什么偏偏就選了 ArrayDeque 了,因為效率高.
Jdk 的說明中就說了,ArrayDeque 作為隊列使用時,將比 LinkedList 更快

ArrayQueue 是線程不安全的,Okhttp 是怎樣保證同步問題的? synchronized 關(guān)鍵字

OkHttp 是怎樣使用責(zé)任鏈模式的,Android 源碼中還有其他地方用到了嘛?

那就首先要補(bǔ)習(xí)下責(zé)任鏈模式是什么,以及怎樣用代碼實現(xiàn)

在網(wǎng)上查了很多文檔,有一種表述我覺得很形象,小張去外地出差回來,其中 2w 要去招公司報銷,他去找到組長

  1. 組長看到發(fā)票,面值超過了權(quán)限,說讓小張去找主管
  2. 主管一看,自己最大只能簽 3k 的,讓其去找經(jīng)理
  3. 經(jīng)理最大只能批 1w 的,讓小張去找老板
  4. 最終老板簽字處理

整個流程涉及到多個類(組長、主管、經(jīng)理等),一級一級的處理,最終處理結(jié)束

Android 源碼中 View 的事件分發(fā)也是使用了責(zé)任鏈模式,其中被分發(fā)的 MotionEvent 經(jīng)過 ViewGroup 層層分發(fā),最終被消費或者重新返回到最上層的 View

Android 事件分發(fā)與責(zé)任鏈模式

  • 那 OkHttp 是怎樣實現(xiàn)責(zé)任鏈的呢?
    Let is see fucking code ,Woohooo
    // 同步執(zhí)行的代碼中
    @Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        // 使用責(zé)任鏈處理請求響應(yīng),重點看這里
        Response response = getResponseWithInterceptorChain();
        ......
      }
    }
    
    Response getResponseWithInterceptorChain() throws IOException {
    // 創(chuàng)建一個攔截器隊列
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));
    
    // 添加完了,開始逐個處理了
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    boolean calledNoMoreExchanges = false;
    try {
      //  proceed 執(zhí)行后得到響應(yīng)結(jié)果
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }


這里使用了隊列保存所有的攔截器,然后一股腦傳進(jìn)了 RealInterceptorChain 對象中,最后調(diào)用 proceed 就有個返回結(jié)果,停,一下子就獲取到了結(jié)果了嘛? 進(jìn)去看看
RealInterceptorChain 里面是怎么處理的

  public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
      throws IOException {
    ... 省略部分代碼    

    // 看到 next 我就想起來鏈表里的 next,這個 next 是干嘛的呢?
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    // 根據(jù) index 獲取對應(yīng)位置的攔截器
    Interceptor interceptor = interceptors.get(index);
    // 調(diào)用攔截器的攔截方法
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed()
   
    ... 省略部分代碼    
    return response;
  }

有一個地方?jīng)]看明白,創(chuàng)建 Chain 后,調(diào)用 proceed ,剛開始 index 為 0,那整個隊列是怎么遍歷的,沒看到有循環(huán)遍歷語句啊

進(jìn)入 interceptor.intercept(next) 后,一切截然而止了,懷著好奇心,點開了 interceptor 的實現(xiàn)類 CacheInterceptor ,果不其然,在 intercept 方法中,再次看到了 proceed 的身影,終于破案了

 @Override public Response intercept(Chain chain) throws IOException {
  ... 省略部分代碼    
  // 每調(diào)用一次,內(nèi)部的 index 自增,就意味著不斷的傳遞到下一級攔截器
  networkResponse = chain.proceed(networkRequest);
  ... 省略部分代碼    
  
  return response;

OkHttp 是怎么使用線程池的

最后一個問題,怎么使用線程池的,首先補(bǔ)習(xí)下線程池的各個參數(shù)的含義,以及線程池的工作原理

  // OkHttp 中的線程池
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

    // 線程池的構(gòu)造方法
    // corePoolSize – the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
    // maximumPoolSize – the maximum number of threads to allow in the pool
    // keepAliveTime – when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
    // unit – the time unit for the keepAliveTime argument
    // workQueue – the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
    // threadFactory – the factory to use when the executor creates a new thread
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

創(chuàng)建參數(shù):

  1. 核心線程數(shù),核心線程將保持存活,及時是空閑的,這里是 0
  2. 池中允許的最大線程數(shù),這里是 Int 的最大值,實際到不了這么多,OkHttp 有 64 最大連接數(shù)的限制
  3. 等待時間 可以理解為非核心線程等待任務(wù)時的超時時間 ,這里為 60 秒
  4. 等待時間的單位
  5. 工作隊列,這里是一個同步的阻塞隊列,內(nèi)部沒有容器,傳入一個時就會阻塞下一個的傳入
  6. 線程工廠
  • 為什么 OkHttp 要這么設(shè)置線程池,有什么好處呢?
    其實這種參數(shù)設(shè)置,就是 Excutor.newCachedThreadPool() ,
    首先一個阻塞的同步隊列,內(nèi)部沒有容器,意味著什么呢,每當(dāng)一個網(wǎng)絡(luò)請求發(fā)起,只要核心線程滿了,就會在池中創(chuàng)建新的線程

如果線程池中的線程數(shù)大于核心線程數(shù)且隊列滿了,且線程數(shù)小于最大線程數(shù),則會創(chuàng)建新的線程,剛好 Okhttp 的最大線程數(shù)時是一個極大值,那就會不斷創(chuàng)建線程,是一個高并發(fā)的線程池

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容