OKHttp源碼解析(三)--中階之線程池和消息隊列

上篇文章已經(jīng)說明了OKHttp有兩種調(diào)用方式,一種是阻塞的同步請求,一種是異步的非阻塞的請求。今天主要是講解的是異步的請求。其中涉及了Dispatcher這個類,其他的類基本已經(jīng)在上篇文章介紹過了。所以本片文章的大體思路如下:

  • 1.線程池的理解
  • 2.Dispatcher類詳解
  • 3.OKHttp的任務(wù)調(diào)度
  • 4.OKHttp調(diào)度的理解

在講解線程池和消息隊列的時候有必要講下線程池的基本概念

一、線程池的理解

(一)android中的異步任務(wù)

android的異步任務(wù)一般都是用Thread+Handler或者AsyncTask來實現(xiàn),其中筆者當初經(jīng)歷過各種各樣坑,特別是內(nèi)存泄漏,當初筆者可是相當?shù)挠烙砂?!所以現(xiàn)在很少有開發(fā)者還在用這一套來做異步任務(wù),現(xiàn)在一般都是Rxjava為主,當然還有自己自定義的異步任務(wù)框架(比如筆者),像RxJava都幫我們寫好了對應(yīng)場景的線程池,這是為什么?

1、線程池的理解

我對線程池的理解是有兩個層次,一種是狹隘的,一種是廣義的,那么咱們各自都說下

(1)狹義上的線程池:

線程池是一種多線程處理形式,處理過程中將任務(wù)添加到隊列中,后面再創(chuàng)建線程去處理這些任務(wù),線程池里面的線程都是后臺線程,每個線程都是默認的優(yōu)先級下運。如果某個線程處于空閑中,將添加一個任務(wù)進來,讓空閑線程去處理任務(wù)。如果所有線程都很繁忙,消息隊列會掛起,等待某個線程池空閑后再處理任務(wù)。這樣可以保證線程數(shù)量不能超多最大數(shù)量。

(2)廣義上的線程池:

多線程技術(shù)主要是解決處理器單元內(nèi)多個線程執(zhí)行的問題,它可以顯著減少處理的單元閑置時間,增加處理器單元的吞吐能力。如果對多線程應(yīng)用不當,會增加對單個任務(wù)的的處理時間。

舉例說明:
假如一個服務(wù)器完成一項任務(wù)的時間為T:

T1 創(chuàng)建線程的時間
T2 在線程中執(zhí)行任務(wù)的時間,包括線程同步所需要的時間
T3 線程銷毀的時間

顯然 T= T1+T2+T3. 注意:這是一個理想化的情況

可以看出,T1,T3是多線程自身帶來的開銷(在Java中,通過映射pThread,并進一步通過SystemCall實現(xiàn)native線程),我們渴望減少T1和T3的時間,從而減少T的時間。但是一些線程的使用者并沒有注意到這一點,所以在線程中頻繁的創(chuàng)建或者銷毀線程,這導(dǎo)致T1和T3在T中占有相當比例。這顯然突出的線程池的弱點(T1,T3),而不是有點(并發(fā)性)。
取自IBM知識庫

所以線程池的技術(shù)正是如何關(guān)注縮短或調(diào)整T1,T3時間的技術(shù),從而提高服務(wù)器程序的性能。

  • 1、通過對線程進行緩存,減少創(chuàng)建和銷毀時間的損失
  • 2、通過控制線程數(shù)量的閥值,減少線程過少帶來的CPU閑置(比如長時間卡在I/O上了)與線程過多給JVM內(nèi)存與線程切換時系統(tǒng)調(diào)用的壓力。

在平時我們可以通過線程工廠來創(chuàng)建線程池來盡量避免上述的問題。

二 Dispatcher 類詳解

Dispatcher負責請求的分發(fā)

1、線程池executeService

/** Executes calls. Created lazily. */
private ExecutorService executorService;

public synchronized ExecutorService executorService() {
   if (executorService == null) {
     executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
   }
   return executorService;
 }

