okhttp源碼分析(一)——OkHttp的工作流程分析

如果我們想請求數(shù)據(jù),使用少量的代碼就可以實現(xiàn):

OkHttpClient client = new OkHttpClient();
String url = "https://www.baidu.com/";
Request request = new Request().Builder()
                    .url(url)
                    .get()
                    .build();
Call call = client.newCall(request);
request.enqueue(new CallBack(){
    @Override
    public void onResponse(Call call,Response response) throws IOException{
        
    }
    
    @Override
    public void onFailure(Call call,IOException e){
        
    }
})

OkHttpClient類

創(chuàng)建OkHttpClient類的兩種方式:

  • 直接創(chuàng)建對象 new OkHttpClient()
  • new OkHttpClient.Builder().build()

OkHttpClient對象源碼:

public OkHttpClient() {
this(new Builder());
}

OkHttpClient(Builder builder) {
//調(diào)度器,用于控制并發(fā)的請求。內(nèi)部保存同步和異步請求的call,并使用線程池處理異步請求。
this.dispatcher = builder.dispatcher;
this.proxy = builder.proxy;//代理設(shè)置
this.protocols = builder.protocols;//默認(rèn)支持http協(xié)議版本
this.connectionSpecs = builder.connectionSpecs;//okhttp連接 connection配置
this.interceptors = Util.immutableList(builder.interceptors);
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
this.eventListenerFactory = builder.eventListenerFactory;//一個Call的狀態(tài)監(jiān)聽器
this.proxySelector = builder.proxySelector;//使用默認(rèn)的代理選擇器
this.cookieJar = builder.cookieJar;//默認(rèn)是沒有cookie的
this.cache = builder.cache;//緩存
this.internalCache = builder.internalCache;
this.socketFactory = builder.socketFactory;//使用默認(rèn)的Scoket工廠產(chǎn)生Socket

boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
  isTLS = isTLS || spec.isTls();
}

if (builder.sslSocketFactory != null || !isTLS) {
  this.sslSocketFactory = builder.sslSocketFactory;
  this.certificateChainCleaner = builder.certificateChainCleaner;
} else {
  X509TrustManager trustManager = Util.platformTrustManager();
  this.sslSocketFactory = newSslSocketFactory(trustManager);
  this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}

if (sslSocketFactory != null) {
  Platform.get().configureSslSocketFactory(sslSocketFactory);
}

this.hostnameVerifier = builder.hostnameVerifier;//安全相關(guān)的設(shè)置
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
    certificateChainCleaner);
this.proxyAuthenticator = builder.proxyAuthenticator;
this.authenticator = builder.authenticator;
this.connectionPool = builder.connectionPool;//連接池
this.dns = builder.dns;//域名解析系統(tǒng) domain name->ip address
this.followSslRedirects = builder.followSslRedirects;
this.followRedirects = builder.followRedirects;
this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
this.callTimeout = builder.callTimeout;
this.connectTimeout = builder.connectTimeout;
this.readTimeout = builder.readTimeout;
this.writeTimeout = builder.writeTimeout;
this.pingInterval = builder.pingInterval;//這個和websocket相關(guān),為了保持長連接,我們必須每間隔一段時間放松一個ping指令

if (interceptors.contains(null)) {
  throw new IllegalStateException("Null interceptor: " + interceptors);
}
if (networkInterceptors.contains(null)) {
  throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
}
}

Call類

在定義了請求對象后,需要生成一個Call對象。該對象代表一個準(zhǔn)備被執(zhí)行的請求。Call是可以被取消的,Call表示單個請求/響應(yīng)對流,不能執(zhí)行兩次。

public interface Call extends Cloneable {
    
    Request request();
    
    Response execute() throws IOException;
    
    void enqueue(Callback responseCallback);
    
    void cancel();
    
    boolean isExecuted();

    boolean isCanceled();
    
    Timeout timeout();
    
    Call clone();

    interface Factory {
        Call newCall(Request request);
    }
}
  • 進(jìn)入OkHttpClient的newCall方法
public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
}
  • newRealCall
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.transmitter = new Transmitter(client, call);
    return call;
}

newCall方法獲得Call實際是RealCall,RealCall就是準(zhǔn)備執(zhí)行的請求,是對接口Call的實現(xiàn),其內(nèi)部持有OkHttpClient實例,Request實例。并且這里還創(chuàng)建了Transmitter給RealCall的transmitter賦值。

