7、okhttp源碼解析-Dispatcher任務管理器

1、okhttp源碼解析-整體流程
2、okhttp源碼解析-攔截器RetryAndFllowUpInterceptor
3、okhttp源碼解析-攔截器BridgeInterceptor
4、okhttp源碼解析-攔截器CacheInterceptor
5、okhttp源碼解析-攔截器ConnectInterceptor
6、okhttp源碼解析-攔截器CallServerInterceptor
7、okhttp源碼解析-Dispatcher任務管理器

Dispatcher

okhttp的dispatcher調度器,其實就是維護了一個線程池。

  • dispatcher默認最大并發(fā)數是是 64
  • dispatcher默認同一域名下最大并發(fā)數是 5

一、調用流程圖

Dispatcher (1).png

二、源碼分析

1、維護一個線程池,executorService()

2、維護三個數組

  • readyAsyncCalls 待執(zhí)行的異步任務list
  • runningAsyncCalls 正在執(zhí)行的異步任務list
  • runningSyncCalls 正在執(zhí)行的同步任務list

3、Dispatcher異步執(zhí)行方法:enqueue(AsyncCall)

  • 判斷runningAsyncCalls是否大于64,并且判斷同一個host請求是否大于5.
  • 3.1 如果小于64將AsyncCall放到數組中,并直接將AsyncCall,放到線程池中執(zhí)行execute。
  • 3.2 如果大于64了,就先放在readyAsyncCalls集合中。
4、線程池執(zhí)行execute()時,會調用AsyncCall的execute方法,
  • 4.1 調用getResponseWithInterceptorChains()方法,通過攔截責任鏈獲取response。
  • 4.2 調用Dispatcher.finished() -> promoteCalls()方法,循環(huán)readyAsyncCalls集合,如果runningAsyncCalls小于64的話,就從readyAsyncCalls里拿一個出來執(zhí)行,并插入runningAsyncCalls里。

5、Dispatcher的finished()方法

  • 從runningAyncCalls中,移除當前已經執(zhí)行完成的異步任務
  • 調用promoteCalls()方法

6、Dispatcher的promoteCalls()方法

  • 循環(huán)readyAsyncCalls集合,如果runningAsyncCalls小于64的話,就從readyAsyncCalls里拿一個出來執(zhí)行,并插入runningAsyncCalls里

7、Dispatcher的execute方法中:

  • 將RealCall插入runningSyncCalls集合中。
  • 這個runningSyncCalls是存儲同步任務的,他只是單純用來計數的。
  • 計數的目的是在空閑回調idleCallback時,確定同步請求數runningSyncCalls和異步請求數runningAyncCalls之和等于0時,再做空閑回調。

二、關鍵代碼

1、維護一個線程池,executorService()

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

2、維護三個數組

 /** Ready async calls in the order they'll be run. */
 //待執(zhí)行的異步任務集合
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  //正在執(zhí)行的異步任務集合
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  //正在執(zhí)行的同步任務集合,在空閑回調時,用于計算正在執(zhí)行的任務。也就是同步任務、異步任務均為0時,才做空閑回調
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

3、異步執(zhí)行方法:enqueue(AsyncCall)

synchronized void enqueue(AsyncCall call) {
    //正在執(zhí)行的異步任務總數 < 64 && 同host的請求數 < 5
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    //將異步任務存儲在集合中
      runningAsyncCalls.add(call);
      //線程池執(zhí)行該異步任務
      executorService().execute(call);
    } else {
    //否則暫時將異步任務存儲在待執(zhí)行任務集合中
      readyAsyncCalls.add(call);
    }
  }
4、線程池執(zhí)行execute()時,會調用AsyncCall的execute方法,
@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
      } catch (IOException e) {
      } finally {
      //關鍵點:調用Dispatcher的finished方法。
        client.dispatcher().finished(this);
      }
    }

5、Dispatcher的finished()方法

 /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  /** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
    //將執(zhí)行完的異步任務,移除runningAsyncCalls集合。
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //執(zhí)行promoteCalls()方法,將待執(zhí)行集合中的任務拿出來去執(zhí)行。
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

//runningCallsCount = 同步任務數 + 異步任務數。等于0調用空閑回調
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

6、Dispatcher的promoteCalls()

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.

//遍歷待執(zhí)行異步任務集合
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        //放入正在執(zhí)行的集合中
        runningAsyncCalls.add(call);
        //通過線程池執(zhí)行異步任務
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

7、Dispatcher的execute()方法

僅僅用來記錄同步任務的個數,輔助空閑回調時的判斷條件。

/** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容