由上面代碼可以得出Dispatcher內(nèi)部實現(xiàn)了懶加載的無邊界限制的線程池。參數(shù)解析

  • 1、0:核心線程數(shù)量,保持在線程池中的線程數(shù)量(即使已經(jīng)空閑),為0代表線程空閑后不會保留,等待一段時間后停止。
  • 2、Integer.MAX_VALUE:表示線程池可以容納最大線程數(shù)量
  • 3、TimeUnit.SECOND:當線程池中的線程數(shù)量大于核心線程時,空閑的線程就會等待60s才會被終止,如果小于,則會立刻停止。
  • 4、new SynchronousQueue<Runnable>():線程等待隊列。同步隊列,按序排隊,先來先服務(wù)
  • 5、Util.threadFactory("OkHttp Dispatcher", false):線程工廠,直接創(chuàng)建一個名為OkHttp Dispatcher的非守護線程。

(1)SynchronousQueue每個插入操作必須等待另一個線程的移除操作,同樣任何一個移除操作都等待另一個線程的插入操作。因此隊列內(nèi)部其實沒有任何一個元素,或者說容量為0,嚴格說并不是一種容器,由于隊列沒有容量,因此不能調(diào)用peek等操作,因此只有移除元素才有元素,顯然這是一種快速傳遞元素的方式,也就是說在這種情況下元素總是以最快的方式從插入者(生產(chǎn)者)傳遞給移除者(消費者),這在多任務(wù)隊列中最快的處理任務(wù)方式。對于高頻請求場景,無疑是最合適的。

(2)在OKHttp中,創(chuàng)建了一個閥值是Integer.MAX_VALUE的線程池,它不保留任何最小線程,隨時創(chuàng)建更多的線程數(shù),而且如果線程空閑后,只能多活60秒。所以也就說如果收到20個并發(fā)請求,線程池會創(chuàng)建20個線程,當完成后的60秒后會自動關(guān)閉所有20個線程。他這樣設(shè)計成不設(shè)上限的線程,以保證I/O任務(wù)中高阻塞低占用的過程,不會長時間卡在阻塞上。

2、發(fā)起請求

整個框架主要通過Call來封裝每一次的請求。同時Call持有OkHttpClient和一份Request。而每一次的同步或者異步請求都會有Dispatcher的參與。

(1)、同步

Dispatcher在執(zhí)行同步的Call:直接加入到runningSyncCall隊列中,實際上并沒有執(zhí)行該Call,而是交給外部執(zhí)行

  /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
(2)、異步

將Call加入隊列:如果當前正在執(zhí)行的call的數(shù)量大于maxRequest(64),或者該call的Host上的call超過maxRequestsPerHos(5),則加入readyAsyncCall排隊等待,否則加入runningAsyncCalls并執(zhí)行

  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

3、結(jié)束請求

從ready到running,在每個call結(jié)束的時候都會調(diào)用finished

 private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //每次remove完后,執(zhí)行promoteCalls來輪轉(zhuǎn)。
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
    //線程池為空時,執(zhí)行回調(diào)
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

  private void promoteCalls() {
     //如果當前執(zhí)行的線程大于maxRequests(64),則不操作
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

通過上面代碼,大家可以知道finished先執(zhí)行calls.remove(call)刪除call,然后執(zhí)行promoteCalls(),在promoteCalls()方法里面:如果當前線程大于maxRequest則不操作,如果小于maxRequest則遍歷readyAsyncCalls,取出一個call,并把這個call放入runningAsyncCalls,然后執(zhí)行execute。在遍歷過程中如果runningAsyncCalls超過maxRequest則不再添加,否則一直添加。所以可以這樣說:

promoteCalls()負責ready的Call到running的Call的轉(zhuǎn)化

具體的執(zhí)行請求則在RealCall里面實現(xiàn)的,同步的在RealCall的execute里面實現(xiàn)的,而異步的則在AsyncCall的execute里面實現(xiàn)的。里面都是調(diào)用RealCall的getResponseWithInterceptorChain的方法來實現(xiàn)責任鏈的調(diào)用。

三、OKHttp的任務(wù)調(diào)度

在說調(diào)度任務(wù)之前先說下

1、Dispatcher任務(wù)調(diào)度

在OKHttp中,它使用Dispatcher作為任務(wù)的調(diào)度器。

如下圖所示


Dispatcher.png

在整個調(diào)度流程中涉及的成員如下:
其中

  • Dispatcher 對象是分發(fā)者,也是生產(chǎn)者(默認在主線程中)
  • AsyncCall 對象其實是一個任務(wù)即Runnable(內(nèi)部做了包裝異步接口)
// Dispatcher.java 
maxRequests = 64   // 最大并發(fā)請求數(shù)為64
maxRequestsPerHost = 5 //每個主機最大請求數(shù)為5
ExecutorService executorService  //消費者池(也就是線程池)
Deque<AsyncCall> readyAsyncCalls: // 異步的緩存,正在準備被消費的(用數(shù)組實現(xiàn),可自動擴容,無大小限制)
Deque<AsyncCall> runningAsyncCalls //正在運行的 異步的任務(wù)集合,僅僅是用來引用正在運行的任務(wù)以判斷并發(fā)量,注意它并不是消費者緩存
Deque<RealCall> runningSyncCalls  //正在運行的,同步的任務(wù)集合。僅僅是用來引用正在運行的同步任務(wù)以判斷并發(fā)量

通過將請求任務(wù)分發(fā)給多個線程,可以顯著的減少I/O等待時間

2、OKHttp調(diào)度的具體流程分析

(1)同步調(diào)度分析
第一步是:是調(diào)用了RealCall的execute()方法里面調(diào)用executed(this);
  @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }
第二步:在Dispatcher里面的executed執(zhí)行入隊操作
 /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
第三步:執(zhí)行g(shù)etResponseWithInterceptorChain();進入攔截器鏈流程,然后進行請求,獲取Response,并返回Response result 。
第四步:執(zhí)行client.dispatcher().finished(this)操作
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

這里其實做的是出隊操作。至此同步的調(diào)度就已經(jīng)結(jié)束了

(2)異步調(diào)度分析
AsyncCall類簡介

在講解異步調(diào)度之前不得不提到AsyncCall這個類,AsyncCall,他其實是RealCall的內(nèi)部類

 //RealCall.java
final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }

    String host() {
      return originalRequest.url().host();
    }

    Request request() {
      return originalRequest;
    }

    RealCall get() {
      return RealCall.this;
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        //執(zhí)行耗時任務(wù)
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          //retryAndFollowUpInterceptor取消了 執(zhí)行失敗
          signalledCallback = true;
         //回調(diào),注意這里回調(diào)是在線程池中,不是主線程
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          //一切正常走入正常流程
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
       //最后執(zhí)行出隊
        client.dispatcher().finished(this);
      }
    }
  }
第一步 是調(diào)用了RealCall的enqueue()方法
  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

在enqueue里面調(diào)用了client.dispatcher().enqueue(new AsyncCall(responseCallback));方法

第二步:在Dispatcher里面的enqueue執(zhí)行入隊操作
  synchronized void enqueue(AsyncCall call) {
    //判斷是否滿足入隊的條件(立即執(zhí)行)
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      //正在運行的異步集合添加call
      runningAsyncCalls.add(call);
      //執(zhí)行這個call
      executorService().execute(call);
    } else {
      //不滿足入隊(立即執(zhí)行)條件,則添加到等待集合中
      readyAsyncCalls.add(call);
    }
  }

上述代碼發(fā)現(xiàn)想要入隊需要滿足下面的條件

(runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost)

如果滿足條件,那么就直接把AsyncCall直接加到runningCalls的隊列中,并在線程池中執(zhí)行(線程池會根據(jù)當前負載自動創(chuàng)建,銷毀,緩存相應(yīng)的線程)。反之就放入readyAsyncCalls進行緩存等待。
runningAsyncCalls.size() < maxRequests 表示當前正在運行的AsyncCall是否小于maxRequests = 64
runningCallsForHost(call) < maxRequestsPerHos 表示同一個地址訪問的AsyncCall是否小于maxRequestsPerHost = 5;
即 當前正在并發(fā)的請求不能超過64且同一個地址的訪問不能超過5個

第三步:這里分兩種情況

情況1 第三步 可以直接入隊
runningAsyncCalls.add(call);
第四步:線程池executorService執(zhí)行execute()方法
executorService().execute(call);

由于AsyncCall繼承于NamedRunnable類,而NamedRunnable類又是Runnable類的實現(xiàn)類,所以走到了AsyncCall的execute()方法里面

第五步:執(zhí)行AsyncCall的execute()方法
     @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
第六步:執(zhí)行g(shù)etResponseWithInterceptorChain();進入攔截器鏈流程,然后進行請求,獲取Response。
第七步:如果是正常的獲取到Response,則執(zhí)行responseCallback.onResponse()
第八步:執(zhí)行client.dispatcher().finished(this)操作 進行出隊操作
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

PS:注意這里面第三個參數(shù) 同步是false,異步是true,如果是異步則需要進行是否添加繼續(xù)入隊的情景

情況2 第三步 不能直接入隊,需要等待
readyAsyncCalls.add(call);
第四步 觸發(fā)條件