Transmitter類

Transmitter意為發(fā)射器,是應(yīng)用層和網(wǎng)絡(luò)層的橋梁。在進(jìn)行連接、真正發(fā)出請求和讀取響應(yīng)中起到很重要的作用。

public Transmitter(OkHttpClient client, Call call) {
    this.client = client;
    this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
    this.call = call;
    this.eventListener = client.eventListenerFactory().create(call);
    this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}

Transmitter內(nèi)部持有OkHttpClient、連接池、call、事件監(jiān)聽器。

Dispatcher類

Dispatcher類負(fù)責(zé)異步任務(wù)的請求策略。

public final class Dispatcher {
  private int maxRequests = 64;
  //每個主機(jī)的最大請求數(shù),如果超過這個數(shù),新的請求會被加到readyAsyncCalls隊列中
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;
  
  //任務(wù)隊列線程池
  private @Nullable ExecutorService executorService;
  //待執(zhí)行異步任務(wù)隊例
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  //運行中的異步任務(wù)隊例
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  //運行中同步任務(wù)隊列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
}

同步請求執(zhí)行流程

client.newCall(request).execute(),execute方法在Call的實現(xiàn)類RealCall中。

public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.timeoutEnter();//超時計時開始
    transmitter.callStart();//回調(diào)監(jiān)聽器的請求開始
    try {
      client.dispatcher().executed(this);//放入隊列
      return getResponseWithInterceptorChain();//執(zhí)行請求獲取結(jié)果
    } finally {
      client.dispatcher().finished(this);//請求結(jié)束
    }
}

首先判斷 如果已經(jīng)執(zhí)行,就會拋出異常。這就是一個請求只能執(zhí)行一次的原因。然后回調(diào)請求監(jiān)聽器的請求開始。然后調(diào)用client的調(diào)度器Dispatcher的executed方法。

  • dispatcher().executed()
synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
}

請求放入一個雙端隊列runningSyncCalls中,表示正在執(zhí)行的同步請求。

然后返回了getResponseWithInterceptorChain()的結(jié)果Response,同步請求真正的請求流程是在getResponseWithInterceptorChain方法中(詳情見下節(jié))。
最后請求結(jié)束,會走Dispatcher的finished(Deque calls, T call)方法。

  • dispatcher.finished()
void finished(RealCall call) {
    finished(runningSyncCalls, call);
  }

private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }
    
    boolean isRunning = promoteAndExecute();
    
    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
}

這里將call從同步隊列中移除,并且調(diào)用了promoteAndExecute()方法,這個方法在后面講述。

異步請求執(zhí)行流程

  • 異步方法equeue()
@Override 
public void enqueue(Callback responseCallback) {
synchronized (this) {
  //設(shè)置exexuted參數(shù)為true,表示不可以執(zhí)行兩次
 if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
}
transmitter.callStart();//回調(diào)請求監(jiān)聽器的請求開始
//傳入一個新的對象AsyncCall
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
  • AsyncCall類
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;

AsyncCall(Callback responseCallback) {
  super("OkHttp %s", redactedUrl());
  this.responseCallback = responseCallback;
}

String host() {
  return originalRequest.url().host();
}

Request request() {
  return originalRequest;
}

RealCall get() {
  return RealCall.this;
}

@Override protected void execute() {
  boolean signalledCallback = false;
  try {
    //執(zhí)行耗時的IO操作
    //獲取攔截器鏈,詳見下篇文章
    Response response = getResponseWithInterceptorChain();
    if (retryAndFollowUpInterceptor.isCanceled()) {
      signalledCallback = true;
          
    //回調(diào),注意這里回調(diào)是在線程池中,而不是向當(dāng)前的主線程回調(diào)
      responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
    } else {
      signalledCallback = true;
      //回調(diào),同上
      responseCallback.onResponse(RealCall.this, response);
    }
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      eventListener.callFailed(RealCall.this, e);
      //回調(diào),同上
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
    
    client.dispatcher().finished(this);
  }
}
}

AsyncCall繼承NamedRunnable,NamedRunnable實現(xiàn)自Runnable,即AsyncCall就是個Runnable,它是會在線程或線程池中執(zhí)行run方法的。

  • NamedRunnable類
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      //execute抽象方法,在AsyncCall中有具體實現(xiàn)
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