能進入等待則說明當前要么有64條正在進行的并發(fā),要么同一個地址有5個請求,所以要等待。

當有如下條件被滿足或者觸發(fā)的時候則執(zhí)行promoteCalls操作

  • 1 Dispatcher的setMaxRequestsPerHost()方法被調(diào)用時
   public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
    //設(shè)置的maxRequestsPerHost不能小于1
    if (maxRequestsPerHost < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
    }
    this.maxRequestsPerHost = maxRequestsPerHost;
    promoteCalls();
  }
  • 2 Dispatcher的setMaxRequests()被調(diào)用時
public synchronized void setMaxRequests(int maxRequests) {
     //設(shè)置的maxRequests不能小于1
    if (maxRequests < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequests);
    }
    this.maxRequests = maxRequests;
    promoteCalls();
  }
  • 3當有一條請求結(jié)束了,執(zhí)行了finish()的出隊操作,這時候會觸發(fā)promoteCalls()進行調(diào)整
 if (promoteCalls) 
    promoteCalls();
第五步 執(zhí)行Dispatcher的promoteCalls()方法
private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }
第六步 先判斷是否滿足 初步入隊條件
 if (runningAsyncCalls.size() >= maxRequests) 
    return;
 if (readyAsyncCalls.isEmpty()) 
    return; // No ready calls to promote.

如果此時 并發(fā)的數(shù)量還是大于maxRequests=64則return并繼續(xù)等待
如果此時,沒有等待的任務(wù),則直接return并繼續(xù)等待

第七步 滿足初步的入隊條件,進行遍歷,然后進行第二輪入隊判斷
  for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }

進行同一個host是否已經(jīng)有5請求在了,如果在了,則return返回并繼續(xù)等待

第八步 此時已經(jīng)全部滿足條件,則從等待隊列面移除這個call,然后添加到正在運行的隊列中
        i.remove();
        runningAsyncCalls.add(call);
第九步 線程池executorService執(zhí)行execute()方法
executorService().execute(call);
第十步:執(zhí)行AsyncCall的execute()方法
     @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
第十一步:執(zhí)行g(shù)etResponseWithInterceptorChain();進入攔截器鏈流程,然后進行請求,獲取Response。
第十二步:如果是正常的獲取到Response,則執(zhí)行responseCallback.onResponse()
第十三步:執(zhí)行client.dispatcher().finished(this)操作 進行出隊操作
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

至此 異步任務(wù)調(diào)度已經(jīng)結(jié)束了

總結(jié):
  • 1、異步流程總結(jié)
    所以簡單的描述下異步調(diào)度為:如果當前還能可以執(zhí)行異步任務(wù),則入隊,并立即執(zhí)行,否則加入readyAsyncCalls隊列,當一個請求執(zhí)行完畢后,會調(diào)用promoteCalls(),來把readyAsyncCalls隊列中的Async移出來并加入到runningAsyncCalls,并開始執(zhí)行。然后在當前線程中去執(zhí)行Call的getResponseWithInterceptorChain()方法,直接獲取當前的返回數(shù)據(jù)Response
  • 2、對比同步和異步任務(wù),我們會發(fā)現(xiàn):同步請求和異步請求原理都是一樣的,都是在getResponseWithInterceptorChain()函數(shù)通過Interceptor鏈條來實現(xiàn)網(wǎng)絡(luò)請求邏輯,而異步任務(wù)則通過ExecutorService來實現(xiàn)的。PS:在Dispatcher中添加一個封裝了Callback的Call的匿名內(nèi)部類AsyncCall來執(zhí)行當前 的Call。這個AsyncCall是Call的匿名內(nèi)部類。AsyncCall的execute方法仍然會回調(diào)到Call的 getResponseWithInterceptorChain方法來完成請求,同時將返回數(shù)據(jù)或者狀態(tài)通過Callback來完成。

四、OKHttp調(diào)度的"優(yōu)雅'之處:

1、采用Dispacher作為調(diào)度,與線程池配合實現(xiàn)了高并發(fā),低阻塞的的運行
2、采用Deque作為集合,按照入隊的順序先進先出
3、最精彩的就是在try/catch/finally中調(diào)用finished函數(shù),可以主動控制隊列的移動。避免了使用鎖而wait/notify操作。

OKHTTP_Dispatcher1.png

圖片鏈接地址為:http://i1.piimg.com/1949/df333390bf92e381.png

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容