在分析同步的時候知道Dispatcher調(diào)度器負(fù)責(zé)異步請求策略,去看看equeue方法。

  • Dispatcher.equeue()
void enqueue(AsyncCall call) {
synchronized (this) {
      readyAsyncCalls.add(call);
    
      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      //相同host請求,共用一個調(diào)用技術(shù)
      if (!call.get().forWebSocket) {
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    promoteAndExecute();
}

//從runningAsyncCalls和readyAsyncCalls找到相同的host請求
private AsyncCall findExistingCallWithHost(String host) {
    for (AsyncCall existingCall : runningAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    for (AsyncCall existingCall : readyAsyncCalls) {
      if (existingCall.host().equals(host)) return existingCall;
    }
    return null;
}
  • promoteAndExecute()
//調(diào)度的核心方法:在控制異步并發(fā)的策略基礎(chǔ)上,使用線程池 執(zhí)行異步請求
private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
    
        if (runningAsyncCalls.size() >= maxRequests) break; //最大并發(fā)數(shù)64.
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host最大并發(fā)數(shù).
    
        i.remove();//從等待隊列中移除
        //host并發(fā)數(shù)+1
        asyncCall.callsPerHost().incrementAndGet();
        //加入可執(zhí)行請求的集合
        executableCalls.add(asyncCall);
        //加入正在執(zhí)行的異步請求隊列
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }
    
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      //可執(zhí)行的請求
      asyncCall.executeOn(executorService());
    }
    
    return isRunning;
}

public synchronized int runningCallsCount() {
    return runningAsyncCalls.size() + runningSyncCalls.size();
}

遍歷readyAsyncCalls,先進(jìn)行兩個檢查:

  1. 正在執(zhí)行異步請求runningAsyncCalls數(shù)量大于最大并發(fā)請求數(shù)64就break;
  2. 相同host請求的數(shù)量大于5,就continue。

如果檢查都通過,就從等待隊列中移除,callPerHost自增1,放入可執(zhí)行的集合executableCalls,并添加到隊列runningAsyncCalls中,表示正在執(zhí)行的異步請求。

這里的異步請求等待隊列,是為了控制最大并發(fā)數(shù)的緩沖,異步請求并發(fā)數(shù)達(dá)到64、相同host的異步請求達(dá)到5,都要放入等待隊列。

  • AsyncCall.executeOn()
void executeOn(ExecutorService executorService) {
  assert (!Thread.holdsLock(client.dispatcher()));
  boolean success = false;
  try {
    //在線程池中執(zhí)行asyncCall
    executorService.execute(this);
    success = true;
  } catch (RejectedExecutionException e) {
    InterruptedIOException ioException = new InterruptedIOException("executor rejected");
    ioException.initCause(e);
    transmitter.noMoreExchanges(ioException);
    responseCallback.onFailure(RealCall.this, ioException);//回調(diào)失敗
  } finally {
    if (!success) {
      client.dispatcher().finished(this); //執(zhí)行發(fā)生異常 結(jié)束
    }
  }
}

AsyncCall的run方法會走到execute()方法,在上面有展示。

下面總結(jié)一下請求的流程:

  • 同步請求
  1. 調(diào)用client.newCall(request).execute()方法,也就是RealCall的execute方法;
  2. execute方法內(nèi)部調(diào)用client.dispatcher().executed()方法,將當(dāng)前RealCall加入到runningSyncCalls隊列;
  3. 使用getResponseWithInterceptorChain()獲取結(jié)果;
  4. 最后調(diào)用Dispatcher的finish方法結(jié)束請求。
  • 異步請求
  1. 調(diào)用client.newCall(request).equeue()方法,其內(nèi)部調(diào)用client.dispatcher().enqueue(new AsyncCall(responseCallback))方法;
  2. 先將AsyncCall加入到當(dāng)前readyAsyncCalls隊列中,在找到執(zhí)行當(dāng)前主機(jī)的AsyncCall,一個主機(jī)用同一個AsyncCall;
  3. 使用promoteAndExecute()方法在控制異步并發(fā)的策略基礎(chǔ)上使用線程池執(zhí)行異步請求(并發(fā)控制有包括最大并發(fā)數(shù)64,host最大并發(fā)數(shù)5)。異步請求的執(zhí)行也是使用getResponseWithInterceptorChain(),獲得結(jié)果后回調(diào)出去。最后調(diào)用Dispatcher的finish方法結(jié)束請求。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容